Skip to content

Commit

Permalink
Hotfix to test FileTranslator hangs due to Combine misbehavior
Browse files Browse the repository at this point in the history
  • Loading branch information
Carlos Cabanero committed Jan 19, 2022
1 parent 0f8c2cf commit 6637aa9
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 56 deletions.
4 changes: 2 additions & 2 deletions BlinkCode/TranslatorFactory.swift
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ extension TranslatorFactories {
return .fail(error: TranslatorError(message: "Configuration error - \(error)"))
}

return Just(())
return Just(config)
.receive(on: scheduler).flatMap {
SSHClient
.dial(hostName, with: config)
.dial(hostName, with: $0)
.print("Dialing...")
.flatMap { $0.requestSFTP() }
.tryMap { try SFTPTranslator(on: $0) }
Expand Down
1 change: 1 addition & 0 deletions BlinkFileProvider/Models/BlinkItemReference.swift
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ final class BlinkItemReference: NSObject {
func downloadStarted(_ c: AnyCancellable) {
downloadingTask = c
downloadingError = nil
evaluate()
}

func downloadCompleted(_ error: Error?) {
Expand Down
63 changes: 53 additions & 10 deletions BlinkFileProvider/Models/FileTranslatorCache.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,23 @@ import Foundation
import BlinkFiles


class TranslatorReference {
let translator: Translator
let cancel: () -> Void

init(_ translator: Translator, cancel: @escaping (() -> Void)) {
self.translator = translator
self.cancel = cancel
}

deinit {
cancel()
}
}

final class FileTranslatorCache {
static let shared = FileTranslatorCache()
private var translators: [String: Translator] = [:]
private var translators: [String: TranslatorReference] = [:]
private var references: [String: BlinkItemReference] = [:]
private var fileList: [String: [BlinkItemReference]] = [:]
private var backgroundThread: Thread? = nil
Expand All @@ -57,17 +71,46 @@ final class FileTranslatorCache {

static func translator(for encodedRootPath: String) -> AnyPublisher<Translator, Error> {
// Check if we have it cached, if it is still working
if let translator = shared.translators[encodedRootPath],
translator.isConnected {
return .just(translator)
if let translatorRef = shared.translators[encodedRootPath],
translatorRef.translator.isConnected {
return .just(translatorRef.translator)
}

return buildTranslator(for: encodedRootPath)
.map { t -> Translator in
// Next time, the translator does not need to be built again (authentication, etc...)
shared.translators[encodedRootPath] = t
return t
}.eraseToAnyPublisher()
guard let rootData = Data(base64Encoded: encodedRootPath),
let rootPath = String(data: rootData, encoding: .utf8) else {
return Fail(error: "Wrong encoded identifier for Translator").eraseToAnyPublisher()
}

// rootPath: ssh:host:root_folder
let components = rootPath.split(separator: ":")

// TODO At least two components. Tweak for sftp
let remoteProtocol = BlinkFilesProtocol(rawValue: String(components[0]))
let pathAtFiles: String
let host: String?
if components.count == 2 {
pathAtFiles = String(components[1])
host = nil
} else {
pathAtFiles = String(components[2])
host = String(components[1])
}

switch remoteProtocol {
case .local:
return Local().walkTo(pathAtFiles)
case .sftp:
guard let host = host else {
return .fail(error: "Missing host in Translator route")
}
return sftp(host: host, path: pathAtFiles)
.map { tr -> Translator in
shared.translators[encodedRootPath] = tr
return tr.translator
}.eraseToAnyPublisher()
default:
return .fail(error: "Not implemented")
}
}

static func store(reference: BlinkItemReference) {
Expand Down
75 changes: 31 additions & 44 deletions BlinkFileProvider/Models/TranslatorFactory.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,45 +45,8 @@ enum BlinkFilesProtocol: String {

var logCancellables = Set<AnyCancellable>()

func buildTranslator(for encodedRootPath: String) -> AnyPublisher<Translator, Error> {
guard let rootData = Data(base64Encoded: encodedRootPath),
let rootPath = String(data: rootData, encoding: .utf8) else {
return Fail(error: "Wrong encoded identifier for Translator").eraseToAnyPublisher()
}

// rootPath: ssh:host:root_folder
let components = rootPath.split(separator: ":")

// TODO At least two components. Tweak for sftp
let remoteProtocol = BlinkFilesProtocol(rawValue: String(components[0]))
let pathAtFiles: String
let host: String?
if components.count == 2 {
pathAtFiles = String(components[1])
host = nil
} else {
pathAtFiles = String(components[2])
host = String(components[1])
}

switch remoteProtocol {
case .local:
return local(path: pathAtFiles)
case .sftp:
guard let host = host else {
return .fail(error: "Missing host in Translator route")
}
return sftp(host: host, path: pathAtFiles)
default:
return .fail(error: "Not implemented")
}
}

fileprivate func local(path: String) -> AnyPublisher<Translator, Error> {
return Local().walkTo(path)
}

fileprivate func sftp(host: String, path: String) -> AnyPublisher<Translator, Error> {
func sftp(host: String, path: String) -> AnyPublisher<TranslatorReference, Error> {
let log = BlinkLogger("SFTP")
let hostName: String
let config: SSHClientConfig
Expand All @@ -96,14 +59,30 @@ fileprivate func sftp(host: String, path: String) -> AnyPublisher<Translator, Er
} catch {
return .fail(error: "Configuration error - \(error)")
}

var thread: Thread!
var runLoop: RunLoop? = nil

let threadIsReady = Future<RunLoop, Error> { promise in
thread = Thread {
let timer = Timer(timeInterval: TimeInterval(1), repeats: true) { _ in
//print("timer")
}
runLoop = RunLoop.current
RunLoop.current.add(timer, forMode: .default)
promise(.success(RunLoop.current))
CFRunLoopRun()
// Wrap it up
RunLoop.current.run(until: Date(timeIntervalSinceNow: 0.5))
}
thread.start()
}

// NOTE We use main queue as this is an extension. Should move it to a different one though,
// in case of future changes.
return Just(config).receive(on: DispatchQueue.main).flatMap {
SSHClient
.dial(hostName, with: $0)
return threadIsReady.flatMap { _ in
Just(()).receive(on: runLoop!).flatMap {
SSHClient
.dial(hostName, with: config)
.print("Dialing...")
//.receive(on: FileTranslatorPool.shared.backgroundRunLoop)
.flatMap { $0.requestSFTP() }
.tryMap { try SFTPTranslator(on: $0) }
.mapError { error -> Error in
Expand All @@ -115,8 +94,16 @@ fileprivate func sftp(host: String, path: String) -> AnyPublisher<Translator, Er
log.error("Error walking to base path \(path): \(error)")
return NSFileProviderError(.noSuchItem)
}
.map {
TranslatorReference($0, cancel: {
log.debug("Cancelling translator")
let cfRunLoop = runLoop!.getCFRunLoop()
CFRunLoopStop(cfRunLoop)
})
}
}
.eraseToAnyPublisher()
}
}.eraseToAnyPublisher()
}

Expand Down

0 comments on commit 6637aa9

Please sign in to comment.