Skip to content

Commit

Permalink
[fix][test] Fix flaky MetadataStoreStatsTest and prevent certain flak…
Browse files Browse the repository at this point in the history
…iness in all metric / stat tests (apache#19329)
  • Loading branch information
lhotari authored Jan 26, 2023
1 parent e2a056d commit f047de2
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,20 @@ public class ConsumerStatsTest extends ProducerConsumerBase {
/**
 * Per-test-method setup: sets the max-unacked-messages-per-consumer value on {@code conf} to 0 and then
 * runs the inherited {@code internalSetup()} followed by {@code producerBaseSetup()}.
 *
 * @throws Exception if any of the inherited setup steps fails
 */
@BeforeMethod
@Override
protected void setup() throws Exception {
// NOTE(review): getDefaultConf() in this class applies the same setting; this line may be the
// pre-change side of the diff rendering - confirm against the commit.
conf.setMaxUnackedMessagesPerConsumer(0);
super.internalSetup();
super.producerBaseSetup();
}

/**
 * Builds the configuration for this test class: the inherited defaults with the
 * max-unacked-messages-per-consumer value set to 0 and a 5000 ms broker shutdown timeout.
 *
 * @return the adjusted configuration instance obtained from the superclass
 */
@Override
protected ServiceConfiguration getDefaultConf() {
    final ServiceConfiguration defaults = super.getDefaultConf();
    defaults.setMaxUnackedMessagesPerConsumer(0);
    // Wait for the broker to shut down. Metrics can be unregistered asynchronously, and an
    // unfinished shutdown could otherwise disturb the next test method and make it flaky.
    defaults.setBrokerShutdownTimeoutMs(5000L);
    return defaults;
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
Expand All @@ -37,6 +38,15 @@ public class LedgerOffloaderMetricsTest extends BrokerTestBase {
protected void setup() throws Exception {
// No shared setup is performed here.
// NOTE(review): presumably each test method starts what it needs itself - confirm against the test bodies.
}

/**
 * Returns the inherited default configuration with a 5000 ms broker shutdown timeout applied.
 *
 * @return the adjusted configuration instance obtained from the superclass
 */
@Override
protected ServiceConfiguration getDefaultConf() {
    final ServiceConfiguration serviceConf = super.getDefaultConf();
    // Let the broker finish shutting down before moving on: metrics may be unregistered
    // asynchronously, and leftovers from that could make the following test method flaky.
    serviceConf.setBrokerShutdownTimeoutMs(5000L);
    return serviceConf;
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
Expand All @@ -53,9 +54,19 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
/**
 * Class-level setup: delegates entirely to the inherited {@code internalSetup()}.
 *
 * @throws Exception if the inherited setup fails
 */
@BeforeClass
@Override
protected void setup() throws Exception {
super.internalSetup();
}

/**
 * Returns the inherited default configuration with topic-level policies and the system topic disabled
 * and a 5000 ms broker shutdown timeout applied.
 */
@Override
protected ServiceConfiguration getDefaultConf() {
ServiceConfiguration conf = super.getDefaultConf();
conf.setTopicLevelPoliciesEnabled(false);
conf.setSystemTopicEnabled(false);
// NOTE(review): this call looks like a removed line from the pre-change setup() that the diff
// rendering interleaved here; setup() above already calls internalSetup() - confirm against the commit.
super.internalSetup();

// Wait for the broker to shut down. This prevents flakiness that could be caused by metrics being
// unregistered asynchronously, which would otherwise affect the execution of the next test method.
conf.setBrokerShutdownTimeoutMs(5000L);
return conf;
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
import org.apache.pulsar.client.api.Producer;
Expand All @@ -52,6 +53,15 @@ protected void setup() throws Exception {
super.baseSetup();
}

/**
 * Provides the configuration for this test class: the superclass defaults plus a 5000 ms
 * broker shutdown timeout.
 *
 * @return the adjusted configuration instance obtained from the superclass
 */
@Override
protected ServiceConfiguration getDefaultConf() {
    final ServiceConfiguration baseConf = super.getDefaultConf();
    // Waiting for broker shutdown avoids flakiness: metrics can be unregistered asynchronously,
    // and that would otherwise interfere with the next test method.
    baseConf.setBrokerShutdownTimeoutMs(5000L);
    return baseConf;
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
Expand All @@ -44,17 +45,26 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
/**
 * Per-test-method setup (always run): disables topic-level policies and the system topic on {@code conf},
 * runs the inherited {@code baseSetup()}, then resets the metrics of {@code AuthenticationProviderToken}.
 *
 * @throws Exception if the inherited setup fails
 */
@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
// NOTE(review): getDefaultConf() in this class applies the same two settings; these lines may be the
// pre-change side of the diff rendering - confirm against the commit.
conf.setTopicLevelPoliciesEnabled(false);
conf.setSystemTopicEnabled(false);
super.baseSetup();
AuthenticationProviderToken.resetMetrics();
}

/**
 * Builds the configuration for this test class: the inherited defaults with topic-level policies and
 * the system topic disabled, and a 5000 ms broker shutdown timeout.
 *
 * @return the adjusted configuration instance obtained from the superclass
 */
@Override
protected ServiceConfiguration getDefaultConf() {
    final ServiceConfiguration defaults = super.getDefaultConf();
    defaults.setTopicLevelPoliciesEnabled(false);
    defaults.setSystemTopicEnabled(false);
    // Wait for the broker to shut down. The close method of
    // org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats unregisters metrics
    // asynchronously, which could otherwise affect the next test method and cause flakiness.
    defaults.setBrokerShutdownTimeoutMs(5000L);
    return defaults;
}

/**
 * Per-test-method teardown (always run): runs the inherited {@code internalCleanup()} and then
 * {@code resetConfig()}.
 *
 * @throws Exception if the inherited cleanup fails
 */
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
// NOTE(review): this line may be the pre-change side of the diff rendering - confirm against the commit.
resetConfig();
}

@Test
Expand All @@ -76,7 +86,7 @@ public void testMetadataStoreStats() throws Exception {
producer.newMessage().value(UUID.randomUUID().toString()).send();
}

for (;;) {
for (int i = 0; i < 100; i++) {
Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
if (message == null) {
break;
Expand All @@ -89,51 +99,53 @@ public void testMetadataStoreStats() throws Exception {
String metricsStr = output.toString();
Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr);

String metricsDebugMessage = "Assertion failed with metrics:\n" + metricsStr + "\n";

Collection<PrometheusMetricsTest.Metric> opsLatency = metricsMap.get("pulsar_metadata_store_ops_latency_ms" + "_sum");
Collection<PrometheusMetricsTest.Metric> putBytes = metricsMap.get("pulsar_metadata_store_put_bytes" + "_total");

Assert.assertTrue(opsLatency.size() > 1);
Assert.assertTrue(putBytes.size() > 1);
Assert.assertTrue(opsLatency.size() > 1, metricsDebugMessage);
Assert.assertTrue(putBytes.size() > 1, metricsDebugMessage);

for (PrometheusMetricsTest.Metric m : opsLatency) {
Assert.assertEquals(m.tags.get("cluster"), "test");
Assert.assertEquals(m.tags.get("cluster"), "test", metricsDebugMessage);
String metadataStoreName = m.tags.get("name");
Assert.assertNotNull(metadataStoreName);
Assert.assertNotNull(metadataStoreName, metricsDebugMessage);
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
Assert.assertNotNull(m.tags.get("status"));
|| metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE), metricsDebugMessage);
Assert.assertNotNull(m.tags.get("status"), metricsDebugMessage);

if (m.tags.get("status").equals("success")) {
if (m.tags.get("type").equals("get")) {
Assert.assertTrue(m.value >= 0);
Assert.assertTrue(m.value >= 0, metricsDebugMessage);
} else if (m.tags.get("type").equals("del")) {
Assert.assertTrue(m.value >= 0);
Assert.assertTrue(m.value >= 0, metricsDebugMessage);
} else if (m.tags.get("type").equals("put")) {
Assert.assertTrue(m.value >= 0);
Assert.assertTrue(m.value >= 0, metricsDebugMessage);
} else {
Assert.fail();
Assert.fail(metricsDebugMessage);
}
} else {
if (m.tags.get("type").equals("get")) {
Assert.assertTrue(m.value >= 0);
Assert.assertTrue(m.value >= 0, metricsDebugMessage);
} else if (m.tags.get("type").equals("del")) {
Assert.assertTrue(m.value >= 0);
Assert.assertTrue(m.value >= 0, metricsDebugMessage);
} else if (m.tags.get("type").equals("put")) {
Assert.assertTrue(m.value >= 0);
Assert.assertTrue(m.value >= 0, metricsDebugMessage);
} else {
Assert.fail();
Assert.fail(metricsDebugMessage);
}
}
}
for (PrometheusMetricsTest.Metric m : putBytes) {
Assert.assertEquals(m.tags.get("cluster"), "test");
Assert.assertEquals(m.tags.get("cluster"), "test", metricsDebugMessage);
String metadataStoreName = m.tags.get("name");
Assert.assertNotNull(metadataStoreName);
Assert.assertNotNull(metadataStoreName, metricsDebugMessage);
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
Assert.assertTrue(m.value > 0);
|| metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE), metricsDebugMessage);
Assert.assertTrue(m.value >= 0, metricsDebugMessage);
}
}

Expand All @@ -156,7 +168,7 @@ public void testBatchMetadataStoreMetrics() throws Exception {
producer.newMessage().value(UUID.randomUUID().toString()).send();
}

for (;;) {
for (int i = 0; i < 100; i++) {
Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
if (message == null) {
break;
Expand All @@ -174,49 +186,50 @@ public void testBatchMetadataStoreMetrics() throws Exception {
Collection<PrometheusMetricsTest.Metric> batchExecuteTime = metricsMap.get("pulsar_batch_metadata_store_batch_execute_time_ms" + "_sum");
Collection<PrometheusMetricsTest.Metric> opsPerBatch = metricsMap.get("pulsar_batch_metadata_store_batch_size" + "_sum");

Assert.assertTrue(executorQueueSize.size() > 1);
Assert.assertTrue(opsWaiting.size() > 1);
Assert.assertTrue(batchExecuteTime.size() > 0);
Assert.assertTrue(opsPerBatch.size() > 0);
String metricsDebugMessage = "Assertion failed with metrics:\n" + metricsStr + "\n";

Assert.assertTrue(executorQueueSize.size() > 1, metricsDebugMessage);
Assert.assertTrue(opsWaiting.size() > 1, metricsDebugMessage);
Assert.assertTrue(batchExecuteTime.size() > 0, metricsDebugMessage);
Assert.assertTrue(opsPerBatch.size() > 0, metricsDebugMessage);

for (PrometheusMetricsTest.Metric m : executorQueueSize) {
Assert.assertEquals(m.tags.get("cluster"), "test");
Assert.assertEquals(m.tags.get("cluster"), "test", metricsDebugMessage);
String metadataStoreName = m.tags.get("name");
Assert.assertNotNull(metadataStoreName);
Assert.assertNotNull(metadataStoreName, metricsDebugMessage);
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
Assert.assertTrue(m.value >= 0);
|| metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE), metricsDebugMessage);
Assert.assertTrue(m.value >= 0, metricsDebugMessage);
}
for (PrometheusMetricsTest.Metric m : opsWaiting) {
Assert.assertEquals(m.tags.get("cluster"), "test");
Assert.assertEquals(m.tags.get("cluster"), "test", metricsDebugMessage);
String metadataStoreName = m.tags.get("name");
Assert.assertNotNull(metadataStoreName);
Assert.assertNotNull(metadataStoreName, metricsDebugMessage);
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
Assert.assertTrue(m.value >= 0);
|| metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE), metricsDebugMessage);
Assert.assertTrue(m.value >= 0, metricsDebugMessage);
}

for (PrometheusMetricsTest.Metric m : batchExecuteTime) {
Assert.assertEquals(m.tags.get("cluster"), "test");
Assert.assertEquals(m.tags.get("cluster"), "test", metricsDebugMessage);
String metadataStoreName = m.tags.get("name");
Assert.assertNotNull(metadataStoreName);
Assert.assertNotNull(metadataStoreName, metricsDebugMessage);
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
Assert.assertTrue(m.value > 0);
|| metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE), metricsDebugMessage);
Assert.assertTrue(m.value >= 0, metricsDebugMessage);
}

for (PrometheusMetricsTest.Metric m : opsPerBatch) {
Assert.assertEquals(m.tags.get("cluster"), "test");
Assert.assertEquals(m.tags.get("cluster"), "test", metricsDebugMessage);
String metadataStoreName = m.tags.get("name");
Assert.assertNotNull(metadataStoreName);
Assert.assertNotNull(metadataStoreName, metricsDebugMessage);
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
Assert.assertTrue(m.value > 0);
|| metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE), metricsDebugMessage);
Assert.assertTrue(m.value >= 0, metricsDebugMessage);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,21 @@ public class PrometheusMetricsTest extends BrokerTestBase {
/**
 * Per-test-method setup (always run): disables topic-level policies and the system topic on {@code conf},
 * runs the inherited {@code baseSetup()}, then resets the metrics of {@code AuthenticationProviderToken}.
 *
 * @throws Exception if the inherited setup fails
 */
@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
// NOTE(review): getDefaultConf() in this class applies the same two settings; these lines may be the
// pre-change side of the diff rendering - confirm against the commit.
conf.setTopicLevelPoliciesEnabled(false);
conf.setSystemTopicEnabled(false);
super.baseSetup();
AuthenticationProviderToken.resetMetrics();
}

