Skip to content

Commit

Permalink
[transaction-coordinator] Implementation of transaction coordinator c…
Browse files Browse the repository at this point in the history
…lient. (apache#5680)

### Motivation

Implement transaction coordinator client.

### Modifications

Add transaction coordinator client.
Add transaction meta store handler to handle meta store request and response.
  • Loading branch information
codelipenghui authored and sijie committed Dec 9, 2019
1 parent 9aaf631 commit 5504dc6
Show file tree
Hide file tree
Showing 35 changed files with 1,585 additions and 244 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -400,14 +400,6 @@ public void start() throws PulsarServerException {

this.offloader = createManagedLedgerOffloader(this.getConfiguration());

if (StringUtils.isNotBlank(config.getTransactionMetadataStoreProviderClassName())) {
transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider
.newProvider(config.getTransactionMetadataStoreProviderClassName()), this);
} else {
transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider
.newProvider(InMemTransactionMetadataStoreProvider.class.getName()), this);
}

brokerService.start();

this.webService = new WebService(this);
Expand Down Expand Up @@ -483,9 +475,11 @@ public Boolean get() {
// Register heartbeat and bootstrap namespaces.
this.nsService.registerBootstrapNamespaces();

// Register pulsar system namespaces
// Register pulsar system namespaces and start transaction meta store service
if (config.isTransactionCoordinatorEnabled()) {
this.nsService.registerNamespace(NamespaceName.SYSTEM_NAMESPACE.toString(), false);
transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider
.newProvider(config.getTransactionMetadataStoreProviderClassName()), this);
transactionMetadataStoreService.start();
}

this.metricsGenerator = new MetricsGenerator(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@
package org.apache.pulsar.broker;

import com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
import org.apache.pulsar.transaction.impl.common.TxnID;
import org.apache.pulsar.transaction.impl.common.TxnStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -50,29 +55,80 @@ public TransactionMetadataStoreService(TransactionMetadataStoreProvider transact
this.transactionMetadataStoreProvider = transactionMetadataStoreProvider;
}

public void start() {
pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener() {
@Override
public void onLoad(NamespaceBundle bundle) {
pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
.whenComplete((topics, ex) -> {
if (ex == null) {
for (String topic : topics) {
TopicName name = TopicName.get(topic);
if (TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
.equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
&& name.isPartitioned()) {
addTransactionMetadataStore(TransactionCoordinatorID.get(name.getPartitionIndex()));
}
}
} else {
LOG.error("Failed to get owned topic list when triggering on-loading bundle {}.", bundle, ex);
}
});
}
@Override
public void unLoad(NamespaceBundle bundle) {
pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
.whenComplete((topics, ex) -> {
if (ex == null) {
for (String topic : topics) {
TopicName name = TopicName.get(topic);
if (TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
.equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
&& name.isPartitioned()) {
removeTransactionMetadataStore(TransactionCoordinatorID.get(name.getPartitionIndex()));
}
}
} else {
LOG.error("Failed to get owned topic list error when triggering un-loading bundle {}.", bundle, ex);
}
});
}
@Override
public boolean test(NamespaceBundle namespaceBundle) {
return namespaceBundle.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE);
}
});
}

public void addTransactionMetadataStore(TransactionCoordinatorID tcId) {
transactionMetadataStoreProvider.openStore(tcId, pulsarService.getManagedLedgerFactory())
.whenComplete((store, ex) -> {
if (ex != null) {
LOG.error("Add transaction metadata store with id {} error", tcId.getId(), ex);
} else {
stores.put(tcId, store);
LOG.info("Added new transaction meta store {}", tcId);
}
});
}

public void removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
stores.remove(tcId).closeAsync().whenComplete((v, ex) -> {
if (ex != null) {
LOG.error("Close transaction metadata store with id {} error", ex);
}
});
TransactionMetadataStore metadataStore = stores.remove(tcId);
if (metadataStore != null) {
metadataStore.closeAsync().whenComplete((v, ex) -> {
if (ex != null) {
LOG.error("Close transaction metadata store with id {} error", ex);
} else {
LOG.info("Removed and closed transaction meta store {}", tcId);
}
});
}
}

