Skip to content

Commit

Permalink
[fix][broker] Avoid messages being repeatedly replayed with SHARED su…
Browse files Browse the repository at this point in the history
…bscriptions (streaming dispatcher) (apache#17163)

* [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions

* move test to broker-api
  • Loading branch information
nicoloboschi authored Aug 20, 2022
1 parent 1d6824c commit 7f69727
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ protected void cancelPendingRead() {

@Override
public synchronized void readMoreEntries() {
if (sendInProgress) {
// we cannot read more entries while sending the previous batch
// otherwise we could re-read the same entries and send duplicates
return;
}
// totalAvailablePermits may be updated by other threads
int currentTotalAvailablePermits = totalAvailablePermits;
if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
/**
* SimpleProducerConsumerTest with {@link StreamingDispatcher}
*/
@Test(groups = "flaky")
@Test(groups = "broker-api")
public class SimpleProducerConsumerTestStreamingDispatcherTest extends SimpleProducerConsumerTest {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "flaky")
@Test(groups = "broker-api")
public class SimpleProducerConsumerTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
private static final int TIMEOUT_MULTIPLIER = Integer.getInteger("SimpleProducerConsumerTest.receive.timeout.multiplier", 1);
Expand Down

0 comments on commit 7f69727

Please sign in to comment.