io_uring: move to using create_io_thread()
This allows us to do task creation and setup without needing to use
completions to try and synchronize with the starting thread. Get rid of
the old io_wq_fork_thread() wrapper, and the 'wq' and 'worker' startup
completion events - we can now do setup before the task is running.

Signed-off-by: Jens Axboe <[email protected]>
axboe committed Mar 5, 2021
1 parent cc440e8 commit 46fe18b
Showing 3 changed files with 54 additions and 109 deletions.
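At a high level: the old io_wq_fork_thread() path cloned a thread that began running immediately, so creator and thread had to synchronize through completions ('started', 'wq->started', 'sqd->completion'). create_io_thread() instead returns a task_struct that exists but is not yet runnable; the creator performs all setup on it and only then calls wake_up_new_task(). A minimal sketch of the new lifecycle, condensed from the diffs below (locking and accounting elided):

	struct task_struct *tsk;

	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (IS_ERR(tsk))
		return PTR_ERR(tsk);	/* no half-started thread to unwind */

	/* The task cannot run until woken, so plain stores are safe here. */
	tsk->pf_io_worker = worker;
	worker->task = tsk;

	wake_up_new_task(tsk);		/* task becomes runnable for the first time */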
123 changes: 35 additions & 88 deletions fs/io-wq.c
@@ -54,7 +54,6 @@ struct io_worker {
 	spinlock_t lock;
 
 	struct completion ref_done;
-	struct completion started;
 
 	struct rcu_head rcu;
 };
@@ -116,7 +115,6 @@ struct io_wq {
 	struct io_wq_hash *hash;
 
 	refcount_t refs;
-	struct completion started;
 	struct completion exited;
 
 	atomic_t worker_refs;
@@ -199,6 +197,7 @@ static void io_worker_exit(struct io_worker *worker)
 	kfree_rcu(worker, rcu);
 	if (atomic_dec_and_test(&wqe->wq->worker_refs))
 		complete(&wqe->wq->worker_done);
+	do_exit(0);
 }
 
 static inline bool io_wqe_run_queue(struct io_wqe *wqe)
@@ -273,14 +272,6 @@ static void io_wqe_dec_running(struct io_worker *worker)
 		io_wqe_wake_worker(wqe, acct);
 }
 
-static void io_worker_start(struct io_worker *worker)
-{
-	current->flags |= PF_NOFREEZE;
-	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
-	io_wqe_inc_running(worker);
-	complete(&worker->started);
-}
-
 /*
  * Worker will start processing some work. Move it to the busy list, if
  * it's currently on the freelist
@@ -489,8 +480,13 @@ static int io_wqe_worker(void *data)
 	struct io_worker *worker = data;
 	struct io_wqe *wqe = worker->wqe;
 	struct io_wq *wq = wqe->wq;
+	char buf[TASK_COMM_LEN];
 
-	io_worker_start(worker);
+	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
+	io_wqe_inc_running(worker);
+
+	sprintf(buf, "iou-wrk-%d", wq->task_pid);
+	set_task_comm(current, buf);
 
 	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
 		set_current_state(TASK_INTERRUPTIBLE);
@@ -565,67 +561,11 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
 	raw_spin_unlock_irq(&worker->wqe->lock);
 }
 
-static int task_thread(void *data, int index)
-{
-	struct io_worker *worker = data;
-	struct io_wqe *wqe = worker->wqe;
-	struct io_wqe_acct *acct = &wqe->acct[index];
-	struct io_wq *wq = wqe->wq;
-	char buf[TASK_COMM_LEN];
-
-	sprintf(buf, "iou-wrk-%d", wq->task_pid);
-	set_task_comm(current, buf);
-
-	current->pf_io_worker = worker;
-	worker->task = current;
-
-	set_cpus_allowed_ptr(current, cpumask_of_node(wqe->node));
-	current->flags |= PF_NO_SETAFFINITY;
-
-	raw_spin_lock_irq(&wqe->lock);
-	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
-	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
-	worker->flags |= IO_WORKER_F_FREE;
-	if (index == IO_WQ_ACCT_BOUND)
-		worker->flags |= IO_WORKER_F_BOUND;
-	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
-		worker->flags |= IO_WORKER_F_FIXED;
-	acct->nr_workers++;
-	raw_spin_unlock_irq(&wqe->lock);
-
-	io_wqe_worker(data);
-	do_exit(0);
-}
-
-static int task_thread_bound(void *data)
-{
-	return task_thread(data, IO_WQ_ACCT_BOUND);
-}
-
-static int task_thread_unbound(void *data)
-{
-	return task_thread(data, IO_WQ_ACCT_UNBOUND);
-}
-
-pid_t io_wq_fork_thread(int (*fn)(void *), void *arg)
-{
-	unsigned long flags = CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|
-				CLONE_IO|SIGCHLD;
-	struct kernel_clone_args args = {
-		.flags		= ((lower_32_bits(flags) | CLONE_VM |
-				    CLONE_UNTRACED) & ~CSIGNAL),
-		.exit_signal	= (lower_32_bits(flags) & CSIGNAL),
-		.stack		= (unsigned long)fn,
-		.stack_size	= (unsigned long)arg,
-	};
-
-	return kernel_clone(&args);
-}
-
 static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 {
 	struct io_wqe_acct *acct = &wqe->acct[index];
 	struct io_worker *worker;
-	pid_t pid;
+	struct task_struct *tsk;
 
 	__set_current_state(TASK_RUNNING);
 
@@ -638,21 +578,33 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 	worker->wqe = wqe;
 	spin_lock_init(&worker->lock);
 	init_completion(&worker->ref_done);
-	init_completion(&worker->started);
 
 	atomic_inc(&wq->worker_refs);
 
-	if (index == IO_WQ_ACCT_BOUND)
-		pid = io_wq_fork_thread(task_thread_bound, worker);
-	else
-		pid = io_wq_fork_thread(task_thread_unbound, worker);
-	if (pid < 0) {
+	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
+	if (IS_ERR(tsk)) {
 		if (atomic_dec_and_test(&wq->worker_refs))
 			complete(&wq->worker_done);
 		kfree(worker);
 		return false;
 	}
-	wait_for_completion(&worker->started);
+
+	tsk->pf_io_worker = worker;
+	worker->task = tsk;
+	set_cpus_allowed_ptr(tsk, cpumask_of_node(wqe->node));
+	tsk->flags |= PF_NOFREEZE | PF_NO_SETAFFINITY;
+
+	raw_spin_lock_irq(&wqe->lock);
+	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
+	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
+	worker->flags |= IO_WORKER_F_FREE;
+	if (index == IO_WQ_ACCT_BOUND)
+		worker->flags |= IO_WORKER_F_BOUND;
+	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
+		worker->flags |= IO_WORKER_F_FIXED;
+	acct->nr_workers++;
+	raw_spin_unlock_irq(&wqe->lock);
+	wake_up_new_task(tsk);
 	return true;
 }
 
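The create_io_worker() hunk above is the heart of the conversion: everything task_thread() used to do from inside the new thread (affinity, flags, list insertion, accounting) is now done by the creator, on a task that is not yet running; only the comm name is still set by the worker itself once it runs. A timeline sketch of why no handshake is needed (commentary, not kernel source):

	/*
	 * parent (create_io_worker)          new task (io_wqe_worker)
	 * -------------------------          ------------------------
	 * tsk = create_io_thread(...);       exists, but cannot run yet
	 * tsk->pf_io_worker = worker;
	 * insert into free_list/all_list
	 *     under wqe->lock;
	 * wake_up_new_task(tsk);  ----->     first instruction executes here
	 *                                    and sees fully initialised state
	 */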
@@ -696,6 +648,7 @@ static bool io_wq_for_each_worker(struct io_wqe *wqe,
 
 static bool io_wq_worker_wake(struct io_worker *worker, void *data)
 {
+	set_notify_signal(worker->task);
 	wake_up_process(worker->task);
 	return false;
 }
@@ -752,10 +705,6 @@ static int io_wq_manager(void *data)
 
 	sprintf(buf, "iou-mgr-%d", wq->task_pid);
 	set_task_comm(current, buf);
-	current->flags |= PF_IO_WORKER;
-	wq->manager = get_task_struct(current);
-
-	complete(&wq->started);
 
 	do {
 		set_current_state(TASK_INTERRUPTIBLE);
@@ -815,21 +764,20 @@ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
 
 static int io_wq_fork_manager(struct io_wq *wq)
 {
-	int ret;
+	struct task_struct *tsk;
 
 	if (wq->manager)
 		return 0;
 
 	reinit_completion(&wq->worker_done);
-	current->flags |= PF_IO_WORKER;
-	ret = io_wq_fork_thread(io_wq_manager, wq);
-	current->flags &= ~PF_IO_WORKER;
-	if (ret >= 0) {
-		wait_for_completion(&wq->started);
+	tsk = create_io_thread(io_wq_manager, wq, NUMA_NO_NODE);
+	if (!IS_ERR(tsk)) {
+		wq->manager = get_task_struct(tsk);
+		wake_up_new_task(tsk);
 		return 0;
 	}
 
-	return ret;
+	return PTR_ERR(tsk);
 }
 
 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
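Note the error-convention change in io_wq_fork_manager(): io_wq_fork_thread() returned a pid that was negative on failure, while create_io_thread() returns a pointer, so failures travel as ERR_PTR-encoded errnos and are checked with IS_ERR()/PTR_ERR(). A self-contained sketch of that kernel idiom (illustrative only; struct thing and its helpers are hypothetical, not part of this commit):

	#include <linux/err.h>
	#include <linux/slab.h>

	struct thing { int x; };

	/* Return a valid pointer, or an errno encoded with ERR_PTR(). */
	static struct thing *make_thing(void)
	{
		struct thing *t = kzalloc(sizeof(*t), GFP_KERNEL);

		if (!t)
			return ERR_PTR(-ENOMEM);
		return t;
	}

	static int use_thing(void)
	{
		struct thing *t = make_thing();

		if (IS_ERR(t))
			return PTR_ERR(t);	/* decodes back to -ENOMEM */
		kfree(t);
		return 0;
	}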
@@ -1062,7 +1010,6 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 	}
 
 	wq->task_pid = current->pid;
-	init_completion(&wq->started);
 	init_completion(&wq->exited);
 	refcount_set(&wq->refs, 1);
 
2 changes: 0 additions & 2 deletions fs/io-wq.h
@@ -119,8 +119,6 @@ void io_wq_put_and_exit(struct io_wq *wq);
 void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work);
 void io_wq_hash_work(struct io_wq_work *work, void *val);
 
-pid_t io_wq_fork_thread(int (*fn)(void *), void *arg);
-
 static inline bool io_wq_is_hashed(struct io_wq_work *work)
 {
 	return work->flags & IO_WQ_WORK_HASHED;
38 changes: 19 additions & 19 deletions fs/io_uring.c
@@ -6668,7 +6668,6 @@ static int io_sq_thread(void *data)
 
 	sprintf(buf, "iou-sqp-%d", sqd->task_pid);
 	set_task_comm(current, buf);
-	sqd->thread = current;
 	current->pf_io_worker = NULL;
 
 	if (sqd->sq_cpu != -1)
@@ -6677,8 +6676,6 @@
 		set_cpus_allowed_ptr(current, cpu_online_mask);
 	current->flags |= PF_NO_SETAFFINITY;
 
-	complete(&sqd->completion);
-
 	wait_for_completion(&sqd->startup);
 
 	while (!io_sq_thread_should_stop(sqd)) {
@@ -7818,21 +7815,22 @@ void __io_uring_free(struct task_struct *tsk)
 
 static int io_sq_thread_fork(struct io_sq_data *sqd, struct io_ring_ctx *ctx)
 {
+	struct task_struct *tsk;
 	int ret;
 
 	clear_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
 	reinit_completion(&sqd->completion);
 	ctx->sqo_exec = 0;
 	sqd->task_pid = current->pid;
-	current->flags |= PF_IO_WORKER;
-	ret = io_wq_fork_thread(io_sq_thread, sqd);
-	current->flags &= ~PF_IO_WORKER;
-	if (ret < 0) {
-		sqd->thread = NULL;
-		return ret;
-	}
-	wait_for_completion(&sqd->completion);
-	return io_uring_alloc_task_context(sqd->thread, ctx);
+	tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
+	if (IS_ERR(tsk))
+		return PTR_ERR(tsk);
+	ret = io_uring_alloc_task_context(tsk, ctx);
+	if (ret)
+		set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
+	sqd->thread = tsk;
+	wake_up_new_task(tsk);
+	return ret;
 }
 
 static int io_sq_offload_create(struct io_ring_ctx *ctx,
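One subtlety in io_sq_thread_fork() above: even when io_uring_alloc_task_context() fails, the new task is still stored in sqd->thread and woken. A task that is never woken can never run its exit path, so the error case first sets IO_SQ_THREAD_SHOULD_STOP and then lets the thread run just long enough to notice the bit and exit. The same lines, annotated:

	ret = io_uring_alloc_task_context(tsk, ctx);
	if (ret)	/* on failure, tell the thread to stop immediately */
		set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
	sqd->thread = tsk;
	wake_up_new_task(tsk);	/* must wake even on error, so it can exit */
	return ret;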
@@ -7855,6 +7853,7 @@ static int io_sq_offload_create(struct io_ring_ctx *ctx,
 		fdput(f);
 	}
 	if (ctx->flags & IORING_SETUP_SQPOLL) {
+		struct task_struct *tsk;
 		struct io_sq_data *sqd;
 
 		ret = -EPERM;
@@ -7896,15 +7895,16 @@
 		}
 
 		sqd->task_pid = current->pid;
-		current->flags |= PF_IO_WORKER;
-		ret = io_wq_fork_thread(io_sq_thread, sqd);
-		current->flags &= ~PF_IO_WORKER;
-		if (ret < 0) {
-			sqd->thread = NULL;
+		tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
+		if (IS_ERR(tsk)) {
+			ret = PTR_ERR(tsk);
 			goto err;
 		}
-		wait_for_completion(&sqd->completion);
-		ret = io_uring_alloc_task_context(sqd->thread, ctx);
+		ret = io_uring_alloc_task_context(tsk, ctx);
+		if (ret)
+			set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
+		sqd->thread = tsk;
+		wake_up_new_task(tsk);
 		if (ret)
 			goto err;
 	} else if (p->flags & IORING_SETUP_SQ_AFF) {
