Skip to content

Commit

Permalink
ceph: search cache postion for dcache readdir
Browse files Browse the repository at this point in the history
use binary search to find cache index that corresponds to readdir
postion.

Signed-off-by: Yan, Zheng <[email protected]>
  • Loading branch information
ukernel authored and idryomov committed May 25, 2016
1 parent 04303d8 commit c530cd2
Showing 1 changed file with 83 additions and 46 deletions.
129 changes: 83 additions & 46 deletions fs/ceph/dir.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,50 @@ static int note_last_dentry(struct ceph_file_info *fi, const char *name,
return 0;
}


static struct dentry *
__dcache_find_get_entry(struct dentry *parent, u64 idx,
struct ceph_readdir_cache_control *cache_ctl)
{
struct inode *dir = d_inode(parent);
struct dentry *dentry;
unsigned idx_mask = (PAGE_SIZE / sizeof(struct dentry *)) - 1;
loff_t ptr_pos = idx * sizeof(struct dentry *);
pgoff_t ptr_pgoff = ptr_pos >> PAGE_SHIFT;

if (ptr_pos >= i_size_read(dir))
return NULL;

if (!cache_ctl->page || ptr_pgoff != page_index(cache_ctl->page)) {
ceph_readdir_cache_release(cache_ctl);
cache_ctl->page = find_lock_page(&dir->i_data, ptr_pgoff);
if (!cache_ctl->page) {
dout(" page %lu not found\n", ptr_pgoff);
return ERR_PTR(-EAGAIN);
}
/* reading/filling the cache are serialized by
i_mutex, no need to use page lock */
unlock_page(cache_ctl->page);
cache_ctl->dentries = kmap(cache_ctl->page);
}

cache_ctl->index = idx & idx_mask;

rcu_read_lock();
spin_lock(&parent->d_lock);
/* check i_size again here, because empty directory can be
* marked as complete while not holding the i_mutex. */
if (ceph_dir_is_complete_ordered(dir) && ptr_pos < i_size_read(dir))
dentry = cache_ctl->dentries[cache_ctl->index];
else
dentry = NULL;
spin_unlock(&parent->d_lock);
if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
dentry = NULL;
rcu_read_unlock();
return dentry ? : ERR_PTR(-EAGAIN);
}

/*
* When possible, we try to satisfy a readdir by peeking at the
* dcache. We make this work by carefully ordering dentries on
Expand All @@ -129,62 +173,57 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx,
struct inode *dir = d_inode(parent);
struct dentry *dentry, *last = NULL;
struct ceph_dentry_info *di;
unsigned nsize = PAGE_SIZE / sizeof(struct dentry *);
int err = 0;
loff_t ptr_pos = 0;
struct ceph_readdir_cache_control cache_ctl = {};
u64 idx = 0;
int err = 0;

dout("__dcache_readdir %p v%u at %llu\n", dir, shared_gen, ctx->pos);

/* we can calculate cache index for the first dirfrag */
if (ceph_frag_is_leftmost(fpos_frag(ctx->pos))) {
cache_ctl.index = fpos_off(ctx->pos) - 2;
BUG_ON(cache_ctl.index < 0);
ptr_pos = cache_ctl.index * sizeof(struct dentry *);
/* search start position */
if (ctx->pos > 2) {
u64 count = div_u64(i_size_read(dir), sizeof(struct dentry *));
while (count > 0) {
u64 step = count >> 1;
dentry = __dcache_find_get_entry(parent, idx + step,
&cache_ctl);
if (!dentry) {
/* use linar search */
idx = 0;
break;
}
if (IS_ERR(dentry)) {
err = PTR_ERR(dentry);
goto out;
}
di = ceph_dentry(dentry);
spin_lock(&dentry->d_lock);
if (fpos_cmp(di->offset, ctx->pos) < 0) {
idx += step + 1;
count -= step + 1;
} else {
count = step;
}
spin_unlock(&dentry->d_lock);
dput(dentry);
}

dout("__dcache_readdir %p cache idx %llu\n", dir, idx);
}

while (true) {
pgoff_t pgoff;
bool emit_dentry;

if (ptr_pos >= i_size_read(dir)) {
for (;;) {
bool emit_dentry = false;
dentry = __dcache_find_get_entry(parent, idx++, &cache_ctl);
if (!dentry) {
fi->flags |= CEPH_F_ATEND;
err = 0;
break;
}

err = -EAGAIN;
pgoff = ptr_pos >> PAGE_SHIFT;
if (!cache_ctl.page || pgoff != page_index(cache_ctl.page)) {
ceph_readdir_cache_release(&cache_ctl);
cache_ctl.page = find_lock_page(&dir->i_data, pgoff);
if (!cache_ctl.page) {
dout(" page %lu not found\n", pgoff);
break;
}
/* reading/filling the cache are serialized by
* i_mutex, no need to use page lock */
unlock_page(cache_ctl.page);
cache_ctl.dentries = kmap(cache_ctl.page);
if (IS_ERR(dentry)) {
err = PTR_ERR(dentry);
goto out;
}

rcu_read_lock();
spin_lock(&parent->d_lock);
/* check i_size again here, because empty directory can be
* marked as complete while not holding the i_mutex. */
if (ceph_dir_is_complete_ordered(dir) &&
ptr_pos < i_size_read(dir))
dentry = cache_ctl.dentries[cache_ctl.index % nsize];
else
dentry = NULL;
spin_unlock(&parent->d_lock);
if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
dentry = NULL;
rcu_read_unlock();
if (!dentry)
break;

emit_dentry = false;
di = ceph_dentry(dentry);
spin_lock(&dentry->d_lock);
if (di->lease_shared_gen == shared_gen &&
Expand Down Expand Up @@ -217,10 +256,8 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx,
} else {
dput(dentry);
}

cache_ctl.index++;
ptr_pos += sizeof(struct dentry *);
}
out:
ceph_readdir_cache_release(&cache_ctl);
if (last) {
int ret;
Expand Down

0 comments on commit c530cd2

Please sign in to comment.