/**
 * Supplies the configuration for this test class: the superclass defaults with topic-level policies
 * and the system topic turned off, plus a 5000 ms broker shutdown timeout.
 *
 * @return the adjusted configuration instance obtained from the superclass
 */
@Override
protected ServiceConfiguration getDefaultConf() {
    final ServiceConfiguration serviceConf = super.getDefaultConf();
    serviceConf.setTopicLevelPoliciesEnabled(false);
    serviceConf.setSystemTopicEnabled(false);
    // Let the broker shut down completely: metrics may be unregistered asynchronously, and that
    // could otherwise spill into the next test method and make it flaky.
    serviceConf.setBrokerShutdownTimeoutMs(5000L);
    return serviceConf;
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryFilterSupport;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
Expand Down Expand Up @@ -65,6 +66,15 @@ protected void setup() throws Exception {
super.producerBaseSetup();
}

/**
 * Returns the superclass default configuration after applying a 5000 ms broker shutdown timeout.
 *
 * @return the adjusted configuration instance obtained from the superclass
 */
@Override
protected ServiceConfiguration getDefaultConf() {
    final ServiceConfiguration inherited = super.getDefaultConf();
    // The timeout makes teardown wait for the broker to shut down; asynchronous unregistration of
    // metrics could otherwise affect the next test method and lead to flaky results.
    inherited.setBrokerShutdownTimeoutMs(5000L);
    return inherited;
}
@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ protected void cleanup() throws Exception {
}

@Override
protected void doInitConf() throws Exception {
super.doInitConf();
protected ServiceConfiguration getDefaultConf() {
ServiceConfiguration conf = super.getDefaultConf();

// enable transaction.
conf.setSystemTopicEnabled(true);
conf.setTransactionCoordinatorEnabled(true);
Expand All @@ -99,8 +100,14 @@ protected void doInitConf() throws Exception {
conf.setTransactionPendingAckBatchedWriteMaxRecords(10);
conf.setTransactionLogBatchedWriteEnabled(true);
conf.setTransactionLogBatchedWriteMaxRecords(10);

// wait for shutdown of the broker, this prevents flakiness which could be caused by metrics being
// unregistered asynchronously. This impacts the execution of the next test method if this would be happening.
conf.setBrokerShutdownTimeoutMs(5000L);
return conf;
}


@Override
protected PulsarService startBroker(ServiceConfiguration conf) throws Exception {
PulsarService pulsar = startBrokerWithoutAuthorization(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ protected void setup() throws Exception {
super.baseSetup(serviceConfiguration);
}

/**
 * Provides the default configuration for this test class, extending the inherited defaults with a
 * 5000 ms broker shutdown timeout.
 *
 * @return the adjusted configuration instance obtained from the superclass
 */
@Override
protected ServiceConfiguration getDefaultConf() {
    final ServiceConfiguration configuration = super.getDefaultConf();
    // Wait for broker shutdown to avoid flakiness: metrics might be unregistered asynchronously,
    // which would otherwise impact the test method that runs next.
    configuration.setBrokerShutdownTimeoutMs(5000L);
    return configuration;
}

protected void afterSetup() throws Exception {
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
TenantInfo.builder()
Expand Down
Loading

0 comments on commit f047de2

Please sign in to comment.