Distributed Concurrency

The Coherence Concurrent module provides distributed implementations of the concurrency primitives from the java.util.concurrent package, such as executors, atomics, locks, semaphores and latches.

This allows you to implement concurrent applications using the constructs you are already familiar with, but to expand the "scope" of concurrency from a single process to potentially hundreds of processes within a Coherence cluster. You can use executors to submit tasks to be executed somewhere in the cluster; you can use locks, latches and semaphores to synchronize execution across many cluster members; you can use atomics to implement global counters across many processes, etc.

Please keep in mind that while these features are extremely powerful and allow you to reuse the knowledge you already have, they may have a detrimental effect on scalability and/or performance. Whenever you synchronize execution via locks, latches or semaphores, you are introducing a potential bottleneck into the architecture. Whenever you use a distributed atomic to implement a global counter, you are turning very simple operations that take mere nanoseconds locally, such as increment and decrement, into fairly expensive network calls that could take milliseconds (and potentially block even longer under heavy load).

So, use these features sparingly. In many cases there is a better, faster and more scalable way to accomplish the same goal using Coherence primitives such as entry processors, aggregators and events, which were designed to perform and scale well in a distributed environment from the get go.

Factory Classes

Each of the features above is backed by one or more Coherence caches, possibly with preconfigured interceptors, but for the most part you shouldn’t care about that: all interaction with lower level Coherence primitives is hidden behind various factory classes that allow you to get the instances of the classes you need.

For example, you will use factory methods within the Atomics class to get instances of various atomic types, Locks to get lock instances, and Latches and Semaphores to get latches and semaphores.

Local vs Remote

In many cases the factory classes will allow you to get both the local and the remote instances of various constructs. For example, Locks.localLock will give you an instance of a standard java.util.concurrent.locks.ReentrantLock, while Locks.remoteLock will return an instance of a RemoteLock. In cases where the JDK doesn’t provide a standard interface, which is the case with atomics, latches and semaphores, we’ve extracted the interface from the existing JDK class and created a thin wrapper around the corresponding JDK implementation. For example, Coherence Concurrent provides a Semaphore interface and a LocalSemaphore class that wraps java.util.concurrent.Semaphore. The same is true for CountDownLatch and all atomic types.

The main advantage of using factory classes to construct both the local and the remote instances is that it allows you to name local locks the same way you have to name remote locks: calling Locks.localLock("foo") will always return the same Lock instance, as the Locks class internally caches both the local and the remote instances it created. Of course, in the case of remote locks, every locally cached remote lock instance is ultimately backed by a shared lock instance somewhere in the cluster, which is used to synchronize lock state across the processes.
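The name-based caching described above can be pictured with a small, JDK-only sketch. The NamedLocalLocks class and its map are hypothetical and are not the actual Locks implementation; they only illustrate how a factory might hand back the same instance for the same name.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical name-keyed registry; an illustration, not the real Locks class. */
final class NamedLocalLocks {
    private static final ConcurrentMap<String, Lock> LOCKS = new ConcurrentHashMap<>();

    private NamedLocalLocks() {
    }

    /** Returns the lock registered under the given name, creating it on first use. */
    static Lock localLock(String name) {
        // computeIfAbsent is atomic, so concurrent callers see the same instance
        return LOCKS.computeIfAbsent(name, n -> new ReentrantLock());
    }
}
```

Two callers that ask for "foo" receive the same ReentrantLock, while a caller asking for "bar" receives a different one.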

Serialization

Coherence Concurrent supports both Java serialization and POF out-of-the-box, with Java serialization being the default.

If you want to use POF instead, you will need to specify that by setting the coherence.concurrent.serializer system property to pof. You will also need to include coherence-concurrent-pof-config.xml in your own POF configuration file, in order to register the built-in Coherence Concurrent types.
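System properties are usually passed on the command line (for example -Dcoherence.concurrent.serializer=pof), but they can also be set programmatically. The sketch below assumes the property is read when Coherence starts, so usePof would have to run before any Coherence class is initialized. The ConcurrentSerializerSetup class is a hypothetical helper.

```java
/** Hypothetical helper: selects POF for Coherence Concurrent via a system property. */
final class ConcurrentSerializerSetup {
    /** Property name taken from the Serialization section above. */
    static final String SERIALIZER_PROPERTY = "coherence.concurrent.serializer";

    private ConcurrentSerializerSetup() {
    }

    /** Call before Coherence starts (assumption: the property is read at startup). */
    static void usePof() {
        System.setProperty(SERIALIZER_PROPERTY, "pof");
    }
}
```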

Persistence

Coherence Concurrent supports both active and on-demand persistence but, just as in the rest of Coherence, it is set to on-demand by default.

In order to use active persistence, set the coherence.concurrent.persistence.environment system property to default-active, or to another persistence environment that has active persistence enabled.

Usage

In order to use Coherence Concurrent features, you need to declare it as a dependency in your pom.xml:

    <dependency>
        <groupId>{coherence-maven-group-id}</groupId>
        <artifactId>coherence-concurrent</artifactId>
        <version>{version-coherence-maven}</version>
    </dependency>

Once the necessary dependency is in place, you can start using the features it provides, as the following sections describe.

Executors

Overview

Coherence Concurrent provides a facility to dispatch tasks, either a Runnable or a Callable, to a Coherence cluster for execution.

Executors that will actually execute the submitted tasks are configured on each cluster member by defining one or more named executors within a cache configuration resource.

Usage Examples

By default, each Coherence cluster with the coherence-concurrent module on the classpath will include a single-threaded executor that may be used to execute dispatched tasks.

Given this, the simplest example would be:

RemoteExecutor remoteExecutor = RemoteExecutor.getDefault();

// submitting a Runnable yields a Future with no meaningful result, hence Future<?>
Future<?> result = remoteExecutor.submit(() -> System.out.println("Executed"));

result.get(); // block until completion

If, for example, an executor named Fixed5 was configured, the code would be:

RemoteExecutor remoteExecutor = RemoteExecutor.get("Fixed5");

If no executor has been configured with the given name, the RemoteExecutor will throw RejectedExecutionException.

Each RemoteExecutor instance may hold local resources that should be released when the RemoteExecutor is no longer needed. Like an ExecutorService, a RemoteExecutor has methods to shut the executor down. Calling these methods has no impact on the executors registered within the cluster.

Configuration

Several executor types are available for configuration.

| ExecutorService Type | Description |
|---|---|
| Single thread | Creates an ExecutorService with a single thread |
| Fixed thread | Creates an ExecutorService with a fixed number of threads |
| Cached | Creates an ExecutorService that will create new threads as needed and reuse existing threads when possible |
| Work stealing | Creates a work-stealing thread pool using the number of available processors as its target parallelism level |
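The four types read like the factory methods of java.util.concurrent.Executors. The JDK-only sketch below assumes that correspondence purely for illustration; how Coherence actually creates the configured executors is not shown here. The ExecutorTypes class and its method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical illustration of the executor types using plain JDK factories. */
final class ExecutorTypes {
    private ExecutorTypes() {
    }

    /** Builds one JDK executor per type; fixedThreads is the size of the fixed pool. */
    static Map<String, ExecutorService> create(int fixedThreads) {
        Map<String, ExecutorService> executors = new LinkedHashMap<>();
        executors.put("single", Executors.newSingleThreadExecutor());
        executors.put("fixed", Executors.newFixedThreadPool(fixedThreads));
        executors.put("cached", Executors.newCachedThreadPool());
        executors.put("work-stealing", Executors.newWorkStealingPool());
        return executors;
    }

    /** Runs a trivial task on every executor and returns how many tasks completed. */
    static int runOnAll(Map<String, ExecutorService> executors) {
        int completed = 0;
        try {
            for (ExecutorService executor : executors.values()) {
                completed += executor.submit(() -> 1).get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            // always release the threads, even if a task failed
            executors.values().forEach(ExecutorService::shutdown);
        }
        return completed;
    }
}
```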

Configuration Elements

| Element Name | Required | Expected Type | Description |
|---|---|---|---|
| single | no | N/A | Defines a single-thread executor |
| fixed | no | N/A | Defines a fixed-thread-pool executor |
| cached | no | N/A | Defines a cached-thread-pool executor |
| work-stealing | no | N/A | Defines a work-stealing-pool executor |
| name | yes | java.lang.String | Defines the logical name of the executor |
| thread-count | yes | java.lang.Integer | Defines the thread count for a fixed thread pool executor. |
| parallelism | no | java.lang.Integer | Defines the parallelism of a work-stealing thread pool executor. If not defined, it will default to the number of processors available on the system. |
| thread-factory | no | N/A | Defines a java.util.concurrent.ThreadFactory. Used by single, fixed, and cached executors. |
| instance | yes | java.util.concurrent.ThreadFactory | Defines how the ThreadFactory will be instantiated. See the docs for details on the instance element. This element must be a child of the thread-factory element. |

See the schema for full details.

Configuration Examples

To define executors, the cache-config root element needs to include the coherence-concurrent NamespaceHandler in order to recognize the configuration elements.

<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
               xmlns:c="class://com.oracle.coherence.concurrent.config.NamespaceHandler"
               xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd class://com.oracle.coherence.concurrent.config.NamespaceHandler concurrent.xsd">
  .
  .
  .
</cache-config>

Tip: Executors defined by configuration must precede any other elements in the document. Failing to do so will prevent the document from validating.

The following examples assume the xml namespace defined for the NamespaceHandler is c:

<!-- creates a single-threaded executor named Single -->
<c:single>
  <c:name>Single</c:name>
</c:single>

<!-- creates a single-threaded executor named SingleTF with a custom thread factory -->
<c:single>
  <c:name>SingleTF</c:name>
  <c:thread-factory>
    <c:instance>
      <c:class-name>my.custom.ThreadFactory</c:class-name>
    </c:instance>
  </c:thread-factory>
</c:single>

<!-- creates a fixed-thread executor named Fixed5 with five threads -->
<c:fixed>
  <c:name>Fixed5</c:name>
  <c:thread-count>5</c:thread-count>
</c:fixed>

Management

The ExecutorMBean represents the operational state of a registered executor.

The object name of the MBean is:

type=Executor,name=<executor name>,nodeId=<cluster node>
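For JMX clients, the key properties above can be assembled into a javax.management.ObjectName. The sketch below is a rough illustration: the Coherence domain prefix is an assumption (the text above lists only the key properties), and the ExecutorMBeanNames helper is hypothetical.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Hypothetical helper for building ExecutorMBean object names. */
final class ExecutorMBeanNames {
    /** Assumed JMX domain; verify against your MBean server before relying on it. */
    private static final String DOMAIN = "Coherence";

    private ExecutorMBeanNames() {
    }

    /** Name of the MBean for one executor on one cluster node. */
    static ObjectName forExecutor(String executorName, int nodeId) {
        return parse(DOMAIN + ":type=Executor,name=" + executorName + ",nodeId=" + nodeId);
    }

    /** Pattern matching the MBeans of one executor on every node. */
    static ObjectName forAllNodes(String executorName) {
        return parse(DOMAIN + ":type=Executor,name=" + executorName + ",*");
    }

    private static ObjectName parse(String name) {
        try {
            return new ObjectName(name);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid object name: " + name, e);
        }
    }
}
```

The pattern returned by forAllNodes could be passed to MBeanServerConnection.queryNames to list the matching MBeans. Executor names containing characters that are special in object names would need ObjectName.quote, which this sketch leaves out.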

ExecutorMBean Attributes

| Attribute | Type | Access | Description |
|---|---|---|---|
| MemberId | java.lang.String | read-only | The member ID where the executor is running. |
| Name | java.lang.String | read-only | The logical name of the executor. |
| Id | java.lang.String | read-only | The ID of the registered executor. |
| Description | java.lang.String | read-only | The generated description of the executor. |
| Location | java.lang.String | read-only | The complete location details of the executor. |
| State | java.lang.String | read-only | The current state of the executor. May be one of JOINING, RUNNING, CLOSING_GRACEFULLY, CLOSING, CLOSED or REJECTING. |
| TaskCompletedCount | java.lang.Long | read-only | The number of tasks completed by this executor. |
| TaskRejectedCount | java.lang.Long | read-only | The number of tasks rejected by this executor. |
| TaskInProgressCount | java.lang.Long | read-only | The number of tasks currently running or pending to be run by this executor. |
| TraceLogging | java.lang.Boolean | read-write | Enables executor trace logging (WARNING! VERBOSE). Disabled by default. |

Operations

The ExecutorMBean includes a resetStatistics operation that resets the statistics for this executor.

Management over REST

Coherence Management over REST exposes endpoints to query and invoke actions against ExecutorMBean instances.

| Description | Method | Path | Produces |
|---|---|---|---|
| View all Executors | GET | /management/coherence/cluster/executors | JSON |
| View all Executors with matching name | GET | /management/coherence/cluster/executors/{name} | JSON |
| Reset Executor statistics by name | POST | /management/coherence/cluster/executors/{name}/resetStatistics | JSON |
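As a sketch of how a client might address these endpoints, the following builds the three requests with the JDK's java.net.http API (Java 11 or later). The base URI passed in is a placeholder for wherever Management over REST is exposed, the ExecutorRestRequests class is hypothetical, and sending an empty POST body is an assumption.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical request builders for the executor management endpoints. */
final class ExecutorRestRequests {
    private static final String EXECUTORS_PATH = "/management/coherence/cluster/executors";

    private ExecutorRestRequests() {
    }

    /** GET all executors. */
    static HttpRequest viewAll(URI base) {
        return HttpRequest.newBuilder(base.resolve(EXECUTORS_PATH)).GET().build();
    }

    /** GET executors with the given name (name is assumed to be URL-safe). */
    static HttpRequest viewByName(URI base, String name) {
        return HttpRequest.newBuilder(base.resolve(EXECUTORS_PATH + "/" + name)).GET().build();
    }

    /** POST to reset statistics; an empty body is assumed to be acceptable. */
    static HttpRequest resetStatistics(URI base, String name) {
        URI uri = base.resolve(EXECUTORS_PATH + "/" + name + "/resetStatistics");
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }
}
```

The resulting requests can be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).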

CDI Support

RemoteExecutors may be injected via CDI. For example:

@Inject
private RemoteExecutor single; // (1)

@Inject
@Name("Fixed5")
private RemoteExecutor fixedPoolRemoteExecutor; // (2)
  1. injects a RemoteExecutor named single.

  2. injects a RemoteExecutor named Fixed5.

Atomics

Coherence Concurrent provides distributed implementations of atomic types, such as AtomicInteger, AtomicLong and AtomicReference. It also provides local implementations of the same types. The local implementations are just thin wrappers around existing java.util.concurrent.atomic types, which implement the same interface as their distributed variants, in order to be interchangeable.

To create instances of atomic types you need to call the appropriate factory method on the Atomics class:

AtomicInteger localFoo  = Atomics.localAtomicInteger("foo");   // (1)
AtomicInteger remoteFoo = Atomics.remoteAtomicInteger("foo");  // (2)
AtomicLong    remoteBar = Atomics.remoteAtomicLong("bar", 5L); // (3)
  1. creates a local, in-process instance of named AtomicInteger with an implicit initial value of 0

  2. creates a remote, distributed instance of named AtomicInteger, distinct from the local instance foo, with an implicit initial value of 0

  3. creates a remote, distributed instance of named AtomicLong, with an initial value of 5

Note that the AtomicInteger and AtomicLong types used above are not the types from the java.util.concurrent.atomic package that you are familiar with. They are interfaces defined within the com.oracle.coherence.concurrent.atomic package, which both the LocalAtomicXyz and RemoteAtomicXyz classes implement; instances of those classes are what the methods above actually return.

That means that the above code could be rewritten as:

LocalAtomicInteger  localFoo  = Atomics.localAtomicInteger("foo");
RemoteAtomicInteger remoteFoo = Atomics.remoteAtomicInteger("foo");
RemoteAtomicLong    remoteBar = Atomics.remoteAtomicLong("bar", 5L);

However, we strongly suggest that you use interfaces instead of concrete types, as they make it easy to switch between local and distributed implementations when necessary.

Once created, these instances can be used the same way you would use any of the corresponding java.util.concurrent.atomic types:

int  counter1 = remoteFoo.incrementAndGet();
long counter5 = remoteBar.addAndGet(5L);

Asynchronous Implementations

The instances of numeric atomic types, such as AtomicInteger and AtomicLong, are frequently used to represent various counters in the application, where a client may need to increment the value, but doesn’t necessarily need to know what the new value is.

When working with the local atomics, the same API shown above can be used, and the return value simply ignored. However, when using distributed atomics, that would introduce unnecessary blocking on the client while it waits for a response from the server, which would then simply be discarded. This would have a negative impact on both performance and throughput of the atomics.

To reduce the impact of remote calls in those situations, Coherence Concurrent also provides non-blocking, asynchronous implementations of all atomic types it supports.

To obtain a non-blocking instance of any supported atomic type, call the async method on the blocking instance of that type:

AsyncAtomicInteger asyncFoo = Atomics.remoteAtomicInteger("foo").async();      // (1)
AsyncAtomicLong    asyncBar = Atomics.remoteAtomicLong("bar", 5L).async();     // (2)
  1. creates a remote, distributed instance of named, non-blocking AsyncAtomicInteger, with an implicit initial value of 0

  2. creates a remote, distributed instance of named, non-blocking AsyncAtomicLong, with an initial value of 5

Once created, these instances can be used the same way you would use any of the corresponding blocking types. The only difference is that they will simply return a CompletableFuture for the result, and will not block:

CompletableFuture<Integer> futureCounter1 = asyncFoo.incrementAndGet();
CompletableFuture<Long>    futureCounter5 = asyncBar.addAndGet(5L);

Both the blocking and the non-blocking instance of any distributed atomic type, with the same name, are backed by the same cluster-side atomic instance state, so they can be used interchangeably.
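The benefit of the future-returning API is that a client can issue many updates without waiting for each one. The JDK-only sketch below uses a hypothetical AsyncCounterSketch class as a stand-in for an asynchronous atomic (it wraps a local AtomicInteger, not a Coherence type) to show the pattern: fire the increments, then wait once for all of them.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for an async atomic, backed by a local AtomicInteger. */
final class AsyncCounterSketch {
    private final AtomicInteger value = new AtomicInteger();

    /** Mimics an async incrementAndGet: the work completes on another thread. */
    CompletableFuture<Integer> incrementAndGet() {
        return CompletableFuture.supplyAsync(value::incrementAndGet);
    }

    /** Issues n increments without waiting in between, then waits once for all. */
    static int incrementTimes(AsyncCounterSketch counter, int n) {
        CompletableFuture<?>[] pending = new CompletableFuture<?>[n];
        for (int i = 0; i < n; i++) {
            pending[i] = counter.incrementAndGet(); // returns immediately
        }
        CompletableFuture.allOf(pending).join();     // single wait for every update
        return counter.value.get();
    }
}
```

When the new value is not needed at all, the returned future can simply be dropped, or chained with exceptionally to log failures.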

CDI Support

Atomic types from Coherence Concurrent can also be injected using CDI, which eliminates the need for explicit factory method calls on the Atomics class.

@Inject
@Name("foo")
private AtomicInteger localFoo;   // (1)

@Inject
@Remote
@Name("foo")
private AtomicInteger remoteFoo;  // (2)

@Inject
@Remote
private AsyncAtomicLong asyncBar; // (3)
  1. injects a local, in-process instance of an AtomicInteger named foo, with an implicit initial value of 0

  2. injects a remote, distributed instance of an AtomicInteger named foo, distinct from the local instance foo, with an implicit initial value of 0

  3. injects a remote, distributed instance of non-blocking AsyncAtomicLong, with an implicit name of asyncBar

Once an instance of an atomic type is obtained via CDI injection, it can be used the same way as an instance obtained directly from the Atomics factory class.

Locks

Coherence Concurrent provides distributed implementations of Lock and ReadWriteLock interfaces from the java.util.concurrent.locks package, allowing you to implement lock-based concurrency control across cluster members when necessary.

Unlike local JDK implementations, the classes in this package use the cluster member/process ID and thread ID to identify the lock owner, and store shared lock state within a Coherence NamedMap. That also means that the calls to acquire and release locks are remote network calls, because they need to update shared state that is likely stored on a different cluster member. This affects the performance of lock and unlock operations.

Exclusive Locks

The RemoteLock class provides an implementation of the Lock interface and allows you to ensure that only one thread on one member is running the critical section guarded by the lock at any given time.

To obtain an instance of a RemoteLock, call the Locks.remoteLock factory method:

Lock foo = Locks.remoteLock("foo");

Just like with Atomics, you can also obtain a local Lock instance from the Locks class by calling the localLock factory method, which simply returns an instance of a standard java.util.concurrent.locks.ReentrantLock:

Lock foo = Locks.localLock("foo");

Once you have a Lock instance, you can use it as you normally would:

foo.lock();
try {
    // critical section guarded by the exclusive lock `foo`
}
finally {
    foo.unlock();
}
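Because acquiring a remote lock involves a network call and may wait on other members, a bounded wait is often preferable to an unconditional lock(). The sketch below is written against the standard Lock interface, so it accepts either a local or a remote lock, assuming the remote implementation honours the timed tryLock defined by that interface. The GuardedAction helper is hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

/** Hypothetical helper that runs an action under a lock with a bounded wait. */
final class GuardedAction {
    private GuardedAction() {
    }

    /**
     * Runs the action if the lock is acquired within timeoutMillis.
     * Returns true if the action ran, false if the lock was not acquired.
     */
    static boolean runIfAcquired(Lock lock, long timeoutMillis, Runnable action) {
        boolean acquired;
        try {
            acquired = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
        if (!acquired) {
            return false;
        }
        try {
            action.run();
            return true;
        } finally {
            lock.unlock(); // always release, even if the action throws
        }
    }
}
```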

Read/Write Locks

The RemoteReadWriteLock class provides an implementation of the ReadWriteLock interface and allows you to ensure that only one thread on one member is running the critical section guarded by the write lock at any given time, while allowing multiple concurrent readers.

To obtain an instance of a RemoteReadWriteLock, call the Locks.remoteReadWriteLock factory method:

ReadWriteLock bar = Locks.remoteReadWriteLock("bar");

Just like with Atomics, you can also obtain a local ReadWriteLock instance from the Locks class by calling the localReadWriteLock factory method, which simply returns an instance of a standard java.util.concurrent.locks.ReentrantReadWriteLock:

ReadWriteLock bar = Locks.localReadWriteLock("bar");

Once you have a ReadWriteLock instance, you can use it as you normally would:

bar.writeLock().lock();
try {
    // critical section guarded by the exclusive write lock `bar`
}
finally {
    bar.writeLock().unlock();
}

Or:

bar.readLock().lock();
try {
    // critical section guarded by the shared read lock `bar`
}
finally {
    bar.readLock().unlock();
}
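The two snippets above are typically combined in one class that takes the ReadWriteLock as a dependency, so the same code works with a local or a remote lock. In the hypothetical GuardedSettings sketch below, the guarded state is a plain in-process HashMap for illustration only; with a remote lock, the guarded state would normally be something shared across members.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;

/** Hypothetical example: a map guarded by whichever ReadWriteLock is supplied. */
final class GuardedSettings {
    private final ReadWriteLock lock;
    private final Map<String, String> settings = new HashMap<>();

    GuardedSettings(ReadWriteLock lock) {
        this.lock = lock;
    }

    /** Many readers may hold the read lock at the same time. */
    String get(String key) {
        lock.readLock().lock();
        try {
            return settings.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Only one writer may hold the write lock, and no readers may run meanwhile. */
    void put(String key, String value) {
        lock.writeLock().lock();
        try {
            settings.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```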

CDI Support

You can also use CDI to inject both the exclusive and read/write lock instances into objects that need them:

@Inject
@Remote
@Name("foo")
private Lock lock;           // (1)

@Inject
@Remote
private ReadWriteLock bar;   // (2)
  1. injects distributed exclusive lock named foo into lock field

  2. injects distributed read/write lock named bar into bar field

Once an instance of lock is obtained via CDI injection, it can be used the same way as an instance obtained directly from the Locks factory class.

Latches and Semaphores

Coherence Concurrent also provides distributed implementations of the CountDownLatch and Semaphore classes from the java.util.concurrent package, allowing you to implement synchronization of execution across multiple Coherence cluster members as easily as you can implement it within a single process using those two JDK classes. It also provides interfaces for those two concurrency primitives, which both the remote and the local implementations conform to.

Just like with atomics, the local implementations are nothing more than thin wrappers around corresponding JDK classes.

Count Down Latch

The RemoteCountDownLatch class provides a distributed implementation of a CountDownLatch, and allows you to ensure that the execution of the code on any cluster member that is waiting for the latch proceeds only when the latch reaches zero. Any cluster member can both wait for a latch and count down.

To obtain an instance of a RemoteCountDownLatch, call the Latches.remoteCountDownLatch factory method:

CountDownLatch foo = Latches.remoteCountDownLatch("foo", 5);     // (1)
  1. create an instance of a RemoteCountDownLatch with the initial count of 5

Just like with Atomics and Locks, you can also obtain a local CountDownLatch instance from the Latches class by calling the localCountDownLatch factory method:

CountDownLatch foo = Latches.localCountDownLatch("foo", 10);     // (1)
  1. create an instance of a LocalCountDownLatch with the initial count of 10

Once you have a CountDownLatch instance, you can use it as you normally would, by calling the countDown and await methods on it.
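As a reminder of the countDown and await pattern, here is a JDK-only sketch that uses java.util.concurrent.CountDownLatch as a stand-in. In a cluster, the worker threads would instead be other members counting down a latch of the same name. The LatchSketch class is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of waiting for several workers with a latch. */
final class LatchSketch {
    private LatchSketch() {
    }

    /**
     * Starts the given number of workers, each counting down once.
     * Returns true if the latch reached zero within timeoutMillis.
     */
    static boolean awaitWorkers(int workers, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(workers);
        for (int i = 0; i < workers; i++) {
            new Thread(latch::countDown).start(); // each worker signals completion
        }
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```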

Semaphore

The RemoteSemaphore class provides a distributed implementation of a Semaphore, and allows any cluster member to acquire and release permits from the same semaphore instance.

To obtain an instance of a RemoteSemaphore, call the Semaphores.remoteSemaphore factory method:

Semaphore foo = Semaphores.remoteSemaphore("foo", 5);            // (1)
  1. create an instance of a RemoteSemaphore with 5 permits

Just like with Atomics and Locks, you can also obtain a local Semaphore instance from the Semaphores class by calling the localSemaphore factory method:

Semaphore foo = Semaphores.localSemaphore("foo");                // (1)
  1. create an instance of a LocalSemaphore with 0 permits

Once you have a Semaphore instance, you can use it as you normally would, by calling the release and acquire methods on it.

CDI Support

You can also use CDI to inject both the CountDownLatch and Semaphore instances into objects that need them:

@Inject
@Name("foo")
@Count(5)
private CountDownLatch localLatchFoo;           // (1)

@Inject
@Name("foo")
@Remote
@Count(10)
private CountDownLatch remoteLatchFoo;          // (2)

@Inject
@Name("bar")
private Semaphore localSemaphoreBar;            // (3)

@Inject
@Name("bar")
@Remote
@Permits(1)
private Semaphore remoteSemaphoreBar;           // (4)
  1. inject an instance of a LocalCountDownLatch with the initial count of five

  2. inject an instance of a RemoteCountDownLatch with the initial count of ten

  3. inject an instance of a LocalSemaphore with zero permits available

  4. inject an instance of a RemoteSemaphore with one permit available

Once a latch or a semaphore instance is obtained via CDI injection, it can be used the same way as an instance obtained directly from the Latches or Semaphores factory classes.

The @Name annotation is optional in both cases, as long as the member name (in the examples above, the field name) can be obtained from the injection point, but is required otherwise (such as when using constructor injection).

The @Count annotation specifies the initial latch count and defaults to one if omitted. The @Permits annotation specifies the number of available permits for a semaphore and defaults to zero if omitted, which means that the first acquire call will block until another thread releases one or more permits.
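The effect of starting with zero permits can be seen with a plain java.util.concurrent.Semaphore, used here as a stand-in for the Coherence Semaphore interface. The sketch uses the non-blocking tryAcquire so that it does not hang; a blocking acquire would wait at the first call instead. The ZeroPermitSketch class is hypothetical.

```java
import java.util.concurrent.Semaphore;

/** Hypothetical illustration of a semaphore that starts with zero permits. */
final class ZeroPermitSketch {
    private ZeroPermitSketch() {
    }

    /** Returns {acquired before release, acquired after release}. */
    static boolean[] acquireAroundRelease() {
        Semaphore semaphore = new Semaphore(0);
        boolean before = semaphore.tryAcquire(); // no permits are available yet
        semaphore.release();                     // another party adds one permit
        boolean after = semaphore.tryAcquire();  // the new permit can now be taken
        return new boolean[] {before, after};
    }
}
```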