Skip to content

Commit

Permalink
direct-io: Implement generic deferred AIO completions
Browse files Browse the repository at this point in the history
Add support to the core direct-io code to defer AIO completions to user
context using a workqueue.  This replaces opencoded and less efficient
code in XFS and ext4 (we save a memory allocation for each direct IO)
and will be needed to properly support O_(D)SYNC for AIO.

The communication between the filesystem and the direct I/O code requires
a new buffer head flag, which is a bit ugly but not avoidable until the
direct I/O code stops abusing the buffer_head structure for communicating
with the filesystems.

Currently this creates a per-superblock unbound workqueue for these
completions, which is taken from an earlier patch by Jan Kara.  I'm
not really convinced about this use and would prefer a "normal" global
workqueue with a high concurrency limit, but this needs further discussion.

JK: Fixed ext4 part, dynamic allocation of the workqueue.

Signed-off-by: Christoph Hellwig <[email protected]>
Signed-off-by: Jan Kara <[email protected]>
Signed-off-by: Al Viro <[email protected]>
  • Loading branch information
Christoph Hellwig authored and Al Viro committed Sep 4, 2013
1 parent 4b6ccca commit 7b7a866
Show file tree
Hide file tree
Showing 11 changed files with 105 additions and 131 deletions.
85 changes: 69 additions & 16 deletions fs/direct-io.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ struct dio {
spinlock_t bio_lock; /* protects BIO fields below */
int page_errors; /* errno from get_user_pages() */
int is_async; /* is IO async ? */
bool defer_completion; /* defer AIO completion to workqueue? */
int io_error; /* IO error in completion path */
unsigned long refcount; /* direct_io_worker() and bios */
struct bio *bio_list; /* singly linked via bi_private */
Expand All @@ -141,7 +142,10 @@ struct dio {
* allocation time. Don't add new fields after pages[] unless you
* wish that they not be zeroed.
*/
struct page *pages[DIO_PAGES]; /* page buffer */
union {
struct page *pages[DIO_PAGES]; /* page buffer */
struct work_struct complete_work;/* deferred AIO completion */
};
} ____cacheline_aligned_in_smp;

static struct kmem_cache *dio_cache __read_mostly;
Expand Down Expand Up @@ -221,16 +225,16 @@ static inline struct page *dio_get_page(struct dio *dio,
* dio_complete() - called when all DIO BIO I/O has been completed
* @offset: the byte offset in the file of the completed operation
*
* This releases locks as dictated by the locking type, lets interested parties
* know that a DIO operation has completed, and calculates the resulting return
* code for the operation.
* This drops i_dio_count, lets interested parties know that a DIO operation
* has completed, and calculates the resulting return code for the operation.
*
* It lets the filesystem know if it registered an interest earlier via
* get_block. Pass the private field of the map buffer_head so that
* filesystems can use it to hold additional state between get_block calls and
* dio_complete.
*/
static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is_async)
static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret,
bool is_async)
{
ssize_t transferred = 0;

Expand Down Expand Up @@ -258,19 +262,26 @@ static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is
if (ret == 0)
ret = transferred;

if (dio->end_io && dio->result) {
dio->end_io(dio->iocb, offset, transferred,
dio->private, ret, is_async);
} else {
inode_dio_done(dio->inode);
if (is_async)
aio_complete(dio->iocb, ret, 0);
}
if (dio->end_io && dio->result)
dio->end_io(dio->iocb, offset, transferred, dio->private);

inode_dio_done(dio->inode);
if (is_async)
aio_complete(dio->iocb, ret, 0);

kmem_cache_free(dio_cache, dio);
return ret;
}

static void dio_aio_complete_work(struct work_struct *work)
{
struct dio *dio = container_of(work, struct dio, complete_work);

dio_complete(dio, dio->iocb->ki_pos, 0, true);
}

static int dio_bio_complete(struct dio *dio, struct bio *bio);

/*
* Asynchronous IO callback.
*/
Expand All @@ -290,8 +301,13 @@ static void dio_bio_end_aio(struct bio *bio, int error)
spin_unlock_irqrestore(&dio->bio_lock, flags);

if (remaining == 0) {
dio_complete(dio, dio->iocb->ki_pos, 0, true);
kmem_cache_free(dio_cache, dio);
if (dio->result && dio->defer_completion) {
INIT_WORK(&dio->complete_work, dio_aio_complete_work);
queue_work(dio->inode->i_sb->s_dio_done_wq,
&dio->complete_work);
} else {
dio_complete(dio, dio->iocb->ki_pos, 0, true);
}
}
}

Expand Down Expand Up @@ -510,6 +526,41 @@ static inline int dio_bio_reap(struct dio *dio, struct dio_submit *sdio)
return ret;
}

