Skip to content

Commit

Permalink
Modernized transcripts XPC, exposing progress to client. Fixes crash …
Browse files Browse the repository at this point in the history
…when changing language.
  • Loading branch information
insidegui committed May 27, 2020
1 parent 7745d6e commit bab5f0d
Showing 11 changed files with 323 additions and 195 deletions.
98 changes: 16 additions & 82 deletions ConfCore/SyncEngine.swift
Original file line number Diff line number Diff line change
@@ -29,17 +29,31 @@ public final class SyncEngine {

private let disposeBag = DisposeBag()

private let transcriptIndexingClient: TranscriptIndexingClient

public var transcriptLanguage: String {
get { transcriptIndexingClient.transcriptLanguage }
set { transcriptIndexingClient.transcriptLanguage = newValue }
}

public var isIndexingTranscripts: BehaviorSubject<Bool> { transcriptIndexingClient.isIndexing }
public var transcriptIndexingProgress: BehaviorSubject<Float> { transcriptIndexingClient.indexingProgress }

public init(storage: Storage, client: AppleAPIClient, transcriptLanguage: String) {
self.storage = storage
self.client = client
self.transcriptLanguage = transcriptLanguage
self.transcriptIndexingClient = TranscriptIndexingClient(
language: transcriptLanguage,
storage: storage,
appleClient: client
)

#if ICLOUD
self.userDataSyncEngine = UserDataSyncEngine(storage: storage)
#endif

NotificationCenter.default.rx.notification(.SyncEngineDidSyncSessionsAndSchedule).observeOn(MainScheduler.instance).subscribe(onNext: { [unowned self] _ in
self.startTranscriptIndexing(ignoringCache: !self.migratedTranscriptsToNativeVersion)
self.transcriptIndexingClient.startIndexing(ignoringCache: false)

#if ICLOUD
self.userDataSyncEngine.start()
@@ -80,84 +94,4 @@ public final class SyncEngine {
}
}

// MARK: - Transcripts

public var transcriptLanguage: String {
didSet {
guard transcriptLanguage != oldValue else { return }

didRunIndexingService = false
startTranscriptIndexing(ignoringCache: true)
}
}

private var didRunIndexingService = false

private lazy var transcriptIndexingConnection: NSXPCConnection = {
let c = NSXPCConnection(serviceName: "io.wwdc.app.TranscriptIndexingService")

c.remoteObjectInterface = NSXPCInterface(with: TranscriptIndexingServiceProtocol.self)

return c
}()

private var transcriptIndexingService: TranscriptIndexingServiceProtocol? {
return transcriptIndexingConnection.remoteObjectProxy as? TranscriptIndexingServiceProtocol
}

private var migratedTranscriptsToNativeVersion: Bool {
get { UserDefaults.standard.bool(forKey: #function) }
set { UserDefaults.standard.set(newValue, forKey: #function) }
}

private func startTranscriptIndexing(ignoringCache ignoreCache: Bool) {
guard !ProcessInfo.processInfo.arguments.contains("--disable-transcripts") else { return }

if !migratedTranscriptsToNativeVersion {
os_log("Transcripts need migration", log: self.log, type: .debug)
}

if !ignoreCache {
guard TranscriptIndexer.needsUpdate(in: storage) else { return }
}

guard !didRunIndexingService else { return }
didRunIndexingService = true

client.fetchConfig { [weak self] result in
guard let self = self else { return }

switch result {
case .success(let config):
self.doStartTranscriptIndexing(with: config, ignoringCache: ignoreCache)
case .failure(let error):
os_log("Config fetch failed: %{public}@", log: self.log, type: .error, String(describing: error))
}
}

}

private func doStartTranscriptIndexing(with config: RootConfig, ignoringCache ignoreCache: Bool) {
os_log("%{public}@", log: log, type: .debug, #function)

guard let feeds = config.feeds[transcriptLanguage] ?? config.feeds[RootConfig.fallbackFeedLanguage] else {
os_log("No feeds found for currently set language (%@) or fallback language (%@)", log: self.log, type: .error, transcriptLanguage, RootConfig.fallbackFeedLanguage)
return
}

guard let storageURL = storage.realmConfig.fileURL else { return }

TranscriptIndexer.lastManifestBasedUpdateDate = Date()
migratedTranscriptsToNativeVersion = true

transcriptIndexingConnection.resume()

transcriptIndexingService?.indexTranscriptsIfNeeded(
manifestURL: feeds.transcripts.url,
ignoringCache: ignoreCache,
storageURL: storageURL,
schemaVersion: storage.realmConfig.schemaVersion
)
}

}
10 changes: 10 additions & 0 deletions ConfCore/TranscriptIndexer.swift
Original file line number Diff line number Diff line change
@@ -85,9 +85,15 @@ public final class TranscriptIndexer {
makeDownloader()
}()

var didStart: () -> Void = { }
var progressChanged: (Float) -> Void = { _ in }
var didStop: () -> Void = { }

public func downloadTranscriptsIfNeeded() {
downloader = makeDownloader()

didStart()

DistributedNotificationCenter.default().postNotificationName(
.TranscriptIndexingDidStart,
object: nil,
@@ -106,6 +112,8 @@ public final class TranscriptIndexer {
guard let self = self else { return }

os_log("Transcript indexing progresss: %.2f", log: self.log, type: .default, progress)

self.progressChanged(progress)
}) { [weak self] in
self?.finished()
}
@@ -137,6 +145,8 @@ public final class TranscriptIndexer {
}

private func finished() {
didStop()

DistributedNotificationCenter.default().postNotificationName(
.TranscriptIndexingDidStop,
object: nil,
129 changes: 129 additions & 0 deletions ConfCore/TranscriptIndexingClient.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
//
// TranscriptIndexingClient.swift
// ConfCore
//
// Created by Guilherme Rambo on 27/05/20.
// Copyright © 2020 Guilherme Rambo. All rights reserved.
//

import Foundation
import RxSwift
import os.log

final class TranscriptIndexingClient: NSObject, TranscriptIndexingClientProtocol {

private let log = OSLog(subsystem: "ConfCore", category: String(describing: TranscriptIndexingClient.self))

var transcriptLanguage: String {
didSet {
guard transcriptLanguage != oldValue else { return }

didRunService = false
startIndexing(ignoringCache: true)
}
}

private let storage: Storage
private let appleClient: AppleAPIClient

init(language: String, storage: Storage, appleClient: AppleAPIClient) {
self.transcriptLanguage = language
self.storage = storage
self.appleClient = appleClient

super.init()

transcriptIndexingConnection.resume()
}

let isIndexing = BehaviorSubject<Bool>(value: false)
let indexingProgress = BehaviorSubject<Float>(value: 0)

private var didRunService = false

private lazy var transcriptIndexingConnection: NSXPCConnection = {
let c = NSXPCConnection(serviceName: "io.wwdc.app.TranscriptIndexingService")

c.remoteObjectInterface = NSXPCInterface(with: TranscriptIndexingServiceProtocol.self)
c.exportedInterface = NSXPCInterface(with: TranscriptIndexingClientProtocol.self)
c.exportedObject = self

return c
}()

private var transcriptIndexingService: TranscriptIndexingServiceProtocol? {
return transcriptIndexingConnection.remoteObjectProxyWithErrorHandler { [weak self] error in
guard let self = self else { return }
os_log("Failed to get remote object proxy: %{public}@", log: self.log, type: .fault, String(describing: error))
} as? TranscriptIndexingServiceProtocol
}

private var migratedTranscriptsToNativeVersion: Bool {
get { UserDefaults.standard.bool(forKey: #function) }
set { UserDefaults.standard.set(newValue, forKey: #function) }
}

func startIndexing(ignoringCache ignoreCache: Bool) {
guard !ProcessInfo.processInfo.arguments.contains("--disable-transcripts") else { return }

if !migratedTranscriptsToNativeVersion {
os_log("Transcripts need migration", log: self.log, type: .debug)
}

if !ignoreCache && migratedTranscriptsToNativeVersion {
guard TranscriptIndexer.needsUpdate(in: storage) else { return }
}

guard !didRunService else { return }
didRunService = true

appleClient.fetchConfig { [weak self] result in
guard let self = self else { return }

switch result {
case .success(let config):
self.doStartTranscriptIndexing(with: config, ignoringCache: ignoreCache)
case .failure(let error):
os_log("Config fetch failed: %{public}@", log: self.log, type: .error, String(describing: error))
}
}
}

private func doStartTranscriptIndexing(with config: RootConfig, ignoringCache ignoreCache: Bool) {
os_log("%{public}@", log: log, type: .debug, #function)

guard let feeds = config.feeds[transcriptLanguage] ?? config.feeds[RootConfig.fallbackFeedLanguage] else {
os_log("No feeds found for currently set language (%@) or fallback language (%@)", log: self.log, type: .error, transcriptLanguage, RootConfig.fallbackFeedLanguage)
return
}

guard let storageURL = storage.realmConfig.fileURL else { return }

TranscriptIndexer.lastManifestBasedUpdateDate = Date()
migratedTranscriptsToNativeVersion = true

transcriptIndexingService?.indexTranscriptsIfNeeded(
manifestURL: feeds.transcripts.url,
ignoringCache: ignoreCache,
storageURL: storageURL,
schemaVersion: storage.realmConfig.schemaVersion
)
}

func transcriptIndexingStarted() {
os_log("%{public}@", log: log, type: .debug, #function)

isIndexing.on(.next(true))
}

func transcriptIndexingProgressDidChange(_ progress: Float) {
indexingProgress.on(.next(progress))
}

func transcriptIndexingStopped() {
os_log("%{public}@", log: log, type: .debug, #function)

isIndexing.on(.next(false))
}

}
17 changes: 17 additions & 0 deletions ConfCore/TranscriptIndexingClientProtocol.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
//
// TranscriptIndexingClientProtocol.swift
// ConfCore
//
// Created by Guilherme Rambo on 27/05/20.
// Copyright © 2020 Guilherme Rambo. All rights reserved.
//

import Foundation

@objc public protocol TranscriptIndexingClientProtocol: NSObjectProtocol {

func transcriptIndexingStarted()
func transcriptIndexingProgressDidChange(_ progress: Float)
func transcriptIndexingStopped()

}
88 changes: 88 additions & 0 deletions ConfCore/TranscriptIndexingService.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//
// TranscriptIndexingService.swift
// WWDC
//
// Created by Guilherme Rambo on 28/05/17.
// Copyright © 2017 Guilherme Rambo. All rights reserved.
//

import Foundation
import RealmSwift
import os.log

@objcMembers public final class TranscriptIndexingService: NSObject, TranscriptIndexingServiceProtocol {

private var indexer: TranscriptIndexer!
private let log = OSLog(subsystem: "TranscriptIndexingService", category: "TranscriptIndexingService")

public func indexTranscriptsIfNeeded(manifestURL: URL, ignoringCache: Bool, storageURL: URL, schemaVersion: UInt64) {
do {
let config = Realm.Configuration(fileURL: storageURL, schemaVersion: schemaVersion)
let storage = try Storage(config)

indexer = TranscriptIndexer(storage, manifestURL: manifestURL)

indexer.didStart = { [weak self] in
self?.clients.forEach { $0.transcriptIndexingStarted() }
}
indexer.progressChanged = { [weak self] progress in
self?.clients.forEach { $0.transcriptIndexingProgressDidChange(progress) }
}
indexer.didStop = { [weak self] in
self?.clients.forEach { $0.transcriptIndexingStopped() }
}

indexer.manifestURL = manifestURL
indexer.ignoreExistingEtags = ignoringCache

indexer.downloadTranscriptsIfNeeded()
} catch {
os_log("Error initializing indexing service: %{public}@", log: self.log, type: .fault, String(describing: error))
return
}
}

private lazy var listener: NSXPCListener = {
let l = NSXPCListener.service()

l.delegate = self

return l
}()

public func resume() {
listener.resume()
}

private var connections: [NSXPCConnection] = []

private var clients: [TranscriptIndexingClientProtocol] {
connections.compactMap { $0.remoteObjectProxy as? TranscriptIndexingClientProtocol }
}

}

extension TranscriptIndexingService: NSXPCListenerDelegate {

public func listener(_ listener: NSXPCListener, shouldAcceptNewConnection newConnection: NSXPCConnection) -> Bool {
newConnection.exportedInterface = NSXPCInterface(with: TranscriptIndexingServiceProtocol.self)
newConnection.exportedObject = self

newConnection.remoteObjectInterface = NSXPCInterface(with: TranscriptIndexingClientProtocol.self)

newConnection.invalidationHandler = { [weak self] in
guard let self = self else { return }

os_log("Connection invalidated: %{public}@", log: self.log, type: .debug, String(describing: newConnection))

self.connections.removeAll(where: { $0 == newConnection })
}

connections.append(newConnection)

newConnection.resume()

return true
}

}
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@

import Foundation

@objc protocol TranscriptIndexingServiceProtocol: NSObjectProtocol {
@objc public protocol TranscriptIndexingServiceProtocol: NSObjectProtocol {

func indexTranscriptsIfNeeded(manifestURL: URL, ignoringCache: Bool, storageURL: URL, schemaVersion: UInt64)

Loading
Oops, something went wrong.

0 comments on commit bab5f0d

Please sign in to comment.