Skip to content

Commit

Permalink
[fix apache#7814] fix java function logging appender not added to jav…
Browse files Browse the repository at this point in the history
…a function logger (apache#9299)

Fixes apache#7814

### Motivation

`JavaInstanceRunnable` create an instance of logger named `"function-" + instanceConfig.getFunctionDetails().getName()` and pass it to `Function Context`, and the logger can be used to send user defined content to function's log-topic if `--log-topic` defined.

as issue apache#7814 mentioned, the logger is not working as expected since user cannot consume any self defined log content from `log-topic`.

this happens in process runtime with created functions, but not noticed with other situation such as `localrun` function.

Through debug to the created function, the logger in `Function Context` is different from the logger in `JavaInstanceRunnable`, such as the `contextName` as images shown below. In addition, the `LogAppender` set in `JavaInstanceRunnable` is not shown in `Function Context`'s logger as well.

![Imgur](https://i.imgur.com/39DMH6R.png)
^^^^ from JavaInstanceRunnable

![img](https://i.imgur.com/UDw5Lzt.png)
^^^^ from Function Context

After some tests, I find out that when get `LoggerContext` by `LoggerContext.getContext()`, the context's logAppender can be take effect to `Function Context`, and the `Function Context`'s logger works great.

### Modifications

Add `LogAppender` to the single context from `LoggerContext.getContext()`.
  • Loading branch information
freeznet authored Jan 29, 2021
1 parent 8639f92 commit 81f1bed
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ synchronized private void setup() throws Exception {
}

ContextImpl setupContext() {
Logger instanceLog = LoggerFactory.getLogger(
Logger instanceLog = LoggerFactory.getILoggerFactory().getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager);
Expand Down Expand Up @@ -478,6 +478,13 @@ synchronized public void close() {
log.info("Unloading JAR files for function {}", instanceConfig);
instanceCache = null;
}

if (logAppender != null) {
removeLogTopicAppender(LoggerContext.getContext());
removeLogTopicAppender(LoggerContext.getContext(false));
logAppender.stop();
logAppender = null;
}
}

public String getStatsAsString() throws IOException {
Expand Down Expand Up @@ -600,28 +607,37 @@ private void setupLogHandler() {
logAppender = new LogAppender(client, instanceConfig.getFunctionDetails().getLogTopic(),
FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
logAppender.start();
setupLogTopicAppender(LoggerContext.getContext());
}
}

private void addLogTopicHandler() {
if (logAppender == null) return;
LoggerContext context = LoggerContext.getContext(false);
setupLogTopicAppender(LoggerContext.getContext(false));
}

private void setupLogTopicAppender(LoggerContext context) {
Configuration config = context.getConfiguration();
config.addAppender(logAppender);
for (final LoggerConfig loggerConfig : config.getLoggers().values()) {
loggerConfig.addAppender(logAppender, null, null);
}
config.getRootLogger().addAppender(logAppender, null, null);
context.updateLoggers();
}

private void removeLogTopicHandler() {
if (logAppender == null) return;
LoggerContext context = LoggerContext.getContext(false);
removeLogTopicAppender(LoggerContext.getContext(false));
}

private void removeLogTopicAppender(LoggerContext context) {
Configuration config = context.getConfiguration();
for (final LoggerConfig loggerConfig : config.getLoggers().values()) {
loggerConfig.removeAppender(logAppender.getName());
}
config.getRootLogger().removeAppender(logAppender.getName());
context.updateLoggers();
}

private void setupInput(ContextImpl contextImpl) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.gson.Gson;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -1970,6 +1972,21 @@ private static void checkSubscriptionsCleanup(String topic) throws Exception {
}
}

private static void checkPublisherCleanup(String topic) throws Exception {
try {
ContainerExecResult result = pulsarCluster.getAnyBroker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"topics",
"stats",
topic);
TopicStats topicStats = new Gson().fromJson(result.getStdout(), TopicStats.class);
assertEquals(topicStats.publishers.size(), 0);

} catch (ContainerExecException e) {
fail("Command should have exited with non-zero");
}
}

private static void getFunctionStatus(String functionName, int numMessages, boolean checkRestarts) throws Exception {
getFunctionStatus(functionName, numMessages, checkRestarts, 1);
}
Expand Down Expand Up @@ -2621,4 +2638,141 @@ private Schema getSchema(boolean jsonWithEnvelope) {
}
}

@Test(groups = {"java_function", "function"})
public void testJavaLoggingFunction() throws Exception {
testLoggingFunction(Runtime.JAVA);
}

private void testLoggingFunction(Runtime runtime) throws Exception {
if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.PYTHON) {
// python can only run on process mode
return;
}

if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.GO) {
// go can only run on process mode
return;
}

if (pulsarCluster == null) {
super.setupCluster();
super.setupFunctionWorkers();
}

Schema<?> schema;
if (Runtime.JAVA == runtime) {
schema = Schema.STRING;
} else {
schema = Schema.BYTES;
}

String inputTopicName = "persistent://public/default/test-log-" + runtime + "-input-" + randomName(8);
String logTopicName = "test-log-" + runtime + "-log-topic-" + randomName(8);
try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
admin.topics().createNonPartitionedTopic(inputTopicName);
admin.topics().createNonPartitionedTopic(logTopicName);
}

