Skip to content

Commit

Permalink
add receive all & add receive count
Browse files Browse the repository at this point in the history
  • Loading branch information
NianJi committed Apr 23, 2019
1 parent cfa1c93 commit 93de4d3
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ static id testPromise3() {
}
});
waitUntil(^(DoneCallback done) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(5 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(7 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[co cancel];

dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
Expand Down Expand Up @@ -742,6 +742,69 @@ static id testPromise3() {
});
});

it(@"test receive all", ^{
__block int val = 0;
[[TestThreadObject1 sharedInstance] runBlock:^{

COChan *chan = [COChan chanWithBuffCount:10];
co_launch(^{

for (int i = 0; i < 10; i++) {
[chan send_nonblock:@(i)];
}
});

co_launch(^{

NSArray *ret = [chan receiveAll];
for (int i = 0; i < 10; i++) {
expect(ret[i]).to.equal(@(i));
}
val = 11;
});
}];

waitUntil(^(DoneCallback done) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(6 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
expect(val).to.equal(11);
done();
});
});
});

it(@"test receive count", ^{
__block int val = 0;
[[TestThreadObject1 sharedInstance] runBlock:^{

COChan *chan = [COChan chanWithBuffCount:10];

co_launch(^{

NSArray *ret = [chan receiveWithCount:10];
for (int i = 0; i < 10; i++) {
expect(ret[i]).to.equal(@(i));
}
val = 11;
});

co_launch(^{

for (int i = 0; i < 10; i++) {
[chan send_nonblock:@(i)];
}
});


}];

waitUntil(^(DoneCallback done) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(6 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
expect(val).to.equal(11);
done();
});
});
});

});

describe(@"https://github.com/alibaba/coobjc/issues/62", ^{
Expand Down
25 changes: 24 additions & 1 deletion coobjc/co/COChan.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ typedef void (^COChanOnCancelBlock)(COChan *chan);
/**
Create a Channel object, and you can set the buffcount.
@param buffCount then max buffer count of the channel.
@param buffCount the max buffer count of the channel.
@return the Channel object
*/
+ (instancetype)chanWithBuffCount:(int32_t)buffCount;
Expand Down Expand Up @@ -97,6 +97,29 @@ typedef void (^COChanOnCancelBlock)(COChan *chan);
*/
- (Value _Nullable)receive_nonblock;

/**
Blocking receive all values in the channel.
1. If no values in channel, blocking waiting for one.
2. If has values in channel, returning all values.
3. If did send nil, the received value in array will be [NSNull null],
so you need check the returning value type in array, important!!!
@return the values received.
*/
- (NSArray<Value> * _Nonnull)receiveAll;

/**
Blocking receive count values in the channel.
1. It will continue blocking the current coroutine, until receive count objects.
2. If did send nil, the received value in array will be [NSNull null],
so you need check the returning value type in array, important!!!
@return the values received.
*/
- (NSArray<Value> * _Nonnull)receiveWithCount:(NSUInteger)count;

/**
Cancel the Channel.
Expand Down
21 changes: 21 additions & 0 deletions coobjc/co/COChan.m
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,27 @@ - (id)receive {
}
}

- (NSArray *)receiveAll {
NSMutableArray *retArray = [[NSMutableArray alloc] init];
id obj = [self receive];
[retArray addObject:obj == kCOChanNilObj ? [NSNull null] : obj];
while ((obj = [self receive_nonblock])) {
[retArray addObject:obj == kCOChanNilObj ? [NSNull null] : obj];
}
return retArray.copy;
}

- (NSArray *)receiveWithCount:(NSUInteger)count {
NSMutableArray *retArray = [[NSMutableArray alloc] initWithCapacity:count];
id obj = nil;
NSUInteger currCount = 0;
while (currCount < count && (obj = [self receive])) {
[retArray addObject:obj == kCOChanNilObj ? [NSNull null] : obj];
currCount ++;
}
return retArray.copy;
}

- (void)send_nonblock:(id)val {

do {
Expand Down
66 changes: 66 additions & 0 deletions coswift/Chan.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ public class Chan<T> {

public typealias ChanOnCancelBlock = (Chan) -> Void

/// Callback when the channel cancel.
public var onCancel: ChanOnCancelBlock?

/// If the channel is cancelled.
public var isCancelled: Bool {
get {
return cancelled
Expand All @@ -44,13 +47,19 @@ public class Chan<T> {
private var buffList: [T] = []
private let lock = NSRecursiveLock()


/// Create a channel with the buffcount.
///
/// - Parameter buffCount: the max buffer count of the channel.
public init(buffCount: Int32) {

self.buffCount = buffCount
let eleSize = Int32(MemoryLayout<Int8>.size)
cchan = chancreate(eleSize, buffCount, co_chan_custom_resume)
}


/// Create a channel with the buffcount 0.
public convenience init() {
self.init(buffCount: 0)
}
Expand All @@ -59,11 +68,21 @@ public class Chan<T> {
chanfree(cchan)
}


/// Create a expandable Channel. the buffer count is expandable, which means,
/// `send` will not blocking current process. And, val send to channel will not abandon.
/// The bufferCount value is being set to -1.
///
/// - Returns: the channel object
public class func expandable() -> Chan<T> {

return Chan(buffCount: -1)
}

/// Blocking send a value to the channel.
///
/// - Parameter val: the value to send.
/// - Throws: COError types
public func send(val: T) throws {

if let co = Coroutine.current() {
Expand All @@ -84,6 +103,9 @@ public class Chan<T> {
}
}

/// Non-blocking send a value to the channel
///
/// - Parameter val: the value to send.
public func send_nonblock(val: T) {

do {
Expand All @@ -94,6 +116,10 @@ public class Chan<T> {
channbsendi8(cchan, 1)
}

/// Blocking receive a value from the channel
///
/// - Returns: the received value
/// - Throws: COError types
public func receive() throws -> T {

if let co = Coroutine.current() {
Expand Down Expand Up @@ -121,6 +147,9 @@ public class Chan<T> {
}
}

/// Non-blocking receive a value from the channel
///
/// - Returns: the receive value or nil.
public func receive_nonblock() -> T? {

let ret = channbrecvi8(cchan)
Expand All @@ -136,6 +165,43 @@ public class Chan<T> {
}
}

/// Blocking receive all values in the channel for now.
/// At least receive one value.
///
/// - Returns: the values
/// - Throws: COError types
public func receiveAll() throws -> [T] {

var retArray:[T] = []

retArray.append(try self.receive())

while let obj = self.receive_nonblock() {
retArray.append(obj)
}
return retArray
}

/// Blocking receive count values in the channel.
///
/// - Parameter count: the value count will receive.
/// - Returns: the values
/// - Throws: COError types
public func receiveWithCount(count: UInt) throws -> [T] {

var retArray:[T] = []
var currCount = 0
while currCount < count, let obj = self.receive_nonblock() {
retArray.append(obj)
currCount += 1;
}
return retArray
}

/// Cancel the channel
/// Why we provide this api?
/// Sometimes, we need cancel a operation, such as a Network Connection. So, a coroutine is cancellable.
/// But Channel may blocking the coroutine, so we need cancel the Channel when cancel a coroutine.
public func cancel() {

if cancelled {
Expand Down

0 comments on commit 93de4d3

Please sign in to comment.