Skip to content

Commit

Permalink
Merge pull request #88 from lizhanhui/master
Browse files Browse the repository at this point in the history
Typo fix: master should timeout properly for transferring message to slave
  • Loading branch information
vintagewang committed Apr 20, 2015
2 parents 4fe0954 + 07a736b commit edb11cb
Showing 1 changed file with 25 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@
*/
package com.alibaba.rocketmq.store.ha;

import com.alibaba.rocketmq.common.ServiceThread;
import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.remoting.common.RemotingUtil;
import com.alibaba.rocketmq.store.CommitLog.GroupCommitRequest;
import com.alibaba.rocketmq.store.DefaultMessageStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
Expand All @@ -32,15 +40,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.rocketmq.common.ServiceThread;
import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.remoting.common.RemotingUtil;
import com.alibaba.rocketmq.store.CommitLog.GroupCommitRequest;
import com.alibaba.rocketmq.store.DefaultMessageStore;


/**
* HA服务,负责同步双写,异步复制功能
Expand Down Expand Up @@ -96,10 +95,8 @@ public void putRequest(final GroupCommitRequest request) {
*/
public boolean isSlaveOK(final long masterPutWhere) {
boolean result = this.connectionCount.get() > 0;
result =
result
&& ((masterPutWhere - this.push2SlaveMaxOffset.get()) < this.defaultMessageStore
.getMessageStoreConfig().getHaSlaveFallbehindMax());
result = result && ((masterPutWhere - this.push2SlaveMaxOffset.get()) < this.defaultMessageStore
.getMessageStoreConfig().getHaSlaveFallbehindMax());
return result;
}

Expand Down Expand Up @@ -298,13 +295,13 @@ private void doWaitTransfer() {
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
for (int i = 0; !transferOK && i < 5;) {
for (int i = 0; !transferOK && i < 5; i++) {
this.notifyTransferObject.waitForRunning(1000);
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}

if (!transferOK) {
log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
log.warn("transfer message to slave timeout, " + req.getNextOffset());
}

req.wakeupCustomer(transferOK);
Expand Down Expand Up @@ -355,7 +352,7 @@ class HAClient extends ServiceThread {
private long lastWriteTimestamp = System.currentTimeMillis();
// Slave向Master汇报Offset,汇报到哪里
private long currentReportedOffset = 0;
private int dispatchPostion = 0;
private int dispatchPosition = 0;
// 从Master接收数据Buffer
private ByteBuffer byteBufferRead = ByteBuffer.allocate(ReadMaxBufferSize);
private ByteBuffer byteBufferBackup = ByteBuffer.allocate(ReadMaxBufferSize);
Expand Down Expand Up @@ -410,19 +407,19 @@ private boolean reportSlaveMaxOffset(final long maxOffset) {

// private void reallocateByteBuffer() {
// ByteBuffer bb = ByteBuffer.allocate(ReadMaxBufferSize);
// int remain = this.byteBufferRead.limit() - this.dispatchPostion;
// bb.put(this.byteBufferRead.array(), this.dispatchPostion, remain);
// this.dispatchPostion = 0;
// int remain = this.byteBufferRead.limit() - this.dispatchPosition;
// bb.put(this.byteBufferRead.array(), this.dispatchPosition, remain);
// this.dispatchPosition = 0;
// this.byteBufferRead = bb;
// }

/**
* Buffer满了以后,重新整理一次
*/
private void reallocateByteBuffer() {
int remain = ReadMaxBufferSize - this.dispatchPostion;
int remain = ReadMaxBufferSize - this.dispatchPosition;
if (remain > 0) {
this.byteBufferRead.position(this.dispatchPostion);
this.byteBufferRead.position(this.dispatchPosition);

this.byteBufferBackup.position(0);
this.byteBufferBackup.limit(ReadMaxBufferSize);
Expand All @@ -433,7 +430,7 @@ private void reallocateByteBuffer() {

this.byteBufferRead.position(remain);
this.byteBufferRead.limit(ReadMaxBufferSize);
this.dispatchPostion = 0;
this.dispatchPosition = 0;
}


Expand Down Expand Up @@ -484,10 +481,10 @@ private boolean dispatchReadRequest() {
int readSocketPos = this.byteBufferRead.position();

while (true) {
int diff = this.byteBufferRead.position() - this.dispatchPostion;
int diff = this.byteBufferRead.position() - this.dispatchPosition;
if (diff >= MSG_HEADER_SIZE) {
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);
int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);

long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

Expand All @@ -503,14 +500,14 @@ private boolean dispatchReadRequest() {
// 可以凑够一个请求
if (diff >= (MSG_HEADER_SIZE + bodySize)) {
byte[] bodyData = new byte[bodySize];
this.byteBufferRead.position(this.dispatchPostion + MSG_HEADER_SIZE);
this.byteBufferRead.position(this.dispatchPosition + MSG_HEADER_SIZE);
this.byteBufferRead.get(bodyData);

// TODO 结果是否需要处理,暂时不处理
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

this.byteBufferRead.position(readSocketPos);
this.dispatchPostion += MSG_HEADER_SIZE + bodySize;
this.dispatchPosition += MSG_HEADER_SIZE + bodySize;

if (!reportSlaveMaxOffsetPlus()) {
return false;
Expand Down Expand Up @@ -590,7 +587,7 @@ private void closeMaster() {
}

this.lastWriteTimestamp = 0;
this.dispatchPostion = 0;
this.dispatchPosition = 0;

this.byteBufferBackup.position(0);
this.byteBufferBackup.limit(ReadMaxBufferSize);
Expand Down

0 comments on commit edb11cb

Please sign in to comment.