Skip to content

Commit

Permalink
remove COChan's onCancel api
Browse files Browse the repository at this point in the history
  • Loading branch information
NianJi committed Apr 25, 2019
1 parent 559b820 commit 1478c81
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 141 deletions.
74 changes: 73 additions & 1 deletion cocore/co_csp.m
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

#pragma mark - queue

static void chancancelallalt(co_channel *c);

static void amove(void *dst, void *src, uint n) {
if(dst){
if(src == nil) {
Expand Down Expand Up @@ -198,6 +200,7 @@ void chanfree(co_channel *c) {
if(c == nil) {
return;
}
chancancelallalt(c);
if (c->buffer.expandsize) {
free(c->buffer.arr);
}
Expand Down Expand Up @@ -363,6 +366,75 @@ static int altexec(chan_alt *a) {
}
}

static void chancancelallalt(co_channel *c) {

if (!c) {
return;
}

chan_alt **remainList = NULL;
size_t remainCount = 0;

chanlock(c);

remainCount = c->asend.count + c->arecv.count;
if (remainCount > 0) {
remainList = calloc(remainCount, sizeof(chan_alt *));

chan_alt **ptr = remainList;

alt_queue *sending = &c->asend;
if (sending->count) {

for (int i = 0; i < sending->count; i++) {
altqueuepop(sending, ptr);
ptr++;
}
}
alt_queue *recv = &c->arecv;
if (recv->count) {

for (int i = 0; i < recv->count; i++) {
altqueuepop(recv, ptr);
ptr++;
}
}
}

chanunlock(c);

if (remainCount > 0) {
for (int i = 0; i < remainCount; i++) {
chan_alt *a = remainList[i];
if (!a) {
continue;
}
// custom cancel
if (a->cancel_exec) {
a->cancel_exec();
}

// resume the task.
coroutine_t *co = a->task;
void (*custom_resume)(coroutine_t *co) = c->custom_resume;

if (custom_resume) {
custom_resume(co);
} else {
coroutine_add(co);
}
}

}

if (remainList) {
free(remainList);
}

}



void altcancel(chan_alt *a) {
if (!a) {
return;
Expand All @@ -379,6 +451,7 @@ void altcancel(chan_alt *a) {
if(altqueue && altqueue->count){

altqueueremove(altqueue, a);
chanunlock(c);
a->is_cancelled = true;

// custom cancel
Expand All @@ -389,7 +462,6 @@ void altcancel(chan_alt *a) {
// resume the task.
coroutine_t *co = a->task;
void (*custom_resume)(coroutine_t *co) = c->custom_resume;
chanunlock(c);

if (custom_resume) {
custom_resume(co);
Expand Down
1 change: 1 addition & 0 deletions coobjc.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@
developmentRegion = English;
hasScannedForEncodings = 0;
knownRegions = (
English,
en,
Base,
);
Expand Down
8 changes: 6 additions & 2 deletions coobjc/co/COChan.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ typedef void (^COChanOnCancelBlock)(COChan *chan);
*/
- (void)send:(Value _Nullable )val;

- (void)send:(Value _Nullable )val onCancel:(COChanOnCancelBlock _Nullable)cancelBlock;

/**
Receive a value from the Channel, blocking.
Expand All @@ -77,6 +79,8 @@ typedef void (^COChanOnCancelBlock)(COChan *chan);
*/
- (Value _Nullable )receive;

- (Value _Nullable )receiveWithOnCancel:(COChanOnCancelBlock _Nullable)cancelBlock;;

/**
Send a value to the Channel, non blocking.
Expand Down Expand Up @@ -132,14 +136,14 @@ typedef void (^COChanOnCancelBlock)(COChan *chan);
*/
- (void)cancelForCoroutine:(COCoroutine *)co;

/**
/** @deprecated This method is not safe. use `send:onCancel:` or `receiveWithOnCancel:`
Set a callback block when the Channel cancel current blocking operation(send: or receive:)
Must set before `send:` or `receive:`, or it will not work.
@param onCancelBlock the cancel callback block.
*/
- (void)onCancel:(COChanOnCancelBlock _Nullable )onCancelBlock;
// - (void)onCancel:(COChanOnCancelBlock _Nullable )onCancelBlock;

@end

Expand Down
55 changes: 18 additions & 37 deletions coobjc/co/COChan.m
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ @interface COChan()
}

@property(nonatomic, assign) int count;
@property(nonatomic, strong) NSMapTable *cancelBlocksByCo;
@property(nonatomic, strong) NSMutableArray *buffList;

@end
Expand Down Expand Up @@ -76,6 +75,10 @@ - (instancetype)initWithBuffCount:(int32_t)buffCount {
}

- (void)send:(id)val {
[self send:val onCancel:NULL];
}

- (void)send:(id)val onCancel:(COChanOnCancelBlock)cancelBlock {

// send may blocking current process, so must check in a coroutine.
COCoroutine *co = [COCoroutine currentCoroutine];
Expand All @@ -91,7 +94,6 @@ - (void)send:(id)val {
});

IMP cancel_exec = NULL;
COChanOnCancelBlock cancelBlock = [self popCancelBlockForCo:co];
if (cancelBlock) {
cancel_exec = imp_implementationWithBlock(^{
cancelBlock(self);
Expand All @@ -108,6 +110,10 @@ - (void)send:(id)val {
}

- (id)receive {
return [self receiveWithOnCancel:NULL];
}

- (id)receiveWithOnCancel:(COChanOnCancelBlock)cancelBlock {

COCoroutine *co = [COCoroutine currentCoroutine];
if (!co) {
Expand All @@ -117,7 +123,6 @@ - (id)receive {
co.currentChan = self;

IMP cancel_exec = NULL;
COChanOnCancelBlock cancelBlock = [self popCancelBlockForCo:co];
if (cancelBlock) {
cancel_exec = imp_implementationWithBlock(^{
cancelBlock(self);
Expand Down Expand Up @@ -224,38 +229,14 @@ - (void)cancelForCoroutine:(COCoroutine *)co {
chan_cancel_alt_in_co(co.co);
}

- (NSMapTable *)cancelBlocksByCo {
if (!_cancelBlocksByCo) {
_cancelBlocksByCo = [NSMapTable mapTableWithKeyOptions:NSMapTableWeakMemory valueOptions:NSMapTableStrongMemory];
}
return _cancelBlocksByCo;
}
@end

- (void)onCancel:(COChanOnCancelBlock)onCancelBlock {
if (!onCancelBlock) {
return;
}
COCoroutine *co = [COCoroutine currentCoroutine];
if (!co) {
@throw [NSException exceptionWithName:COInvalidException reason:@"onCancel must called in a routine" userInfo:@{}];
}
COOBJC_SCOPELOCK(_buffLock);
[self.cancelBlocksByCo setObject:[onCancelBlock copy] forKey:co];
}
@interface COTimeChan()

- (COChanOnCancelBlock)popCancelBlockForCo:(COCoroutine *)co {
COOBJC_SCOPELOCK(_buffLock);
COChanOnCancelBlock block = [self.cancelBlocksByCo objectForKey:co];
if (block) {
[self.cancelBlocksByCo removeObjectForKey:co];
return block;
}
return nil;
}
@property (nonatomic, strong) CODispatchTimer *timer;

@end


@implementation COTimeChan
{
BOOL _isDone;
Expand All @@ -274,19 +255,19 @@ - (void)send_nonblock:(id)val {
[super send_nonblock:val];
}

- (id)receive {
return [self receiveWithOnCancel:^(COChan * _Nonnull chan) {
[[(COTimeChan *)chan timer] invalidate];
}];
}

+ (instancetype)sleep:(NSTimeInterval)duration {
COTimeChan *chan = [self chanWithDuration:duration];

CODispatchTimer *timer = [[CODispatch currentDispatch] dispatch_timer:^{
chan.timer = [[CODispatch currentDispatch] dispatch_timer:^{
[chan send_nonblock:@1];
} interval:duration];

[chan onCancel:^(COChan * _Nonnull chan) {
//dispatch_source_cancel(timer);
[timer invalidate];
}];


return chan;
}

Expand Down
2 changes: 1 addition & 1 deletion coobjc/co/COCoroutine.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ extern NSString *const COInvalidException;
@param stackSize : stackSize of the coroutine.
@return The coroutine object.
*/
- (instancetype)initWithBlock:(void (^)(void))block onQueue:(dispatch_queue_t)queue stackSize:(NSUInteger)stackSize;
- (instancetype)initWithBlock:(void (^)(void))block onQueue:(dispatch_queue_t _Nullable)queue stackSize:(NSUInteger)stackSize;

/**
The coroutine is Finished.
Expand Down
89 changes: 33 additions & 56 deletions coobjc/co/COCoroutine.m
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#import "COChan.h"
#import "coroutine.h"
#import "co_queue.h"
#import "coobjc.h"

NSString *const COInvalidException = @"COInvalidException";

Expand Down Expand Up @@ -150,7 +151,6 @@ - (instancetype)initWithBlock:(void (^)(void))block onQueue:(dispatch_queue_t)qu
if (self) {
_execBlock = [block copy];
_dispatch = queue ? [CODispatch dispatchWithQueue:queue] : [CODispatch currentDispatch];
//_queue = queue ?: co_get_current_queue();

coroutine_t *co = coroutine_create((void (*)(void *))co_exec);
if (stackSize > 0 && stackSize < 1024*1024) { // Max 1M
Expand Down Expand Up @@ -322,12 +322,10 @@ id co_await(id awaitable) {
co.lastError = error;
[chan send_nonblock:nil];
}];

[chan onCancel:^(COChan * _Nonnull chan) {
id val = [chan receiveWithOnCancel:^(COChan * _Nonnull chan) {
[promise cancel];
}];

id val = [chan receive];
return val;

} else {
Expand All @@ -349,61 +347,40 @@ id co_await(id awaitable) {
return nil;
}

NSMutableArray *resultAwaitable = [[NSMutableArray alloc] initWithCapacity:awaitableList.count];

for (id awaitable in awaitableList) {

if ([awaitable isKindOfClass:[COChan class]]) {

[resultAwaitable addObject:awaitable];

} else if ([awaitable isKindOfClass:[COPromise class]]) {

COChan *chan = [COChan chanWithBuffCount:1];
COCoroutine *co = co_get_obj(t);

COPromise *promise = awaitable;
[[promise
then:^id _Nullable(id _Nullable value) {

[chan send_nonblock:value];
return value;
}]
catch:^(NSError * _Nonnull error) {
co.lastError = error;
[chan send_nonblock:error];
}];

[chan onCancel:^(COChan * _Nonnull chan) {
[promise cancel];
}];

[resultAwaitable addObject:chan];

} else {
@throw [NSException exceptionWithName:COInvalidException
reason:[NSString stringWithFormat:@"Cannot await object: %@.", awaitable]
userInfo:nil];
}


uint32_t count = (uint32_t)awaitableList.count;

if (count == 0) {
return nil;
}

NSMutableArray *result = [[NSMutableArray alloc] initWithCapacity:awaitableList.count];
COCoroutine *currentCo = [COCoroutine currentCoroutine];
for (COChan *chan in resultAwaitable) {
if ([currentCo isCancelled]) {
[result addObject:[NSNull null]];
} else {
id val = co_await(chan);
if ([currentCo isCancelled]) {
[result addObject:[NSNull null]];
} else {
[result addObject:val ? val : [NSNull null]];
}
}
NSMutableArray *result = [[NSMutableArray alloc] initWithCapacity:count];

COChan *chan = [COChan chanWithBuffCount:count];

for (int i = 0; i < count; i++) {

[result addObject:[NSNull null]];
id awaitable = awaitableList[i];

// start subroutines
co_launch(^{

id val = co_await(awaitable);
if (!co_isCancelled()) {
if (val) {
[result replaceObjectAtIndex:i withObject:val];
} else {
NSError *error = co_getError();
if (error) {
[result replaceObjectAtIndex:i withObject:error];
}
}
}
[chan send_nonblock:@(i)];
});
}

[chan receiveWithCount:count];
return result.copy;
}

Expand Down
Loading

0 comments on commit 1478c81

Please sign in to comment.