Skip to content

Commit

Permalink
Fix concurrency issues on Translator Cache
Browse files Browse the repository at this point in the history
  • Loading branch information
Carlos Cabanero committed Jun 10, 2024
1 parent 545eb5f commit efa84d2
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 37 deletions.
10 changes: 5 additions & 5 deletions BlinkFileProvider/FileProviderEnumerator.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class FileProviderEnumerator: NSObject, NSFileProviderEnumerator {

let path = self.identifier.path
self.cache = cache

self.log = BlinkLogger("enumeratorFor \(path)")
self.log.debug("Initialized")

Expand Down Expand Up @@ -173,23 +173,23 @@ class FileProviderEnumerator: NSObject, NSFileProviderEnumerator {

let anchor = UInt(String(data: anchor.rawValue, encoding: .utf8)!)!
self.log.info("Enumerating changes at \(anchor) anchor")

guard let ref = self.cache.reference(identifier: self.identifier) else {
observer.finishEnumeratingWithError("Op not supported")
return
}

if let updatedItems = self.cache.updatedItems(container: self.identifier, since: anchor) {
// Atm only update changes, no deletion as we don't provide tombstone values.
self.log.info("\(updatedItems.count) items updated.")
observer.didUpdate(updatedItems)
} else if anchor < ref.syncAnchor {
observer.didUpdate([ref])
}

let newAnchor = ref.syncAnchor
let data = "\(newAnchor)".data(using: .utf8)

observer.finishEnumeratingChanges(upTo: NSFileProviderSyncAnchor(data!), moreComing: false)
}

Expand Down
20 changes: 10 additions & 10 deletions BlinkFileProvider/FileProviderExtension.swift
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class FileProviderExtension: NSFileProviderExtension {
override func item(for identifier: NSFileProviderItemIdentifier) throws -> NSFileProviderItem {
let log = BlinkLogger("itemFor")
log.info("\(identifier)")

var queryableIdentifier: BlinkItemIdentifier!

if identifier == .rootContainer {
Expand Down Expand Up @@ -189,7 +189,7 @@ class FileProviderExtension: NSFileProviderExtension {
completionHandler(NSFileProviderError(.noSuchItem))
return
}

guard !blinkItemReference.isDownloaded else {
log.info("\(blinkItemReference.path) - current item up to date")
completionHandler(nil)
Expand Down Expand Up @@ -339,15 +339,15 @@ class FileProviderExtension: NSFileProviderExtension {
// Called at some point after the file has changed; the provider may then trigger an upload
let log = BlinkLogger("itemChanged")
log.info("\(url.path)")

guard var blinkItemReference = self.cache.reference(url: url) else {
log.error("Could not find reference to item")
return
}
// - if there are existing NSURLSessionTasks uploading this file, cancel them
// Cancel an upload if there is a reference to it.
blinkItemReference.uploadingTask?.cancel()

// - mark file at <url> as needing an update in the model
// Update the model
var attributes: FileAttributes!
Expand All @@ -374,7 +374,7 @@ class FileProviderExtension: NSFileProviderExtension {
let itemIdentifier = blinkItemReference.itemIdentifier
let destTranslator = self.cache.rootTranslator(for: BlinkItemIdentifier(itemIdentifier))
.flatMap { $0.cloneWalkTo(BlinkItemIdentifier(blinkItemReference.parentItemIdentifier).path) }

// 3. Upload
let c = destTranslator.flatMap { remotePathTranslator in
return srcTranslator.flatMap{ localFileTranslator -> CopyProgressInfoPublisher in
Expand All @@ -387,7 +387,7 @@ class FileProviderExtension: NSFileProviderExtension {
if case let .failure(error) = completion {
log.error("Upload failed \(localFileURLPath)- \(error)")
blinkItemReference.uploadCompleted(error)

self.signalEnumerator(for: blinkItemReference.parentItemIdentifier)
return
}
Expand All @@ -397,9 +397,9 @@ class FileProviderExtension: NSFileProviderExtension {

log.info("Upload completed \(localFileURLPath)")
} receiveValue: { _ in }

blinkItemReference.uploadStarted(c)

self.signalEnumerator(for: blinkItemReference.parentItemIdentifier)
}

Expand Down Expand Up @@ -630,12 +630,12 @@ class FileProviderExtension: NSFileProviderExtension {
let fpm = NSFileProviderManager(for: domain) else {
return
}

fpm.signalEnumerator(for: container, completionHandler: { error in
BlinkLogger("signalEnumerator").info("Enumerator Signaled with \(error ?? "no error")")
})
}

deinit {
print("OOOOUUUTTTTT!!!!!")
}
Expand Down
57 changes: 35 additions & 22 deletions BlinkFileProvider/Models/FileTranslatorCache.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class TranslatorControl {
self.translator = translator
self.connectionControl = connectionControl
}

deinit {
self.connectionControl.cancel()
}
Expand All @@ -66,6 +66,7 @@ final class FileTranslatorCache {
private var translators: [String: TranslatorControl] = [:]
private var references: [String: BlinkItemReference] = [:]
private var fileList = [String: [BlinkItemReference]]()
private let queue = DispatchQueue(label: "FileTranslatorCache", attributes: .concurrent)

init() {}

Expand Down Expand Up @@ -118,7 +119,9 @@ final class FileTranslatorCache {
.tryMap { try SFTPTranslator(on: $0) }
.flatMap { $0.walkTo(pathAtFiles) }
.map { t -> Translator in
self.translators[encodedRootPath] = TranslatorControl(t, connectionControl: connControl)
self.queue.async(flags: .barrier) {
self.translators[encodedRootPath] = TranslatorControl(t, connectionControl: connControl)
}
return t
}
}
Expand All @@ -133,24 +136,30 @@ final class FileTranslatorCache {

func store(reference: BlinkItemReference) {
print("storing File BlinkItemReference : \(reference.itemIdentifier.rawValue)")
self.references[reference.itemIdentifier.rawValue] = reference
if reference.itemIdentifier != .rootContainer {
if var list = self.fileList[reference.parentItemIdentifier.rawValue] {
list.append(reference)
self.fileList[reference.parentItemIdentifier.rawValue] = list
} else {
self.fileList[reference.parentItemIdentifier.rawValue] = [reference]
queue.async(flags: .barrier) {
self.references[reference.itemIdentifier.rawValue] = reference
if reference.itemIdentifier != .rootContainer {
if var list = self.fileList[reference.parentItemIdentifier.rawValue] {
list.append(reference)
self.fileList[reference.parentItemIdentifier.rawValue] = list
} else {
self.fileList[reference.parentItemIdentifier.rawValue] = [reference]
}
}
}
}

func remove(reference: BlinkItemReference) {
self.references.removeValue(forKey: reference.itemIdentifier.rawValue)
queue.async(flags: .barrier) {
self.references.removeValue(forKey: reference.itemIdentifier.rawValue)
}
}

func reference(identifier: BlinkItemIdentifier) -> BlinkItemReference? {
print("requesting File BlinkItemReference : \(identifier.itemIdentifier.rawValue)")
return self.references[identifier.itemIdentifier.rawValue]
return queue.sync {
self.references[identifier.itemIdentifier.rawValue]
}
}

func reference(url: URL) -> BlinkItemReference? {
Expand All @@ -168,32 +177,36 @@ final class FileTranslatorCache {
cleanPath.removeFirst()
}

return self.references[String(cleanPath)]
return queue.sync {
self.references[String(cleanPath)]
}
}

func updatedItems(container: BlinkItemIdentifier, since anchor: UInt) -> [BlinkItemReference]? {
self.fileList[container.itemIdentifier.rawValue]?.filter {
anchor < $0.syncAnchor
queue.sync {
self.fileList[container.itemIdentifier.rawValue]?.filter {
anchor < $0.syncAnchor
}
}
}
}


class SSHClientConfigProvider {

static func config(host title: String) throws -> (String, SSHClientConfig) {

// NOTE This is just regular config initialization. Usually happens on AppDelegate, but the
// FileProvider doesn't get another chance.
BKHosts.loadHosts()
BKPubKey.loadIDS()

let bkConfig = try BKConfig()
let agent = SSHAgent()
let consts: [SSHAgentConstraint] = [SSHConstraintTrustedConnectionOnly()]

let host = try bkConfig.bkSSHHost(title)

if let signers = bkConfig.signer(forHost: host) {
signers.forEach { (signer, name) in
agent.loadKey(signer, aka: name, constraints: consts)
Expand All @@ -208,12 +221,12 @@ class SSHClientConfigProvider {
if let password = host.password, !password.isEmpty {
availableAuthMethods.append(AuthPassword(with: password))
}

let log = BlinkLogger("SSH")
let logger = PassthroughSubject<String, Never>()
logger.sink {
log.send($0)

}.store(in: &logCancellables)


Expand Down

0 comments on commit efa84d2

Please sign in to comment.