Skip to content

Commit

Permalink
ocfs2/dlm: do not purge lockres that is queued for assert master
Browse files Browse the repository at this point in the history
When the workqueue is delayed, a lockres may be purged while it is still
queued for master assert.  This may trigger BUG() as follows.

N1                                         N2
dlm_get_lockres()
->dlm_do_master_requery
                                  is the master of lockres,
                                  so queue assert_master work

                                  dlm_thread() start running
                                  and purge the lockres

                                  dlm_assert_master_worker()
                                  send assert master message
                                  to other nodes
receiving the assert_master
message, set master to N2

dlmlock_remote() sends a create_lock message to N2, but receives DLM_IVLOCKID;
if it is a RECOVERY lockres, it triggers the BUG().

Another BUG() is triggered when N3 becomes the new master and sends
assert_master to N1; N1 will trigger the BUG() because the owner doesn't
match.  So we should not purge a lockres while it is queued for assert
master.

Signed-off-by: joyce.xue <[email protected]>
Reviewed-by: Mark Fasheh <[email protected]>
Cc: Joel Becker <[email protected]>
Signed-off-by: Andrew Morton <[email protected]>
Signed-off-by: Linus Torvalds <[email protected]>
  • Loading branch information
xuejiufei authored and torvalds committed Jun 23, 2014
1 parent b9aaac5 commit ac4fef4
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 6 deletions.
4 changes: 4 additions & 0 deletions fs/ocfs2/dlm/dlmcommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ struct dlm_lock_resource
u16 state;
char lvb[DLM_LVB_LEN];
unsigned int inflight_locks;
unsigned int inflight_assert_workers;
unsigned long refmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
};

Expand Down Expand Up @@ -910,6 +911,9 @@ void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res);

void __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res);

void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
void __dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
Expand Down
43 changes: 42 additions & 1 deletion fs/ocfs2/dlm/dlmmaster.c
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
atomic_set(&res->asts_reserved, 0);
res->migration_pending = 0;
res->inflight_locks = 0;
res->inflight_assert_workers = 0;

res->dlm = dlm;

Expand Down Expand Up @@ -683,6 +684,43 @@ void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
wake_up(&res->wq);
}

/*
 * Bump the count of in-flight assert master workers on @res and log the
 * new value.  The caller must already hold res->spinlock; this is
 * checked with assert_spin_locked().
 */
void __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
		struct dlm_lock_resource *res)
{
	assert_spin_locked(&res->spinlock);

	res->inflight_assert_workers += 1;

	mlog(0, "%s:%.*s: inflight assert worker++: now %u\n",
	     dlm->name, res->lockname.len, res->lockname.name,
	     res->inflight_assert_workers);
}

/*
 * Locking wrapper: take res->spinlock, bump the in-flight assert master
 * worker count via __dlm_lockres_grab_inflight_worker(), then release
 * the lock.
 */
static void dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
		struct dlm_lock_resource *res)
{
	spin_lock(&res->spinlock);

	/* the helper asserts that res->spinlock is held */
	__dlm_lockres_grab_inflight_worker(dlm, res);

	spin_unlock(&res->spinlock);
}

/*
 * Decrement the count of in-flight assert master workers on @res and
 * log the new value.  The caller must already hold res->spinlock.
 * Dropping when the count is already zero is a BUG.
 */
static void __dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
		struct dlm_lock_resource *res)
{
	assert_spin_locked(&res->spinlock);

	/* an unbalanced drop would wrap the unsigned counter */
	BUG_ON(!res->inflight_assert_workers);
	res->inflight_assert_workers -= 1;

	mlog(0, "%s:%.*s: inflight assert worker--: now %u\n",
	     dlm->name, res->lockname.len, res->lockname.name,
	     res->inflight_assert_workers);
}

/*
 * Locking wrapper: take res->spinlock, drop one in-flight assert master
 * worker via __dlm_lockres_drop_inflight_worker(), then release the
 * lock.
 */
static void dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
		struct dlm_lock_resource *res)
{
	spin_lock(&res->spinlock);

	/* the helper asserts that res->spinlock is held */
	__dlm_lockres_drop_inflight_worker(dlm, res);

	spin_unlock(&res->spinlock);
}

/*
* lookup a lock resource by name.
* may already exist in the hashtable.
Expand Down Expand Up @@ -1603,7 +1641,8 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
mlog(ML_ERROR, "failed to dispatch assert master work\n");
response = DLM_MASTER_RESP_ERROR;
dlm_lockres_put(res);
}
} else
dlm_lockres_grab_inflight_worker(dlm, res);
} else {
if (res)
dlm_lockres_put(res);
Expand Down Expand Up @@ -2118,6 +2157,8 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
dlm_lockres_release_ast(dlm, res);

put:
dlm_lockres_drop_inflight_worker(dlm, res);

dlm_lockres_put(res);

mlog(0, "finished with dlm_assert_master_worker\n");
Expand Down
3 changes: 2 additions & 1 deletion fs/ocfs2/dlm/dlmrecovery.c
Original file line number Diff line number Diff line change
Expand Up @@ -1708,7 +1708,8 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data,
mlog_errno(-ENOMEM);
/* retry!? */
BUG();
}
} else
__dlm_lockres_grab_inflight_worker(dlm, res);
} else /* put.. incase we are not the master */
dlm_lockres_put(res);
spin_unlock(&res->spinlock);
Expand Down
11 changes: 7 additions & 4 deletions fs/ocfs2/dlm/dlmthread.c
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,14 @@ static void dlm_run_purge_list(struct dlm_ctxt *dlm,
* refs on it. */
unused = __dlm_lockres_unused(lockres);
if (!unused ||
(lockres->state & DLM_LOCK_RES_MIGRATING)) {
(lockres->state & DLM_LOCK_RES_MIGRATING) ||
(lockres->inflight_assert_workers != 0)) {
mlog(0, "%s: res %.*s is in use or being remastered, "
"used %d, state %d\n", dlm->name,
lockres->lockname.len, lockres->lockname.name,
!unused, lockres->state);
"used %d, state %d, assert master workers %u\n",
dlm->name, lockres->lockname.len,
lockres->lockname.name,
!unused, lockres->state,
lockres->inflight_assert_workers);
list_move_tail(&lockres->purge, &dlm->purge_list);
spin_unlock(&lockres->spinlock);
continue;
Expand Down

0 comments on commit ac4fef4

Please sign in to comment.