Skip to content

Commit

Permalink
add Channel's errorno & change coswift's channel
Browse files Browse the repository at this point in the history
  • Loading branch information
NianJi committed Apr 25, 2019
1 parent 49f503f commit b2a68fd
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 129 deletions.
39 changes: 23 additions & 16 deletions cocore/co_csp.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ typedef enum {
CHANNEL_RECEIVE,
} channel_op;

enum channel_errorno {
CHANNEL_ALT_SUCCESS = 1,
CHANNEL_ALT_ERROR_COPYFAIL = 0,
CHANNEL_ALT_ERROR_CANCELLED = -1, // cancel the current alt.
CHANNEL_ALT_ERROR_BUFFER_FULL = -2, // no buffer remain, send_nonblock fail
CHANNEL_ALT_ERROR_NO_VALUE = -3, // receive_nonblock fail
};

typedef struct chan_alt chan_alt;
typedef struct chan_queue chan_queue;
Expand Down Expand Up @@ -109,23 +116,23 @@ void chanfree(co_channel *chan);
@param c channel
@param v the pointer will store received value.
@return 1 success, else fail.
@return channel_errorno
*/
int channbrecv(co_channel *c, void *v);

/**
Non-blocking receive a pointer value from channel.
@param c channel
@return received pointer value.
@return received pointer value, default NULL.
*/
void *channbrecvp(co_channel *c);

/**
Non-blocking receive a unsigned long value from channel.
@param c channel
@return received unsigned long value.
@return received unsigned long value, default 0.
*/
unsigned long channbrecvul(co_channel *c);

Expand All @@ -134,7 +141,7 @@ unsigned long channbrecvul(co_channel *c);
@param c channel
@param v the value's address.
@return 1 success, else fail.
@return channel_errorno
*/
int channbsend(co_channel *c, void *v);

Expand All @@ -143,7 +150,7 @@ int channbsend(co_channel *c, void *v);
@param c channel
@param v the pointer
@return 1 success, else fail.
@return channel_errorno
*/
int channbsendp(co_channel *c, void *v);

Expand All @@ -152,7 +159,7 @@ int channbsendp(co_channel *c, void *v);
@param c channel
@param v the unsigned long value
@return 1 success, else fail.
@return channel_errorno
*/
int channbsendul(co_channel *c, unsigned long v);

Expand All @@ -163,7 +170,7 @@ int channbsendul(co_channel *c, unsigned long v);
@param c channel
@param v the pointer will store received value.
@return 1 success, else fail.
@return channel_errorno
*/
int chanrecv(co_channel *c, void *v);

Expand All @@ -173,7 +180,7 @@ int chanrecv(co_channel *c, void *v);
If no one sending, and buffer is empty, blocking the current coroutine.
@param c channel
@return received pointer.
@return received pointer, default NULL.
*/
void *chanrecvp(co_channel *c);

Expand All @@ -183,7 +190,7 @@ void *chanrecvp(co_channel *c);
If no one sending, and buffer is empty, blocking the current coroutine.
@param c channel
@return received unsigned long value.
@return received unsigned long value, default 0.
*/
unsigned long chanrecvul(co_channel *c);

Expand All @@ -194,7 +201,7 @@ unsigned long chanrecvul(co_channel *c);
@param c channel
@param v the pointer will store received value.
@return 1 success, else fail.
@return channel_errorno
*/
int chansend(co_channel *c, void *v);

Expand All @@ -203,7 +210,7 @@ int chansend(co_channel *c, void *v);
@param c channel
@param v the pointer
@return 1 success, else fail.
@return channel_errorno
*/
int chansendp(co_channel *c, void *v);

Expand All @@ -212,7 +219,7 @@ int chansendp(co_channel *c, void *v);
@param c channel
@param v the unsigned long value
@return 1 success, else fail.
@return channel_errorno
*/
int chansendul(co_channel *c, unsigned long v);

Expand All @@ -221,7 +228,7 @@ int chansendul(co_channel *c, unsigned long v);
to cancel the blocking.
@param co the coroutine object
@return 1 success, else fail.
@return channel_errorno
*/
int chan_cancel_alt_in_co(coroutine_t *co);

Expand All @@ -234,7 +241,7 @@ int chan_cancel_alt_in_co(coroutine_t *co);
@param v the pointer pass the send value.
@param exec run at sending.
@param cancelExec run at cancel a alt.
@return 1 success, else fail.
@return channel_errorno
*/
int chansend_custom_exec(co_channel *c, void *v, IMP exec, IMP cancelExec);

Expand All @@ -244,7 +251,7 @@ int chansend_custom_exec(co_channel *c, void *v, IMP exec, IMP cancelExec);
@param c channel
@param v the value's address.
@param exec run at sending.
@return 1 success, else fail.
@return channel_errorno
*/
int channbsend_custom_exec(co_channel *c, void *v, IMP exec);

Expand All @@ -256,7 +263,7 @@ int channbsend_custom_exec(co_channel *c, void *v, IMP exec);
@param c channel
@param v the pointer will store received value.
@param cancelExec run at cancel a alt.
@return 1 success, else fail.
@return channel_errorno
*/
int chanrecv_custom_exec(co_channel *c, void *v, IMP cancelExec);

