Skip to content

Commit

Permalink
PIP-45: Implement CoordinationService on top of MetadataStore (apache…
Browse files Browse the repository at this point in the history
…#9221)

* PIP-45: CoordinationService

* Fixed test compile

* Handle concurrent write internally
  • Loading branch information
merlimat authored Jan 19, 2021
1 parent 63acd20 commit 29b0f08
Show file tree
Hide file tree
Showing 26 changed files with 1,628 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2750,7 +2750,7 @@ public void testPropertiesForMeta() throws Exception {

CountDownLatch l2 = new CountDownLatch(1);
store.asyncUpdateLedgerIds(mLName, builder.build(),
new Stat(1, 0, 0),
new Stat(mLName, 1, 0, 0),
new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void result, Stat version) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.metadata.cache;
package org.apache.pulsar.metadata.api;

import java.util.List;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.cache.MetadataCache;

/**
* Metadata store client interface.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,29 @@ public MetadataStoreException(String msg) {
super(msg);
}

/**
* Implementation is invalid
*/
public static class InvalidImplementationException extends MetadataStoreException {
public InvalidImplementationException() {
super((Throwable) null);
}

public InvalidImplementationException(Throwable t) {
super(t);
}

public InvalidImplementationException(String msg) {
super(msg);
}
}

/**
* Key not found in store.
*/
public static class NotFoundException extends MetadataStoreException {
public NotFoundException() {
super((Throwable)null);
super((Throwable) null);
}

public NotFoundException(Throwable t) {
Expand Down Expand Up @@ -91,6 +108,36 @@ public ContentDeserializationException(String msg) {
}
}

/**
* A resource lock is already taken by a different instance.
*/
public static class LockBusyException extends MetadataStoreException {
public LockBusyException() {
super((Throwable) null);
}

public LockBusyException(Throwable t) {
super(t);
}

public LockBusyException(String msg) {
super(msg);
}
}

/**
* The store was already closed.
*/
public static class AlreadyClosedException extends MetadataStoreException {
public AlreadyClosedException(Throwable t) {
super(t);
}

public AlreadyClosedException(String msg) {
super(msg);
}
}

public static MetadataStoreException unwrap(Throwable t) {
if (t instanceof MetadataStoreException) {
return (MetadataStoreException) t;
Expand All @@ -115,6 +162,10 @@ public static MetadataStoreException unwrap(Throwable t) {
return new BadVersionException(msg);
} else if (cause instanceof ContentDeserializationException) {
return new ContentDeserializationException(msg);
} else if (cause instanceof InvalidImplementationException) {
return new InvalidImplementationException(msg);
} else if (cause instanceof LockBusyException) {
return new LockBusyException(msg);
} else {
return new MetadataStoreException(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class MetadataStoreFactory {
* @throws IOException
* if the metadata store initialization fails
*/
public static MetadataStore create(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws IOException {
public static MetadataStore create(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws MetadataStoreException {
if (metadataURL.startsWith("memory://")) {
return new LocalMemoryMetadataStore(metadataURL, metadataStoreConfig);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@
*/
@Data
public class Stat {

/**
* The path of the value
*/
final String path;

/**
* The data version.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.metadata.api.coordination;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/**
* Interface for the coordination service. Provides abstraction for distributed locks and leader election.
*/
public interface CoordinationService extends AutoCloseable {

/**
* Create a new {@link LeaderElection} controller.
*
* @param clazz
* the class type to be used for serialization/deserialization
* @param path
* the path to use for the leader election
* @param stateChangesListener
* a listener that will be passed all the state changes
* @return
*/
<T> LeaderElection<T> getLeaderElection(Class<T> clazz, String path,
Consumer<LeaderElectionState> stateChangesListener);

<T> LockManager<T> getLockManager(Class<T> clazz);

/**
* Increment a counter identified by the specified path and return the current value.
*
* The counter value will be guaranteed to be unique within the context of the path.
*
* @param path
* the path that identifies a particular counter
* @return a future that will track the completion of the operation
* @throws CoordinationServiceException
* if there's a failure in incrementing the counter
*/
CompletableFuture<Long> getNextCounterValue(String path);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.metadata.api.coordination;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/**
* Leader election controller.
*/
public interface LeaderElection<T> extends AutoCloseable {
/**
* Try to become the leader.
* <p>
* Warning: because of the distributed nature of the leader election, having been promoted to "leader" status will
* never provide a strong guarantee that no one else also thinks it's the leader. The caller will have to deal with
* these race conditions when using the resource itself (eg. using compareAndSet() or fencing mechanisms).
*
* @param proposedValue
* the value to set for the leader, in the case this instance succeeds in becoming leader
* @return a future that will track the completion of the operation
* @throws MetadataStoreException
* if there's a failure in the leader election
*/
CompletableFuture<LeaderElectionState> elect(T proposedValue);

/**
* Get the current leader election state
*/
LeaderElectionState getState();

/**
* Get the value set by the elected leader, or empty if there's currently no leader.
*
* @return a future that will track the completion of the operation
*/
CompletableFuture<Optional<T>> getLeaderValue();

/**
* Get the value set by the elected leader, or empty if there's currently no leader.
* <p>
* The call is non blocking and in certain cases can return <code>Optional.empty()</code> even though a leader is
* technically elected.
*
* @return a future that will track the completion of the operation
*/
Optional<T> getLeaderValueIfPresent();

/**
* Close the leader election controller and release the leadership (if it was acquired).
*
* @return a future that will track the completion of the operation
*/
CompletableFuture<Void> asyncClose();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.metadata.api.coordination;

/**
* State of the leader election.
*/
public enum LeaderElectionState {
/**
* No leader has been elected yet
*/
NoLeader,

/**
* We are currently the leader
*/
Leading,

/**
* Follower state
*/
Following
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.metadata.api.coordination;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.LockBusyException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;

/**
* Controller for acquiring distributed lock on resources.
*/
public interface LockManager<T> extends AutoCloseable {
/**
* Read the content of an existing lock.
*
* If the lock is already taken, this operation will fail immediately.
*
* Warning: because of the distributed nature of the lock, having acquired a lock will never provide a strong
* guarantee that no one else also think it owns the same resource. The caller will have to deal with these race
* conditions when using the resource itself (eg. using compareAndSet() or fencing mechanisms).
*
* @param path
* the path of the resource on which to acquire the lock
* @param content
* the payload of the lock
* @return a future that will track the completion of the operation
* @throws NotFoundException
* if the lock is not taken
* @throws MetadataStoreException
* if there's a failure in reading the lock
*/
CompletableFuture<Optional<T>> readLock(String path);

/**
* Acquire a lock on a shared resource.
*
* If the lock is already taken, this operation will fail immediately.
*
* Warning: because of the distributed nature of the lock, having acquired a lock will never provide a strong
* guarantee that no one else also think it owns the same resource. The caller will have to deal with these race
* conditions when using the resource itself (eg. using compareAndSet() or fencing mechanisms).
*
* @param path
* the path of the resource on which to acquire the lock
* @param value
* the value of the lock
* @return a future that will track the completion of the operation
* @throws LockBusyException
* if the lock is already taken
* @throws MetadataStoreException
* if there's a failure in acquiring the lock
*/
CompletableFuture<ResourceLock<T>> acquireLock(String path, T value);

/**
* List all the locks that are children of a specific path.
*
* For example, given locks: <code>/a/b/lock-1</code> and <code>/a/b/lock-2</code>, the
* <code>listLocks("/a/b")</code> will return a list of <code>["lock-1", "lock-2"]</code>.
*
* @param path
* the prefix path to get the list of locks
* @return a future that will track the completion of the operation
* @throws MetadataStoreException
* if there's a failure in getting the list of locks
*/
CompletableFuture<List<String>> listLocks(String path);

/**
* Close the LockManager and release all the locks.
*
* @return a future that will track the completion of the operation
* @throws MetadataStoreException
* if there's a failure in closing the LockManager
*/
CompletableFuture<Void> asyncClose();

}
Loading

0 comments on commit 29b0f08

Please sign in to comment.