String functionName = "test-logging-fn-" + randomName(8);
final int numMessages = 10;

// submit the exclamation function
submitJavaLoggingFunction(
inputTopicName, logTopicName, functionName, schema);

// get function info
getFunctionInfoSuccess(functionName);

// get function stats
getFunctionStatsEmpty(functionName);

// publish and consume result
publishAndConsumeMessages(inputTopicName, logTopicName, numMessages, "-log");

// get function status
getFunctionStatus(functionName, numMessages, true);

// get function stats
getFunctionStats(functionName, numMessages);

// delete function
deleteFunction(functionName);

// get function info
getFunctionInfoNotFound(functionName);

// make sure subscriptions are cleanup
checkSubscriptionsCleanup(inputTopicName);
checkPublisherCleanup(logTopicName);

}

private static void submitJavaLoggingFunction(String inputTopicName,
String logTopicName,
String functionName,
Schema<?> schema) throws Exception {
CommandGenerator generator;
log.info("------- INPUT TOPIC: '{}'", inputTopicName);
if (inputTopicName.endsWith(".*")) {
log.info("----- CREATING TOPIC PATTERN FUNCTION --- ");
generator = CommandGenerator.createTopicPatternGenerator(inputTopicName, LOGGING_JAVA_CLASS);
} else {
log.info("----- CREATING REGULAR FUNCTION --- ");
generator = CommandGenerator.createDefaultGenerator(inputTopicName, LOGGING_JAVA_CLASS);
}
generator.setLogTopic(logTopicName);
generator.setFunctionName(functionName);
String command = generator.generateCreateFunctionCommand();

log.info("---------- Function command: {}", command);
String[] commands = {
"sh", "-c", command
};
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
commands);
assertTrue(result.getStdout().contains("\"Created successfully\""));

ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), schema);
}

private static void publishAndConsumeMessages(String inputTopic,
String outputTopic,
int numMessages,
String messagePostfix) throws Exception {
@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();

@Cleanup Consumer<byte[]> consumer = client.newConsumer()
.topic(outputTopic)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub")
.subscribe();

@Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
.topic(inputTopic)
.create();

for (int i = 0; i < numMessages; i++) {
producer.send("message-" + i);
}

Set<String> expectedMessages = new HashSet<>();
for (int i = 0; i < numMessages; i++) {
expectedMessages.add("message-" + i + messagePostfix);
}

for (int i = 0; i < numMessages; i++) {
Message<byte[]> msg = consumer.receive(30, TimeUnit.SECONDS);
String logMsg = new String(msg.getValue(), UTF_8);
log.info("Received: {}", logMsg);
assertTrue(expectedMessages.contains(logMsg));
expectedMessages.remove(logMsg);
}

consumer.close();
producer.close();
client.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ public void teardownFunctionWorkers() {
public static final String EXCLAMATION_GO_FILE = "exclamationFunc";
public static final String PUBLISH_FUNCTION_GO_FILE = "exclamationFunc";

public static final String LOGGING_JAVA_CLASS =
"org.apache.pulsar.functions.api.examples.LoggingFunction";

protected static String getExclamationClass(Runtime runtime,
boolean pyZip,
boolean extraDeps) {
Expand Down

0 comments on commit 81f1bed

Please sign in to comment.