Skip to content

Commit

Permalink
SSH Pool changes to simplify the way the thread is released
Browse files Browse the repository at this point in the history
- SSH Command deinit clean-ups
- SSH Copy files new loop changes
- Mosh adapted under new model
  • Loading branch information
Carlos Cabanero committed Mar 11, 2024
1 parent 8283a20 commit 14fc447
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 92 deletions.
9 changes: 5 additions & 4 deletions Blink/Commands/mosh/mosh.swift
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,7 @@ enum MoshError: Error, LocalizedError {
//.print()
.handleEvents(receiveCancel: { [weak self] in
if let self = self {
self.currentRunLoop.run(until: Date(timeIntervalSinceNow: 0.5))
awake(runLoop: currentRunLoop)
self.kill()
}
})
.sink(
Expand All @@ -213,7 +212,8 @@ enum MoshError: Error, LocalizedError {
})

self.isRunloopRunning = true
awaitRunLoop(currentRunLoop)
//awaitRunLoop(currentRunLoop)
CFRunLoopRunInMode(.defaultMode, TimeInterval(INT_MAX), false)
self.currentRunLoop.run(until: Date(timeIntervalSinceNow: 0.5))
self.isRunloopRunning = false

Expand Down Expand Up @@ -430,7 +430,8 @@ enum MoshError: Error, LocalizedError {
// Cancelling here makes sure the flows are cancelled.
// Trying to do it at the runloop has the issue that flows may continue running.
print("Kill received")
awake(runLoop: currentRunLoop)
CFRunLoopStop(currentRunLoop.getCFRunLoop())
//awake(runLoop: currentRunLoop)
sshCancellable = nil
} else {
// MOSH-ESC .
Expand Down
30 changes: 16 additions & 14 deletions Blink/Commands/ssh/CopyFiles.swift
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ class FileLocationPath {

public class BlinkCopy: NSObject {
var copyCancellable: AnyCancellable?

let device: TermDevice = tty()
let currentRunLoop = RunLoop.current
var stdout = OutputStream(file: thread_stdout)
Expand Down Expand Up @@ -201,28 +202,26 @@ public class BlinkCopy: NSObject {
// Connect to the destination first, as it will be the one driving the operation.
let destProtocol = command.destination.proto ?? defaultRemoteProtocol

let destTranslator = (destProtocol == .local) ? localTranslator(to: command.destination.filePath) :
var destTranslator: AnyPublisher<Translator, Error>? = (destProtocol == .local) ? localTranslator(to: command.destination.filePath) :
remoteTranslator(toFilePath: command.destination.filePath, atHost: command.destination.hostPath!, using: destProtocol, isSource: false)

// Source
let sourceProtocol = command.source.proto ?? defaultRemoteProtocol
let sourceTranslator = (sourceProtocol == .local) ? localTranslator(to: command.source.filePath) :
var sourceTranslator: AnyPublisher<Translator, Error>? = (sourceProtocol == .local) ? localTranslator(to: command.source.filePath) :
remoteTranslator(toFilePath: command.source.filePath, atHost: command.source.hostPath!, using: sourceProtocol)

// TODO Output object for reports
var rc: Int32 = 0
var rootFilePath: String!
var currentFile = ""
var displayFileName = ""
var currentCopied: UInt64 = 0
var currentSpeed: String?
var sourceBasePath: String?
var startTimestamp = 0
var lastElapsed = 0
copyCancellable = destTranslator.flatMap { d -> CopyProgressInfoPublisher in
copyCancellable = destTranslator!.flatMap { d -> CopyProgressInfoPublisher in
rootFilePath = d.current

return sourceTranslator
return sourceTranslator!
.flatMap {
$0.cloneWalkTo(self.command.source.filePath)
}
Expand Down Expand Up @@ -267,7 +266,8 @@ public class BlinkCopy: NSObject {
print("Copy failed. \(error)", to: &self.stderr)
rc = -1
}
awake(runLoop: self.currentRunLoop)

self.kill()
}, receiveValue: { progress in //(file, size, written) in
// ProgressReport object, which we can use here or at the Dashboard.
if currentFile != progress.name {
Expand Down Expand Up @@ -306,11 +306,13 @@ public class BlinkCopy: NSObject {
}
})

awaitRunLoop(currentRunLoop)

// Make another run on the loop to close extra stuff in blocks.
// Run everything in its own loop...
CFRunLoopRunInMode(.defaultMode, TimeInterval(INT_MAX), false)

// ...and because of that, make another run after cleanup to let hanging self-loops close.
sourceTranslator = nil
destTranslator = nil
RunLoop.current.run(until: Date(timeIntervalSinceNow: 0.5))

return rc
}

Expand Down Expand Up @@ -350,8 +352,8 @@ public class BlinkCopy: NSObject {

// Make signals objc funcs so we can duck type them.
@objc func kill() {
copyCancellable?.cancel()

awake(runLoop: currentRunLoop)
print("\r\nOperation cancelled", to: &self.stderr)
copyCancellable = nil
CFRunLoopStop(self.currentRunLoop.getCFRunLoop())
}
}
13 changes: 0 additions & 13 deletions Blink/Commands/ssh/Helpers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,3 @@ func tty() -> TermDevice {
let session = Unmanaged<MCPSession>.fromOpaque(thread_context).takeUnretainedValue()
return session.device
}

func awaitRunLoop(_ runLoop: RunLoop) {
let timer = Timer(timeInterval: TimeInterval(INT_MAX), repeats: true) { _ in
print("timer")
}
runLoop.add(timer, forMode: .default)
CFRunLoopRun()
}

func awake(runLoop: RunLoop) {
let cfRunLoop = runLoop.getCFRunLoop()
CFRunLoopStop(cfRunLoop)
}
62 changes: 30 additions & 32 deletions Blink/Commands/ssh/SSHPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,47 +55,48 @@ class SSHPool {
// For now we will not allow that situation.
return shared.startConnection(host, with: config, proxy: proxy, exposeSocket: false)
}
guard let ctrl = shared.control(for: host, with: config) else {
return shared.startConnection(host, with: config, proxy: proxy)
if let ctrl = shared.control(for: host, with: config) {
if let conn = ctrl.connection, conn.isConnected {
return .just(conn)
} else {
shared.removeControl(ctrl)
}
}

guard let conn = ctrl.connection, conn.isConnected else {
shared.removeControl(ctrl)
return shared.startConnection(host, with: config, proxy: proxy)
}

return .just(conn)
return shared.startConnection(host, with: config, proxy: proxy)
}

private func startConnection(_ host: String, with config: SSHClientConfig,
proxy: SSH.SSHClient.ExecProxyCommandCallback? = nil,
exposeSocket exposed: Bool = true) -> AnyPublisher<SSH.SSHClient, Error> {
let pb = PassthroughSubject<SSH.SSHClient, Error>()
var cancel: AnyCancellable?
var dial: AnyCancellable?
var runLoop: RunLoop!

let t = Thread {
runLoop = RunLoop.current

cancel = SSH.SSHClient.dial(host, with: config, withProxy: proxy)
.sink(receiveCompletion: { pb.send(completion: $0) },
receiveValue: { conn in
let control = SSHClientControl(for: conn, on: host, with: config, running: runLoop, exposed: exposed)
SSHPool.shared.controls.append(control)
pb.send(conn)
})

awaitRunLoop(runLoop)
// Make another run on the loop to close extra stuff in blocks.
RunLoop.current.run(until: Date(timeIntervalSinceNow: 0.5))
dial = SSH.SSHClient.dial(host, with: config, withProxy: proxy)
//.print("SSHClient Pool")
.sink(
receiveCompletion: { completion in
pb.send(completion: completion)
},
receiveValue: { conn in
let control = SSHClientControl(for: conn, on: host, with: config, running: runLoop, exposed: exposed)
SSHPool.shared.controls.append(control)
pb.send(conn)
})

SSH.SSHClient.run()
print("Pool Thread out")
}

t.start()

return pb.buffer(size: 1, prefetch: .byRequest, whenFull: .dropOldest)
.handleEvents(receiveCancel: {
cancel?.cancel()
dial = nil
}).eraseToAnyPublisher()
}

Expand All @@ -116,16 +117,7 @@ class SSHPool {
print("\(control.localTunnels)")
print("\(control.remoteTunnels)")
if control.numChannels == 0 {
// For now, we just stop the connection as is
// We could use a delegate just to notify when a connection is dead, and the control could
// take care of figuring out when the connection it contains must go.
awake(runLoop: control.runLoop)
let idx = controls.firstIndex { $0 === control }!

// Removing references to connection to deinit.
// We could also handle the pool with references to the connection.
// But the shell or time based persistance may become more difficult.
controls.remove(at: idx)
self.removeControl(control)
}
}
}
Expand Down Expand Up @@ -256,12 +248,18 @@ extension SSHPool {
}

private func removeControl(_ control: SSHClientControl) {
awake(runLoop: control.runLoop)
// For now, we just stop the connection as is
// We could use a delegate just to notify when a connection is dead, and the control could
// take care of figuring out when the connection it contains must go.
guard
let idx = controls.firstIndex(where: { $0 === control })
else {
return
}

// Removing references to connection to deinit.
// We could also handle the pool with references to the connection.
// But the shell or time based persistance may become more difficult.
controls.remove(at: idx)
}
}
Expand Down
55 changes: 34 additions & 21 deletions Blink/Commands/ssh/ssh.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public func blink_ssh_main(argc: Int32, argv: Argv) -> Int32 {
setvbuf(thread_stdin, nil, _IONBF, 0)
setvbuf(thread_stdout, nil, _IONBF, 0)
setvbuf(thread_stderr, nil, _IONBF, 0)

let session = Unmanaged<MCPSession>.fromOpaque(thread_context).takeUnretainedValue()
let cmd = BlinkSSH(mcp: session)
return cmd.start(argc, argv: argv.args(count: argc))
Expand All @@ -60,7 +60,7 @@ public func blink_ssh_main(argc: Int32, argv: Argv) -> Int32 {
private var _mcp: MCPSession;

var exitCode: Int32 = 0
var cancellableBag: Set<AnyCancellable> = []
var connectionCancellable: AnyCancellable?
let currentRunLoop = RunLoop.current
var command: SSHCommand?
var stream: SSH.Stream?
Expand All @@ -69,13 +69,15 @@ public func blink_ssh_main(argc: Int32, argv: Argv) -> Int32 {
var remoteTunnels: [PortForwardInfo] = []
var proxyThread: Thread?
var socks: [OptionalBindAddressInfo] = []
var timer: Timer?

var outStream: DispatchOutputStream?
var inStream: DispatchInputStream?
var errStream: DispatchOutputStream?

init(mcp: MCPSession) {
_mcp = mcp;
// Owed by ios_system, so beware to dup before using.
self.outstream = fileno(thread_stdout)
self.instream = fileno(thread_stdin)
self.errstream = fileno(thread_stderr)
Expand Down Expand Up @@ -176,7 +178,7 @@ public func blink_ssh_main(argc: Int32, argv: Argv) -> Int32 {
}
})

connect.flatMap { conn -> SSHConnection in
connectionCancellable = connect.flatMap { conn -> SSHConnection in
self.connection = conn

if let banner = conn.issueBanner,
Expand Down Expand Up @@ -232,25 +234,18 @@ public func blink_ssh_main(argc: Int32, argv: Argv) -> Int32 {
self.kill()
}
})
.store(in: &cancellableBag)

awaitRunLoop(currentRunLoop)
awaitRunLoop()

stream?.cancel()
outStream?.close()
inStream?.close()
errStream?.close()
// Dispatch streams need a cycle to close.
RunLoop.current.run(until: Date(timeIntervalSinceNow: 0.1))

// Need to get rid of the stream because the channel needs a cycle to be closed.
self.stream = nil

if let conn = self.connection, cmd.blocks {
if cmd.startsSession { SSHPool.deregister(shellOn: conn) }
forwardTunnels.forEach { SSHPool.deregister(localForward: $0, on: conn) }
remoteTunnels.forEach { SSHPool.deregister(remoteForward: $0, on: conn) }
socks.forEach { SSHPool.deregister(socksBindAddress: $0, on: conn) }
} else {
connectionCancellable = nil
}

return exitCode
Expand Down Expand Up @@ -307,16 +302,19 @@ public func blink_ssh_main(argc: Int32, argv: Argv) -> Int32 {
}

return session.tryMap { s in
let outs = DispatchOutputStream(stream: self.outstream)
let ins = DispatchInputStream(stream: self.instream)
let errs = DispatchOutputStream(stream: self.errstream)
let outs = DispatchOutputStream(stream: dup(self.outstream))
let ins = DispatchInputStream(stream: dup(self.instream))
let errs = DispatchOutputStream(stream: dup(self.errstream))

s.handleCompletion = {
s.handleCompletion = { [weak self] in
// Once finished, exit.
self.kill()
self?.kill()
return
}
s.handleFailure = { error in
s.handleFailure = { [weak self] error in
guard let self = self else {
return
}
self.exitCode = -1
print("Interactive Shell error. \(error)", to: &self.stderr)
self.kill()
Expand Down Expand Up @@ -498,9 +496,24 @@ public func blink_ssh_main(argc: Int32, argv: Argv) -> Int32 {
// Cancelling here makes sure the flows are cancelled.
// Trying to do it at the runloop has the issue that flows may continue running.
print("Kill received")
cancellableBag = []
connectionCancellable = nil

awake()
}

func awaitRunLoop() {
let timer = Timer(timeInterval: TimeInterval(INT_MAX), repeats: true) { _ in
print("timer")
}
self.timer = timer
self.currentRunLoop.add(timer, forMode: .default)
CFRunLoopRun()
}

awake(runLoop: currentRunLoop)
func awake() {
let cfRunLoop = self.currentRunLoop.getCFRunLoop()
self.timer?.invalidate()
CFRunLoopStop(cfRunLoop)
}

deinit {
Expand Down
1 change: 0 additions & 1 deletion SSH/Publishers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ extension Publisher {
do {
return try operation(p)
} catch SSHError.again {
// Run the other mode?
RunLoop.current.run(mode: libSSHBlockMode, before: Date(timeIntervalSinceNow: 0.5))
//CFRunLoopRunInMode(libSSHBlockMode, 0.5, true)
Swift.print("Retrying..")
Expand Down
6 changes: 5 additions & 1 deletion SSH/SFTP.swift
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ public class SFTPClient {
}

deinit {
print("SFTP Out!!")
self.client.closeSFTP(sftp)
print("SFTP Out!!")
}
}

Expand Down Expand Up @@ -483,6 +483,10 @@ public class SFTPFile : BlinkFiles.File {
return true
}.eraseToAnyPublisher()
}

deinit {
print("SFTP file out")
}
}

extension SFTPFile: BlinkFiles.Reader, BlinkFiles.WriterTo {
Expand Down
Loading

0 comments on commit 14fc447

Please sign in to comment.