Skip to content

Commit

Permalink
[Pulsar IO]Support reload Source and Sink for Pulsar IO (apache#5008)
Browse files Browse the repository at this point in the history
Fixes: apache#4904

Master Issue: apache#4904

### Motivation

At present, pulsar can only load the built-in connector when starting the service, but some users want to automatically load the feature of the connector, which is very useful when adding, updating and deleting the built-in connector.

### Modifications

* Add reload rest API for source and sink

### Verifying this change

I haven't found an automated integration testing method for this API. If there is a better method, please let me know.

I just verified it in the following way:
```
./bin/pulsar-admin sources available-sources
mv pulsar-io-debezium-mysql-2.4.0.nar ./connectors
./bin/pulsar-admin sources reload
./bin/pulsar-admin sources available-sources

./bin/pulsar-admin sinks available-sinks
mv pulsar-io-jdbc-2.4.0.nar ./connectors
./bin/pulsar-admin sinks reload
./bin/pulsar-admin sinks available-sinks
```
  • Loading branch information
tuteng authored and sijie committed Aug 27, 2019
1 parent 147c3c8 commit 41430ce
Show file tree
Hide file tree
Showing 11 changed files with 132 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -483,4 +483,19 @@ public List<ConnectorDefinition> getSinkList() {
}
return retval;
}

@POST
@ApiOperation(
value = "Reload the available built-in connectors, include Source and Sink",
response = Void.class
)
@ApiResponses(value = {
@ApiResponse(code = 401, message = "This operation requires super-user access"),
@ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later."),
@ApiResponse(code = 500, message = "Internal server error")
})
@Path("/reloadBuiltInSinks")
public void reloadSinks() {
sink.reloadConnectors(clientAppId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -473,4 +473,19 @@ public List<ConnectorDefinition> getSourceList() {
}
return retval;
}

@POST
@ApiOperation(
value = "Reload the available built-in connectors, include Source and Sink",
response = Void.class
)
@ApiResponses(value = {
@ApiResponse(code = 401, message = "This operation requires super-user access"),
@ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later."),
@ApiResponse(code = 500, message = "Internal server error")
})
@Path("/reloadBuiltInSources")
public void reloadSources() {
source.reloadConnectors(clientAppId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -347,4 +347,12 @@ SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkStatus(String tenant
*
*/
List<ConnectorDefinition> getBuiltInSinks() throws PulsarAdminException;

/**
* Reload the available built-in connectors, include Source and Sink
*
* @throws PulsarAdminException
* Unexpected error
*/
void reloadBuiltInSinks() throws PulsarAdminException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -347,4 +347,13 @@ SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceStatus(Strin
*
*/
List<ConnectorDefinition> getBuiltInSources() throws PulsarAdminException;


/**
* Reload the available built-in connectors, include Source and Sink
*
* @throws PulsarAdminException
* Unexpected error
*/
void reloadBuiltInSources() throws PulsarAdminException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -301,4 +301,14 @@ public List<ConnectorDefinition> getBuiltInSinks() throws PulsarAdminException {
throw getApiException(e);
}
}

@Override
public void reloadBuiltInSinks() throws PulsarAdminException {
try {
request(sink.path("reloadBuiltInSinks"))
.post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -299,4 +299,14 @@ public List<ConnectorDefinition> getBuiltInSources() throws PulsarAdminException
throw getApiException(e);
}
}

@Override
public void reloadBuiltInSources() throws PulsarAdminException {
try {
request(source.path("reloadBuiltInSources"))
.post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public CmdSinks(PulsarAdmin admin) {
jcommander.addCommand("restart", restartSink);
jcommander.addCommand("localrun", localSinkRunner);
jcommander.addCommand("available-sinks", new ListBuiltInSinks());
jcommander.addCommand("reload", new ReloadBuiltInSinks());
}

/**
Expand Down Expand Up @@ -651,4 +652,13 @@ void runCmd() throws Exception {
});
}
}

@Parameters(commandDescription = "Reload the available built-in connectors")
public class ReloadBuiltInSinks extends BaseCommand {

@Override
void runCmd() throws Exception {
admin.sinks().reloadBuiltInSinks();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public CmdSources(PulsarAdmin admin) {
jcommander.addCommand("restart", restartSource);
jcommander.addCommand("localrun", localSourceRunner);
jcommander.addCommand("available-sources", new ListBuiltInSources());
jcommander.addCommand("reload", new ReloadBuiltInSources());
}

/**
Expand Down Expand Up @@ -605,4 +606,13 @@ void runCmd() throws Exception {
});
}
}

@Parameters(commandDescription = "Reload the available built-in connectors")
public class ReloadBuiltInSources extends BaseCommand {

@Override
void runCmd() throws Exception {
admin.sources().reloadBuiltInSources();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

public class ConnectorsManager {

private final Connectors connectors;
private Connectors connectors;

public ConnectorsManager(WorkerConfig workerConfig) throws IOException {
this.connectors = ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory());
Expand All @@ -45,4 +45,8 @@ public Path getSourceArchive(String sourceType) {
public Path getSinkArchive(String sinkType) {
return connectors.getSinks().get(sinkType);
}

public void reloadConnectors(WorkerConfig workerConfig) throws IOException {
this.connectors = ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,23 @@ public List<ConnectorDefinition> getListOfConnectors() {
return this.worker().getConnectorsManager().getConnectors();
}

public void reloadConnectors(String clientRole) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
if (worker().getWorkerConfig().isAuthorizationEnabled()) {
// Only superuser has permission to do this operation.
if (!isSuperUser(clientRole)) {
throw new RestException(Status.UNAUTHORIZED, "This operation requires super-user access");
}
}
try {
this.worker().getConnectorsManager().reloadConnectors(worker().getWorkerConfig());
} catch (IOException e) {
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
}

public String triggerFunction(final String tenant,
final String namespace,
final String functionName,
Expand Down
23 changes: 23 additions & 0 deletions site2/docs/reference-connector-admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ Subcommands are:

* `available-sources`

* `reload`


### `create`

Expand Down Expand Up @@ -282,6 +284,16 @@ Get the list of Pulsar IO connector sources supported by Pulsar cluster.
$ pulsar-admin sources available-sources
```

### `reload`

Reload the available built-in connectors.

#### Usage

```bash
$ pulsar-admin sources reload
```

## `sinks`

An interface for managing Pulsar IO sinks (egress data from Pulsar).
Expand Down Expand Up @@ -314,6 +326,8 @@ Subcommands are:

* `available-sinks`

* `reload`


### `create`

Expand Down Expand Up @@ -574,4 +588,13 @@ Get the list of Pulsar IO connector sinks supported by Pulsar cluster.
$ pulsar-admin sinks available-sinks
```

### `reload`

Reload the available built-in connectors.

#### Usage

```bash
$ pulsar-admin sinks reload
```

0 comments on commit 41430ce

Please sign in to comment.