public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID tcId) {
TransactionMetadataStore store = stores.get(tcId);
if (store == null) {
return FutureUtil.failedFuture(new CoordinatorException.NotFoundException(tcId));
return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId));
}
return store.newTransaction();
}
Expand All @@ -81,7 +137,7 @@ public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnId, List<Strin
TransactionCoordinatorID tcId = getTcIdFromTxnId(txnId);
TransactionMetadataStore store = stores.get(tcId);
if (store == null) {
return FutureUtil.failedFuture(new CoordinatorException.NotFoundException(tcId));
return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId));
}
return store.addProducedPartitionToTxn(txnId, partitions);
}
Expand All @@ -90,7 +146,7 @@ public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnId, List<String>
TransactionCoordinatorID tcId = getTcIdFromTxnId(txnId);
TransactionMetadataStore store = stores.get(tcId);
if (store == null) {
return FutureUtil.failedFuture(new CoordinatorException.NotFoundException(tcId));
return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId));
}
return store.addAckedPartitionToTxn(txnId, partitions);
}
Expand All @@ -99,11 +155,20 @@ public CompletableFuture<TxnMeta> getTxnMeta(TxnID txnId) {
TransactionCoordinatorID tcId = getTcIdFromTxnId(txnId);
TransactionMetadataStore store = stores.get(tcId);
if (store == null) {
return FutureUtil.failedFuture(new CoordinatorException.NotFoundException(tcId));
return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId));
}
return store.getTxnMeta(txnId);
}

public CompletableFuture<Void> updateTxnStatus(TxnID txnId, TxnStatus newStatus, TxnStatus expectedStatus) {
TransactionCoordinatorID tcId = getTcIdFromTxnId(txnId);
TransactionMetadataStore store = stores.get(tcId);
if (store == null) {
return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId));
}
return store.updateTxnStatus(txnId, newStatus, expectedStatus);
}

