Skip to content

Commit

Permalink
[fix][flaky-test] BrokerInterceptorTest.testProducerCreation (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd authored Jul 25, 2022
1 parent 22e8cb2 commit bc94643
Showing 1 changed file with 52 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
Expand All @@ -48,32 +49,32 @@
@Slf4j
public class CounterBrokerInterceptor implements BrokerInterceptor {

int beforeSendCount = 0;
int count = 0;
int connectionCreationCount = 0;
int producerCount = 0;
int consumerCount = 0;
int messageCount = 0;
int messageDispatchCount = 0;
int messageAckCount = 0;
int handleAckCount = 0;
int txnCount = 0;
int committedTxnCount = 0;
int abortedTxnCount = 0;
private AtomicInteger beforeSendCount = new AtomicInteger();
private AtomicInteger count = new AtomicInteger();
private AtomicInteger connectionCreationCount = new AtomicInteger();
private AtomicInteger producerCount = new AtomicInteger();
private AtomicInteger consumerCount = new AtomicInteger();
private AtomicInteger messageCount = new AtomicInteger();
private AtomicInteger messageDispatchCount = new AtomicInteger();
private AtomicInteger messageAckCount = new AtomicInteger();
private AtomicInteger handleAckCount = new AtomicInteger();
private AtomicInteger txnCount = new AtomicInteger();
private AtomicInteger committedTxnCount = new AtomicInteger();
private AtomicInteger abortedTxnCount = new AtomicInteger();

public void reset() {
beforeSendCount = 0;
count = 0;
connectionCreationCount = 0;
producerCount = 0;
consumerCount = 0;
messageCount = 0;
messageDispatchCount = 0;
messageAckCount = 0;
handleAckCount = 0;
txnCount = 0;
committedTxnCount = 0;
abortedTxnCount = 0;
beforeSendCount.set(0);
count.set(0);
connectionCreationCount.set(0);
producerCount.set(0);
consumerCount.set(0);
messageCount.set(0);
messageDispatchCount.set(0);
messageAckCount.set(0);
handleAckCount.set(0);
txnCount.set(0);
committedTxnCount.set(0);
abortedTxnCount.set(0);
}

private List<ResponseEvent> responseList = new ArrayList<>();
Expand All @@ -90,7 +91,7 @@ public void onConnectionCreated(ServerCnx cnx) {
if (log.isDebugEnabled()) {
log.debug("Connection created {}", cnx);
}
connectionCreationCount++;
connectionCreationCount.incrementAndGet();
}

@Override
Expand All @@ -100,7 +101,7 @@ public void producerCreated(ServerCnx cnx, Producer producer,
log.debug("Producer created with name={}, id={}",
producer.getProducerName(), producer.getProducerId());
}
producerCount++;
producerCount.incrementAndGet();
}

@Override
Expand All @@ -111,7 +112,7 @@ public void consumerCreated(ServerCnx cnx,
log.debug("Consumer created with name={}, id={}",
consumer.consumerName(), consumer.consumerId());
}
consumerCount++;
consumerCount.incrementAndGet();
}

@Override
Expand All @@ -122,7 +123,7 @@ public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs,
log.debug("Message published topic={}, producer={}",
producer.getTopic().getName(), producer.getProducerName());
}
messageCount++;
messageCount.incrementAndGet();
}

@Override
Expand All @@ -132,13 +133,13 @@ public void messageDispatched(ServerCnx cnx, Consumer consumer, long ledgerId,
log.debug("Message dispatched topic={}, consumer={}",
consumer.getSubscription().getTopic().getName(), consumer.consumerName());
}
messageDispatchCount++;
messageDispatchCount.incrementAndGet();
}

@Override
public void messageAcked(ServerCnx cnx, Consumer consumer,
CommandAck ack) {
messageAckCount++;
messageAckCount.incrementAndGet();
}

@Override
Expand All @@ -150,7 +151,7 @@ public void beforeSendMessage(Subscription subscription,
log.debug("Send message to topic {}, subscription {}",
subscription.getTopic(), subscription.getName());
}
beforeSendCount++;
beforeSendCount.incrementAndGet();
}

@Override
Expand All @@ -159,9 +160,9 @@ public void onPulsarCommand(BaseCommand command, ServerCnx cnx) {
log.debug("[{}] On [{}] Pulsar command", count, command.getType().name());
}
if (command.getType().equals(BaseCommand.Type.ACK)) {
handleAckCount++;
handleAckCount.incrementAndGet();
}
count ++;
count.incrementAndGet();
}

@Override
Expand All @@ -171,15 +172,15 @@ public void onConnectionClosed(ServerCnx cnx) {

@Override
public void onWebserviceRequest(ServletRequest request) {
count ++;
count.incrementAndGet();
if (log.isDebugEnabled()) {
log.debug("[{}] On [{}] Webservice request", count, ((HttpServletRequest) request).getRequestURL().toString());
}
}

@Override
public void onWebserviceResponse(ServletRequest request, ServletResponse response) {
count ++;
count.incrementAndGet();
if (log.isDebugEnabled()) {
log.debug("[{}] On [{}] Webservice response {}", count, ((HttpServletRequest) request).getRequestURL().toString(), response);
}
Expand All @@ -192,21 +193,21 @@ public void onWebserviceResponse(ServletRequest request, ServletResponse respons
@Override
public void onFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
count = 100;
count.set(100);
chain.doFilter(request, response);
}

@Override
public void txnOpened(long tcId, String txnID) {
txnCount ++;
txnCount.incrementAndGet();
}

@Override
public void txnEnded(String txnID, long txnAction) {
if(txnAction == TxnAction.COMMIT_VALUE) {
committedTxnCount ++;
committedTxnCount.incrementAndGet();
} else {
abortedTxnCount ++;
abortedTxnCount.incrementAndGet();
}
}

Expand All @@ -221,39 +222,39 @@ public void close() {
}

public int getHandleAckCount() {
return handleAckCount;
return handleAckCount.get();
}

public int getCount() {
return count;
return count.get();
}

public int getProducerCount() {
return producerCount;
return producerCount.get();
}

public int getConsumerCount() {
return consumerCount;
return consumerCount.get();
}

public int getMessagePublishCount() {
return messageCount;
return messageCount.get();
}

public int getMessageDispatchCount() {
return messageDispatchCount;
return messageDispatchCount.get();
}

public int getMessageAckCount() {
return messageAckCount;
return messageAckCount.get();
}

public int getBeforeSendCount() {
return beforeSendCount;
return beforeSendCount.get();
}

public int getConnectionCreationCount() {
return connectionCreationCount;
return connectionCreationCount.get();
}

public void clearResponseList() {
Expand All @@ -265,14 +266,14 @@ public List<ResponseEvent> getResponseList() {
}

public int getTxnCount() {
return txnCount;
return txnCount.get();
}

public int getCommittedTxnCount() {
return committedTxnCount;
return committedTxnCount.get();
}

public int getAbortedTxnCount() {
return abortedTxnCount;
return abortedTxnCount.get();
}
}

0 comments on commit bc94643

Please sign in to comment.