Skip to content

Commit

Permalink
Move integration tests for function upload/download to apache (apach…
Browse files Browse the repository at this point in the history
…e#2061)

This change add `function-worker` into the `PulsarClusterTestBase` to create a test base for functions related tests `PulsarFunctionsTestBase`.

Move the existing upload/download tests to use it.
  • Loading branch information
sijie authored Jul 2, 2018
1 parent 60a07e9 commit 124dfaf
Show file tree
Hide file tree
Showing 11 changed files with 403 additions and 13 deletions.
3 changes: 2 additions & 1 deletion docker/pulsar/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
FROM openjdk:8-jdk

# Install some utilities
RUN apt-get update && apt-get install -y netcat dnsutils python-kazoo
RUN apt-get update && apt-get install -y netcat dnsutils python-kazoo python-yaml

ARG PULSAR_TARBALL

ADD ${PULSAR_TARBALL} /
RUN mv /apache-pulsar-* /pulsar

COPY scripts/apply-config-from-env.py /pulsar/bin
COPY scripts/gen-yml-from-env.py /pulsar/bin
COPY scripts/generate-zookeeper-config.sh /pulsar/bin
COPY scripts/pulsar-zookeeper-ruok.sh /pulsar/bin
COPY scripts/watch-znode.py /pulsar/bin
Expand Down
65 changes: 65 additions & 0 deletions docker/pulsar/scripts/gen-yml-from-env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

##
## Generate a yml from env.py
##
## ./gen-yml-from-env.py <template yml file> [<template yml file>]
##

import os, sys
import yaml

if len(sys.argv) < 2:
print 'Usage: %s' % (sys.argv[0])
sys.exit(1)

conf_files = sys.argv[1:]

for conf_filename in conf_files:
conf = yaml.load(open(conf_filename))

# update the config
modified = False
for k in sorted(os.environ.keys()):
key_parts = k.split('_')
v = os.environ[k]

i = 0
conf_to_modify = conf
while i < len(key_parts):
key_part = key_parts[i]
if not key_part in conf_to_modify:
break

if i == (len(key_parts) - 1):
if key_part == 'workerPort':
conf_to_modify[key_part] = int(v)
else:
conf_to_modify[key_part] = v

modified = True
else:
conf_to_modify = conf_to_modify[key_part]
i += 1
# Store back the updated config in the same file
f = open(conf_filename , 'w')
yaml.dump(conf, f, default_flow_style=False)
f.close()
4 changes: 2 additions & 2 deletions tests/docker-images/latest-version-image/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ RUN apt-get update && apt-get install -y supervisor
RUN mkdir -p /var/log/pulsar && mkdir -p /var/run/supervisor/ && mkdir -p /pulsar/ssl

COPY conf/supervisord.conf /etc/supervisord.conf
COPY conf/global-zk.conf conf/local-zk.conf conf/bookie.conf conf/broker.conf \
COPY conf/global-zk.conf conf/local-zk.conf conf/bookie.conf conf/broker.conf conf/functions_worker.conf \
conf/proxy.conf /etc/supervisord/conf.d/

COPY ssl/ca.cert.pem ssl/broker.key-pk8.pem ssl/broker.cert.pem \
ssl/admin.key-pk8.pem ssl/admin.cert.pem /pulsar/ssl/

COPY scripts/init-cluster.sh scripts/run-global-zk.sh scripts/run-local-zk.sh \
scripts/run-bookie.sh scripts/run-broker.sh scripts/run-proxy.sh /pulsar/bin/
scripts/run-bookie.sh scripts/run-broker.sh scripts/run-functions-worker.sh scripts/run-proxy.sh /pulsar/bin/

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

[program:functions-worker]
autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/functions_worker.log
directory=/pulsar
environment=PULSAR_MEM=-Xms128M
command=/pulsar/bin/pulsar functions-worker

Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

bin/apply-config-from-env.py conf/client.conf && \
bin/gen-yml-from-env.py conf/functions_worker.yml && \
bin/apply-config-from-env.py conf/pulsar_env.sh

if [ -z "$NO_AUTOSTART" ]; then
sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/functions_worker.conf
fi

bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w
exec /usr/bin/supervisord -c /etc/supervisord.conf

Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,9 @@ public String getContainerName() {
@Override
protected void configure() {
if (httpPort > 0) {
addExposedPorts(
servicePort, httpPort
);
} else if (servicePort > 0) {
addExposedPorts(httpPort);
}
if (servicePort > 0) {
addExposedPort(servicePort);
}
}
Expand All @@ -80,7 +79,7 @@ protected void configure() {
public void start() {
if (httpPort > 0 || servicePort > 0) {
this.waitStrategy = new HostPortWaitStrategy()
.withStartupTimeout(Duration.of(60, SECONDS));
.withStartupTimeout(Duration.of(300, SECONDS));
}
this.withCreateContainerCmdModifier(createContainerCmd -> {
createContainerCmd.withHostName(hostname);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.containers;

/**
* A pulsar container that runs functions worker.
*/
public class WorkerContainer extends PulsarContainer<WorkerContainer> {

public WorkerContainer(String clusterName, String hostname) {
super(
clusterName, hostname, hostname, "bin/run-functions-worker.sh", -1, BROKER_HTTP_PORT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.pulsar.tests.containers.CSContainer;
import org.apache.pulsar.tests.containers.ProxyContainer;
import org.apache.pulsar.tests.containers.PulsarContainer;
import org.apache.pulsar.tests.containers.WorkerContainer;
import org.apache.pulsar.tests.containers.ZKContainer;
import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.Network;
Expand Down Expand Up @@ -67,6 +68,7 @@ public static PulsarCluster forSpec(PulsarClusterSpec spec) {
private final CSContainer csContainer;
private final Map<String, BKContainer> bookieContainers;
private final Map<String, BrokerContainer> brokerContainers;
private final Map<String, WorkerContainer> workerContainers;
private final ProxyContainer proxyContainer;

private PulsarCluster(PulsarClusterSpec spec) {
Expand All @@ -86,9 +88,11 @@ private PulsarCluster(PulsarClusterSpec spec) {
.withNetworkAliases(CSContainer.NAME);
this.bookieContainers = Maps.newTreeMap();
this.brokerContainers = Maps.newTreeMap();
this.workerContainers = Maps.newTreeMap();
this.proxyContainer = new ProxyContainer(clusterName, "pulsar-proxy")
.withNetwork(network)
.withNetworkAliases("pulsar-proxy")
.withEnv("zkServers", ZKContainer.NAME)
.withEnv("zookeeperServers", ZKContainer.NAME)
.withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT)
.withEnv("clusterName", clusterName);
Expand Down Expand Up @@ -132,6 +136,7 @@ public void start() throws Exception {
runNumContainers("broker", spec.numBrokers(), (name) -> new BrokerContainer(clusterName, name)
.withNetwork(network)
.withNetworkAliases(name)
.withEnv("zkServers", ZKContainer.NAME)
.withEnv("zookeeperServers", ZKContainer.NAME)
.withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT)
.withEnv("clusterName", clusterName)
Expand Down Expand Up @@ -166,6 +171,7 @@ private static <T extends PulsarContainer> Map<String, T> runNumContainers(Strin

public void stop() {
proxyContainer.stop();
workerContainers.values().forEach(WorkerContainer::stop);
brokerContainers.values().forEach(BrokerContainer::stop);
bookieContainers.values().forEach(BKContainer::stop);
csContainer.stop();
Expand All @@ -177,12 +183,48 @@ public void stop() {
}
}

public void startFunctionWorkers(int numFunctionWorkers) {
String serviceUrl = "pulsar://pulsar-broker-0:" + PulsarContainer.BROKER_PORT;
String httpServiceUrl = "http://pulsar-broker-0:" + PulsarContainer.BROKER_HTTP_PORT;
workerContainers.putAll(runNumContainers(
"functions-worker",
numFunctionWorkers,
(name) -> new WorkerContainer(clusterName, name)
.withNetwork(network)
.withNetworkAliases(name)
// worker settings
.withEnv("workerId", name)
.withEnv("workerHostname", name)
.withEnv("workerPort", "" + PulsarContainer.BROKER_HTTP_PORT)
.withEnv("pulsarFunctionsCluster", clusterName)
.withEnv("pulsarServiceUrl", serviceUrl)
.withEnv("pulsarWebServiceUrl", httpServiceUrl)
.withEnv("clusterName", clusterName)
// script
.withEnv("zookeeperServers", ZKContainer.NAME)
// bookkeeper tools
.withEnv("zkServers", ZKContainer.NAME)
));
}

public BrokerContainer getAnyBroker() {
List<BrokerContainer> brokerList = Lists.newArrayList();
brokerList.addAll(brokerContainers.values());
Collections.shuffle(brokerList);
checkArgument(!brokerList.isEmpty(), "No broker is alive");
return brokerList.get(0);
return getAnyContainer(brokerContainers, "broker");
}

public WorkerContainer getAnyWorker() {
return getAnyContainer(workerContainers, "functions-worker");
}

private <T> T getAnyContainer(Map<String, T> containers, String serviceName) {
List<T> containerList = Lists.newArrayList();
containerList.addAll(containers.values());
Collections.shuffle(containerList);
checkArgument(!containerList.isEmpty(), "No " + serviceName + " is alive");
return containerList.get(0);
}

public Collection<BrokerContainer> getBrokers() {
return brokerContainers.values();
}

public Collection<BrokerContainer> getBrokers() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.functions;

import static org.testng.Assert.assertTrue;

import com.google.common.io.Files;
import java.io.File;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.containers.WorkerContainer;
import org.apache.pulsar.tests.integration.functions.utils.UploadDownloadCommandGenerator;
import org.apache.pulsar.tests.topologies.PulsarCluster;
import org.testcontainers.containers.Container.ExecResult;
import org.testng.annotations.Test;

@Slf4j
public class PulsarFunctionsTest extends PulsarFunctionsTestBase {

@Test
public String checkUpload() throws Exception {
String bkPkgPath = String.format("%s/%s/%s",
"tenant-" + randomName(8),
"ns-" + randomName(8),
"fn-" + randomName(8));

UploadDownloadCommandGenerator generator = UploadDownloadCommandGenerator.createUploader(
PulsarCluster.ADMIN_SCRIPT,
bkPkgPath);
String actualCommand = generator.generateCommand();

log.info(actualCommand);

String[] commands = {
"sh", "-c", actualCommand
};
ExecResult output = pulsarCluster.getAnyWorker().execCmd(commands);
assertTrue(output.getStdout().contains("\"Uploaded successfully\""));
return bkPkgPath;
}

@Test
public void checkDownload() throws Exception {
String bkPkgPath = checkUpload();
String localPkgFile = "/tmp/checkdownload-" + randomName(16);

UploadDownloadCommandGenerator generator = UploadDownloadCommandGenerator.createDownloader(
localPkgFile,
bkPkgPath);
String actualCommand = generator.generateCommand();

log.info(actualCommand);

String[] commands = {
"sh", "-c", actualCommand
};
WorkerContainer container = pulsarCluster.getAnyWorker();
ExecResult output = container.execCmd(commands);
assertTrue(output.getStdout().contains("\"Downloaded successfully\""));
String[] diffCommand = {
"diff",
PulsarCluster.ADMIN_SCRIPT,
localPkgFile
};
output = container.execCmd(diffCommand);
assertTrue(output.getStdout().isEmpty());
assertTrue(output.getStderr().isEmpty());
}

}
Loading

0 comments on commit 124dfaf

Please sign in to comment.