Skip to content

Commit

Permalink
ZOOKEEPER-2080: Fix deadlock in dynamic reconfiguration
Browse files Browse the repository at this point in the history
Use explicit fine grained locks for synchronizing access to QuorumVerifier states in QuorumPeer.

Author: Michael Han <[email protected]>

Reviewers: Alexander Shraer <[email protected]>, Edward Ribeiro <[email protected]>, Flavio Junqueira <[email protected]>

Closes apache#92 from hanm/ZOOKEEPER-2080 and squashes the following commits:

25f0caf [Michael Han] Further simplify code - suggested by Alex.
08301f4 [Michael Han] Remove QuorumVerifier from connectOne signatures. Get last seen quorum verifier inside connectOne, when needed, instead.
de367bf [Michael Han] Address review comments from Alex and Edward. 1. Use Object instead of byte[0] for lock. Naming and modifier updates as well. 2. Synchronize on the lock object in connectOne, also simplified code a little bit.
8995dd4 [Michael Han] ZOOKEEPER-2080: Fix deadlock in dynamic reconfiguration. Use explicit fine grained locks for synchronizing access to QuorumVerifier states in QuorumPeer.

(cherry picked from commit 434abbb)
Signed-off-by: Rakesh Radhakrishnan <[email protected]>
  • Loading branch information
