Skip to content

Commit

Permalink
io_uring: add support for marking commands as draining
Browse files Browse the repository at this point in the history
There are no ordering constraints between the submission and completion
side of io_uring, but sometimes that would be useful to have. One common
example is issuing an fsync and having it ordered with
previous writes. Without support for that, the application must do this
tracking itself.

This adds a general SQE flag, IOSQE_IO_DRAIN. If a command is marked
with this flag, then it will not be issued before previous commands have
completed, and subsequent commands submitted after the drain will not be
issued before the drain is started. If there are no pending commands,
setting this flag will not change the behavior of the issue of the
command.

Signed-off-by: Jens Axboe <[email protected]>
  • Loading branch information
axboe committed May 2, 2019
1 parent ea98667 commit de0617e
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 3 deletions.
91 changes: 88 additions & 3 deletions fs/io_uring.c
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ struct io_ring_ctx {
unsigned sq_mask;
unsigned sq_thread_idle;
struct io_uring_sqe *sq_sqes;

struct list_head defer_list;
} ____cacheline_aligned_in_smp;

/* IO offload */
Expand Down Expand Up @@ -327,8 +329,11 @@ struct io_kiocb {
#define REQ_F_FIXED_FILE 4 /* ctx owns file */
#define REQ_F_SEQ_PREV 8 /* sequential with previous */
#define REQ_F_PREPPED 16 /* prep already done */
#define REQ_F_IO_DRAIN 32 /* drain existing IO first */
#define REQ_F_IO_DRAINED 64 /* drain done */
u64 user_data;
u64 error;
u32 error;
u32 sequence;

struct work_struct work;
};
Expand Down Expand Up @@ -356,6 +361,8 @@ struct io_submit_state {
unsigned int ios_left;
};

/* Forward declaration: io_req_defer() installs this as a work handler. */
static void io_sq_wq_submit_work(struct work_struct *work);

static struct kmem_cache *req_cachep;

static const struct file_operations io_uring_fops;
Expand Down Expand Up @@ -407,10 +414,36 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
spin_lock_init(&ctx->completion_lock);
INIT_LIST_HEAD(&ctx->poll_list);
INIT_LIST_HEAD(&ctx->cancel_list);
INIT_LIST_HEAD(&ctx->defer_list);
return ctx;
}

static void io_commit_cqring(struct io_ring_ctx *ctx)
/*
 * Decide whether @req has to be held back for now.
 *
 * Only a request that carries REQ_F_IO_DRAIN and has not yet been marked
 * REQ_F_IO_DRAINED is a candidate; anything else is never deferred here.
 * A candidate is deferred while its sequence is still ahead of
 * ctx->cached_cq_tail + ctx->sq_ring->dropped.
 *
 * Returns true if the request must be deferred, false otherwise.
 */
static inline bool io_sequence_defer(struct io_ring_ctx *ctx,
				     struct io_kiocb *req)
{
	unsigned int seen;

	if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN)
		return false;

	seen = ctx->cached_cq_tail + ctx->sq_ring->dropped;

	/*
	 * req->sequence is a free-running 32-bit counter, so a plain '>'
	 * gives the wrong answer once either side wraps around. Compare the
	 * signed difference instead (same idiom as time_after()), which
	 * stays correct across the wrap.
	 */
	return (int)(req->sequence - seen) > 0;
}

/*
 * Pop the request at the head of ctx->defer_list if it no longer needs to
 * be deferred.
 *
 * Returns the unlinked request, or NULL when the list is empty or the head
 * request must still wait according to io_sequence_defer().
 */
static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
{
	struct io_kiocb *head;

	if (list_empty(&ctx->defer_list))
		return NULL;

	head = list_first_entry(&ctx->defer_list, struct io_kiocb, list);

	/* Only the head is examined; a still-deferred head yields nothing. */
	if (io_sequence_defer(ctx, head))
		return NULL;

	list_del_init(&head->list);
	return head;
}

static void __io_commit_cqring(struct io_ring_ctx *ctx)
{
struct io_cq_ring *ring = ctx->cq_ring;

Expand All @@ -425,6 +458,18 @@ static void io_commit_cqring(struct io_ring_ctx *ctx)
}
}

