Skip to content

Commit

Permalink
Change channel's cancel, support mutiple coroutine use one channel
Browse files Browse the repository at this point in the history
  • Loading branch information
NianJi committed Apr 25, 2019
1 parent 93de4d3 commit 559b820
Show file tree
Hide file tree
Showing 8 changed files with 339 additions and 169 deletions.
58 changes: 37 additions & 21 deletions cocore/co_csp.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ typedef enum {
typedef struct chan_alt chan_alt;
typedef struct chan_queue chan_queue;
typedef struct co_channel co_channel;
typedef struct alt_queue alt_queue;

/**
Define the chan alt, record a send/receive context.
Expand All @@ -46,8 +47,20 @@ struct chan_alt
co_channel *channel;
void *value;
coroutine_t *task;
chan_alt *prev;
chan_alt *next;
IMP custom_exec;
IMP cancel_exec;
channel_op op;
int can_block;
bool is_cancelled;
};

struct alt_queue
{
chan_alt *head;
chan_alt *tail;
unsigned int count;
};

/**
Expand All @@ -69,8 +82,8 @@ struct chan_queue
*/
struct co_channel {
chan_queue buffer;
chan_queue asend;
chan_queue arecv;
alt_queue asend;
alt_queue arecv;
pthread_mutex_t lock;
void (*custom_resume)(coroutine_t *co);
};
Expand Down Expand Up @@ -204,44 +217,47 @@ int chansendp(co_channel *c, void *v);
int chansendul(co_channel *c, unsigned long v);

/**
Blocking send a int8_t value to channel.
@param c channel
@param val the int8 value
If a channel is blocking a coroutine, using this method
to cancel the blocking.
@param co the coroutine object
@return 1 success, else fail.
*/
int chansendi8(co_channel *c, int8_t val);
int chan_cancel_alt_in_co(coroutine_t *co);

/**
Blocking receive a int8_t value from channel.
Blocking send value to channel.
If no one sending, and buffer is empty, blocking the current coroutine.
@param c channel
@return received int8_t value.
@param v the pointer pass the send value.
@param exec run at sending.
@param cancelExec run at cancel a alt.
@return 1 success, else fail.
*/
int8_t chanrecvi8(co_channel *c);
int chansend_custom_exec(co_channel *c, void *v, IMP exec, IMP cancelExec);

/**
Non-blocking send a int8_t value to channel.
Non-blocking send value to channel.
@param c channel
@param val the int8_t value
@param v the value's address.
@param exec run at sending.
@return 1 success, else fail.
*/
int channbsendi8(co_channel *c, int8_t val);
int channbsend_custom_exec(co_channel *c, void *v, IMP exec);

/**
Non-blocking receive a int8_t value from channel.
Blocking receive value to channel.
If no one sending, and buffer is empty, blocking the current coroutine.
@param c channel
@return received int8_t value.
*/
int8_t channbrecvi8(co_channel *c);

/**
Get the blocking task count.
@param v the pointer will store received value.
@param cancelExec run at cancel a alt.
@return 1 success, else fail.
*/
int changetblocking(co_channel *c, int *sendBlockingCount, int *receiveBlockingCount);
int chanrecv_custom_exec(co_channel *c, void *v, IMP cancelExec);

#endif
Loading

0 comments on commit 559b820

Please sign in to comment.