Skip to content

Commit

Permalink
Add proxy plugin interface to support user defined additional servlet (
Browse files Browse the repository at this point in the history
…apache#8067)

Motivation
In order to facilitate users' flexible access in the broker, we provide plug-ins similar to broker protocol and broker interceptor. Similarly, in the proxy, sometimes users also need similar plug-in functions to facilitate customizing some data requests.

Modifications
Add protocol plugin for proxy

* Add proxy metrics logic

Signed-off-by: xiaolong.ran <[email protected]>

* fix proxy interface

Signed-off-by: xiaolong.ran <[email protected]>

* rename interceptor to protocol

Signed-off-by: xiaolong.ran <[email protected]>

* add test case

Signed-off-by: xiaolong.ran <[email protected]>

* fix license header

Signed-off-by: xiaolong.ran <[email protected]>

* fix ci error

Signed-off-by: xiaolong.ran <[email protected]>

* 1. change some class name;
2. add mock additional servlet test

* fix

* fix

* fix

* 1. add authentication support for proxy additional servlet plugin;
2. fix java doc

* fix test

Co-authored-by: gaoran10 <[email protected]>
  • Loading branch information
wolfstudy and gaoran10 authored Oct 23, 2020
1 parent ef614ad commit ca97be3
Show file tree
Hide file tree
Showing 17 changed files with 1,003 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public BrokerInterceptorDefinitions searchForInterceptors(String interceptorsDir
*
* @param metadata the broker interceptors definition.
*/
BrokerInterceptorWithClassLoader load(BrokerInterceptorMetadata metadata, String narExtractionDirectory) throws IOException {
BrokerInterceptorWithClassLoader load(BrokerInterceptorMetadata metadata, String narExtractionDirectory) throws IOException {
NarClassLoader ncl = NarClassLoader.getFromArchive(
metadata.getArchivePath().toAbsolutePath().toFile(),
Collections.emptySet(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pulsar.common.configuration.PropertiesContext;
import org.apache.pulsar.common.configuration.PropertyContext;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.sasl.SaslConstants;

@Getter
Expand Down Expand Up @@ -67,6 +68,8 @@ public class ProxyConfiguration implements PulsarConfiguration {
private static final String CATEGORY_HTTP = "HTTP";
@Category
private static final String CATEGORY_SASL_AUTH = "SASL Authentication Provider";
@Category
private static final String CATEGORY_PLUGIN = "proxy plugin";

@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
Expand Down Expand Up @@ -165,6 +168,12 @@ public class ProxyConfiguration implements PulsarConfiguration {
)
private Optional<Integer> webServicePortTls = Optional.empty();

@FieldContext(
category = CATEGORY_SERVER,
doc = "The directory where nar Extraction happens"
)
private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Proxy log level, default is 0."
Expand Down Expand Up @@ -471,6 +480,18 @@ public class ProxyConfiguration implements PulsarConfiguration {
)
private int httpNumThreads = Math.max(8, 2 * Runtime.getRuntime().availableProcessors());

@FieldContext(
category = CATEGORY_PLUGIN,
doc = "The directory to locate proxy additional servlet"
)
private String proxyAdditionalServletDirectory = "./proxyAdditionalServlet";

@FieldContext(
category = CATEGORY_PLUGIN,
doc = "List of proxy additional servlet to load, which is a list of proxy additional servlet names"
)
private Set<String> proxyAdditionalServlets = Sets.newTreeSet();

@FieldContext(
category = CATEGORY_HTTP,
doc = "Enable the enforcement of limits on the incoming HTTP requests"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand All @@ -46,14 +44,14 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.proxy.server.plugin.servlet.ProxyAdditionalServlets;
import org.apache.pulsar.proxy.stats.TopicStats;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
Expand Down Expand Up @@ -119,6 +117,8 @@ public class ProxyService implements Closeable {
private final Set<ProxyConnection> clientCnxs;
@Getter
private final Map<String, TopicStats> topicStats;
@Getter
private ProxyAdditionalServlets proxyAdditionalServlets;

public ProxyService(ProxyConfiguration proxyConfig,
AuthenticationService authenticationService) throws IOException {
Expand Down Expand Up @@ -152,6 +152,7 @@ public ProxyService(ProxyConfiguration proxyConfig,
stats.calculate();
});
}, 60, TimeUnit.SECONDS);
this.proxyAdditionalServlets = ProxyAdditionalServlets.load(proxyConfig);
}

public void start() throws Exception {
Expand Down Expand Up @@ -234,6 +235,12 @@ public void close() throws IOException {
if (statsExecutor != null) {
statsExecutor.shutdown();
}

if (proxyAdditionalServlets != null) {
proxyAdditionalServlets.close();
proxyAdditionalServlets = null;
}

acceptorGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.proxy.server.plugin.servlet.ProxyAdditionalServletWithClassLoader;
import org.eclipse.jetty.proxy.ProxyServlet;
import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
Expand All @@ -46,6 +47,7 @@

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;

Expand Down Expand Up @@ -193,6 +195,18 @@ public static void addWebServerHandlers(WebServer server,
proxyHolder.setInitParameter("prefix", "/");
server.addServlet(revProxy.getPath(), proxyHolder);
}

// add proxy additional servlets
if (service != null && service.getProxyAdditionalServlets() != null) {
Collection<ProxyAdditionalServletWithClassLoader> additionalServletCollection =
service.getProxyAdditionalServlets().getServlets().values();
for (ProxyAdditionalServletWithClassLoader servletWithClassLoader : additionalServletCollection) {
servletWithClassLoader.loadConfig(config);
server.addServlet(servletWithClassLoader.getBasePath(), servletWithClassLoader.getServletHolder(),
Collections.emptyList(), config.isAuthenticationEnabled());
log.info("proxy add additional servlet basePath {} ", servletWithClassLoader.getBasePath());
}
}
}

private static final Logger log = LoggerFactory.getLogger(ProxyServiceStarter.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.proxy.server;

import com.google.common.collect.Lists;

import io.prometheus.client.jetty.JettyStatisticsCollector;
import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -78,7 +79,7 @@ public class WebServer {
private ServerConnector connector;
private ServerConnector connectorTls;

public WebServer(ProxyConfiguration config, AuthenticationService authenticationService) {
public WebServer(ProxyConfiguration config, AuthenticationService authenticationService) throws IOException {
this.webServiceExecutor = new WebExecutorThreadPool(config.getHttpNumThreads(), "pulsar-external-web");
this.server = new Server(webServiceExecutor);
this.authenticationService = authenticationService;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server.plugin.servlet;

import com.google.common.annotations.Beta;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.eclipse.jetty.servlet.ServletHolder;

/**
* The additional servlet interface for support additional servlet on Pulsar proxy.
*/
@Beta
public interface ProxyAdditionalServlet extends AutoCloseable {

/**
* load plugin config
*
* @param proxyConfiguration
*/
void loadConfig(ProxyConfiguration proxyConfiguration);

/**
* Get the base path of prometheus metrics
*
* @return the base path of prometheus metrics
*/
String getBasePath();

/**
* Get the servlet holder
*
* @return the servlet holder
*/
ServletHolder getServletHolder();

@Override
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server.plugin.servlet;

import lombok.Data;
import lombok.NoArgsConstructor;

/**
* Metadata information about a proxy additional servlet.
*/
@Data
@NoArgsConstructor
public class ProxyAdditionalServletDefinition {
/**
* The name of the proxy additional servlet.
*/
private String name;

/**
* The description of the proxy additional servlet to be used for user help.
*/
private String description;

/**
* The class name for the additional servlet.
*/
private String additionalServletClass;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server.plugin.servlet;

import java.util.Map;
import java.util.TreeMap;
import lombok.Data;
import lombok.experimental.Accessors;

/**
* The collection of proxy additional servlet definition.
*/
@Data
@Accessors(fluent = true)
public class ProxyAdditionalServletDefinitions {

private final Map<String, ProxyAdditionalServletMetadata> servlets = new TreeMap<>();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server.plugin.servlet;

import java.nio.file.Path;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* The metadata of proxy additional servlet
*/
@Data
@NoArgsConstructor
public class ProxyAdditionalServletMetadata {

/**
* The definition of the proxy additional servlet.
*/
private ProxyAdditionalServletDefinition definition;

/**
* The path to the additional servlet package.
*/
private Path archivePath;
}
Loading

0 comments on commit ca97be3

Please sign in to comment.