Skip to content

Commit

Permalink
ceph: use pagelist to present MDS request data
Browse files Browse the repository at this point in the history
Current code uses page array to present MDS request data. Pages in the
array are allocated/freed by caller of ceph_mdsc_do_request(). If request
is interrupted, the pages can be freed while they are still being used by
the request message.

The fix is use pagelist to present MDS request data. Pagelist is
reference counted.

Signed-off-by: Yan, Zheng <[email protected]>
Reviewed-by: Sage Weil <[email protected]>
  • Loading branch information
ukernel authored and liewegas committed Oct 14, 2014
1 parent e4339d2 commit 25e6bae
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 39 deletions.
14 changes: 9 additions & 5 deletions fs/ceph/mds_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,8 @@ void ceph_mdsc_release_request(struct kref *kref)
}
kfree(req->r_path1);
kfree(req->r_path2);
if (req->r_pagelist)
ceph_pagelist_release(req->r_pagelist);
put_request_session(req);
ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
kfree(req);
Expand Down Expand Up @@ -1916,13 +1918,15 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

if (req->r_data_len) {
/* outbound data set only by ceph_sync_setxattr() */
BUG_ON(!req->r_pages);
ceph_msg_data_add_pages(msg, req->r_pages, req->r_data_len, 0);
if (req->r_pagelist) {
struct ceph_pagelist *pagelist = req->r_pagelist;
atomic_inc(&pagelist->refcnt);
ceph_msg_data_add_pagelist(msg, pagelist);
msg->hdr.data_len = cpu_to_le32(pagelist->length);
} else {
msg->hdr.data_len = 0;
}

msg->hdr.data_len = cpu_to_le32(req->r_data_len);
msg->hdr.data_off = cpu_to_le16(0);

out_free2:
Expand Down
4 changes: 1 addition & 3 deletions fs/ceph/mds_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,7 @@ struct ceph_mds_request {
bool r_direct_is_hash; /* true if r_direct_hash is valid */

/* data payload is used for xattr ops */
struct page **r_pages;
int r_num_pages;
int r_data_len;
struct ceph_pagelist *r_pagelist;

/* what caps shall we drop? */
int r_inode_drop, r_inode_unless;
Expand Down
49 changes: 18 additions & 31 deletions fs/ceph/xattr.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <linux/ceph/ceph_debug.h>
#include <linux/ceph/pagelist.h>

#include "super.h"
#include "mds_client.h"
Expand Down Expand Up @@ -850,35 +851,25 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_mds_request *req;
struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_pagelist *pagelist = NULL;
int err;
int i, nr_pages;
struct page **pages = NULL;
void *kaddr;

/* copy value into some pages */
nr_pages = calc_pages_for(0, size);
if (nr_pages) {
pages = kmalloc(sizeof(pages[0])*nr_pages, GFP_NOFS);
if (!pages)

if (value) {
/* copy value into pagelist */
pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
if (!pagelist)
return -ENOMEM;
err = -ENOMEM;
for (i = 0; i < nr_pages; i++) {
pages[i] = __page_cache_alloc(GFP_NOFS);
if (!pages[i]) {
nr_pages = i;
goto out;
}
kaddr = kmap(pages[i]);
memcpy(kaddr, value + i*PAGE_CACHE_SIZE,
min(PAGE_CACHE_SIZE, size-i*PAGE_CACHE_SIZE));
}

ceph_pagelist_init(pagelist);
err = ceph_pagelist_append(pagelist, value, size);
if (err)
goto out;
} else {
flags |= CEPH_XATTR_REMOVE;
}

dout("setxattr value=%.*s\n", (int)size, value);

if (!value)
flags |= CEPH_XATTR_REMOVE;

/* do request */
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETXATTR,
USE_AUTH_MDS);
Expand All @@ -893,21 +884,17 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
req->r_args.setxattr.flags = cpu_to_le32(flags);
req->r_path2 = kstrdup(name, GFP_NOFS);

req->r_pages = pages;
req->r_num_pages = nr_pages;
req->r_data_len = size;
req->r_pagelist = pagelist;
pagelist = NULL;

dout("xattr.ver (before): %lld\n", ci->i_xattrs.version);
err = ceph_mdsc_do_request(mdsc, NULL, req);
ceph_mdsc_put_request(req);
dout("xattr.ver (after): %lld\n", ci->i_xattrs.version);

out:
if (pages) {
for (i = 0; i < nr_pages; i++)
__free_page(pages[i]);
kfree(pages);
}
if (pagelist)
ceph_pagelist_release(pagelist);
return err;
}

Expand Down

0 comments on commit 25e6bae

Please sign in to comment.