Skip to content

Commit

Permalink
[feature][pulsar-broker-common]Move additional servlet module to puls…
Browse files Browse the repository at this point in the history
…ar broker common module (apache#9164)

### Motivation

* Currently, after merging this or apache#8067, the pulsar-proxy module already supports loading custom web routes, which is also needed in the `pulsar-broker` and `pulsar-function` modules, so consider moving this functionality to the `pulsar-broker-common` module as a generic module.

### Modifications

* Update the class name to remove the proxy prefix
* Move to the module

How to use this generic module

1. add configuration `additionalServletDirectory` and `additionalServlets` to the configuration file
2. Call the following code

```
          Collection<AdditionalServletWithClassLoader> additionalServletCollection =
                    service.getProxyAdditionalServlets().getServlets().values();
            for (AdditionalServletWithClassLoader servletWithClassLoader : additionalServletCollection) {
                servletWithClassLoader.loadConfig(config);
                server.addServlet(servletWithClassLoader.getBasePath(), servletWithClassLoader.getServletHolder(),
                        Collections.emptyList(), config.isAuthenticationEnabled());
                log.info("proxy add additional servlet basePath {} ", servletWithClassLoader.getBasePath());
            }
```
  • Loading branch information
tuteng authored Jan 13, 2021
1 parent 49d3e86 commit 118d64e
Show file tree
Hide file tree
Showing 16 changed files with 250 additions and 216 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server.plugin.servlet;
package org.apache.pulsar.broker.web.plugin.servlet;

import com.google.common.annotations.Beta;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
import org.eclipse.jetty.servlet.ServletHolder;

/**
* The additional servlet interface for support additional servlet on Pulsar proxy.
* The additional servlet interface for support additional servlet.
*/
@Beta
public interface ProxyAdditionalServlet extends AutoCloseable {
public interface AdditionalServlet extends AutoCloseable {

/**
* load plugin config
*
* @param proxyConfiguration
* @param pulsarConfiguration
*/
void loadConfig(ProxyConfiguration proxyConfiguration);
void loadConfig(PulsarConfiguration pulsarConfiguration);

/**
* Get the base path of prometheus metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server.plugin.servlet;
package org.apache.pulsar.broker.web.plugin.servlet;

import lombok.Data;
import lombok.NoArgsConstructor;

/**
* Metadata information about a proxy additional servlet.
* Metadata information about an additional servlet.
*/
@Data
@NoArgsConstructor
public class ProxyAdditionalServletDefinition {
public class AdditionalServletDefinition {
/**
* The name of the proxy additional servlet.
* The name of the additional servlet.
*/
private String name;

/**
* The description of the proxy additional servlet to be used for user help.
* The description of the additional servlet to be used for user help.
*/
private String description;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server.plugin.servlet;
package org.apache.pulsar.broker.web.plugin.servlet;

import java.util.Map;
import java.util.TreeMap;
import lombok.Data;
import lombok.experimental.Accessors;

/**
* The collection of proxy additional servlet definition.
* The collection of additional servlet definition.
*/
@Data
@Accessors(fluent = true)
public class ProxyAdditionalServletDefinitions {
public class AdditionalServletDefinitions {

private final Map<String, ProxyAdditionalServletMetadata> servlets = new TreeMap<>();
private final Map<String, AdditionalServletMetadata> servlets = new TreeMap<>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server.plugin.servlet;
package org.apache.pulsar.broker.web.plugin.servlet;

import java.nio.file.Path;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* The metadata of proxy additional servlet
* The metadata of additional servlet
*/
@Data
@NoArgsConstructor
public class ProxyAdditionalServletMetadata {
public class AdditionalServletMetadata {

/**
* The definition of the proxy additional servlet.
* The definition of the additional servlet.
*/
private ProxyAdditionalServletDefinition definition;
private AdditionalServletDefinition definition;

/**
* The path to the additional servlet package.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server.plugin.servlet;
package org.apache.pulsar.broker.web.plugin.servlet;

import java.io.File;
import java.io.IOException;
Expand All @@ -34,76 +34,76 @@
import static com.google.common.base.Preconditions.checkArgument;

/**
* Util class to search and load {@link ProxyAdditionalServlets}.
* Util class to search and load {@link AdditionalServlets}.
*/
@UtilityClass
@Slf4j
public class ProxyAdditionalServletUtils {
public class AdditionalServletUtils {

public final String PROXY_ADDITIONAL_SERVLET_FILE = "proxy_additional_servlet.yml";
public final String ADDITIONAL_SERVLET_FILE = "additional_servlet.yml";

/**
* Retrieve the proxy additional servlet definition from the provided nar package.
* Retrieve the additional servlet definition from the provided nar package.
*
* @param narPath the path to the proxy additional servlet NAR package
* @return the proxy additional servlet definition
* @throws IOException when fail to load the proxy additional servlet or get the definition
* @param narPath the path to the additional servlet NAR package
* @return the additional servlet definition
* @throws IOException when fail to load the additional servlet or get the definition
*/
public ProxyAdditionalServletDefinition getProxyAdditionalServletDefinition(
public AdditionalServletDefinition getAdditionalServletDefinition(
String narPath, String narExtractionDirectory) throws IOException {

try (NarClassLoader ncl = NarClassLoader.getFromArchive(
new File(narPath), Collections.emptySet(), narExtractionDirectory)) {
return getProxyAdditionalServletDefinition(ncl);
return getAdditionalServletDefinition(ncl);
}
}

private ProxyAdditionalServletDefinition getProxyAdditionalServletDefinition(NarClassLoader ncl) throws IOException {
String configStr = ncl.getServiceDefinition(PROXY_ADDITIONAL_SERVLET_FILE);
private AdditionalServletDefinition getAdditionalServletDefinition(NarClassLoader ncl) throws IOException {
String configStr = ncl.getServiceDefinition(ADDITIONAL_SERVLET_FILE);
return ObjectMapperFactory.getThreadLocalYaml().readValue(
configStr, ProxyAdditionalServletDefinition.class
configStr, AdditionalServletDefinition.class
);
}

/**
* Search and load the available proxy additional servlets.
* Search and load the available additional servlets.
*
* @param additionalServletDirectory the directory where all the proxy additional servlets are stored
* @return a collection of proxy additional servlet definitions
* @throws IOException when fail to load the available proxy additional servlets from the provided directory.
* @param additionalServletDirectory the directory where all the additional servlets are stored
* @return a collection of additional servlet definitions
* @throws IOException when fail to load the available additional servlets from the provided directory.
*/
public ProxyAdditionalServletDefinitions searchForServlets(String additionalServletDirectory,
String narExtractionDirectory) throws IOException {
public AdditionalServletDefinitions searchForServlets(String additionalServletDirectory,
String narExtractionDirectory) throws IOException {
Path path = Paths.get(additionalServletDirectory).toAbsolutePath();
log.info("Searching for proxy additional servlets in {}", path);
log.info("Searching for additional servlets in {}", path);

ProxyAdditionalServletDefinitions servletDefinitions = new ProxyAdditionalServletDefinitions();
AdditionalServletDefinitions servletDefinitions = new AdditionalServletDefinitions();
if (!path.toFile().exists()) {
log.warn("Pulsar proxy additional servlets directory not found");
log.warn("Pulsar additional servlets directory not found");
return servletDefinitions;
}

try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
try {
ProxyAdditionalServletDefinition def =
ProxyAdditionalServletUtils.getProxyAdditionalServletDefinition(
AdditionalServletDefinition def =
AdditionalServletUtils.getAdditionalServletDefinition(
archive.toString(), narExtractionDirectory);
log.info("Found proxy additional servlet from {} : {}", archive, def);
log.info("Found additional servlet from {} : {}", archive, def);

checkArgument(StringUtils.isNotBlank(def.getName()));
checkArgument(StringUtils.isNotBlank(def.getAdditionalServletClass()));

ProxyAdditionalServletMetadata metadata = new ProxyAdditionalServletMetadata();
AdditionalServletMetadata metadata = new AdditionalServletMetadata();
metadata.setDefinition(def);
metadata.setArchivePath(archive);

servletDefinitions.servlets().put(def.getName(), metadata);
} catch (Throwable t) {
log.warn("Failed to load proxy additional servlet from {}."
+ " It is OK however if you want to use this proxy additional servlet,"
+ " please make sure you put the correct proxy additional servlet NAR"
+ " package in the proxy additional servlets directory.", archive, t);
log.warn("Failed to load additional servlet from {}."
+ " It is OK however if you want to use this additional servlet,"
+ " please make sure you put the correct additional servlet NAR"
+ " package in the additional servlets directory.", archive, t);
}
}
}
Expand All @@ -112,33 +112,33 @@ public ProxyAdditionalServletDefinitions searchForServlets(String additionalServ
}

/**
* Load the proxy additional servlets according to the additional servlet definition.
* Load the additional servlets according to the additional servlet definition.
*
* @param metadata the proxy additional servlet definition.
* @param metadata the additional servlet definition.
*/
public ProxyAdditionalServletWithClassLoader load(
ProxyAdditionalServletMetadata metadata, String narExtractionDirectory) throws IOException {
public AdditionalServletWithClassLoader load(
AdditionalServletMetadata metadata, String narExtractionDirectory) throws IOException {

NarClassLoader ncl = NarClassLoader.getFromArchive(
metadata.getArchivePath().toAbsolutePath().toFile(),
Collections.emptySet(),
ProxyAdditionalServlet.class.getClassLoader(), narExtractionDirectory);
AdditionalServlet.class.getClassLoader(), narExtractionDirectory);

ProxyAdditionalServletDefinition def = getProxyAdditionalServletDefinition(ncl);
AdditionalServletDefinition def = getAdditionalServletDefinition(ncl);
if (StringUtils.isBlank(def.getAdditionalServletClass())) {
throw new IOException("Proxy additional servlets `" + def.getName() + "` does NOT provide a proxy"
+ " additional servlets implementation");
throw new IOException("Additional servlets `" + def.getName() + "` does NOT provide an "
+ "additional servlets implementation");
}

try {
Class additionalServletClass = ncl.loadClass(def.getAdditionalServletClass());
Object additionalServlet = additionalServletClass.newInstance();
if (!(additionalServlet instanceof ProxyAdditionalServlet)) {
if (!(additionalServlet instanceof AdditionalServlet)) {
throw new IOException("Class " + def.getAdditionalServletClass()
+ " does not implement proxy additional servlet interface");
+ " does not implement additional servlet interface");
}
ProxyAdditionalServlet servlet = (ProxyAdditionalServlet) additionalServlet;
return new ProxyAdditionalServletWithClassLoader(servlet, ncl);
AdditionalServlet servlet = (AdditionalServlet) additionalServlet;
return new AdditionalServletWithClassLoader(servlet, ncl);
} catch (Throwable t) {
rethrowIOException(t);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,30 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server.plugin.servlet;
package org.apache.pulsar.broker.web.plugin.servlet;

import java.io.IOException;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.plugin.servlet.ProxyAdditionalServlet;
import org.eclipse.jetty.servlet.ServletHolder;

/**
* A proxy additional servlet with it's classloader.
* An additional servlet with it's classloader.
*/
@Slf4j
@Data
@RequiredArgsConstructor
public class ProxyAdditionalServletWithClassLoader implements ProxyAdditionalServlet {
public class AdditionalServletWithClassLoader implements AdditionalServlet {

private final ProxyAdditionalServlet servlet;
private final AdditionalServlet servlet;
private final NarClassLoader classLoader;

@Override
public void loadConfig(ProxyConfiguration proxyConfiguration) {
servlet.loadConfig(proxyConfiguration);
public void loadConfig(PulsarConfiguration pulsarConfiguration) {
servlet.loadConfig(pulsarConfiguration);
}

@Override
Expand Down
Loading

0 comments on commit 118d64e

Please sign in to comment.