Skip to content

Commit

Permalink
allow observing a queue request
Browse files Browse the repository at this point in the history
  • Loading branch information
ralfebert committed Dec 4, 2020
1 parent 5c3a163 commit 0e69cec
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// MIT License
//
// Copyright (c) 2020 Ralf Ebert
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

import Endpoint
import Foundation
import os

public extension PersistentURLRequestQueue {

func add<A>(endpoint: Endpoint<A>, completion: ((A) -> Void)?) {
self.add(endpoint.request) { data, response in
DispatchQueue.main.async {
do {
completion?(try endpoint.handleResponse(data: data, response: response))
} catch {
os_log("Error handling request: %s", type: .error, String(describing: error))
}
}
}
}

}
43 changes: 31 additions & 12 deletions Sources/PersistentURLRequestQueue/PersistentURLRequestQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,25 @@ import Reachability

public class PersistentURLRequestQueue {

let name: String
let urlSession: URLSession
let queue: OperationQueue = {
internal let name: String
internal let urlSession: URLSession
internal let queue: OperationQueue = {
let queue = OperationQueue()
queue.qualityOfService = .userInitiated
queue.maxConcurrentOperationCount = 1
queue.name = "PersistentQueue"
return queue
}()

let log: OSLog
var clock = { Date() }
var subscriptions = Set<AnyCancellable>()
var retryTimeInterval: TimeInterval
var scheduleTimers: Bool = true
private var persistentContainer: NSPersistentContainer
internal let log: OSLog
internal var clock = { Date() }
internal var subscriptions = Set<AnyCancellable>()
internal var retryTimeInterval: TimeInterval
internal var scheduleTimers: Bool = true
internal var completionHandlers = [NSManagedObjectID: RequestCompletionHandler]()
internal var persistentContainer: NSPersistentContainer

public typealias RequestCompletionHandler = (_ data: Data, _ response: URLResponse) -> Void

public init(name: String, urlSession: URLSession = .shared, retryTimeInterval: TimeInterval = 30, connectionStatus: AnyPublisher<Reachability.Connection, Never>) {
self.name = name
Expand Down Expand Up @@ -127,13 +130,15 @@ public class PersistentURLRequestQueue {
return result
}

public func add(_ request: URLRequest) {
public func add(_ request: URLRequest, completion: RequestCompletionHandler? = nil) {
assert(Thread.isMainThread)
let operation = BlockOperation {
guard let encodedValue = self.withErrorHandling({ try self.encode(request) }) else { return }
let entry = QueueEntry.create(context: self.managedObjectContext)
entry.request = encodedValue
entry.date = self.clock()
self.save()
self.completionHandlers[entry.objectID] = completion
os_log("%@ added to queue", log: self.log, type: .info, self.infoString(request: request))
self.startProcessing()
}
Expand Down Expand Up @@ -201,13 +206,27 @@ public class PersistentURLRequestQueue {
let request = try self.decode(item.request)
os_log("Processing %s", log: self.log, type: .info, self.infoString(request: request))

let endpoint = Endpoint(request: request, urlSession: self.urlSession)
let endpoint = Endpoint<(Data, URLResponse)>(
request: request,
urlSession: self.urlSession,
validate: EndpointExpectation.expectSuccess,
parse: { data, response in
guard let data = data else {
throw NoDataError()
}
return (data, response)
}
)
self.processing = true
endpoint.load { result in
os_log("Processed %s: %s", log: self.log, type: .info, self.infoString(request: request), String(describing: result))

self.queue.addOperation {
switch result {
case .success():
case let .success((data, response)):
if let completionHandler = self.completionHandlers.removeValue(forKey: item.objectID) {
completionHandler(data, response)
}
self.managedObjectContext.delete(item)
case .failure:
item.pausedUntil = self.clock().addingTimeInterval(self.retryTimeInterval)
Expand Down

0 comments on commit 0e69cec

Please sign in to comment.