Skip to content

Commit

Permalink
[DLM] add lock timeouts and warnings [2/6]
Browse files Browse the repository at this point in the history
New features: lock timeouts and time warnings.  If the DLM_LKF_TIMEOUT
flag is set, then the request/conversion will be canceled after waiting
the specified number of centiseconds (specified per lock).  This feature
is only available for locks requested through libdlm (can be enabled for
kernel dlm users if there's a use for it.)

If the new DLM_LSFL_TIMEWARN flag is set when creating the lockspace, then
a warning message will be sent to userspace (using genetlink) after a
request/conversion has been waiting for a given number of centiseconds
(configurable per node).  The time warnings will be used in the future
to do deadlock detection in userspace.

Signed-off-by: David Teigland <[email protected]>
Signed-off-by: Steven Whitehouse <[email protected]>
  • Loading branch information
teigland authored and swhiteho committed Jul 9, 2007
1 parent 85e86ed commit 3ae1acf
Show file tree
Hide file tree
Showing 15 changed files with 409 additions and 12 deletions.
1 change: 1 addition & 0 deletions fs/dlm/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ dlm-y := ast.o \
member.o \
memory.o \
midcomms.o \
netlink.o \
lowcomms.o \
rcom.o \
recover.o \
Expand Down
8 changes: 7 additions & 1 deletion fs/dlm/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ struct cluster {
unsigned int cl_scan_secs;
unsigned int cl_log_debug;
unsigned int cl_protocol;
unsigned int cl_timewarn_cs;
};

enum {
Expand All @@ -103,6 +104,7 @@ enum {
CLUSTER_ATTR_SCAN_SECS,
CLUSTER_ATTR_LOG_DEBUG,
CLUSTER_ATTR_PROTOCOL,
CLUSTER_ATTR_TIMEWARN_CS,
};

struct cluster_attribute {
Expand Down Expand Up @@ -162,6 +164,7 @@ CLUSTER_ATTR(toss_secs, 1);
CLUSTER_ATTR(scan_secs, 1);
CLUSTER_ATTR(log_debug, 0);
CLUSTER_ATTR(protocol, 0);
CLUSTER_ATTR(timewarn_cs, 1);

static struct configfs_attribute *cluster_attrs[] = {
[CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr,
Expand All @@ -174,6 +177,7 @@ static struct configfs_attribute *cluster_attrs[] = {
[CLUSTER_ATTR_SCAN_SECS] = &cluster_attr_scan_secs.attr,
[CLUSTER_ATTR_LOG_DEBUG] = &cluster_attr_log_debug.attr,
[CLUSTER_ATTR_PROTOCOL] = &cluster_attr_protocol.attr,
[CLUSTER_ATTR_TIMEWARN_CS] = &cluster_attr_timewarn_cs.attr,
NULL,
};

Expand Down Expand Up @@ -916,6 +920,7 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
#define DEFAULT_SCAN_SECS 5
#define DEFAULT_LOG_DEBUG 0
#define DEFAULT_PROTOCOL 0
#define DEFAULT_TIMEWARN_CS 500 /* 5 sec = 500 centiseconds */

struct dlm_config_info dlm_config = {
.ci_tcp_port = DEFAULT_TCP_PORT,
Expand All @@ -927,6 +932,7 @@ struct dlm_config_info dlm_config = {
.ci_toss_secs = DEFAULT_TOSS_SECS,
.ci_scan_secs = DEFAULT_SCAN_SECS,
.ci_log_debug = DEFAULT_LOG_DEBUG,
.ci_protocol = DEFAULT_PROTOCOL
.ci_protocol = DEFAULT_PROTOCOL,
.ci_timewarn_cs = DEFAULT_TIMEWARN_CS
};

1 change: 1 addition & 0 deletions fs/dlm/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ struct dlm_config_info {
int ci_scan_secs;
int ci_log_debug;
int ci_protocol;
int ci_timewarn_cs;
};

extern struct dlm_config_info dlm_config;
Expand Down
10 changes: 10 additions & 0 deletions fs/dlm/dlm_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,10 @@ struct dlm_args {
#define DLM_IFL_OVERLAP_UNLOCK 0x00080000
#define DLM_IFL_OVERLAP_CANCEL 0x00100000
#define DLM_IFL_ENDOFLIFE 0x00200000
#define DLM_IFL_WATCH_TIMEWARN 0x00400000
#define DLM_IFL_USER 0x00000001
#define DLM_IFL_ORPHAN 0x00000002
#define DLM_IFL_TIMEOUT_CANCEL 0x00000004

struct dlm_lkb {
struct dlm_rsb *lkb_resource; /* the rsb */
Expand Down Expand Up @@ -243,6 +245,9 @@ struct dlm_lkb {
struct list_head lkb_wait_reply; /* waiting for remote reply */
struct list_head lkb_astqueue; /* need ast to be sent */
struct list_head lkb_ownqueue; /* list of locks for a process */
struct list_head lkb_time_list;
unsigned long lkb_timestamp;
unsigned long lkb_timeout_cs;

char *lkb_lvbptr;
struct dlm_lksb *lkb_lksb; /* caller's status block */
Expand Down Expand Up @@ -447,6 +452,9 @@ struct dlm_ls {
struct mutex ls_orphans_mutex;
struct list_head ls_orphans;

struct mutex ls_timeout_mutex;
struct list_head ls_timeout;

struct list_head ls_nodes; /* current nodes in ls */
struct list_head ls_nodes_gone; /* dead node list, recovery */
int ls_num_nodes; /* number of nodes in ls */
Expand All @@ -472,6 +480,7 @@ struct dlm_ls {
struct task_struct *ls_recoverd_task;
struct mutex ls_recoverd_active;
spinlock_t ls_recover_lock;
unsigned long ls_recover_begin; /* jiffies timestamp */
uint32_t ls_recover_status; /* DLM_RS_ */
uint64_t ls_recover_seq;
struct dlm_recover *ls_recover_args;
Expand Down Expand Up @@ -501,6 +510,7 @@ struct dlm_ls {
#define LSFL_RCOM_READY 3
#define LSFL_RCOM_WAIT 4
#define LSFL_UEVENT_WAIT 5
#define LSFL_TIMEWARN 6

/* much of this is just saving user space pointers associated with the
lock that we pass back to the user lib with an ast */
Expand Down
146 changes: 144 additions & 2 deletions fs/dlm/lock.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,13 @@ static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void del_timeout(struct dlm_lkb *lkb);
void dlm_timeout_warn(struct dlm_lkb *lkb);

/*
* Lock compatibilty matrix - thanks Steve
Expand Down Expand Up @@ -286,8 +289,17 @@ static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
if (is_master_copy(lkb))
return;

del_timeout(lkb);

DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

/* if the operation was a cancel, then return -DLM_ECANCEL, if a
timeout caused the cancel then return -ETIMEDOUT */
if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
rv = -ETIMEDOUT;
}

lkb->lkb_lksb->sb_status = rv;
lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;

Expand Down Expand Up @@ -581,6 +593,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
kref_init(&lkb->lkb_ref);
INIT_LIST_HEAD(&lkb->lkb_ownqueue);
INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
INIT_LIST_HEAD(&lkb->lkb_time_list);

get_random_bytes(&bucket, sizeof(bucket));
bucket &= (ls->ls_lkbtbl_size - 1);
Expand Down Expand Up @@ -993,6 +1006,125 @@ void dlm_scan_rsbs(struct dlm_ls *ls)
}
}

static void add_timeout(struct dlm_lkb *lkb)
{
struct dlm_ls *ls = lkb->lkb_resource->res_ls;

if (is_master_copy(lkb))
return;

if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
goto add_it;

if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
goto add_it;
}
return;

add_it:
DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
mutex_lock(&ls->ls_timeout_mutex);
hold_lkb(lkb);
lkb->lkb_timestamp = jiffies;
list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
mutex_unlock(&ls->ls_timeout_mutex);
}

static void del_timeout(struct dlm_lkb *lkb)
{
struct dlm_ls *ls = lkb->lkb_resource->res_ls;

mutex_lock(&ls->ls_timeout_mutex);
if (!list_empty(&lkb->lkb_time_list)) {
list_del_init(&lkb->lkb_time_list);
unhold_lkb(lkb);
}
mutex_unlock(&ls->ls_timeout_mutex);
}

/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
lkb_lksb_timeout without lock_rsb? Note: we can't lock timeout_mutex
and then lock rsb because of lock ordering in add_timeout. We may need
to specify some special timeout-related bits in the lkb that are just to
be accessed under the timeout_mutex. */

void dlm_scan_timeout(struct dlm_ls *ls)
{
struct dlm_rsb *r;
struct dlm_lkb *lkb;
int do_cancel, do_warn;

for (;;) {
if (dlm_locking_stopped(ls))
break;

do_cancel = 0;
do_warn = 0;
mutex_lock(&ls->ls_timeout_mutex);
list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {

if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
time_after_eq(jiffies, lkb->lkb_timestamp +
lkb->lkb_timeout_cs * HZ/100))
do_cancel = 1;

if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
time_after_eq(jiffies, lkb->lkb_timestamp +
dlm_config.ci_timewarn_cs * HZ/100))
do_warn = 1;

if (!do_cancel && !do_warn)
continue;
hold_lkb(lkb);
break;
}
mutex_unlock(&ls->ls_timeout_mutex);

if (!do_cancel && !do_warn)
break;

r = lkb->lkb_resource;
hold_rsb(r);
lock_rsb(r);

if (do_warn) {
/* clear flag so we only warn once */
lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
del_timeout(lkb);
dlm_timeout_warn(lkb);
}

if (do_cancel) {
lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
del_timeout(lkb);
_cancel_lock(r, lkb);
}

unlock_rsb(r);
unhold_rsb(r);
dlm_put_lkb(lkb);
}
}

/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
dlm_recoverd before checking/setting ls_recover_begin. */

void dlm_adjust_timeouts(struct dlm_ls *ls)
{
struct dlm_lkb *lkb;
long adj = jiffies - ls->ls_recover_begin;

ls->ls_recover_begin = 0;
mutex_lock(&ls->ls_timeout_mutex);
list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
lkb->lkb_timestamp += adj;
mutex_unlock(&ls->ls_timeout_mutex);
}

/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
Expand Down Expand Up @@ -1902,6 +2034,9 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
if (is_overlap(lkb))
goto out;

/* don't let scand try to do a cancel */
del_timeout(lkb);

if (lkb->lkb_flags & DLM_IFL_RESEND) {
lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
rv = -EBUSY;
Expand Down Expand Up @@ -1933,6 +2068,9 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
if (is_overlap_unlock(lkb))
goto out;

/* don't let scand try to do a cancel */
del_timeout(lkb);

if (lkb->lkb_flags & DLM_IFL_RESEND) {
lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
rv = -EBUSY;
Expand Down Expand Up @@ -1993,6 +2131,7 @@ static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
error = -EINPROGRESS;
add_lkb(r, lkb, DLM_LKSTS_WAITING);
send_blocking_asts(r, lkb);
add_timeout(lkb);
goto out;
}

Expand Down Expand Up @@ -2040,6 +2179,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
del_lkb(r, lkb);
add_lkb(r, lkb, DLM_LKSTS_CONVERT);
send_blocking_asts(r, lkb);
add_timeout(lkb);
goto out;
}

Expand Down Expand Up @@ -3110,9 +3250,10 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
lkb->lkb_remid = ms->m_lkid;
if (is_altmode(lkb))
munge_altmode(lkb, ms);
if (result)
if (result) {
add_lkb(r, lkb, DLM_LKSTS_WAITING);
else {
add_timeout(lkb);
} else {
grant_lock_pc(r, lkb, ms);
queue_cast(r, lkb, 0);
}
Expand Down Expand Up @@ -3178,6 +3319,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
munge_demoted(lkb, ms);
del_lkb(r, lkb);
add_lkb(r, lkb, DLM_LKSTS_CONVERT);
add_timeout(lkb);
break;

case 0:
Expand Down
4 changes: 3 additions & 1 deletion fs/dlm/lock.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/******************************************************************************
*******************************************************************************
**
** Copyright (C) 2005 Red Hat, Inc. All rights reserved.
** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
Expand All @@ -26,6 +26,8 @@ int dlm_put_lkb(struct dlm_lkb *lkb);
void dlm_scan_rsbs(struct dlm_ls *ls);
int dlm_lock_recovery_try(struct dlm_ls *ls);
void dlm_unlock_recovery(struct dlm_ls *ls);
void dlm_scan_timeout(struct dlm_ls *ls);
void dlm_adjust_timeouts(struct dlm_ls *ls);

int dlm_purge_locks(struct dlm_ls *ls);
void dlm_purge_mstcpy_locks(struct dlm_rsb *r);
Expand Down
10 changes: 9 additions & 1 deletion fs/dlm/lockspace.c
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ static int dlm_scand(void *data)
list_for_each_entry(ls, &lslist, ls_list) {
if (dlm_lock_recovery_try(ls)) {
dlm_scan_rsbs(ls);
dlm_scan_timeout(ls);
dlm_unlock_recovery(ls);
}
}
Expand Down Expand Up @@ -421,11 +422,16 @@ static int new_lockspace(char *name, int namelen, void **lockspace,
goto out;
memcpy(ls->ls_name, name, namelen);
ls->ls_namelen = namelen;
ls->ls_exflags = flags;
ls->ls_lvblen = lvblen;
ls->ls_count = 0;
ls->ls_flags = 0;

/* ls_exflags are forced to match among nodes, and we don't
need to require all nodes to have TIMEWARN active */
if (flags & DLM_LSFL_TIMEWARN)
set_bit(LSFL_TIMEWARN, &ls->ls_flags);
ls->ls_exflags = (flags & ~DLM_LSFL_TIMEWARN);

size = dlm_config.ci_rsbtbl_size;
ls->ls_rsbtbl_size = size;

Expand Down Expand Up @@ -465,6 +471,8 @@ static int new_lockspace(char *name, int namelen, void **lockspace,
mutex_init(&ls->ls_waiters_mutex);
INIT_LIST_HEAD(&ls->ls_orphans);
mutex_init(&ls->ls_orphans_mutex);
INIT_LIST_HEAD(&ls->ls_timeout);
mutex_init(&ls->ls_timeout_mutex);

INIT_LIST_HEAD(&ls->ls_nodes);
INIT_LIST_HEAD(&ls->ls_nodes_gone);
Expand Down
Loading

0 comments on commit 3ae1acf

Please sign in to comment.