hanm authored and rakeshadr committed Feb 9, 2017
1 parent 11e5768 commit ac864f5
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@
import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
import java.util.Enumeration;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.zookeeper.server.ZooKeeperThread;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -306,7 +308,7 @@ public boolean initiateConnection(Socket sock, Long sid) {
* connection if it wins. Notice that it checks whether it has a connection
* to this server already or not. If it does, then it sends the smallest
* possible long value to lose the challenge.
*
*
*/
public void receiveConnection(Socket sock) {
Long sid = null, protocolVersion = null;
Expand Down Expand Up @@ -466,29 +468,31 @@ synchronized private boolean connectOne(long sid, InetSocketAddress electionAddr
*
* @param sid server id
*/

synchronized void connectOne(long sid){
if (senderWorkerMap.get(sid) != null) {
LOG.debug("There is a connection already for server " + sid);
return;
LOG.debug("There is a connection already for server " + sid);
return;
}
synchronized(self) {
boolean knownId = false;
synchronized (self.QV_LOCK) {
boolean knownId = false;
// Resolve hostname for the remote server before attempting to
// connect in case the underlying ip address has changed.
self.recreateSocketAddresses(sid);
if (self.getView().containsKey(sid)) {
knownId = true;
if (connectOne(sid, self.getView().get(sid).electionAddr))
return;
}
if (self.getLastSeenQuorumVerifier()!=null && self.getLastSeenQuorumVerifier().getAllMembers().containsKey(sid)
&& (!knownId || (self.getLastSeenQuorumVerifier().getAllMembers().get(sid).electionAddr !=
self.getView().get(sid).electionAddr))) {
knownId = true;
if (connectOne(sid, self.getLastSeenQuorumVerifier().getAllMembers().get(sid).electionAddr))
return;
}
Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();
QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
if (lastCommittedView.containsKey(sid)) {
knownId = true;
if (connectOne(sid, lastCommittedView.get(sid).electionAddr))
return;
}
if (lastSeenQV != null && lastProposedView.containsKey(sid)
&& (!knownId || (lastProposedView.get(sid).electionAddr !=
lastCommittedView.get(sid).electionAddr))) {
knownId = true;
if (connectOne(sid, lastProposedView.get(sid).electionAddr))
return;
}
if (!knownId) {
LOG.warn("Invalid server id: " + sid);
return;
Expand Down
186 changes: 106 additions & 80 deletions src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,10 @@ public int getQuorumSize(){
//last proposed quorum verifier
public QuorumVerifier lastSeenQuorumVerifier = null;

// Lock object that guard access to quorumVerifier and lastSeenQuorumVerifier.
final Object QV_LOCK = new Object();


/**
* My id
*/
Expand Down Expand Up @@ -665,28 +669,40 @@ public void recreateSocketAddresses(long id) {
}
}

public synchronized InetSocketAddress getQuorumAddress(){
return myQuorumAddr;
public InetSocketAddress getQuorumAddress(){
synchronized (QV_LOCK) {
return myQuorumAddr;
}
}

public synchronized void setQuorumAddress(InetSocketAddress addr){
myQuorumAddr = addr;
public void setQuorumAddress(InetSocketAddress addr){
synchronized (QV_LOCK) {
myQuorumAddr = addr;
}
}

public InetSocketAddress getElectionAddress(){
return myElectionAddr;
synchronized (QV_LOCK) {
return myElectionAddr;
}
}

public void setElectionAddress(InetSocketAddress addr){
myElectionAddr = addr;
synchronized (QV_LOCK) {
myElectionAddr = addr;
}
}

public InetSocketAddress getClientAddress(){
return myClientAddr;
synchronized (QV_LOCK) {
return myClientAddr;
}
}

public void setClientAddress(InetSocketAddress addr){
myClientAddr = addr;
synchronized (QV_LOCK) {
myClientAddr = addr;
}
}

private int electionType;
Expand Down Expand Up @@ -1396,25 +1412,32 @@ public QuorumVerifier configFromString(String s) throws IOException, ConfigExcep
}

/**
* Return QuorumVerifier object for the last committed configuration
* Return QuorumVerifier object for the last committed configuration.
*/

public synchronized QuorumVerifier getQuorumVerifier(){
return quorumVerifier;

public QuorumVerifier getQuorumVerifier(){
synchronized (QV_LOCK) {
return quorumVerifier;
}
}

public synchronized QuorumVerifier getLastSeenQuorumVerifier(){
return lastSeenQuorumVerifier;
/**
* Return QuorumVerifier object for the last proposed configuration.
*/
public QuorumVerifier getLastSeenQuorumVerifier(){
synchronized (QV_LOCK) {
return lastSeenQuorumVerifier;
}
}

public synchronized void connectNewPeers(){
if (qcm!=null && getQuorumVerifier()!=null && getLastSeenQuorumVerifier()!=null) {
Map<Long, QuorumServer> committedView = getQuorumVerifier().getAllMembers();
for (Entry<Long, QuorumServer> e: getLastSeenQuorumVerifier().getAllMembers().entrySet()){
if (e.getKey() != getId() && !committedView.containsKey(e.getKey()))
qcm.connectOne(e.getKey());
}
private void connectNewPeers(){
synchronized (QV_LOCK) {
if (qcm != null && quorumVerifier != null && lastSeenQuorumVerifier != null) {
Map<Long, QuorumServer> committedView = quorumVerifier.getAllMembers();
for (Entry<Long, QuorumServer> e : lastSeenQuorumVerifier.getAllMembers().entrySet()) {
if (e.getKey() != getId() && !committedView.containsKey(e.getKey()))
qcm.connectOne(e.getKey());
}
}
}
}

Expand All @@ -1431,73 +1454,76 @@ public String getNextDynamicConfigFilename() {
return configFilename + QuorumPeerConfig.nextDynamicConfigFileSuffix;
}

public synchronized void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){
if (lastSeenQuorumVerifier!=null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) {
LOG.error("setLastSeenQuorumVerifier called with stale config " + qv.getVersion() +
". Current version: " + quorumVerifier.getVersion());

}
// assuming that a version uniquely identifies a configuration, so if
// version is the same, nothing to do here.
if (lastSeenQuorumVerifier != null &&
lastSeenQuorumVerifier.getVersion() == qv.getVersion()) {
return;
}
lastSeenQuorumVerifier = qv;
connectNewPeers();
if (writeToDisk) {
try {
QuorumPeerConfig.writeDynamicConfig(
getNextDynamicConfigFilename(), qv, true);
} catch(IOException e){
LOG.error("Error writing next dynamic config file to disk: ", e.getMessage());
}
}
public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){
synchronized (QV_LOCK) {
if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) {
LOG.error("setLastSeenQuorumVerifier called with stale config " + qv.getVersion() +
". Current version: " + quorumVerifier.getVersion());

}

public synchronized QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){
if ((quorumVerifier != null) && (quorumVerifier.getVersion() >= qv.getVersion())) {
// this is normal. For example - server found out about new config through FastLeaderElection gossiping
// and then got the same config in UPTODATE message so its already known
LOG.debug(getId() + " setQuorumVerifier called with known or old config " + qv.getVersion() +
". Current version: " + quorumVerifier.getVersion());
return quorumVerifier;
}
QuorumVerifier prevQV = quorumVerifier;
quorumVerifier = qv;
if (lastSeenQuorumVerifier == null || (qv.getVersion() > lastSeenQuorumVerifier.getVersion()))
}
// assuming that a version uniquely identifies a configuration, so if
// version is the same, nothing to do here.
if (lastSeenQuorumVerifier != null &&
lastSeenQuorumVerifier.getVersion() == qv.getVersion()) {
return;
}
lastSeenQuorumVerifier = qv;

if (writeToDisk) {
// some tests initialize QuorumPeer without a static config file
if (configFilename != null) {
connectNewPeers();
if (writeToDisk) {
try {
String dynamicConfigFilename = makeDynamicConfigFilename(
qv.getVersion());
QuorumPeerConfig.writeDynamicConfig(
dynamicConfigFilename, qv, false);
QuorumPeerConfig.editStaticConfig(configFilename,
dynamicConfigFilename,
needEraseClientInfoFromStaticConfig());
getNextDynamicConfigFilename(), qv, true);
} catch (IOException e) {
LOG.error("Error closing file: ", e.getMessage());
LOG.error("Error writing next dynamic config file to disk: ", e.getMessage());
}
} else {
LOG.info("writeToDisk == true but configFilename == null");
}
}
}

public QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){
synchronized (QV_LOCK) {
if ((quorumVerifier != null) && (quorumVerifier.getVersion() >= qv.getVersion())) {
// this is normal. For example - server found out about new config through FastLeaderElection gossiping
// and then got the same config in UPTODATE message so its already known
LOG.debug(getId() + " setQuorumVerifier called with known or old config " + qv.getVersion() +
". Current version: " + quorumVerifier.getVersion());
return quorumVerifier;
}
QuorumVerifier prevQV = quorumVerifier;
quorumVerifier = qv;
if (lastSeenQuorumVerifier == null || (qv.getVersion() > lastSeenQuorumVerifier.getVersion()))
lastSeenQuorumVerifier = qv;

if (writeToDisk) {
// some tests initialize QuorumPeer without a static config file
if (configFilename != null) {
try {
String dynamicConfigFilename = makeDynamicConfigFilename(
qv.getVersion());
QuorumPeerConfig.writeDynamicConfig(
dynamicConfigFilename, qv, false);
QuorumPeerConfig.editStaticConfig(configFilename,
dynamicConfigFilename,
needEraseClientInfoFromStaticConfig());
} catch (IOException e) {
LOG.error("Error closing file: ", e.getMessage());
}
} else {
LOG.info("writeToDisk == true but configFilename == null");
}
}

if (qv.getVersion() == lastSeenQuorumVerifier.getVersion()){
QuorumPeerConfig.deleteFile( getNextDynamicConfigFilename() );
}
QuorumServer qs = qv.getAllMembers().get(getId());
if (qs!=null){
setQuorumAddress(qs.addr);
setElectionAddress(qs.electionAddr);
setClientAddress(qs.clientAddr);
if (qv.getVersion() == lastSeenQuorumVerifier.getVersion()) {
QuorumPeerConfig.deleteFile(getNextDynamicConfigFilename());
}
QuorumServer qs = qv.getAllMembers().get(getId());
if (qs != null) {
setQuorumAddress(qs.addr);
setElectionAddress(qs.electionAddr);
setClientAddress(qs.clientAddr);
}
return prevQV;
}
return prevQV;
}

private String makeDynamicConfigFilename(long version) {
Expand Down

0 comments on commit ac864f5

Please sign in to comment.