Skip to content

Commit

Permalink
Allow users to update auth data during function update (apache#4198)
Browse files Browse the repository at this point in the history
* allow users to update auth data during function update
  • Loading branch information
jerrypeng authored May 6, 2019
1 parent 42c3bf9 commit 08fcc21
Show file tree
Hide file tree
Showing 36 changed files with 636 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.FunctionStatus;
Expand Down Expand Up @@ -99,10 +100,11 @@ public void updateFunction(final @PathParam("tenant") String tenant,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
final @FormDataParam("functionConfig") String functionConfigJson) {
final @FormDataParam("functionConfig") String functionConfigJson,
final @FormDataParam("updateOptions") UpdateOptions updateOptions) throws IOException {

functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionConfigJson, clientAppId(), clientAuthData());
functionPkgUrl, functionConfigJson, clientAppId(), clientAuthData(), updateOptions);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.swagger.annotations.ApiResponses;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.policies.data.SinkStatus;
Expand Down Expand Up @@ -96,10 +97,11 @@ public void updateSink(final @PathParam("tenant") String tenant,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
final @FormDataParam("sinkConfig") String sinkConfigJson) {
final @FormDataParam("sinkConfig") String sinkConfigJson,
final @FormDataParam("updateOptions") UpdateOptions updateOptions) {

sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
functionPkgUrl, sinkConfigJson, clientAppId(), clientAuthData());
functionPkgUrl, sinkConfigJson, clientAppId(), clientAuthData(), updateOptions);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.swagger.annotations.ApiResponses;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.policies.data.SourceStatus;
Expand Down Expand Up @@ -96,10 +97,11 @@ public void updateSource(final @PathParam("tenant") String tenant,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
final @FormDataParam("sourceConfig") String sourceConfigJson) {
final @FormDataParam("sourceConfig") String sourceConfigJson,
final @FormDataParam("updateOptions") UpdateOptions updateOptions) {

source.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
functionPkgUrl, sourceConfigJson, clientAppId(), clientAuthData());
functionPkgUrl, sourceConfigJson, clientAppId(), clientAuthData(), updateOptions);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.functions.FunctionConfig;
Expand Down Expand Up @@ -122,6 +123,23 @@ public interface Functions {
*/
void updateFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException;

/**
* Update the configuration for a function.
* <p>
*
* @param functionConfig
* the function configuration object
* @param updateOptions
* options for the update operations
* @throws NotAuthorizedException
* You don't have admin permission to create the cluster
* @throws NotFoundException
* Cluster doesn't exist
* @throws PulsarAdminException
* Unexpected error
*/
void updateFunction(FunctionConfig functionConfig, String fileName, UpdateOptions updateOptions) throws PulsarAdminException;

/**
* Update the configuration for a function.
* <pre>
Expand All @@ -144,6 +162,31 @@ public interface Functions {
*/
void updateFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl) throws PulsarAdminException;

/**
* Update the configuration for a function.
* <pre>
* Update a function by providing url from which fun-pkg can be downloaded. supported url: http/file
* eg:
* File: file:/dir/fileName.jar
* Http: http://www.repo.com/fileName.jar
* </pre>
*
* @param functionConfig
* the function configuration object
* @param pkgUrl
* url from which pkg can be downloaded
* @param updateOptions
* options for the update operations
* @throws NotAuthorizedException
* You don't have admin permission to create the cluster
* @throws NotFoundException
* Cluster doesn't exist
* @throws PulsarAdminException
* Unexpected error
*/
void updateFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl, UpdateOptions updateOptions) throws PulsarAdminException;


/**
* Delete an existing function
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.common.io.SinkConfig;
Expand Down Expand Up @@ -119,6 +120,23 @@ public interface Sink {
*/
void updateSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException;

/**
* Update the configuration for a sink.
* <p>
*
* @param sinkConfig
* the sink configuration object
* @param updateOptions
* options for the update operations
* @throws NotAuthorizedException
* You don't have admin permission to create the cluster
* @throws NotFoundException
* Cluster doesn't exist
* @throws PulsarAdminException
* Unexpected error
*/
void updateSink(SinkConfig sinkConfig, String fileName, UpdateOptions updateOptions) throws PulsarAdminException;

/**
* Update the configuration for a sink.
* <pre>
Expand All @@ -141,6 +159,30 @@ public interface Sink {
*/
void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException;

/**
* Update the configuration for a sink.
* <pre>
* Update a sink by providing url from which fun-pkg can be downloaded. supported url: http/file
* eg:
* File: file:/dir/fileName.jar
* Http: http://www.repo.com/fileName.jar
* </pre>
*
* @param sinkConfig
* the sink configuration object
* @param pkgUrl
* url from which pkg can be downloaded
* @param updateOptions
* options for the update operations
* @throws NotAuthorizedException
* You don't have admin permission to create the cluster
* @throws NotFoundException
* Cluster doesn't exist
* @throws PulsarAdminException
* Unexpected error
*/
void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl, UpdateOptions updateOptions) throws PulsarAdminException;

/**
* Delete an existing sink
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.policies.data.SourceStatus;
Expand Down Expand Up @@ -119,6 +120,23 @@ public interface Source {
*/
void updateSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException;

/**
* Update the configuration for a source.
* <p>
*
* @param sourceConfig
* the source configuration object
* @param updateOptions
* options for the update operations
* @throws NotAuthorizedException
* You don't have admin permission to create the cluster
* @throws NotFoundException
* Cluster doesn't exist
* @throws PulsarAdminException
* Unexpected error
*/
void updateSource(SourceConfig sourceConfig, String fileName, UpdateOptions updateOptions) throws PulsarAdminException;