Expand Down
60 changes: 28 additions & 32 deletions cocore/co_csp.m
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ static void altqueue(chan_alt *a) {
* data goes from sender to receiver. If the channel is full,
* the receiver removes some from the channel and the sender
* gets to put some in.
* 1 success, 0 fail.
*/
static int altcopy(chan_alt *s, chan_alt *r) {
chan_alt *t;
Expand Down Expand Up @@ -310,13 +311,7 @@ static int altcopy(chan_alt *s, chan_alt *r) {
// receive first from buffer
queuepop(&c->buffer, r->value);
// then send to buffer
int ret = queuepush(&c->buffer, s->value);
if (ret == 1) {
if (s->custom_exec) {
s->custom_exec();
}
}
return ret;
return queuepush(&c->buffer, s->value);
}
}

Expand All @@ -326,14 +321,7 @@ static int altcopy(chan_alt *s, chan_alt *r) {
if(r){
return queuepop(&c->buffer, r->value);
} else if(s) {

int ret = queuepush(&c->buffer, s->value);
if (ret == 1) {
if (s->custom_exec) {
s->custom_exec();
}
}
return ret;
return queuepush(&c->buffer, s->value);
}
return 0;
}
Expand All @@ -348,21 +336,34 @@ static int altexec(chan_alt *a) {
altqueue = chanarray(c, otherop(a->op));
if(altqueuepop(altqueue, &other)){

altcopy(a, other);
int copyRet = altcopy(a, other);
assert(copyRet == 1);
coroutine_t *co = other->task;
void (*custom_resume)(coroutine_t *co) = c->custom_resume;
chanunlock(c);

// call back sender
chan_alt *sender = a->op == CHANNEL_SEND ? a : other;
if (sender->custom_exec) {
sender->custom_exec();
}

if (custom_resume) {
custom_resume(co);
} else {
coroutine_add(co);
}
return 1;
return CHANNEL_ALT_SUCCESS;
} else {
int ret = altcopy(a, nil);
int copyRet = altcopy(a, nil);
chanunlock(c);
return ret;

if (copyRet && a->op == CHANNEL_SEND) {
if (a->custom_exec) {
a->custom_exec();
}
}
return copyRet ? CHANNEL_ALT_SUCCESS : CHANNEL_ALT_ERROR_COPYFAIL;
}
}

Expand Down Expand Up @@ -409,6 +410,7 @@ static void chancancelallalt(co_channel *c) {
if (!a) {
continue;
}
a->is_cancelled = true;
// custom cancel
if (a->cancel_exec) {
a->cancel_exec();
Expand Down Expand Up @@ -499,7 +501,7 @@ int chanalt(chan_alt *a) {

if(!canblock) {
chanunlock(c);
return 0;
return a->op == CHANNEL_SEND ? CHANNEL_ALT_ERROR_BUFFER_FULL : CHANNEL_ALT_ERROR_NO_VALUE;
}

// add to queue
Expand All @@ -515,10 +517,10 @@ int chanalt(chan_alt *a) {
t->chan_alt = nil;
// alt is cancelled
if (a->is_cancelled) {
return 0;
return CHANNEL_ALT_ERROR_CANCELLED;
}

return 1;
return CHANNEL_ALT_SUCCESS;
}

static int _chanop(co_channel *c, int op, void *p, int canblock) {
Expand All @@ -535,12 +537,9 @@ static int _chanop(co_channel *c, int op, void *p, int canblock) {
a->custom_exec = NULL;
a->cancel_exec = NULL;

if(chanalt(a) == 0) {
free(a);
return 0;
}
int ret = chanalt(a);
free(a);
return 1;
return ret;
}

static int _chanop2(co_channel *c, int op, void *p, int canblock, IMP custom_exec, IMP cancel_exec) {
Expand All @@ -557,12 +556,9 @@ static int _chanop2(co_channel *c, int op, void *p, int canblock, IMP custom_exe
a->custom_exec = custom_exec;
a->cancel_exec = cancel_exec;

if(chanalt(a) == 0) {
free(a);
return 0;
}
int ret = chanalt(a);
free(a);
return 1;
return ret;
}

#pragma mark - public apis
Expand Down
4 changes: 2 additions & 2 deletions coobjc/co/COChan.m
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ - (id)receiveWithOnCancel:(COChanOnCancelBlock)cancelBlock {
}
co.currentChan = nil;

if (ret == 1) {
if (ret == CHANNEL_ALT_SUCCESS) {
// success
do {
COOBJC_SCOPELOCK(_buffLock);
Expand Down Expand Up @@ -200,7 +200,7 @@ - (id)receive_nonblock {
uint8_t val = 0;
int ret = channbrecv(_chan, &val);

if (ret == 1) {
if (ret == CHANNEL_ALT_SUCCESS) {

do {
COOBJC_SCOPELOCK(_buffLock);
Expand Down
Loading

0 comments on commit b2a68fd

Please sign in to comment.