Skip to content

Commit

Permalink
use heap memory for small object
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyijq committed Dec 28, 2018
1 parent b4218b3 commit 5761051
Show file tree
Hide file tree
Showing 14 changed files with 96 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package qunar.tc.qmq.delay;

import io.netty.buffer.ByteBuf;
import qunar.tc.qmq.delay.base.LongHashSet;
import qunar.tc.qmq.delay.base.ReceivedDelayMessage;
import qunar.tc.qmq.delay.base.ReceivedResult;
Expand Down Expand Up @@ -51,7 +50,7 @@ public class DefaultDelayLogFacade implements DelayLogFacade {
private final LogCleaner cleaner;
private final MessageLogReplayer replayer;

public DefaultDelayLogFacade(final StoreConfiguration config, final Function<ByteBuf, Boolean> func) {
public DefaultDelayLogFacade(final StoreConfiguration config, final Function<ScheduleIndex, Boolean> func) {
this.messageLog = new MessageLog(config);
this.scheduleLog = new ScheduleLog(config);
this.dispatchLog = new DispatchLog(config);
Expand Down Expand Up @@ -125,8 +124,8 @@ public void shutdown() {
}

@Override
public List<ScheduleSetRecord> recoverLogRecord(final List<ByteBuf> pureRecords) {
return scheduleLog.recoverLogRecord(pureRecords);
public List<ScheduleSetRecord> recoverLogRecord(final List<ScheduleIndex> indexList) {
return scheduleLog.recoverLogRecord(indexList);
}

@Override
Expand All @@ -150,7 +149,7 @@ public ScheduleSetSegment loadScheduleLogSegment(final int segmentBaseOffset) {
}

@Override
public WheelLoadCursor.Cursor loadUnDispatch(final ScheduleSetSegment setSegment, final LongHashSet dispatchedSet, final Consumer<ByteBuf> refresh) {
public WheelLoadCursor.Cursor loadUnDispatch(final ScheduleSetSegment setSegment, final LongHashSet dispatchedSet, final Consumer<ScheduleIndex> refresh) {
return scheduleLog.loadUnDispatch(setSegment, dispatchedSet, refresh);
}

Expand All @@ -170,7 +169,7 @@ public LogVisitor<LogRecord> newMessageLogVisitor(long start) {
}

@Override
public AppendLogResult<ByteBuf> appendScheduleLog(LogRecord event) {
public AppendLogResult<ScheduleIndex> appendScheduleLog(LogRecord event) {
return scheduleLog.append(event);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package qunar.tc.qmq.delay;

import io.netty.buffer.ByteBuf;
import qunar.tc.qmq.delay.base.LongHashSet;
import qunar.tc.qmq.delay.base.ReceivedDelayMessage;
import qunar.tc.qmq.delay.base.ReceivedResult;
Expand Down Expand Up @@ -61,7 +60,7 @@ public interface DelayLogFacade {

boolean appendDispatchLogData(long startOffset, int baseOffset, ByteBuffer body);

List<ScheduleSetRecord> recoverLogRecord(List<ByteBuf> pureRecords);
List<ScheduleSetRecord> recoverLogRecord(List<ScheduleIndex> indexList);

void appendDispatchLog(LogRecord record);

Expand All @@ -71,13 +70,13 @@ public interface DelayLogFacade {

ScheduleSetSegment loadScheduleLogSegment(int segmentBaseOffset);

WheelLoadCursor.Cursor loadUnDispatch(ScheduleSetSegment setSegment, LongHashSet dispatchedSet, Consumer<ByteBuf> refresh);
WheelLoadCursor.Cursor loadUnDispatch(ScheduleSetSegment setSegment, LongHashSet dispatchedSet, Consumer<ScheduleIndex> refresh);

int higherScheduleBaseOffset(int index);

LogVisitor<LogRecord> newMessageLogVisitor(long start);

AppendLogResult<ByteBuf> appendScheduleLog(LogRecord event);
AppendLogResult<ScheduleIndex> appendScheduleLog(LogRecord event);

long initialMessageIterateFrom();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@

package qunar.tc.qmq.delay;

import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.delay.base.AppendException;
import qunar.tc.qmq.delay.meta.BrokerRoleManager;
import qunar.tc.qmq.delay.store.model.AppendLogResult;
import qunar.tc.qmq.delay.store.model.LogRecord;
import qunar.tc.qmq.protocol.producer.MessageProducerCode;
Expand All @@ -35,31 +33,22 @@ public class MessageIterateEventListener implements EventListener<LogRecord> {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageIterateEventListener.class);

private final DelayLogFacade facade;
private final Function<ByteBuf, Boolean> iterateCallback;
private final Function<ScheduleIndex, Boolean> iterateCallback;

MessageIterateEventListener(final DelayLogFacade facade, Function<ByteBuf, Boolean> iterateCallback) {
MessageIterateEventListener(final DelayLogFacade facade, Function<ScheduleIndex, Boolean> iterateCallback) {
this.facade = facade;
this.iterateCallback = iterateCallback;
}

@Override
public void post(LogRecord event) {
AppendLogResult<ByteBuf> result = facade.appendScheduleLog(event);
AppendLogResult<ScheduleIndex> result = facade.appendScheduleLog(event);
int code = result.getCode();
if (MessageProducerCode.SUCCESS != code) {
LOGGER.error("appendMessageLog schedule log error,log:{} {},code:{}", event.getSubject(), event.getMessageId(), code);
throw new AppendException("appendScheduleLogError");
}

if (BrokerRoleManager.isDelayMaster()) {
process(result.getAdditional());
} else {
ScheduleIndex.release(result.getAdditional());
}
}

private void process(ByteBuf record) {
if (iterateCallback != null && iterateCallback.apply(record)) return;
ScheduleIndex.release(record);
iterateCallback.apply(result.getAdditional());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class MessageLogReplayer implements Switchable {
private final EventListener<LogRecord> dispatcher;
private volatile boolean stop = true;

MessageLogReplayer(final DelayLogFacade facade, final Function<ByteBuf, Boolean> func) {
MessageLogReplayer(final DelayLogFacade facade, final Function<ScheduleIndex, Boolean> func) {
this.facade = facade;
this.dispatcher = new MessageIterateEventListener(facade, func);
this.iterateFrom = new LongAdder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,33 @@

package qunar.tc.qmq.delay;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.util.ReferenceCountUtil;

import java.util.List;

public class ScheduleIndex {
public static ByteBuf buildIndex(long scheduleTime, long offset, int size, long sequence) {
ByteBuf index = PooledByteBufAllocator.DEFAULT.ioBuffer(8 + 8 + 4 + 8);
index.writeLong(scheduleTime);
index.writeLong(offset);
index.writeInt(size);
index.writeLong(sequence);
return index;
}

public static long scheduleTime(ByteBuf index) {
return index.getLong(0);
}
private final long scheduleTime;
private final long offset;
private final int size;
private final long sequence;

public static long offset(ByteBuf index) {
return index.getLong(8);
public ScheduleIndex(long scheduleTime, long offset, int size, long sequence) {
this.scheduleTime = scheduleTime;
this.offset = offset;
this.size = size;
this.sequence = sequence;
}

public static int size(ByteBuf index) {
return index.getInt(16);
public long getScheduleTime() {
return scheduleTime;
}

public static long sequence(ByteBuf index) {
return index.getLong(20);
public long getOffset() {
return offset;
}

public static void release(List<ByteBuf> resources) {
for (ByteBuf resource : resources) {
release(resource);
}
public int getSize() {
return size;
}

public static void release(ByteBuf resource) {
ReferenceCountUtil.safeRelease(resource);
public long getSequence() {
return sequence;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package qunar.tc.qmq.delay.sender;

import io.netty.buffer.ByteBuf;
import qunar.tc.qmq.common.Disposable;
import qunar.tc.qmq.delay.ScheduleIndex;

/**
* @author xufeng.deng [email protected]
Expand All @@ -26,5 +26,5 @@
public interface DelayProcessor extends Disposable {
void init();

void send(ByteBuf record);
void send(ScheduleIndex index);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package qunar.tc.qmq.delay.sender;

import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.batch.BatchExecutor;
Expand All @@ -35,13 +34,11 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static qunar.tc.qmq.delay.ScheduleIndex.buildIndex;

/**
* @author xufeng.deng [email protected]
* @since 2018-07-25 13:59
*/
public class SenderProcessor implements DelayProcessor, Processor<ByteBuf>, SenderGroup.ResultHandler {
public class SenderProcessor implements DelayProcessor, Processor<ScheduleIndex>, SenderGroup.ResultHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(SenderProcessor.class);

private static final long DEFAULT_SEND_WAIT_TIME = 1;
Expand All @@ -54,7 +51,7 @@ public class SenderProcessor implements DelayProcessor, Processor<ByteBuf>, Send
private final BrokerService brokerService;
private final DelayLogFacade facade;

private BatchExecutor<ByteBuf> batchExecutor;
private BatchExecutor<ScheduleIndex> batchExecutor;

private long sendWaitTime = DEFAULT_SEND_WAIT_TIME;

Expand All @@ -77,9 +74,8 @@ public void init() {
}

@Override
public void send(ByteBuf index) {
public void send(ScheduleIndex index) {
if (!BrokerRoleManager.isDelayMaster()) {
ScheduleIndex.release(index);
return;
}

Expand All @@ -100,23 +96,22 @@ public void send(ByteBuf index) {
}

@Override
public void process(List<ByteBuf> pureRecords) {
if (pureRecords == null || pureRecords.isEmpty()) {
public void process(List<ScheduleIndex> indexList) {
if (indexList == null || indexList.isEmpty()) {
return;
}

List<ScheduleSetRecord> records = null;
List<ScheduleSetRecord> records = facade.recoverLogRecord(indexList);
try {
records = facade.recoverLogRecord(pureRecords);
senderExecutor.execute(records, this, brokerService);
} catch (Exception e) {
LOGGER.error("send message failed,messageSize:{} will retry", pureRecords.size(), e);
LOGGER.error("send message failed,messageSize:{} will retry", indexList.size(), e);
retry(records);
}
}

private void reject(ByteBuf record) {
send(record);
private void reject(ScheduleIndex index) {
send(index);
}

private void success(ScheduleSetRecord record) {
Expand All @@ -128,7 +123,7 @@ private void retry(List<ScheduleSetRecord> records, Set<String> messageIds) {
for (ScheduleSetRecord record : records) {
if (messageIds.contains(record.getMessageId())) {
refresh(record, refreshSubject);
send(buildIndex(record.getScheduleTime(), record.getStartWroteOffset(), record.getRecordSize(), record.getSequence()));
send(new ScheduleIndex(record.getScheduleTime(), record.getStartWroteOffset(), record.getRecordSize(), record.getSequence()));
continue;
}
success(record);
Expand All @@ -143,7 +138,7 @@ private void retry(List<ScheduleSetRecord> records) {
final Set<String> refreshSubject = Sets.newHashSet();
for (ScheduleSetRecord record : records) {
refresh(record, refreshSubject);
send(buildIndex(record.getScheduleTime(), record.getStartWroteOffset(), record.getRecordSize(), record.getSequence()));
send(new ScheduleIndex(record.getScheduleTime(), record.getStartWroteOffset(), record.getRecordSize(), record.getSequence()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package qunar.tc.qmq.delay.startup;

import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.broker.BrokerService;
Expand Down Expand Up @@ -114,11 +113,11 @@ private void init() {
this.processor = new ReceivedDelayMessageProcessor(receiver);
}

private boolean iterateCallback(final ByteBuf buf) {
long scheduleTime = ScheduleIndex.scheduleTime(buf);
long offset = ScheduleIndex.offset(buf);
private boolean iterateCallback(final ScheduleIndex index) {
long scheduleTime = index.getScheduleTime();
long offset = index.getOffset();
if (wheelTickManager.canAdd(scheduleTime, offset)) {
wheelTickManager.addWHeel(buf);
wheelTickManager.addWHeel(index);
return true;
}

Expand Down
Loading

0 comments on commit 5761051

Please sign in to comment.