Skip to content

Commit

Permalink
Remove pulsar-client-admin-api dependency : pulsar-common (apache#10774)
Browse files Browse the repository at this point in the history
* remove gson from admin api Topics.java

* Bookies.java

* Brokers.java

* BrokerStats.java

* fix PulsarAdminException

* Clusters.java

* Tenants.java

* minor fix on ResourceGroups

* Namespaces.java & OffloadPolicies

* temp remove JsonProperty in OffloadPolicies

* Topics & NonPersistentTopics

* ResourceQuotas with MixIn class

* Functions

* Sources.java

* Sinks.java

* Schemas.java

* resolve checkstyle

* minor fix

* Transactions.java

* fix stylecheck

* set ObjectMapper module

* fix ci

* fix PulsarAdminException

* remove gson

* remove swagger

* merge master

* remove swagger

* remove io.swagger

* remove commons-lang3

* use Objects.requireNonNull

* remove commons-lang3

* fix interface mapping

* fix ci

* extract OffloadPoliciesInterface

* extract FunctionStats interfaces

* fix unit test

* fix shaded class

* add object mapper mixin tests

* fix ci

* fix admin exception ci error

* UpdateOptions

* AutoFailoverPolicyData

* BrokerNamespaceIsolationData

* ClusterData

* FailureDomain

* FunctionInstanceStats

* NamespaceIsolationData

* OffloadPolicies

* TenantInfo

* FunctionStats

* fix stylecheck

* Fixed interface

* Fixed ClusterData equals method

* Fixed merge issue

* Fixed another merge issue

* Fixed test issue after merge

Co-authored-by: Rui Fu <[email protected]>
  • Loading branch information
merlimat and freeznet authored Jun 2, 2021
1 parent 91e102b commit 301a764
Show file tree
Hide file tree
Showing 402 changed files with 3,652 additions and 2,459 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;

Expand Down Expand Up @@ -104,10 +104,10 @@ protected void internalSetUpForNamespace() throws Exception {
admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrlTls.toString())
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).allowTlsInsecureConnection(false)
.authentication(AuthenticationTls.class.getName(), authParams).build());
admin.clusters().createCluster(clusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
admin.clusters().createCluster(clusterName, new ClusterDataImpl(brokerUrl.toString(), brokerUrlTls.toString(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls()));
admin.tenants().createTenant("my-property",
new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
admin.namespaces().createNamespace("my-property/my-ns");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;

/**
* Interface for offloading ledgers to long-term storage.
Expand Down Expand Up @@ -211,7 +211,7 @@ default CompletableFuture<Void> deleteOffloaded(UUID uid,
*
* @return offload policies
*/
OffloadPolicies getOffloadPolicies();
OffloadPoliciesImpl getOffloadPolicies();

/**
* Close the resources if necessary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;

/**
Expand All @@ -50,7 +50,7 @@ public interface LedgerOffloaderFactory<T extends LedgerOffloader> {
* @return the offloader instance
* @throws IOException when fail to create an offloader
*/
T create(OffloadPolicies offloadPolicies,
T create(OffloadPoliciesImpl offloadPolicies,
Map<String, String> userMetadata,
OrderedScheduler scheduler)
throws IOException;
Expand All @@ -65,10 +65,10 @@ T create(OffloadPolicies offloadPolicies,
* @return the offloader instance
* @throws IOException when fail to create an offloader
*/
default T create(OffloadPolicies offloadPolicies,
Map<String, String> userMetadata,
SchemaStorage schemaStorage,
OrderedScheduler scheduler)
default T create(OffloadPoliciesImpl offloadPolicies,
Map<String, String> userMetadata,
SchemaStorage schemaStorage,
OrderedScheduler scheduler)
throws IOException {
return create(offloadPolicies, userMetadata, scheduler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;

/**
* Null implementation that throws an error on any invokation.
Expand Down Expand Up @@ -63,7 +63,7 @@ public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
}

@Override
public OffloadPolicies getOffloadPolicies() {
public OffloadPoliciesImpl getOffloadPolicies() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.bookkeeper.mledger.util.MockClock;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;

import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -210,7 +210,7 @@ public void testLaggedDeleteSlowConsumer() throws Exception {

@Test
public void isOffloadedNeedsDeleteTest() throws Exception {
OffloadPolicies offloadPolicies = new OffloadPolicies();
OffloadPoliciesImpl offloadPolicies = new OffloadPoliciesImpl();
LedgerOffloader ledgerOffloader = Mockito.mock(LedgerOffloader.class);
Mockito.when(ledgerOffloader.getOffloadPolicies()).thenReturn(offloadPolicies);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@
import org.apache.bookkeeper.mledger.util.MockClock;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -202,14 +202,14 @@ static class MockLedgerOffloader implements LedgerOffloader {
ConcurrentHashMap<UUID, ReadHandle> offloads = new ConcurrentHashMap<UUID, ReadHandle>();


OffloadPolicies offloadPolicies = OffloadPolicies.create("S3", "", "", "",
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create("S3", "", "", "",
null, null,
null, null,
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS,
OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY);
OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS,
OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY);


@Override
Expand Down Expand Up @@ -246,7 +246,7 @@ public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid,
};

@Override
public OffloadPolicies getOffloadPolicies() {
public OffloadPoliciesImpl getOffloadPolicies() {
return offloadPolicies;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -991,14 +991,14 @@ Set<Long> deletedOffloads() {
return deletes.keySet();
}

OffloadPolicies offloadPolicies = OffloadPolicies.create("S3", "", "", "",
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create("S3", "", "", "",
null, null,
null, null,
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS,
OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY);
OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS,
OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY);

@Override
public String getOffloadDriverName() {
Expand Down Expand Up @@ -1044,7 +1044,7 @@ public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid,
};

@Override
public OffloadPolicies getOffloadPolicies() {
public OffloadPoliciesImpl getOffloadPolicies() {
return offloadPolicies;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
Expand All @@ -34,18 +33,16 @@
import lombok.Setter;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.validator.MultipleListenerValidator;
import org.apache.pulsar.common.configuration.Category;
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.sasl.SaslConstants;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;

/**
* Pulsar service configuration object.
Expand Down Expand Up @@ -1581,7 +1578,7 @@ public class ServiceConfiguration implements PulsarConfiguration {

@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "Read priority when ledgers exists in both bookkeeper and the second layer storage.")
private String managedLedgerDataReadPriority = OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST
private String managedLedgerDataReadPriority = OffloadedReadPriority.TIERED_STORAGE_FIRST
.getValue();

/*** --- Load balancer --- ****/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RestException;

Expand Down Expand Up @@ -79,7 +78,7 @@ default CompletableFuture<Boolean> isSuperUser(String role, ServiceConfiguration
* @return a CompletableFuture containing a boolean in which true means the role is an admin user
* and false if it is not
*/
default CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfo tenantInfo,
default CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfoImpl tenantInfo,
AuthenticationDataSource authenticationData) {
return CompletableFuture.completedFuture(role != null && tenantInfo.getAdminRoles() != null && tenantInfo.getAdminRoles().contains(role) ? true : false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -80,7 +80,7 @@ public CompletableFuture<Boolean> isSuperUser(String user, AuthenticationDataSou
return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured"));
}

public CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfo tenantInfo,
public CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfoImpl tenantInfo,
AuthenticationDataSource authenticationData) {
if (provider != null) {
return provider.isTenantAdmin(tenant, role, tenantInfo, authenticationData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -634,7 +634,7 @@ public CompletableFuture<Boolean> validateTenantAdminAccess(String tenantName,
return CompletableFuture.completedFuture(true);
} else {
try {
TenantInfo tenantInfo = pulsarResources.getTenantResources()
TenantInfoImpl tenantInfo = pulsarResources.getTenantResources()
.get(path(POLICIES, tenantName))
.orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Tenant does not exist"));
return isTenantAdmin(tenantName, role, tenantInfo, authData);
Expand Down
Loading

0 comments on commit 301a764

Please sign in to comment.