Skip to content

Commit

Permalink
Introduce Function Config and Functions Cmd (apache#4)
Browse files Browse the repository at this point in the history
- introduce a `fs` package for function state related classes
- add `FunctionConfig` for user defined function configurations
- add `CmdFunctions`: it takes a yaml file and also command parameters
  • Loading branch information
sijie committed Mar 4, 2018
1 parent 7c871b0 commit aca6ac5
Show file tree
Hide file tree
Showing 11 changed files with 460 additions and 57 deletions.
44 changes: 44 additions & 0 deletions pulsar-functions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,61 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-tools</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>2.9.2</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.9.2</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.2</version>
</dependency>

<dependency>
<groupId>net.jodah</groupId>
<artifactId>typetools</artifactId>
<version>0.5.0</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.12</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-testng</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.admin.cli;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.common.annotations.VisibleForTesting;
import lombok.Getter;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.functions.fs.FunctionConfig;

@Parameters(commandDescription = "Operations about functions")
public class CmdFunctions extends CmdBase {

private final LocalRunner cmdRunner;

@Getter
@Parameters(commandDescription = "Run function locally")
class LocalRunner extends CliCommand {

@Parameter(names = "--name", description = "Function Name\n")
private String name;
@Parameter(names = "--source-topic", description = "Input Topic Name\n")
private String sourceTopicName;
@Parameter(names = "--sink-topic", description = "Output Topic Name\n")
private String sinkTopicName;

@Parameter(names = "--function-config", description = "Function Config\n")
private String fnConfigFile;

@Override
void run() throws Exception {
FunctionConfig fc;
if (null != fnConfigFile) {
fc = FunctionConfig.load(fnConfigFile);
} else {
fc = new FunctionConfig();
}
if (null != sourceTopicName) {
fc.setSourceTopic(sourceTopicName);
}
if (null != sinkTopicName) {
fc.setSinkTopic(sinkTopicName);
}
if (null != name) {
fc.setName(name);
}
// TODO: execute the runner here

System.out.println(ReflectionToStringBuilder.toString(fc, ToStringStyle.MULTI_LINE_STYLE));
}

}

public CmdFunctions(PulsarAdmin admin) {
super("functions", admin);
cmdRunner = new LocalRunner();
jcommander.addCommand("run", cmdRunner);
}

@VisibleForTesting
LocalRunner getCmdRunner() {
return cmdRunner;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.admin.cli;

import java.io.FileInputStream;
import java.util.Arrays;
import java.util.Properties;

/**
* TODO: merge this into {@link PulsarAdminTool}.
*/
public class FunctionsTool extends PulsarAdminTool {

FunctionsTool(Properties properties) throws Exception {
super(properties);
commandMap.put("functions", CmdFunctions.class);
}

public static void main(String[] args) throws Exception {
String configFile = args[0];
Properties properties = new Properties();

if (configFile != null) {
FileInputStream fis = null;
try {
fis = new FileInputStream(configFile);
properties.load(fis);
} finally {
if (fis != null)
fis.close();
}
}

FunctionsTool tool = new FunctionsTool(properties);

if (tool.run(Arrays.copyOfRange(args, 1, args.length))) {
System.exit(0);
} else {
System.exit(1);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/**
* Pulsar Functions CLI.
*
* TODO: move this to `pulsar-client-tools` after merged back to the apache repo.
*/
package org.apache.pulsar.admin.cli;
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.fs;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.io.File;
import java.io.IOException;
import lombok.Getter;
import lombok.Setter;

/**
* Function Configuration.
*/
@Setter
@Getter
public class FunctionConfig {

// function name
private String name;
// source topic
private String sourceTopic;
// sink topic
private String sinkTopic;

public static FunctionConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), FunctionConfig.class);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/**
* Function State.
*/
package org.apache.pulsar.functions.fs;
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ private SupportedTypes computeSupportedType(Type type) {
} else if (type.equals(List.class)) {
return SupportedTypes.LIST;
} else {
throw new RuntimeException("Non Basic types not yet supported");
throw new RuntimeException("Non Basic types not yet supported: " + type);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
*/
package org.apache.pulsar.functions.instance;

import lombok.Getter;
import lombok.Setter;

/**
* This is the config passed to the Java Instance. Contains all the information
* passed to run functions
*/
@Getter
@Setter
class JavaInstanceConfig {
private String functionName;
private String functionId;
Expand All @@ -30,60 +35,4 @@ class JavaInstanceConfig {
private String userName;
private int timeBudgetInMs;
private int maxMemory;

public String getFunctionName() {
return functionName;
}

public void setFunctionName(String functionName) {
this.functionName = functionName;
}

public String getFunctionId() {
return functionId;
}

public void setFunctionId(String functionId) {
this.functionId = functionId;
}

public String getNameSpace() {
return nameSpace;
}

public void setNameSpace(String nameSpace) {
this.nameSpace = nameSpace;
}

public String getUserName() {
return userName;
}

public void setUserName(String userName) {
this.userName = userName;
}

public int getTimeBudgetInMs() {
return timeBudgetInMs;
}

public void setTimeBudgetInMs(int timeBudgetInMs) {
this.timeBudgetInMs = timeBudgetInMs;
}

public int getMaxMemory() {
return maxMemory;
}

public void setMaxMemory(int maxMemory) {
this.maxMemory = maxMemory;
}

public String getFunctionVersion() {
return functionVersion;
}

public void setFunctionVersion(String functionVersion) {
this.functionVersion = functionVersion;
}
}
Loading

0 comments on commit aca6ac5

Please sign in to comment.