Skip to content

Commit

Permalink
Avoid introducing bookkeeper-common into the pulsar-common (apache#9551)
Browse files Browse the repository at this point in the history
* Avoid introduce bookkeeper-common into the pulsar-common
---

*Motivation*

Direct using jackson to parse json to avoid introduce the bookkeeper-common
into the pulsar-common. That will make other modules which are using pulsar-common
has some unused bookkeeper dependencies.

* Fix the build and add some tests

* Address comments
  • Loading branch information
zymap authored Feb 12, 2021
1 parent 547ab3b commit 18e61b3
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.common.util.JsonUtil.ParseJsonException;
Expand Down Expand Up @@ -116,7 +117,8 @@ public static Map<String, byte[]> buildMetadataForSchema(String schemaId) {
* placement policy configuration encode error
*/
static Map<String, byte[]> buildMetadataForPlacementPolicyConfig(
Class<? extends EnsemblePlacementPolicy> className, Map<String, Object> properties) throws ParseJsonException {
Class<? extends EnsemblePlacementPolicy> className, Map<String, Object> properties)
throws EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException {
EnsemblePlacementPolicyConfig config = new EnsemblePlacementPolicyConfig(className, properties);
return ImmutableMap.of(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG, config.encode());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import static java.lang.Math.min;
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -124,6 +126,7 @@
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.metadata.api.Stat;
Expand Down Expand Up @@ -3495,7 +3498,7 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf
config.getBookKeeperEnsemblePlacementPolicyClassName(),
config.getBookKeeperEnsemblePlacementPolicyProperties()
));
} catch (JsonUtil.ParseJsonException e) {
} catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
log.error("[{}] Serialize the placement configuration failed", name, e);
cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
return;
Expand Down
5 changes: 0 additions & 5 deletions pulsar-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,6 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-common</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
*/
package org.apache.pulsar.common.policies.data;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Objects;
import org.apache.bookkeeper.common.util.JsonUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
Expand Down Expand Up @@ -66,11 +67,29 @@ public boolean equals(Object obj) {
return false;
}

public byte[] encode() throws JsonUtil.ParseJsonException {
return JsonUtil.toJson(this).getBytes(StandardCharsets.UTF_8);
public byte[] encode() throws ParseEnsemblePlacementPolicyConfigException {
try {
return ObjectMapperFactory.getThreadLocal()
.writerWithDefaultPrettyPrinter()
.writeValueAsString(this)
.getBytes(StandardCharsets.UTF_8);
} catch (JsonProcessingException e) {
throw new ParseEnsemblePlacementPolicyConfigException("Failed to encode to json", e);
}
}

public static EnsemblePlacementPolicyConfig decode(byte[] data) throws JsonUtil.ParseJsonException {
return JsonUtil.fromJson(new String(data, StandardCharsets.UTF_8), EnsemblePlacementPolicyConfig.class);
public static EnsemblePlacementPolicyConfig decode(byte[] data) throws ParseEnsemblePlacementPolicyConfigException {
try {
return ObjectMapperFactory.getThreadLocal()
.readValue(new String(data, StandardCharsets.UTF_8), EnsemblePlacementPolicyConfig.class);
} catch (JsonProcessingException e) {
throw new ParseEnsemblePlacementPolicyConfigException("Failed to decode from json", e);
}
}

public static class ParseEnsemblePlacementPolicyConfigException extends Exception {
ParseEnsemblePlacementPolicyConfigException(String message, Throwable throwable) {
super(message, throwable);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.policies.data;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.Collections;

public class EnsemblePlacementPolicyConfigTest {

static class MockedEnsemblePlacementPolicy {}

@Test
public void testEncodeDecodeSuccessfully()
throws EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException {

EnsemblePlacementPolicyConfig originalConfig =
new EnsemblePlacementPolicyConfig(MockedEnsemblePlacementPolicy.class, Collections.EMPTY_MAP);
byte[] encodedConfig = originalConfig.encode();

EnsemblePlacementPolicyConfig decodedConfig =
EnsemblePlacementPolicyConfig.decode(encodedConfig);
Assert.assertEquals(decodedConfig, originalConfig);
}

@Test
public void testDecodeFailed() {
byte[] configBytes = new byte[0];
try {
EnsemblePlacementPolicyConfig.decode(configBytes);
Assert.fail("should failed parse the config from bytes");
} catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
// expected error
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl;
Expand Down Expand Up @@ -193,7 +194,7 @@ private static Optional<EnsemblePlacementPolicyConfig> getEnsemblePlacementPolic
if (ensemblePlacementPolicyConfigData != null) {
try {
return Optional.ofNullable(EnsemblePlacementPolicyConfig.decode(ensemblePlacementPolicyConfigData));
} catch (JsonUtil.ParseJsonException e) {
} catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
LOG.error("Failed to parse the ensemble placement policy config from the custom metadata", e);
return Optional.empty();
}
Expand Down

0 comments on commit 18e61b3

Please sign in to comment.