Skip to content

Commit

Permalink
Improve error handling during localrun start (apache#10450)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrypeng authored May 4, 2021
1 parent f69a03b commit 26118e6
Showing 1 changed file with 84 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.runtime.RuntimeUtils;
import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
Expand Down Expand Up @@ -94,6 +95,8 @@ public class LocalRunner implements AutoCloseable {
private final Thread shutdownHook;
private ClassLoader userCodeClassLoader;
private boolean userCodeClassLoaderCreated;
private RuntimeFactory runtimeFactory;
private HTTPServer metricsServer;

public enum RuntimeEnv {
THREAD,
Expand Down Expand Up @@ -185,7 +188,12 @@ public static void main(String[] args) throws Exception {

// parse args by JCommander
jcommander.parse(args);
localRunner.start(true);
try {
localRunner.start(true);
} catch (Exception e) {
log.error("Encountered error starting localrunner", e);
localRunner.close();
}
}

@Builder
Expand Down Expand Up @@ -227,11 +235,13 @@ public LocalRunner(FunctionConfig functionConfig, SourceConfig sourceConfig, Sin
this.connectorsDir = Paths.get(pulsarHome, "connectors").toString();
}
this.metricsPortStart = metricsPortStart;
shutdownHook = new Thread() {
public void run() {
LocalRunner.this.stop();
shutdownHook = new Thread(() -> {
try {
LocalRunner.this.close();
} catch (Exception exception) {
log.warn("Encountered exception when closing localrunner", exception);
}
};
});
}

private static File createNarExtractionTempDirectory() {
Expand Down Expand Up @@ -260,12 +270,21 @@ public synchronized void stop() {
} catch (IllegalStateException e) {
// ignore possible "Shutdown in progress"
}
log.info("Shutting down the localrun runtimeSpawner ...");

if (metricsServer != null) {
metricsServer.stop();
}

for (RuntimeSpawner spawner : spawners) {
spawner.close();
}
spawners.clear();

if (runtimeFactory != null) {
runtimeFactory.close();
runtimeFactory = null;
}

if (userCodeClassLoaderCreated) {
if (userCodeClassLoader instanceof Closeable) {
try {
Expand Down Expand Up @@ -464,7 +483,7 @@ private void startProcessMode(org.apache.pulsar.functions.proto.Function.Functio
String stateStorageServiceUrl, AuthenticationConfig authConfig,
String userCodeFile) throws Exception {
SecretsProviderConfigurator secretsProviderConfigurator = getSecretsProviderConfigurator();
try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(
runtimeFactory = new ProcessRuntimeFactory(
serviceUrl,
webServiceUrl,
stateStorageServiceUrl,
Expand All @@ -475,71 +494,66 @@ private void startProcessMode(org.apache.pulsar.functions.proto.Function.Functio
null, /* extra dependencies dir */
narExtractionDirectory, /* nar extraction dir */
secretsProviderConfigurator,
false, Optional.empty(), Optional.empty())) {

for (int i = 0; i < parallelism; ++i) {
InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionDetails(functionDetails);
// TODO: correctly implement function version and id
instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
instanceConfig.setFunctionId(UUID.randomUUID().toString());
instanceConfig.setInstanceId(i + instanceIdOffset);
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(FunctionCommon.findAvailablePort());

if (metricsPortStart != null) {
int metricsPort = metricsPortStart + i;
if (metricsPortStart < 0 || metricsPortStart > 65535) {
throw new IllegalArgumentException("Metrics port need to be within the range of 0 and 65535");
}
instanceConfig.setMetricsPort(metricsPort);
} else {
instanceConfig.setMetricsPort(FunctionCommon.findAvailablePort());
false, Optional.empty(), Optional.empty());

for (int i = 0; i < parallelism; ++i) {
InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionDetails(functionDetails);
// TODO: correctly implement function version and id
instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
instanceConfig.setFunctionId(UUID.randomUUID().toString());
instanceConfig.setInstanceId(i + instanceIdOffset);
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(FunctionCommon.findAvailablePort());

if (metricsPortStart != null) {
int metricsPort = metricsPortStart + i;
if (metricsPortStart < 0 || metricsPortStart > 65535) {
throw new IllegalArgumentException("Metrics port need to be within the range of 0 and 65535");
}
instanceConfig.setClusterName("local");
if (functionConfig != null) {
instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
if (functionConfig.getExposePulsarAdminClientEnabled() != null) {
instanceConfig.setExposePulsarAdminClientEnabled(functionConfig.getExposePulsarAdminClientEnabled());
}
instanceConfig.setMetricsPort(metricsPort);
} else {
instanceConfig.setMetricsPort(FunctionCommon.findAvailablePort());
}
instanceConfig.setClusterName("local");
if (functionConfig != null) {
instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
if (functionConfig.getExposePulsarAdminClientEnabled() != null) {
instanceConfig.setExposePulsarAdminClientEnabled(functionConfig.getExposePulsarAdminClientEnabled());
}
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
instanceConfig,
userCodeFile,
null,
containerFactory,
30000);
spawners.add(runtimeSpawner);
runtimeSpawner.start();
}
Timer statusCheckTimer = new Timer();
statusCheckTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
CompletableFuture<String>[] futures = new CompletableFuture[spawners.size()];
int index = 0;
for (RuntimeSpawner spawner : spawners) {
futures[index] = spawner.getFunctionStatusAsJson(index);
index++;
}
try {
CompletableFuture.allOf(futures).get(5, TimeUnit.SECONDS);
for (index = 0; index < futures.length; ++index) {
String json = futures[index].get();
Gson gson = new GsonBuilder().setPrettyPrinting().create();
log.info(gson.toJson(new JsonParser().parse(json)));
}
} catch (TimeoutException | InterruptedException | ExecutionException e) {
log.error("Could not get status from all local instances");
}
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
instanceConfig,
userCodeFile,
null,
runtimeFactory,
30000);
spawners.add(runtimeSpawner);
runtimeSpawner.start();
}
Timer statusCheckTimer = new Timer();
statusCheckTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
CompletableFuture<String>[] futures = new CompletableFuture[spawners.size()];
int index = 0;
for (RuntimeSpawner spawner : spawners) {
futures[index] = spawner.getFunctionStatusAsJson(index);
index++;
}
}, 30000, 30000);
java.lang.Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
statusCheckTimer.cancel();
try {
CompletableFuture.allOf(futures).get(5, TimeUnit.SECONDS);
for (index = 0; index < futures.length; ++index) {
String json = futures[index].get();
Gson gson = new GsonBuilder().setPrettyPrinting().create();
log.info(gson.toJson(new JsonParser().parse(json)));
}
} catch (TimeoutException | InterruptedException | ExecutionException e) {
log.error("Could not get status from all local instances");
}
});
}
}
}, 30000, 30000);
java.lang.Runtime.getRuntime().addShutdownHook(new Thread(() -> statusCheckTimer.cancel()));
}


Expand Down Expand Up @@ -574,13 +588,12 @@ private void startThreadedMode(org.apache.pulsar.functions.proto.Function.Functi
FunctionCollectorRegistry collectorRegistry = FunctionCollectorRegistry.getDefaultImplementation();
RuntimeUtils.registerDefaultCollectors(collectorRegistry);

ThreadRuntimeFactory threadRuntimeFactory;
ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
try {
if (userCodeClassLoader != null) {
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
}
threadRuntimeFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup",
runtimeFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup",
serviceUrl,
stateStorageServiceUrl,
authConfig,
Expand Down Expand Up @@ -614,16 +627,15 @@ private void startThreadedMode(org.apache.pulsar.functions.proto.Function.Functi
instanceConfig,
userCodeFile,
null,
threadRuntimeFactory,
runtimeFactory,
30000);
spawners.add(runtimeSpawner);
runtimeSpawner.start();
}

if (metricsPortStart != null) {
// starting metrics server
log.info("Starting metrics server on port {}", metricsPortStart);
new HTTPServer(new InetSocketAddress(metricsPortStart), collectorRegistry, true);
metricsServer = new HTTPServer(new InetSocketAddress(metricsPortStart), collectorRegistry, true);
}
}

Expand Down

0 comments on commit 26118e6

Please sign in to comment.