/**
* Update the configuration for a source.
* <pre>
Expand All @@ -141,6 +159,30 @@ public interface Source {
*/
void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException;

/**
* Update the configuration for a source.
* <pre>
* Update a source by providing url from which fun-pkg can be downloaded. supported url: http/file
* eg:
* File: file:/dir/fileName.jar
* Http: http://www.repo.com/fileName.jar
* </pre>
*
* @param sourceConfig
* the source configuration object
* @param pkgUrl
* url from which pkg can be downloaded
* @param updateOptions
* options for the update operations
* @throws NotAuthorizedException
* You don't have admin permission to create the cluster
* @throws NotFoundException
* Cluster doesn't exist
* @throws PulsarAdminException
* Unexpected error
*/
void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl, UpdateOptions updateOptions) throws PulsarAdminException;

/**
* Delete an existing source
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.ErrorData;
Expand Down Expand Up @@ -207,10 +208,19 @@ public void deleteFunction(String cluster, String namespace, String function) th

@Override
public void updateFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException {
updateFunction(functionConfig, fileName, null);
}

@Override
public void updateFunction(FunctionConfig functionConfig, String fileName, UpdateOptions updateOptions) throws PulsarAdminException {
try {
RequestBuilder builder = put(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()).getUri().toASCIIString())
.addBodyPart(new StringPart("functionConfig", ObjectMapperFactory.getThreadLocal().writeValueAsString(functionConfig), MediaType.APPLICATION_JSON));

if (updateOptions != null) {
builder.addBodyPart(new StringPart("updateOptions", ObjectMapperFactory.getThreadLocal().writeValueAsString(updateOptions), MediaType.APPLICATION_JSON));
}

if (fileName != null && !fileName.startsWith("builtin://")) {
// If the function code is built in, we don't need to submit here
builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
Expand All @@ -226,22 +236,37 @@ public void updateFunction(FunctionConfig functionConfig, String fileName) throw
}

@Override
public void updateFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl) throws PulsarAdminException {
public void updateFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl, UpdateOptions updateOptions) throws PulsarAdminException {
try {
final FormDataMultiPart mp = new FormDataMultiPart();

mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE));

mp.bodyPart(new FormDataBodyPart("functionConfig", new Gson().toJson(functionConfig),
mp.bodyPart(new FormDataBodyPart(
"functionConfig",
ObjectMapperFactory.getThreadLocal().writeValueAsString(functionConfig),
MediaType.APPLICATION_JSON_TYPE));

if (updateOptions != null) {
mp.bodyPart(new FormDataBodyPart(
"updateOptions",
ObjectMapperFactory.getThreadLocal().writeValueAsString(updateOptions),
MediaType.APPLICATION_JSON_TYPE));
}

request(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace())
.path(functionConfig.getName())).put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA),
ErrorData.class);
ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public void updateFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl) throws PulsarAdminException {
updateFunctionWithUrl(functionConfig, pkgUrl, null);
}

@Override
public String triggerFunction(String tenant, String namespace, String functionName, String topic, String triggerValue, String triggerFile) throws PulsarAdminException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Sink;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.SinkStatus;
Expand Down Expand Up @@ -164,11 +165,15 @@ public void deleteSink(String cluster, String namespace, String function) throws
}

@Override
public void updateSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException {
public void updateSink(SinkConfig sinkConfig, String fileName, UpdateOptions updateOptions) throws PulsarAdminException {
try {
RequestBuilder builder = put(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace()).path(sinkConfig.getName()).getUri().toASCIIString())
.addBodyPart(new StringPart("sinkConfig", ObjectMapperFactory.getThreadLocal().writeValueAsString(sinkConfig), MediaType.APPLICATION_JSON));

if (updateOptions != null) {
builder.addBodyPart(new StringPart("updateOptions", ObjectMapperFactory.getThreadLocal().writeValueAsString(updateOptions), MediaType.APPLICATION_JSON));
}

if (fileName != null && !fileName.startsWith("builtin://")) {
// If the function code is built in, we don't need to submit here
builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
Expand All @@ -184,22 +189,42 @@ public void updateSink(SinkConfig sinkConfig, String fileName) throws PulsarAdmi
}

@Override
public void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException {
public void updateSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException {
updateSink(sinkConfig, fileName, null);
}

@Override
public void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl, UpdateOptions updateOptions) throws PulsarAdminException {
try {
final FormDataMultiPart mp = new FormDataMultiPart();

mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE));

mp.bodyPart(new FormDataBodyPart("sinkConfig", new Gson().toJson(sinkConfig),
mp.bodyPart(new FormDataBodyPart(
"sinkConfig",
new Gson().toJson(sinkConfig),
MediaType.APPLICATION_JSON_TYPE));

if (updateOptions != null) {
mp.bodyPart(new FormDataBodyPart(
"updateOptions",
ObjectMapperFactory.getThreadLocal().writeValueAsString(updateOptions),
MediaType.APPLICATION_JSON_TYPE));
}

request(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace())
.path(sinkConfig.getName())).put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA),
ErrorData.class);
ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException {
updateSinkWithUrl(sinkConfig, pkgUrl, null);
}

@Override
public void restartSink(String tenant, String namespace, String functionName, int instanceId)
throws PulsarAdminException {
Expand Down
Loading

0 comments on commit 08fcc21

Please sign in to comment.