Skip to content

Commit

Permalink
[improve] PIP-335: Pulsar with Oxia integration test (apache#22045)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Feb 9, 2024
1 parent 3036783 commit 2b75ca0
Show file tree
Hide file tree
Showing 13 changed files with 397 additions and 179 deletions.
2 changes: 1 addition & 1 deletion tests/docker-images/latest-version-image/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ COPY conf/supervisord.conf /etc/supervisord.conf
COPY conf/global-zk.conf conf/local-zk.conf conf/bookie.conf conf/broker.conf conf/functions_worker.conf \
conf/proxy.conf conf/websocket.conf /etc/supervisord/conf.d/

COPY scripts/init-cluster.sh scripts/run-global-zk.sh scripts/run-local-zk.sh \
COPY scripts/run-global-zk.sh scripts/run-local-zk.sh \
scripts/run-bookie.sh scripts/run-broker.sh scripts/run-functions-worker.sh scripts/run-proxy.sh \
scripts/run-standalone.sh scripts/run-websocket.sh \
/pulsar/bin/
Expand Down
38 changes: 0 additions & 38 deletions tests/docker-images/latest-version-image/scripts/init-cluster.sh

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,5 @@ if [ -z "$NO_AUTOSTART" ]; then
sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/bookie.conf
fi

bin/watch-znode.py -z $zkServers -p /initialized-$clusterName -w
exec /usr/bin/supervisord -c /etc/supervisord.conf

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,5 @@ if [ -z "$NO_AUTOSTART" ]; then
sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/broker.conf
fi

bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w
exec /usr/bin/supervisord -c /etc/supervisord.conf

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,5 @@ if [ -z "$NO_AUTOSTART" ]; then
sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/functions_worker.conf
fi

bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w
exec /usr/bin/supervisord -c /etc/supervisord.conf

Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,4 @@ if [ -z "$NO_AUTOSTART" ]; then
sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/proxy.conf
fi

bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w
exec /usr/bin/supervisord -c /etc/supervisord.conf
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,4 @@ if [ -z "$NO_AUTOSTART" ]; then
sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/websocket.conf
fi

bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w
exec /usr/bin/supervisord -c /etc/supervisord.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.tests.integration.containers;

import java.io.IOException;
import lombok.extern.slf4j.Slf4j;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;

/**
* Initialize the Pulsar metadata
*/
@Slf4j
public class PulsarInitMetadataContainer extends GenericContainer<PulsarInitMetadataContainer> {

public static final String NAME = "init-metadata";

private final String clusterName;
private final String metadataStoreUrl;
private final String configurationMetadataStoreUrl;
private final String brokerHostname;

public PulsarInitMetadataContainer(Network network,
String clusterName,
String metadataStoreUrl,
String configurationMetadataStoreUrl,
String brokerHostname) {
this.clusterName = clusterName;
this.metadataStoreUrl = metadataStoreUrl;
this.configurationMetadataStoreUrl = configurationMetadataStoreUrl;
this.brokerHostname = brokerHostname;
setDockerImageName(PulsarContainer.DEFAULT_IMAGE_NAME);
withNetwork(network);

setCommand("sleep 1000000");
}


public void initialize() throws Exception {
start();
ExecResult res = this.execInContainer(
"/pulsar/bin/pulsar", "initialize-cluster-metadata",
"--cluster", clusterName,
"--metadata-store", metadataStoreUrl,
"--configuration-metadata-store", configurationMetadataStoreUrl,
"--web-service-url", "http://" + brokerHostname + ":8080/",
"--broker-service-url", "pulsar://" + brokerHostname + ":6650/"
);

if (res.getExitCode() == 0) {
log.info("Successfully initialized cluster");
} else {
log.warn("Failed to initialize Pulsar cluster. exit code: " + res.getExitCode());
log.warn("STDOUT: " + res.getStdout());
log.warn("STDERR: " + res.getStderr());
throw new IOException("Failed to initialized Pulsar Cluster");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.tests.integration.oxia;

import java.time.Duration;
import org.apache.pulsar.tests.integration.containers.ChaosContainer;
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.testcontainers.containers.wait.strategy.Wait;

public class OxiaContainer extends ChaosContainer<OxiaContainer> {

public static final String NAME = "oxia";

public static final int OXIA_PORT = 6648;
public static final int METRICS_PORT = 8080;
private static final int DEFAULT_SHARDS = 1;

private static final String DEFAULT_IMAGE_NAME = "streamnative/oxia:main";

public OxiaContainer(String clusterName) {
this(clusterName, DEFAULT_IMAGE_NAME, DEFAULT_SHARDS);
}

@SuppressWarnings("resource")
OxiaContainer(String clusterName, String imageName, int shards) {
super(clusterName, imageName);
if (shards <= 0) {
throw new IllegalArgumentException("shards must be greater than zero");
}
addExposedPorts(OXIA_PORT, METRICS_PORT);
this.withCreateContainerCmdModifier(createContainerCmd -> {
createContainerCmd.withHostName("oxia");
createContainerCmd.withName(getContainerName());
});
setCommand("oxia", "standalone",
"--shards=" + shards,
"--wal-sync-data=false");
waitingFor(
Wait.forHttp("/metrics")
.forPort(METRICS_PORT)
.forStatusCode(200)
.withStartupTimeout(Duration.ofSeconds(30)));

PulsarContainer.configureLeaveContainerRunning(this);
}

public String getServiceAddress() {
return OxiaContainer.NAME + ":" + OXIA_PORT;
}

@Override
public String getContainerName() {
return clusterName + "-oxia";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.oxia;

import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.testng.annotations.Test;

/**
* Test pulsar produce/consume semantics
*/
@Slf4j
public class OxiaSmokeTest extends PulsarTestSuite {

protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
String clusterName, PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
specBuilder.enableOxia(true);
return specBuilder;
}

//
// Test Basic Publish & Consume Operations
//

@Test(dataProvider = "ServiceUrlAndTopics")
public void testPublishAndConsume(Supplier<String> serviceUrl, boolean isPersistent) throws Exception {
super.testPublishAndConsume(serviceUrl.get(), isPersistent);
}

}
Loading

0 comments on commit 2b75ca0

Please sign in to comment.