/*
* Create workqueue for deferred direct IO completions. We allocate the
* workqueue when it's first needed. This avoids creating workqueue for
* filesystems that don't need it and also allows us to create the workqueue
* late enough so the we can include s_id in the name of the workqueue.
*/
static int sb_init_dio_done_wq(struct super_block *sb)
{
struct workqueue_struct *wq = alloc_workqueue("dio/%s",
WQ_MEM_RECLAIM, 0,
sb->s_id);
if (!wq)
return -ENOMEM;
/*
* This has to be atomic as more DIOs can race to create the workqueue
*/
cmpxchg(&sb->s_dio_done_wq, NULL, wq);
/* Someone created workqueue before us? Free ours... */
if (wq != sb->s_dio_done_wq)
destroy_workqueue(wq);
return 0;
}

static int dio_set_defer_completion(struct dio *dio)
{
struct super_block *sb = dio->inode->i_sb;

if (dio->defer_completion)
return 0;
dio->defer_completion = true;
if (!sb->s_dio_done_wq)
return sb_init_dio_done_wq(sb);
return 0;
}

/*
* Call into the fs to map some more disk blocks. We record the current number
* of available blocks at sdio->blocks_available. These are in units of the
Expand Down Expand Up @@ -581,6 +632,9 @@ static int get_more_blocks(struct dio *dio, struct dio_submit *sdio,

/* Store for completion */
dio->private = map_bh->b_private;

if (ret == 0 && buffer_defer_completion(map_bh))
ret = dio_set_defer_completion(dio);
}
return ret;
}
Expand Down Expand Up @@ -1269,7 +1323,6 @@ do_blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,

if (drop_refcount(dio) == 0) {
retval = dio_complete(dio, offset, retval, false);
kmem_cache_free(dio_cache, dio);
} else
BUG_ON(retval != -EIOCBQUEUED);

Expand Down
11 changes: 0 additions & 11 deletions fs/ext4/ext4.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ struct ext4_map_blocks {
* Flags for ext4_io_end->flags
*/
#define EXT4_IO_END_UNWRITTEN 0x0001
#define EXT4_IO_END_DIRECT 0x0002

/*
* For converting uninitialized extents on a work queue. 'handle' is used for
Expand All @@ -196,8 +195,6 @@ typedef struct ext4_io_end {
unsigned int flag; /* unwritten or not */
loff_t offset; /* offset in the file */
ssize_t size; /* size of the extent */
struct kiocb *iocb; /* iocb struct for AIO */
int result; /* error value for AIO */
atomic_t count; /* reference counter */
} ext4_io_end_t;

