forked from apache/pulsar
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Transaction] Add transaction integration test (apache#8594)
Currently, the transaction is a lack of integration test for validation.
- Loading branch information
Showing
5 changed files
with
376 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
# | ||
|
||
name: CI - Integration - Transaction | ||
on: | ||
pull_request: | ||
branches: | ||
- master | ||
push: | ||
branches: | ||
- branch-* | ||
|
||
env: | ||
MAVEN_OPTS: -Dmaven.wagon.httpconnectionManager.ttlSeconds=25 -Dmaven.wagon.http.retryHandler.count=3 | ||
|
||
jobs: | ||
|
||
transaction: | ||
name: | ||
runs-on: ubuntu-latest | ||
timeout-minutes: 120 | ||
|
||
steps: | ||
- name: Cancel Previous Runs | ||
uses: styfle/[email protected] | ||
with: | ||
access_token: ${{ github.token }} | ||
|
||
- name: checkout | ||
uses: actions/checkout@v2 | ||
with: | ||
fetch-depth: 25 | ||
ref: ${{ github.event.pull_request.head.sha }} | ||
|
||
- name: Cache local Maven repository | ||
uses: actions/cache@v2 | ||
with: | ||
path: ~/.m2/repository | ||
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} | ||
restore-keys: | | ||
${{ runner.os }}-maven- | ||
- name: Check if this pull request only changes documentation | ||
id: docs | ||
uses: apache/pulsar-test-infra/diff-only@master | ||
with: | ||
args: site2 deployment .asf.yaml .ci ct.yaml | ||
|
||
- name: Set up JDK 1.8 | ||
uses: actions/setup-java@v1 | ||
if: steps.docs.outputs.changed_only == 'no' | ||
with: | ||
java-version: 1.8 | ||
|
||
- name: clean disk | ||
if: steps.docs.outputs.changed_only == 'no' | ||
run: | | ||
sudo swapoff -a | ||
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc | ||
sudo apt clean | ||
docker rmi $(docker images -q) -f | ||
df -h | ||
- name: run install by skip tests | ||
if: steps.docs.outputs.changed_only == 'no' | ||
run: mvn -q -B -ntp clean install -DskipTests | ||
- name: build pulsar image | ||
if: steps.docs.outputs.changed_only == 'no' | ||
run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true | ||
- name: build pulsar-all image | ||
if: steps.docs.outputs.changed_only == 'no' | ||
run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true | ||
- name: build artifacts and docker image | ||
if: steps.docs.outputs.changed_only == 'no' | ||
run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests | ||
|
||
- name: run integration tests | ||
if: steps.docs.outputs.changed_only == 'no' | ||
run: ./build/retry.sh mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-transaction.xml -DintegrationTests -DredirectTestOutputToFile=false |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
157 changes: 157 additions & 0 deletions
157
...ration/src/test/java/org/apache/pulsar/tests/integration/transaction/TransactionTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.tests.integration.transaction; | ||
|
||
import java.util.concurrent.TimeUnit; | ||
import lombok.Cleanup; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.pulsar.client.api.Consumer; | ||
import org.apache.pulsar.client.api.Message; | ||
import org.apache.pulsar.client.api.Producer; | ||
import org.apache.pulsar.client.api.PulsarClient; | ||
import org.apache.pulsar.client.api.Schema; | ||
import org.apache.pulsar.client.api.SubscriptionInitialPosition; | ||
import org.apache.pulsar.client.api.SubscriptionType; | ||
import org.apache.pulsar.client.api.transaction.Transaction; | ||
import org.testng.Assert; | ||
import org.testng.annotations.Test; | ||
|
||
/** | ||
* Transaction integration test. | ||
*/ | ||
@Slf4j | ||
public class TransactionTest extends TransactionTestBase { | ||
|
||
/** | ||
* Transfer Business Mock Test | ||
* | ||
* The `transfer topic` represents the transfer operation, it consist of `from account`, `to account` and amount. | ||
* The `balance update topic` represents the account update record, it consist of account and amount. | ||
* | ||
* The transfer topic consumer receive transfer messages and produce two balance update messages, | ||
* one represents `from account` balance update and one represents `to account` balance update. | ||
* | ||
* example: | ||
* | ||
* receive messages: | ||
* transfer { | ||
* fromAccount: "alice", | ||
* toAccount: "bob", | ||
* amount: 100 | ||
* } | ||
* | ||
* produce messages: | ||
* fromAccountBalanceUpdate { | ||
* account: "alice" | ||
* amount: -100 | ||
* } | ||
* toAccountBalanceUpdate { | ||
* account: "bob", | ||
* amount: 100 | ||
* } | ||
* | ||
* test target: | ||
* | ||
* 1. The balance update messages count should be double transfer message count. | ||
* 2. The balance update messages amount sum should be 0. | ||
*/ | ||
@Test(dataProvider = "ServiceUrls") | ||
public void transferNormalTest(String serviceUrl) throws Exception { | ||
log.info("transfer normal test start."); | ||
PulsarClient pulsarClient = PulsarClient.builder().enableTransaction(true).serviceUrl(serviceUrl).build(); | ||
|
||
final int transferCount = 20; | ||
final String transferTopic = "transfer-" + randomName(6); | ||
final String balanceUpdateTopic = "balance-update-" + randomName(6); | ||
|
||
@Cleanup | ||
Producer<TransferOperation> transferProducer = pulsarClient | ||
.newProducer(Schema.JSON(TransferOperation.class)) | ||
.topic(transferTopic) | ||
.create(); | ||
log.info("transfer producer create finished"); | ||
|
||
prepareTransferData(transferProducer, transferCount); | ||
|
||
@Cleanup | ||
Consumer<TransferOperation> transferConsumer = pulsarClient.newConsumer(Schema.JSON(TransferOperation.class)) | ||
.topic(transferTopic) | ||
.subscriptionName("integration-test") | ||
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) | ||
.subscriptionType(SubscriptionType.Shared) | ||
.enableBatchIndexAcknowledgment(true) | ||
.subscribe(); | ||
log.info("transfer consumer create finished"); | ||
|
||
@Cleanup | ||
Producer<BalanceUpdate> balanceUpdateProducer = pulsarClient.newProducer(Schema.JSON(BalanceUpdate.class)) | ||
.topic(balanceUpdateTopic) | ||
.sendTimeout(0, TimeUnit.SECONDS) | ||
.create(); | ||
log.info("balance update producer create finished"); | ||
|
||
@Cleanup | ||
Consumer<BalanceUpdate> balanceUpdateConsumer = pulsarClient.newConsumer(Schema.JSON(BalanceUpdate.class)) | ||
.topic(balanceUpdateTopic) | ||
.subscriptionName("integration-test") | ||
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) | ||
.subscribe(); | ||
log.info("balance update consumer create finished"); | ||
|
||
while(true) { | ||
Message<TransferOperation> message = transferConsumer.receive(10, TimeUnit.SECONDS); | ||
if (message == null) { | ||
break; | ||
} | ||
TransferOperation transferOperation = message.getValue(); | ||
|
||
Transaction transaction = pulsarClient.newTransaction() | ||
.withTransactionTimeout(5, TimeUnit.MINUTES) | ||
.build().get(); | ||
|
||
balanceUpdateProducer.newMessage(transaction) | ||
.value(getBalanceUpdate(transferOperation, true)).sendAsync(); | ||
|
||
balanceUpdateProducer.newMessage(transaction) | ||
.value(getBalanceUpdate(transferOperation, false)).sendAsync(); | ||
|
||
transferConsumer.acknowledgeAsync(message.getMessageId(), transaction); | ||
|
||
transaction.commit().get(); | ||
} | ||
|
||
int receiveBalanceUpdateCnt = 0; | ||
int balanceSum = 0; | ||
while (true) { | ||
Message<BalanceUpdate> message = balanceUpdateConsumer.receive(10, TimeUnit.SECONDS); | ||
if (message == null) { | ||
break; | ||
} | ||
receiveBalanceUpdateCnt++; | ||
|
||
BalanceUpdate balanceUpdate = message.getValue(); | ||
balanceSum += balanceUpdate.getAmount(); | ||
log.info("balance account: {}, amount: {}", balanceUpdate.getAccount(), balanceUpdate.getAmount()); | ||
} | ||
Assert.assertEquals(receiveBalanceUpdateCnt, transferCount * 2); | ||
Assert.assertEquals(balanceSum, 0); | ||
log.info("transfer normal test finish."); | ||
} | ||
|
||
} |
96 changes: 96 additions & 0 deletions
96
...on/src/test/java/org/apache/pulsar/tests/integration/transaction/TransactionTestBase.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.tests.integration.transaction; | ||
|
||
import java.util.concurrent.CompletableFuture; | ||
import lombok.Data; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.pulsar.client.api.MessageId; | ||
import org.apache.pulsar.client.api.Producer; | ||
import org.apache.pulsar.tests.integration.containers.BrokerContainer; | ||
import org.apache.pulsar.tests.integration.containers.ZKContainer; | ||
import org.apache.pulsar.tests.integration.docker.ContainerExecResult; | ||
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite; | ||
import org.testng.annotations.BeforeClass; | ||
|
||
/** | ||
* Transaction test base. | ||
*/ | ||
@Slf4j | ||
public class TransactionTestBase extends PulsarTestSuite { | ||
|
||
@Override | ||
protected void beforeStartCluster() throws Exception { | ||
super.beforeStartCluster(); | ||
for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) { | ||
brokerContainer.withEnv("transactionCoordinatorEnabled", "true"); | ||
brokerContainer.withEnv("transactionMetadataStoreProviderClassName", | ||
"org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider"); | ||
brokerContainer.withEnv("transactionBufferProviderClassName", | ||
"org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferProvider"); | ||
brokerContainer.withEnv("acknowledgmentAtBatchIndexLevelEnabled", "true"); | ||
} | ||
} | ||
|
||
@BeforeClass | ||
public void transactionCoordinatorMetadataInitialize() throws Exception { | ||
BrokerContainer brokerContainer = pulsarCluster.getBrokers().iterator().next(); | ||
ContainerExecResult result = brokerContainer.execCmd( | ||
"/pulsar/bin/pulsar", "initialize-transaction-coordinator-metadata", | ||
"-cs", ZKContainer.NAME, | ||
"-c", pulsarCluster.getClusterName()); | ||
} | ||
|
||
public void prepareTransferData(Producer<TransferOperation> transferProducer, int messageCnt) { | ||
for (int i = 0; i < messageCnt; i++) { | ||
TransferOperation transferOperation = new TransferOperation(); | ||
transferOperation.setFromAccount("alice"); | ||
transferOperation.setToAccount("Bob"); | ||
transferOperation.setAmount(100 * i); | ||
CompletableFuture<MessageId> completableFuture = | ||
transferProducer.newMessage().value(transferOperation).sendAsync(); | ||
} | ||
log.info("transfer messages produced"); | ||
} | ||
|
||
public BalanceUpdate getBalanceUpdate(TransferOperation transferOperation, boolean isFromAccount) { | ||
BalanceUpdate balanceUpdate = new BalanceUpdate(); | ||
balanceUpdate.setAccount(transferOperation.getFromAccount()); | ||
if (isFromAccount) { | ||
balanceUpdate.setAmount(-(transferOperation.getAmount())); | ||
} else { | ||
balanceUpdate.setAmount(transferOperation.getAmount()); | ||
} | ||
return balanceUpdate; | ||
} | ||
|
||
@Data | ||
public static class TransferOperation { | ||
private String fromAccount; | ||
private String toAccount; | ||
private int amount; | ||
} | ||
|
||
@Data | ||
public static class BalanceUpdate { | ||
private String account; | ||
private int amount; | ||
} | ||
|
||
} |
28 changes: 28 additions & 0 deletions
28
tests/integration/src/test/resources/pulsar-transaction.xml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
<!-- | ||
Licensed to the Apache Software Foundation (ASF) under one | ||
or more contributor license agreements. See the NOTICE file | ||
distributed with this work for additional information | ||
regarding copyright ownership. The ASF licenses this file | ||
to you under the Apache License, Version 2.0 (the | ||
"License"); you may not use this file except in compliance | ||
with the License. You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, | ||
software distributed under the License is distributed on an | ||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
KIND, either express or implied. See the License for the | ||
specific language governing permissions and limitations | ||
under the License. | ||
--> | ||
<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd" > | ||
<suite name="Pulsar Transaction Integration Tests" verbose="2" annotations="JDK"> | ||
<test name="pulsar-transaction-test-suite" preserve-order="true" > | ||
<classes> | ||
<class name="org.apache.pulsar.tests.integration.transaction.TransactionTest" /> | ||
</classes> | ||
</test> | ||
</suite> |