Skip to content

Commit

Permalink
PIP-45: Allow more info in MetadataSerde (apache#12243)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Sep 29, 2021
1 parent bef306c commit 0a2c895
Show file tree
Hide file tree
Showing 12 changed files with 115 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -89,12 +90,13 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
this.config = pulsar.getConfiguration();
this.locatorEntryCache = store.getMetadataCache(new MetadataSerde<SchemaStorageFormat.SchemaLocator>() {
@Override
public byte[] serialize(SchemaStorageFormat.SchemaLocator value) {
public byte[] serialize(String path, SchemaStorageFormat.SchemaLocator value) {
return value.toByteArray();
}

@Override
public SchemaStorageFormat.SchemaLocator deserialize(byte[] content) throws IOException {
public SchemaStorageFormat.SchemaLocator deserialize(String path, byte[] content, Stat stat)
throws IOException {
return SchemaStorageFormat.SchemaLocator.parseFrom(content);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,30 @@

import java.io.IOException;

/**
* Interface that define a serializer/deserializer implementation.
*
* @param <T>
*/
public interface MetadataSerde<T> {

byte[] serialize(T value) throws IOException;
/**
* Serialize the object into a byte array.
*
* @param path the path of the object on the metadata store
* @param value the object instance
* @return a byte array of the serialized version
* @throws IOException if the serialization fails
*/
byte[] serialize(String path, T value) throws IOException;

T deserialize(byte[] content) throws IOException;
/**
* Serialize the object from a byte array.
* @param path the path of the object on the metadata store
* @param content the content as stored on metadata store
* @param stat the {@link Stat} metadata for the object
* @return the deserialized object
* @throws IOException if the deserialization fails
*/
T deserialize(String path, byte[] content, Stat stat) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.pulsar.metadata.api.MetadataSerde;

/**
* Interface for the coordination service. Provides abstraction for distributed locks and leader election.
Expand All @@ -41,6 +42,7 @@ <T> LeaderElection<T> getLeaderElection(Class<T> clazz, String path,
Consumer<LeaderElectionState> stateChangesListener);

<T> LockManager<T> getLockManager(Class<T> clazz);
<T> LockManager<T> getLockManager(MetadataSerde<T> serde);

/**
* Increment a counter identified by the specified path and return the current value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.Stat;

public class JSONMetadataSerdeSimpleType<T> implements MetadataSerde<T> {

Expand All @@ -32,12 +33,12 @@ public JSONMetadataSerdeSimpleType(JavaType typeRef) {
}

@Override
public byte[] serialize(T value) throws IOException {
public byte[] serialize(String path, T value) throws IOException {
return ObjectMapperFactory.getThreadLocal().writeValueAsBytes(value);
}

@Override
public T deserialize(byte[] content) throws IOException {
public T deserialize(String path, byte[] content, Stat stat) throws IOException {
return ObjectMapperFactory.getThreadLocal().readValue(content, typeRef);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.Stat;

public class JSONMetadataSerdeTypeRef<T> implements MetadataSerde<T> {

Expand All @@ -34,12 +35,12 @@ public JSONMetadataSerdeTypeRef(TypeReference<T> typeRef) {
}

@Override
public byte[] serialize(T value) throws IOException {
public byte[] serialize(String paht, T value) throws IOException {
return ObjectMapperFactory.getThreadLocal().writeValueAsBytes(value);
}

@Override
public T deserialize(byte[] content) throws IOException {
public T deserialize(String path, byte[] content, Stat stat) throws IOException {
return ObjectMapperFactory.getThreadLocal().readValue(content, typeRef);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.metadata.api.CacheGetResult;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStore;
Expand Down Expand Up @@ -102,9 +103,10 @@ private CompletableFuture<Optional<CacheGetResult<T>>> readValueFromStore(String
}

try {
T obj = serde.deserialize(optRes.get().getValue());
GetResult res = optRes.get();
T obj = serde.deserialize(path, res.getValue(), res.getStat());
return FutureUtils
.value(Optional.of(new CacheGetResult<>(obj, optRes.get().getStat())));
.value(Optional.of(new CacheGetResult<>(obj, res.getStat())));
} catch (Throwable t) {
return FutureUtils.exception(new ContentDeserializationException(
"Failed to deserialize payload for key '" + path + "'", t));
Expand Down Expand Up @@ -141,15 +143,16 @@ public CompletableFuture<T> readModifyUpdateOrCreate(String path, Function<Optio
long expectedVersion;

if (optEntry.isPresent()) {
CacheGetResult<T> entry = optEntry.get();
T clone;
try {
// Use clone and CAS zk to ensure thread safety
clone = serde.deserialize(serde.serialize(optEntry.get().getValue()));
clone = serde.deserialize(path, serde.serialize(path, entry.getValue()), entry.getStat());
} catch (IOException e) {
return FutureUtils.exception(e);
}
currentValue = Optional.of(clone);
expectedVersion = optEntry.get().getStat().getVersion();
expectedVersion = entry.getStat().getVersion();
} else {
currentValue = Optional.empty();
expectedVersion = -1;
Expand All @@ -159,7 +162,7 @@ public CompletableFuture<T> readModifyUpdateOrCreate(String path, Function<Optio
byte[] newValue;
try {
newValueObj = modifyFunction.apply(currentValue);
newValue = serde.serialize(newValueObj);
newValue = serde.serialize(path, newValueObj);
} catch (Throwable t) {
return FutureUtils.exception(t);
}
Expand All @@ -181,15 +184,15 @@ public CompletableFuture<T> readModifyUpdate(String path, Function<T, T> modifyF

CacheGetResult<T> entry = optEntry.get();
T currentValue = entry.getValue();
long expectedVersion = optEntry.get().getStat().getVersion();
long expectedVersion = entry.getStat().getVersion();

T newValueObj;
byte[] newValue;
try {
// Use clone and CAS zk to ensure thread safety
currentValue = serde.deserialize(serde.serialize(currentValue));
currentValue = serde.deserialize(path, serde.serialize(path, currentValue), entry.getStat());
newValueObj = modifyFunction.apply(currentValue);
newValue = serde.serialize(newValueObj);
newValue = serde.serialize(path, newValueObj);
} catch (Throwable t) {
return FutureUtils.exception(t);
}
Expand All @@ -205,7 +208,7 @@ public CompletableFuture<T> readModifyUpdate(String path, Function<T, T> modifyF
public CompletableFuture<Void> create(String path, T value) {
byte[] content;
try {
content = serde.serialize(value);
content = serde.serialize(path, value);
} catch (Throwable t) {
return FutureUtils.exception(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.apache.pulsar.metadata.api.coordination.LeaderElection;
Expand All @@ -44,7 +45,7 @@ public class CoordinationServiceImpl implements CoordinationService {

private final MetadataStoreExtended store;

private final Map<Class<?>, LockManager<?>> lockManagers = new ConcurrentHashMap<>();
private final Map<Object, LockManager<?>> lockManagers = new ConcurrentHashMap<>();
private final Map<String, LeaderElection<?>> leaderElections = new ConcurrentHashMap<>();

private final ScheduledExecutorService executor;
Expand Down Expand Up @@ -80,6 +81,12 @@ public <T> LockManager<T> getLockManager(Class<T> clazz) {
k -> new LockManagerImpl<T>(store, clazz, executor));
}

@Override
public <T> LockManager<T> getLockManager(MetadataSerde<T> serde) {
return (LockManager<T>) lockManagers.computeIfAbsent(serde,
k -> new LockManagerImpl<T>(store, serde, executor));
}

@Override
public CompletableFuture<Long> getNextCounterValue(String path) {
return store.exists(path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private synchronized CompletableFuture<LeaderElectionState> elect() {
private synchronized CompletableFuture<LeaderElectionState> handleExistingLeaderValue(GetResult res) {
T existingValue;
try {
existingValue = serde.deserialize(res.getValue());
existingValue = serde.deserialize(path, res.getValue(), res.getStat());
} catch (Throwable t) {
return FutureUtils.exception(t);
}
Expand Down Expand Up @@ -160,7 +160,7 @@ private synchronized void changeState(LeaderElectionState les) {
private synchronized CompletableFuture<LeaderElectionState> tryToBecomeLeader() {
byte[] payload;
try {
payload = serde.serialize(proposedValue.get());
payload = serde.serialize(path, proposedValue.get());
} catch (Throwable t) {
return FutureUtils.exception(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.fasterxml.jackson.databind.type.TypeFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -30,7 +29,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
Expand Down Expand Up @@ -65,9 +63,15 @@ private enum State {
private State state = State.Ready;

LockManagerImpl(MetadataStoreExtended store, Class<T> clazz, ExecutorService executor) {
this(store, new JSONMetadataSerdeSimpleType<>(
TypeFactory.defaultInstance().constructSimpleType(clazz, null)),
executor);
}

LockManagerImpl(MetadataStoreExtended store, MetadataSerde<T> serde, ExecutorService executor) {
this.store = store;
this.cache = store.getMetadataCache(clazz);
this.serde = new JSONMetadataSerdeSimpleType<>(TypeFactory.defaultInstance().constructSimpleType(clazz, null));
this.cache = store.getMetadataCache(serde);
this.serde = serde;
this.executor = executor;
store.registerSessionListener(this::handleSessionEvent);
store.registerListener(this::handleDataNotification);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public synchronized T getValue() {
public synchronized CompletableFuture<Void> updateValue(T newValue) {
byte[] payload;
try {
payload = serde.serialize(newValue);
payload = serde.serialize(path, newValue);
} catch (Throwable t) {
return FutureUtils.exception(t);
}
Expand Down Expand Up @@ -160,7 +160,7 @@ synchronized CompletableFuture<Void> acquire() {
private CompletableFuture<Void> acquireWithNoRevalidation() {
byte[] payload;
try {
payload = serde.serialize(value);
payload = serde.serialize(path, value);
} catch (Throwable t) {
return FutureUtils.exception(t);
}
Expand Down Expand Up @@ -242,7 +242,7 @@ synchronized CompletableFuture<Void> revalidate() {

T existingValue;
try {
existingValue = serde.deserialize(optGetResult.get().getValue());
existingValue = serde.deserialize(path, res.getValue(), res.getStat());
} catch (Throwable t) {
return FutureUtils.exception(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void testBackwardCompatibility() throws IOException {

JSONMetadataSerdeSimpleType jsonMetadataSerdeSimpleType = new JSONMetadataSerdeSimpleType(
TypeFactory.defaultInstance().constructSimpleType(Policies.class, null));
Policies policies = (Policies) jsonMetadataSerdeSimpleType.deserialize(oldPolicyStr.getBytes());
Policies policies = (Policies) jsonMetadataSerdeSimpleType.deserialize(null, oldPolicyStr.getBytes(), null);
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize(),
1001);
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitTime(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -495,12 +496,12 @@ public void cacheWithCustomSerde(String provider, Supplier<String> urlSupplier)
// Simple serde that convert numbers to ascii
MetadataCache<Integer> objCache = store.getMetadataCache(new MetadataSerde<Integer>() {
@Override
public byte[] serialize(Integer value) throws IOException {
public byte[] serialize(String path, Integer value) throws IOException {
return value.toString().getBytes(StandardCharsets.UTF_8);
}

@Override
public Integer deserialize(byte[] content) throws IOException {
public Integer deserialize(String path, byte[] content, Stat stat) throws IOException {
return Integer.parseInt(new String(content, StandardCharsets.UTF_8));
}
});
Expand All @@ -511,4 +512,47 @@ public Integer deserialize(byte[] content) throws IOException {

assertEquals(objCache.get(key1).join().get(), (Integer) 1);
}

@Data
@NoArgsConstructor
static class CustomClass {
@JsonIgnore
private String path;

public int a;
public int b;
}

@Test(dataProvider = "impl")
public void customSerde(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
MetadataCache<CustomClass> objCache = store.getMetadataCache(new MetadataSerde<CustomClass>() {
@Override
public byte[] serialize(String path, CustomClass value) throws IOException {
return ObjectMapperFactory.getThreadLocal().writeValueAsBytes(value);
}

@Override
public CustomClass deserialize(String path, byte[] content, Stat stat) throws IOException {
CustomClass cc = ObjectMapperFactory.getThreadLocal().readValue(content, CustomClass.class);
cc.path = path;
return cc;
}
});

String key1 = newKey();

CustomClass value1 = new CustomClass();
value1.a = 1;
value1.b = 2;
Stat stat = store.put(key1, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(value1), Optional.of(-1L))
.join();

CacheGetResult<CustomClass> res = objCache.getWithStats(key1).join().get();
assertEquals(res.getStat().getVersion(), stat.getVersion());
assertEquals(res.getValue().a, 1);
assertEquals(res.getValue().b, 2);
assertEquals(res.getValue().path, key1);
}
}

0 comments on commit 0a2c895

Please sign in to comment.