Expand Down Expand Up @@ -900,11 +897,9 @@ struct ext4_inode_info {
* Completed IOs that need unwritten extents handling and don't have
* transaction reserved
*/
struct list_head i_unrsv_conversion_list;
atomic_t i_ioend_count; /* Number of outstanding io_end structs */
atomic_t i_unwritten; /* Nr. of inflight conversions pending */
struct work_struct i_rsv_conversion_work;
struct work_struct i_unrsv_conversion_work;

spinlock_t i_block_reservation_lock;

Expand Down Expand Up @@ -1276,8 +1271,6 @@ struct ext4_sb_info {
struct flex_groups *s_flex_groups;
ext4_group_t s_flex_groups_allocated;

/* workqueue for unreserved extent convertions (dio) */
struct workqueue_struct *unrsv_conversion_wq;
/* workqueue for reserved extent conversions (buffered io) */
struct workqueue_struct *rsv_conversion_wq;

Expand Down Expand Up @@ -1340,9 +1333,6 @@ static inline void ext4_set_io_unwritten_flag(struct inode *inode,
struct ext4_io_end *io_end)
{
if (!(io_end->flag & EXT4_IO_END_UNWRITTEN)) {
/* Writeback has to have coversion transaction reserved */
WARN_ON(EXT4_SB(inode->i_sb)->s_journal && !io_end->handle &&
!(io_end->flag & EXT4_IO_END_DIRECT));
io_end->flag |= EXT4_IO_END_UNWRITTEN;
atomic_inc(&EXT4_I(inode)->i_unwritten);
}
Expand Down Expand Up @@ -2716,7 +2706,6 @@ extern void ext4_put_io_end_defer(ext4_io_end_t *io_end);
extern void ext4_io_submit_init(struct ext4_io_submit *io,
struct writeback_control *wbc);
extern void ext4_end_io_rsv_work(struct work_struct *work);
extern void ext4_end_io_unrsv_work(struct work_struct *work);
extern void ext4_io_submit(struct ext4_io_submit *io);
extern int ext4_bio_write_page(struct ext4_io_submit *io,
struct page *page,
Expand Down
28 changes: 7 additions & 21 deletions fs/ext4/inode.c
Original file line number Diff line number Diff line change
Expand Up @@ -727,8 +727,12 @@ static int _ext4_get_block(struct inode *inode, sector_t iblock,

ret = ext4_map_blocks(handle, inode, &map, flags);
if (ret > 0) {
ext4_io_end_t *io_end = ext4_inode_aio(inode);

map_bh(bh, inode->i_sb, map.m_pblk);
bh->b_state = (bh->b_state & ~EXT4_MAP_FLAGS) | map.m_flags;
if (io_end && io_end->flag & EXT4_IO_END_UNWRITTEN)
set_buffer_defer_completion(bh);
bh->b_size = inode->i_sb->s_blocksize * map.m_len;
ret = 0;
}
Expand Down Expand Up @@ -2991,19 +2995,13 @@ static int ext4_get_block_write_nolock(struct inode *inode, sector_t iblock,
}

static void ext4_end_io_dio(struct kiocb *iocb, loff_t offset,
ssize_t size, void *private, int ret,
bool is_async)
ssize_t size, void *private)
{
struct inode *inode = file_inode(iocb->ki_filp);
ext4_io_end_t *io_end = iocb->private;

/* if not async direct IO just return */
if (!io_end) {
inode_dio_done(inode);
if (is_async)
aio_complete(iocb, ret, 0);
if (!io_end)
return;
}

ext_debug("ext4_end_io_dio(): io_end 0x%p "
"for inode %lu, iocb 0x%p, offset %llu, size %zd\n",
Expand All @@ -3013,11 +3011,7 @@ static void ext4_end_io_dio(struct kiocb *iocb, loff_t offset,
iocb->private = NULL;
io_end->offset = offset;
io_end->size = size;
if (is_async) {
io_end->iocb = iocb;
io_end->result = ret;
}
ext4_put_io_end_defer(io_end);
ext4_put_io_end(io_end);
}

/*
Expand Down Expand Up @@ -3102,7 +3096,6 @@ static ssize_t ext4_ext_direct_IO(int rw, struct kiocb *iocb,
ret = -ENOMEM;
goto retake_lock;
}
io_end->flag |= EXT4_IO_END_DIRECT;
/*
* Grab reference for DIO. Will be dropped in ext4_end_io_dio()
*/
Expand Down Expand Up @@ -3147,13 +3140,6 @@ static ssize_t ext4_ext_direct_IO(int rw, struct kiocb *iocb,
if (ret <= 0 && ret != -EIOCBQUEUED && iocb->private) {
WARN_ON(iocb->private != io_end);
WARN_ON(io_end->flag & EXT4_IO_END_UNWRITTEN);
WARN_ON(io_end->iocb);
/*
* Generic code already did inode_dio_done() so we
* have to clear EXT4_IO_END_DIRECT to not do it for
* the second time.
*/
io_end->flag = 0;
ext4_put_io_end(io_end);
iocb->private = NULL;
}
Expand Down
30 changes: 7 additions & 23 deletions fs/ext4/page-io.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,6 @@ static void ext4_release_io_end(ext4_io_end_t *io_end)
ext4_finish_bio(bio);
bio_put(bio);
}
if (io_end->flag & EXT4_IO_END_DIRECT)
inode_dio_done(io_end->inode);
if (io_end->iocb)
aio_complete(io_end->iocb, io_end->result, 0);
kmem_cache_free(io_end_cachep, io_end);
}

Expand Down Expand Up @@ -204,19 +200,14 @@ static void ext4_add_complete_io(ext4_io_end_t *io_end)
struct workqueue_struct *wq;
unsigned long flags;

BUG_ON(!(io_end->flag & EXT4_IO_END_UNWRITTEN));
/* Only reserved conversions from writeback should enter here */
WARN_ON(!(io_end->flag & EXT4_IO_END_UNWRITTEN));
WARN_ON(!io_end->handle);
spin_lock_irqsave(&ei->i_completed_io_lock, flags);
if (io_end->handle) {
wq = EXT4_SB(io_end->inode->i_sb)->rsv_conversion_wq;
if (list_empty(&ei->i_rsv_conversion_list))
queue_work(wq, &ei->i_rsv_conversion_work);
list_add_tail(&io_end->list, &ei->i_rsv_conversion_list);
} else {
wq = EXT4_SB(io_end->inode->i_sb)->unrsv_conversion_wq;
if (list_empty(&ei->i_unrsv_conversion_list))
queue_work(wq, &ei->i_unrsv_conversion_work);
list_add_tail(&io_end->list, &ei->i_unrsv_conversion_list);
}
wq = EXT4_SB(io_end->inode->i_sb)->rsv_conversion_wq;
if (list_empty(&ei->i_rsv_conversion_list))
queue_work(wq, &ei->i_rsv_conversion_work);
list_add_tail(&io_end->list, &ei->i_rsv_conversion_list);
spin_unlock_irqrestore(&ei->i_completed_io_lock, flags);
}

Expand Down Expand Up @@ -256,13 +247,6 @@ void ext4_end_io_rsv_work(struct work_struct *work)
ext4_do_flush_completed_IO(&ei->vfs_inode, &ei->i_rsv_conversion_list);
}

void ext4_end_io_unrsv_work(struct work_struct *work)
{
struct ext4_inode_info *ei = container_of(work, struct ext4_inode_info,
i_unrsv_conversion_work);
ext4_do_flush_completed_IO(&ei->vfs_inode, &ei->i_unrsv_conversion_list);
}

ext4_io_end_t *ext4_init_io_end(struct inode *inode, gfp_t flags)
{
ext4_io_end_t *io = kmem_cache_zalloc(io_end_cachep, flags);
Expand Down
16 changes: 0 additions & 16 deletions fs/ext4/super.c
Original file line number Diff line number Diff line change
Expand Up @@ -762,9 +762,7 @@ static void ext4_put_super(struct super_block *sb)
ext4_unregister_li_request(sb);
dquot_disable(sb, -1, DQUOT_USAGE_ENABLED | DQUOT_LIMITS_ENABLED);

flush_workqueue(sbi->unrsv_conversion_wq);
flush_workqueue(sbi->rsv_conversion_wq);
destroy_workqueue(sbi->unrsv_conversion_wq);
destroy_workqueue(sbi->rsv_conversion_wq);

if (sbi->s_journal) {
Expand Down Expand Up @@ -875,14 +873,12 @@ static struct inode *ext4_alloc_inode(struct super_block *sb)
#endif
ei->jinode = NULL;
INIT_LIST_HEAD(&ei->i_rsv_conversion_list);
INIT_LIST_HEAD(&ei->i_unrsv_conversion_list);
spin_lock_init(&ei->i_completed_io_lock);
ei->i_sync_tid = 0;
ei->i_datasync_tid = 0;
atomic_set(&ei->i_ioend_count, 0);
atomic_set(&ei->i_unwritten, 0);
INIT_WORK(&ei->i_rsv_conversion_work, ext4_end_io_rsv_work);
INIT_WORK(&ei->i_unrsv_conversion_work, ext4_end_io_unrsv_work);

return &ei->vfs_inode;
}
Expand Down Expand Up @@ -3954,14 +3950,6 @@ static int ext4_fill_super(struct super_block *sb, void *data, int silent)
goto failed_mount4;
}

EXT4_SB(sb)->unrsv_conversion_wq =
alloc_workqueue("ext4-unrsv-conversion", WQ_MEM_RECLAIM | WQ_UNBOUND, 1);
if (!EXT4_SB(sb)->unrsv_conversion_wq) {
printk(KERN_ERR "EXT4-fs: failed to create workqueue\n");
ret = -ENOMEM;
goto failed_mount4;
}

/*
* The jbd2_journal_load will have done any necessary log recovery,
* so we can safely mount the rest of the filesystem now.
Expand Down Expand Up @@ -4115,8 +4103,6 @@ static int ext4_fill_super(struct super_block *sb, void *data, int silent)
ext4_msg(sb, KERN_ERR, "mount failed");
if (EXT4_SB(sb)->rsv_conversion_wq)
destroy_workqueue(EXT4_SB(sb)->rsv_conversion_wq);
if (EXT4_SB(sb)->unrsv_conversion_wq)
destroy_workqueue(EXT4_SB(sb)->unrsv_conversion_wq);
failed_mount_wq:
if (sbi->s_journal) {
jbd2_journal_destroy(sbi->s_journal);
Expand Down Expand Up @@ -4564,7 +4550,6 @@ static int ext4_sync_fs(struct super_block *sb, int wait)

trace_ext4_sync_fs(sb, wait);
flush_workqueue(sbi->rsv_conversion_wq);
flush_workqueue(sbi->unrsv_conversion_wq);
/*
* Writeback quota in non-journalled quota case - journalled quota has
* no dirty dquots
Expand Down Expand Up @@ -4600,7 +4585,6 @@ static int ext4_sync_fs_nojournal(struct super_block *sb, int wait)

trace_ext4_sync_fs(sb, wait);
flush_workqueue(EXT4_SB(sb)->rsv_conversion_wq);
flush_workqueue(EXT4_SB(sb)->unrsv_conversion_wq);
dquot_writeback_dquots(sb, -1);
if (wait && test_opt(sb, BARRIER))
ret = blkdev_issue_flush(sb->s_bdev, GFP_KERNEL, NULL);
Expand Down
Loading

0 comments on commit 7b7a866

Please sign in to comment.