Skip to content

Commit

Permalink
Add ON_EMPTY 'e' flag on subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
jaracil committed Dec 5, 2019
1 parent 16f7362 commit b8773b0
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 0 deletions.
9 changes: 9 additions & 0 deletions src/pubsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ typedef struct ps_queue_s {
typedef struct subscriber_list_s {
ps_subscriber_t *su;
bool hidden;
bool on_empty;
struct subscriber_list_s *next;
struct subscriber_list_s *prev;
} subscriber_list_t;
Expand Down Expand Up @@ -458,6 +459,7 @@ int ps_subscribe(ps_subscriber_t *su, const char *topic_orig) {
char *topic = strdup(topic_orig);

bool hidden_flag = false;
bool on_empty_flag = false;
bool no_sticky_flag = false;
bool child_sticky_flag = false;

Expand All @@ -476,6 +478,9 @@ int ps_subscribe(ps_subscriber_t *su, const char *topic_orig) {
case 'S':
child_sticky_flag = true;
break;
case 'e':
on_empty_flag = true;
break;
}
fl_str++;
}
Expand All @@ -491,6 +496,7 @@ int ps_subscribe(ps_subscriber_t *su, const char *topic_orig) {
sl = calloc(1, sizeof(*sl));
sl->su = su;
sl->hidden = hidden_flag;
sl->on_empty = on_empty_flag;
DL_APPEND(tm->subscribers, sl);
subs = calloc(1, sizeof(*subs));
subs->tm = tm;
Expand Down Expand Up @@ -649,6 +655,9 @@ int ps_publish(ps_msg_t *msg) {
}
if (tm != NULL) {
DL_FOREACH (tm->subscribers, sl) {
if (sl->on_empty && ps_waiting(sl->su) != 0) {
continue;
}
if (push_subscriber_queue(sl->su, msg) == 0 && !sl->hidden) {
ret++;
}
Expand Down
21 changes: 21 additions & 0 deletions tests/tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,26 @@ void test_no_recursive(void) {
check_leak();
}

void test_on_empty(void) {
ps_msg_t *msg;
printf("Test on empty\n");
ps_subscriber_t *s1 = ps_new_subscriber(10, STRLIST("foo e"));
PUB_NIL("foo.bar");
assert(ps_waiting(s1) == 1);
PUB_NIL("foo.bar");
assert(ps_waiting(s1) == 1);
msg = ps_get(s1, 10);
assert(IS_NIL(msg));
ps_unref_msg(msg);
assert(ps_waiting(s1) == 0);
PUB_NIL("foo.bar");
assert(ps_waiting(s1) == 1);
PUB_NIL("foo.bar");
assert(ps_waiting(s1) == 1);
ps_free_subscriber(s1);
check_leak();
}

void test_pub_get(void) {
printf("Test pub->get\n");
ps_msg_t *msg;
Expand Down Expand Up @@ -311,6 +331,7 @@ void run_all(void) {
test_no_sticky_flag();
test_child_sticky_flag();
test_no_recursive();
test_on_empty();
test_pub_get();
test_overflow();
test_new_msg_cb();
Expand Down

0 comments on commit b8773b0

Please sign in to comment.