/*
 * Commit the CQ ring via __io_commit_cqring(), then queue every deferred
 * request that io_get_deferred_req() hands back.
 *
 * Each such request is flagged REQ_F_IO_DRAINED before its work item is
 * queued on ctx->sqo_wq.
 */
static void io_commit_cqring(struct io_ring_ctx *ctx)
{
	struct io_kiocb *deferred;

	__io_commit_cqring(ctx);

	for (;;) {
		deferred = io_get_deferred_req(ctx);
		if (!deferred)
			break;

		deferred->flags |= REQ_F_IO_DRAINED;
		queue_work(ctx->sqo_wq, &deferred->work);
	}
}

static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
{
struct io_cq_ring *ring = ctx->cq_ring;
Expand Down Expand Up @@ -1437,6 +1482,34 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
return ipt.error;
}

/*
 * Park @req on ctx->defer_list if it cannot be issued right now.
 *
 * A request is parked when io_sequence_defer() says it must wait, or when
 * the defer list already holds entries. The SQE is copied into a private
 * allocation that is stored in req->submit.sqe, and the request's work
 * item is initialised to run io_sq_wq_submit_work().
 *
 * Returns 0 if the request does not need deferring, -EIOCBQUEUED if it was
 * added to the defer list, or -EAGAIN if the SQE copy could not be
 * allocated.
 */
static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req,
			const struct io_uring_sqe *sqe)
{
	struct io_uring_sqe *copy;

	/* Unlocked first look: nothing to do in the common case. */
	if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list))
		return 0;

	/* Allocate before taking the lock; GFP_KERNEL is used here. */
	copy = kmalloc(sizeof(*copy), GFP_KERNEL);
	if (!copy)
		return -EAGAIN;

	spin_lock_irq(&ctx->completion_lock);

	/* Check again now that completion_lock is held. */
	if (io_sequence_defer(ctx, req) || !list_empty(&ctx->defer_list)) {
		memcpy(copy, sqe, sizeof(*copy));
		req->submit.sqe = copy;

		INIT_WORK(&req->work, io_sq_wq_submit_work);
		list_add_tail(&req->list, &ctx->defer_list);
		spin_unlock_irq(&ctx->completion_lock);
		return -EIOCBQUEUED;
	}

	/* Deferral is no longer needed; drop the unused copy. */
	spin_unlock_irq(&ctx->completion_lock);
	kfree(copy);
	return 0;
}

static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
const struct sqe_submit *s, bool force_nonblock)
{
Expand Down Expand Up @@ -1684,6 +1757,11 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
flags = READ_ONCE(s->sqe->flags);
fd = READ_ONCE(s->sqe->fd);

if (flags & IOSQE_IO_DRAIN) {
req->flags |= REQ_F_IO_DRAIN;
req->sequence = ctx->cached_sq_head - 1;
}

if (!io_op_needs_file(s->sqe)) {
req->file = NULL;
return 0;
Expand Down Expand Up @@ -1713,7 +1791,7 @@ static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
int ret;

/* enforce forwards compatibility on users */
if (unlikely(s->sqe->flags & ~IOSQE_FIXED_FILE))
if (unlikely(s->sqe->flags & ~(IOSQE_FIXED_FILE | IOSQE_IO_DRAIN)))
return -EINVAL;

req = io_get_req(ctx, state);
Expand All @@ -1724,6 +1802,13 @@ static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
if (unlikely(ret))
goto out;

ret = io_req_defer(ctx, req, s->sqe);
if (ret) {
if (ret == -EIOCBQUEUED)
ret = 0;
return ret;
}

ret = __io_submit_sqe(ctx, req, s, true);
if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
struct io_uring_sqe *sqe_copy;
Expand Down
1 change: 1 addition & 0 deletions include/uapi/linux/io_uring.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct io_uring_sqe {
* sqe->flags
*/
#define IOSQE_FIXED_FILE (1U << 0) /* use fixed fileset */
#define IOSQE_IO_DRAIN (1U << 1) /* issue after inflight IO */

/*
* io_uring_setup() flags
Expand Down

0 comments on commit de0617e

Please sign in to comment.