Skip to content

Commit

Permalink
Source/Sink Endpoint validations (apache#2807)
Browse files Browse the repository at this point in the history
* Added Get and List source/sink functionality

* Fixed compile

* Removed test that doesnt make sense any more

* Fixed build

* Fixed logic

* Return error response

* Return response on error

* Fix unittest

* Fixed unittest

* Fixed unittest

* Fixed unittest

* Added get/list sinks tests

* Added get/list tests

* Add more unittests

* Added more unittests

* Added TODO

* Took feedback

* Fix unittest

* Fix unittest

* Fix unittest

* Fixed integration tests

* Fixed integration test
  • Loading branch information
srkukarni authored Oct 19, 2018
1 parent 9ae37b2 commit 1386e6d
Show file tree
Hide file tree
Showing 33 changed files with 984 additions and 412 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public Response registerFunction(final @PathParam("tenant") String tenant,
final @FormDataParam("functionConfig") String functionConfigJson) {

return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());
functionPkgUrl, functionDetailsJson, functionConfigJson, FunctionsImpl.FUNCTION, clientAppId());
}

@PUT
Expand All @@ -106,7 +106,7 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
final @FormDataParam("functionConfig") String functionConfigJson) {

return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());
functionPkgUrl, functionDetailsJson, functionConfigJson, FunctionsImpl.FUNCTION, clientAppId());

}

Expand All @@ -124,7 +124,7 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
public Response deregisterFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) {
return functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
return functions.deregisterFunction(tenant, namespace, functionName, FunctionsImpl.FUNCTION, clientAppId());
}

@GET
Expand All @@ -143,7 +143,7 @@ public Response getFunctionInfo(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) throws IOException {
return functions.getFunctionInfo(
tenant, namespace, functionName);
tenant, namespace, functionName, FunctionsImpl.FUNCTION);
}

@GET
Expand Down Expand Up @@ -196,7 +196,7 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant,
public Response listFunctions(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace) {
return functions.listFunctions(
tenant, namespace);
tenant, namespace, FunctionsImpl.FUNCTION);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public Response registerSink(final @PathParam("tenant") String tenant,
final @FormDataParam("sinkConfig") String sinkConfigJson) {

return functions.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
functionPkgUrl, null, sinkConfigJson, FunctionsImpl.SINK, clientAppId());
}

@PUT
Expand All @@ -93,7 +93,7 @@ public Response updateSink(final @PathParam("tenant") String tenant,
final @FormDataParam("sinkConfig") String sinkConfigJson) {

return functions.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
functionPkgUrl, null, sinkConfigJson, FunctionsImpl.SINK, clientAppId());

}

Expand All @@ -111,7 +111,7 @@ public Response updateSink(final @PathParam("tenant") String tenant,
public Response deregisterSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName) {
return functions.deregisterFunction(tenant, namespace, sinkName, clientAppId());
return functions.deregisterFunction(tenant, namespace, sinkName, FunctionsImpl.SINK, clientAppId());
}

@GET
Expand All @@ -129,7 +129,7 @@ public Response deregisterSink(final @PathParam("tenant") String tenant,
public Response getSinkInfo(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName) throws IOException {
return functions.getFunctionInfo(tenant, namespace, sinkName);
return functions.getFunctionInfo(tenant, namespace, sinkName, FunctionsImpl.SINK);
}

@GET
Expand Down Expand Up @@ -180,7 +180,7 @@ public Response getSinkStatus(final @PathParam("tenant") String tenant,
@Path("/{tenant}/{namespace}")
public Response listSinks(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace) {
return functions.listFunctions(tenant, namespace);
return functions.listFunctions(tenant, namespace, FunctionsImpl.SINK);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public Response registerSource(final @PathParam("tenant") String tenant,
final @FormDataParam("sourceConfig") String sourceConfigJson) {

return functions.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
functionPkgUrl, null, sourceConfigJson, FunctionsImpl.SOURCE, clientAppId());
}

@PUT
Expand All @@ -94,7 +94,7 @@ public Response updateSource(final @PathParam("tenant") String tenant,
final @FormDataParam("sourceConfig") String sourceConfigJson) {

return functions.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
functionPkgUrl, null, sourceConfigJson, FunctionsImpl.SOURCE, clientAppId());

}

Expand All @@ -112,7 +112,7 @@ public Response updateSource(final @PathParam("tenant") String tenant,
public Response deregisterSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName) {
return functions.deregisterFunction(tenant, namespace, sourceName, clientAppId());
return functions.deregisterFunction(tenant, namespace, sourceName, FunctionsImpl.SOURCE, clientAppId());
}

@GET
Expand All @@ -131,7 +131,7 @@ public Response getSourceInfo(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName) throws IOException {
return functions.getFunctionInfo(
tenant, namespace, sourceName);
tenant, namespace, sourceName, FunctionsImpl.SOURCE);
}

