Skip to content

Commit

Permalink
[ROCKETMQ-98]Fix risk of unable to release putMessage Lock forever cl…
Browse files Browse the repository at this point in the history
…oses apache#61
  • Loading branch information
Jaskey authored and dongeforever committed Jun 6, 2017
1 parent 16c8d43 commit 031347d
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public class BrokerConfig {
private boolean autoCreateSubscriptionGroup = true;
private String messageStorePlugIn = "";

/**
* thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default value is 1.
*/
private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
private int adminBrokerThreadPoolNums = 16;
Expand Down
40 changes: 7 additions & 33 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
Expand Down Expand Up @@ -63,12 +61,7 @@ public class CommitLog {
private volatile long confirmOffset = -1L;

private volatile long beginTimeInLock = 0;

//true: Can lock, false : in lock.
private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);

private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync

private final PutMessageLock putMessageLock;
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
Expand All @@ -88,6 +81,8 @@ public CommitLog(final DefaultMessageStore defaultMessageStore) {
return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
};
this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();

}

public boolean load() {
Expand Down Expand Up @@ -577,7 +572,7 @@ public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
MappedFile unlockMappedFile = null;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

lockForPutMessage(); //spin...
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
Expand Down Expand Up @@ -626,7 +621,7 @@ public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
releasePutMessageLock();
putMessageLock.unlock();
}

if (eclipseTimeInLock > 500) {
Expand Down Expand Up @@ -861,7 +856,7 @@ public void destroy() {
}

public boolean appendData(long startOffset, byte[] data) {
lockForPutMessage(); //spin...
putMessageLock.lock();
try {
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset);
if (null == mappedFile) {
Expand All @@ -871,7 +866,7 @@ public boolean appendData(long startOffset, byte[] data) {

return mappedFile.appendMessage(data);
} finally {
releasePutMessageLock();
putMessageLock.unlock();
}
}

Expand Down Expand Up @@ -906,28 +901,7 @@ public long lockTimeMills() {
return diff;
}

/**
* Spin util acquired the lock.
*/
private void lockForPutMessage() {
if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
putMessageNormalLock.lock();
} else {
boolean flag;
do {
flag = this.putMessageSpinLock.compareAndSet(true, false);
}
while (!flag);
}
}

private void releasePutMessageLock() {
if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
putMessageNormalLock.unlock();
} else {
this.putMessageSpinLock.compareAndSet(false, true);
}
}

public static class GroupCommitRequest {
private final long nextOffset;
Expand Down
25 changes: 25 additions & 0 deletions store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;

/**
* Used when trying to put message
*/
public interface PutMessageLock {
void lock();
void unlock();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;


import java.util.concurrent.locks.ReentrantLock;

/**
* Exclusive lock implementation to put message
*/
public class PutMessageReentrantLock implements PutMessageLock {
private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync

@Override
public void lock() {
putMessageNormalLock.lock();
}

@Override
public void unlock() {
putMessageNormalLock.unlock();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;


import java.util.concurrent.atomic.AtomicBoolean;

/**
* Spin lock Implementation to put message, suggest using this witb low race conditions
*
*/
public class PutMessageSpinLock implements PutMessageLock {
//true: Can lock, false : in lock.
private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);

@Override
public void lock() {
boolean flag;
do {
flag = this.putMessageSpinLock.compareAndSet(true, false);
}
while (!flag);
}

@Override
public void unlock() {
this.putMessageSpinLock.compareAndSet(false, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public class MessageStoreConfig {
@ImportantField
private int commitIntervalCommitLog = 200;

/**
* introduced since 4.0.x. Determine whether to use mutex reentrantLock when putting message.<br/>
* By default it is set to false indicating using spin lock when putting message.
*/
private boolean useReentrantLockWhenPutMessage = false;

// Whether schedule flush,default is real-time
Expand Down

0 comments on commit 031347d

Please sign in to comment.