Skip to content

Commit

Permalink
[fix][test] fix testEndTxnWhenCommittingOrAborting flaky test (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
congbobo184 authored Jan 28, 2023
1 parent ccb7d82 commit fcecca4
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand Down Expand Up @@ -355,9 +354,6 @@ public void sendEndTxnErrorResponse(long requestId, TxnID txnID, ServerError err
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
writeAndFlush(outBuf);
if (this.interceptor != null) {
this.interceptor.txnEnded(txnID.toString(), TxnAction.ABORT_VALUE);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,20 @@
@Slf4j
public class CounterBrokerInterceptor implements BrokerInterceptor {

private AtomicInteger beforeSendCount = new AtomicInteger();
private AtomicInteger beforeSendCountAtConsumerLevel = new AtomicInteger();
private AtomicInteger count = new AtomicInteger();
private AtomicInteger connectionCreationCount = new AtomicInteger();
private AtomicInteger producerCount = new AtomicInteger();
private AtomicInteger consumerCount = new AtomicInteger();
private AtomicInteger messagePublishCount = new AtomicInteger();
private AtomicInteger messageCount = new AtomicInteger();
private AtomicInteger messageDispatchCount = new AtomicInteger();
private AtomicInteger messageAckCount = new AtomicInteger();
private AtomicInteger handleAckCount = new AtomicInteger();
private AtomicInteger txnCount = new AtomicInteger();
private AtomicInteger committedTxnCount = new AtomicInteger();
private AtomicInteger abortedTxnCount = new AtomicInteger();
private final AtomicInteger beforeSendCount = new AtomicInteger();
private final AtomicInteger beforeSendCountAtConsumerLevel = new AtomicInteger();
private final AtomicInteger count = new AtomicInteger();
private final AtomicInteger connectionCreationCount = new AtomicInteger();
private final AtomicInteger producerCount = new AtomicInteger();
private final AtomicInteger consumerCount = new AtomicInteger();
private final AtomicInteger messagePublishCount = new AtomicInteger();
private final AtomicInteger messageCount = new AtomicInteger();
private final AtomicInteger messageDispatchCount = new AtomicInteger();
private final AtomicInteger messageAckCount = new AtomicInteger();
private final AtomicInteger handleAckCount = new AtomicInteger();
private final AtomicInteger txnCount = new AtomicInteger();
private final AtomicInteger committedTxnCount = new AtomicInteger();
private final AtomicInteger abortedTxnCount = new AtomicInteger();

public void reset() {
beforeSendCount.set(0);
Expand All @@ -81,7 +81,7 @@ public void reset() {
abortedTxnCount.set(0);
}

private List<ResponseEvent> responseList = new ArrayList<>();
private final List<ResponseEvent> responseList = new ArrayList<>();

@Data
@AllArgsConstructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -920,11 +920,11 @@ public void testEndTxnWhenCommittingOrAborting() throws Exception {
field.set(abortTxn, TransactionImpl.State.ABORTING);


assertEquals(((CounterBrokerInterceptor)listener).getTxnCount(),2);
Awaitility.await().untilAsserted(() -> assertEquals(listener.getTxnCount(),2));
abortTxn.abort().get();
assertEquals(((CounterBrokerInterceptor)listener).getAbortedTxnCount(),1);
Awaitility.await().untilAsserted(() -> assertEquals(listener.getAbortedTxnCount(),1));
commitTxn.commit().get();
assertEquals(((CounterBrokerInterceptor)listener).getCommittedTxnCount(),1);
Awaitility.await().untilAsserted(() -> assertEquals(listener.getCommittedTxnCount(),1));
}

@Test
Expand Down

0 comments on commit fcecca4

Please sign in to comment.