Skip to content

Commit

Permalink
Rename package name and code simplify (apache#9058)
Browse files Browse the repository at this point in the history
### Motivation
This is a minor-update contains :
1. change package path `org.apache.bookkeeper.mledger.interceptor` to `org.apache.bookkeeper.mledger.intercept` for keeping consistency with other module like pulsar-broker `org.apache.pulsar.broker.intercept.BrokerInterceptor`
2. simplify constructor of `OpAddEntry`
3.  rename parameter `batchSize ` to `numberOfMessages `
4. add annotation and JavaDoc  for `BrokerEntryMetadataInterceptor`
  • Loading branch information
aloyszhang authored Dec 27, 2020
1 parent 8574e58 commit b51e4f3
Show file tree
Hide file tree
Showing 12 changed files with 42 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;

import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.offload.OffloadUtils;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,30 +77,26 @@ enum State {
}

public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
OpAddEntry op = RECYCLER.get();
op.ml = ml;
op.ledger = null;
op.data = data.retain();
op.dataLength = data.readableBytes();
op.callback = callback;
op.ctx = ctx;
op.addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);
op.closeWhenDone = false;
op.entryId = -1;
op.startTime = System.nanoTime();
op.state = State.OPEN;
ml.mbean.addAddEntrySample(op.dataLength);
OpAddEntry op = createOpAddEntry(ml, data, callback, ctx);
if (log.isDebugEnabled()) {
log.debug("Created new OpAddEntry {}", op);
}
return op;
}

public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, int numberOfMessages, AddEntryCallback callback, Object ctx) {
OpAddEntry op = createOpAddEntry(ml, data, callback, ctx);
op.numberOfMessages = numberOfMessages;
if (log.isDebugEnabled()) {
log.debug("Created new OpAddEntry {}", op);
}
return op;
}

private static OpAddEntry createOpAddEntry(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
OpAddEntry op = RECYCLER.get();
op.ml = ml;
op.ledger = null;
op.numberOfMessages = numberOfMessages;
op.data = data.retain();
op.dataLength = data.readableBytes();
op.callback = callback;
Expand All @@ -111,9 +107,6 @@ public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, int numberOf
op.startTime = System.nanoTime();
op.state = State.OPEN;
ml.mbean.addAddEntrySample(op.dataLength);
if (log.isDebugEnabled()) {
log.debug("Created new OpAddEntry {}", op);
}
return op;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.interceptor;
package org.apache.bookkeeper.mledger.intercept;

import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
Expand All @@ -35,10 +35,10 @@ public interface ManagedLedgerInterceptor {
/**
* Intercept an OpAddEntry and return an OpAddEntry.
* @param op an OpAddEntry to be intercepted.
* @param batchSize
* @param numberOfMessages
* @return an OpAddEntry.
*/
OpAddEntry beforeAddEntry(OpAddEntry op, int batchSize);
OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages);

/**
* Intercept when ManagedLedger is initialized.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.mledger.impl.OpAddEntry;
import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
Expand Down Expand Up @@ -55,11 +55,11 @@ public long getIndex() {
}

@Override
public OpAddEntry beforeAddEntry(OpAddEntry op, int batchSize) {
if (op == null || batchSize <= 0) {
public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {
if (op == null || numberOfMessages <= 0) {
return op;
}
op.setData(Commands.addBrokerEntryMetadata(op.getData(), brokerEntryMetadataInterceptors, batchSize));
op.setData(Commands.addBrokerEntryMetadata(op.getData(), brokerEntryMetadataInterceptors, numberOfMessages));
return op;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ public PulsarApi.BrokerEntryMetadata.Builder intercept(PulsarApi.BrokerEntryMeta
}

@Override
public PulsarApi.BrokerEntryMetadata.Builder interceptWithBatchSize(
public PulsarApi.BrokerEntryMetadata.Builder interceptWithNumberOfMessages(
PulsarApi.BrokerEntryMetadata.Builder brokerMetadata,
int batchSize) {
int numberOfMessages) {
// do nothing, just return brokerMetadata
return brokerMetadata;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ public PulsarApi.BrokerEntryMetadata.Builder intercept(PulsarApi.BrokerEntryMeta
}

@Override
public PulsarApi.BrokerEntryMetadata.Builder interceptWithBatchSize(
public PulsarApi.BrokerEntryMetadata.Builder interceptWithNumberOfMessages(
PulsarApi.BrokerEntryMetadata.Builder brokerMetadata,
int batchSize) {
return brokerMetadata.setIndex(indexGenerator.addAndGet(batchSize));
int numberOfMessages) {
return brokerMetadata.setIndex(indexGenerator.addAndGet(numberOfMessages));
}

public long getIndex() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,24 @@
package org.apache.pulsar.common.intercept;

import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

/**
* A plugin interface that allows you to intercept the client requests to
* the Pulsar brokers and add metadata for each entry from broker side.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Stable
public interface BrokerEntryMetadataInterceptor {
/**
* Called by ManagedLedger to intercept adding an entry.
*/
PulsarApi.BrokerEntryMetadata.Builder intercept(PulsarApi.BrokerEntryMetadata.Builder brokerMetadata);
PulsarApi.BrokerEntryMetadata.Builder interceptWithBatchSize(PulsarApi.BrokerEntryMetadata.Builder brokerMetadata,
int batchSize);

/**
* Called by ManagedLedger to intercept adding an entry with numberOfMessages.
*/
PulsarApi.BrokerEntryMetadata.Builder interceptWithNumberOfMessages(PulsarApi.BrokerEntryMetadata.Builder brokerMetadata,
int numberOfMessages);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -1965,14 +1964,14 @@ public static ByteBuf addBrokerEntryMetadata(ByteBuf headerAndPayload,

public static ByteBuf addBrokerEntryMetadata(ByteBuf headerAndPayload,
Set<BrokerEntryMetadataInterceptor> brokerInterceptors,
int batchSize) {
int numberOfMessages) {
// | BROKER_ENTRY_METADATA_MAGIC_NUMBER | BROKER_ENTRY_METADATA_SIZE | BROKER_ENTRY_METADATA |
// | 2 bytes | 4 bytes | BROKER_ENTRY_METADATA_SIZE bytes |

PulsarApi.BrokerEntryMetadata.Builder brokerMetadataBuilder = PulsarApi.BrokerEntryMetadata.newBuilder();
for (BrokerEntryMetadataInterceptor interceptor : brokerInterceptors) {
interceptor.intercept(brokerMetadataBuilder);
interceptor.interceptWithBatchSize(brokerMetadataBuilder, batchSize);
interceptor.interceptWithNumberOfMessages(brokerMetadataBuilder, numberOfMessages);
}
PulsarApi.BrokerEntryMetadata brokerEntryMetadata = brokerMetadataBuilder.build();
int brokerMetaSize = brokerEntryMetadata.getSerializedSize();
Expand Down

0 comments on commit b51e4f3

Please sign in to comment.