Skip to content

Commit

Permalink
libceph: support for CEPH_OSD_OP_LIST_WATCHERS
Browse files Browse the repository at this point in the history
Add support for this Ceph OSD op, needed to support the RBD exclusive
lock feature.

Signed-off-by: Douglas Fuller <[email protected]>
[[email protected]: refactor, misc fixes throughout]
Signed-off-by: Ilya Dryomov <[email protected]>
Reviewed-by: Mike Christie <[email protected]>
Reviewed-by: Alex Elder <[email protected]>
  • Loading branch information
Douglas Fuller authored and idryomov committed Aug 24, 2016
1 parent f01d5cb commit a4ed38d
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 1 deletion.
15 changes: 14 additions & 1 deletion include/linux/ceph/osd_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ struct ceph_osd_req_op {
struct ceph_osd_data request_data;
struct ceph_osd_data response_data;
} notify;
struct {
struct ceph_osd_data response_data;
} list_watchers;
struct {
u64 expected_object_size;
u64 expected_write_size;
Expand Down Expand Up @@ -249,6 +252,12 @@ struct ceph_osd_linger_request {
size_t *preply_len;
};

struct ceph_watch_item {
struct ceph_entity_name name;
u64 cookie;
struct ceph_entity_addr addr;
};

struct ceph_osd_client {
struct ceph_client *client;

Expand Down Expand Up @@ -346,7 +355,6 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
struct page **pages, u64 length,
u32 alignment, bool pages_from_pool,
bool own_pages);

extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode,
const char *class, const char *method);
Expand Down Expand Up @@ -434,5 +442,10 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
size_t *preply_len);
int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
struct ceph_osd_linger_request *lreq);
int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
struct ceph_object_id *oid,
struct ceph_object_locator *oloc,
struct ceph_watch_item **watchers,
u32 *num_watchers);
#endif

117 changes: 117 additions & 0 deletions net/ceph/osd_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
ceph_osd_data_release(&op->notify.request_data);
ceph_osd_data_release(&op->notify.response_data);
break;
case CEPH_OSD_OP_LIST_WATCHERS:
ceph_osd_data_release(&op->list_watchers.response_data);
break;
default:
break;
}
Expand Down Expand Up @@ -863,6 +866,8 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
case CEPH_OSD_OP_NOTIFY:
dst->notify.cookie = cpu_to_le64(src->notify.cookie);
break;
case CEPH_OSD_OP_LIST_WATCHERS:
break;
case CEPH_OSD_OP_SETALLOCHINT:
dst->alloc_hint.expected_object_size =
cpu_to_le64(src->alloc_hint.expected_object_size);
Expand Down Expand Up @@ -1445,6 +1450,10 @@ static void setup_request_data(struct ceph_osd_request *req,
ceph_osdc_msg_data_add(req->r_reply,
&op->extent.osd_data);
break;
case CEPH_OSD_OP_LIST_WATCHERS:
ceph_osdc_msg_data_add(req->r_reply,
&op->list_watchers.response_data);
break;

/* both */
case CEPH_OSD_OP_CALL:
Expand Down Expand Up @@ -3891,6 +3900,114 @@ int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
return ret;
}

static int decode_watcher(void **p, void *end, struct ceph_watch_item *item)
{
u8 struct_v;
u32 struct_len;
int ret;

ret = ceph_start_decoding(p, end, 2, "watch_item_t",
&struct_v, &struct_len);
if (ret)
return ret;

ceph_decode_copy(p, &item->name, sizeof(item->name));
item->cookie = ceph_decode_64(p);
*p += 4; /* skip timeout_seconds */
if (struct_v >= 2) {
ceph_decode_copy(p, &item->addr, sizeof(item->addr));
ceph_decode_addr(&item->addr);
}

dout("%s %s%llu cookie %llu addr %s\n", __func__,
ENTITY_NAME(item->name), item->cookie,
ceph_pr_addr(&item->addr.in_addr));
return 0;
}

static int decode_watchers(void **p, void *end,
struct ceph_watch_item **watchers,
u32 *num_watchers)
{
u8 struct_v;
u32 struct_len;
int i;
int ret;

ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t",
&struct_v, &struct_len);
if (ret)
return ret;

*num_watchers = ceph_decode_32(p);
*watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO);
if (!*watchers)
return -ENOMEM;

for (i = 0; i < *num_watchers; i++) {
ret = decode_watcher(p, end, *watchers + i);
if (ret) {
kfree(*watchers);
return ret;
}
}

return 0;
}

/*
* On success, the caller is responsible for:
*
* kfree(watchers);
*/
int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
struct ceph_object_id *oid,
struct ceph_object_locator *oloc,
struct ceph_watch_item **watchers,
u32 *num_watchers)
{
struct ceph_osd_request *req;
struct page **pages;
int ret;

req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
if (!req)
return -ENOMEM;

ceph_oid_copy(&req->r_base_oid, oid);
ceph_oloc_copy(&req->r_base_oloc, oloc);
req->r_flags = CEPH_OSD_FLAG_READ;

ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
if (ret)
goto out_put_req;

pages = ceph_alloc_page_vector(1, GFP_NOIO);
if (IS_ERR(pages)) {
ret = PTR_ERR(pages);
goto out_put_req;
}

osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0);
ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers,
response_data),
pages, PAGE_SIZE, 0, false, true);

ceph_osdc_start_request(osdc, req, false);
ret = ceph_osdc_wait_request(osdc, req);
if (ret >= 0) {
void *p = page_address(pages[0]);
void *const end = p + req->r_ops[0].outdata_len;

ret = decode_watchers(&p, end, watchers, num_watchers);
}

out_put_req:
ceph_osdc_put_request(req);
return ret;
}
EXPORT_SYMBOL(ceph_osdc_list_watchers);

/*
* Call all pending notify callbacks - for use after a watch is
* unregistered, to make sure no more callbacks for it will be invoked
Expand Down

0 comments on commit a4ed38d

Please sign in to comment.