Skip to content

Commit

Permalink
[Tests] Reduce Pulsar IO integration test RAM requirements to prevent…
Browse files Browse the repository at this point in the history
… system OOM (exit code 137) in GitHub Actions CI (apache#12547)

* Limit RAM to 128MB for Pulsar IO Sinks and Sources

* Reduce ElasticSearchContainer memory requirements
  • Loading branch information
lhotari authored Oct 30, 2021
1 parent f527a22 commit 7f60bcd
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,24 @@
import java.util.Optional;

public class ElasticSearchContainer extends ChaosContainer<ElasticSearchContainer> {

public static final String NAME = "ElasticSearch";
static final Integer[] PORTS = { 9200, 9300 };

public static final String IMAGE_NAME = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE"))
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.15.0");

public ElasticSearchContainer(String clusterName) {
super(clusterName, IMAGE_NAME);
super(clusterName, IMAGE_NAME);
}

@Override
protected void configure() {
super.configure();
this.withNetworkAliases(NAME)
.withExposedPorts(PORTS)
.withEnv("discovery.type", "single-node")
.withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
.withEnv("ES_JAVA_OPTS", "-Xms512m -Xmx1500m")
.withCreateContainerCmdModifier(createContainerCmd -> {
createContainerCmd.withHostName(NAME);
createContainerCmd.withName(clusterName + "-" + NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@

@Slf4j
public abstract class PulsarIOTestRunner {

static final long MB = 1048576L;
public static final long RUNTIME_INSTANCE_RAM_BYTES = 128 * MB;
final Duration ONE_MINUTE = Duration.ofMinutes(1);
final Duration TEN_SECONDS = Duration.ofSeconds(10);

Expand All @@ -47,15 +48,15 @@ public abstract class PulsarIOTestRunner {
.withMaxDuration(ONE_MINUTE)
.withDelay(TEN_SECONDS)
.onRetry(e -> log.error("Retry ... "));

protected PulsarCluster pulsarCluster;
protected String functionRuntimeType;

protected PulsarIOTestRunner(PulsarCluster cluster, String functionRuntimeType) {
this.pulsarCluster = cluster;
this.functionRuntimeType = functionRuntimeType;
}

@SuppressWarnings("rawtypes")
protected Schema getSchema(boolean jsonWithEnvelope) {
if (jsonWithEnvelope) {
Expand All @@ -64,7 +65,7 @@ protected Schema getSchema(boolean jsonWithEnvelope) {
return KeyValueSchemaImpl.of(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED);
}
}

protected <T> void ensureSubscriptionCreated(String inputTopicName,
String subscriptionName,
Schema<T> inputTopicSchema)
Expand All @@ -81,7 +82,7 @@ protected <T> void ensureSubscriptionCreated(String inputTopicName,
}
}
}

protected Map<String, String> produceMessagesToInputTopic(String inputTopicName,
int numMessages, SinkTester<?> tester) throws Exception {

Expand All @@ -92,5 +93,5 @@ protected Map<String, String> produceMessagesToInputTopic(String inputTopicName,
LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
tester.produceMessage(numMessages, client, inputTopicName, kvs);
return kvs;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ protected void submitSinkConnector(@SuppressWarnings("rawtypes") SinkTester test
"--name", sinkName,
"--sink-type", tester.sinkType().getValue().toLowerCase(),
"--sinkConfig", new Gson().toJson(tester.sinkConfig()),
"--inputs", inputTopicName
"--inputs", inputTopicName,
"--ram", String.valueOf(RUNTIME_INSTANCE_RAM_BYTES)
};
} else {
commands = new String[] {
Expand All @@ -167,7 +168,8 @@ protected void submitSinkConnector(@SuppressWarnings("rawtypes") SinkTester test
"--archive", tester.getSinkArchive(),
"--classname", tester.getSinkClassName(),
"--sinkConfig", new Gson().toJson(tester.sinkConfig()),
"--inputs", inputTopicName
"--inputs", inputTopicName,
"--ram", String.valueOf(RUNTIME_INSTANCE_RAM_BYTES)
};
}
log.info("Run command : {}", StringUtils.join(commands, ' '));
Expand All @@ -193,7 +195,8 @@ protected void updateSinkConnector(@SuppressWarnings("rawtypes") SinkTester test
"--sink-type", tester.sinkType().getValue().toLowerCase(),
"--sinkConfig", new Gson().toJson(tester.sinkConfig()),
"--inputs", inputTopicName,
"--parallelism", "2"
"--parallelism", "2",
"--ram", String.valueOf(RUNTIME_INSTANCE_RAM_BYTES)
};
} else {
commands = new String[] {
Expand All @@ -206,7 +209,8 @@ protected void updateSinkConnector(@SuppressWarnings("rawtypes") SinkTester test
"--classname", tester.getSinkClassName(),
"--sinkConfig", new Gson().toJson(tester.sinkConfig()),
"--inputs", inputTopicName,
"--parallelism", "2"
"--parallelism", "2",
"--ram", String.valueOf(RUNTIME_INSTANCE_RAM_BYTES)
};
}
log.info("Run command : {}", StringUtils.join(commands, ' '));
Expand Down Expand Up @@ -337,7 +341,7 @@ protected void waitForProcessingSinkMessages(String tenant,
assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
assertEquals(sinkStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
}

// This for JdbcPostgresSinkTester
protected Map<String, String> produceSchemaInsertMessagesToInputTopic(String inputTopicName,
int numMessages,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@

@Slf4j
public class PulsarIOSourceRunner extends PulsarIOTestRunner {

public PulsarIOSourceRunner(PulsarCluster cluster, String functionRuntimeType) {
super(cluster, functionRuntimeType);
}
Expand Down Expand Up @@ -131,7 +131,8 @@ protected void submitSourceConnector(SourceTester tester,
"--name", sourceName,
"--source-type", tester.sourceType(),
"--sourceConfig", new Gson().toJson(tester.sourceConfig()),
"--destinationTopicName", outputTopicName
"--destinationTopicName", outputTopicName,
"--ram", String.valueOf(RUNTIME_INSTANCE_RAM_BYTES)
};

log.info("Run command : {}", StringUtils.join(commands, ' '));
Expand All @@ -156,7 +157,8 @@ protected void updateSourceConnector(SourceTester tester,
"--source-type", tester.sourceType(),
"--sourceConfig", new Gson().toJson(tester.sourceConfig()),
"--destinationTopicName", outputTopicName,
"--parallelism", "2"
"--parallelism", "2",
"--ram", String.valueOf(RUNTIME_INSTANCE_RAM_BYTES)
};

log.info("Run command : {}", StringUtils.join(commands, ' '));
Expand Down

0 comments on commit 7f60bcd

Please sign in to comment.