Skip to content

Commit

Permalink
Allow stats operations not to be blocked in functions (apache#9005)
Browse files Browse the repository at this point in the history
Co-authored-by: Jerry Peng <[email protected]>
  • Loading branch information
jerrypeng and Jerry Peng authored Dec 20, 2020
1 parent b43e83c commit 31f7d70
Showing 1 changed file with 85 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
Expand Down Expand Up @@ -86,7 +89,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {

// input topic consumer & output topic producer
private final PulsarClientImpl client;
//private final Map<String, PulsarClient> pulsarClientMap;

private LogAppender logAppender;

Expand Down Expand Up @@ -122,6 +124,13 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private ClassLoader functionClassLoader;
private String narExtractionDirectory;

// a flog to determine if member variables have been initialized as part of setup().
// used for out of band API calls like operations involving stats
private transient boolean isInitialized = false;

// a read write lock for stats operations
private ReadWriteLock statsLock = new ReentrantReadWriteLock();

public JavaInstanceRunnable(InstanceConfig instanceConfig,
FunctionCacheManager fnCache,
String jarFile,
Expand Down Expand Up @@ -216,6 +225,9 @@ synchronized private void setup() throws Exception {
setupLogHandler();

javaInstance = new JavaInstance(contextImpl, object, instanceConfig);

// to signal member variables are initialized
isInitialized = true;
}

ContextImpl setupContext() {
Expand Down Expand Up @@ -404,12 +416,14 @@ record = this.source.read();
}

/**
* NOTE: this method is be syncrhonized because it is potentially called by two different places
* NOTE: this method is be synchronized because it is potentially called by two different places
* one inside the run/finally clause and one inside the ThreadRuntime::stop
*/
@Override
synchronized public void close() {

isInitialized = false;

if (stats != null) {
stats.close();
stats = null;
Expand Down Expand Up @@ -466,49 +480,67 @@ synchronized public void close() {
}
}

synchronized public String getStatsAsString() throws IOException {
if (stats != null) {
return stats.getStatsAsString();
} else {
return "";
public String getStatsAsString() throws IOException {
if (isInitialized) {
try {
statsLock.readLock().lock();
return stats.getStatsAsString();
} finally {
statsLock.readLock().unlock();
}
}
return "";
}

// This method is synchronized because it is using the stats variable
synchronized public InstanceCommunication.MetricsData getAndResetMetrics() {
InstanceCommunication.MetricsData metricsData = internalGetMetrics();
internalResetMetrics();
return metricsData;
public InstanceCommunication.MetricsData getAndResetMetrics() {
if (isInitialized) {
try {
statsLock.writeLock().lock();
InstanceCommunication.MetricsData metricsData = internalGetMetrics();
internalResetMetrics();
return metricsData;
} finally {
statsLock.writeLock().unlock();
}
}
return InstanceCommunication.MetricsData.getDefaultInstance();
}

// This method is synchronized because it is using the stats and javaInstance variables
synchronized public InstanceCommunication.MetricsData getMetrics() {
return internalGetMetrics();
public InstanceCommunication.MetricsData getMetrics() {
if (isInitialized) {
try {
statsLock.readLock().lock();
return internalGetMetrics();
} finally {
statsLock.readLock().unlock();
}
}
return InstanceCommunication.MetricsData.getDefaultInstance();
}

// This method is synchronized because it is using the stats and javaInstance variables
synchronized public void resetMetrics() {
internalResetMetrics();
public void resetMetrics() {
if (isInitialized) {
try {
statsLock.writeLock().lock();
internalResetMetrics();
} finally {
statsLock.writeLock().unlock();
}
}
}

private InstanceCommunication.MetricsData internalGetMetrics() {
InstanceCommunication.MetricsData.Builder bldr = createMetricsDataBuilder();
if (javaInstance != null) {
Map<String, Double> userMetrics = javaInstance.getMetrics();
if (userMetrics != null) {
bldr.putAllUserMetrics(userMetrics);
}
Map<String, Double> userMetrics = javaInstance.getMetrics();
if (userMetrics != null) {
bldr.putAllUserMetrics(userMetrics);
}
return bldr.build();
}

private void internalResetMetrics() {
if (stats != null) {
stats.reset();
}
if (javaInstance != null) {
javaInstance.resetMetrics();
}
}

private Builder createMetricsDataBuilder() {
Expand All @@ -531,28 +563,33 @@ private Builder createMetricsDataBuilder() {
return bldr;
}

// This method is synchronized because it is using the stats variable
synchronized public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
if (stats != null) {
functionStatusBuilder.setNumReceived((long) stats.getTotalRecordsReceived());
functionStatusBuilder.setNumSuccessfullyProcessed((long) stats.getTotalProcessedSuccessfully());
functionStatusBuilder.setNumUserExceptions((long) stats.getTotalUserExceptions());
stats.getLatestUserExceptions().forEach(ex -> {
functionStatusBuilder.addLatestUserExceptions(ex);
});
functionStatusBuilder.setNumSystemExceptions((long) stats.getTotalSysExceptions());
stats.getLatestSystemExceptions().forEach(ex -> {
functionStatusBuilder.addLatestSystemExceptions(ex);
});
stats.getLatestSourceExceptions().forEach(ex -> {
functionStatusBuilder.addLatestSourceExceptions(ex);
});
stats.getLatestSinkExceptions().forEach(ex -> {
functionStatusBuilder.addLatestSinkExceptions(ex);
});
functionStatusBuilder.setAverageLatency(stats.getAvgProcessLatency());
functionStatusBuilder.setLastInvocationTime((long) stats.getLastInvocation());
if (isInitialized) {
try {
statsLock.readLock().lock();

functionStatusBuilder.setNumReceived((long) stats.getTotalRecordsReceived());
functionStatusBuilder.setNumSuccessfullyProcessed((long) stats.getTotalProcessedSuccessfully());
functionStatusBuilder.setNumUserExceptions((long) stats.getTotalUserExceptions());
stats.getLatestUserExceptions().forEach(ex -> {
functionStatusBuilder.addLatestUserExceptions(ex);
});
functionStatusBuilder.setNumSystemExceptions((long) stats.getTotalSysExceptions());
stats.getLatestSystemExceptions().forEach(ex -> {
functionStatusBuilder.addLatestSystemExceptions(ex);
});
stats.getLatestSourceExceptions().forEach(ex -> {
functionStatusBuilder.addLatestSourceExceptions(ex);
});
stats.getLatestSinkExceptions().forEach(ex -> {
functionStatusBuilder.addLatestSinkExceptions(ex);
});
functionStatusBuilder.setAverageLatency(stats.getAvgProcessLatency());
functionStatusBuilder.setLastInvocationTime((long) stats.getLastInvocation());
} finally {
statsLock.readLock().unlock();
}
}
return functionStatusBuilder;
}
Expand Down

0 comments on commit 31f7d70

Please sign in to comment.