Skip to content

Commit

Permalink
[ROCKETMQ-45]Delete hanged consume queue files
Browse files Browse the repository at this point in the history
  • Loading branch information
lizhanhui committed Jan 22, 2017
1 parent d9c398f commit 11ff542
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 36 deletions.
28 changes: 14 additions & 14 deletions store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,19 @@ public void recover() {
if (index < 0)
index = 0;

int mapedFileSizeLogics = this.mappedFileSize;
int mappedFileSizeLogics = this.mappedFileSize;
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mapedFileOffset = 0;
long mappedFileOffset = 0;
while (true) {
for (int i = 0; i < mapedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
long offset = byteBuffer.getLong();
int size = byteBuffer.getInt();
long tagsCode = byteBuffer.getLong();

if (offset >= 0 && size > 0) {
mapedFileOffset = i + CQ_STORE_UNIT_SIZE;
mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
this.maxPhysicOffset = offset;
} else {
log.info("recover current consume queue file over, " + mappedFile.getFileName() + " "
Expand All @@ -98,7 +98,7 @@ public void recover() {
}
}

if (mapedFileOffset == mapedFileSizeLogics) {
if (mappedFileOffset == mappedFileSizeLogics) {
index++;
if (index >= mappedFiles.size()) {

Expand All @@ -109,17 +109,17 @@ public void recover() {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mapedFileOffset = 0;
mappedFileOffset = 0;
log.info("recover next consume queue file, " + mappedFile.getFileName());
}
} else {
log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
+ (processOffset + mapedFileOffset));
+ (processOffset + mappedFileOffset));
break;
}
}

processOffset += mapedFileOffset;
processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
Expand Down Expand Up @@ -310,7 +310,7 @@ public void correctMinOffset(long phyMinOffset) {

if (offsetPy >= phyMinOffset) {
this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i;
log.info("compute logics min offset: " + this.getMinOffsetInQuque() + ", topic: "
log.info("compute logics min offset: " + this.getMinOffsetInQueue() + ", topic: "
+ this.topic + ", queueId: " + this.queueId);
break;
}
Expand All @@ -324,7 +324,7 @@ public void correctMinOffset(long phyMinOffset) {
}
}

public long getMinOffsetInQuque() {
public long getMinOffsetInQueue() {
return this.minLogicOffset / CQ_STORE_UNIT_SIZE;
}

Expand Down Expand Up @@ -435,8 +435,8 @@ public void setMinLogicOffset(long minLogicOffset) {
}

public long rollNextFile(final long index) {
int mapedFileSize = this.mappedFileSize;
int totalUnitsInFile = mapedFileSize / CQ_STORE_UNIT_SIZE;
int mappedFileSize = this.mappedFileSize;
int totalUnitsInFile = mappedFileSize / CQ_STORE_UNIT_SIZE;
return index + totalUnitsInFile - index % totalUnitsInFile;
}

Expand All @@ -463,10 +463,10 @@ public void destroy() {
}

public long getMessageTotalInQueue() {
return this.getMaxOffsetInQuque() - this.getMinOffsetInQuque();
return this.getMaxOffsetInQueue() - this.getMinOffsetInQueue();
}

public long getMaxOffsetInQuque() {
public long getMaxOffsetInQueue() {
return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,8 @@ public GetMessageResult getMessage(final String group, final String topic, final

ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) {
minOffset = consumeQueue.getMinOffsetInQuque();
maxOffset = consumeQueue.getMaxOffsetInQuque();
minOffset = consumeQueue.getMinOffsetInQueue();
maxOffset = consumeQueue.getMaxOffsetInQueue();

if (maxOffset == 0) {
status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
Expand Down Expand Up @@ -499,7 +499,7 @@ public GetMessageResult getMessage(final String group, final String topic, final
public long getMaxOffsetInQuque(String topic, int queueId) {
ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
long offset = logic.getMaxOffsetInQuque();
long offset = logic.getMaxOffsetInQueue();
return offset;
}

Expand All @@ -512,7 +512,7 @@ public long getMaxOffsetInQuque(String topic, int queueId) {
public long getMinOffsetInQuque(String topic, int queueId) {
ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
return logic.getMinOffsetInQuque();
return logic.getMinOffsetInQueue();
}

return -1;
Expand Down Expand Up @@ -878,8 +878,8 @@ public Map<String, Long> getMessageIds(final String topic, final int queueId, lo

ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) {
minOffset = Math.max(minOffset, consumeQueue.getMinOffsetInQuque());
maxOffset = Math.min(maxOffset, consumeQueue.getMaxOffsetInQuque());
minOffset = Math.max(minOffset, consumeQueue.getMinOffsetInQueue());
maxOffset = Math.min(maxOffset, consumeQueue.getMaxOffsetInQueue());

if (maxOffset == 0) {
return messageIds;
Expand Down Expand Up @@ -1220,7 +1220,7 @@ private void recoverTopicQueueTable() {
for (ConcurrentHashMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
String key = logic.getTopic() + "-" + logic.getQueueId();
table.put(key, logic.getMaxOffsetInQuque());
table.put(key, logic.getMaxOffsetInQueue());
logic.correctMinOffset(minPhyOffset);
}
}
Expand Down
22 changes: 11 additions & 11 deletions store/src/main/java/org/apache/rocketmq/store/MappedFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ public class MappedFile extends ReferenceResource {
public static final int OS_PAGE_SIZE = 1024 * 4;
protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

private static final AtomicLong TOTAL_MAPED_VITUAL_MEMORY = new AtomicLong(0);
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);

private static final AtomicInteger TOTAL_MAPED_FILES = new AtomicInteger(0);
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
//ADD BY ChenYang
protected final AtomicInteger committedPosition = new AtomicInteger(0);
Expand Down Expand Up @@ -132,12 +132,12 @@ private static ByteBuffer viewed(ByteBuffer buffer) {
return viewed(viewedBuffer);
}

public static int getTotalmapedfiles() {
return TOTAL_MAPED_FILES.get();
public static int getTotalMappedFiles() {
return TOTAL_MAPPED_FILES.get();
}

public static long getTotalMapedVitualMemory() {
return TOTAL_MAPED_VITUAL_MEMORY.get();
public static long getTotalMappedVirtualMemory() {
return TOTAL_MAPPED_VIRTUAL_MEMORY.get();
}

public void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException {
Expand All @@ -158,8 +158,8 @@ private void init(final String fileName, final int fileSize) throws IOException
try {
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
TOTAL_MAPED_VITUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPED_FILES.incrementAndGet();
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("create file channel " + this.fileName + " Failed. ", e);
Expand Down Expand Up @@ -405,8 +405,8 @@ public boolean cleanup(final long currentRef) {
}

clean(this.mappedByteBuffer);
TOTAL_MAPED_VITUAL_MEMORY.addAndGet(this.fileSize * (-1));
TOTAL_MAPED_FILES.decrementAndGet();
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(this.fileSize * (-1));
TOTAL_MAPPED_FILES.decrementAndGet();
log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " OK");
return true;
}
Expand All @@ -431,7 +431,7 @@ public boolean destroy(final long intervalForcibly) {

return true;
} else {
log.warn("destroy maped file[REF:" + this.getRefCount() + "] " + this.fileName
log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
+ " Failed. cleanupOver: " + this.cleanupOver);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,11 +397,15 @@ public int deleteExpiredFileByOffset(long offset, int unitSize) {
log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
+ maxOffsetInLogicQueue + ", delete it");
}
} else if (!mappedFile.isAvailable()) { // Handle hanged file.
log.warn("Found a hanged consume queue file, attempting to delete it.");
destroy = true;
} else {
log.warn("this being not executed forever.");
break;
}

// TODO: Externalize this hardcoded value
if (destroy && mappedFile.destroy(1000 * 60)) {
files.add(mappedFile);
deleteCount++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ public void executeOnTimeup() {
*/
long cqMinOffset = cq.getMinOffsetInQuque();
long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
*/
package org.apache.rocketmq.store;

import java.nio.ByteBuffer;
import java.util.Arrays;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand All @@ -30,6 +32,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

public class MappedFileQueueTest {
Expand All @@ -55,7 +58,7 @@ public void tearDown() throws Exception {
}

@Test
public void test_getLastMapedFile() {
public void test_getLastMappedFile() {
final String fixedMsg = "0123456789abcdef";

logger.debug("================================================================");
Expand All @@ -79,7 +82,7 @@ public void test_getLastMapedFile() {
}

@Test
public void test_findMapedFileByOffset() {
public void test_findMappedFileByOffset() {
// four-byte string.
final String fixedMsg = "abcd";

Expand Down Expand Up @@ -179,7 +182,7 @@ public void test_commit() {
}

@Test
public void test_getMapedMemorySize() {
public void test_getMappedMemorySize() {
final String fixedMsg = "abcd";

logger.debug("================================================================");
Expand All @@ -200,4 +203,45 @@ public void test_getMapedMemorySize() {
mappedFileQueue.destroy();
logger.debug("MappedFileQueue.getMappedMemorySize() OK");
}


@Test
public void test_deleteExpiredFileByOffset() {

logger.debug("================================================================");
MappedFileQueue mappedFileQueue =
new MappedFileQueue("target/unit_test_store/e", 5120, null);

for (int i = 0; i < 2048; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
assertNotNull(mappedFile);

ByteBuffer byteBuffer = ByteBuffer.allocate(ConsumeQueue.CQ_STORE_UNIT_SIZE);
byteBuffer.putLong(i);
byte[] padding = new byte[12];
Arrays.fill(padding, (byte)'0');
byteBuffer.put(padding);
byteBuffer.flip();

boolean result = mappedFile.appendMessage(byteBuffer.array());

assertTrue(result);
}

MappedFile first = mappedFileQueue.getFirstMappedFile();
first.hold();

int count = mappedFileQueue.deleteExpiredFileByOffset(20480, ConsumeQueue.CQ_STORE_UNIT_SIZE);
assertEquals(0, count);
first.release();

count = mappedFileQueue.deleteExpiredFileByOffset(20480, ConsumeQueue.CQ_STORE_UNIT_SIZE);
assertTrue(count > 0);
first = mappedFileQueue.getFirstMappedFile();
assertTrue(first.getFileFromOffset() > 0);

mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
logger.debug("MappedFileQueue.deleteExpiredFileByOffset() OK");
}
}

0 comments on commit 11ff542

Please sign in to comment.