Skip to content

Commit

Permalink
Moved transcript indexing to an XPC service
Browse files Browse the repository at this point in the history
  • Loading branch information
insidegui committed May 28, 2017
1 parent 25cdb3c commit 12bb3e5
Show file tree
Hide file tree
Showing 9 changed files with 458 additions and 68 deletions.
16 changes: 16 additions & 0 deletions ConfCore/Storage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,22 @@ public final class Storage {

self.realmConfig = config
self.realm = try Realm(configuration: config)

DistributedNotificationCenter.default().addObserver(forName: .TranscriptIndexingDidStart, object: nil, queue: OperationQueue.main) { [unowned self] _ in
#if DEBUG
NSLog("[Storage] Locking realm autoupdates until transcript indexing is finished")
#endif

self.realm.autorefresh = false
}

DistributedNotificationCenter.default().addObserver(forName: .TranscriptIndexingDidStop, object: nil, queue: OperationQueue.main) { [unowned self] _ in
#if DEBUG
NSLog("[Storage] Realm autoupdates unlocked")
#endif

self.realm.autorefresh = true
}
}

internal static func migrate(migration: Migration, oldVersion: UInt64) {
Expand Down
24 changes: 20 additions & 4 deletions ConfCore/SyncEngine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,24 @@ public final class SyncEngine {
public let storage: Storage
public let client: AppleAPIClient

public let transcriptIndexer: TranscriptIndexer
/// Connection to the `io.wwdc.app.TranscriptIndexingService` XPC service.
///
/// Created on first access and configured with `TranscriptIndexingServiceProtocol` as the
/// remote object interface. The connection is not resumed here.
private lazy var transcriptIndexingConnection: NSXPCConnection = {
    let connection = NSXPCConnection(serviceName: "io.wwdc.app.TranscriptIndexingService")
    connection.remoteObjectInterface = NSXPCInterface(with: TranscriptIndexingServiceProtocol.self)
    return connection
}()

/// Proxy used to send messages to the transcript indexing XPC service.
///
/// The proxy is requested with an error handler so that a failure to deliver a message to the
/// service is logged instead of being dropped silently.
///
/// - Returns: The remote proxy, or `nil` if it cannot be cast to `TranscriptIndexingServiceProtocol`.
private var transcriptIndexingService: TranscriptIndexingServiceProtocol? {
    let proxy = transcriptIndexingConnection.remoteObjectProxyWithErrorHandler { error in
        NSLog("[SyncEngine] Transcript indexing service error: \(error)")
    }

    return proxy as? TranscriptIndexingServiceProtocol
}

/// Creates a sync engine that works with the given storage and API client.
///
/// Besides storing its dependencies, the initializer builds a `TranscriptIndexer` for `storage`
/// and registers a main-queue observer for `.SyncEngineDidSyncSessionsAndSchedule` on
/// `NotificationCenter.default`.
///
/// - Parameters:
///   - storage: Storage kept in `self.storage` and handed to the transcript indexer.
///   - client: API client kept in `self.client`.
public init(storage: Storage, client: AppleAPIClient) {
self.storage = storage
self.client = client

self.transcriptIndexer = TranscriptIndexer(storage)

// The token returned by addObserver is discarded, so the observer is never removed here;
// `[unowned self]` assumes the engine outlives every delivery — TODO confirm.
NotificationCenter.default.addObserver(forName: .SyncEngineDidSyncSessionsAndSchedule, object: nil, queue: OperationQueue.main) { [unowned self] _ in
// NOTE(review): both the in-process download and the XPC-based indexing are triggered below;
// presumably one of the two is the pre-change side of the diff — confirm which is current.
self.transcriptIndexer.downloadTranscriptsIfNeeded()
self.startTranscriptIndexingIfNeeded()
}
}

Expand All @@ -53,4 +61,12 @@ public final class SyncEngine {
}
}

/// Asks the transcript indexing XPC service to index transcripts for this engine's Realm file.
///
/// Returns without doing anything when the Realm configuration has no file URL. Otherwise the
/// connection is resumed and the file URL and schema version are sent to the service.
private func startTranscriptIndexingIfNeeded() {
    let config = storage.realmConfig

    guard let storageURL = config.fileURL else { return }

    // NOTE(review): resume() runs on every call, not only the first — confirm that resuming an
    // already-resumed connection is safe.
    transcriptIndexingConnection.resume()

    transcriptIndexingService?.indexTranscriptsIfNeeded(storageURL: storageURL, schemaVersion: config.schemaVersion)
}

}
147 changes: 83 additions & 64 deletions ConfCore/TranscriptIndexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import RealmSwift
import SwiftyJSON

