Skip to content

Commit

Permalink
[Transactoin] Add default handler to handle transaction related comma…
Browse files Browse the repository at this point in the history
…nds (apache#4891)

---

*Motivation*

Add default handler to handle transaction related commands.
  • Loading branch information
zymap authored and sijie committed Aug 6, 2019
1 parent f6ed037 commit 822d8ec
Showing 1 changed file with 134 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAddPartitionToTxn;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAddPartitionToTxnResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAddSubscriptionToTxn;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAddSubscriptionToTxnResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
Expand All @@ -36,6 +40,12 @@
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStats;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandEndTxn;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandEndTxnOnPartition;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandEndTxnOnPartitionResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandEndTxnOnSubscription;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandEndTxnOnSubscriptionResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandEndTxnResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
Expand All @@ -45,6 +55,8 @@
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandMessage;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandNewTxn;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandNewTxnResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadataResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandPing;
Expand Down Expand Up @@ -312,6 +324,78 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
handleAuthResponse(cmd.getAuthResponse());
cmd.getAuthResponse().recycle();
break;

case NEW_TXN:
checkArgument(cmd.hasNewTxn());
handleNewTxn(cmd.getNewTxn());
cmd.getNewTxn().recycle();
break;

case NEW_TXN_RESPONSE:
checkArgument(cmd.hasNewTxnResponse());
handleNewTxnResponse(cmd.getNewTxnResponse());
cmd.getNewTxnResponse().recycle();
break;

case ADD_PARTITION_TO_TXN:
checkArgument(cmd.hasAddPartitionToTxn());
handleAddPartitionToTxn(cmd.getAddPartitionToTxn());
cmd.getAddPartitionToTxn().recycle();
break;

case ADD_PARTITION_TO_TXN_RESPONSE:
checkArgument(cmd.hasAddPartitionToTxnResponse());
handleAddPartitionToTxnResponse(cmd.getAddPartitionToTxnResponse());
cmd.getAddPartitionToTxnResponse().recycle();
break;

case ADD_SUBSCRIPTION_TO_TXN:
checkArgument(cmd.hasAddSubscriptionToTxn());
handleAddSubscriptionToTxn(cmd.getAddSubscriptionToTxn());
cmd.getAddSubscriptionToTxn().recycle();
break;

case ADD_SUBSCRIPTION_TO_TXN_RESPONSE:
checkArgument(cmd.hasAddSubscriptionToTxnResponse());
handleAddSubsciptionToTxnResponse(cmd.getAddSubscriptionToTxnResponse());
cmd.getAddSubscriptionToTxnResponse().recycle();
break;

case END_TXN:
checkArgument(cmd.hasEndTxn());
handleEndTxn(cmd.getEndTxn());
cmd.getEndTxn().recycle();
break;

case END_TXN_RESPONSE:
checkArgument(cmd.hasEndTxnResponse());
handleEndTxnResponse(cmd.getEndTxnResponse());
cmd.getEndTxnResponse().recycle();
break;

case END_TXN_ON_PARTITION:
checkArgument(cmd.hasEndTxnOnPartition());
handleEndTxnOnPartition(cmd.getEndTxnOnPartition());
cmd.getEndTxnOnPartition().recycle();
break;

case END_TXN_ON_PARTITION_RESPONSE:
checkArgument(cmd.hasEndTxnOnPartitionResponse());
handleEndTxnOnPartitionResponse(cmd.getEndTxnOnPartitionResponse());
cmd.getEndTxnOnPartitionResponse().recycle();
break;

case END_TXN_ON_SUBSCRIPTION:
checkArgument(cmd.hasEndTxnOnSubscription());
handleEndTxnOnSubscription(cmd.getEndTxnOnSubscription());
cmd.getEndTxnOnSubscription().recycle();
break;

case END_TXN_ON_SUBSCRIPTION_RESPONSE:
checkArgument(cmd.hasEndTxnOnSubscriptionResponse());
handleEndTxnOnsubscriptionResponse(cmd.getEndTxnOnSubscriptionResponse());
cmd.getEndTxnOnSubscriptionResponse().recycle();
break;
}
} finally {
if (cmdBuilder != null) {
Expand Down Expand Up @@ -471,5 +555,55 @@ protected void handleAuthChallenge(CommandAuthChallenge commandAuthChallenge) {
throw new UnsupportedOperationException();
}

protected void handleNewTxn(CommandNewTxn commandNewTxn) {
throw new UnsupportedOperationException();
}

protected void handleNewTxnResponse(CommandNewTxnResponse commandNewTxnResponse) {
throw new UnsupportedOperationException();
}

protected void handleAddPartitionToTxn(CommandAddPartitionToTxn commandAddPartitionToTxn) {
throw new UnsupportedOperationException();
}

protected void handleAddPartitionToTxnResponse(CommandAddPartitionToTxnResponse commandAddPartitionToTxnResponse) {
throw new UnsupportedOperationException();
}

protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn commandAddSubscriptionToTxn) {
throw new UnsupportedOperationException();
}

protected void handleAddSubsciptionToTxnResponse(
CommandAddSubscriptionToTxnResponse commandAddSubscriptionToTxnResponse) {
throw new UnsupportedOperationException();
}

protected void handleEndTxn(CommandEndTxn commandEndTxn) {
throw new UnsupportedOperationException();
}

protected void handleEndTxnResponse(CommandEndTxnResponse commandEndTxnResponse) {
throw new UnsupportedOperationException();
}

protected void handleEndTxnOnPartition(CommandEndTxnOnPartition commandEndTxnOnPartition) {
throw new UnsupportedOperationException();
}

protected void handleEndTxnOnPartitionResponse(CommandEndTxnOnPartitionResponse commandEndTxnOnPartitionResponse) {
throw new UnsupportedOperationException();
}

protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription commandEndTxnOnSubscription) {
throw new UnsupportedOperationException();
}

protected void handleEndTxnOnsubscriptionResponse(
CommandEndTxnOnSubscriptionResponse commandEndTxnOnSubscriptionResponse) {
throw new UnsupportedOperationException();
}

private static final Logger log = LoggerFactory.getLogger(PulsarDecoder.class);
}

0 comments on commit 822d8ec

Please sign in to comment.