diff --git a/Sources/AWSLambdaRuntimeCore/LambdaContext.swift b/Sources/AWSLambdaRuntimeCore/LambdaContext.swift index 39e12439..d6fec657 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaContext.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaContext.swift @@ -38,10 +38,14 @@ extension Lambda { /// `ByteBufferAllocator` to allocate `ByteBuffer` public let allocator: ByteBufferAllocator - init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator) { + /// `Terminator` to register shutdown operations + public let terminator: LambdaTerminator + + init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator, terminator: LambdaTerminator) { self.eventLoop = eventLoop self.logger = logger self.allocator = allocator + self.terminator = terminator } /// This interface is not part of the public API and must not be used by adopters. This API is not part of semver versioning. @@ -52,7 +56,8 @@ extension Lambda { InitializationContext( logger: logger, eventLoop: eventLoop, - allocator: ByteBufferAllocator() + allocator: ByteBufferAllocator(), + terminator: LambdaTerminator() ) } } @@ -205,27 +210,3 @@ public struct LambdaContext: CustomDebugStringConvertible { ) } } - -// MARK: - ShutdownContext - -extension Lambda { - /// Lambda runtime shutdown context. - /// The Lambda runtime generates and passes the `ShutdownContext` to the Lambda handler as an argument. - public final class ShutdownContext { - /// `Logger` to log with - /// - /// - note: The `LogLevel` can be configured using the `LOG_LEVEL` environment variable. - public let logger: Logger - - /// The `EventLoop` the Lambda is executed on. Use this to schedule work with. - /// - /// - note: The `EventLoop` is shared with the Lambda runtime engine and should be handled with extra care. - /// Most importantly the `EventLoop` must never be blocked. - public let eventLoop: EventLoop - - internal init(logger: Logger, eventLoop: EventLoop) { - self.eventLoop = eventLoop - self.logger = logger - } - } -} diff --git a/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift b/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift index 3c2697ff..76d35af2 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift @@ -176,19 +176,6 @@ public protocol ByteBufferLambdaHandler { /// - Returns: An `EventLoopFuture` to report the result of the Lambda back to the runtime engine. /// The `EventLoopFuture` should be completed with either a response encoded as `ByteBuffer` or an `Error` func handle(_ event: ByteBuffer, context: LambdaContext) -> EventLoopFuture - - /// Clean up the Lambda resources asynchronously. - /// Concrete Lambda handlers implement this method to shutdown resources like `HTTPClient`s and database connections. - /// - /// - Note: In case your Lambda fails while creating your LambdaHandler in the `HandlerFactory`, this method - /// **is not invoked**. In this case you must cleanup the created resources immediately in the `HandlerFactory`. - func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture -} - -extension ByteBufferLambdaHandler { - public func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture { - context.eventLoop.makeSucceededFuture(()) - } } extension ByteBufferLambdaHandler { diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift index 38499a05..8cb7fbe9 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift @@ -34,13 +34,16 @@ extension Lambda { /// Run the user provided initializer. This *must* only be called once. /// /// - Returns: An `EventLoopFuture` fulfilled with the outcome of the initialization. - func initialize(logger: Logger, handlerType: Handler.Type) -> EventLoopFuture { + func initialize(logger: Logger, terminator: LambdaTerminator, handlerType: Handler.Type) -> EventLoopFuture { logger.debug("initializing lambda") // 1. create the handler from the factory - // 2. report initialization error if one occured - let context = InitializationContext(logger: logger, - eventLoop: self.eventLoop, - allocator: self.allocator) + // 2. report initialization error if one occurred + let context = InitializationContext( + logger: logger, + eventLoop: self.eventLoop, + allocator: self.allocator, + terminator: terminator + ) return Handler.makeHandler(context: context) // Hopping back to "our" EventLoop is important in case the factory returns a future // that originated from a foreign EventLoop/EventLoopGroup. diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift b/Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift index 46e73d1b..0619dfa1 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift @@ -74,23 +74,22 @@ public final class LambdaRuntime { var logger = self.logger logger[metadataKey: "lifecycleId"] = .string(self.configuration.lifecycle.id) + let terminator = LambdaTerminator() let runner = Lambda.Runner(eventLoop: self.eventLoop, configuration: self.configuration) - let startupFuture = runner.initialize(logger: logger, handlerType: Handler.self) - startupFuture.flatMap { handler -> EventLoopFuture<(Handler, Result)> in + let startupFuture = runner.initialize(logger: logger, terminator: terminator, handlerType: Handler.self) + startupFuture.flatMap { handler -> EventLoopFuture> in // after the startup future has succeeded, we have a handler that we can use // to `run` the lambda. let finishedPromise = self.eventLoop.makePromise(of: Int.self) self.state = .active(runner, handler) self.run(promise: finishedPromise) - return finishedPromise.futureResult.mapResult { (handler, $0) } - } - .flatMap { handler, runnerResult -> EventLoopFuture in + return finishedPromise.futureResult.mapResult { $0 } + }.flatMap { runnerResult -> EventLoopFuture in // after the lambda finishPromise has succeeded or failed we need to // shutdown the handler - let shutdownContext = Lambda.ShutdownContext(logger: logger, eventLoop: self.eventLoop) - return handler.shutdown(context: shutdownContext).flatMapErrorThrowing { error in - // if, we had an error shuting down the lambda, we want to concatenate it with + terminator.terminate(eventLoop: self.eventLoop).flatMapErrorThrowing { error in + // if, we had an error shutting down the handler, we want to concatenate it with // the runner result logger.error("Error shutting down handler: \(error)") throw Lambda.RuntimeError.shutdownError(shutdownError: error, runnerResult: runnerResult) diff --git a/Sources/AWSLambdaRuntimeCore/Terminator.swift b/Sources/AWSLambdaRuntimeCore/Terminator.swift new file mode 100644 index 00000000..9ad62d3a --- /dev/null +++ b/Sources/AWSLambdaRuntimeCore/Terminator.swift @@ -0,0 +1,139 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOConcurrencyHelpers +import NIOCore + +/// Lambda terminator. +/// Utility to manage the lambda shutdown sequence. +public final class LambdaTerminator { + private typealias Handler = (EventLoop) -> EventLoopFuture + + private var storage: Storage + + init() { + self.storage = Storage() + } + + /// Register a shutdown handler with the terminator + /// + /// - parameters: + /// - name: Display name for logging purposes + /// - handler: The shutdown handler to call when terminating the Lambda. + /// Shutdown handlers are called in the reverse order of being registered. + /// + /// - Returns: A `RegistrationKey` that can be used to de-register the handler when its no longer needed. + @discardableResult + public func register(name: String, handler: @escaping (EventLoop) -> EventLoopFuture) -> RegistrationKey { + let key = RegistrationKey() + self.storage.add(key: key, name: name, handler: handler) + return key + } + + /// De-register a shutdown handler with the terminator + /// + /// - parameters: + /// - key: A `RegistrationKey` obtained from calling the register API. + public func deregister(_ key: RegistrationKey) { + self.storage.remove(key) + } + + /// Begin the termination cycle + /// Shutdown handlers are called in the reverse order of being registered. + /// + /// - parameters: + /// - eventLoop: The `EventLoop` to run the termination on. + /// + /// - Returns: An `EventLoopFuture` with the result of the termination cycle. + internal func terminate(eventLoop: EventLoop) -> EventLoopFuture { + func terminate(_ iterator: IndexingIterator<[(name: String, handler: Handler)]>, errors: [Error], promise: EventLoopPromise) { + var iterator = iterator + guard let handler = iterator.next()?.handler else { + if errors.isEmpty { + return promise.succeed(()) + } else { + return promise.fail(TerminationError(underlying: errors)) + } + } + handler(eventLoop).whenComplete { result in + var errors = errors + if case .failure(let error) = result { + errors.append(error) + } + return terminate(iterator, errors: errors, promise: promise) + } + } + + // terminate in cascading, reverse order + let promise = eventLoop.makePromise(of: Void.self) + terminate(self.storage.handlers.reversed().makeIterator(), errors: [], promise: promise) + return promise.futureResult + } +} + +extension LambdaTerminator { + /// Lambda terminator registration key. + public struct RegistrationKey: Hashable, CustomStringConvertible { + var value: String + + init() { + // UUID basically + self.value = LambdaRequestID().uuidString + } + + public var description: String { + self.value + } + } +} + +extension LambdaTerminator { + private final class Storage { + private let lock: Lock + private var index: [RegistrationKey] + private var map: [RegistrationKey: (name: String, handler: Handler)] + + init() { + self.lock = .init() + self.index = [] + self.map = [:] + } + + func add(key: RegistrationKey, name: String, handler: @escaping Handler) { + self.lock.withLock { + self.index.append(key) + self.map[key] = (name: name, handler: handler) + } + } + + func remove(_ key: RegistrationKey) { + self.lock.withLock { + self.index = self.index.filter { $0 != key } + self.map[key] = nil + } + } + + var handlers: [(name: String, handler: Handler)] { + self.lock.withLock { + self.index.compactMap { self.map[$0] } + } + } + } +} + +extension LambdaTerminator { + struct TerminationError: Error { + let underlying: [Error] + } +} diff --git a/Sources/AWSLambdaTesting/Lambda+Testing.swift b/Sources/AWSLambdaTesting/Lambda+Testing.swift index f514f38f..827bf3b9 100644 --- a/Sources/AWSLambdaTesting/Lambda+Testing.swift +++ b/Sources/AWSLambdaTesting/Lambda+Testing.swift @@ -51,8 +51,7 @@ extension Lambda { public init(requestID: String = "\(DispatchTime.now().uptimeNanoseconds)", traceID: String = "Root=\(DispatchTime.now().uptimeNanoseconds);Parent=\(DispatchTime.now().uptimeNanoseconds);Sampled=1", invokedFunctionARN: String = "arn:aws:lambda:us-west-1:\(DispatchTime.now().uptimeNanoseconds):function:custom-runtime", - timeout: DispatchTimeInterval = .seconds(5)) - { + timeout: DispatchTimeInterval = .seconds(5)) { self.requestID = requestID self.traceID = traceID self.invokedFunctionARN = invokedFunctionARN diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaRuntimeTest.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaRuntimeTest.swift index 213f628a..fba1dde7 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/LambdaRuntimeTest.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaRuntimeTest.swift @@ -66,23 +66,37 @@ class LambdaRuntimeTest: XCTestCase { let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } - struct ShutdownError: Error {} + struct ShutdownError: Error { + let description: String + } struct ShutdownErrorHandler: EventLoopLambdaHandler { typealias Event = String typealias Output = Void static func makeHandler(context: Lambda.InitializationContext) -> EventLoopFuture { - context.eventLoop.makeSucceededFuture(ShutdownErrorHandler()) + // register shutdown operation + context.terminator.register(name: "test 1", handler: { eventLoop in + eventLoop.makeFailedFuture(ShutdownError(description: "error 1")) + }) + context.terminator.register(name: "test 2", handler: { eventLoop in + eventLoop.makeSucceededVoidFuture() + }) + context.terminator.register(name: "test 3", handler: { eventLoop in + eventLoop.makeFailedFuture(ShutdownError(description: "error 2")) + }) + context.terminator.register(name: "test 4", handler: { eventLoop in + eventLoop.makeSucceededVoidFuture() + }) + context.terminator.register(name: "test 5", handler: { eventLoop in + eventLoop.makeFailedFuture(ShutdownError(description: "error 3")) + }) + return context.eventLoop.makeSucceededFuture(ShutdownErrorHandler()) } func handle(_ event: String, context: LambdaContext) -> EventLoopFuture { context.eventLoop.makeSucceededVoidFuture() } - - func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture { - context.eventLoop.makeFailedFuture(ShutdownError()) - } } let eventLoop = eventLoopGroup.next() @@ -95,7 +109,11 @@ class LambdaRuntimeTest: XCTestCase { XCTFail("Unexpected error: \(error)"); return } - XCTAssert(shutdownError is ShutdownError) + XCTAssertEqual(shutdownError as? LambdaTerminator.TerminationError, LambdaTerminator.TerminationError(underlying: [ + ShutdownError(description: "error 3"), + ShutdownError(description: "error 2"), + ShutdownError(description: "error 1"), + ])) XCTAssertEqual(runtimeError as? Lambda.RuntimeError, .badStatusCode(.internalServerError)) } } diff --git a/Tests/AWSLambdaRuntimeCoreTests/Utils.swift b/Tests/AWSLambdaRuntimeCoreTests/Utils.swift index 96b1a1c8..5d57a555 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/Utils.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/Utils.swift @@ -23,10 +23,11 @@ func runLambda(behavior: LambdaServerBehavior, defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } let logger = Logger(label: "TestLogger") let configuration = Lambda.Configuration(runtimeEngine: .init(requestTimeout: .milliseconds(100))) + let terminator = LambdaTerminator() let runner = Lambda.Runner(eventLoop: eventLoopGroup.next(), configuration: configuration) let server = try MockLambdaServer(behavior: behavior).start().wait() defer { XCTAssertNoThrow(try server.stop().wait()) } - try runner.initialize(logger: logger, handlerType: handlerType).flatMap { handler in + try runner.initialize(logger: logger, terminator: terminator, handlerType: handlerType).flatMap { handler in runner.run(logger: logger, handler: handler) }.wait() } @@ -66,3 +67,13 @@ extension Lambda.RuntimeError: Equatable { String(describing: lhs) == String(describing: rhs) } } + +extension LambdaTerminator.TerminationError: Equatable { + public static func == (lhs: Self, rhs: Self) -> Bool { + guard lhs.underlying.count == rhs.underlying.count else { + return false + } + // technically incorrect, but good enough for our tests + return String(describing: lhs) == String(describing: rhs) + } +} diff --git a/Tests/AWSLambdaRuntimeTests/Lambda+CodableTest.swift b/Tests/AWSLambdaRuntimeTests/Lambda+CodableTest.swift index c11cf005..b0f2152d 100644 --- a/Tests/AWSLambdaRuntimeTests/Lambda+CodableTest.swift +++ b/Tests/AWSLambdaRuntimeTests/Lambda+CodableTest.swift @@ -172,7 +172,8 @@ class CodableLambdaTest: XCTestCase { Lambda.InitializationContext( logger: Logger(label: "test"), eventLoop: self.eventLoopGroup.next(), - allocator: ByteBufferAllocator() + allocator: ByteBufferAllocator(), + terminator: LambdaTerminator() ) } }