Skip to content

Commit

Permalink
Add package management service into the pulsar startup process (apach…
Browse files Browse the repository at this point in the history
…e#8764)

---
    
Master Issue: apache#8676
    
*Motivation*
    
Make the pulsar service has the ability to enable the packages management
service.
    
*Modifications*
    
- Add the packages management service in the start up process
- Add the related configuration in the configuration file

Will add integration tests after the REST API service and Client added.
  • Loading branch information
zymap authored Dec 7, 2020
1 parent 81d9c2d commit 349930b
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 1 deletion.
17 changes: 17 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1176,3 +1176,20 @@ brokerServicePurgeInactiveFrequencyInSeconds=60
# Enable transaction coordinator in broker
transactionCoordinatorEnabled=false
transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider

### --- Packages management service configuration variables (begin) --- ###

# Enable the packages management service or not
enablePackagesManagement=false

# The packages management service storage service provide
packagesManagementStorageProvider=org.apache.pulsar.packages.management.storage.bookkeeper.BookKeeperPackagesStorageProvider

# When the packages storage provider is bookkeeper, you can use this configuration to
# control the number of replicas for storing the package
packagesReplicas=1

# The bookkeeper ledger root path
packagesManagementLedgerRootPath=/ledger

### --- Packages management service configuration variables (end) --- ###
17 changes: 17 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -896,3 +896,20 @@ defaultNumPartitions=1

### --- Transaction config variables --- ###
transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider

### --- Packages management service configuration variables (begin) --- ###

# Enable the packages management service or not
enablePackagesManagement=false

# The packages management service storage service provide
packagesManagementStorageProvider=org.apache.pulsar.packages.management.storage.bookkeeper.BookKeeperPackagesStorageProvider

# When the packages storage provider is bookkeeper, you can use this configuration to
# control the number of replicas for storing the package
packagesReplicas=1

# The bookkeeper ledger root path
packagesManagementLedgerRootPath=/ledger

### --- Packages management service configuration variables (end) --- ###
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
private static final String CATEGORY_HTTP = "HTTP";
@Category
private static final String CATEGORY_TRANSACTION = "Transaction";
@Category
private static final String CATEGORY_PACKAGES_MANAGEMENT = "Packages Management";

/***** --- pulsar configuration --- ****/
@FieldContext(
Expand Down Expand Up @@ -1965,6 +1967,35 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private Set<String> brokerClientTlsProtocols = Sets.newTreeSet();

/* packages management service configurations (begin) */

@FieldContext(
category = CATEGORY_PACKAGES_MANAGEMENT,
doc = "Enable the packages management service or not"
)
private boolean enablePackagesManagement = false;

@FieldContext(
category = CATEGORY_PACKAGES_MANAGEMENT,
doc = "The packages management service storage service provider"
)
private String packagesManagementStorageProvider = "org.apache.pulsar.packages.management.storage.bookkeeper.BookKeeperPackagesStorageProvider";

@FieldContext(
category = CATEGORY_PACKAGES_MANAGEMENT,
doc = "When the packages storage provider is bookkeeper, you can use this configuration to\n"
+ "control the number of replicas for storing the package"
)
private int packagesReplicas = 1;

@FieldContext(
category = CATEGORY_PACKAGES_MANAGEMENT,
doc = "The bookkeeper ledger root path"
)
private String packagesManagementLedgerRootPath = "/ledgers";

/* packages management service configurations (end) */

/**
* @deprecated See {@link #getConfigurationStoreServers}
*/
Expand Down
7 changes: 7 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,13 @@

<!-- transaction related dependencies (end) -->

<!-- package manager related dependencies (begin) -->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-package-core</artifactId>
<version>${project.version}</version>
</dependency>
<!-- package manager related dependencies (begin) -->
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.packages.management.core.PackagesManagement;
import org.apache.pulsar.packages.management.core.PackagesStorage;
import org.apache.pulsar.packages.management.core.PackagesStorageProvider;
import org.apache.pulsar.packages.management.core.impl.DefaultPackagesStorageConfiguration;
import org.apache.pulsar.packages.management.core.impl.PackagesManagementImpl;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
Expand Down Expand Up @@ -208,6 +213,10 @@ public class PulsarService implements AutoCloseable {

private BrokerInterceptor brokerInterceptor;

// packages management service
private PackagesManagement packagesManagement;


public enum State {
Init, Started, Closed
}
Expand Down Expand Up @@ -607,6 +616,11 @@ public Boolean get() {
// start function worker service if necessary
this.startWorkerService(brokerService.getAuthenticationService(), brokerService.getAuthorizationService());

// start packages management service if necessary
if (config.isEnablePackagesManagement()) {
this.startPackagesManagementService();
}

final String bootstrapMessage = "bootstrap service "
+ (config.getWebServicePort().isPresent() ? "port = " + config.getWebServicePort().get() : "")
+ (config.getWebServicePortTls().isPresent() ? ", tls-port = " + config.getWebServicePortTls() : "")
Expand Down Expand Up @@ -1329,6 +1343,18 @@ private void startWorkerService(AuthenticationService authenticationService,
}
}

private void startPackagesManagementService() throws IOException {
// TODO: using provider to initialize the packages management service.
this.packagesManagement = new PackagesManagementImpl();
PackagesStorageProvider storageProvider = PackagesStorageProvider
.newProvider(config.getPackagesManagementStorageProvider());
DefaultPackagesStorageConfiguration storageConfiguration = new DefaultPackagesStorageConfiguration();
storageConfiguration.setProperty(config.getProperties());
PackagesStorage storage = storageProvider.getStorage(new DefaultPackagesStorageConfiguration());
storage.initialize();
packagesManagement.initialize(storage);
}

public Optional<Integer> getListenPortHTTP() {
return webService.getListenPortHTTP();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.packages.management.storage.bookkeeper;

import java.util.Properties;
import org.apache.pulsar.packages.management.core.PackagesStorageConfiguration;
import org.apache.pulsar.packages.management.core.impl.DefaultPackagesStorageConfiguration;

Expand Down Expand Up @@ -67,4 +68,9 @@ public String getProperty(String key) {
public void setProperty(String key, String value) {
configuration.setProperty(key, value);
}

@Override
public void setProperty(Properties properties) {
this.configuration.setProperty(properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.packages.management.core;

import java.util.Properties;

/**
* Packages storage configuration is used to set and get the storage related configuration values.
*/
Expand All @@ -40,4 +42,12 @@ public interface PackagesStorageConfiguration {
* property value
*/
void setProperty(String key, String value);

/**
* Set a group of the property.
*
* @param properties
* a group of the property
*/
void setProperty(Properties properties);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import org.apache.pulsar.packages.management.core.PackagesStorageConfiguration;

public class DefaultPackagesStorageConfiguration implements PackagesStorageConfiguration {
private final Properties properties = new Properties();

private Properties properties = new Properties();

@Override
public String getProperty(String key) {
Expand All @@ -33,4 +34,9 @@ public String getProperty(String key) {
public void setProperty(String key, String value) {
properties.setProperty(key, value);
}

@Override
public void setProperty(Properties properties) {
this.properties = properties;
}
}

0 comments on commit 349930b

Please sign in to comment.