private TransactionCoordinatorID getTcIdFromTxnId(TxnID txnId) {
return new TransactionCoordinatorID(txnId.getMostSigBits());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;

/**
* Base type of exception thrown by Pulsar Broker Service
Expand Down Expand Up @@ -195,6 +196,10 @@ private static PulsarApi.ServerError getClientErrorCode(Throwable t, boolean che
return PulsarApi.ServerError.IncompatibleSchema;
} else if (t instanceof ConsumerAssignException) {
return ServerError.ConsumerAssignError;
} else if (t instanceof CoordinatorException.CoordinatorNotFoundException) {
return ServerError.TransactionCoordinatorNotFound;
} else if (t instanceof CoordinatorException.InvalidTxnStatusException) {
return ServerError.InvalidTxnStatus;
} else {
if (checkCauseIfUnknown) {
return getClientErrorCode(t.getCause(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@
import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandNewTxn;
import org.apache.pulsar.common.protocol.CommandUtils;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
Expand Down Expand Up @@ -110,6 +110,9 @@
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.impl.common.TxnID;
import org.apache.pulsar.transaction.impl.common.TxnStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -1402,6 +1405,85 @@ protected void handleGetOrCreateSchema(CommandGetOrCreateSchema commandGetOrCrea
});
}

@Override
protected void handleNewTxn(CommandNewTxn command) {
if (log.isDebugEnabled()) {
log.debug("Receive new txn request {} to transaction meta store {} from {}.", command.getRequestId(), command.getTcId(), remoteAddress);
}
TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId());
service.pulsar().getTransactionMetadataStoreService().newTransaction(tcId)
.whenComplete(((txnID, ex) -> {
if (ex == null) {
if (log.isDebugEnabled()) {
log.debug("Send response {} for new txn request {}", tcId.getId(), command.getRequestId());
}
ctx.writeAndFlush(Commands.newTxnResponse(command.getRequestId(), txnID.getLeastSigBits(), txnID.getMostSigBits()));
} else {
if (log.isDebugEnabled()) {
log.debug("Send response error for new txn request {}", command.getRequestId(), ex);
}
ctx.writeAndFlush(Commands.newTxnResponse(command.getRequestId(), tcId.getId(), BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
}
}));
}

@Override
protected void handleAddPartitionToTxn(PulsarApi.CommandAddPartitionToTxn command) {
TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
if (log.isDebugEnabled()) {
log.debug("Receive add published partition to txn request {} from {} with txnId {}", command.getRequestId(), remoteAddress, txnID);
}
service.pulsar().getTransactionMetadataStoreService().addProducedPartitionToTxn(txnID, command.getPartitionsList())
.whenComplete(((v, ex) -> {
if (ex == null) {
if (log.isDebugEnabled()) {
log.debug("Send response success for add published partition to txn request {}", command.getRequestId());
}
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(command.getRequestId(),
txnID.getLeastSigBits(), txnID.getMostSigBits()));
} else {
if (log.isDebugEnabled()) {
log.debug("Send response error for add published partition to txn request {}", command.getRequestId(), ex);
}
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(command.getRequestId(), txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
}
}));
}

@Override
protected void handleEndTxn(PulsarApi.CommandEndTxn command) {
TxnStatus newStatus = null;
switch (command.getTxnAction()) {
case COMMIT:
newStatus = TxnStatus.COMMITTING;
break;
case ABORT:
newStatus = TxnStatus.ABORTING;
break;
}
TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
if (log.isDebugEnabled()) {
log.debug("Receive end txn by {} request {} from {} with txnId {}", newStatus, command.getRequestId(), remoteAddress, txnID);
}
service.pulsar().getTransactionMetadataStoreService().updateTxnStatus(txnID, newStatus, TxnStatus.OPEN)
.whenComplete((v, ex) -> {
if (ex == null) {
if (log.isDebugEnabled()) {
log.debug("Send response success for end txn request {}", command.getRequestId());
}
ctx.writeAndFlush(Commands.newEndTxnResponse(command.getRequestId(),
txnID.getLeastSigBits(), txnID.getMostSigBits()));
} else {
if (log.isDebugEnabled()) {
log.debug("Send response error for end txn request {}", command.getRequestId());
}
ctx.writeAndFlush(Commands.newEndTxnResponse(command.getRequestId(), txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
}
});
}

private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, SchemaData schema) {
if (schema != null) {
return topic.addSchema(schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public void testOwnedNamespaces() {

Map<String, NamespaceOwnershipStatus> nsMap = pulsarAdmins[i].brokers().getOwnedNamespaces("my-cluster",
list.get(0));
Assert.assertEquals(nsMap.size(), 3);
Assert.assertEquals(nsMap.size(), 2);
}
} catch (Exception e) {
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ public void brokers() throws Exception {

Map<String, NamespaceOwnershipStatus> nsMap = admin.brokers().getOwnedNamespaces("test", list.get(0));
// since sla-monitor ns is not created nsMap.size() == 1 (for HeartBeat Namespace)
Assert.assertEquals(nsMap.size(), 2);
Assert.assertEquals(nsMap.size(), 1);
for (String ns : nsMap.keySet()) {
NamespaceOwnershipStatus nsStatus = nsMap.get(ns);
if (ns.equals(
Expand All @@ -417,7 +417,7 @@ public void brokers() throws Exception {
Assert.assertEquals(parts.length, 2);
Map<String, NamespaceOwnershipStatus> nsMap2 = adminTls.brokers().getOwnedNamespaces("test",
String.format("%s:%d", parts[0], BROKER_WEBSERVICE_PORT_TLS));
Assert.assertEquals(nsMap2.size(), 2);
Assert.assertEquals(nsMap2.size(), 1);

admin.namespaces().deleteNamespace("prop-xyz/ns1");
admin.clusters().deleteCluster("test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ public void brokers() throws Exception {

Map<String, NamespaceOwnershipStatus> nsMap = admin.brokers().getOwnedNamespaces("use", list.get(0));
// since sla-monitor ns is not created nsMap.size() == 1 (for HeartBeat Namespace)
Assert.assertEquals(nsMap.size(), 2);
Assert.assertEquals(nsMap.size(), 1);
for (String ns : nsMap.keySet()) {
NamespaceOwnershipStatus nsStatus = nsMap.get(ns);
if (ns.equals(
Expand All @@ -413,7 +413,7 @@ public void brokers() throws Exception {
Assert.assertEquals(parts.length, 2);
Map<String, NamespaceOwnershipStatus> nsMap2 = adminTls.brokers().getOwnedNamespaces("use",
String.format("%s:%d", parts[0], BROKER_WEBSERVICE_PORT_TLS));
Assert.assertEquals(nsMap2.size(), 2);
Assert.assertEquals(nsMap2.size(), 1);

admin.namespaces().deleteNamespace("prop-xyz/use/ns1");
admin.clusters().deleteCluster("use");
Expand Down
Loading

0 comments on commit 5504dc6

Please sign in to comment.