From b2a68fd002d9ded8230877bbc41f2b7544805839 Mon Sep 17 00:00:00 2001 From: NianJi <765409243@qq.com> Date: Thu, 25 Apr 2019 13:29:29 +0800 Subject: [PATCH] add Channel's errorno & change coswift's channel --- cocore/co_csp.h | 39 ++++++++----- cocore/co_csp.m | 60 +++++++++---------- coobjc/co/COChan.m | 4 +- coswift/Chan.swift | 133 ++++++++++++++++++++---------------------- coswift/Errors.swift | 1 + coswift/coswift.swift | 15 ++--- 6 files changed, 123 insertions(+), 129 deletions(-) diff --git a/cocore/co_csp.h b/cocore/co_csp.h index e6a29e8..8006ed1 100644 --- a/cocore/co_csp.h +++ b/cocore/co_csp.h @@ -33,6 +33,13 @@ typedef enum { CHANNEL_RECEIVE, } channel_op; +enum channel_errorno { + CHANNEL_ALT_SUCCESS = 1, + CHANNEL_ALT_ERROR_COPYFAIL = 0, + CHANNEL_ALT_ERROR_CANCELLED = -1, // cancel the current alt. + CHANNEL_ALT_ERROR_BUFFER_FULL = -2, // no buffer remain, send_nonblock fail + CHANNEL_ALT_ERROR_NO_VALUE = -3, // receive_nonblock fail +}; typedef struct chan_alt chan_alt; typedef struct chan_queue chan_queue; @@ -109,7 +116,7 @@ void chanfree(co_channel *chan); @param c channel @param v the pointer will store received value. - @return 1 success, else fail. + @return channel_errorno */ int channbrecv(co_channel *c, void *v); @@ -117,7 +124,7 @@ int channbrecv(co_channel *c, void *v); Non-blocking receive a pointer value from channel. @param c channel - @return received pointer value. + @return received pointer value, default NULL. */ void *channbrecvp(co_channel *c); @@ -125,7 +132,7 @@ void *channbrecvp(co_channel *c); Non-blocking receive a unsigned long value from channel. @param c channel - @return received unsigned long value. + @return received unsigned long value, default 0. */ unsigned long channbrecvul(co_channel *c); @@ -134,7 +141,7 @@ unsigned long channbrecvul(co_channel *c); @param c channel @param v the value's address. - @return 1 success, else fail. + @return channel_errorno */ int channbsend(co_channel *c, void *v); @@ -143,7 +150,7 @@ int channbsend(co_channel *c, void *v); @param c channel @param v the pointer - @return 1 success, else fail. + @return channel_errorno */ int channbsendp(co_channel *c, void *v); @@ -152,7 +159,7 @@ int channbsendp(co_channel *c, void *v); @param c channel @param v the unsigned long value - @return 1 success, else fail. + @return channel_errorno */ int channbsendul(co_channel *c, unsigned long v); @@ -163,7 +170,7 @@ int channbsendul(co_channel *c, unsigned long v); @param c channel @param v the pointer will store received value. - @return 1 success, else fail. + @return channel_errorno */ int chanrecv(co_channel *c, void *v); @@ -173,7 +180,7 @@ int chanrecv(co_channel *c, void *v); If no one sending, and buffer is empty, blocking the current coroutine. @param c channel - @return received pointer. + @return received pointer, default NULL. */ void *chanrecvp(co_channel *c); @@ -183,7 +190,7 @@ void *chanrecvp(co_channel *c); If no one sending, and buffer is empty, blocking the current coroutine. @param c channel - @return received unsigned long value. + @return received unsigned long value, default 0. */ unsigned long chanrecvul(co_channel *c); @@ -194,7 +201,7 @@ unsigned long chanrecvul(co_channel *c); @param c channel @param v the pointer will store received value. - @return 1 success, else fail. + @return channel_errorno */ int chansend(co_channel *c, void *v); @@ -203,7 +210,7 @@ int chansend(co_channel *c, void *v); @param c channel @param v the pointer - @return 1 success, else fail. + @return channel_errorno */ int chansendp(co_channel *c, void *v); @@ -212,7 +219,7 @@ int chansendp(co_channel *c, void *v); @param c channel @param v the unsigned long value - @return 1 success, else fail. + @return channel_errorno */ int chansendul(co_channel *c, unsigned long v); @@ -221,7 +228,7 @@ int chansendul(co_channel *c, unsigned long v); to cancel the blocking. @param co the coroutine object - @return 1 success, else fail. + @return channel_errorno */ int chan_cancel_alt_in_co(coroutine_t *co); @@ -234,7 +241,7 @@ int chan_cancel_alt_in_co(coroutine_t *co); @param v the pointer pass the send value. @param exec run at sending. @param cancelExec run at cancel a alt. - @return 1 success, else fail. + @return channel_errorno */ int chansend_custom_exec(co_channel *c, void *v, IMP exec, IMP cancelExec); @@ -244,7 +251,7 @@ int chansend_custom_exec(co_channel *c, void *v, IMP exec, IMP cancelExec); @param c channel @param v the value's address. @param exec run at sending. - @return 1 success, else fail. + @return channel_errorno */ int channbsend_custom_exec(co_channel *c, void *v, IMP exec); @@ -256,7 +263,7 @@ int channbsend_custom_exec(co_channel *c, void *v, IMP exec); @param c channel @param v the pointer will store received value. @param cancelExec run at cancel a alt. - @return 1 success, else fail. + @return channel_errorno */ int chanrecv_custom_exec(co_channel *c, void *v, IMP cancelExec); diff --git a/cocore/co_csp.m b/cocore/co_csp.m index e780254..8e4ea81 100644 --- a/cocore/co_csp.m +++ b/cocore/co_csp.m @@ -273,6 +273,7 @@ static void altqueue(chan_alt *a) { * data goes from sender to receiver. If the channel is full, * the receiver removes some from the channel and the sender * gets to put some in. + * 1 success, 0 fail. */ static int altcopy(chan_alt *s, chan_alt *r) { chan_alt *t; @@ -310,13 +311,7 @@ static int altcopy(chan_alt *s, chan_alt *r) { // receive first from buffer queuepop(&c->buffer, r->value); // then send to buffer - int ret = queuepush(&c->buffer, s->value); - if (ret == 1) { - if (s->custom_exec) { - s->custom_exec(); - } - } - return ret; + return queuepush(&c->buffer, s->value); } } @@ -326,14 +321,7 @@ static int altcopy(chan_alt *s, chan_alt *r) { if(r){ return queuepop(&c->buffer, r->value); } else if(s) { - - int ret = queuepush(&c->buffer, s->value); - if (ret == 1) { - if (s->custom_exec) { - s->custom_exec(); - } - } - return ret; + return queuepush(&c->buffer, s->value); } return 0; } @@ -348,21 +336,34 @@ static int altexec(chan_alt *a) { altqueue = chanarray(c, otherop(a->op)); if(altqueuepop(altqueue, &other)){ - altcopy(a, other); + int copyRet = altcopy(a, other); + assert(copyRet == 1); coroutine_t *co = other->task; void (*custom_resume)(coroutine_t *co) = c->custom_resume; chanunlock(c); + // call back sender + chan_alt *sender = a->op == CHANNEL_SEND ? a : other; + if (sender->custom_exec) { + sender->custom_exec(); + } + if (custom_resume) { custom_resume(co); } else { coroutine_add(co); } - return 1; + return CHANNEL_ALT_SUCCESS; } else { - int ret = altcopy(a, nil); + int copyRet = altcopy(a, nil); chanunlock(c); - return ret; + + if (copyRet && a->op == CHANNEL_SEND) { + if (a->custom_exec) { + a->custom_exec(); + } + } + return copyRet ? CHANNEL_ALT_SUCCESS : CHANNEL_ALT_ERROR_COPYFAIL; } } @@ -409,6 +410,7 @@ static void chancancelallalt(co_channel *c) { if (!a) { continue; } + a->is_cancelled = true; // custom cancel if (a->cancel_exec) { a->cancel_exec(); @@ -499,7 +501,7 @@ int chanalt(chan_alt *a) { if(!canblock) { chanunlock(c); - return 0; + return a->op == CHANNEL_SEND ? CHANNEL_ALT_ERROR_BUFFER_FULL : CHANNEL_ALT_ERROR_NO_VALUE; } // add to queue @@ -515,10 +517,10 @@ int chanalt(chan_alt *a) { t->chan_alt = nil; // alt is cancelled if (a->is_cancelled) { - return 0; + return CHANNEL_ALT_ERROR_CANCELLED; } - return 1; + return CHANNEL_ALT_SUCCESS; } static int _chanop(co_channel *c, int op, void *p, int canblock) { @@ -535,12 +537,9 @@ static int _chanop(co_channel *c, int op, void *p, int canblock) { a->custom_exec = NULL; a->cancel_exec = NULL; - if(chanalt(a) == 0) { - free(a); - return 0; - } + int ret = chanalt(a); free(a); - return 1; + return ret; } static int _chanop2(co_channel *c, int op, void *p, int canblock, IMP custom_exec, IMP cancel_exec) { @@ -557,12 +556,9 @@ static int _chanop2(co_channel *c, int op, void *p, int canblock, IMP custom_exe a->custom_exec = custom_exec; a->cancel_exec = cancel_exec; - if(chanalt(a) == 0) { - free(a); - return 0; - } + int ret = chanalt(a); free(a); - return 1; + return ret; } #pragma mark - public apis diff --git a/coobjc/co/COChan.m b/coobjc/co/COChan.m index 95831b0..e34ddf9 100644 --- a/coobjc/co/COChan.m +++ b/coobjc/co/COChan.m @@ -136,7 +136,7 @@ - (id)receiveWithOnCancel:(COChanOnCancelBlock)cancelBlock { } co.currentChan = nil; - if (ret == 1) { + if (ret == CHANNEL_ALT_SUCCESS) { // success do { COOBJC_SCOPELOCK(_buffLock); @@ -200,7 +200,7 @@ - (id)receive_nonblock { uint8_t val = 0; int ret = channbrecv(_chan, &val); - if (ret == 1) { + if (ret == CHANNEL_ALT_SUCCESS) { do { COOBJC_SCOPELOCK(_buffLock); diff --git a/coswift/Chan.swift b/coswift/Chan.swift index cb03b96..db17467 100644 --- a/coswift/Chan.swift +++ b/coswift/Chan.swift @@ -25,49 +25,12 @@ private let co_chan_custom_resume: @convention(c) (UnsafeMutablePointer { - public var onCancel: (Chan) -> Void - init(blk: @escaping (Chan) -> Void) { - onCancel = blk - } -} - /// Define the Channel public class Chan { - - + // Define the channel's alt cancel type public typealias ChanOnCancelBlock = (Chan) -> Void - private var cancelBlocksByCo = NSMapTable>(keyOptions: NSMapTableWeakMemory, valueOptions: NSMapTableStrongMemory) - - /// Callback when the channel cancel. - public var onCancel: ChanOnCancelBlock? { - set { - if let co = Coroutine.current() { - do { - lock.lock() - defer { lock.unlock() } - let method = ChanCancelMethod(blk: newValue!) - cancelBlocksByCo.setObject(method, forKey: co) - } - } - } - get { - return nil - } - } - - private func popCancelBlockForCo(co: Coroutine) -> ChanOnCancelBlock? { - lock.lock() - defer { lock.unlock() } - - if let method = cancelBlocksByCo.object(forKey: co) { - return method.onCancel - } - return nil - } - private var buffCount: Int32 private var cchan: UnsafeMutablePointer @@ -110,37 +73,45 @@ public class Chan { /// /// - Parameter val: the value to send. /// - Throws: COError types - public func send(val: T) throws { + public func send(val: T, onCancel: ChanOnCancelBlock? = nil) throws { if let co = Coroutine.current() { co.chanCancelBlock = { coroutine in self.cancelForCoroutine(co: coroutine) } - - let custom_exec = imp_implementationWithBlock({ + let objcBlock: @convention(block) ()->Void = { self.lock.lock() defer { self.lock.unlock() } self.buffList.append(val) - }) + } + let custom_exec = imp_implementationWithBlock(objcBlock) - let cancel_exec = imp_implementationWithBlock({ - // cancel call back - if let block = self.popCancelBlockForCo(co: co) { + var cancel_exec: IMP? = nil + if let block = onCancel { + let objcBlock1: @convention(block) ()->Void = { block(self) } - // throw cancelled error - throw COError.coroutineCancelled - }) + cancel_exec = imp_implementationWithBlock(objcBlock1) + } defer { imp_removeBlock(custom_exec) - imp_removeBlock(cancel_exec) + if let exec = cancel_exec { + imp_removeBlock(exec) + } + co.chanCancelBlock = nil } - var v: Int8 = 1; + var v: Int8 = 1 + + let ret = chansend_custom_exec(cchan, &v, custom_exec, cancel_exec) - _ = chansend_custom_exec(cchan, &v, custom_exec, cancel_exec) + if ret == CHANNEL_ALT_ERROR_CANCELLED.rawValue { + throw COError.coroutineCancelled + } + } else { + throw COError.invalidCoroutine } } @@ -149,29 +120,49 @@ public class Chan { /// - Parameter val: the value to send. public func send_nonblock(val: T) { - do { - lock.lock() - defer { lock.unlock() } - buffList.append(val) + let objcBlock: @convention(block) ()->Void = { + self.lock.lock() + defer { self.lock.unlock() } + self.buffList.append(val) } - channbsendi8(cchan, 1) + let custom_exec = imp_implementationWithBlock(objcBlock) + var v: Int8 = 1 + _ = channbsend_custom_exec(cchan, &v, custom_exec) + imp_removeBlock(custom_exec) } /// Blocking receive a value from the channel /// /// - Returns: the received value /// - Throws: COError types - public func receive() throws -> T { + public func receive(onCancel: ChanOnCancelBlock? = nil) throws -> T { if let co = Coroutine.current() { - co.chanCancelBlock = { - self.cancel() + co.chanCancelBlock = { coroutine in + self.cancelForCoroutine(co: coroutine) } - let ret = chanrecvi8(cchan); - co.chanCancelBlock = nil + + var v: Int8 = 0 + + var cancel_exec: IMP? = nil + if let block = onCancel { + let objcBlock: @convention(block) ()->Void = { + block(self) + } + cancel_exec = imp_implementationWithBlock(objcBlock) + } + + defer { + if let exec = cancel_exec { + imp_removeBlock(exec) + } + co.chanCancelBlock = nil + } + + let ret = chanrecv_custom_exec(cchan, &v, cancel_exec) - if ret == 1 { + if ret == CHANNEL_ALT_SUCCESS.rawValue { do { lock.lock() @@ -179,8 +170,10 @@ public class Chan { let obj = buffList.removeFirst() return obj } - } else { + } else if ret == CHANNEL_ALT_ERROR_CANCELLED.rawValue { throw COError.coroutineCancelled + } else { + throw COError.chanReceiveFailUnknown } } else { @@ -192,9 +185,10 @@ public class Chan { /// /// - Returns: the receive value or nil. public func receive_nonblock() -> T? { - - let ret = channbrecvi8(cchan) - if ret == 1 { + var v: Int8 = 0 + let ret = channbrecv(cchan, &v) + + if ret == CHANNEL_ALT_SUCCESS.rawValue { do { lock.lock() defer { lock.unlock() } @@ -232,14 +226,15 @@ public class Chan { var retArray:[T] = [] var currCount = 0 - while currCount < count, let obj = self.receive_nonblock() { + while currCount < count { + let obj = try self.receive() retArray.append(obj) - currCount += 1; + currCount += 1 } return retArray } - /// Cancel the channel + /// Cancel the channel's current sending or receiving operation /// Why we provide this api? /// Sometimes, we need cancel a operation, such as a Network Connection. So, a coroutine is cancellable. /// But Channel may blocking the coroutine, so we need cancel the Channel when cancel a coroutine. diff --git a/coswift/Errors.swift b/coswift/Errors.swift index 429718f..7ba2112 100644 --- a/coswift/Errors.swift +++ b/coswift/Errors.swift @@ -27,6 +27,7 @@ public enum COError: String, LocalizedError { case generatorCancelled = "The generator is cancelled" case generatorClosed = "The generator is closed" case notGenerator = "The current coroutine is not a generator" + case chanReceiveFailUnknown = "Channel receive fails unknown" /// A localized message describing what error occurred. public var errorDescription: String? { diff --git a/coswift/coswift.swift b/coswift/coswift.swift index d0fd075..364529d 100644 --- a/coswift/coswift.swift +++ b/coswift/coswift.swift @@ -64,11 +64,9 @@ public func await(promise: Promise) throws -> Resolution { } } - chan.onCancel = { (channel) in + return try chan.receive(onCancel: { (channel) in promise.cancel() - } - - return try chan.receive() + }) } else { throw COError.invalidCoroutine @@ -113,7 +111,6 @@ public var co_isActive: Bool { } } - /// co_delay, pause current coroutine seconds. /// /// - Parameter seconds: paused time @@ -131,11 +128,9 @@ public func co_delay(_ seconds: TimeInterval) throws { } timer.schedule(deadline: .now() + seconds, repeating: .never) - chan.onCancel = { _ in - timer.cancel() - } - timer.resume() - try _ = chan.receive() + try _ = chan.receive(onCancel: { (chan) in + timer.cancel() + }) }