Skip to content

Commit

Permalink
Support create and update sink with package name (apache#8987)
Browse files Browse the repository at this point in the history
---

Master Issue: apache#8676

*Motivation*

We have a new package management service that can manage all
the sink packages. We can use that in the Pulsar Sink to manage
sink packages.
I reuse the 'packageUrl' for downloading the package from the packages
management service. So user can use the package name as the 'packageUrl'
to download it.

*Modifications*

- Support create and update sink with the package name
- Add test for this
  • Loading branch information
zymap authored Dec 30, 2020
1 parent bddd030 commit 92fc224
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.service.api.Sinks;
import org.apache.pulsar.packages.management.core.common.PackageType;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;

import javax.ws.rs.WebApplicationException;
Expand All @@ -52,6 +52,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.function.Supplier;
Expand Down Expand Up @@ -149,14 +150,17 @@ public void registerSink(final String tenant,
// validate parameters
try {
if (isPkgUrlProvided) {

if (!Utils.isFunctionPackageUrlSupported(sinkPkgUrl)) {
throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
}
try {
componentPackageFile = FunctionCommon.extractFileFromPkgURL(sinkPkgUrl);
} catch (Exception e) {
throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), sinkPkgUrl));
if (hasPackageTypePrefix(sinkPkgUrl)) {
componentPackageFile = downloadPackageFile(sinkPkgUrl);
} else {
if (!Utils.isFunctionPackageUrlSupported(sinkPkgUrl)) {
throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
}
try {
componentPackageFile = FunctionCommon.extractFileFromPkgURL(sinkPkgUrl);
} catch (Exception e) {
throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), sinkPkgUrl));
}
}
functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
sinkConfig, componentPackageFile);
Expand Down Expand Up @@ -317,10 +321,14 @@ public void updateSink(final String tenant,
// validate parameters
try {
if (isNotBlank(sinkPkgUrl)) {
try {
componentPackageFile = FunctionCommon.extractFileFromPkgURL(sinkPkgUrl);
} catch (Exception e) {
throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), sinkPkgUrl));
if (hasPackageTypePrefix(sinkPkgUrl)) {
componentPackageFile = downloadPackageFile(sinkPkgUrl);
} else {
try {
componentPackageFile = FunctionCommon.extractFileFromPkgURL(sinkPkgUrl);
} catch (Exception e) {
throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), sinkPkgUrl));
}
}
functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
mergedConfig, componentPackageFile);
Expand Down Expand Up @@ -724,4 +732,15 @@ componentPackageFile, worker().getWorkerConfig().getNarExtractionDirectory(),
worker().getWorkerConfig().getValidateConnectorConfig());
return SinkConfigUtils.convert(sinkConfig, sinkDetails);
}


private static boolean hasPackageTypePrefix(String destPkgUrl) {
return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString()));
}

private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
File file = Files.createTempFile("function", ".tmp").toFile();
worker().getBrokerAdmin().packages().download(packageName, file.toString());
return file;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.logging.log4j.core.config.Configurator;
import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.Packages;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
Expand Down Expand Up @@ -129,6 +130,7 @@ public IObjectFactory getObjectFactory() {
private FormDataContentDisposition mockedFormData;
private FunctionMetaData mockedFunctionMetaData;
private LeaderService mockedLeaderService;
private Packages mockedPackages;

@BeforeMethod
public void setup() throws Exception {
Expand All @@ -145,6 +147,7 @@ public void setup() throws Exception {
this.mockedNamespaces = mock(Namespaces.class);
this.mockedFunctions = mock(Functions.class);
this.mockedLeaderService = mock(LeaderService.class);
this.mockedPackages = mock(Packages.class);
namespaceList.add(tenant + "/" + namespace);

this.mockedWorkerService = mock(PulsarWorkerService.class);
Expand All @@ -159,9 +162,11 @@ public void setup() throws Exception {
when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants);
when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces);
when(mockedPulsarAdmin.functions()).thenReturn(mockedFunctions);
when(mockedPulsarAdmin.packages()).thenReturn(mockedPackages);
when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo);
when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList);
when(mockedLeaderService.isLeader()).thenReturn(true);
doNothing().when(mockedPackages).download(anyString(), anyString());

URL file = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME);
if (file == null) {
Expand Down Expand Up @@ -482,14 +487,18 @@ public void testUpdateMissingSinkConfig() {
}

private void registerDefaultSink() throws IOException {
registerDefaultSinkWithPackageUrl(null);
}

private void registerDefaultSinkWithPackageUrl(String packageUrl) throws IOException {
SinkConfig sinkConfig = createDefaultSinkConfig();
resource.registerSink(
tenant,
namespace,
sink,
new FileInputStream(JAR_FILE_PATH),
mockedFormData,
null,
packageUrl,
sinkConfig,
null, null);
}
Expand Down Expand Up @@ -630,6 +639,23 @@ public void testRegisterSinkInterrupted() throws Exception {
}
}

@Test(timeOut = 20000)
public void testRegisterSinkSuccessWithPackageName() throws IOException {
registerDefaultSinkWithPackageUrl("sink://public/default/test@v1");
}

@Test(timeOut = 20000)
public void testRegisterSinkFailedWithWrongPackageName() throws PulsarAdminException, IOException {
try {
doThrow(new PulsarAdminException("package name is invalid"))
.when(mockedPackages).download(anyString(), anyString());
registerDefaultSinkWithPackageUrl("function://");
} catch (RestException e) {
// expected exception
assertEquals(e.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
}
}

//
// Update Functions
//
Expand Down Expand Up @@ -860,6 +886,10 @@ private void testUpdateSinkMissingArguments(
}

private void updateDefaultSink() throws Exception {
updateDefaultSinkWithPackageUrl(null);
}

private void updateDefaultSinkWithPackageUrl(String packageUrl) throws Exception {
SinkConfig sinkConfig = new SinkConfig();
sinkConfig.setTenant(tenant);
sinkConfig.setNamespace(namespace);
Expand Down Expand Up @@ -896,7 +926,7 @@ private void updateDefaultSink() throws Exception {
sink,
new FileInputStream(JAR_FILE_PATH),
mockedFormData,
null,
packageUrl,
sinkConfig,
null, null, null);
}
Expand Down Expand Up @@ -1017,6 +1047,25 @@ public void testUpdateSinkFailure() throws Exception {
}
}

@Test(timeOut = 20000)
public void testUpdateSinkSuccessWithPackageName() throws Exception {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
updateDefaultSinkWithPackageUrl("function://public/default/test@v1");
}

@Test(timeOut = 20000)
public void testUpdateSinkFailedWithWrongPackageName() throws Exception {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
try {
doThrow(new PulsarAdminException("package name is invalid"))
.when(mockedPackages).download(anyString(), anyString());
updateDefaultSinkWithPackageUrl("function://");
} catch (RestException e) {
// expected exception
assertEquals(e.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
}
}

@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function registration interrupted")
public void testUpdateSinkInterrupted() throws Exception {
try {
Expand Down

0 comments on commit 92fc224

Please sign in to comment.