mds: fix shared_ptr MDRequest bugs
The main change is to use shared_ptr instead of weak_ptr for the
active request map. The reason is that a slave request needs to be
preserved until the master explicitly finishes it.

Fixes: ceph#8026
Signed-off-by: Yan, Zheng <[email protected]>
Yan, Zheng committed Apr 8, 2014
1 parent 2a6d962 commit 55cfb14
Showing 5 changed files with 89 additions and 108 deletions.
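A minimal standalone sketch of the lifetime bug the message describes (std:: types stand in for the ceph:: wrappers, and Request stands in for MDRequestImpl; this is not code from the commit): with a weak_ptr map, a slave request with no live external reference expires before the master finishes it, while a shared_ptr map keeps it alive until it is explicitly erased.

// Sketch only: Request/RequestRef are stand-ins, not Ceph types.
#include <cassert>
#include <map>
#include <memory>

struct Request {};
using RequestRef = std::shared_ptr<Request>;

int main() {
  // Old scheme: the map holds weak_ptr, so the request lives only while
  // some caller still holds a RequestRef.
  std::map<int, std::weak_ptr<Request>> weak_map;
  {
    RequestRef r = std::make_shared<Request>();
    weak_map[1] = r;
  }                                        // last RequestRef dropped here
  assert(weak_map[1].lock() == nullptr);   // request already destroyed

  // New scheme: the map itself owns the request, so a slave request
  // survives until it is explicitly erased (cf. request_finish below).
  std::map<int, RequestRef> strong_map;
  strong_map[1] = std::make_shared<Request>();
  assert(strong_map[1] != nullptr);        // alive until erased
  return 0;
}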
176 changes: 84 additions & 92 deletions src/mds/MDCache.cc
@@ -2494,21 +2494,20 @@ void MDCache::send_slave_resolves()
     } else {
       set<int> resolve_set;
       mds->mdsmap->get_mds_set(resolve_set, MDSMap::STATE_RESOLVE);
-      for (ceph::unordered_map<metareqid_t, ceph::weak_ptr<MDRequestImpl> >::iterator p = active_requests.begin();
+      for (ceph::unordered_map<metareqid_t, MDRequestRef>::iterator p = active_requests.begin();
            p != active_requests.end();
            ++p) {
-        MDRequestRef amdr(p->second.lock());
-        assert(amdr);
-        if (!amdr->is_slave() || !amdr->slave_did_prepare())
+        MDRequestRef& mdr = p->second;
+        if (!mdr->is_slave() || !mdr->slave_did_prepare())
           continue;
-        int master = amdr->slave_to_mds;
+        int master = mdr->slave_to_mds;
         if (resolve_set.count(master) || is_ambiguous_slave_update(p->first, master)) {
-          dout(10) << " including uncommitted " << *amdr << dendl;
+          dout(10) << " including uncommitted " << *mdr << dendl;
           if (!resolves.count(master))
             resolves[master] = new MMDSResolve;
-          if (amdr->has_more() && amdr->more()->is_inode_exporter) {
+          if (mdr->has_more() && mdr->more()->is_inode_exporter) {
             // re-send cap exports
-            CInode *in = amdr->more()->rename_inode;
+            CInode *in = mdr->more()->rename_inode;
             map<client_t, Capability::Export> cap_map;
             in->export_client_caps(cap_map);
             bufferlist bl;
@@ -2650,95 +2649,94 @@ void MDCache::handle_mds_failure(int who)

   // clean up any requests slave to/from this node
   list<MDRequestRef> finish;
-  for (ceph::unordered_map<metareqid_t, ceph::weak_ptr<MDRequestImpl> >::iterator p = active_requests.begin();
+  for (ceph::unordered_map<metareqid_t, MDRequestRef>::iterator p = active_requests.begin();
        p != active_requests.end();
        ++p) {
-    MDRequestRef amdr(p->second.lock());
-    assert(amdr);
+    MDRequestRef& mdr = p->second;
     // slave to the failed node?
-    if (amdr->slave_to_mds == who) {
-      if (amdr->slave_did_prepare()) {
-        dout(10) << " slave request " << *amdr << " uncommitted, will resolve shortly" << dendl;
-        if (!amdr->more()->waiting_on_slave.empty()) {
-          assert(amdr->more()->srcdn_auth_mds == mds->get_nodeid());
+    if (mdr->slave_to_mds == who) {
+      if (mdr->slave_did_prepare()) {
+        dout(10) << " slave request " << *mdr << " uncommitted, will resolve shortly" << dendl;
+        if (!mdr->more()->waiting_on_slave.empty()) {
+          assert(mdr->more()->srcdn_auth_mds == mds->get_nodeid());
           // will rollback, no need to wait
-          if (amdr->slave_request) {
-            amdr->slave_request->put();
-            amdr->slave_request = 0;
+          if (mdr->slave_request) {
+            mdr->slave_request->put();
+            mdr->slave_request = 0;
           }
-          amdr->more()->waiting_on_slave.clear();
+          mdr->more()->waiting_on_slave.clear();
         }
       } else {
-        dout(10) << " slave request " << *amdr << " has no prepare, finishing up" << dendl;
-        if (amdr->slave_request)
-          amdr->aborted = true;
+        dout(10) << " slave request " << *mdr << " has no prepare, finishing up" << dendl;
+        if (mdr->slave_request)
+          mdr->aborted = true;
         else
-          finish.push_back(amdr);
+          finish.push_back(mdr);
       }
     }
 
-    if (amdr->is_slave() && amdr->slave_did_prepare()) {
-      if (amdr->more()->waiting_on_slave.count(who)) {
-        assert(amdr->more()->srcdn_auth_mds == mds->get_nodeid());
-        dout(10) << " slave request " << *amdr << " no longer need rename notify ack from mds."
+    if (mdr->is_slave() && mdr->slave_did_prepare()) {
+      if (mdr->more()->waiting_on_slave.count(who)) {
+        assert(mdr->more()->srcdn_auth_mds == mds->get_nodeid());
+        dout(10) << " slave request " << *mdr << " no longer need rename notify ack from mds."
                  << who << dendl;
-        amdr->more()->waiting_on_slave.erase(who);
-        if (amdr->more()->waiting_on_slave.empty() && amdr->slave_request)
-          mds->queue_waiter(new C_MDS_RetryRequest(this, amdr));
+        mdr->more()->waiting_on_slave.erase(who);
+        if (mdr->more()->waiting_on_slave.empty() && mdr->slave_request)
+          mds->queue_waiter(new C_MDS_RetryRequest(this, mdr));
       }
 
-      if (amdr->more()->srcdn_auth_mds == who &&
-          mds->mdsmap->is_clientreplay_or_active_or_stopping(amdr->slave_to_mds)) {
+      if (mdr->more()->srcdn_auth_mds == who &&
+          mds->mdsmap->is_clientreplay_or_active_or_stopping(mdr->slave_to_mds)) {
         // rename srcdn's auth mds failed, resolve even if I'm a survivor.
-        dout(10) << " slave request " << *amdr << " uncommitted, will resolve shortly" << dendl;
-        add_ambiguous_slave_update(p->first, amdr->slave_to_mds);
+        dout(10) << " slave request " << *mdr << " uncommitted, will resolve shortly" << dendl;
+        add_ambiguous_slave_update(p->first, mdr->slave_to_mds);
      }
     }
 
    // failed node is slave?
-    if (amdr->is_master() && !amdr->committing) {
-      if (amdr->more()->srcdn_auth_mds == who) {
-        dout(10) << " master request " << *amdr << " waiting for rename srcdn's auth mds."
+    if (mdr->is_master() && !mdr->committing) {
+      if (mdr->more()->srcdn_auth_mds == who) {
+        dout(10) << " master request " << *mdr << " waiting for rename srcdn's auth mds."
                  << who << " to recover" << dendl;
-        assert(amdr->more()->witnessed.count(who) == 0);
-        if (amdr->more()->is_ambiguous_auth)
-          amdr->clear_ambiguous_auth();
+        assert(mdr->more()->witnessed.count(who) == 0);
+        if (mdr->more()->is_ambiguous_auth)
+          mdr->clear_ambiguous_auth();
         // rename srcdn's auth mds failed, all witnesses will rollback
-        amdr->more()->witnessed.clear();
+        mdr->more()->witnessed.clear();
         pending_masters.erase(p->first);
       }
 
-      if (amdr->more()->witnessed.count(who)) {
-        int srcdn_auth = amdr->more()->srcdn_auth_mds;
-        if (srcdn_auth >= 0 && amdr->more()->waiting_on_slave.count(srcdn_auth)) {
-          dout(10) << " master request " << *amdr << " waiting for rename srcdn's auth mds."
-                   << amdr->more()->srcdn_auth_mds << " to reply" << dendl;
+      if (mdr->more()->witnessed.count(who)) {
+        int srcdn_auth = mdr->more()->srcdn_auth_mds;
+        if (srcdn_auth >= 0 && mdr->more()->waiting_on_slave.count(srcdn_auth)) {
+          dout(10) << " master request " << *mdr << " waiting for rename srcdn's auth mds."
+                   << mdr->more()->srcdn_auth_mds << " to reply" << dendl;
           // waiting for the slave (rename srcdn's auth mds), delay sending resolve ack
           // until either the request is committing or the slave also fails.
-          assert(amdr->more()->waiting_on_slave.size() == 1);
+          assert(mdr->more()->waiting_on_slave.size() == 1);
           pending_masters.insert(p->first);
         } else {
-          dout(10) << " master request " << *amdr << " no longer witnessed by slave mds."
+          dout(10) << " master request " << *mdr << " no longer witnessed by slave mds."
                    << who << " to recover" << dendl;
           if (srcdn_auth >= 0)
-            assert(amdr->more()->witnessed.count(srcdn_auth) == 0);
+            assert(mdr->more()->witnessed.count(srcdn_auth) == 0);
 
           // discard this peer's prepare (if any)
-          amdr->more()->witnessed.erase(who);
+          mdr->more()->witnessed.erase(who);
         }
       }
 
-      if (amdr->more()->waiting_on_slave.count(who)) {
-        dout(10) << " master request " << *amdr << " waiting for slave mds." << who
+      if (mdr->more()->waiting_on_slave.count(who)) {
+        dout(10) << " master request " << *mdr << " waiting for slave mds." << who
                  << " to recover" << dendl;
        // retry request when peer recovers
-        amdr->more()->waiting_on_slave.erase(who);
-        if (amdr->more()->waiting_on_slave.empty())
-          mds->wait_for_active_peer(who, new C_MDS_RetryRequest(this, amdr));
+        mdr->more()->waiting_on_slave.erase(who);
+        if (mdr->more()->waiting_on_slave.empty())
+          mds->wait_for_active_peer(who, new C_MDS_RetryRequest(this, mdr));
       }
 
-    if (amdr->locking && amdr->locking_target_mds == who)
-      amdr->finish_locking(amdr->locking);
+    if (mdr->locking && mdr->locking_target_mds == who)
+      mdr->finish_locking(mdr->locking);
    }
  }
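Note how the hunk above handles cleanup: doomed requests are pushed onto the finish list and completed after the scan, never erased mid-iteration. A standalone sketch of that pattern (std:: types, hypothetical 'doomed' flag; not Ceph code) — erasing during the loop would invalidate the iterator, and the shared_ptr copy keeps each request alive after its map entry is gone:

#include <list>
#include <memory>
#include <unordered_map>
#include <utility>

struct Request { bool doomed = false; };
using RequestRef = std::shared_ptr<Request>;

void sweep(std::unordered_map<int, RequestRef>& active) {
  std::list<std::pair<int, RequestRef>> finish;
  for (auto p = active.begin(); p != active.end(); ++p) {
    RequestRef& mdr = p->second;            // reference into the map: fine, no erase here
    if (mdr->doomed)
      finish.emplace_back(p->first, mdr);   // copy bumps the refcount
  }
  for (auto& f : finish)
    active.erase(f.first);                  // stands in for request_finish()
}

int main() {
  std::unordered_map<int, RequestRef> active;
  active[1] = std::make_shared<Request>();
  active[1]->doomed = true;
  sweep(active);
  return active.empty() ? 0 : 1;
}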

@@ -3702,71 +3700,70 @@ void MDCache::rejoin_send_rejoins()
   if (!mds->is_rejoin()) {
     // i am survivor. send strong rejoin.
     // note request remote_auth_pins, xlocks
-    for (ceph::unordered_map<metareqid_t, ceph::weak_ptr<MDRequestImpl> >::iterator p = active_requests.begin();
+    for (ceph::unordered_map<metareqid_t, MDRequestRef>::iterator p = active_requests.begin();
         p != active_requests.end();
         ++p) {
-      MDRequestRef amdr(p->second.lock());
-      assert(amdr);
-      if (amdr->is_slave())
+      MDRequestRef& mdr = p->second;
+      if (mdr->is_slave())
        continue;
      // auth pins
-      for (set<MDSCacheObject*>::iterator q = amdr->remote_auth_pins.begin();
-           q != amdr->remote_auth_pins.end();
+      for (set<MDSCacheObject*>::iterator q = mdr->remote_auth_pins.begin();
+           q != mdr->remote_auth_pins.end();
           ++q) {
        if (!(*q)->is_auth()) {
          int who = (*q)->authority().first;
          if (rejoins.count(who) == 0) continue;
          MMDSCacheRejoin *rejoin = rejoins[who];
 
-          dout(15) << " " << *amdr << " authpin on " << **q << dendl;
+          dout(15) << " " << *mdr << " authpin on " << **q << dendl;
          MDSCacheObjectInfo i;
          (*q)->set_object_info(i);
          if (i.ino)
-            rejoin->add_inode_authpin(vinodeno_t(i.ino, i.snapid), amdr->reqid, amdr->attempt);
+            rejoin->add_inode_authpin(vinodeno_t(i.ino, i.snapid), mdr->reqid, mdr->attempt);
          else
-            rejoin->add_dentry_authpin(i.dirfrag, i.dname, i.snapid, amdr->reqid, amdr->attempt);
+            rejoin->add_dentry_authpin(i.dirfrag, i.dname, i.snapid, mdr->reqid, mdr->attempt);
 
-          if (amdr->has_more() && amdr->more()->is_remote_frozen_authpin &&
-              amdr->more()->rename_inode == (*q))
+          if (mdr->has_more() && mdr->more()->is_remote_frozen_authpin &&
+              mdr->more()->rename_inode == (*q))
            rejoin->add_inode_frozen_authpin(vinodeno_t(i.ino, i.snapid),
-                                             amdr->reqid, amdr->attempt);
+                                             mdr->reqid, mdr->attempt);
        }
      }
      // xlocks
-      for (set<SimpleLock*>::iterator q = amdr->xlocks.begin();
-           q != amdr->xlocks.end();
+      for (set<SimpleLock*>::iterator q = mdr->xlocks.begin();
+           q != mdr->xlocks.end();
           ++q) {
        if (!(*q)->get_parent()->is_auth()) {
          int who = (*q)->get_parent()->authority().first;
          if (rejoins.count(who) == 0) continue;
          MMDSCacheRejoin *rejoin = rejoins[who];
 
-          dout(15) << " " << *amdr << " xlock on " << **q << " " << *(*q)->get_parent() << dendl;
+          dout(15) << " " << *mdr << " xlock on " << **q << " " << *(*q)->get_parent() << dendl;
          MDSCacheObjectInfo i;
          (*q)->get_parent()->set_object_info(i);
          if (i.ino)
            rejoin->add_inode_xlock(vinodeno_t(i.ino, i.snapid), (*q)->get_type(),
-                                    amdr->reqid, amdr->attempt);
+                                    mdr->reqid, mdr->attempt);
          else
            rejoin->add_dentry_xlock(i.dirfrag, i.dname, i.snapid,
-                                     amdr->reqid, amdr->attempt);
+                                     mdr->reqid, mdr->attempt);
        }
      }
      // remote wrlocks
-      for (map<SimpleLock*, int>::iterator q = amdr->remote_wrlocks.begin();
-           q != amdr->remote_wrlocks.end();
+      for (map<SimpleLock*, int>::iterator q = mdr->remote_wrlocks.begin();
+           q != mdr->remote_wrlocks.end();
           ++q) {
        int who = q->second;
        if (rejoins.count(who) == 0) continue;
        MMDSCacheRejoin *rejoin = rejoins[who];
 
-        dout(15) << " " << *amdr << " wrlock on " << q->second
+        dout(15) << " " << *mdr << " wrlock on " << q->second
                 << " " << q->first->get_parent() << dendl;
        MDSCacheObjectInfo i;
        q->first->get_parent()->set_object_info(i);
        assert(i.ino);
        rejoin->add_inode_wrlock(vinodeno_t(i.ino, i.snapid), q->first->get_type(),
-                                 amdr->reqid, amdr->attempt);
+                                 mdr->reqid, mdr->attempt);
      }
    }
  }
@@ -8839,12 +8836,11 @@ void MDCache::kick_find_ino_peers(int who)
 int MDCache::get_num_client_requests()
 {
   int count = 0;
-  for (ceph::unordered_map<metareqid_t, ceph::weak_ptr<MDRequestImpl> >::iterator p = active_requests.begin();
+  for (ceph::unordered_map<metareqid_t, MDRequestRef>::iterator p = active_requests.begin();
        p != active_requests.end();
        ++p) {
-    MDRequestRef amdr(p->second.lock());
-    assert(amdr);
-    if (amdr->reqid.name.is_client() && !amdr->is_slave())
+    MDRequestRef& mdr = p->second;
+    if (mdr->reqid.name.is_client() && !mdr->is_slave())
       count++;
   }
   return count;
@@ -8855,7 +8851,7 @@ MDRequestRef MDCache::request_start(MClientRequest *req)
 {
   // did we win a forward race against a slave?
   if (active_requests.count(req->get_reqid())) {
-    MDRequestRef mdr = active_requests[req->get_reqid()].lock();
+    MDRequestRef& mdr = active_requests[req->get_reqid()];
     assert(mdr);
     if (mdr->is_slave()) {
       dout(10) << "request_start already had " << *mdr << ", waiting for finish" << dendl;
@@ -8870,7 +8866,6 @@ MDRequestRef MDCache::request_start(MClientRequest *req)
   // register new client request
   MDRequestRef mdr(new MDRequestImpl(req->get_reqid(),
                                      req->get_num_fwd(), req));
-  mdr->set_self_ref(mdr);
   active_requests[req->get_reqid()] = mdr;
   dout(7) << "request_start " << *mdr << dendl;
   return mdr;
@@ -8879,7 +8874,6 @@ MDRequestRef MDCache::request_start(MClientRequest *req)
 MDRequestRef MDCache::request_start_slave(metareqid_t ri, __u32 attempt, int by)
 {
   MDRequestRef mdr(new MDRequestImpl(ri, attempt, by));
-  mdr->set_self_ref(mdr);
   assert(active_requests.count(mdr->reqid) == 0);
   active_requests[mdr->reqid] = mdr;
   dout(7) << "request_start_slave " << *mdr << " by mds." << by << dendl;
@@ -8889,7 +8883,6 @@ MDRequestRef MDCache::request_start_slave(metareqid_t ri, __u32 attempt, int by)
 MDRequestRef MDCache::request_start_internal(int op)
 {
   MDRequestRef mdr(new MDRequestImpl);
-  mdr->set_self_ref(mdr);
   mdr->reqid.name = entity_name_t::MDS(mds->get_nodeid());
   mdr->reqid.tid = mds->issue_tid();
   mdr->internal_op = op;
@@ -8900,13 +8893,12 @@ MDRequestRef MDCache::request_start_internal(int op)
   return mdr;
 }
 
-
 MDRequestRef MDCache::request_get(metareqid_t rid)
 {
-  assert(active_requests.count(rid));
-  MDRequestRef amdr(active_requests[rid]);
-  dout(7) << "request_get " << rid << " " << *amdr << dendl;
-  return amdr;
+  ceph::unordered_map<metareqid_t, MDRequestRef>::iterator p = active_requests.find(rid);
+  assert(p != active_requests.end());
+  dout(7) << "request_get " << rid << " " << *p->second << dendl;
+  return p->second;
 }
 
 void MDCache::request_finish(MDRequestRef& mdr)
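The rewritten request_get above replaces count() plus operator[] with a single find(), one hash lookup instead of two, and returns a copy of the stored MDRequestRef. A minimal equivalent using std::unordered_map (hypothetical key and value types; not Ceph code):

#include <cassert>
#include <memory>
#include <string>
#include <unordered_map>

using Ref = std::shared_ptr<std::string>;

Ref request_get(std::unordered_map<int, Ref>& active, int rid) {
  auto p = active.find(rid);   // one lookup instead of count() + operator[]
  assert(p != active.end());   // callers only ask for registered requests
  return p->second;            // returned copy keeps the request alive
}

int main() {
  std::unordered_map<int, Ref> active;
  active[7] = std::make_shared<std::string>("req");
  assert(*request_get(active, 7) == "req");
  return 0;
}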
2 changes: 1 addition & 1 deletion src/mds/MDCache.h
@@ -240,7 +240,7 @@ class MDCache {

   // -- requests --
 protected:
-  ceph::unordered_map<metareqid_t,ceph::weak_ptr<MDRequestImpl> > active_requests;
+  ceph::unordered_map<metareqid_t, MDRequestRef> active_requests;
 
 public:
   int get_num_client_requests();
2 changes: 1 addition & 1 deletion src/mds/Migrator.cc
@@ -2618,7 +2618,7 @@ void Migrator::import_finish(CDir *dir, bool notify, bool last)
   it->second.peer_exports.swap(peer_exports);
 
   // clear import state (we're done!)
-  MutationRef& mut = it->second.mut;
+  MutationRef mut = it->second.mut;
   import_state.erase(it);
 
   mds->mdlog->start_submit_entry(new EImportFinish(dir, true));
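A standalone sketch of the bug fixed above (std:: types; not Ceph code): 'mut' was a reference into import_state, and import_state.erase(it) destroyed the element it referred to, leaving the reference dangling. Copying the shared_ptr first bumps the refcount, so the mutation outlives its map entry.

#include <cassert>
#include <map>
#include <memory>

int main() {
  std::map<int, std::shared_ptr<int>> import_state;
  import_state[0] = std::make_shared<int>(42);

  // Dangling version (what the old code did):
  //   std::shared_ptr<int>& mut = import_state.at(0);
  //   import_state.erase(0);   // destroys the element 'mut' refers to

  std::shared_ptr<int> mut = import_state.at(0);  // copy: refcount is now 2
  import_state.erase(0);                          // entry gone, object survives
  assert(*mut == 42);
  return 0;
}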
14 changes: 2 additions & 12 deletions src/mds/Mutation.h
@@ -34,7 +34,6 @@ class MClientRequest;
 class MMDSSlaveRequest;
 
 struct MutationImpl {
-  ceph::weak_ptr<MutationImpl> self_ref;
   metareqid_t reqid;
   __u32 attempt;   // which attempt for this request
   LogSegment *ls;  // the log segment i'm committing to
@@ -80,16 +79,14 @@ struct MutationImpl {
   list<pair<CDentry*,version_t> > dirty_cow_dentries;
 
   MutationImpl()
-    : self_ref(),
-      attempt(0),
+    : attempt(0),
       ls(0),
       slave_to_mds(-1),
       locking(NULL),
       locking_target_mds(-1),
       done_locking(false), committing(false), aborted(false), killed(false) { }
   MutationImpl(metareqid_t ri, __u32 att=0, int slave_to=-1)
-    : self_ref(),
-      reqid(ri), attempt(att),
+    : reqid(ri), attempt(att),
       ls(0),
       slave_to_mds(slave_to),
       locking(NULL),
@@ -141,10 +138,6 @@ struct MutationImpl {
   virtual void print(ostream &out) {
     out << "mutation(" << this << ")";
   }
-
-  void set_self_ref(ceph::shared_ptr<MutationImpl>& ref) {
-    self_ref = ref;
-  }
 };

inline ostream& operator<<(ostream& out, MutationImpl &mut)
@@ -314,9 +307,6 @@ struct MDRequestImpl : public MutationImpl {
   void clear_ambiguous_auth();
 
   void print(ostream &out);
-  void set_self_ref(ceph::shared_ptr<MDRequestImpl>& ref) {
-    self_ref = ceph::static_pointer_cast<MutationImpl,MDRequestImpl>(ref);
-  }
 };
 
 typedef ceph::shared_ptr<MDRequestImpl> MDRequestRef;
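The removed self_ref/set_self_ref machinery was essentially a hand-rolled version of what std::enable_shared_from_this provides: a stored weak reference an object can use to mint shared_ptrs to itself. Once active_requests itself owns shared_ptrs, code can simply copy the map entry, so neither mechanism is needed. A sketch of the standard-library equivalent (hypothetical Request type; not Ceph code):

#include <cassert>
#include <memory>

// What set_self_ref() emulated: an object that can hand out owning
// references to itself.
struct Request : std::enable_shared_from_this<Request> {};

int main() {
  auto req = std::make_shared<Request>();
  std::shared_ptr<Request> again = req->shared_from_this();
  assert(again == req);  // same control block, same object
  return 0;
}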
