Skip to content

Commit

Permalink
[ROCKETMQ-145][HOTFIX] Resolve concureent issue in HAService and Grou…
Browse files Browse the repository at this point in the history
…pCommitService
  • Loading branch information
zhouxinyu committed Mar 18, 2017
1 parent ac8941b commit 155823e
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 43 deletions.
54 changes: 28 additions & 26 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -995,12 +995,12 @@ class GroupCommitService extends FlushCommitLogService {
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

public void putRequest(final GroupCommitRequest request) {
synchronized (this) {
public synchronized void putRequest(final GroupCommitRequest request) {
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}

Expand All @@ -1011,32 +1011,34 @@ private void swapRequests() {
}

private void doCommit() {
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of
// two times the flush
boolean flushOK = false;
for (int i = 0; i < 2 && !flushOK; i++) {
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

if (!flushOK) {
CommitLog.this.mappedFileQueue.flush(0);
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of
// two times the flush
boolean flushOK = false;
for (int i = 0; i < 2 && !flushOK; i++) {
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

if (!flushOK) {
CommitLog.this.mappedFileQueue.flush(0);
}
}

req.wakeupCustomer(flushOK);
}

req.wakeupCustomer(flushOK);
}
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}

long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
this.requestsRead.clear();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
CommitLog.this.mappedFileQueue.flush(0);
}

this.requestsRead.clear();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
CommitLog.this.mappedFileQueue.flush(0);
}
}

Expand Down
36 changes: 19 additions & 17 deletions store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,12 @@ class GroupTransferService extends ServiceThread {
private volatile List<CommitLog.GroupCommitRequest> requestsWrite = new ArrayList<>();
private volatile List<CommitLog.GroupCommitRequest> requestsRead = new ArrayList<>();

public void putRequest(final CommitLog.GroupCommitRequest request) {
synchronized (this) {
public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}

Expand All @@ -273,22 +273,24 @@ private void swapRequests() {
}

private void doWaitTransfer() {
if (!this.requestsRead.isEmpty()) {
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
for (int i = 0; !transferOK && i < 5; i++) {
this.notifyTransferObject.waitForRunning(1000);
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
for (int i = 0; !transferOK && i < 5; i++) {
this.notifyTransferObject.waitForRunning(1000);
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}

if (!transferOK) {
log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
if (!transferOK) {
log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
}

req.wakeupCustomer(transferOK);
}

req.wakeupCustomer(transferOK);
this.requestsRead.clear();
}

this.requestsRead.clear();
}
}

Expand Down

0 comments on commit 155823e

Please sign in to comment.