From 93de4d3311a89514d1853da0e91f01bbbd89a8c0 Mon Sep 17 00:00:00 2001 From: NianJi <765409243@qq.com> Date: Tue, 23 Apr 2019 18:54:10 +0800 Subject: [PATCH] add receive all & add receive count --- .../coobjcPromiseTests.m | 65 +++++++++++++++++- coobjc/co/COChan.h | 25 ++++++- coobjc/co/COChan.m | 21 ++++++ coswift/Chan.swift | 66 +++++++++++++++++++ 4 files changed, 175 insertions(+), 2 deletions(-) diff --git a/Examples/coobjcBaseExample/coobjcBaseExampleTests/coobjcPromiseTests.m b/Examples/coobjcBaseExample/coobjcBaseExampleTests/coobjcPromiseTests.m index 4e01d0e..b958fa7 100644 --- a/Examples/coobjcBaseExample/coobjcBaseExampleTests/coobjcPromiseTests.m +++ b/Examples/coobjcBaseExample/coobjcBaseExampleTests/coobjcPromiseTests.m @@ -310,7 +310,7 @@ static id testPromise3() { } }); waitUntil(^(DoneCallback done) { - dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(5 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{ + dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(7 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{ [co cancel]; dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{ @@ -742,6 +742,69 @@ static id testPromise3() { }); }); + it(@"test receive all", ^{ + __block int val = 0; + [[TestThreadObject1 sharedInstance] runBlock:^{ + + COChan *chan = [COChan chanWithBuffCount:10]; + co_launch(^{ + + for (int i = 0; i < 10; i++) { + [chan send_nonblock:@(i)]; + } + }); + + co_launch(^{ + + NSArray *ret = [chan receiveAll]; + for (int i = 0; i < 10; i++) { + expect(ret[i]).to.equal(@(i)); + } + val = 11; + }); + }]; + + waitUntil(^(DoneCallback done) { + dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(6 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{ + expect(val).to.equal(11); + done(); + }); + }); + }); + + it(@"test receive count", ^{ + __block int val = 0; + [[TestThreadObject1 sharedInstance] runBlock:^{ + + COChan *chan = [COChan chanWithBuffCount:10]; + + co_launch(^{ + + NSArray *ret = [chan receiveWithCount:10]; + for (int i = 0; i < 10; i++) { + expect(ret[i]).to.equal(@(i)); + } + val = 11; + }); + + co_launch(^{ + + for (int i = 0; i < 10; i++) { + [chan send_nonblock:@(i)]; + } + }); + + + }]; + + waitUntil(^(DoneCallback done) { + dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(6 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{ + expect(val).to.equal(11); + done(); + }); + }); + }); + }); describe(@"https://github.com/alibaba/coobjc/issues/62", ^{ diff --git a/coobjc/co/COChan.h b/coobjc/co/COChan.h index 88a01a3..376c165 100644 --- a/coobjc/co/COChan.h +++ b/coobjc/co/COChan.h @@ -42,7 +42,7 @@ typedef void (^COChanOnCancelBlock)(COChan *chan); /** Create a Channel object, and you can set the buffcount. - @param buffCount then max buffer count of the channel. + @param buffCount the max buffer count of the channel. @return the Channel object */ + (instancetype)chanWithBuffCount:(int32_t)buffCount; @@ -97,6 +97,29 @@ typedef void (^COChanOnCancelBlock)(COChan *chan); */ - (Value _Nullable)receive_nonblock; +/** + Blocking receive all values in the channel. + + 1. If no values in channel, blocking waiting for one. + 2. If has values in channel, returning all values. + 3. If did send nil, the received value in array will be [NSNull null], + so you need check the returning value type in array, important!!! + + @return the values received. + */ +- (NSArray * _Nonnull)receiveAll; + +/** + Blocking receive count values in the channel. + + 1. It will continue blocking the current coroutine, until receive count objects. + 2. If did send nil, the received value in array will be [NSNull null], + so you need check the returning value type in array, important!!! + + @return the values received. + */ +- (NSArray * _Nonnull)receiveWithCount:(NSUInteger)count; + /** Cancel the Channel. diff --git a/coobjc/co/COChan.m b/coobjc/co/COChan.m index c8676fd..9e54b3e 100644 --- a/coobjc/co/COChan.m +++ b/coobjc/co/COChan.m @@ -129,6 +129,27 @@ - (id)receive { } } +- (NSArray *)receiveAll { + NSMutableArray *retArray = [[NSMutableArray alloc] init]; + id obj = [self receive]; + [retArray addObject:obj == kCOChanNilObj ? [NSNull null] : obj]; + while ((obj = [self receive_nonblock])) { + [retArray addObject:obj == kCOChanNilObj ? [NSNull null] : obj]; + } + return retArray.copy; +} + +- (NSArray *)receiveWithCount:(NSUInteger)count { + NSMutableArray *retArray = [[NSMutableArray alloc] initWithCapacity:count]; + id obj = nil; + NSUInteger currCount = 0; + while (currCount < count && (obj = [self receive])) { + [retArray addObject:obj == kCOChanNilObj ? [NSNull null] : obj]; + currCount ++; + } + return retArray.copy; +} + - (void)send_nonblock:(id)val { do { diff --git a/coswift/Chan.swift b/coswift/Chan.swift index 0aea7bd..5555eb1 100644 --- a/coswift/Chan.swift +++ b/coswift/Chan.swift @@ -31,7 +31,10 @@ public class Chan { public typealias ChanOnCancelBlock = (Chan) -> Void + /// Callback when the channel cancel. public var onCancel: ChanOnCancelBlock? + + /// If the channel is cancelled. public var isCancelled: Bool { get { return cancelled @@ -44,6 +47,10 @@ public class Chan { private var buffList: [T] = [] private let lock = NSRecursiveLock() + + /// Create a channel with the buffcount. + /// + /// - Parameter buffCount: the max buffer count of the channel. public init(buffCount: Int32) { self.buffCount = buffCount @@ -51,6 +58,8 @@ public class Chan { cchan = chancreate(eleSize, buffCount, co_chan_custom_resume) } + + /// Create a channel with the buffcount 0. public convenience init() { self.init(buffCount: 0) } @@ -59,11 +68,21 @@ public class Chan { chanfree(cchan) } + + /// Create a expandable Channel. the buffer count is expandable, which means, + /// `send` will not blocking current process. And, val send to channel will not abandon. + /// The bufferCount value is being set to -1. + /// + /// - Returns: the channel object public class func expandable() -> Chan { return Chan(buffCount: -1) } + /// Blocking send a value to the channel. + /// + /// - Parameter val: the value to send. + /// - Throws: COError types public func send(val: T) throws { if let co = Coroutine.current() { @@ -84,6 +103,9 @@ public class Chan { } } + /// Non-blocking send a value to the channel + /// + /// - Parameter val: the value to send. public func send_nonblock(val: T) { do { @@ -94,6 +116,10 @@ public class Chan { channbsendi8(cchan, 1) } + /// Blocking receive a value from the channel + /// + /// - Returns: the received value + /// - Throws: COError types public func receive() throws -> T { if let co = Coroutine.current() { @@ -121,6 +147,9 @@ public class Chan { } } + /// Non-blocking receive a value from the channel + /// + /// - Returns: the receive value or nil. public func receive_nonblock() -> T? { let ret = channbrecvi8(cchan) @@ -136,6 +165,43 @@ public class Chan { } } + /// Blocking receive all values in the channel for now. + /// At least receive one value. + /// + /// - Returns: the values + /// - Throws: COError types + public func receiveAll() throws -> [T] { + + var retArray:[T] = [] + + retArray.append(try self.receive()) + + while let obj = self.receive_nonblock() { + retArray.append(obj) + } + return retArray + } + + /// Blocking receive count values in the channel. + /// + /// - Parameter count: the value count will receive. + /// - Returns: the values + /// - Throws: COError types + public func receiveWithCount(count: UInt) throws -> [T] { + + var retArray:[T] = [] + var currCount = 0 + while currCount < count, let obj = self.receive_nonblock() { + retArray.append(obj) + currCount += 1; + } + return retArray + } + + /// Cancel the channel + /// Why we provide this api? + /// Sometimes, we need cancel a operation, such as a Network Connection. So, a coroutine is cancellable. + /// But Channel may blocking the coroutine, so we need cancel the Channel when cancel a coroutine. public func cancel() { if cancelled {