Skip to content

Commit

Permalink
Replicated subscriptions - Markers protobuf (apache#4340)
Browse files Browse the repository at this point in the history
* Replicated subscriptions - Markers protobuf

* Added license check exclusions for generated code
  • Loading branch information
merlimat authored May 23, 2019
1 parent f824410 commit d6c2813
Show file tree
Hide file tree
Showing 12 changed files with 3,520 additions and 12 deletions.
3 changes: 2 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1169,7 +1169,7 @@ flexible messaging model and an intuitive client API.</description>
<exclude>**/*.hgrm</exclude>
<exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude>
<exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
<exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude>
<exclude>src/main/java/org/apache/pulsar/common/api/proto/*.java</exclude>
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java</exclude>
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java</exclude>
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java</exclude>
Expand Down Expand Up @@ -1258,6 +1258,7 @@ flexible messaging model and an intuitive client API.</description>
and are included in source tree for convenience -->
<exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude>
<exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude>
<exclude>src/main/java/org/apache/pulsar/common/api/proto/Markers.java</exclude>
<exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
<exclude>bin/proto/MLDataFormats_pb2.py</exclude>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.Markers;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;

Expand Down Expand Up @@ -72,8 +73,8 @@ public void filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchS
MessageMetadata msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1);

try {
if (msgMetadata == null) {
// Message metadata was corrupted
if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
// Message metadata was corrupted or the messages was a server-only marker
entries.set(i, null);
entry.release();
subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.Markers;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
Expand Down
1 change: 1 addition & 0 deletions pulsar-common/generate_protobuf.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@

PROTOC=${PROTOC:-protoc}
${PROTOC} --java_out=src/main/java src/main/proto/PulsarApi.proto
${PROTOC} --java_out=src/main/java src/main/proto/PulsarMarkers.proto
2 changes: 1 addition & 1 deletion pulsar-common/generate_protobuf_docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ docker pull $IMAGE
WORKDIR=/workdir
docker run -i \
-v ${COMMON_DIR}:${WORKDIR} $IMAGE \
bash -c "cd ${WORKDIR}; /pulsar/protobuf/src/protoc --java_out=src/main/java src/main/proto/PulsarApi.proto"
bash -c "cd ${WORKDIR}; PROTOC=/pulsar/protobuf/src/protoc ./generate_protobuf.sh"

251 changes: 251 additions & 0 deletions pulsar-common/src/main/java/org/apache/pulsar/common/api/Markers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.api;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

import java.io.IOException;
import java.util.Map;
import java.util.Optional;

import lombok.SneakyThrows;
import lombok.experimental.UtilityClass;

import org.apache.pulsar.common.api.Commands.ChecksumType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarMarkers.ClusterMessageId;
import org.apache.pulsar.common.api.proto.PulsarMarkers.MarkerType;
import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshotRequest;
import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshotResponse;
import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsUpdate;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;

@UtilityClass
public class Markers {

private static ByteBuf newMessage(MarkerType markerType, Optional<String> restrictToCluster, ByteBuf payload) {
MessageMetadata.Builder msgMetadataBuilder = MessageMetadata.newBuilder();
msgMetadataBuilder.setPublishTime(System.currentTimeMillis());
msgMetadataBuilder.setProducerName("pulsar.marker");
msgMetadataBuilder.setSequenceId(0);
msgMetadataBuilder.setMarkerType(markerType.getNumber());

if (restrictToCluster.isPresent()) {
msgMetadataBuilder.addReplicateTo(restrictToCluster.get());
}

MessageMetadata msgMetadata = msgMetadataBuilder.build();
try {
return Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, msgMetadata, payload);
} finally {
msgMetadata.recycle();
msgMetadataBuilder.recycle();
}
}

public static boolean isServerOnlyMarker(MessageMetadata msgMetadata) {
// In future, if we add more marker types that can be also sent to clients
// we'll have to do finer check here.
return msgMetadata.hasMarkerType();
}

@SneakyThrows
public static ByteBuf newReplicatedSubscriptionsSnapshotRequest(String snapshotId, String sourceCluster) {
ReplicatedSubscriptionsSnapshotRequest.Builder builder = ReplicatedSubscriptionsSnapshotRequest.newBuilder();
builder.setSnapshotId(snapshotId);
builder.setSourceCluster(sourceCluster);

ReplicatedSubscriptionsSnapshotRequest req = builder.build();

int size = req.getSerializedSize();

ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(size);
ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(payload);
try {
req.writeTo(outStream);
return newMessage(MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST, Optional.empty(), payload);
} finally {
payload.release();
builder.recycle();
req.recycle();
outStream.recycle();
}
}

public static ReplicatedSubscriptionsSnapshotRequest parseReplicatedSubscriptionsSnapshotRequest(ByteBuf payload)
throws IOException {
ByteBufCodedInputStream inStream = ByteBufCodedInputStream.get(payload);
ReplicatedSubscriptionsSnapshotRequest.Builder builder = null;

try {
builder = ReplicatedSubscriptionsSnapshotRequest.newBuilder();
return builder.mergeFrom(inStream, null).build();
} finally {
builder.recycle();
inStream.recycle();
}
}

@SneakyThrows
public static ByteBuf newReplicatedSubscriptionsSnapshotResponse(String snapshotId, String replyToCluster,
String cluster, long ledgerId, long entryId) {
ReplicatedSubscriptionsSnapshotResponse.Builder builder = ReplicatedSubscriptionsSnapshotResponse.newBuilder();
builder.setSnapshotId(snapshotId);

MessageIdData.Builder msgIdBuilder = MessageIdData.newBuilder();
msgIdBuilder.setLedgerId(ledgerId);
msgIdBuilder.setEntryId(entryId);

ClusterMessageId.Builder clusterMessageIdBuilder = ClusterMessageId.newBuilder();
clusterMessageIdBuilder.setCluster(cluster);
clusterMessageIdBuilder.setMessageId(msgIdBuilder);

builder.setCluster(clusterMessageIdBuilder);
ReplicatedSubscriptionsSnapshotResponse response = builder.build();

int size = response.getSerializedSize();

ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(size);
ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(payload);
try {
response.writeTo(outStream);
return newMessage(MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE, Optional.of(replyToCluster),
payload);
} finally {
msgIdBuilder.recycle();
clusterMessageIdBuilder.recycle();
payload.release();
builder.recycle();
response.recycle();
outStream.recycle();
}
}

public static ReplicatedSubscriptionsSnapshotResponse parseReplicatedSubscriptionsSnapshotResponse(ByteBuf payload)
throws IOException {
ByteBufCodedInputStream inStream = ByteBufCodedInputStream.get(payload);
ReplicatedSubscriptionsSnapshotResponse.Builder builder = null;

try {
builder = ReplicatedSubscriptionsSnapshotResponse.newBuilder();
return builder.mergeFrom(inStream, null).build();
} finally {
builder.recycle();
inStream.recycle();
}
}

@SneakyThrows
public static ByteBuf newReplicatedSubscriptionsSnapshot(String snapshotId, String sourceCluster, long ledgerId,
long entryId, Map<String, MessageIdData> clusterIds) {
ReplicatedSubscriptionsSnapshot.Builder builder = ReplicatedSubscriptionsSnapshot.newBuilder();
builder.setSnapshotId(snapshotId);

MessageIdData.Builder msgIdBuilder = MessageIdData.newBuilder();
msgIdBuilder.setLedgerId(ledgerId);
msgIdBuilder.setEntryId(entryId);
builder.setLocalMessageId(msgIdBuilder);

clusterIds.forEach((cluster, msgId) -> {
ClusterMessageId.Builder clusterMessageIdBuilder = ClusterMessageId.newBuilder()
.setCluster(cluster)
.setMessageId(msgId);
builder.addClusters(clusterMessageIdBuilder);
clusterMessageIdBuilder.recycle();
});

ReplicatedSubscriptionsSnapshot snapshot = builder.build();

int size = snapshot.getSerializedSize();

ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(size);
ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(payload);
try {
snapshot.writeTo(outStream);
return newMessage(MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT, Optional.of(sourceCluster), payload);
} finally {
payload.release();
builder.recycle();
snapshot.recycle();
outStream.recycle();
}
}

public static ReplicatedSubscriptionsSnapshot parseReplicatedSubscriptionsSnapshot(ByteBuf payload)
throws IOException {
ByteBufCodedInputStream inStream = ByteBufCodedInputStream.get(payload);
ReplicatedSubscriptionsSnapshot.Builder builder = null;

try {
builder = ReplicatedSubscriptionsSnapshot.newBuilder();
return builder.mergeFrom(inStream, null).build();
} finally {
builder.recycle();
inStream.recycle();
}
}

@SneakyThrows
public static ByteBuf newReplicatedSubscriptionsUpdate(String subscriptionName, Map<String, MessageIdData> clusterIds) {
ReplicatedSubscriptionsUpdate.Builder builder = ReplicatedSubscriptionsUpdate.newBuilder();
builder.setSubscriptionName(subscriptionName);

clusterIds.forEach((cluster, msgId) -> {
ClusterMessageId.Builder clusterMessageIdBuilder = ClusterMessageId.newBuilder()
.setCluster(cluster)
.setMessageId(msgId);
builder.addClusters(clusterMessageIdBuilder);
clusterMessageIdBuilder.recycle();
});

ReplicatedSubscriptionsUpdate update = builder.build();

int size = update.getSerializedSize();

ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(size);
ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(payload);
try {
update.writeTo(outStream);
return newMessage(MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT, Optional.empty(), payload);
} finally {
payload.release();
builder.recycle();
update.recycle();
outStream.recycle();
}
}

public static ReplicatedSubscriptionsUpdate parseReplicatedSubscriptionsUpdate(ByteBuf payload)
throws IOException {
ByteBufCodedInputStream inStream = ByteBufCodedInputStream.get(payload);
ReplicatedSubscriptionsUpdate.Builder builder = null;

try {
builder = ReplicatedSubscriptionsUpdate.newBuilder();
return builder.mergeFrom(inStream, null).build();
} finally {
builder.recycle();
inStream.recycle();
}
}
}
Loading

0 comments on commit d6c2813

Please sign in to comment.