diff --git a/cocore/co_csp.m b/cocore/co_csp.m index 56eed18..e780254 100644 --- a/cocore/co_csp.m +++ b/cocore/co_csp.m @@ -26,6 +26,8 @@ #pragma mark - queue +static void chancancelallalt(co_channel *c); + static void amove(void *dst, void *src, uint n) { if(dst){ if(src == nil) { @@ -198,6 +200,7 @@ void chanfree(co_channel *c) { if(c == nil) { return; } + chancancelallalt(c); if (c->buffer.expandsize) { free(c->buffer.arr); } @@ -363,6 +366,75 @@ static int altexec(chan_alt *a) { } } +static void chancancelallalt(co_channel *c) { + + if (!c) { + return; + } + + chan_alt **remainList = NULL; + size_t remainCount = 0; + + chanlock(c); + + remainCount = c->asend.count + c->arecv.count; + if (remainCount > 0) { + remainList = calloc(remainCount, sizeof(chan_alt *)); + + chan_alt **ptr = remainList; + + alt_queue *sending = &c->asend; + if (sending->count) { + + for (int i = 0; i < sending->count; i++) { + altqueuepop(sending, ptr); + ptr++; + } + } + alt_queue *recv = &c->arecv; + if (recv->count) { + + for (int i = 0; i < recv->count; i++) { + altqueuepop(recv, ptr); + ptr++; + } + } + } + + chanunlock(c); + + if (remainCount > 0) { + for (int i = 0; i < remainCount; i++) { + chan_alt *a = remainList[i]; + if (!a) { + continue; + } + // custom cancel + if (a->cancel_exec) { + a->cancel_exec(); + } + + // resume the task. + coroutine_t *co = a->task; + void (*custom_resume)(coroutine_t *co) = c->custom_resume; + + if (custom_resume) { + custom_resume(co); + } else { + coroutine_add(co); + } + } + + } + + if (remainList) { + free(remainList); + } + +} + + + void altcancel(chan_alt *a) { if (!a) { return; @@ -379,6 +451,7 @@ void altcancel(chan_alt *a) { if(altqueue && altqueue->count){ altqueueremove(altqueue, a); + chanunlock(c); a->is_cancelled = true; // custom cancel @@ -389,7 +462,6 @@ void altcancel(chan_alt *a) { // resume the task. coroutine_t *co = a->task; void (*custom_resume)(coroutine_t *co) = c->custom_resume; - chanunlock(c); if (custom_resume) { custom_resume(co); diff --git a/coobjc.xcodeproj/project.pbxproj b/coobjc.xcodeproj/project.pbxproj index 6644e0d..bb17c5b 100644 --- a/coobjc.xcodeproj/project.pbxproj +++ b/coobjc.xcodeproj/project.pbxproj @@ -461,6 +461,7 @@ developmentRegion = English; hasScannedForEncodings = 0; knownRegions = ( + English, en, Base, ); diff --git a/coobjc/co/COChan.h b/coobjc/co/COChan.h index 2607e57..66d4191 100644 --- a/coobjc/co/COChan.h +++ b/coobjc/co/COChan.h @@ -68,6 +68,8 @@ typedef void (^COChanOnCancelBlock)(COChan *chan); */ - (void)send:(Value _Nullable )val; +- (void)send:(Value _Nullable )val onCancel:(COChanOnCancelBlock _Nullable)cancelBlock; + /** Receive a value from the Channel, blocking. @@ -77,6 +79,8 @@ typedef void (^COChanOnCancelBlock)(COChan *chan); */ - (Value _Nullable )receive; +- (Value _Nullable )receiveWithOnCancel:(COChanOnCancelBlock _Nullable)cancelBlock;; + /** Send a value to the Channel, non blocking. @@ -132,14 +136,14 @@ typedef void (^COChanOnCancelBlock)(COChan *chan); */ - (void)cancelForCoroutine:(COCoroutine *)co; -/** +/** @deprecated This method is not safe. use `send:onCancel:` or `receiveWithOnCancel:` Set a callback block when the Channel cancel current blocking operation(send: or receive:) Must set before `send:` or `receive:`, or it will not work. @param onCancelBlock the cancel callback block. */ -- (void)onCancel:(COChanOnCancelBlock _Nullable )onCancelBlock; +// - (void)onCancel:(COChanOnCancelBlock _Nullable )onCancelBlock; @end diff --git a/coobjc/co/COChan.m b/coobjc/co/COChan.m index 3bcefd1..95831b0 100644 --- a/coobjc/co/COChan.m +++ b/coobjc/co/COChan.m @@ -36,7 +36,6 @@ @interface COChan() } @property(nonatomic, assign) int count; -@property(nonatomic, strong) NSMapTable *cancelBlocksByCo; @property(nonatomic, strong) NSMutableArray *buffList; @end @@ -76,6 +75,10 @@ - (instancetype)initWithBuffCount:(int32_t)buffCount { } - (void)send:(id)val { + [self send:val onCancel:NULL]; +} + +- (void)send:(id)val onCancel:(COChanOnCancelBlock)cancelBlock { // send may blocking current process, so must check in a coroutine. COCoroutine *co = [COCoroutine currentCoroutine]; @@ -91,7 +94,6 @@ - (void)send:(id)val { }); IMP cancel_exec = NULL; - COChanOnCancelBlock cancelBlock = [self popCancelBlockForCo:co]; if (cancelBlock) { cancel_exec = imp_implementationWithBlock(^{ cancelBlock(self); @@ -108,6 +110,10 @@ - (void)send:(id)val { } - (id)receive { + return [self receiveWithOnCancel:NULL]; +} + +- (id)receiveWithOnCancel:(COChanOnCancelBlock)cancelBlock { COCoroutine *co = [COCoroutine currentCoroutine]; if (!co) { @@ -117,7 +123,6 @@ - (id)receive { co.currentChan = self; IMP cancel_exec = NULL; - COChanOnCancelBlock cancelBlock = [self popCancelBlockForCo:co]; if (cancelBlock) { cancel_exec = imp_implementationWithBlock(^{ cancelBlock(self); @@ -224,38 +229,14 @@ - (void)cancelForCoroutine:(COCoroutine *)co { chan_cancel_alt_in_co(co.co); } -- (NSMapTable *)cancelBlocksByCo { - if (!_cancelBlocksByCo) { - _cancelBlocksByCo = [NSMapTable mapTableWithKeyOptions:NSMapTableWeakMemory valueOptions:NSMapTableStrongMemory]; - } - return _cancelBlocksByCo; -} +@end -- (void)onCancel:(COChanOnCancelBlock)onCancelBlock { - if (!onCancelBlock) { - return; - } - COCoroutine *co = [COCoroutine currentCoroutine]; - if (!co) { - @throw [NSException exceptionWithName:COInvalidException reason:@"onCancel must called in a routine" userInfo:@{}]; - } - COOBJC_SCOPELOCK(_buffLock); - [self.cancelBlocksByCo setObject:[onCancelBlock copy] forKey:co]; -} +@interface COTimeChan() -- (COChanOnCancelBlock)popCancelBlockForCo:(COCoroutine *)co { - COOBJC_SCOPELOCK(_buffLock); - COChanOnCancelBlock block = [self.cancelBlocksByCo objectForKey:co]; - if (block) { - [self.cancelBlocksByCo removeObjectForKey:co]; - return block; - } - return nil; -} +@property (nonatomic, strong) CODispatchTimer *timer; @end - @implementation COTimeChan { BOOL _isDone; @@ -274,19 +255,19 @@ - (void)send_nonblock:(id)val { [super send_nonblock:val]; } +- (id)receive { + return [self receiveWithOnCancel:^(COChan * _Nonnull chan) { + [[(COTimeChan *)chan timer] invalidate]; + }]; +} + + (instancetype)sleep:(NSTimeInterval)duration { COTimeChan *chan = [self chanWithDuration:duration]; - CODispatchTimer *timer = [[CODispatch currentDispatch] dispatch_timer:^{ + chan.timer = [[CODispatch currentDispatch] dispatch_timer:^{ [chan send_nonblock:@1]; } interval:duration]; - [chan onCancel:^(COChan * _Nonnull chan) { - //dispatch_source_cancel(timer); - [timer invalidate]; - }]; - - return chan; } diff --git a/coobjc/co/COCoroutine.h b/coobjc/co/COCoroutine.h index a5924b7..ff8b75b 100644 --- a/coobjc/co/COCoroutine.h +++ b/coobjc/co/COCoroutine.h @@ -144,7 +144,7 @@ extern NSString *const COInvalidException; @param stackSize : stackSize of the coroutine. @return The coroutine object. */ -- (instancetype)initWithBlock:(void (^)(void))block onQueue:(dispatch_queue_t)queue stackSize:(NSUInteger)stackSize; +- (instancetype)initWithBlock:(void (^)(void))block onQueue:(dispatch_queue_t _Nullable)queue stackSize:(NSUInteger)stackSize; /** The coroutine is Finished. diff --git a/coobjc/co/COCoroutine.m b/coobjc/co/COCoroutine.m index c74d023..e9c6647 100644 --- a/coobjc/co/COCoroutine.m +++ b/coobjc/co/COCoroutine.m @@ -20,6 +20,7 @@ #import "COChan.h" #import "coroutine.h" #import "co_queue.h" +#import "coobjc.h" NSString *const COInvalidException = @"COInvalidException"; @@ -150,7 +151,6 @@ - (instancetype)initWithBlock:(void (^)(void))block onQueue:(dispatch_queue_t)qu if (self) { _execBlock = [block copy]; _dispatch = queue ? [CODispatch dispatchWithQueue:queue] : [CODispatch currentDispatch]; - //_queue = queue ?: co_get_current_queue(); coroutine_t *co = coroutine_create((void (*)(void *))co_exec); if (stackSize > 0 && stackSize < 1024*1024) { // Max 1M @@ -322,12 +322,10 @@ id co_await(id awaitable) { co.lastError = error; [chan send_nonblock:nil]; }]; - - [chan onCancel:^(COChan * _Nonnull chan) { + + id val = [chan receiveWithOnCancel:^(COChan * _Nonnull chan) { [promise cancel]; }]; - - id val = [chan receive]; return val; } else { @@ -349,61 +347,40 @@ id co_await(id awaitable) { return nil; } - NSMutableArray *resultAwaitable = [[NSMutableArray alloc] initWithCapacity:awaitableList.count]; - - for (id awaitable in awaitableList) { - - if ([awaitable isKindOfClass:[COChan class]]) { - - [resultAwaitable addObject:awaitable]; - - } else if ([awaitable isKindOfClass:[COPromise class]]) { - - COChan *chan = [COChan chanWithBuffCount:1]; - COCoroutine *co = co_get_obj(t); - - COPromise *promise = awaitable; - [[promise - then:^id _Nullable(id _Nullable value) { - - [chan send_nonblock:value]; - return value; - }] - catch:^(NSError * _Nonnull error) { - co.lastError = error; - [chan send_nonblock:error]; - }]; - - [chan onCancel:^(COChan * _Nonnull chan) { - [promise cancel]; - }]; - - [resultAwaitable addObject:chan]; - - } else { - @throw [NSException exceptionWithName:COInvalidException - reason:[NSString stringWithFormat:@"Cannot await object: %@.", awaitable] - userInfo:nil]; - } - - + uint32_t count = (uint32_t)awaitableList.count; + + if (count == 0) { + return nil; } - NSMutableArray *result = [[NSMutableArray alloc] initWithCapacity:awaitableList.count]; - COCoroutine *currentCo = [COCoroutine currentCoroutine]; - for (COChan *chan in resultAwaitable) { - if ([currentCo isCancelled]) { - [result addObject:[NSNull null]]; - } else { - id val = co_await(chan); - if ([currentCo isCancelled]) { - [result addObject:[NSNull null]]; - } else { - [result addObject:val ? val : [NSNull null]]; - } - } + NSMutableArray *result = [[NSMutableArray alloc] initWithCapacity:count]; + + COChan *chan = [COChan chanWithBuffCount:count]; + + for (int i = 0; i < count; i++) { + + [result addObject:[NSNull null]]; + id awaitable = awaitableList[i]; + // start subroutines + co_launch(^{ + + id val = co_await(awaitable); + if (!co_isCancelled()) { + if (val) { + [result replaceObjectAtIndex:i withObject:val]; + } else { + NSError *error = co_getError(); + if (error) { + [result replaceObjectAtIndex:i withObject:error]; + } + } + } + [chan send_nonblock:@(i)]; + }); } + + [chan receiveWithCount:count]; return result.copy; } diff --git a/coswift/Chan.swift b/coswift/Chan.swift index 5555eb1..cb03b96 100644 --- a/coswift/Chan.swift +++ b/coswift/Chan.swift @@ -25,25 +25,52 @@ private let co_chan_custom_resume: @convention(c) (UnsafeMutablePointer { + public var onCancel: (Chan) -> Void + init(blk: @escaping (Chan) -> Void) { + onCancel = blk + } +} /// Define the Channel public class Chan { + + public typealias ChanOnCancelBlock = (Chan) -> Void - /// Callback when the channel cancel. - public var onCancel: ChanOnCancelBlock? + private var cancelBlocksByCo = NSMapTable>(keyOptions: NSMapTableWeakMemory, valueOptions: NSMapTableStrongMemory) - /// If the channel is cancelled. - public var isCancelled: Bool { + /// Callback when the channel cancel. + public var onCancel: ChanOnCancelBlock? { + set { + if let co = Coroutine.current() { + do { + lock.lock() + defer { lock.unlock() } + let method = ChanCancelMethod(blk: newValue!) + cancelBlocksByCo.setObject(method, forKey: co) + } + } + } get { - return cancelled + return nil + } + } + + private func popCancelBlockForCo(co: Coroutine) -> ChanOnCancelBlock? { + lock.lock() + defer { lock.unlock() } + + if let method = cancelBlocksByCo.object(forKey: co) { + return method.onCancel } + return nil } + private var buffCount: Int32 private var cchan: UnsafeMutablePointer - private var cancelled: Bool = false private var buffList: [T] = [] private let lock = NSRecursiveLock() @@ -86,20 +113,34 @@ public class Chan { public func send(val: T) throws { if let co = Coroutine.current() { - co.chanCancelBlock = { - self.cancel() + co.chanCancelBlock = { coroutine in + self.cancelForCoroutine(co: coroutine) } - do { - lock.lock() - defer { lock.unlock() } - buffList.append(val) - } - chansendi8(cchan, 1); - co.chanCancelBlock = nil - if cancelled { + + let custom_exec = imp_implementationWithBlock({ + self.lock.lock() + defer { self.lock.unlock() } + self.buffList.append(val) + }) + + let cancel_exec = imp_implementationWithBlock({ + // cancel call back + if let block = self.popCancelBlockForCo(co: co) { + block(self) + } + // throw cancelled error throw COError.coroutineCancelled + }) + + defer { + imp_removeBlock(custom_exec) + imp_removeBlock(cancel_exec) } + + var v: Int8 = 1; + + _ = chansend_custom_exec(cchan, &v, custom_exec, cancel_exec) } } @@ -202,31 +243,8 @@ public class Chan { /// Why we provide this api? /// Sometimes, we need cancel a operation, such as a Network Connection. So, a coroutine is cancellable. /// But Channel may blocking the coroutine, so we need cancel the Channel when cancel a coroutine. - public func cancel() { + public func cancelForCoroutine(co: Coroutine) { - if cancelled { - return - } - cancelled = true - - if let cancelBlock = self.onCancel { - cancelBlock(self) - } - - var blockingSend:Int32 = 0 - var blockingReceive:Int32 = 0 - - if (changetblocking(cchan, &blockingSend, &blockingReceive) != 0) { - - if blockingSend > 0 { - for _ in 0.. 0 { - for _ in 0..? + public var co: UnsafeMutablePointer? /// The closure to cancel the blocking Channel when Coroutine cancel. /// If a channel blocking this coroutine, should set this block. - public var chanCancelBlock: (() -> Void)? + public var chanCancelBlock: ((Coroutine) -> Void)? /// The lastError occurred in the Coroutine. public var lastError: Error? @@ -195,7 +195,7 @@ open class Coroutine { self.co?.pointee.is_cancelled = true if let chanCancel = self.chanCancelBlock { - chanCancel(); + chanCancel(self); } }