Skip to content

Commit

Permalink
simplify KCN response protocol and its code.
Browse files Browse the repository at this point in the history
  • Loading branch information
ohmori7 committed Jan 4, 2014
1 parent b465dad commit 8bfacd3
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 88 deletions.
13 changes: 7 additions & 6 deletions kcndbd/kcndb_db.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

#include "kcn.h"
#include "kcn_str.h"
#include "kcn_info.h"
#include "kcn_eq.h"
#include "kcn_log.h"
#include "kcn_buf.h"
Expand Down Expand Up @@ -257,12 +256,13 @@ kcndb_db_record_add(enum kcn_eq_type type, struct kcndb_db_record *kdr)
}

bool
kcndb_db_search(struct kcn_info *ki, const struct kcn_eq *ke)
kcndb_db_search(const struct kcn_eq *ke, size_t maxnlocs,
bool (*cb)(const struct kcndb_db_record *, size_t, void *), void *arg)
{
struct kcndb_db_table *kdt;
struct kcn_buf *kb;
struct kcndb_db_record kdr;
size_t i, score;
size_t i, n, score;

kdt = kcndb_db_table_lookup(ke->ke_type);
if (! kcndb_file_seek_head(kdt->kdt_table, 0))
Expand All @@ -271,7 +271,7 @@ kcndb_db_search(struct kcn_info *ki, const struct kcn_eq *ke)
kcn_buf_reset(kb, 0);

score = 0; /* XXX: should compute score. */
for (i = 0; kcn_info_nlocs(ki) < kcn_info_maxnlocs(ki); i++) {
for (i = 0, n = 0; n < maxnlocs; i++) {
if (! kcndb_db_record_read(kdt, &kdr)) {
if (errno == ESHUTDOWN)
break;
Expand Down Expand Up @@ -318,13 +318,14 @@ kcndb_db_search(struct kcn_info *ki, const struct kcn_eq *ke)
goto bad;
KCN_LOG(DEBUG, "record[%zu]: match loc=%.*s",
i, (int)kdr.kdr_loclen, kdr.kdr_loc);
if (! kcn_info_loc_add(ki, kdr.kdr_loc, kdr.kdr_loclen, score))
if (! (*cb)(&kdr, score, arg))
goto bad;
++n;
}

KCN_LOG(INFO, "%zu record(s) read", i);

if (kcn_info_nlocs(ki) == 0) {
if (n == 0) {
KCN_LOG(DEBUG, "no matching record found");
errno = ESRCH;
goto bad;
Expand Down
3 changes: 2 additions & 1 deletion kcndbd/kcndb_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ struct kcndb_db_record {
void kcndb_db_path_set(const char *);
const char *kcndb_db_path_get(void);
bool kcndb_db_record_add(enum kcn_eq_type, struct kcndb_db_record *);
bool kcndb_db_search(struct kcn_info *, const struct kcn_eq *);
bool kcndb_db_search(const struct kcn_eq *, size_t,
bool (*)(const struct kcndb_db_record *, size_t, void *), void *);
bool kcndb_db_init(void);
void kcndb_db_finish(void);
54 changes: 29 additions & 25 deletions kcndbd/kcndb_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
#include <event.h>

#include "kcn.h"
#include "kcn_info.h"
#include "kcn_eq.h"
#include "kcn_log.h"
#include "kcn_socket.h"
Expand Down Expand Up @@ -130,37 +129,42 @@ kcndb_server_loop(void)
}

static bool
kcndb_server_query_process(struct kcn_net *kn, struct kcn_buf *ikb,
const struct kcn_msg_header *kmh)
kcndb_server_response_send(const struct kcndb_db_record *kdr, size_t score,
void *arg)
{
struct kcn_net *kn = arg;
struct kcn_buf okb;
struct kcn_msg_query kmq;
struct kcn_msg_response kmr;
struct kcn_info *ki;

if (! kcn_msg_query_decode(ikb, kmh, &kmq))
ki = NULL;
else
/* XXX */
ki = kcn_info_new(kmq.kmq_loctype, kmq.kmq_maxcount);

kcn_msg_response_init(&kmr, ki);
if (ki == NULL || ! kcndb_db_search(ki, &kmq.kmq_eq))
kmr.kmr_error = errno;
else {
assert(kcn_info_nlocs(ki) > 0);
kmr.kmr_leftcount = kcn_info_nlocs(ki) - 1;
}

kcn_net_opkt(kn, &okb);
do {
kcn_msg_response_encode(&okb, &kmr);
if (! kcn_net_write(kn, &okb))
break;
} while (kmr.kmr_leftcount-- > 0);
#define KCNDB_SERVER_RESPONSE_MAGIC ((void *)1U)
if (kdr != KCNDB_SERVER_RESPONSE_MAGIC) {
kmr.kmr_error = 0;
kmr.kmr_score = score;
kmr.kmr_loc = kdr->kdr_loc;
kmr.kmr_loclen = kdr->kdr_loclen;
} else {
kmr.kmr_error = score;
kmr.kmr_score = 0;
kmr.kmr_loc = 0;
kmr.kmr_loclen = 0;
}
kcn_msg_response_encode(&okb, &kmr);
return kcn_net_write(kn, &okb);
}

kcn_info_destroy(ki); /* XXX */
static bool
kcndb_server_query_process(struct kcn_net *kn, struct kcn_buf *ikb,
const struct kcn_msg_header *kmh)
{
struct kcn_msg_query kmq;

if (kcn_msg_query_decode(ikb, kmh, &kmq) &&
kcndb_db_search(&kmq.kmq_eq, kmq.kmq_maxcount,
kcndb_server_response_send, kn))
errno = 0;
kcndb_server_response_send(KCNDB_SERVER_RESPONSE_MAGIC, errno, kn);
/* always return 0 in order to return a response with an error. */
return true;
}

Expand Down
55 changes: 35 additions & 20 deletions lib/kcn_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
#include "kcn_netstat.h"
#include "kcn_client.h"

struct kcn_client_response {
int kcr_error;
struct kcn_info *kcr_ki;
};

static int kcn_client_read(struct kcn_net *, struct kcn_buf *, void *);

struct kcn_net *
Expand Down Expand Up @@ -59,25 +64,38 @@ kcn_client_finish(struct kcn_net *kn)
static int
kcn_client_read(struct kcn_net *kn, struct kcn_buf *kb, void *arg)
{
struct kcn_msg_response *kmr = arg;
struct kcn_client_response *kcr = arg;
struct kcn_info *ki = kcr->kcr_ki;
struct kcn_msg_response kmr;
struct kcn_msg_header kmh;

(void)kn;
do {
for (;;) {
if (! kcn_msg_header_decode(kb, &kmh))
goto bad;
break;
if (kmh.kmh_type != KCN_MSG_TYPE_RESPONSE) {
errno = EINVAL;
goto bad;
break;
}
if (! kcn_msg_response_decode(kb, &kmh, kmr))
goto bad;
} while (kmr->kmr_leftcount > 0);
return 0;
bad:
kcn_msg_response_init(&kmr);
if (! kcn_msg_response_decode(kb, &kmh, &kmr))
break;
if (kmr.kmr_loclen == 0) {
if (kmr.kmr_error != EAGAIN)
errno = kmr.kmr_error;
break;
}
if (kcn_info_maxnlocs(ki) == kcn_info_nlocs(ki)) {
errno = ETOOMANYREFS; /* XXX */
break;
}
if (! kcn_info_loc_add(ki,
kmr.kmr_loc, kmr.kmr_loclen, kmr.kmr_score))
break;
}
if (errno != EAGAIN)
kmr->kmr_error = errno;
return errno;
kcr->kcr_error = errno;
return kcr->kcr_error;
}

static bool
Expand Down Expand Up @@ -107,27 +125,24 @@ kcn_client_search(struct kcn_info *ki, const struct kcn_msg_query *kmq)
{
struct event_base *evb;
struct kcn_net *kn;
struct kcn_msg_response kmr;
struct kcn_client_response kcr;

evb = event_init();
if (evb == NULL)
goto bad;

kn = kcn_client_init(evb, &kmr);
kcr.kcr_error = 0;
kcr.kcr_ki = ki;
kn = kcn_client_init(evb, &kcr);
if (kn == NULL)
goto bad;

kcn_msg_response_init(&kmr, ki);
if (! kcn_client_query_send(kn, kmq))
goto bad;
if (! kcn_net_loop(kn))
goto bad;
if (kmr.kmr_error != 0) {
errno = kmr.kmr_error;
goto bad;
}
if (kcn_info_nlocs(ki) == 0) {
errno = ESRCH;
if (kcr.kcr_error != 0) {
errno = kcr.kcr_error;
goto bad;
}
kcn_client_finish(kn);
Expand Down
43 changes: 11 additions & 32 deletions lib/kcn_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
#include "kcn.h"
#include "kcn_log.h"
#include "kcn_buf.h"
#include "kcn_info.h"
#include "kcn_eq.h"
#include "kcn_msg.h"

Expand Down Expand Up @@ -120,44 +119,31 @@ kcn_msg_query_decode(struct kcn_buf *kb, const struct kcn_msg_header *kmh,
}

void
kcn_msg_response_init(struct kcn_msg_response *kmr, struct kcn_info *ki)
kcn_msg_response_init(struct kcn_msg_response *kmr)
{

kmr->kmr_error = 0;
kmr->kmr_leftcount = 0;
kmr->kmr_score = 0;
kmr->kmr_ki = ki;
kmr->kmr_loc = NULL;
kmr->kmr_loclen = 0;
}

void
kcn_msg_response_encode(struct kcn_buf *kb, const struct kcn_msg_response *kmr)
{
const struct kcn_info *ki;
const char *loc;
size_t idx;

kcn_msg_pkt_init(kb);
kcn_buf_put8(kb, kmr->kmr_error);
kcn_buf_put8(kb, kmr->kmr_leftcount);
kcn_buf_put8(kb, kmr->kmr_score);

ki = kmr->kmr_ki;
if (ki != NULL && kcn_info_nlocs(ki) > 0) {
assert(kcn_info_nlocs(ki) > kmr->kmr_leftcount);
idx = kcn_info_nlocs(ki) - kmr->kmr_leftcount - 1;
loc = kcn_info_loc(ki, idx);
assert(loc != NULL);
kcn_buf_put(kb, loc, strlen(loc));
}

if (kmr->kmr_loc != NULL)
kcn_buf_put(kb, kmr->kmr_loc, kmr->kmr_loclen);
kcn_msg_header_encode(kb, KCN_MSG_TYPE_RESPONSE);
}

bool
kcn_msg_response_decode(struct kcn_buf *kb, const struct kcn_msg_header *kmh,
struct kcn_msg_response *kmr)
{
struct kcn_info *ki = kmr->kmr_ki;
size_t len = kmh->kmh_len;

if (kcn_buf_trailingdata(kb) < KCN_MSG_RESPONSE_MINSIZ ||
Expand All @@ -166,21 +152,14 @@ kcn_msg_response_decode(struct kcn_buf *kb, const struct kcn_msg_header *kmh,
goto bad;
}
kmr->kmr_error = kcn_buf_get8(kb);
kmr->kmr_leftcount = kcn_buf_get8(kb);
kmr->kmr_score = kcn_buf_get8(kb);
len -= KCN_MSG_RESPONSE_MINSIZ;
kmr->kmr_loclen = len - KCN_MSG_RESPONSE_MINSIZ;
if (kmr->kmr_loclen > 0)
kmr->kmr_loc = kcn_buf_current(kb);
else
kmr->kmr_loc = NULL;
kcn_buf_forward(kb, kmr->kmr_loclen);
kcn_buf_trim_head(kb, kcn_buf_headingdata(kb));
if (kcn_info_maxnlocs(ki) - kcn_info_nlocs(ki) < kmr->kmr_leftcount) {
KCN_LOG(ERR, "too many responses, %u", kmr->kmr_leftcount);
errno = ETOOMANYREFS; /* XXX */
goto bad;
}

if (! kcn_info_loc_add(kmr->kmr_ki, kcn_buf_current(kb), len,
kmr->kmr_score))
goto bad;
kcn_buf_trim_head(kb, len);

return true;
bad:
return false;
Expand Down
8 changes: 4 additions & 4 deletions lib/kcn_msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#define KCN_MSG_MAXSIZ 4096
#define KCN_MSG_MAXBODYSIZ (KCN_MSG_MAXSIZ - KCN_MSG_HDRSIZ)
#define KCN_MSG_QUERY_SIZ (1 + 1 + 1 + 1 + 8 + 8 + 8)
#define KCN_MSG_RESPONSE_MINSIZ (1 + 1 + 1)
#define KCN_MSG_RESPONSE_MINSIZ (1 + 1)
#define KCN_MSG_ADD_MINSIZ (1 + 8 + 8)
#define KCN_MSG_MAXLOCSIZ \
(KCN_MSG_MAXBODYSIZ - max(KCN_MSG_RESPONSE_MINSIZ, KCN_MSG_ADD_MINSIZ))
Expand All @@ -30,9 +30,9 @@ struct kcn_msg_query {

struct kcn_msg_response {
uint8_t kmr_error;
uint8_t kmr_leftcount;
uint8_t kmr_score;
struct kcn_info *kmr_ki;
const char *kmr_loc;
size_t kmr_loclen;
};

struct kcn_msg_add {
Expand All @@ -47,7 +47,7 @@ bool kcn_msg_header_decode(struct kcn_buf *, struct kcn_msg_header *);
void kcn_msg_query_encode(struct kcn_buf *, const struct kcn_msg_query *);
bool kcn_msg_query_decode(struct kcn_buf *, const struct kcn_msg_header *,
struct kcn_msg_query *);
void kcn_msg_response_init(struct kcn_msg_response *, struct kcn_info *);
void kcn_msg_response_init(struct kcn_msg_response *);
void kcn_msg_response_encode(struct kcn_buf *, const struct kcn_msg_response *);
bool kcn_msg_response_decode(struct kcn_buf *, const struct kcn_msg_header *,
struct kcn_msg_response *);
Expand Down

0 comments on commit 8bfacd3

Please sign in to comment.