@GET
Expand Down Expand Up @@ -183,7 +183,7 @@ public Response getSourceStatus(final @PathParam("tenant") String tenant,
public Response listSources(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace) {
return functions.listFunctions(
tenant, namespace);
tenant, namespace, FunctionsImpl.SOURCE);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ public void testFunctionAssignmentsWithRestart() throws Exception {
// validate updated function prop = auto-ack=false and instnaceid
for (int i = 0; i < (totalFunctions - totalDeletedFunction); i++) {
String functionName = baseFunctionName + i;
assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).getAutoAck());
assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).isAutoAck());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,36 +407,6 @@ public void testAuthorization(boolean validRoleName) throws Exception {
}
}

/**
* Test to verify: function-server loads jar using file-url and derives type-args classes if not provided
* @throws Exception
*/
@Test(timeOut = 20000)
public void testFileUrlFunctionWithoutPassingTypeArgs() throws Exception {

final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sinkTopic = "persistent://" + replNamespace + "/output";
final String functionName = "PulsarSink-test";
final String subscriptionName = "test-sub";
admin.namespaces().createNamespace(replNamespace);
Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-examples.jar").getFile();

FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
"my.*", sinkTopic, subscriptionName);

admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);

FunctionDetails functionMetadata = admin.source().getSource(tenant, namespacePortion, functionName);

assertEquals(functionMetadata.getSource().getTypeClassName(), String.class.getName());
assertEquals(functionMetadata.getSink().getTypeClassName(), String.class.getName());

}

