diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 0e4d8cc11b5ce..c1446edce213a 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -45,6 +45,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.functions.api.Function; +import org.apache.pulsar.functions.instance.PulsarSource; import org.apache.pulsar.functions.utils.FunctionConfig; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.api.utils.DefaultSerDe; @@ -54,6 +55,7 @@ import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBuf; import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBufUtil; import org.apache.pulsar.functions.shaded.io.netty.buffer.Unpooled; +import org.apache.pulsar.functions.shaded.proto.Function.ConnectorDetails; import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.utils.Utils; @@ -837,6 +839,10 @@ private FunctionDetails convert(FunctionConfig functionConfig) if (functionConfig.getInputs() != null) { functionDetailsBuilder.setTenant(functionConfig.getTenant()); } + functionDetailsBuilder.setSource( + ConnectorDetails.newBuilder() + .setClassName(PulsarSource.class.getName()) + .build()); if (functionConfig.getNamespace() != null) { functionDetailsBuilder.setNamespace(functionConfig.getNamespace()); } diff --git a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java index 40f18207093dd..2a4133632eb61 100644 --- a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java +++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java @@ -27,12 +27,12 @@ public interface Source extends AutoCloseable { * @param config initialization config * @throws Exception IO type exceptions when opening a connector */ - void open(final Map config) throws Exception; + void open(final Map config) throws Exception; /** - * Reads the next message from source, if one exists, and returns. This call should be non-blocking. - * If source does not have any new messages, return null immediately. - * @return next message from source or null, if no new messages are available. + * Reads the next message from source. + * If source does not have any new messages, this call should block. + * @return next message from source. The return result should never be null * @throws Exception */ Record read() throws Exception; diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml index ba5183a9677dd..3e4cfa14b2dd5 100644 --- a/pulsar-functions/instance/pom.xml +++ b/pulsar-functions/instance/pom.xml @@ -53,6 +53,12 @@ ${project.version} + + org.apache.pulsar + pulsar-connect-core + ${project.version} + + org.apache.bookkeeper stream-storage-java-client diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java new file mode 100644 index 0000000000000..72336596eebfa --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance; + +import org.apache.pulsar.functions.api.SerDe; +import org.apache.pulsar.functions.api.utils.DefaultSerDe; +import org.apache.pulsar.functions.utils.Reflections; + +public class InstanceUtils { + public static SerDe initializeSerDe(String serdeClassName, ClassLoader clsLoader, + Class type) { + if (null == serdeClassName || serdeClassName.isEmpty()) { + return null; + } else if (serdeClassName.equals(DefaultSerDe.class.getName())) { + return initializeDefaultSerDe(type); + } else { + return Reflections.createInstance( + serdeClassName, + SerDe.class, + clsLoader); + } + } + + public static SerDe initializeDefaultSerDe(Class type) { + if (!DefaultSerDe.IsSupportedType(type)) { + throw new RuntimeException("Default Serializer does not support " + type); + } + return new DefaultSerDe(type); + } +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java index 2133c3565c231..67f69bfc07eaa 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java @@ -21,9 +21,9 @@ import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.connect.core.Source; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; @@ -48,11 +48,16 @@ public class JavaInstance implements AutoCloseable { public JavaInstance(InstanceConfig config, Object userClassObject, ClassLoader clsLoader, PulsarClient pulsarClient, - Consumer inputConsumer) { + Source source) { // TODO: cache logger instances by functions? Logger instanceLog = LoggerFactory.getLogger("function-" + config.getFunctionDetails().getName()); - this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader, inputConsumer); + if (source instanceof PulsarSource) { + this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader, + ((PulsarSource) source).getInputConsumer()); + } else { + this.context = null; + } // create the functions if (userClassObject instanceof Function) { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 4349860d837da..ce5220f64030d 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.AccessLevel; import lombok.Getter; @@ -49,8 +50,10 @@ import org.apache.logging.log4j.core.config.Configuration; import org.apache.logging.log4j.core.config.LoggerConfig; import org.apache.pulsar.client.api.MessageBuilder; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.connect.core.Record; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.instance.processors.MessageProcessor; @@ -88,7 +91,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { @Getter private Exception failureException; private JavaInstance javaInstance; - private volatile boolean running = true; + private AtomicBoolean running = new AtomicBoolean(true); @Getter(AccessLevel.PACKAGE) private Map inputSerDe; @@ -101,6 +104,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { // function stats private final FunctionStats stats; + private Record currentRecord; + public JavaInstanceRunnable(InstanceConfig instanceConfig, FunctionCacheManager fnCache, String jarFile, @@ -159,7 +164,7 @@ JavaInstance setupJavaInstance() throws Exception { // start any log topic handler setupLogHandler(); - return new JavaInstance(instanceConfig, object, clsLoader, client, processor.getInputConsumer()); + return new JavaInstance(instanceConfig, object, clsLoader, client, processor.getSource()); } /** @@ -169,9 +174,11 @@ JavaInstance setupJavaInstance() throws Exception { public void run() { try { javaInstance = setupJavaInstance(); - while (running) { + while (running.get()) { + + currentRecord = processor.recieveMessage(); - InputMessage currentMessage = processor.recieveMessage(); + processor.postReceiveMessage(currentRecord); // state object is per function, because we need to have the ability to know what updates // are made in this function and ensure we only acknowledge after the state is persisted. @@ -184,20 +191,20 @@ public void run() { } // process the message - Object input; - try { - input = currentMessage.getInputSerDe().deserialize(currentMessage.getActualMessage().getData()); - } catch (Exception ex) { - stats.incrementDeserializationExceptions(currentMessage.getTopicName()); - throw ex; - } long processAt = System.currentTimeMillis(); stats.incrementProcessed(processAt); addLogTopicHandler(); - JavaExecutionResult result = javaInstance.handleMessage( - currentMessage.getActualMessage().getMessageId(), - currentMessage.getTopicName(), - input); + JavaExecutionResult result; + MessageId messageId = null; + String topicName = null; + + if (currentRecord instanceof PulsarRecord) { + PulsarRecord pulsarRecord = (PulsarRecord) currentRecord; + messageId = pulsarRecord.getMessageId(); + topicName = pulsarRecord.getTopicName(); + } + result = javaInstance.handleMessage(messageId, topicName, currentRecord.getValue()); + removeLogTopicHandler(); long doneProcessing = System.currentTimeMillis(); @@ -209,18 +216,25 @@ public void run() { try { completableFuture.join(); } catch (Exception e) { - log.error("Failed to flush the state updates of message {}", currentMessage, e); - throw e; + log.error("Failed to flush the state updates of message {}", currentRecord, e); + currentRecord.fail(); } } - processResult(currentMessage, result, processAt, doneProcessing); + try { + processResult(currentRecord, result, processAt, doneProcessing); + } catch (Exception e) { + log.warn("Failed to process result of message {}", currentRecord, e); + currentRecord.fail(); + } } - - javaInstance.close(); } catch (Exception ex) { - log.info("Uncaught exception in Java Instance", ex); - failureException = ex; - throw new RuntimeException(ex); + log.error("Uncaught exception in Java Instance", ex); + if (running.get()) { + failureException = ex; + throw new RuntimeException(ex); + } + } finally { + close(); } } @@ -286,15 +300,15 @@ private void setupStateTable() throws Exception { this.stateTable = result(storageClient.openTable(tableName)); } - private void processResult(InputMessage msg, + private void processResult(Record srcRecord, JavaExecutionResult result, long startTime, long endTime) throws Exception { if (result.getUserException() != null) { - log.info("Encountered user exception when processing message {}", msg, result.getUserException()); + log.info("Encountered user exception when processing message {}", srcRecord, result.getUserException()); stats.incrementUserExceptions(result.getUserException()); - throw result.getUserException(); + this.currentRecord.fail(); } else if (result.getSystemException() != null) { - log.info("Encountered system exception when processing message {}", msg, result.getSystemException()); + log.info("Encountered system exception when processing message {}", srcRecord, result.getSystemException()); stats.incrementSystemExceptions(result.getSystemException()); throw result.getSystemException(); } else { @@ -308,35 +322,47 @@ private void processResult(InputMessage msg, throw ex; } if (output != null) { - sendOutputMessage(msg, output); + sendOutputMessage(srcRecord, output); } else { - processor.sendOutputMessage(msg, null); + processor.sendOutputMessage(srcRecord, null); } } else { // the function doesn't produce any result or the user doesn't want the result. - processor.sendOutputMessage(msg, null); + processor.sendOutputMessage(srcRecord, null); } } } - private void sendOutputMessage(InputMessage srcMsg, + private void sendOutputMessage(Record srcRecord, byte[] output) throws Exception { - MessageBuilder msgBuilder = MessageBuilder.create() - .setContent(output) - .setProperty("__pfn_input_topic__", srcMsg.getTopicName()) - .setProperty("__pfn_input_msg_id__", new String(Base64.getEncoder().encode(srcMsg.getActualMessage().getMessageId().toByteArray()))); - processor.sendOutputMessage(srcMsg, msgBuilder); + MessageBuilder msgBuilder = MessageBuilder.create(); + if (srcRecord instanceof PulsarRecord) { + PulsarRecord pulsarMessage = (PulsarRecord) srcRecord; + msgBuilder + .setContent(output) + .setProperty("__pfn_input_topic__", pulsarMessage.getTopicName()) + .setProperty("__pfn_input_msg_id__", new String(Base64.getEncoder().encode(pulsarMessage.getMessageId().toByteArray()))); + } + + processor.sendOutputMessage(srcRecord, msgBuilder); + } + + /** + * Stop java instance runnable + */ + public void stop() { + this.running.set(false); } @Override public void close() { - if (!running) { + if (!running.get()) { return; } - running = false; processor.close(); + javaInstance.close(); // kill the state table if (null != stateTable) { @@ -401,39 +427,12 @@ private static void addSystemMetrics(String metricName, double value, InstanceCo bldr.putMetrics(metricName, digest); } - private static SerDe initializeSerDe(String serdeClassName, ClassLoader clsLoader, - Class[] typeArgs, boolean inputArgs) { - if (null == serdeClassName || serdeClassName.isEmpty()) { - return null; - } else if (serdeClassName.equals(DefaultSerDe.class.getName())) { - return initializeDefaultSerDe(typeArgs, inputArgs); - } else { - return Reflections.createInstance( - serdeClassName, - SerDe.class, - clsLoader); - } - } - - private static SerDe initializeDefaultSerDe(Class[] typeArgs, boolean inputArgs) { - if (inputArgs) { - if (!DefaultSerDe.IsSupportedType(typeArgs[0])) { - throw new RuntimeException("Default Serializer does not support " + typeArgs[0]); - } - return new DefaultSerDe(typeArgs[0]); - } else { - if (!DefaultSerDe.IsSupportedType(typeArgs[1])) { - throw new RuntimeException("Default Serializer does not support " + typeArgs[1]); - } - return new DefaultSerDe(typeArgs[1]); - } - } - private void setupSerDe(Class[] typeArgs, ClassLoader clsLoader) { + this.inputSerDe = new HashMap<>(); - instanceConfig.getFunctionDetails().getCustomSerdeInputsMap().forEach((k, v) -> this.inputSerDe.put(k, initializeSerDe(v, clsLoader, typeArgs, true))); + instanceConfig.getFunctionDetails().getCustomSerdeInputsMap().forEach((k, v) -> this.inputSerDe.put(k, InstanceUtils.initializeSerDe(v, clsLoader, typeArgs[0]))); for (String topicName : instanceConfig.getFunctionDetails().getInputsList()) { - this.inputSerDe.put(topicName, initializeDefaultSerDe(typeArgs, true)); + this.inputSerDe.put(topicName, InstanceUtils.initializeDefaultSerDe(typeArgs[0])); } if (Void.class.equals(typeArgs[0])) { @@ -458,9 +457,9 @@ private void setupSerDe(Class[] typeArgs, ClassLoader clsLoader) { if (instanceConfig.getFunctionDetails().getOutputSerdeClassName() == null || instanceConfig.getFunctionDetails().getOutputSerdeClassName().isEmpty() || instanceConfig.getFunctionDetails().getOutputSerdeClassName().equals(DefaultSerDe.class.getName())) { - outputSerDe = initializeDefaultSerDe(typeArgs, false); + outputSerDe = InstanceUtils.initializeDefaultSerDe(typeArgs[1]); } else { - this.outputSerDe = initializeSerDe(instanceConfig.getFunctionDetails().getOutputSerdeClassName(), clsLoader, typeArgs, false); + this.outputSerDe = InstanceUtils.initializeSerDe(instanceConfig.getFunctionDetails().getOutputSerdeClassName(), clsLoader, typeArgs[1]); } Class[] outputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, outputSerDe.getClass()); if (outputSerDe.getClass().getName().equals(DefaultSerDe.class.getName())) { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarConfig.java similarity index 58% rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarConfig.java index bd7e7888369f3..c812a77aae7f2 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarConfig.java @@ -18,39 +18,25 @@ */ package org.apache.pulsar.functions.instance; +import lombok.Builder; +import lombok.Data; import lombok.Getter; import lombok.Setter; import lombok.ToString; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.functions.api.SerDe; +import org.apache.pulsar.functions.proto.Function; + +import java.util.Map; @Getter @Setter +@Data +@Builder @ToString -public class InputMessage { - - private Message actualMessage; - String topicName; - SerDe inputSerDe; - Consumer consumer; - - public int getTopicPartition() { - MessageIdImpl msgId = (MessageIdImpl) actualMessage.getMessageId(); - return msgId.getPartitionIndex(); - } - - public void ack() { - if (null != consumer) { - consumer.acknowledgeAsync(actualMessage); - } - } - - public void ackCumulative() { - if (null != consumer) { - consumer.acknowledgeCumulativeAsync(actualMessage); - } - } - +public class PulsarConfig { + private Function.FunctionDetails.ProcessingGuarantees processingGuarantees; + private SubscriptionType subscriptionType; + private String subscription; + private Map topicToSerdeMap; } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarRecord.java new file mode 100644 index 0000000000000..2bdbdb1f9735c --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarRecord.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance; + +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.connect.core.Record; + +@Data +@Builder +@Getter +@ToString +@EqualsAndHashCode +public class PulsarRecord implements Record { + + private String partitionId; + private Long sequenceId; + private T value; + private MessageId messageId; + private String topicName; + private Runnable failFunction; + private Runnable ackFunction; + + @Override + public void ack() { + this.ackFunction.run(); + } + + @Override + public void fail() { + this.failFunction.run(); + } +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarSource.java new file mode 100644 index 0000000000000..43d935092226f --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarSource.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.TopicMessageIdImpl; +import org.apache.pulsar.client.impl.TopicMessageImpl; +import org.apache.pulsar.connect.core.Record; +import org.apache.pulsar.connect.core.Source; +import org.apache.pulsar.functions.proto.Function; + +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class PulsarSource implements Source { + + private PulsarClient pulsarClient; + private PulsarConfig pulsarConfig; + + @Getter + private org.apache.pulsar.client.api.Consumer inputConsumer; + + public PulsarSource(PulsarClient pulsarClient, PulsarConfig pulsarConfig) { + this.pulsarClient = pulsarClient; + this.pulsarConfig = pulsarConfig; + } + + @Override + public void open(Map config) throws Exception { + this.inputConsumer = this.pulsarClient.newConsumer() + .topics(new ArrayList<>(this.pulsarConfig.getTopicToSerdeMap().keySet())) + .subscriptionName(this.pulsarConfig.getSubscription()) + .subscriptionType(this.pulsarConfig.getSubscriptionType()) + .ackTimeout(1, TimeUnit.MINUTES) + .subscribe(); + } + + @Override + public Record read() throws Exception { + org.apache.pulsar.client.api.Message message = this.inputConsumer.receive(); + + String topicName; + String partitionId; + + // If more than one topics are being read than the Message return by the consumer will be TopicMessageImpl + // If there is only topic being read then the Message returned by the consumer wil be MessageImpl + if (message instanceof TopicMessageImpl) { + topicName = ((TopicMessageImpl) message).getTopicName(); + TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) message.getMessageId(); + MessageIdImpl messageId = (MessageIdImpl) topicMessageId.getInnerMessageId(); + partitionId = Long.toString(messageId.getPartitionIndex()); + } else { + topicName = this.pulsarConfig.getTopicToSerdeMap().keySet().iterator().next(); + partitionId = Long.toString(((MessageIdImpl) message.getMessageId()).getPartitionIndex()); + } + + Object object; + try { + object = this.pulsarConfig.getTopicToSerdeMap().get(topicName).deserialize(message.getData()); + } catch (Exception e) { + //TODO Add deserialization exception stats + throw new RuntimeException("Error occured when attempting to deserialize input:", e); + } + + T input; + try { + input = (T) object; + } catch (ClassCastException e) { + throw new RuntimeException("Error in casting input to expected type:", e); + } + + PulsarRecord pulsarMessage = (PulsarRecord) PulsarRecord.builder() + .value(input) + .messageId(message.getMessageId()) + .partitionId(partitionId) + .sequenceId(message.getSequenceId()) + .topicName(topicName) + .ackFunction(() -> { + if (pulsarConfig.getProcessingGuarantees() + == Function.FunctionDetails.ProcessingGuarantees.EFFECTIVELY_ONCE) { + inputConsumer.acknowledgeCumulativeAsync(message); + } else { + inputConsumer.acknowledgeAsync(message); + } + }).failFunction(() -> { + if (pulsarConfig.getProcessingGuarantees() + == Function.FunctionDetails.ProcessingGuarantees.EFFECTIVELY_ONCE) { + throw new RuntimeException("Failed to process message: " + message.getMessageId()); + } + }) + .build(); + return pulsarMessage; + } + + @Override + public void close() throws Exception { + this.inputConsumer.close(); + } +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java index 86aeb83efa1b2..465d1989d5e6c 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.functions.instance.processors; -import java.util.concurrent.LinkedBlockingDeque; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Message; @@ -27,7 +26,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.functions.instance.InputMessage; +import org.apache.pulsar.connect.core.Record; import org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers; import org.apache.pulsar.functions.proto.Function.FunctionDetails; @@ -52,15 +51,17 @@ protected void initializeOutputProducer(String outputTopic) throws Exception { } @Override - public void sendOutputMessage(InputMessage inputMsg, MessageBuilder outputMsgBuilder) { + public void sendOutputMessage(Record srcRecord, MessageBuilder outputMsgBuilder) { if (null == outputMsgBuilder || null == producer) { - inputMsg.ack(); + srcRecord.ack(); return; } Message outputMsg = outputMsgBuilder.build(); producer.sendAsync(outputMsg) - .thenAccept(msgId -> inputMsg.ack()); + .thenAccept(msgId -> { + srcRecord.ack(); + }); } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java index 994be0d03aaae..08cf11170b507 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.functions.instance.processors; -import java.util.concurrent.LinkedBlockingDeque; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageBuilder; @@ -26,7 +25,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.functions.instance.InputMessage; +import org.apache.pulsar.connect.core.Record; import org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers; import org.apache.pulsar.functions.proto.Function.FunctionDetails; @@ -45,10 +44,10 @@ class AtMostOnceProcessor extends MessageProcessorBase { } @Override - protected void postReceiveMessage(InputMessage message) { - super.postReceiveMessage(message); + public void postReceiveMessage(Record record) { + super.postReceiveMessage(record); if (functionDetails.getAutoAck()) { - message.ack(); + record.ack(); } } @@ -58,7 +57,7 @@ protected void initializeOutputProducer(String outputTopic) throws Exception { } @Override - public void sendOutputMessage(InputMessage inputMsg, MessageBuilder outputMsgBuilder) { + public void sendOutputMessage(Record srcRecord, MessageBuilder outputMsgBuilder) { if (null == outputMsgBuilder) { return; } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java index 59c7bd6f3fa78..aafca69534c74 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java @@ -18,31 +18,22 @@ */ package org.apache.pulsar.functions.instance.processors; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; - import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ConsumerEventListener; -import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.functions.instance.InputMessage; +import org.apache.pulsar.connect.core.Record; +import org.apache.pulsar.functions.instance.PulsarRecord; import org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers; import org.apache.pulsar.functions.instance.producers.Producers; import org.apache.pulsar.functions.proto.Function.FunctionDetails; -import org.apache.pulsar.functions.utils.FunctionDetailsUtils; -import org.apache.pulsar.functions.utils.Utils; /** * A message processor that process messages effectively-once. @@ -50,8 +41,6 @@ @Slf4j class EffectivelyOnceProcessor extends MessageProcessorBase implements ConsumerEventListener { - private LinkedList inputTopicsToResubscribe = null; - @Getter(AccessLevel.PACKAGE) protected Producers outputProducer; @@ -103,24 +92,28 @@ protected void initializeOutputProducer(String outputTopic) throws Exception { // @Override - public void sendOutputMessage(InputMessage inputMsg, - MessageBuilder outputMsgBuilder) throws Exception { + public void sendOutputMessage(Record srcRecord, + MessageBuilder outputMsgBuilder) throws Exception { if (null == outputMsgBuilder) { - inputMsg.ackCumulative(); + srcRecord.ack(); return; } // assign sequence id to output message for idempotent producing outputMsgBuilder = outputMsgBuilder - .setSequenceId(Utils.getSequenceId(inputMsg.getActualMessage().getMessageId())); - - - Producer producer = outputProducer.getProducer(inputMsg.getTopicName(), inputMsg.getTopicPartition()); - - Message outputMsg = outputMsgBuilder.build(); - producer.sendAsync(outputMsg) - .thenAccept(messageId -> inputMsg.ackCumulative()) - .join(); + .setSequenceId(srcRecord.getRecordSequence()); + + // currently on PulsarRecord + if (srcRecord instanceof PulsarRecord) { + PulsarRecord pulsarMessage = (PulsarRecord) srcRecord; + Producer producer = outputProducer.getProducer(pulsarMessage.getTopicName(), + Integer.parseInt(srcRecord.getPartitionId())); + + org.apache.pulsar.client.api.Message outputMsg = outputMsgBuilder.build(); + producer.sendAsync(outputMsg) + .thenAccept(messageId -> srcRecord.ack()) + .join(); + } } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java index 97c971d0a1637..fd22adb261772 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java @@ -19,16 +19,15 @@ package org.apache.pulsar.functions.instance.processors; import java.util.Map; -import java.util.concurrent.LinkedBlockingDeque; + import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.connect.core.Record; +import org.apache.pulsar.connect.core.Source; import org.apache.pulsar.functions.api.SerDe; -import org.apache.pulsar.functions.instance.InputMessage; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.FunctionDetails.ProcessingGuarantees; @@ -68,6 +67,8 @@ static MessageProcessor create(PulsarClient client, } } + void postReceiveMessage(Record record); + /** * Setup the input with a provided processQueue. The implementation of this processor is responsible for * setting up the input and passing the received messages from input to the provided processQueue. @@ -78,11 +79,11 @@ void setupInput(Map inputSerDe) throws Exception; /** - * Return the input. + * Return the source. * - * @return the input consumer. + * @return the source. */ - Consumer getInputConsumer(); + Source getSource(); /** * Setup the output with a provided outputSerDe. The implementation of this processor is responsible for @@ -99,18 +100,18 @@ void setupInput(Map inputSerDe) *

If the outputMsgBuilder is null, the implementation doesn't have to send any messages to the output. * The implementation can decide to acknowledge the input message based on its process guarantees. * - * @param inputMsg input message + * @param srcRecord record from source * @param outputMsgBuilder output message builder. it can be null. */ - void sendOutputMessage(InputMessage inputMsg, - MessageBuilder outputMsgBuilder) throws PulsarClientException, Exception; + void sendOutputMessage(Record srcRecord, + MessageBuilder outputMsgBuilder) throws PulsarClientException, Exception; /** * Get the next message to process * @return the next input message * @throws Exception */ - InputMessage recieveMessage() throws Exception; + Record recieveMessage() throws Exception; @Override void close(); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java index fc37b134ef806..06bc17573013f 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java @@ -25,18 +25,17 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerBuilder; -import org.apache.pulsar.client.api.Message; +import net.jodah.typetools.TypeResolver; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; -import org.apache.pulsar.client.impl.TopicMessageImpl; +import org.apache.pulsar.connect.core.Record; +import org.apache.pulsar.connect.core.Source; import org.apache.pulsar.functions.api.SerDe; -import org.apache.pulsar.functions.instance.InputMessage; +import org.apache.pulsar.functions.instance.PulsarConfig; +import org.apache.pulsar.functions.instance.PulsarSource; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; +import org.apache.pulsar.functions.utils.Reflections; /** * The base implementation of {@link MessageProcessor}. @@ -48,12 +47,8 @@ abstract class MessageProcessorBase implements MessageProcessor { protected final FunctionDetails functionDetails; protected final SubscriptionType subType; - protected Map inputSerDe; - - protected SerDe outputSerDe; - @Getter - protected Consumer inputConsumer; + protected Source source; protected List topics; @@ -64,6 +59,8 @@ protected MessageProcessorBase(PulsarClient client, this.functionDetails = functionDetails; this.subType = subType; this.topics = new LinkedList<>(); + this.topics.addAll(this.functionDetails.getInputsList()); + this.topics.addAll(this.functionDetails.getCustomSerdeInputsMap().keySet()); } // @@ -72,36 +69,52 @@ protected MessageProcessorBase(PulsarClient client, @Override public void setupInput(Map inputSerDe) throws Exception { - log.info("Setting up input with input serdes: {}", inputSerDe); - this.inputSerDe = inputSerDe; - this.topics.addAll(this.functionDetails.getCustomSerdeInputsMap().keySet()); - this.topics.addAll(this.functionDetails.getInputsList()); - this.inputConsumer = this.client.newConsumer() - .topics(this.topics) - .subscriptionName(FunctionDetailsUtils.getFullyQualifiedName(this.functionDetails)) - .subscriptionType(getSubscriptionType()) - .subscribe(); + org.apache.pulsar.functions.proto.Function.ConnectorDetails connectorDetails = this.functionDetails.getSource(); + Object object; + if (connectorDetails.getClassName().equals(PulsarSource.class.getName())) { + PulsarConfig pulsarConfig = PulsarConfig.builder() + .topicToSerdeMap(inputSerDe) + .subscription(FunctionDetailsUtils.getFullyQualifiedName(this.functionDetails)) + .processingGuarantees(this.functionDetails.getProcessingGuarantees()) + .subscriptionType(this.subType) + .build(); + Object[] params = {this.client, pulsarConfig}; + Class[] paramTypes = {PulsarClient.class, PulsarConfig.class}; + object = Reflections.createInstance( + connectorDetails.getClassName(), + PulsarSource.class.getClassLoader(), params, paramTypes); + + } else { + object = Reflections.createInstance( + connectorDetails.getClassName(), + Thread.currentThread().getContextClassLoader()); + } + + Class[] typeArgs; + if (object instanceof Source) { + typeArgs = TypeResolver.resolveRawArguments(Source.class, object.getClass()); + assert typeArgs.length > 0; + } else { + throw new RuntimeException("Source does not implement correct interface"); + } + this.source = (Source) object; + + try { + this.source.open(connectorDetails.getConfigsMap()); + } catch (Exception e) { + log.info("Error occurred executing open for source: {}", + this.functionDetails.getSource().getClassName(), e); + } + } protected SubscriptionType getSubscriptionType() { return subType; } - public InputMessage recieveMessage() throws PulsarClientException { - Message message = this.inputConsumer.receive(); - String topicName; - if (message instanceof TopicMessageImpl) { - topicName = ((TopicMessageImpl)message).getTopicName(); - } else { - topicName = this.topics.get(0); - } - InputMessage inputMessage = new InputMessage(); - inputMessage.setConsumer(inputConsumer); - inputMessage.setInputSerDe(inputSerDe.get(topicName)); - inputMessage.setActualMessage(message); - inputMessage.setTopicName(topicName); - return inputMessage; + public Record recieveMessage() throws Exception { + return this.source.read(); } /** @@ -110,9 +123,10 @@ public InputMessage recieveMessage() throws PulsarClientException { *

The processor implementation can make a decision to process the message based on its processing guarantees. * for example, an at-most-once processor can ack the message immediately. * - * @param message input message. + * @param record input message. */ - protected void postReceiveMessage(InputMessage message) {} + @Override + public void postReceiveMessage(Record record) {} // // Output @@ -120,8 +134,6 @@ protected void postReceiveMessage(InputMessage message) {} @Override public void setupOutput(SerDe outputSerDe) throws Exception { - this.outputSerDe = outputSerDe; - String outputTopic = functionDetails.getOutput(); if (outputTopic != null && !functionDetails.getOutput().isEmpty() @@ -141,10 +153,9 @@ public void setupOutput(SerDe outputSerDe) throws Exception { public void close() { try { - this.inputConsumer.close(); - } catch (PulsarClientException e) { - log.warn("Failed to close consumer to input topics {}", - ((MultiTopicsConsumerImpl) this.inputConsumer).getTopics(), e); + this.source.close(); + } catch (Exception e) { + log.warn("Failed to close source {}", this.source, e); } } } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java index b9e762a583db6..c0aae7c024e91 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.functions.instance; +import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; @@ -50,7 +51,7 @@ public void testLambda() { JavaInstance instance = new JavaInstance( config, (Function) (input, context) -> input + "-lambda", - null, null, null); + null, null, mock(PulsarSource.class)); String testString = "ABC123"; JavaExecutionResult result = instance.handleMessage(MessageId.earliest, "random", testString); assertNotNull(result.getResult()); diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto index c9b3da2831790..1aeb21adbc73a 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -53,6 +53,12 @@ message FunctionDetails { bool autoAck = 13; repeated string inputs = 14; int32 parallelism = 15; + ConnectorDetails source = 16; +} + +message ConnectorDetails { + string className = 1; + map configs = 4; } message PackageLocationMetaData { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java index 88f1ac76ee76f..4b14dad5c1fd6 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java @@ -30,6 +30,7 @@ import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.instance.InstanceConfig; +import org.apache.pulsar.functions.proto.Function.ConnectorDetails; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceControlGrpc; @@ -106,6 +107,13 @@ public class JavaInstanceMain { @Parameter(names = "--subscription_type", description = "What subscription type to use") protected FunctionDetails.SubscriptionType subscriptionType; + @Parameter(names = "--source_classname", description = "The source classname") + protected String sourceClassname; + + @Parameter(names = "--source_configs", description = "The source classname") + protected String sourceConfigs; + + private Server server; public JavaInstanceMain() { } @@ -159,6 +167,16 @@ public void start() throws Exception { Map userConfigMap = new Gson().fromJson(userConfig, type); functionDetailsBuilder.putAllUserConfig(userConfigMap); } + + ConnectorDetails.Builder sourceDetailsBuilder = ConnectorDetails.newBuilder(); + sourceDetailsBuilder.setClassName(sourceClassname); + if (sourceConfigs != null && !sourceConfigs.isEmpty()) { + Type type = new TypeToken>(){}.getType(); + Map sourceConfigMap = new Gson().fromJson(sourceConfigs, type); + sourceDetailsBuilder.putAllConfigs(sourceConfigMap); + } + functionDetailsBuilder.setSource(sourceDetailsBuilder); + FunctionDetails functionDetails = functionDetailsBuilder.build(); instanceConfig.setFunctionDetails(functionDetails); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index def06cfea2114..8da2b9d1de4ec 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -175,6 +175,13 @@ private List composeArgs(InstanceConfig instanceConfig, instancePort = findAvailablePort(); args.add("--port"); args.add(String.valueOf(instancePort)); + args.add("--source_classname"); + args.add(instanceConfig.getFunctionDetails().getSource().getClassName()); + Map sourceConfigs = instanceConfig.getFunctionDetails().getSource().getConfigsMap(); + if (sourceConfigs != null && !sourceConfigs.isEmpty()) { + args.add("--source_config"); + args.add(new Gson().toJson(sourceConfigs)); + } return args; } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java index 830e18a9f19cf..136951eaa0001 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java @@ -79,7 +79,15 @@ public void start() { @Override public void uncaughtException(Thread t, Throwable e) { startupException = new Exception(e); + log.error("Error occured in java instance:", e); + try { + Thread.sleep(500); + } catch (InterruptedException e1) { + //ignore + } + // restart start(); + } }); this.fnThread.start(); @@ -95,9 +103,10 @@ public void join() throws Exception { @Override public void stop() { if (fnThread != null) { + // Stop instance thread + javaInstanceRunnable.stop(); // interrupt the instance thread fnThread.interrupt(); - javaInstanceRunnable.close(); try { fnThread.join(); } catch (InterruptedException e) { diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java index 7042c266db2c3..67ac82f2b0349 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java @@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.instance.InstanceConfig; +import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.runtime.ProcessRuntime; import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory; @@ -80,6 +81,8 @@ FunctionDetails createFunctionDetails(FunctionDetails.Runtime runtime) { functionDetailsBuilder.setOutput(TEST_NAME + "-output"); functionDetailsBuilder.setOutputSerdeClassName("org.apache.pulsar.functions.runtime.serde.Utf8Serializer"); functionDetailsBuilder.setLogTopic(TEST_NAME + "-log"); + functionDetailsBuilder.setSource(Function.ConnectorDetails.newBuilder() + .setClassName("org.pulsar.pulsar.TestSource")); return functionDetailsBuilder.build(); } @@ -101,8 +104,7 @@ public void testJavaConstructor() { ProcessRuntime container = factory.createContainer(config, userJarFile); List args = container.getProcessArgs(); - assertEquals(args.size(), 43); - args.remove(args.size() - 1); + assertEquals(args.size(), 45); String expectedArgs = "java -cp " + javaInstanceJarFile + " -Dlog4j.configurationFile=java_instance_log4j2.yml " + "-Dpulsar.log.dir=" + logDirectory + "/functions" + " -Dpulsar.log.file=" + config.getFunctionDetails().getName() + " org.apache.pulsar.functions.runtime.JavaInstanceMain" @@ -120,7 +122,8 @@ public void testJavaConstructor() { + " --output_serde_classname " + config.getFunctionDetails().getOutputSerdeClassName() + " --processing_guarantees ATLEAST_ONCE" + " --pulsar_serviceurl " + pulsarServiceUrl - + " --max_buffered_tuples 1024 --port"; + + " --max_buffered_tuples 1024 --port " + args.get(42) + + " --source_classname " + config.getFunctionDetails().getSource().getClassName(); assertEquals(expectedArgs, String.join(" ", args)); } @@ -130,8 +133,7 @@ public void testPythonConstructor() { ProcessRuntime container = factory.createContainer(config, userJarFile); List args = container.getProcessArgs(); - assertEquals(args.size(), 42); - args.remove(args.size() - 1); + assertEquals(args.size(), 44); String expectedArgs = "python " + pythonInstanceFile + " --py " + userJarFile + " --logging_directory " + logDirectory + "/functions" + " --logging_file " + config.getFunctionDetails().getName() + " --instance_id " @@ -148,7 +150,8 @@ public void testPythonConstructor() { + " --output_serde_classname " + config.getFunctionDetails().getOutputSerdeClassName() + " --processing_guarantees ATLEAST_ONCE" + " --pulsar_serviceurl " + pulsarServiceUrl - + " --max_buffered_tuples 1024 --port"; + + " --max_buffered_tuples 1024 --port " + args.get(41) + + " --source_classname " + config.getFunctionDetails().getSource().getClassName(); assertEquals(expectedArgs, String.join(" ", args)); } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionDetailsUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionDetailsUtils.java index 77060302f03be..5ecdd4f3471dd 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionDetailsUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionDetailsUtils.java @@ -43,17 +43,6 @@ public static String extractFunctionNameFromFQN(String fullyQualifiedName) { return fullyQualifiedName.split("/")[2]; } - public static boolean areAllRequiredFieldsPresent(FunctionDetails FunctionDetails) { - if (FunctionDetails.getTenant() == null || FunctionDetails.getNamespace() == null - || FunctionDetails.getName() == null || FunctionDetails.getClassName() == null - || (FunctionDetails.getInputsCount() <= 0 && FunctionDetails.getCustomSerdeInputsCount() <= 0) - || FunctionDetails.getParallelism() <= 0) { - return false; - } else { - return true; - } - } - public static String getDownloadFileName(FunctionDetails FunctionDetails) { String[] hierarchy = FunctionDetails.getClassName().split("\\."); String fileName; diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java index 890195cafb2ba..64418bbb78c3c 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java @@ -116,6 +116,40 @@ public static Object createInstance(String userClassName, } + public static Object createInstance(String userClassName, + ClassLoader classLoader, Object[] params, Class[] paramTypes) { + if (params.length != paramTypes.length) { + throw new RuntimeException( + "Unequal number of params and paramTypes. Each param must have a correspoinding param type"); + } + Class theCls; + try { + theCls = Class.forName(userClassName, true, classLoader); + } catch (ClassNotFoundException cnfe) { + throw new RuntimeException("User class must be in class path", cnfe); + } + Object result; + try { + Constructor meth = constructorCache.get(theCls); + if (null == meth) { + meth = theCls.getDeclaredConstructor(paramTypes); + meth.setAccessible(true); + constructorCache.put(theCls, meth); + } + result = meth.newInstance(params); + } catch (InstantiationException ie) { + throw new RuntimeException("User class must be concrete", ie); + } catch (NoSuchMethodException e) { + throw new RuntimeException("User class doesn't have such method", e); + } catch (IllegalAccessException e) { + throw new RuntimeException("User class must have a no-arg constructor", e); + } catch (InvocationTargetException e) { + throw new RuntimeException("User class constructor throws exception", e); + } + return result; + + } + public static Object createInstance(String userClassName, java.io.File jar) { try { return createInstance(userClassName, loadJar(jar));