From 81f1bed626fc750c68ab6740d8a80a6b2821b542 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Fri, 29 Jan 2021 14:09:59 +0800 Subject: [PATCH] [fix #7814] fix java function logging appender not added to java function logger (#9299) Fixes #7814 ### Motivation `JavaInstanceRunnable` create an instance of logger named `"function-" + instanceConfig.getFunctionDetails().getName()` and pass it to `Function Context`, and the logger can be used to send user defined content to function's log-topic if `--log-topic` defined. as issue #7814 mentioned, the logger is not working as expected since user cannot consume any self defined log content from `log-topic`. this happens in process runtime with created functions, but not noticed with other situation such as `localrun` function. Through debug to the created function, the logger in `Function Context` is different from the logger in `JavaInstanceRunnable`, such as the `contextName` as images shown below. In addition, the `LogAppender` set in `JavaInstanceRunnable` is not shown in `Function Context`'s logger as well. ![Imgur](https://i.imgur.com/39DMH6R.png) ^^^^ from JavaInstanceRunnable ![img](https://i.imgur.com/UDw5Lzt.png) ^^^^ from Function Context After some tests, I find out that when get `LoggerContext` by `LoggerContext.getContext()`, the context's logAppender can be take effect to `Function Context`, and the `Function Context`'s logger works great. ### Modifications Add `LogAppender` to the single context from `LoggerContext.getContext()`. --- .../instance/JavaInstanceRunnable.java | 22 ++- .../functions/PulsarFunctionsTest.java | 154 ++++++++++++++++++ .../functions/PulsarFunctionsTestBase.java | 3 + 3 files changed, 176 insertions(+), 3 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index fe476f76e73c4..b5f55c0d4e194 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -231,7 +231,7 @@ synchronized private void setup() throws Exception { } ContextImpl setupContext() { - Logger instanceLog = LoggerFactory.getLogger( + Logger instanceLog = LoggerFactory.getILoggerFactory().getLogger( "function-" + instanceConfig.getFunctionDetails().getName()); return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider, collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager); @@ -478,6 +478,13 @@ synchronized public void close() { log.info("Unloading JAR files for function {}", instanceConfig); instanceCache = null; } + + if (logAppender != null) { + removeLogTopicAppender(LoggerContext.getContext()); + removeLogTopicAppender(LoggerContext.getContext(false)); + logAppender.stop(); + logAppender = null; + } } public String getStatsAsString() throws IOException { @@ -600,28 +607,37 @@ private void setupLogHandler() { logAppender = new LogAppender(client, instanceConfig.getFunctionDetails().getLogTopic(), FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails())); logAppender.start(); + setupLogTopicAppender(LoggerContext.getContext()); } } private void addLogTopicHandler() { if (logAppender == null) return; - LoggerContext context = LoggerContext.getContext(false); + setupLogTopicAppender(LoggerContext.getContext(false)); + } + + private void setupLogTopicAppender(LoggerContext context) { Configuration config = context.getConfiguration(); config.addAppender(logAppender); for (final LoggerConfig loggerConfig : config.getLoggers().values()) { loggerConfig.addAppender(logAppender, null, null); } config.getRootLogger().addAppender(logAppender, null, null); + context.updateLoggers(); } private void removeLogTopicHandler() { if (logAppender == null) return; - LoggerContext context = LoggerContext.getContext(false); + removeLogTopicAppender(LoggerContext.getContext(false)); + } + + private void removeLogTopicAppender(LoggerContext context) { Configuration config = context.getConfiguration(); for (final LoggerConfig loggerConfig : config.getLoggers().values()) { loggerConfig.removeAppender(logAppender.getName()); } config.getRootLogger().removeAppender(logAppender.getName()); + context.updateLoggers(); } private void setupInput(ContextImpl contextImpl) throws Exception { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 6ca84b5521c90..3bbfbea09cb34 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -24,6 +24,8 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.gson.Gson; + +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Collections; import java.util.HashSet; @@ -1970,6 +1972,21 @@ private static void checkSubscriptionsCleanup(String topic) throws Exception { } } + private static void checkPublisherCleanup(String topic) throws Exception { + try { + ContainerExecResult result = pulsarCluster.getAnyBroker().execCmd( + PulsarCluster.ADMIN_SCRIPT, + "topics", + "stats", + topic); + TopicStats topicStats = new Gson().fromJson(result.getStdout(), TopicStats.class); + assertEquals(topicStats.publishers.size(), 0); + + } catch (ContainerExecException e) { + fail("Command should have exited with non-zero"); + } + } + private static void getFunctionStatus(String functionName, int numMessages, boolean checkRestarts) throws Exception { getFunctionStatus(functionName, numMessages, checkRestarts, 1); } @@ -2621,4 +2638,141 @@ private Schema getSchema(boolean jsonWithEnvelope) { } } + @Test(groups = {"java_function", "function"}) + public void testJavaLoggingFunction() throws Exception { + testLoggingFunction(Runtime.JAVA); + } + + private void testLoggingFunction(Runtime runtime) throws Exception { + if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.PYTHON) { + // python can only run on process mode + return; + } + + if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.GO) { + // go can only run on process mode + return; + } + + if (pulsarCluster == null) { + super.setupCluster(); + super.setupFunctionWorkers(); + } + + Schema schema; + if (Runtime.JAVA == runtime) { + schema = Schema.STRING; + } else { + schema = Schema.BYTES; + } + + String inputTopicName = "persistent://public/default/test-log-" + runtime + "-input-" + randomName(8); + String logTopicName = "test-log-" + runtime + "-log-topic-" + randomName(8); + try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) { + admin.topics().createNonPartitionedTopic(inputTopicName); + admin.topics().createNonPartitionedTopic(logTopicName); + } + + String functionName = "test-logging-fn-" + randomName(8); + final int numMessages = 10; + + // submit the exclamation function + submitJavaLoggingFunction( + inputTopicName, logTopicName, functionName, schema); + + // get function info + getFunctionInfoSuccess(functionName); + + // get function stats + getFunctionStatsEmpty(functionName); + + // publish and consume result + publishAndConsumeMessages(inputTopicName, logTopicName, numMessages, "-log"); + + // get function status + getFunctionStatus(functionName, numMessages, true); + + // get function stats + getFunctionStats(functionName, numMessages); + + // delete function + deleteFunction(functionName); + + // get function info + getFunctionInfoNotFound(functionName); + + // make sure subscriptions are cleanup + checkSubscriptionsCleanup(inputTopicName); + checkPublisherCleanup(logTopicName); + + } + + private static void submitJavaLoggingFunction(String inputTopicName, + String logTopicName, + String functionName, + Schema schema) throws Exception { + CommandGenerator generator; + log.info("------- INPUT TOPIC: '{}'", inputTopicName); + if (inputTopicName.endsWith(".*")) { + log.info("----- CREATING TOPIC PATTERN FUNCTION --- "); + generator = CommandGenerator.createTopicPatternGenerator(inputTopicName, LOGGING_JAVA_CLASS); + } else { + log.info("----- CREATING REGULAR FUNCTION --- "); + generator = CommandGenerator.createDefaultGenerator(inputTopicName, LOGGING_JAVA_CLASS); + } + generator.setLogTopic(logTopicName); + generator.setFunctionName(functionName); + String command = generator.generateCreateFunctionCommand(); + + log.info("---------- Function command: {}", command); + String[] commands = { + "sh", "-c", command + }; + ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd( + commands); + assertTrue(result.getStdout().contains("\"Created successfully\"")); + + ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), schema); + } + + private static void publishAndConsumeMessages(String inputTopic, + String outputTopic, + int numMessages, + String messagePostfix) throws Exception { + @Cleanup PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) + .build(); + + @Cleanup Consumer consumer = client.newConsumer() + .topic(outputTopic) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName("test-sub") + .subscribe(); + + @Cleanup Producer producer = client.newProducer(Schema.STRING) + .topic(inputTopic) + .create(); + + for (int i = 0; i < numMessages; i++) { + producer.send("message-" + i); + } + + Set expectedMessages = new HashSet<>(); + for (int i = 0; i < numMessages; i++) { + expectedMessages.add("message-" + i + messagePostfix); + } + + for (int i = 0; i < numMessages; i++) { + Message msg = consumer.receive(30, TimeUnit.SECONDS); + String logMsg = new String(msg.getValue(), UTF_8); + log.info("Received: {}", logMsg); + assertTrue(expectedMessages.contains(logMsg)); + expectedMessages.remove(logMsg); + } + + consumer.close(); + producer.close(); + client.close(); + } + } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java index 94bffb3316c64..253834121ae47 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java @@ -106,6 +106,9 @@ public void teardownFunctionWorkers() { public static final String EXCLAMATION_GO_FILE = "exclamationFunc"; public static final String PUBLISH_FUNCTION_GO_FILE = "exclamationFunc"; + public static final String LOGGING_JAVA_CLASS = + "org.apache.pulsar.functions.api.examples.LoggingFunction"; + protected static String getExclamationClass(Runtime runtime, boolean pyZip, boolean extraDeps) {