@Test(timeOut = 20000)
public void testFunctionStopAndRestartApi() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
import org.apache.pulsar.functions.utils.FunctionConfig;
Expand Down Expand Up @@ -77,7 +76,7 @@ public interface Functions {
* @throws PulsarAdminException
* Unexpected error
*/
FunctionDetails getFunction(String tenant, String namespace, String function) throws PulsarAdminException;
FunctionConfig getFunction(String tenant, String namespace, String function) throws PulsarAdminException;

/**
* Create a new function.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
import org.apache.pulsar.functions.utils.SinkConfig;

import java.util.List;
import java.util.Set;

/**
* Admin interface for Sink management.
Expand All @@ -50,7 +48,7 @@ public interface Sink {
* @throws PulsarAdminException
* Unexpected error
*/
List<String> getSinks(String tenant, String namespace) throws PulsarAdminException;
List<String> listSinks(String tenant, String namespace) throws PulsarAdminException;

/**
* Get the configuration for the specified sink.
Expand All @@ -77,7 +75,7 @@ public interface Sink {
* @throws PulsarAdminException
* Unexpected error
*/
Function.FunctionDetails getSink(String tenant, String namespace, String sink) throws PulsarAdminException;
SinkConfig getSink(String tenant, String namespace, String sink) throws PulsarAdminException;

/**
* Create a new sink.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
import org.apache.pulsar.functions.utils.SourceConfig;

import java.util.List;
import java.util.Set;

/**
* Admin interface for Source management.
Expand All @@ -50,7 +48,7 @@ public interface Source {
* @throws PulsarAdminException
* Unexpected error
*/
List<String> getSources(String tenant, String namespace) throws PulsarAdminException;
List<String> listSources(String tenant, String namespace) throws PulsarAdminException;

/**
* Get the configuration for the specified source.
Expand All @@ -77,7 +75,7 @@ public interface Source {
* @throws PulsarAdminException
* Unexpected error
*/
Function.FunctionDetails getSource(String tenant, String namespace, String source) throws PulsarAdminException;
SourceConfig getSource(String tenant, String namespace, String source) throws PulsarAdminException;

/**
* Create a new source.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
import org.apache.pulsar.functions.utils.FunctionConfig;
Expand Down Expand Up @@ -80,16 +79,13 @@ public List<String> getFunctions(String tenant, String namespace) throws PulsarA
}

@Override
public FunctionDetails getFunction(String tenant, String namespace, String function) throws PulsarAdminException {
public FunctionConfig getFunction(String tenant, String namespace, String function) throws PulsarAdminException {
try {
Response response = request(functions.path(tenant).path(namespace).path(function)).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
String jsonResponse = response.readEntity(String.class);
FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
mergeJson(jsonResponse, functionDetailsBuilder);
return functionDetailsBuilder.build();
return response.readEntity(FunctionConfig.class);
} catch (Exception e) {
throw getApiException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
import org.apache.pulsar.functions.utils.SinkConfig;
Expand All @@ -44,8 +43,6 @@
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

@Slf4j
public class SinkImpl extends BaseResource implements Sink {
Expand All @@ -58,7 +55,7 @@ public SinkImpl(WebTarget web, Authentication auth) {
}

@Override
public List<String> getSinks(String tenant, String namespace) throws PulsarAdminException {
public List<String> listSinks(String tenant, String namespace) throws PulsarAdminException {
try {
Response response = request(sink.path(tenant).path(namespace)).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
Expand All @@ -72,16 +69,13 @@ public List<String> getSinks(String tenant, String namespace) throws PulsarAdmin
}

@Override
public FunctionDetails getSink(String tenant, String namespace, String sinkName) throws PulsarAdminException {
public SinkConfig getSink(String tenant, String namespace, String sinkName) throws PulsarAdminException {
try {
Response response = request(sink.path(tenant).path(namespace).path(sinkName)).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
String jsonResponse = response.readEntity(String.class);
FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
mergeJson(jsonResponse, functionDetailsBuilder);
return functionDetailsBuilder.build();
return response.readEntity(SinkConfig.class);
} catch (Exception e) {
throw getApiException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
import org.apache.pulsar.functions.utils.SourceConfig;
Expand All @@ -44,8 +43,6 @@
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

@Slf4j
public class SourceImpl extends BaseResource implements Source {
Expand All @@ -58,7 +55,7 @@ public SourceImpl(WebTarget web, Authentication auth) {
}

@Override
public List<String> getSources(String tenant, String namespace) throws PulsarAdminException {
public List<String> listSources(String tenant, String namespace) throws PulsarAdminException {
try {
Response response = request(source.path(tenant).path(namespace)).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
Expand All @@ -72,16 +69,13 @@ public List<String> getSources(String tenant, String namespace) throws PulsarAdm
}

@Override
public FunctionDetails getSource(String tenant, String namespace, String sourceName) throws PulsarAdminException {
public SourceConfig getSource(String tenant, String namespace, String sourceName) throws PulsarAdminException {
try {
Response response = request(source.path(tenant).path(namespace).path(sourceName)).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
String jsonResponse = response.readEntity(String.class);
FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
mergeJson(jsonResponse, functionDetailsBuilder);
return functionDetailsBuilder.build();
return response.readEntity(SourceConfig.class);
} catch (Exception e) {
throw getApiException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,9 +697,9 @@ void runCmd() throws Exception {
class GetFunction extends FunctionCommand {
@Override
void runCmd() throws Exception {
String json = Utils.printJson(admin.functions().getFunction(tenant, namespace, functionName));
FunctionConfig functionConfig = admin.functions().getFunction(tenant, namespace, functionName);
Gson gson = new GsonBuilder().setPrettyPrinting().create();
System.out.println(gson.toJson(new JsonParser().parse(json)));
System.out.println(gson.toJson(functionConfig));
}
}

Expand Down
Loading

0 comments on commit 1386e6d

Please sign in to comment.