Skip to content

Commit

Permalink
Renamed PulsarFunction to plain Function (apache#1377)
Browse files Browse the repository at this point in the history
  • Loading branch information
srkukarni authored and sijie committed Mar 13, 2018
1 parent e7e340c commit 381ccc0
Show file tree
Hide file tree
Showing 18 changed files with 70 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
import org.apache.pulsar.client.admin.PulsarAdminWithFunctions;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.PulsarFunction;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.utils.Reflections;
Expand Down Expand Up @@ -85,7 +85,7 @@ public IObjectFactory getObjectFactory() {
private Functions functions;
private CmdFunctions cmd;

public class DummyFunction implements PulsarFunction<String, String> {
public class DummyFunction implements Function<String, String> {
@Override
public String process(String input, Context context) throws Exception {
return null;
Expand All @@ -111,7 +111,7 @@ public void setup() throws Exception {
mockStatic(Reflections.class);
when(Reflections.classExistsInJar(any(File.class), anyString())).thenReturn(true);
when(Reflections.classExists(anyString())).thenReturn(true);
when(Reflections.classInJarImplementsIface(any(File.class), anyString(), eq(PulsarFunction.class)))
when(Reflections.classInJarImplementsIface(any(File.class), anyString(), eq(Function.class)))
.thenReturn(true);
when(Reflections.classImplementsIface(anyString(), any())).thenReturn(true);
when(Reflections.createInstance(eq(DummyFunction.class.getName()), any(File.class))).thenReturn(new DummyFunction());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminWithFunctions;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.api.PulsarFunction;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
Expand All @@ -62,7 +62,6 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import org.apache.pulsar.functions.utils.Utils;

@Slf4j
Expand Down Expand Up @@ -250,13 +249,13 @@ void processArguments() throws Exception {

private void doJavaSubmitChecks(FunctionConfig.Builder functionConfigBuilder) {
File file = new File(jarFile);
// check if the function class exists in Jar and it implements PulsarFunction class
// check if the function class exists in Jar and it implements Function class
if (!Reflections.classExistsInJar(file, functionConfigBuilder.getClassName())) {
throw new IllegalArgumentException(String.format("Pulsar function class %s does not exist in jar %s",
functionConfigBuilder.getClassName(), jarFile));
} else if (!Reflections.classInJarImplementsIface(file, functionConfigBuilder.getClassName(), PulsarFunction.class)
&& !Reflections.classInJarImplementsIface(file, functionConfigBuilder.getClassName(), Function.class)) {
throw new IllegalArgumentException(String.format("Pulsar function class %s in jar %s implements neither PulsarFunction nor java.util.Function",
} else if (!Reflections.classInJarImplementsIface(file, functionConfigBuilder.getClassName(), Function.class)
&& !Reflections.classInJarImplementsIface(file, functionConfigBuilder.getClassName(), java.util.function.Function.class)) {
throw new IllegalArgumentException(String.format("Pulsar function class %s in jar %s implements neither Function nor java.util.function.Function",
functionConfigBuilder.getClassName(), jarFile));
}

Expand All @@ -269,20 +268,20 @@ private void doJavaSubmitChecks(FunctionConfig.Builder functionConfigBuilder) {

Object userClass = Reflections.createInstance(functionConfigBuilder.getClassName(), file);
Class<?>[] typeArgs;
if (userClass instanceof PulsarFunction) {
PulsarFunction pulsarFunction = (PulsarFunction) userClass;
if (userClass instanceof Function) {
Function pulsarFunction = (Function) userClass;
if (pulsarFunction == null) {
throw new IllegalArgumentException(String.format("Pulsar function class %s could not be instantiated from jar %s",
functionConfigBuilder.getClassName(), jarFile));
}
typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass());
typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass());
} else {
Function function = (Function) userClass;
java.util.function.Function function = (java.util.function.Function) userClass;
if (function == null) {
throw new IllegalArgumentException(String.format("Java Util function class %s could not be instantiated from jar %s",
functionConfigBuilder.getClassName(), jarFile));
}
typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass());
typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
}

// Check if the Input serialization/deserialization class exists in jar or already loaded and that it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* meet your needs, you can use the byte stream handler defined in RawRequestHandler.
*/
@FunctionalInterface
public interface PulsarFunction<I, O> {
public interface Function<I, O> {
/**
* Process the input.
* @return the output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import net.jodah.typetools.TypeResolver;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.PulsarFunction;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.SerDe;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -82,7 +82,7 @@ public void testIntegerSerDe() {
assertEquals(result, input);
}

private class SimplePulsarFunction implements PulsarFunction<String, String> {
private class SimplePulsarFunction implements Function<String, String> {
@Override
public String process(String input, Context context) {
return null;
Expand All @@ -92,7 +92,7 @@ public String process(String input, Context context) {
@Test
public void testPulsarFunction() {
SimplePulsarFunction pulsarFunction = new SimplePulsarFunction();
Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass());
Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass());
SerDe serDe = new DefaultSerDe(String.class);
Class<?>[] inputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
assertTrue(inputSerdeTypeArgs[0].isAssignableFrom(typeArgs[0]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.functions.api.PulsarFunction;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;

import java.util.Map;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -43,8 +42,8 @@ public class JavaInstance implements AutoCloseable {

@Getter(AccessLevel.PACKAGE)
private final ContextImpl context;
private PulsarFunction pulsarFunction;
private Function javaUtilFunction;
private Function function;
private java.util.function.Function javaUtilFunction;

public JavaInstance(InstanceConfig config, Object userClassObject,
ClassLoader clsLoader,
Expand All @@ -56,10 +55,10 @@ public JavaInstance(InstanceConfig config, Object userClassObject,
this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader, sourceConsumers);

// create the functions
if (userClassObject instanceof PulsarFunction) {
this.pulsarFunction = (PulsarFunction) userClassObject;
if (userClassObject instanceof Function) {
this.function = (Function) userClassObject;
} else {
this.javaUtilFunction = (Function) userClassObject;
this.javaUtilFunction = (java.util.function.Function) userClassObject;
}
}

Expand All @@ -73,8 +72,8 @@ private JavaExecutionResult processMessage(Object input) {
JavaExecutionResult executionResult = new JavaExecutionResult();
try {
Object output;
if (pulsarFunction != null) {
output = pulsarFunction.process(input, context);
if (function != null) {
output = function.process(input, context);
} else {
output = javaUtilFunction.apply(input);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -53,7 +53,7 @@
import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.functions.api.PulsarFunction;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuarantees;
Expand Down Expand Up @@ -171,16 +171,16 @@ JavaInstance setupJavaInstance() throws Exception {
Object object = Reflections.createInstance(
instanceConfig.getFunctionConfig().getClassName(),
clsLoader);
if (!(object instanceof PulsarFunction) && !(object instanceof Function)) {
throw new RuntimeException("User class must either be PulsarFunction or java.util.Function");
if (!(object instanceof Function) && !(object instanceof java.util.function.Function)) {
throw new RuntimeException("User class must either be Function or java.util.Function");
}
Class<?>[] typeArgs;
if (object instanceof PulsarFunction) {
PulsarFunction pulsarFunction = (PulsarFunction) object;
typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass());
} else {
if (object instanceof Function) {
Function function = (Function) object;
typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass());
} else {
java.util.function.Function function = (java.util.function.Function) object;
typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
}

// setup serde
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.PulsarFunction;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuarantees;
Expand All @@ -87,7 +87,6 @@
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.testng.PowerMockObjectFactory;
import org.powermock.reflect.Whitebox;
import org.testng.IObjectFactory;
import org.testng.annotations.BeforeMethod;
Expand All @@ -107,14 +106,14 @@ public IObjectFactory getObjectFactory() {
return new org.powermock.modules.testng.PowerMockObjectFactory();
}

private static class TestFunction implements PulsarFunction<String, String> {
private static class TestFunction implements Function<String, String> {
@Override
public String process(String input, Context context) throws Exception {
return input + "!";
}
}

private static class TestFailureFunction implements PulsarFunction<String, String> {
private static class TestFailureFunction implements Function<String, String> {

private int processId2Count = 0;

Expand All @@ -134,7 +133,7 @@ public String process(String input, Context context) throws Exception {
}
}

private static class TestVoidFunction implements PulsarFunction<String, Void> {
private static class TestVoidFunction implements Function<String, Void> {

@Override
public Void process(String input, Context context) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@
import lombok.Setter;
import net.jodah.typetools.TypeResolver;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.PulsarFunction;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.testng.annotations.Test;

import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -84,7 +83,7 @@ private class ComplexUserDefinedType {
private Integer age;
}

private class ComplexTypeHandler implements PulsarFunction<String, ComplexUserDefinedType> {
private class ComplexTypeHandler implements Function<String, ComplexUserDefinedType> {
@Override
public ComplexUserDefinedType process(String input, Context context) throws Exception {
return new ComplexUserDefinedType();
Expand All @@ -103,14 +102,14 @@ public byte[] serialize(ComplexUserDefinedType input) {
}
}

private class VoidInputHandler implements PulsarFunction<Void, String> {
private class VoidInputHandler implements Function<Void, String> {
@Override
public String process(Void input, Context context) throws Exception {
return new String("Interesting");
}
}

private class VoidOutputHandler implements PulsarFunction<String, Void> {
private class VoidOutputHandler implements Function<String, Void> {
@Override
public Void process(String input, Context context) throws Exception {
return null;
Expand All @@ -127,7 +126,7 @@ public void testVoidInputClasses() {
Method method = makeAccessible(runnable);
VoidInputHandler pulsarFunction = new VoidInputHandler();
ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass());
Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass());
method.invoke(runnable, typeArgs, clsLoader);
assertFalse(true);
} catch (InvocationTargetException ex) {
Expand All @@ -147,7 +146,7 @@ public void testVoidOutputClasses() {
Method method = makeAccessible(runnable);
ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
VoidOutputHandler pulsarFunction = new VoidOutputHandler();
Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass());
Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass());
method.invoke(runnable, typeArgs, clsLoader);
} catch (Exception ex) {
assertTrue(false);
Expand All @@ -163,8 +162,8 @@ public void testInconsistentInputType() {
JavaInstanceRunnable runnable = createRunnable(true, DefaultSerDe.class.getName());
Method method = makeAccessible(runnable);
ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
PulsarFunction pulsarFunction = (PulsarFunction<String, String>) (input, context) -> input + "-lambda";
Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass());
Function function = (Function<String, String>) (input, context) -> input + "-lambda";
Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass());
method.invoke(runnable, typeArgs, clsLoader);
fail("Should fail constructing java instance if function type is inconsistent with serde type");
} catch (InvocationTargetException ex) {
Expand All @@ -183,8 +182,8 @@ public void testDefaultSerDe() {
JavaInstanceRunnable runnable = createRunnable(false, null);
Method method = makeAccessible(runnable);
ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
PulsarFunction pulsarFunction = (PulsarFunction<String, String>) (input, context) -> input + "-lambda";
Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass());
Function function = (Function<String, String>) (input, context) -> input + "-lambda";
Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass());
method.invoke(runnable, typeArgs, clsLoader);
} catch (Exception ex) {
ex.printStackTrace();
Expand All @@ -202,8 +201,8 @@ public void testExplicitDefaultSerDe() {
JavaInstanceRunnable runnable = createRunnable(false, DefaultSerDe.class.getName());
Method method = makeAccessible(runnable);
ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
PulsarFunction pulsarFunction = (PulsarFunction<String, String>) (input, context) -> input + "-lambda";
Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass());
Function function = (Function<String, String>) (input, context) -> input + "-lambda";
Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass());
method.invoke(runnable, typeArgs, clsLoader);
} catch (Exception ex) {
assertTrue(false);
Expand All @@ -219,8 +218,8 @@ public void testInconsistentOutputType() {
JavaInstanceRunnable runnable = createRunnable(false, IntegerSerDe.class.getName());
Method method = makeAccessible(runnable);
ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
PulsarFunction pulsarFunction = (PulsarFunction<String, String>) (input, context) -> input + "-lambda";
Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass());
Function function = (Function<String, String>) (input, context) -> input + "-lambda";
Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass());
method.invoke(runnable, typeArgs, clsLoader);
fail("Should fail constructing java instance if function type is inconsistent with serde type");
} catch (InvocationTargetException ex) {
Expand Down
Loading

0 comments on commit 381ccc0

Please sign in to comment.