extension Notification.Name {
public static let TranscriptIndexingDidStart = Notification.Name("TranscriptIndexingDidStartNotification")
public static let TranscriptIndexingDidStop = Notification.Name("TranscriptIndexingDidStopNotification")
public static let TranscriptIndexingDidStart = Notification.Name("io.wwdc.app.TranscriptIndexingDidStartNotification")
public static let TranscriptIndexingDidStop = Notification.Name("io.wwdc.app.TranscriptIndexingDidStopNotification")
}

public final class TranscriptIndexer {
Expand All @@ -23,35 +23,12 @@ public final class TranscriptIndexer {
self.storage = storage
}

/// Whether transcripts are currently being indexed
public var isIndexingTranscripts = false {
didSet {
guard oldValue != isIndexingTranscripts else { return }

let notificationName: Notification.Name = isIndexingTranscripts ? .TranscriptIndexingDidStart : .TranscriptIndexingDidStop

DispatchQueue.main.async {
NotificationCenter.default.post(name: notificationName, object: nil)
}
}
}

/// The progress when the transcripts are being downloaded/indexed
public var transcriptIndexingProgress: Progress? {
didSet {
isIndexingTranscripts = (transcriptIndexingProgress != nil)

transcriptIndexingStartedCallback?()
}
}

/// Called when transcript downloading/indexing starts,
/// use `transcriptIndexingProgress` to track progress
public var transcriptIndexingStartedCallback: (() -> Void)?
public var transcriptIndexingProgress: Progress?

private let asciiWWDCURL = "http://asciiwwdc.com/"

fileprivate let bgThread = DispatchQueue.global(qos: .background)
fileprivate let bgThread = DispatchQueue.global(qos: .utility)

fileprivate lazy var backgroundOperationQueue: OperationQueue = {
let q = OperationQueue()
Expand All @@ -63,17 +40,16 @@ public final class TranscriptIndexer {
}()

/// Try to download transcripts for sessions that don't have transcripts yet
func downloadTranscriptsIfNeeded() {
public func downloadTranscriptsIfNeeded() {

let transcriptedSessions = storage.realm.objects(Session.self).filter("transcript == nil AND SUBQUERY(assets, $asset, $asset.rawAssetType == %@).@count > 0", SessionAssetType.streamingVideo.rawValue)
let transcriptedSessions = storage.realm.objects(Session.self).filter("year > 2012 AND transcript == nil AND SUBQUERY(assets, $asset, $asset.rawAssetType == %@).@count > 0", SessionAssetType.streamingVideo.rawValue)

let sessionKeys: [String] = transcriptedSessions.map({ $0.identifier })

self.indexTranscriptsForSessionsWithKeys(sessionKeys)
}

func indexTranscriptsForSessionsWithKeys(_ sessionKeys: [String]) {
guard !isIndexingTranscripts else { return }
guard sessionKeys.count > 0 else { return }

transcriptIndexingProgress = Progress(totalUnitCount: Int64(sessionKeys.count))
Expand All @@ -87,6 +63,8 @@ public final class TranscriptIndexer {
}
}

/// Transcripts parsed from downloaded data that are waiting to be written to storage.
/// Appended inside `DispatchQueue.main.sync` by the download completion handler; read and
/// emptied by `storeDownloadedTranscripts()`.
fileprivate var downloadedTranscripts: [Transcript] = []

fileprivate func indexTranscript(for sessionNumber: String, in year: Int, primaryKey: String) {
guard let url = URL(string: "\(asciiWWDCURL)\(year)//sessions/\(sessionNumber)") else { return }

Expand All @@ -95,52 +73,93 @@ public final class TranscriptIndexer {

let task = URLSession.shared.dataTask(with: request) { [unowned self] data, response, error in
guard let jsonData = data else {
print("No data returned from ASCIIWWDC for \(primaryKey)")
self.transcriptIndexingProgress?.completedUnitCount += 1
self.checkForCompletion()

NSLog("No data returned from ASCIIWWDC for \(primaryKey)")

return
}

self.backgroundOperationQueue.addOperation {
do {
let bgRealm = try Realm(configuration: self.storage.realmConfig)

guard let session = bgRealm.object(ofType: Session.self, forPrimaryKey: primaryKey) else { return }

let result = TranscriptsJSONAdapter().adapt(JSON(data: jsonData))

guard case .success(let transcript) = result else {
NSLog("Error parsing transcript for \(primaryKey)")
return
}

bgRealm.beginWrite()
bgRealm.add(transcript)
session.transcript = transcript

try bgRealm.commitWrite()

defer {
self.transcriptIndexingProgress?.completedUnitCount += 1
} catch let error {
NSLog("Error indexing transcript for \(primaryKey): \(error)")
self.checkForCompletion()
}

if let progress = self.transcriptIndexingProgress {
#if DEBUG
NSLog("Completed: \(progress.completedUnitCount) Total: \(progress.totalUnitCount)")
#endif

if progress.completedUnitCount >= progress.totalUnitCount - 1 {
DispatchQueue.main.async {
#if DEBUG
NSLog("Transcript indexing finished")
#endif
self.isIndexingTranscripts = false
}
}
let result = TranscriptsJSONAdapter().adapt(JSON(data: jsonData))

guard case .success(let transcript) = result else {
NSLog("Error parsing transcript for \(primaryKey)")
return
}

DispatchQueue.main.sync {
self.downloadedTranscripts.append(transcript)
}
}
}

task.resume()
}

/// Checks the indexing progress and, once it has reached the completion threshold, schedules
/// `storeDownloadedTranscripts()` on the main queue.
///
/// Does nothing when there is no `transcriptIndexingProgress`. In DEBUG builds the current
/// completed/total counts are logged on every call.
private func checkForCompletion() {
    guard let indexingProgress = self.transcriptIndexingProgress else { return }

    #if DEBUG
    NSLog("Completed: \(indexingProgress.completedUnitCount) Total: \(indexingProgress.totalUnitCount)")
    #endif

    // NOTE(review): the threshold is `total - 1`, so this fires while one unit may still be
    // outstanding — confirm whether that tolerance is intentional.
    guard indexingProgress.completedUnitCount >= indexingProgress.totalUnitCount - 1 else { return }

    DispatchQueue.main.async {
        #if DEBUG
        NSLog("Transcript indexing finished")
        #endif

        self.storeDownloadedTranscripts()
    }
}

/// Whether a batch of downloaded transcripts is currently being written to storage.
/// Set and cleared on the main queue only.
private var isStoring = false

/// Writes the transcripts accumulated in `downloadedTranscripts` to the Realm described by
/// `storage.realmConfig`, in a single write transaction on `backgroundOperationQueue`.
///
/// `.TranscriptIndexingDidStart` is posted before the write is scheduled, and
/// `.TranscriptIndexingDidStop` is posted when the operation ends whether or not it succeeded,
/// so every start notification is matched by a stop notification.
///
/// Expected to be called on the main queue (as `checkForCompletion()` does). Calls made while a
/// previous batch is still being written are ignored; transcripts collected in the meantime stay
/// in `downloadedTranscripts` for a later call.
private func storeDownloadedTranscripts() {
    guard !isStoring else { return }
    isStoring = true

    // Take the batch on the calling queue so the background operation never touches the array
    // while new transcripts are being appended to it.
    let pendingTranscripts = downloadedTranscripts
    downloadedTranscripts.removeAll()

    let realmConfig = storage.realmConfig

    DispatchQueue.main.async {
        DistributedNotificationCenter.default().post(name: .TranscriptIndexingDidStart, object: nil)
    }

    backgroundOperationQueue.addOperation { [weak self] in
        // Runs on every exit path: allow the next batch to be stored and always pair the
        // start notification with a stop notification, even when the write failed.
        defer {
            DispatchQueue.main.async {
                self?.isStoring = false
                DistributedNotificationCenter.default().post(name: .TranscriptIndexingDidStop, object: nil)
            }
        }

        do {
            let realm = try Realm(configuration: realmConfig)

            // `write` commits on success and cancels the transaction if the block throws.
            try realm.write {
                for transcript in pendingTranscripts {
                    guard let session = realm.object(ofType: Session.self, forPrimaryKey: transcript.identifier) else {
                        NSLog("Session not found for \(transcript.identifier)")
                        continue
                    }

                    session.transcript = transcript

                    realm.add(transcript)
                }
            }
        } catch {
            NSLog("Error writing indexed transcripts to storage: \(error)")
        }
    }
}

}
36 changes: 36 additions & 0 deletions TranscriptIndexingService/Info.plist
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>CFBundleDevelopmentRegion</key>
<string>en</string>
<key>CFBundleDisplayName</key>
<string>TranscriptIndexingService</string>
<key>CFBundleExecutable</key>
<string>$(EXECUTABLE_NAME)</string>
<key>NSAppTransportSecurity</key>
<dict>
<key>NSAllowsArbitraryLoads</key>
<true/>
</dict>
<key>CFBundleIdentifier</key>
<string>$(PRODUCT_BUNDLE_IDENTIFIER)</string>
<key>CFBundleInfoDictionaryVersion</key>
<string>6.0</string>
<key>CFBundleName</key>
<string>$(PRODUCT_NAME)</string>
<key>CFBundlePackageType</key>
<string>XPC!</string>
<key>CFBundleShortVersionString</key>
<string>1.0</string>
<key>CFBundleVersion</key>
<string>1</string>
<key>NSHumanReadableCopyright</key>
<string>Copyright © 2017 Guilherme Rambo. All rights reserved.</string>
<key>XPCService</key>
<dict>
<key>ServiceType</key>
<string>Application</string>
</dict>
</dict>
</plist>
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
//
// Use this file to import your target's public headers that you would like to expose to Swift.
//

32 changes: 32 additions & 0 deletions TranscriptIndexingService/TranscriptIndexingService.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
//
// TranscriptIndexingService.swift
// WWDC
//
// Created by Guilherme Rambo on 28/05/17.
// Copyright © 2017 Guilherme Rambo. All rights reserved.
//

import Foundation
import ConfCore
import RealmSwift

/// Object exported by the XPC service; performs transcript indexing on behalf of the app.
final class TranscriptIndexingService: NSObject, TranscriptIndexingServiceProtocol {

    /// Indexer created on the first successful request and reused by later requests.
    private var transcriptIndexer: TranscriptIndexer?

    /// Downloads and indexes missing transcripts into the Realm at `storageURL`.
    ///
    /// The indexer (and its storage) is created only once: after the first successful call, the
    /// `storageURL` and `schemaVersion` of later calls are not used. If the storage cannot be
    /// created, the error is logged and nothing else happens.
    ///
    /// - Parameters:
    ///   - storageURL: File URL used for the Realm configuration.
    ///   - schemaVersion: Schema version used for the Realm configuration.
    func indexTranscriptsIfNeeded(storageURL: URL, schemaVersion: UInt64) {
        let indexer: TranscriptIndexer

        if let existingIndexer = transcriptIndexer {
            indexer = existingIndexer
        } else {
            do {
                let config = Realm.Configuration(fileURL: storageURL, schemaVersion: schemaVersion)
                let storage = try Storage(config)

                indexer = TranscriptIndexer(storage)
                transcriptIndexer = indexer
            } catch {
                NSLog("[TranscriptIndexingService] Error initializing: \(error)")
                return
            }
        }

        indexer.downloadTranscriptsIfNeeded()
    }

}
15 changes: 15 additions & 0 deletions TranscriptIndexingService/TranscriptIndexingServiceProtocol.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
//
// TranscriptIndexingServiceProtocol.swift
// WWDC
//
// Created by Guilherme Rambo on 28/05/17.
// Copyright © 2017 Guilherme Rambo. All rights reserved.
//

import Foundation

/// Interface of the transcript indexing service.
///
/// Declared `@objc` and refining `NSObjectProtocol`; presumably so it can be used to build an
/// `NSXPCInterface` — confirm against the connection setup.
@objc protocol TranscriptIndexingServiceProtocol: NSObjectProtocol {

/// Requests that missing transcripts be downloaded and indexed.
///
/// - Parameters:
///   - storageURL: URL identifying the storage to index into (presumably the Realm file URL — confirm with callers).
///   - schemaVersion: Schema version to use when opening that storage.
func indexTranscriptsIfNeeded(storageURL: URL, schemaVersion: UInt64)

}
Loading

0 comments on commit 12bb3e5

Please sign in to comment.