diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 8de60b5f94c34..fd169fe1d1d59 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -400,14 +400,6 @@ public void start() throws PulsarServerException { this.offloader = createManagedLedgerOffloader(this.getConfiguration()); - if (StringUtils.isNotBlank(config.getTransactionMetadataStoreProviderClassName())) { - transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider - .newProvider(config.getTransactionMetadataStoreProviderClassName()), this); - } else { - transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider - .newProvider(InMemTransactionMetadataStoreProvider.class.getName()), this); - } - brokerService.start(); this.webService = new WebService(this); @@ -483,9 +475,11 @@ public Boolean get() { // Register heartbeat and bootstrap namespaces. this.nsService.registerBootstrapNamespaces(); - // Register pulsar system namespaces + // Register pulsar system namespaces and start transaction meta store service if (config.isTransactionCoordinatorEnabled()) { - this.nsService.registerNamespace(NamespaceName.SYSTEM_NAMESPACE.toString(), false); + transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider + .newProvider(config.getTransactionMetadataStoreProviderClassName()), this); + transactionMetadataStoreService.start(); } this.metricsGenerator = new MetricsGenerator(this); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index 8eeb9d174df87..e7081985990e1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -19,13 +19,18 @@ package org.apache.pulsar.broker; import com.google.common.annotations.VisibleForTesting; +import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener; +import org.apache.pulsar.common.naming.NamespaceBundle; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider; import org.apache.pulsar.transaction.coordinator.TxnMeta; -import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException; +import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException; import org.apache.pulsar.transaction.impl.common.TxnID; +import org.apache.pulsar.transaction.impl.common.TxnStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +55,51 @@ public TransactionMetadataStoreService(TransactionMetadataStoreProvider transact this.transactionMetadataStoreProvider = transactionMetadataStoreProvider; } + public void start() { + pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener() { + @Override + public void onLoad(NamespaceBundle bundle) { + pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle) + .whenComplete((topics, ex) -> { + if (ex == null) { + for (String topic : topics) { + TopicName name = TopicName.get(topic); + if (TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName() + .equals(TopicName.get(name.getPartitionedTopicName()).getLocalName()) + && name.isPartitioned()) { + addTransactionMetadataStore(TransactionCoordinatorID.get(name.getPartitionIndex())); + } + } + } else { + LOG.error("Failed to get owned topic list when triggering on-loading bundle {}.", bundle, ex); + } + }); + } + @Override + public void unLoad(NamespaceBundle bundle) { + pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle) + .whenComplete((topics, ex) -> { + if (ex == null) { + for (String topic : topics) { + TopicName name = TopicName.get(topic); + if (TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName() + .equals(TopicName.get(name.getPartitionedTopicName()).getLocalName()) + && name.isPartitioned()) { + removeTransactionMetadataStore(TransactionCoordinatorID.get(name.getPartitionIndex())); + } + } + } else { + LOG.error("Failed to get owned topic list error when triggering un-loading bundle {}.", bundle, ex); + } + }); + } + @Override + public boolean test(NamespaceBundle namespaceBundle) { + return namespaceBundle.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE); + } + }); + } + public void addTransactionMetadataStore(TransactionCoordinatorID tcId) { transactionMetadataStoreProvider.openStore(tcId, pulsarService.getManagedLedgerFactory()) .whenComplete((store, ex) -> { @@ -57,22 +107,28 @@ public void addTransactionMetadataStore(TransactionCoordinatorID tcId) { LOG.error("Add transaction metadata store with id {} error", tcId.getId(), ex); } else { stores.put(tcId, store); + LOG.info("Added new transaction meta store {}", tcId); } }); } public void removeTransactionMetadataStore(TransactionCoordinatorID tcId) { - stores.remove(tcId).closeAsync().whenComplete((v, ex) -> { - if (ex != null) { - LOG.error("Close transaction metadata store with id {} error", ex); - } - }); + TransactionMetadataStore metadataStore = stores.remove(tcId); + if (metadataStore != null) { + metadataStore.closeAsync().whenComplete((v, ex) -> { + if (ex != null) { + LOG.error("Close transaction metadata store with id {} error", ex); + } else { + LOG.info("Removed and closed transaction meta store {}", tcId); + } + }); + } } public CompletableFuture newTransaction(TransactionCoordinatorID tcId) { TransactionMetadataStore store = stores.get(tcId); if (store == null) { - return FutureUtil.failedFuture(new CoordinatorException.NotFoundException(tcId)); + return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId)); } return store.newTransaction(); } @@ -81,7 +137,7 @@ public CompletableFuture addProducedPartitionToTxn(TxnID txnId, List addAckedPartitionToTxn(TxnID txnId, List TransactionCoordinatorID tcId = getTcIdFromTxnId(txnId); TransactionMetadataStore store = stores.get(tcId); if (store == null) { - return FutureUtil.failedFuture(new CoordinatorException.NotFoundException(tcId)); + return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId)); } return store.addAckedPartitionToTxn(txnId, partitions); } @@ -99,11 +155,20 @@ public CompletableFuture getTxnMeta(TxnID txnId) { TransactionCoordinatorID tcId = getTcIdFromTxnId(txnId); TransactionMetadataStore store = stores.get(tcId); if (store == null) { - return FutureUtil.failedFuture(new CoordinatorException.NotFoundException(tcId)); + return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId)); } return store.getTxnMeta(txnId); } + public CompletableFuture updateTxnStatus(TxnID txnId, TxnStatus newStatus, TxnStatus expectedStatus) { + TransactionCoordinatorID tcId = getTcIdFromTxnId(txnId); + TransactionMetadataStore store = stores.get(tcId); + if (store == null) { + return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId)); + } + return store.updateTxnStatus(txnId, newStatus, expectedStatus); + } + private TransactionCoordinatorID getTcIdFromTxnId(TxnID txnId) { return new TransactionCoordinatorID(txnId.getMostSigBits()); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java index 3a7c542d83a36..e794718c5fb1b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java @@ -22,6 +22,7 @@ import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException; import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; +import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException; /** * Base type of exception thrown by Pulsar Broker Service @@ -195,6 +196,10 @@ private static PulsarApi.ServerError getClientErrorCode(Throwable t, boolean che return PulsarApi.ServerError.IncompatibleSchema; } else if (t instanceof ConsumerAssignException) { return ServerError.ConsumerAssignError; + } else if (t instanceof CoordinatorException.CoordinatorNotFoundException) { + return ServerError.TransactionCoordinatorNotFound; + } else if (t instanceof CoordinatorException.InvalidTxnStatusException) { + return ServerError.InvalidTxnStatus; } else { if (checkCauseIfUnknown) { return getClientErrorCode(t.getCause(), false); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index d2c516395be46..ec60a8de8fbad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -61,13 +61,13 @@ import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; -import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.api.AuthData; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandNewTxn; import org.apache.pulsar.common.protocol.CommandUtils; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.PulsarHandler; @@ -110,6 +110,9 @@ import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite; +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.impl.common.TxnID; +import org.apache.pulsar.transaction.impl.common.TxnStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1402,6 +1405,85 @@ protected void handleGetOrCreateSchema(CommandGetOrCreateSchema commandGetOrCrea }); } + @Override + protected void handleNewTxn(CommandNewTxn command) { + if (log.isDebugEnabled()) { + log.debug("Receive new txn request {} to transaction meta store {} from {}.", command.getRequestId(), command.getTcId(), remoteAddress); + } + TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId()); + service.pulsar().getTransactionMetadataStoreService().newTransaction(tcId) + .whenComplete(((txnID, ex) -> { + if (ex == null) { + if (log.isDebugEnabled()) { + log.debug("Send response {} for new txn request {}", tcId.getId(), command.getRequestId()); + } + ctx.writeAndFlush(Commands.newTxnResponse(command.getRequestId(), txnID.getLeastSigBits(), txnID.getMostSigBits())); + } else { + if (log.isDebugEnabled()) { + log.debug("Send response error for new txn request {}", command.getRequestId(), ex); + } + ctx.writeAndFlush(Commands.newTxnResponse(command.getRequestId(), tcId.getId(), BrokerServiceException.getClientErrorCode(ex), ex.getMessage())); + } + })); + } + + @Override + protected void handleAddPartitionToTxn(PulsarApi.CommandAddPartitionToTxn command) { + TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits()); + if (log.isDebugEnabled()) { + log.debug("Receive add published partition to txn request {} from {} with txnId {}", command.getRequestId(), remoteAddress, txnID); + } + service.pulsar().getTransactionMetadataStoreService().addProducedPartitionToTxn(txnID, command.getPartitionsList()) + .whenComplete(((v, ex) -> { + if (ex == null) { + if (log.isDebugEnabled()) { + log.debug("Send response success for add published partition to txn request {}", command.getRequestId()); + } + ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(command.getRequestId(), + txnID.getLeastSigBits(), txnID.getMostSigBits())); + } else { + if (log.isDebugEnabled()) { + log.debug("Send response error for add published partition to txn request {}", command.getRequestId(), ex); + } + ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(command.getRequestId(), txnID.getMostSigBits(), + BrokerServiceException.getClientErrorCode(ex), ex.getMessage())); + } + })); + } + + @Override + protected void handleEndTxn(PulsarApi.CommandEndTxn command) { + TxnStatus newStatus = null; + switch (command.getTxnAction()) { + case COMMIT: + newStatus = TxnStatus.COMMITTING; + break; + case ABORT: + newStatus = TxnStatus.ABORTING; + break; + } + TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits()); + if (log.isDebugEnabled()) { + log.debug("Receive end txn by {} request {} from {} with txnId {}", newStatus, command.getRequestId(), remoteAddress, txnID); + } + service.pulsar().getTransactionMetadataStoreService().updateTxnStatus(txnID, newStatus, TxnStatus.OPEN) + .whenComplete((v, ex) -> { + if (ex == null) { + if (log.isDebugEnabled()) { + log.debug("Send response success for end txn request {}", command.getRequestId()); + } + ctx.writeAndFlush(Commands.newEndTxnResponse(command.getRequestId(), + txnID.getLeastSigBits(), txnID.getMostSigBits())); + } else { + if (log.isDebugEnabled()) { + log.debug("Send response error for end txn request {}", command.getRequestId()); + } + ctx.writeAndFlush(Commands.newEndTxnResponse(command.getRequestId(), txnID.getMostSigBits(), + BrokerServiceException.getClientErrorCode(ex), ex.getMessage())); + } + }); + } + private CompletableFuture tryAddSchema(Topic topic, SchemaData schema) { if (schema != null) { return topic.addSchema(schema); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java index e50c974553327..a3362b78f7634 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java @@ -162,7 +162,7 @@ public void testOwnedNamespaces() { Map nsMap = pulsarAdmins[i].brokers().getOwnedNamespaces("my-cluster", list.get(0)); - Assert.assertEquals(nsMap.size(), 3); + Assert.assertEquals(nsMap.size(), 2); } } catch (Exception e) { e.printStackTrace(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 060772637c0c0..dca1510cfaef2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -401,7 +401,7 @@ public void brokers() throws Exception { Map nsMap = admin.brokers().getOwnedNamespaces("test", list.get(0)); // since sla-monitor ns is not created nsMap.size() == 1 (for HeartBeat Namespace) - Assert.assertEquals(nsMap.size(), 2); + Assert.assertEquals(nsMap.size(), 1); for (String ns : nsMap.keySet()) { NamespaceOwnershipStatus nsStatus = nsMap.get(ns); if (ns.equals( @@ -417,7 +417,7 @@ public void brokers() throws Exception { Assert.assertEquals(parts.length, 2); Map nsMap2 = adminTls.brokers().getOwnedNamespaces("test", String.format("%s:%d", parts[0], BROKER_WEBSERVICE_PORT_TLS)); - Assert.assertEquals(nsMap2.size(), 2); + Assert.assertEquals(nsMap2.size(), 1); admin.namespaces().deleteNamespace("prop-xyz/ns1"); admin.clusters().deleteCluster("test"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index f22cd3d656cf4..7817ef22618ac 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -397,7 +397,7 @@ public void brokers() throws Exception { Map nsMap = admin.brokers().getOwnedNamespaces("use", list.get(0)); // since sla-monitor ns is not created nsMap.size() == 1 (for HeartBeat Namespace) - Assert.assertEquals(nsMap.size(), 2); + Assert.assertEquals(nsMap.size(), 1); for (String ns : nsMap.keySet()) { NamespaceOwnershipStatus nsStatus = nsMap.get(ns); if (ns.equals( @@ -413,7 +413,7 @@ public void brokers() throws Exception { Assert.assertEquals(parts.length, 2); Map nsMap2 = adminTls.brokers().getOwnedNamespaces("use", String.format("%s:%d", parts[0], BROKER_WEBSERVICE_PORT_TLS)); - Assert.assertEquals(nsMap2.size(), 2); + Assert.assertEquals(nsMap2.size(), 1); admin.namespaces().deleteNamespace("prop-xyz/use/ns1"); admin.clusters().deleteCluster("use"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorAssignTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorAssignTest.java deleted file mode 100644 index 21ad94212fded..0000000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorAssignTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.transaction.coordinator; - -import com.google.common.hash.Hashing; -import org.apache.pulsar.broker.BundleData; -import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener; -import org.apache.pulsar.broker.service.BrokerTestBase; -import org.apache.pulsar.common.naming.NamespaceBundle; -import org.apache.pulsar.common.naming.NamespaceBundleFactory; -import org.apache.pulsar.common.naming.NamespaceName; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; - -/** - * Unit tests for transaction coordinator assign - */ -public class TransactionCoordinatorAssignTest extends BrokerTestBase { - - @BeforeMethod - @Override - protected void setup() throws Exception { - super.baseSetup(); - } - - @AfterMethod - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } - - @Test - public void testTCAssignTopicNamespaceBootstrap() throws Exception { - final List loadedBundles = new ArrayList<>(); - final CountDownLatch latch = new CountDownLatch(1); - pulsar.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener() { - - @Override - public void onLoad(NamespaceBundle bundle) { - loadedBundles.add(bundle); - latch.countDown(); - } - - @Override - public void unLoad(NamespaceBundle bundle) { - } - - @Override - public boolean test(NamespaceBundle namespaceBundle) { - return namespaceBundle.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE); - } - }); - - Assert.assertEquals(loadedBundles.size(), 1); - Assert.assertEquals(loadedBundles.get(0).getNamespaceObject(), NamespaceName.SYSTEM_NAMESPACE); - Assert.assertEquals(loadedBundles.get(0), - NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32()) - .getFullBundle(loadedBundles.get(0).getNamespaceObject())); - } -} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java new file mode 100644 index 0000000000000..f88b149a1ffef --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.coordinator; + +import com.google.common.collect.Lists; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient.State; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException; +import org.apache.pulsar.transaction.impl.common.TxnID; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TransactionCoordinatorClientTest extends TransactionMetaStoreTestBase { + + @Test + public void testClientStart() throws PulsarClientException, TransactionCoordinatorClientException, InterruptedException { + try { + transactionCoordinatorClient.start(); + Assert.fail("should failed here because the transaction metas store already started!"); + } catch (TransactionCoordinatorClientException e) { + // ok here + } + + Assert.assertNotNull(transactionCoordinatorClient); + Assert.assertEquals(transactionCoordinatorClient.getState(), State.READY); + } + + @Test + public void testNewTxn() throws TransactionCoordinatorClientException { + TxnID txnID = transactionCoordinatorClient.newTransaction(); + Assert.assertNotNull(txnID); + Assert.assertEquals(txnID.getLeastSigBits(), 0L); + } + + @Test + public void testCommitAndAbort() throws TransactionCoordinatorClientException { + TxnID txnID = transactionCoordinatorClient.newTransaction(); + transactionCoordinatorClient.addPublishPartitionToTxn(txnID, Lists.newArrayList("persistent://public/default/testCommitAndAbort")); + transactionCoordinatorClient.commit(txnID); + try { + transactionCoordinatorClient.abort(txnID); + Assert.fail("Should be fail, because the txn is in committing state, can't abort now."); + } catch (TransactionCoordinatorClientException.InvalidTxnStatusException ignore) { + // Ok here + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java new file mode 100644 index 0000000000000..8e1ee217b05f5 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.coordinator; + +import org.apache.pulsar.broker.PulsarService; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class TransactionMetaStoreAssignmentTest extends TransactionMetaStoreTestBase { + + @Test + public void testTransactionMetaStoreAssignAndFailover() throws IOException, InterruptedException { + + int transactionMetaStoreCount = 0; + for (PulsarService pulsarService : pulsarServices) { + transactionMetaStoreCount += pulsarService.getTransactionMetadataStoreService().getStores().size(); + } + + Assert.assertEquals(transactionMetaStoreCount, 16); + + PulsarService crashedMetaStore = null; + for (int i = pulsarServices.length - 1; i >= 0; i--) { + if (pulsarServices[i].getTransactionMetadataStoreService().getStores().size() > 0) { + crashedMetaStore = pulsarServices[i]; + break; + } + } + + Assert.assertNotNull(crashedMetaStore); + List services = new ArrayList<>(pulsarServices.length - 1); + for (PulsarService pulsarService : pulsarServices) { + if (pulsarService != crashedMetaStore) { + services.add(pulsarService); + } + } + pulsarServices = new PulsarService[pulsarServices.length - 1]; + for (int i = 0; i < services.size(); i++) { + pulsarServices[i] = services.get(i); + } + crashedMetaStore.close(); + + Thread.sleep(3000); + + transactionMetaStoreCount = 0; + for (PulsarService pulsarService : pulsarServices) { + transactionMetaStoreCount += pulsarService.getTransactionMetadataStoreService().getStores().size(); + } + + Assert.assertEquals(transactionMetaStoreCount, 16); + + transactionCoordinatorClient.close(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java new file mode 100644 index 0000000000000..afcfa01af9a0c --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.coordinator; + +import org.apache.bookkeeper.test.PortManager; +import org.apache.pulsar.PulsarTransactionCoordinatorMetadataSetup; +import org.apache.pulsar.broker.NoOpShutdownService; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient; +import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.BeforeClass; + +import java.net.URL; +import java.util.Optional; + +public class TransactionMetaStoreTestBase { + + private static final Logger log = LoggerFactory.getLogger(TransactionMetaStoreTestBase.class); + + LocalBookkeeperEnsemble bkEnsemble; + protected final int ZOOKEEPER_PORT = PortManager.nextFreePort(); + protected int[] brokerWebServicePorts = new int[BROKER_COUNT]; + protected int[] brokerNativeBrokerPorts = new int[BROKER_COUNT]; + protected URL[] brokerUrls = new URL[BROKER_COUNT]; + protected PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT]; + protected static final int BROKER_COUNT = 5; + protected ServiceConfiguration[] configurations = new ServiceConfiguration[BROKER_COUNT]; + protected PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT]; + + protected TransactionCoordinatorClient transactionCoordinatorClient; + + @BeforeClass + void setup() throws Exception { + log.info("---- Initializing SLAMonitoringTest -----"); + // Start local bookkeeper ensemble + bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort()); + bkEnsemble.start(); + + String[] args = new String[]{ + "--cluster", "my-cluster", + "--configuration-store", "localhost:" + ZOOKEEPER_PORT, + "--initial-num-transaction-coordinators", "16"}; + + PulsarTransactionCoordinatorMetadataSetup.main(args); + + // start brokers + for (int i = 0; i < BROKER_COUNT; i++) { + brokerWebServicePorts[i] = PortManager.nextFreePort(); + brokerNativeBrokerPorts[i] = PortManager.nextFreePort(); + + ServiceConfiguration config = new ServiceConfiguration(); + config.setBrokerServicePort(Optional.ofNullable(brokerNativeBrokerPorts[i])); + config.setClusterName("my-cluster"); + config.setAdvertisedAddress("localhost"); + config.setWebServicePort(Optional.ofNullable(brokerWebServicePorts[i])); + config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); + config.setBrokerServicePort(Optional.ofNullable(brokerNativeBrokerPorts[i])); + config.setDefaultNumberOfNamespaceBundles(1); + config.setLoadBalancerEnabled(false); + configurations[i] = config; + + pulsarServices[i] = new PulsarService(config); + pulsarServices[i].setShutdownService(new NoOpShutdownService()); + pulsarServices[i].start(); + + brokerUrls[i] = new URL("http://127.0.0.1" + ":" + brokerWebServicePorts[i]); + pulsarAdmins[i] = PulsarAdmin.builder().serviceHttpUrl(brokerUrls[i].toString()).build(); + } + + Thread.sleep(100); + + PulsarClient client = PulsarClient.builder(). + serviceUrl(pulsarServices[0].getBrokerServiceUrl()) + .build(); + transactionCoordinatorClient = new TransactionCoordinatorClientImpl(client); + transactionCoordinatorClient.start(); + + Thread.sleep(3000); + } +} diff --git a/pulsar-client-api/pom.xml b/pulsar-client-api/pom.xml index e393599d38a24..c6a15f8ed9f04 100644 --- a/pulsar-client-api/pom.xml +++ b/pulsar-client-api/pom.xml @@ -33,12 +33,17 @@ Pulsar Client :: API - - com.google.protobuf - protobuf-java - ${protobuf3.version} - provided - + + com.google.protobuf + protobuf-java + ${protobuf3.version} + provided + + + org.apache.pulsar + pulsar-transaction-common + ${project.version} + diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClient.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClient.java new file mode 100644 index 0000000000000..147733948ff6e --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClient.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.transaction; + +import java.io.Closeable; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.transaction.impl.common.TxnID; + +/** + * Transaction coordinator client. + */ +public interface TransactionCoordinatorClient extends Closeable { + + /** + * Default transaction ttl in mills. + */ + long DEFAULT_TXN_TTL_MS = 60000L; + + /** + * State of the transaction coordinator client. + */ + enum State { + NONE, + STARTING, + READY, + CLOSING, + CLOSED + } + + /** + * Start transaction meta store client. + * + *

This will create connections to transaction meta store service. + * + * @throws TransactionCoordinatorClientException exception occur while start + */ + void start() throws TransactionCoordinatorClientException; + + /** + * Start transaction meta store client asynchronous. + * + *

This will create connections to transaction meta store service. + * + * @return a future represents the result of start transaction meta store + */ + CompletableFuture startAsync(); + + /** + * Close the transaction meta store client asynchronous. + * + * @return a future represents the result of close transaction meta store + */ + CompletableFuture closeAsync(); + + /** + * Create a new transaction. + * + * @return {@link TxnID} as the identifier for identifying the transaction. + */ + TxnID newTransaction() throws TransactionCoordinatorClientException; + + /** + * Create a new transaction asynchronously. + * + * @return a future represents the result of creating a new transaction. + * it returns {@link TxnID} as the identifier for identifying the + * transaction. + */ + CompletableFuture newTransactionAsync(); + + /** + * Create a new transaction. + * + * @param timeout timeout for new transaction + * @param unit time unit for new transaction + * + * @return {@link TxnID} as the identifier for identifying the transaction. + */ + TxnID newTransaction(long timeout, TimeUnit unit) throws TransactionCoordinatorClientException; + + /** + * Create a new transaction asynchronously. + * + * @param timeout timeout for new transaction + * @param unit time unit for new transaction + * + * @return a future represents the result of creating a new transaction. + * it returns {@link TxnID} as the identifier for identifying the + * transaction. + */ + CompletableFuture newTransactionAsync(long timeout, TimeUnit unit); + + /** + * Add publish partition to txn. + * + * @param txnID txn id which add partitions to. + * @param partitions partitions add to the txn. + */ + void addPublishPartitionToTxn(TxnID txnID, List partitions) throws TransactionCoordinatorClientException; + + /** + * Add publish partition to txn asynchronously. + * + * @param txnID txn id which add partitions to. + * @param partitions partitions add to the txn. + * + * @return a future represents the result of add publish partition to txn. + */ + CompletableFuture addPublishPartitionToTxnAsync(TxnID txnID, List partitions); + + /** + * Commit txn. + * @param txnID txn id to commit. + */ + void commit(TxnID txnID) throws TransactionCoordinatorClientException; + + /** + * Commit txn asynchronously. + * @param txnID txn id to commit. + * @return a future represents the result of commit txn. + */ + CompletableFuture commitAsync(TxnID txnID); + + /** + * Abort txn. + * @param txnID txn id to abort. + */ + void abort(TxnID txnID) throws TransactionCoordinatorClientException; + + /** + * Abort txn asynchronously. + * @param txnID txn id to abort. + * @return a future represents the result of abort txn. + */ + CompletableFuture abortAsync(TxnID txnID); + + /** + * Get current state of the transaction meta store. + * + * @return current state {@link State} of the transaction meta store + */ + State getState(); +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java new file mode 100644 index 0000000000000..5d6ee0597bbed --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.transaction; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; + +/** + * Exceptions for transaction coordinator client. + */ +public class TransactionCoordinatorClientException extends IOException { + + public TransactionCoordinatorClientException(Throwable t) { + super(t); + } + + public TransactionCoordinatorClientException(String message) { + super(message); + } + + /** + * Thrown when transaction coordinator with unexpected state. + */ + public static class CoordinatorClientStateException extends TransactionCoordinatorClientException { + + public CoordinatorClientStateException() { + super("Unexpected state for transaction metadata client."); + } + + public CoordinatorClientStateException(String message) { + super(message); + } + } + + /** + * Thrown when transaction coordinator not found in broker side. + */ + public static class CoordinatorNotFoundException extends TransactionCoordinatorClientException { + public CoordinatorNotFoundException(String message) { + super(message); + } + } + + /** + * Thrown when transaction switch to a invalid status. + */ + public static class InvalidTxnStatusException extends TransactionCoordinatorClientException { + public InvalidTxnStatusException(String message) { + super(message); + } + } + + /** + * Thrown when transaction meta store handler not exists. + */ + public static class MetaStoreHandlerNotExistsException extends TransactionCoordinatorClientException { + + public MetaStoreHandlerNotExistsException(long tcId) { + super("Transaction meta store handler for transaction meta store {} not exists."); + } + + public MetaStoreHandlerNotExistsException(String message) { + super(message); + } + } + + /** + * Thrown when send request to transaction meta store but the transaction meta store handler not ready. + */ + public static class MetaStoreHandlerNotReadyException extends TransactionCoordinatorClientException { + public MetaStoreHandlerNotReadyException(long tcId) { + super("Transaction meta store handler for transaction meta store {} not ready now."); + } + + public MetaStoreHandlerNotReadyException(String message) { + super(message); + } + } + + public static TransactionCoordinatorClientException unwrap(Throwable t) { + if (t instanceof TransactionCoordinatorClientException) { + return (TransactionCoordinatorClientException) t; + } else if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else if (t instanceof InterruptedException) { + Thread.currentThread().interrupt(); + return new TransactionCoordinatorClientException(t); + } else if (!(t instanceof ExecutionException)) { + // Generic exception + return new TransactionCoordinatorClientException(t); + } + + Throwable cause = t.getCause(); + String msg = cause.getMessage(); + + if (cause instanceof CoordinatorNotFoundException) { + return new CoordinatorNotFoundException(msg); + } else if (cause instanceof InvalidTxnStatusException) { + return new InvalidTxnStatusException(msg); + } else { + return new TransactionCoordinatorClientException(t); + } + + } +} diff --git a/pulsar-client-cpp/include/pulsar/Result.h b/pulsar-client-cpp/include/pulsar/Result.h index 52a3a48a7c175..9a104737df599 100644 --- a/pulsar-client-cpp/include/pulsar/Result.h +++ b/pulsar-client-cpp/include/pulsar/Result.h @@ -80,6 +80,8 @@ enum Result /// consumer ResultCumulativeAcknowledgementNotAllowedError, /// Not allowed to call cumulativeAcknowledgement in /// Shared and Key_Shared subscription mode + ResultTransactionCoordinatorNotFoundError, /// Transaction coordinator not found + ResultInvalidTxnStatusError, /// Invalid txn status error }; // Return string representation of result code diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index 689d3c1644d96..384789aea96fc 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -111,6 +111,12 @@ static Result getResult(ServerError serverError) { case ConsumerAssignError: return ResultConsumerAssignError; + + case TransactionCoordinatorNotFound: + return ResultTransactionCoordinatorNotFoundError; + + case InvalidTxnStatus: + return ResultInvalidTxnStatusError; } // NOTE : Do not add default case in the switch above. In future if we get new cases for // ServerError and miss them in the switch above we would like to get notified. Adding diff --git a/pulsar-client-cpp/lib/Result.cc b/pulsar-client-cpp/lib/Result.cc index d041b9a6e597a..c57003cc9e878 100644 --- a/pulsar-client-cpp/lib/Result.cc +++ b/pulsar-client-cpp/lib/Result.cc @@ -138,6 +138,12 @@ const char* pulsar::strResult(Result result) { case ResultCumulativeAcknowledgementNotAllowedError: return "ResultCumulativeAcknowledgementNotAllowedError"; + + case ResultTransactionCoordinatorNotFoundError: + return "ResultTransactionCoordinatorNotFoundError"; + + case ResultInvalidTxnStatusError: + return "ResultInvalidTxnStatusError"; }; // NOTE : Do not add default case in the switch above. In future if we get new cases for // ServerError and miss them in the switch above we would like to get notified. Adding diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 82d01de10318a..4fb37c37257ff 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -60,6 +60,7 @@ import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.api.AuthData; +import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.PulsarHandler; import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange; @@ -113,6 +114,7 @@ public class ClientCnx extends PulsarHandler { private final ConcurrentLongHashMap> producers = new ConcurrentLongHashMap<>(16, 1); private final ConcurrentLongHashMap> consumers = new ConcurrentLongHashMap<>(16, 1); + private final ConcurrentLongHashMap transactionMetaStoreHandlers = new ConcurrentLongHashMap<>(16, 1); private final CompletableFuture connectionFuture = new CompletableFuture(); private final ConcurrentLinkedQueue requestTimeoutQueue = new ConcurrentLinkedQueue<>(); @@ -239,6 +241,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { // Notify all attached producers/consumers so they have a chance to reconnect producers.forEach((id, producer) -> producer.connectionClosed(this)); consumers.forEach((id, consumer) -> consumer.connectionClosed(this)); + transactionMetaStoreHandlers.forEach((id, handler) -> handler.connectionClosed(this)); pendingRequests.clear(); pendingLookupRequests.clear(); @@ -734,7 +737,7 @@ Promise newPromise() { return ctx.newPromise(); } - ChannelHandlerContext ctx() { + public ChannelHandlerContext ctx() { return ctx; } @@ -842,6 +845,39 @@ public CompletableFuture sendGetOrCreateSchema(ByteBuf request, long req }); } + @Override + protected void handleNewTxnResponse(PulsarApi.CommandNewTxnResponse command) { + TransactionMetaStoreHandler handler = checkAndGetTransactionMetaStoreHandler(command.getTxnidMostBits()); + if (handler != null) { + handler.handleNewTxnResponse(command); + } + } + + @Override + protected void handleAddPartitionToTxnResponse(PulsarApi.CommandAddPartitionToTxnResponse command) { + TransactionMetaStoreHandler handler = checkAndGetTransactionMetaStoreHandler(command.getTxnidMostBits()); + if (handler != null) { + handler.handleAddPublishPartitionToTxnResponse(command); + } + } + + @Override + protected void handleEndTxnResponse(PulsarApi.CommandEndTxnResponse command) { + TransactionMetaStoreHandler handler = checkAndGetTransactionMetaStoreHandler(command.getTxnidMostBits()); + if (handler != null) { + handler.handleEndTxnResponse(command); + } + } + + private TransactionMetaStoreHandler checkAndGetTransactionMetaStoreHandler(long tcId) { + TransactionMetaStoreHandler handler = transactionMetaStoreHandlers.get(tcId); + if (handler == null) { + channel().close(); + log.warn("Close the channel since can't get the transaction meta store handler, will reconnect later."); + } + return handler; + } + /** * check serverError and take appropriate action *

    @@ -912,6 +948,10 @@ void registerProducer(final long producerId, final ProducerImpl producer) { producers.put(producerId, producer); } + void registerTransactionMetaStoreHandler(final long transactionMetaStoreId, final TransactionMetaStoreHandler handler) { + transactionMetaStoreHandlers.put(transactionMetaStoreId, handler); + } + void removeProducer(final long producerId) { producers.remove(producerId); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java new file mode 100644 index 0000000000000..7a99fe1658691 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java @@ -0,0 +1,433 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import io.netty.buffer.ByteBuf; +import io.netty.util.Recycler; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException; + +import org.apache.pulsar.client.impl.ClientCnx.RequestTime; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; +import org.apache.pulsar.transaction.impl.common.TxnID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +/** + * Handler for transaction meta store. + */ +public class TransactionMetaStoreHandler extends HandlerState implements ConnectionHandler.Connection, Closeable, TimerTask { + + private static final Logger LOG = LoggerFactory.getLogger(TransactionMetaStoreHandler.class); + + private final long transactionCoordinatorId; + private ConnectionHandler connectionHandler; + private final ConcurrentLongHashMap> pendingRequests = + new ConcurrentLongHashMap<>(16, 1); + private final ConcurrentLinkedQueue timeoutQueue; + + private final boolean blockIfReachMaxPendingOps; + private final Semaphore semaphore; + + private Timeout requestTimeout; + + public TransactionMetaStoreHandler(long transactionCoordinatorId, PulsarClientImpl pulsarClient, String topic) { + super(pulsarClient, topic); + this.transactionCoordinatorId = transactionCoordinatorId; + this.timeoutQueue = new ConcurrentLinkedQueue<>(); + this.blockIfReachMaxPendingOps = true; + this.semaphore = new Semaphore(1000); + this.requestTimeout = pulsarClient.timer().newTimeout(this, pulsarClient.getConfiguration().getOperationTimeoutMs(), TimeUnit.MILLISECONDS); + this.connectionHandler = new ConnectionHandler( + this, + new BackoffBuilder() + .setInitialTime(pulsarClient.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS) + .setMax(pulsarClient.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS) + .setMandatoryStop(100, TimeUnit.MILLISECONDS) + .create(), + this); + this.connectionHandler.grabCnx(); + } + + @Override + public void connectionFailed(PulsarClientException exception) { + LOG.error("Transaction meta handler with transaction coordinator id {} connection failed.", + transactionCoordinatorId, exception); + setState(State.Failed); + } + + @Override + public void connectionOpened(ClientCnx cnx) { + LOG.info("Transaction meta handler with transaction coordinator id {} connection opened.", + transactionCoordinatorId); + connectionHandler.setClientCnx(cnx); + cnx.registerTransactionMetaStoreHandler(transactionCoordinatorId, this); + if (!changeToReadyState()) { + cnx.channel().close(); + } + } + + public CompletableFuture newTransactionAsync(long timeout, TimeUnit unit) { + if (LOG.isDebugEnabled()) { + LOG.debug("New transaction with timeout in ms {}", unit.toMillis(timeout)); + } + CompletableFuture callback = new CompletableFuture<>(); + + if (!canSendRequest(callback)) { + return callback; + } + long requestId = client.newRequestId(); + ByteBuf cmd = Commands.newTxn(transactionCoordinatorId, requestId, unit.toMillis(timeout)); + OpForTxnIdCallBack op = OpForTxnIdCallBack.create(cmd, callback); + pendingRequests.put(requestId, op); + timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId)); + cmd.retain(); + cnx().ctx().writeAndFlush(cmd, cnx().ctx().voidPromise()); + return callback; + } + + void handleNewTxnResponse(PulsarApi.CommandNewTxnResponse response) { + OpForTxnIdCallBack op = (OpForTxnIdCallBack) pendingRequests.remove(response.getRequestId()); + if (op == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Got new txn response for timeout {} - {}", response.getTxnidMostBits(), + response.getTxnidLeastBits()); + } + return; + } + if (!response.hasError()) { + TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits()); + if (LOG.isDebugEnabled()) { + LOG.debug("Got new txn response {} for request {}", txnID, response.getRequestId()); + } + op.callback.complete(txnID); + } else { + LOG.error("Got new txn for request {} error {}", response.getRequestId(), response.getError()); + op.callback.completeExceptionally(getExceptionByServerError(response.getError(), response.getMessage())); + } + + onResponse(op); + } + + public CompletableFuture addPublishPartitionToTxnAsync(TxnID txnID, List partitions) { + if (LOG.isDebugEnabled()) { + LOG.debug("Add publish partition to txn request with txnId, with partitions", txnID, partitions); + } + CompletableFuture callback = new CompletableFuture<>(); + + if (!canSendRequest(callback)) { + return callback; + } + long requestId = client.newRequestId(); + ByteBuf cmd = Commands.newAddPartitionToTxn(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits()); + OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback); + pendingRequests.put(requestId, op); + timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId)); + cmd.retain(); + cnx().ctx().writeAndFlush(cmd, cnx().ctx().voidPromise()); + return callback; + } + + void handleAddPublishPartitionToTxnResponse(PulsarApi.CommandAddPartitionToTxnResponse response) { + OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(response.getRequestId()); + if (op == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Got add publish partition to txn response for timeout {} - {}", response.getTxnidMostBits(), + response.getTxnidLeastBits()); + } + return; + } + if (!response.hasError()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Add publish partition for request {} success.", response.getRequestId()); + } + op.callback.complete(null); + } else { + LOG.error("Add publish partition for request {} error {}.", response.getRequestId(), response.getError()); + op.callback.completeExceptionally(getExceptionByServerError(response.getError(), response.getMessage())); + } + + onResponse(op); + } + + public CompletableFuture commitAsync(TxnID txnID) { + if (LOG.isDebugEnabled()) { + LOG.debug("Commit txn {}", txnID); + } + CompletableFuture callback = new CompletableFuture<>(); + + if (!canSendRequest(callback)) { + return callback; + } + long requestId = client.newRequestId(); + ByteBuf cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), PulsarApi.TxnAction.COMMIT); + OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback); + pendingRequests.put(requestId, op); + timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId)); + cmd.retain(); + cnx().ctx().writeAndFlush(cmd, cnx().ctx().voidPromise()); + return callback; + } + + public CompletableFuture abortAsync(TxnID txnID) { + if (LOG.isDebugEnabled()) { + LOG.debug("Abort txn {}", txnID); + } + CompletableFuture callback = new CompletableFuture<>(); + + if (!canSendRequest(callback)) { + return callback; + } + long requestId = client.newRequestId(); + ByteBuf cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), PulsarApi.TxnAction.ABORT); + OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback); + pendingRequests.put(requestId, op); + timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId)); + cmd.retain(); + cnx().ctx().writeAndFlush(cmd, cnx().ctx().voidPromise()); + return callback; + } + + void handleEndTxnResponse(PulsarApi.CommandEndTxnResponse response) { + OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(response.getRequestId()); + if (op == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Got end txn response for timeout {} - {}", response.getTxnidMostBits(), + response.getTxnidLeastBits()); + } + return; + } + if (!response.hasError()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Got end txn response success for request {}", response.getRequestId()); + } + op.callback.complete(null); + } else { + LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError()); + op.callback.completeExceptionally(getExceptionByServerError(response.getError(), response.getMessage())); + } + + onResponse(op); + } + + private static abstract class OpBase { + protected ByteBuf cmd; + protected CompletableFuture callback; + + abstract void recycle(); + } + + private static class OpForTxnIdCallBack extends OpBase { + + static OpForTxnIdCallBack create(ByteBuf cmd, CompletableFuture callback) { + OpForTxnIdCallBack op = RECYCLER.get(); + op.callback = callback; + op.cmd = cmd; + return op; + } + + private OpForTxnIdCallBack(Recycler.Handle recyclerHandle) { + this.recyclerHandle = recyclerHandle; + } + + @Override + void recycle() { + recyclerHandle.recycle(this); + } + + private final Recycler.Handle recyclerHandle; + private static final Recycler RECYCLER = new Recycler() { + @Override + protected OpForTxnIdCallBack newObject(Handle handle) { + return new OpForTxnIdCallBack(handle); + } + }; + } + + private static class OpForVoidCallBack extends OpBase { + + static OpForVoidCallBack create(ByteBuf cmd, CompletableFuture callback) { + OpForVoidCallBack op = RECYCLER.get(); + op.callback = callback; + op.cmd = cmd; + return op; + } + private OpForVoidCallBack(Recycler.Handle recyclerHandle) { + this.recyclerHandle = recyclerHandle; + } + + @Override + void recycle() { + recyclerHandle.recycle(this); + } + + private final Recycler.Handle recyclerHandle; + private static final Recycler RECYCLER = new Recycler() { + @Override + protected OpForVoidCallBack newObject(Handle handle) { + return new OpForVoidCallBack(handle); + } + }; + } + + private TransactionCoordinatorClientException getExceptionByServerError(PulsarApi.ServerError serverError, String msg) { + switch (serverError) { + case TransactionCoordinatorNotFound: + return new TransactionCoordinatorClientException.CoordinatorNotFoundException(msg); + case InvalidTxnStatus: + return new TransactionCoordinatorClientException.InvalidTxnStatusException(msg); + default: + return new TransactionCoordinatorClientException(msg); + } + } + + private void onResponse(OpBase op) { + ReferenceCountUtil.safeRelease(op.cmd); + op.recycle(); + semaphore.release(); + } + + private boolean canSendRequest(CompletableFuture callback) { + if (!isValidHandlerState(callback)) { + return false; + } + try { + if (blockIfReachMaxPendingOps) { + semaphore.acquire(); + } else { + if (!semaphore.tryAcquire()) { + callback.completeExceptionally(new TransactionCoordinatorClientException("Reach max pending ops.")); + return false; + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + callback.completeExceptionally(TransactionCoordinatorClientException.unwrap(e)); + return false; + } + return true; + } + + private boolean isValidHandlerState(CompletableFuture callback) { + switch (getState()) { + case Ready: + return true; + case Connecting: + callback.completeExceptionally( + new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException( + "Transaction meta store handler for tcId " + + transactionCoordinatorId + + " is connecting now, please try later.")); + return false; + case Closing: + case Closed: + callback.completeExceptionally( + new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException( + "Transaction meta store handler for tcId " + + transactionCoordinatorId + + " is closing or closed.")); + return false; + case Failed: + case Uninitialized: + callback.completeExceptionally( + new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException( + "Transaction meta store handler for tcId " + + transactionCoordinatorId + + " not connected.")); + return false; + default: + callback.completeExceptionally( + new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException( + transactionCoordinatorId)); + return false; + } + } + + @Override + public void run(Timeout timeout) throws Exception { + if (timeout.isCancelled()) { + return; + } + long timeToWaitMs; + if (getState() == State.Closing || getState() == State.Closed) { + return; + } + RequestTime peeked = timeoutQueue.peek(); + while (peeked != null && peeked.creationTimeMs + client.getConfiguration().getOperationTimeoutMs() + - System.currentTimeMillis() <= 0) { + RequestTime lastPolled = timeoutQueue.poll(); + if (lastPolled != null) { + OpBase op = pendingRequests.remove(lastPolled.requestId); + if (!op.callback.isDone()) { + op.callback.completeExceptionally(new PulsarClientException.TimeoutException( + "Could not get response from transaction meta store within given timeout.")); + if (LOG.isDebugEnabled()) { + LOG.debug("Transaction coordinator request {} is timeout.", lastPolled.requestId); + } + onResponse(op); + } + } else { + break; + } + peeked = timeoutQueue.peek(); + } + + if (peeked == null) { + timeToWaitMs = client.getConfiguration().getOperationTimeoutMs(); + } else { + long diff = (peeked.creationTimeMs + client.getConfiguration().getOperationTimeoutMs()) - System.currentTimeMillis(); + if (diff <= 0) { + timeToWaitMs = client.getConfiguration().getOperationTimeoutMs(); + } else { + timeToWaitMs = diff; + } + } + requestTimeout = client.timer().newTimeout(this, timeToWaitMs, TimeUnit.MILLISECONDS); + } + + private ClientCnx cnx() { + return this.connectionHandler.cnx(); + } + + void connectionClosed(ClientCnx cnx) { + this.connectionHandler.connectionClosed(cnx); + } + + @Override + public void close() throws IOException { + } + + @Override + public String getHandlerName() { + return "Transaction meta store handler [" + transactionCoordinatorId + "]"; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferClient.java similarity index 95% rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClient.java rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferClient.java index da836a44adfda..cd1a6758c59e2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferClient.java @@ -21,9 +21,9 @@ import java.util.concurrent.CompletableFuture; /** - * The transaction coordinator client to commit and abort transactions on topics. + * The transaction buffer client to commit and abort transactions on topics. */ -public interface TransactionCoordinatorClient { +public interface TransactionBufferClient { /** * Commit the transaction associated with the topic. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferClientImpl.java new file mode 100644 index 0000000000000..fa0941f9eb7e7 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferClientImpl.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.transaction; + +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.util.FutureUtil; + +/** + * The implementation of {@link TransactionBufferClient}. + */ +public class TransactionBufferClientImpl implements TransactionBufferClient { + + private final PulsarClientImpl client; + + public TransactionBufferClientImpl(PulsarClientImpl client) { + this.client = client; + } + + @Override + public CompletableFuture commitTxnOnTopic(String topic, long txnIdMostBits, long txnIdLeastBits) { + return FutureUtil.failedFuture( + new UnsupportedOperationException("Not Implemented Yet")); + } + + @Override + public CompletableFuture abortTxnOnTopic(String topic, long txnIdMostBits, long txnIdLeastBits) { + return FutureUtil.failedFuture( + new UnsupportedOperationException("Not Implemented Yet")); + } + + @Override + public CompletableFuture commitTxnOnSubscription(String topic, String subscription, long txnIdMostBits, long txnIdLeastBits) { + return FutureUtil.failedFuture( + new UnsupportedOperationException("Not Implemented Yet")); + } + + @Override + public CompletableFuture abortTxnOnSubscription(String topic, String subscription, long txnIdMostBits, long txnIdLeastBits) { + return FutureUtil.failedFuture( + new UnsupportedOperationException("Not Implemented Yet")); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java index 4ddd1aa2597ec..f6fffaa4346b4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java @@ -18,42 +18,209 @@ */ package org.apache.pulsar.client.impl.transaction; -import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.CoordinatorClientStateException; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.TransactionMetaStoreHandler; +import org.apache.pulsar.client.util.MathUtils; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; +import org.apache.pulsar.transaction.impl.common.TxnID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; /** - * The implementation of {@link TransactionCoordinatorClient}. + * Transaction coordinator client based topic assigned. */ public class TransactionCoordinatorClientImpl implements TransactionCoordinatorClient { - private final PulsarClientImpl client; + private static final Logger LOG = LoggerFactory.getLogger(TransactionCoordinatorClientImpl.class); + + private final PulsarClientImpl pulsarClient; + private TransactionMetaStoreHandler[] handlers; + private ConcurrentLongHashMap handlerMap = new ConcurrentLongHashMap<>(16, 1); + private final AtomicLong epoch = new AtomicLong(0); + + private static final AtomicReferenceFieldUpdater STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(TransactionCoordinatorClientImpl.class, State.class, "state"); + private volatile State state = State.NONE; + + public TransactionCoordinatorClientImpl(PulsarClient pulsarClient) { + this.pulsarClient = (PulsarClientImpl) pulsarClient; + } + + @Override + public void start() throws TransactionCoordinatorClientException { + try { + startAsync().get(); + } catch (Exception e) { + throw TransactionCoordinatorClientException.unwrap(e); + } + } + + @Override + public CompletableFuture startAsync() { + if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) { + return pulsarClient.getLookup().getPartitionedTopicMetadata(TopicName.TRANSACTION_COORDINATOR_ASSIGN) + .thenAccept(partitionMeta -> { + if (LOG.isDebugEnabled()) { + LOG.debug("Transaction meta store assign partition is {}.", partitionMeta.partitions); + } + if (partitionMeta.partitions > 0) { + handlers = new TransactionMetaStoreHandler[partitionMeta.partitions]; + for (int i = 0; i < partitionMeta.partitions; i++) { + TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler(i, pulsarClient, + TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString() + TopicName.PARTITIONED_TOPIC_SUFFIX + i); + handlers[i] = handler; + handlerMap.put(i, handler); + } + } else { + handlers = new TransactionMetaStoreHandler[1]; + TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler(0, pulsarClient, + TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()); + handlers[0] = handler; + handlerMap.put(0, handler); + } + + STATE_UPDATER.set(TransactionCoordinatorClientImpl.this, State.READY); + + }); + } else { + return FutureUtil.failedFuture(new CoordinatorClientStateException("Can not start while current state is " + state)); + } + } + + @Override + public void close() throws TransactionCoordinatorClientException { + try { + closeAsync().get(); + } catch (Exception e) { + throw TransactionCoordinatorClientException.unwrap(e); + } + } + + @Override + public CompletableFuture closeAsync() { + CompletableFuture result = new CompletableFuture<>(); + if (getState() == State.CLOSING || getState() == State.CLOSED) { + LOG.warn("The transaction meta store is closing or closed, doing nothing."); + result.complete(null); + } else { + for (TransactionMetaStoreHandler handler : handlers) { + try { + handler.close(); + } catch (IOException e) { + LOG.warn("Close transaction meta store handler error", e); + } + } + this.handlers = null; + result.complete(null); + } + return result; + } + + @Override + public TxnID newTransaction() throws TransactionCoordinatorClientException { + try { + return newTransactionAsync().get(); + } catch (Exception e) { + throw TransactionCoordinatorClientException.unwrap(e); + } + } - public TransactionCoordinatorClientImpl(PulsarClientImpl client) { - this.client = client; + @Override + public CompletableFuture newTransactionAsync() { + return newTransactionAsync(DEFAULT_TXN_TTL_MS, TimeUnit.MILLISECONDS); } @Override - public CompletableFuture commitTxnOnTopic(String topic, long txnIdMostBits, long txnIdLeastBits) { - return FutureUtil.failedFuture( - new UnsupportedOperationException("Not Implemented Yet")); + public TxnID newTransaction(long timeout, TimeUnit unit) throws TransactionCoordinatorClientException { + try { + return newTransactionAsync(timeout, unit).get(); + } catch (Exception e) { + throw TransactionCoordinatorClientException.unwrap(e); + } } @Override - public CompletableFuture abortTxnOnTopic(String topic, long txnIdMostBits, long txnIdLeastBits) { - return FutureUtil.failedFuture( - new UnsupportedOperationException("Not Implemented Yet")); + public CompletableFuture newTransactionAsync(long timeout, TimeUnit unit) { + return nextHandler().newTransactionAsync(timeout, unit); } @Override - public CompletableFuture commitTxnOnSubscription(String topic, String subscription, long txnIdMostBits, long txnIdLeastBits) { - return FutureUtil.failedFuture( - new UnsupportedOperationException("Not Implemented Yet")); + public void addPublishPartitionToTxn(TxnID txnID, List partitions) throws TransactionCoordinatorClientException { + try { + addPublishPartitionToTxnAsync(txnID, partitions).get(); + } catch (Exception e) { + throw TransactionCoordinatorClientException.unwrap(e); + } } @Override - public CompletableFuture abortTxnOnSubscription(String topic, String subscription, long txnIdMostBits, long txnIdLeastBits) { - return FutureUtil.failedFuture( - new UnsupportedOperationException("Not Implemented Yet")); + public CompletableFuture addPublishPartitionToTxnAsync(TxnID txnID, List partitions) { + TransactionMetaStoreHandler handler = handlerMap.get(txnID.getMostSigBits()); + if (handler == null) { + return FutureUtil.failedFuture( + new TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException(txnID.getMostSigBits())); + } + return handler.addPublishPartitionToTxnAsync(txnID, partitions); + } + + @Override + public void commit(TxnID txnID) throws TransactionCoordinatorClientException { + try { + commitAsync(txnID).get(); + } catch (Exception e) { + throw TransactionCoordinatorClientException.unwrap(e); + } + } + + @Override + public CompletableFuture commitAsync(TxnID txnID) { + TransactionMetaStoreHandler handler = handlerMap.get(txnID.getMostSigBits()); + if (handler == null) { + return FutureUtil.failedFuture( + new TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException(txnID.getMostSigBits())); + } + return handler.commitAsync(txnID); + } + + @Override + public void abort(TxnID txnID) throws TransactionCoordinatorClientException { + try { + abortAsync(txnID).get(); + } catch (Exception e) { + throw TransactionCoordinatorClientException.unwrap(e); + } + } + + @Override + public CompletableFuture abortAsync(TxnID txnID) { + TransactionMetaStoreHandler handler = handlerMap.get(txnID.getMostSigBits()); + if (handler == null) { + return FutureUtil.failedFuture( + new TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException(txnID.getMostSigBits())); + } + return handler.abortAsync(txnID); + } + + @Override + public State getState() { + return state; + } + + private TransactionMetaStoreHandler nextHandler() { + int index = MathUtils.signSafeMod(epoch.incrementAndGet(), handlers.length); + return handlers[index]; } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index ead24136b2d27..1e9ffd608fa6d 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -80,6 +80,8 @@ public enum ServerError InvalidTopicName(17, 17), IncompatibleSchema(18, 18), ConsumerAssignError(19, 19), + TransactionCoordinatorNotFound(20, 20), + InvalidTxnStatus(21, 21), ; public static final int UnknownError_VALUE = 0; @@ -102,6 +104,8 @@ public enum ServerError public static final int InvalidTopicName_VALUE = 17; public static final int IncompatibleSchema_VALUE = 18; public static final int ConsumerAssignError_VALUE = 19; + public static final int TransactionCoordinatorNotFound_VALUE = 20; + public static final int InvalidTxnStatus_VALUE = 21; public final int getNumber() { return value; } @@ -128,6 +132,8 @@ public static ServerError valueOf(int value) { case 17: return InvalidTopicName; case 18: return IncompatibleSchema; case 19: return ConsumerAssignError; + case 20: return TransactionCoordinatorNotFound; + case 21: return InvalidTxnStatus; default: return null; } } @@ -30314,6 +30320,10 @@ public interface CommandNewTxnOrBuilder // optional uint64 txn_ttl_seconds = 2 [default = 0]; boolean hasTxnTtlSeconds(); long getTxnTtlSeconds(); + + // optional uint64 tc_id = 3 [default = 0]; + boolean hasTcId(); + long getTcId(); } public static final class CommandNewTxn extends org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite @@ -30370,9 +30380,20 @@ public long getTxnTtlSeconds() { return txnTtlSeconds_; } + // optional uint64 tc_id = 3 [default = 0]; + public static final int TC_ID_FIELD_NUMBER = 3; + private long tcId_; + public boolean hasTcId() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public long getTcId() { + return tcId_; + } + private void initFields() { requestId_ = 0L; txnTtlSeconds_ = 0L; + tcId_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -30401,6 +30422,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeUInt64(2, txnTtlSeconds_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeUInt64(3, tcId_); + } } private int memoizedSerializedSize = -1; @@ -30417,6 +30441,10 @@ public int getSerializedSize() { size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream .computeUInt64Size(2, txnTtlSeconds_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeUInt64Size(3, tcId_); + } memoizedSerializedSize = size; return size; } @@ -30534,6 +30562,8 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000001); txnTtlSeconds_ = 0L; bitField0_ = (bitField0_ & ~0x00000002); + tcId_ = 0L; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -30575,6 +30605,10 @@ public org.apache.pulsar.common.api.proto.PulsarApi.CommandNewTxn buildPartial() to_bitField0_ |= 0x00000002; } result.txnTtlSeconds_ = txnTtlSeconds_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.tcId_ = tcId_; result.bitField0_ = to_bitField0_; return result; } @@ -30587,6 +30621,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandNew if (other.hasTxnTtlSeconds()) { setTxnTtlSeconds(other.getTxnTtlSeconds()); } + if (other.hasTcId()) { + setTcId(other.getTcId()); + } return this; } @@ -30630,6 +30667,11 @@ public Builder mergeFrom( txnTtlSeconds_ = input.readUInt64(); break; } + case 24: { + bitField0_ |= 0x00000004; + tcId_ = input.readUInt64(); + break; + } } } } @@ -30678,6 +30720,27 @@ public Builder clearTxnTtlSeconds() { return this; } + // optional uint64 tc_id = 3 [default = 0]; + private long tcId_ ; + public boolean hasTcId() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public long getTcId() { + return tcId_; + } + public Builder setTcId(long value) { + bitField0_ |= 0x00000004; + tcId_ = value; + + return this; + } + public Builder clearTcId() { + bitField0_ = (bitField0_ & ~0x00000004); + tcId_ = 0L; + + return this; + } + // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandNewTxn) } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 640c324ef0d48..5b8c91321279c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1182,9 +1182,10 @@ public static ByteBuf newGetOrCreateSchemaResponseError(long requestId, ServerEr // ---- transaction related ---- - public static ByteBuf newTxn(long requestId, long ttlSeconds) { - CommandNewTxn commandNewTxn = CommandNewTxn.newBuilder().setRequestId(requestId).setTxnTtlSeconds(ttlSeconds) - .build(); + public static ByteBuf newTxn(long tcId, long requestId, long ttlSeconds) { + CommandNewTxn commandNewTxn = CommandNewTxn.newBuilder().setTcId(tcId).setRequestId(requestId) + .setTxnTtlSeconds(ttlSeconds) + .build(); ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.NEW_TXN).setNewTxn(commandNewTxn)); commandNewTxn.recycle(); return res; @@ -1201,9 +1202,10 @@ public static ByteBuf newTxnResponse(long requestId, long txnIdLeastBits, long t return res; } - public static ByteBuf newTxnResponse(long requestId, ServerError error, String errorMsg) { + public static ByteBuf newTxnResponse(long requestId, long txnIdMostBits, ServerError error, String errorMsg) { CommandNewTxnResponse.Builder builder = CommandNewTxnResponse.newBuilder(); builder.setRequestId(requestId); + builder.setTxnidMostBits(txnIdMostBits); builder.setError(error); if (errorMsg != null) { builder.setMessage(errorMsg); @@ -1241,9 +1243,11 @@ public static ByteBuf newAddPartitionToTxnResponse(long requestId, long txnIdLea return res; } - public static ByteBuf newAddPartitionToTxnResponse(long requestId, ServerError error, String errorMsg) { + public static ByteBuf newAddPartitionToTxnResponse(long requestId, long txnIdMostBits, ServerError error, + String errorMsg) { CommandAddPartitionToTxnResponse.Builder builder = CommandAddPartitionToTxnResponse.newBuilder(); builder.setRequestId(requestId); + builder.setTxnidMostBits(txnIdMostBits); builder.setError(error); if (errorMsg != null) { builder.setMessage(errorMsg); @@ -1270,9 +1274,23 @@ public static ByteBuf newAddSubscriptionToTxn(long requestId, long txnIdLeastBit return res; } - public static ByteBuf newAddSubscriptionToTxnResponse(long requestId, ServerError error, String errorMsg) { + public static ByteBuf newAddSubscriptionToTxnResponse(long requestId, long txnIdLeastBits, long txnIdMostBits) { + CommandAddSubscriptionToTxnResponse command = CommandAddSubscriptionToTxnResponse + .newBuilder().setRequestId(requestId) + .setTxnidLeastBits(txnIdLeastBits) + .setTxnidMostBits(txnIdMostBits) + .build(); + ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.ADD_SUBSCRIPTION_TO_TXN_RESPONSE) + .setAddSubscriptionToTxnResponse(command)); + command.recycle(); + return res; + } + + public static ByteBuf newAddSubscriptionToTxnResponse(long requestId, long txnIdMostBits, ServerError error, + String errorMsg) { CommandAddSubscriptionToTxnResponse.Builder builder = CommandAddSubscriptionToTxnResponse.newBuilder(); builder.setRequestId(requestId); + builder.setTxnidMostBits(txnIdMostBits); builder.setError(error); if (errorMsg != null) { builder.setMessage(errorMsg); @@ -1304,9 +1322,10 @@ public static ByteBuf newEndTxnResponse(long requestId, long txnIdLeastBits, lon return res; } - public static ByteBuf newEndTxnResponse(long requestId, ServerError error, String errorMsg) { + public static ByteBuf newEndTxnResponse(long requestId, long txnIdMostBits, ServerError error, String errorMsg) { CommandEndTxnResponse.Builder builder = CommandEndTxnResponse.newBuilder(); builder.setRequestId(requestId); + builder.setTxnidMostBits(txnIdMostBits); builder.setError(error); if (errorMsg != null) { builder.setMessage(errorMsg); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java index f0f44177b7c33..cfba5bbd5c73c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java @@ -374,7 +374,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception case ADD_SUBSCRIPTION_TO_TXN_RESPONSE: checkArgument(cmd.hasAddSubscriptionToTxnResponse()); - handleAddSubsciptionToTxnResponse(cmd.getAddSubscriptionToTxnResponse()); + handleAddSubscriptionToTxnResponse(cmd.getAddSubscriptionToTxnResponse()); cmd.getAddSubscriptionToTxnResponse().recycle(); break; @@ -410,7 +410,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception case END_TXN_ON_SUBSCRIPTION_RESPONSE: checkArgument(cmd.hasEndTxnOnSubscriptionResponse()); - handleEndTxnOnsubscriptionResponse(cmd.getEndTxnOnSubscriptionResponse()); + handleEndTxnOnSubscriptionResponse(cmd.getEndTxnOnSubscriptionResponse()); cmd.getEndTxnOnSubscriptionResponse().recycle(); break; } @@ -600,7 +600,7 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn commandAdd throw new UnsupportedOperationException(); } - protected void handleAddSubsciptionToTxnResponse( + protected void handleAddSubscriptionToTxnResponse( CommandAddSubscriptionToTxnResponse commandAddSubscriptionToTxnResponse) { throw new UnsupportedOperationException(); } @@ -625,7 +625,7 @@ protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription commandEnd throw new UnsupportedOperationException(); } - protected void handleEndTxnOnsubscriptionResponse( + protected void handleEndTxnOnSubscriptionResponse( CommandEndTxnOnSubscriptionResponse commandEndTxnOnSubscriptionResponse) { throw new UnsupportedOperationException(); } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index d2122026c9624..8a44fca60875a 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -178,6 +178,9 @@ enum ServerError { IncompatibleSchema = 18; // Specified schema was incompatible with topic schema ConsumerAssignError = 19; // Dispatcher assign consumer error + + TransactionCoordinatorNotFound = 20; // Transaction coordinator not found error + InvalidTxnStatus = 21; // Invalid txn status error } enum AuthMethod { @@ -679,6 +682,7 @@ enum TxnAction { message CommandNewTxn { required uint64 request_id = 1; optional uint64 txn_ttl_seconds = 2 [default = 0]; + optional uint64 tc_id = 3 [default = 0]; } message CommandNewTxnResponse { diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE index 6e2a008aa7950..601dba5b50f92 100644 --- a/pulsar-sql/presto-distribution/LICENSE +++ b/pulsar-sql/presto-distribution/LICENSE @@ -459,6 +459,7 @@ Protocol Buffers License * Protocol Buffers - protobuf-shaded-2.1.0-incubating.jar - protobuf-java-3.5.1.jar + - protobuf-java-util-3.5.1.jar BSD 3-clause "New" or "Revised" License * RE2J TD -- re2j-td-1.4.jar diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java index 39d92aaa70ff3..b8bf61da2dc22 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java @@ -20,7 +20,7 @@ import com.google.common.annotations.Beta; import java.util.List; -import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; +import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException; import org.apache.pulsar.transaction.impl.common.TxnID; import org.apache.pulsar.transaction.impl.common.TxnStatus; diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java index 54dcddae6bcd1..04ff7d3b7d44a 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java @@ -19,6 +19,8 @@ package org.apache.pulsar.transaction.coordinator.exceptions; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.impl.common.TxnID; +import org.apache.pulsar.transaction.impl.common.TxnStatus; /** * The base exception for exceptions thrown from coordinator. @@ -39,23 +41,55 @@ public CoordinatorException(Throwable cause) { } /** - * Transaction coordinator not found exception. + * Exception is thrown when transaction coordinator not found. */ - public static class NotFoundException extends CoordinatorException { + public static class CoordinatorNotFoundException extends CoordinatorException { - public NotFoundException(TransactionCoordinatorID tcId) { + public CoordinatorNotFoundException(String msg) { + super(msg); + } + + public CoordinatorNotFoundException(TransactionCoordinatorID tcId) { super(String.format("Transaction coordinator with id %s not found!", tcId.getId())); } + } + + /** + * Exception is thrown when transaction is not in the right status. + */ + public static class InvalidTxnStatusException extends CoordinatorException { + + private static final long serialVersionUID = 0L; + + public InvalidTxnStatusException(String message) { + super(message); + } + + public InvalidTxnStatusException(TxnID txnID, + TxnStatus expectedStatus, + TxnStatus actualStatus) { + super("Expect Txn `" + txnID + "` to be in " + expectedStatus + + " status but it is in " + actualStatus + " status"); + + } + } + + /** + * Exception is thrown when a transaction is not found in coordinator. + */ + public static class TransactionNotFoundException extends CoordinatorException { + + private static final long serialVersionUID = 0L; - public NotFoundException(String message) { + public TransactionNotFoundException(String message) { super(message); } - public NotFoundException(String message, Throwable cause) { + public TransactionNotFoundException(String message, Throwable cause) { super(message, cause); } - public NotFoundException(Throwable cause) { + public TransactionNotFoundException(Throwable cause) { super(cause); } } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/InvalidTxnStatusException.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/InvalidTxnStatusException.java deleted file mode 100644 index 489b55072b5ee..0000000000000 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/InvalidTxnStatusException.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.transaction.coordinator.exceptions; - -import org.apache.pulsar.transaction.impl.common.TxnID; -import org.apache.pulsar.transaction.impl.common.TxnStatus; - -/** - * Exception is thrown when transaction is not in the right status. - */ -public class InvalidTxnStatusException extends CoordinatorException { - - private static final long serialVersionUID = 0L; - - public InvalidTxnStatusException(String message) { - super(message); - } - - public InvalidTxnStatusException(TxnID txnID, - TxnStatus expectedStatus, - TxnStatus actualStatus) { - super( - "Expect Txn `" + txnID + "` to be in " + expectedStatus - + " status but it is in " + actualStatus + " status"); - - } - -} diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/TransactionNotFoundException.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/TransactionNotFoundException.java deleted file mode 100644 index 86f032195ad45..0000000000000 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/TransactionNotFoundException.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.transaction.coordinator.exceptions; - -/** - * Exception is thrown when a transaction is not found in coordinator. - */ -public class TransactionNotFoundException extends CoordinatorException { - - private static final long serialVersionUID = 0L; - - public TransactionNotFoundException(String message) { - super(message); - } - - public TransactionNotFoundException(String message, Throwable cause) { - super(message, cause); - } - - public TransactionNotFoundException(Throwable cause) { - super(cause); - } -} diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java index 8d6d7965204e3..c86e7bfd7e83c 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java @@ -26,8 +26,8 @@ import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; import org.apache.pulsar.transaction.coordinator.TxnMeta; -import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; -import org.apache.pulsar.transaction.coordinator.exceptions.TransactionNotFoundException; +import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException; +import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException; import org.apache.pulsar.transaction.impl.common.TxnID; import org.apache.pulsar.transaction.impl.common.TxnStatus; diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java index f05af362318ec..591101eefbeb8 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java @@ -24,8 +24,7 @@ import java.util.List; import java.util.Set; import org.apache.pulsar.transaction.coordinator.TxnMeta; -import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException; -import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; +import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException; import org.apache.pulsar.transaction.impl.common.TxnID; import org.apache.pulsar.transaction.impl.common.TxnStatus; @@ -100,7 +99,7 @@ private synchronized void checkTxnStatus(TxnStatus expectedStatus) throws Invali * * @param partitions the list of partitions that the txn produces to * @return the transaction itself. - * @throws CoordinatorException + * @throws InvalidTxnStatusException */ @Override public synchronized TxnMetaImpl addProducedPartitions(List partitions) throws InvalidTxnStatusException { @@ -115,7 +114,7 @@ public synchronized TxnMetaImpl addProducedPartitions(List partitions) t * * @param partitions the list of partitions that the txn acknowledges to * @return the transaction itself. - * @throws CoordinatorException + * @throws InvalidTxnStatusException */ @Override public synchronized TxnMetaImpl addAckedPartitions(List partitions) throws InvalidTxnStatusException { diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java index 4515701792985..2149768b1a31e 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java @@ -25,10 +25,11 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; + +import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException; +import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException; import org.apache.pulsar.transaction.impl.common.TxnID; import org.apache.pulsar.transaction.impl.common.TxnStatus; -import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException; -import org.apache.pulsar.transaction.coordinator.exceptions.TransactionNotFoundException; import org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; diff --git a/src/check-binary-license b/src/check-binary-license index a5adb7df303b0..dac370a958fc4 100755 --- a/src/check-binary-license +++ b/src/check-binary-license @@ -87,7 +87,7 @@ for J in $NOTICEJARS; do done # check pulsar sql jars -JARS=$(tar -tf $TARBALL | grep '\.jar' | grep 'lib/presto/' | grep -v 'managed-ledger' | grep -v 'pulsar-client-admin' | grep -v 'pulsar-client-api' | grep -v 'pulsar-functions-api' | grep -v 'pulsar-presto-connector-original' | grep -v 'pulsar-presto-distribution' | grep -v 'pulsar-common' | grep -v 'pulsar-functions-proto' | grep -v 'pulsar-functions-utils' | grep -v 'pulsar-io-core' | sed 's!.*/!!' | sort) +JARS=$(tar -tf $TARBALL | grep '\.jar' | grep 'lib/presto/' | grep -v 'managed-ledger' | grep -v 'pulsar-client-admin' | grep -v 'pulsar-client-api' | grep -v 'pulsar-functions-api' | grep -v 'pulsar-presto-connector-original' | grep -v 'pulsar-presto-distribution' | grep -v 'pulsar-common' | grep -v 'pulsar-functions-proto' | grep -v 'pulsar-functions-utils' | grep -v 'pulsar-io-core' | grep -v 'pulsar-transaction-common' | sed 's!.*/!!' | sort) LICENSEPATH=$(tar -tf $TARBALL | awk '/^[^\/]*\/lib\/presto\/LICENSE/') LICENSE=$(tar -O -xf $TARBALL "$LICENSEPATH") LICENSEJARS=$(echo "$LICENSE" | sed -nE 's!.* (.*\.jar).*!\1!gp')