The Coherence Concurrent module provides distributed implementations of the concurrency primitives from the java.util.concurrent package that you are already familiar with, such as executors, atomics, locks, semaphores and latches.
This allows you to implement concurrent applications using the constructs you are already familiar with, but to expand the "scope" of concurrency from a single process to potentially hundreds of processes within a Coherence cluster. You can use executors to submit tasks to be executed somewhere in the cluster; you can use locks, latches and semaphores to synchronize execution across many cluster members; you can use atomics to implement global counters across many processes, etc.
Please keep in mind that while these features are extremely powerful and allow you to reuse the knowledge you already have, they may have a detrimental effect on scalability and/or performance. Whenever you synchronize execution via locks, latches or semaphores, you are introducing a potential bottleneck into the architecture. Whenever you use a distributed atomic to implement a global counter, you are turning very simple operations that take mere nanoseconds locally, such as increment and decrement, into fairly expensive network calls that could take milliseconds (and potentially block even longer under heavy load).
So, use these features sparingly. In many cases there is a better, faster and more scalable way to accomplish the same goal using Coherence primitives such as entry processors, aggregators and events, which were designed to perform and scale well in a distributed environment from the get go.
Each of the features above is backed by one or more Coherence caches, possibly with preconfigured interceptors, but for the most part you shouldn’t care about that: all interaction with lower level Coherence primitives is hidden behind various factory classes that allow you to get the instances of the classes you need.
For example, you will use factory methods within the Atomics class to get instances of various atomic types, Locks to get lock instances, and Latches and Semaphores to get latches and semaphores.
In many cases the factory classes will allow you to get both the local and the remote instances of various constructs. For example, Locks.localLock will give you an instance of a standard java.util.concurrent.locks.ReentrantLock, while Locks.remoteLock will return an instance of a RemoteLock. In cases where the JDK doesn’t provide a standard interface, which is the case with atomics, latches and semaphores, we’ve extracted the interface from the existing JDK class, and created a thin wrapper around the corresponding JDK implementation. For example, Coherence Concurrent provides a Semaphore interface, and a LocalSemaphore class that wraps java.util.concurrent.Semaphore. The same is true for CountDownLatch and all atomic types.
The main advantage of using factory classes to construct both the local and the remote instances is that it allows you to name local locks the same way you have to name remote locks: calling Locks.localLock("foo") will always return the same Lock instance, as the Locks class internally caches both the local and the remote instances it created. Of course, in the case of remote locks, every locally cached remote lock instance is ultimately backed by a shared lock instance somewhere in the cluster, which is used to synchronize lock state across the processes.
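The name-based caching described above can be pictured with a small JDK-only sketch. The NamedLockRegistry class is hypothetical and is not the actual Locks implementation; it only illustrates how looking up a lock by name can always yield the same instance.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical name-keyed registry; an illustration, not the Coherence Locks class. */
final class NamedLockRegistry {
    private static final Map<String, Lock> LOCKS = new ConcurrentHashMap<>();

    private NamedLockRegistry() {
    }

    /** Returns the lock registered under the given name, creating it on first use. */
    static Lock localLock(String name) {
        return LOCKS.computeIfAbsent(name, n -> new ReentrantLock());
    }

    public static void main(String[] args) {
        // The same name resolves to the same instance; different names do not.
        System.out.println(localLock("foo") == localLock("foo"));
        System.out.println(localLock("foo") == localLock("bar"));
    }
}
```

Because the registry is keyed by name, two unrelated pieces of code that ask for "foo" end up coordinating on the same lock without passing the instance around.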
Coherence Concurrent supports both Java serialization and POF out-of-the-box, with Java serialization being the default.
If you want to use POF instead, set the coherence.concurrent.serializer system property to pof. You will also need to include coherence-concurrent-pof-config.xml in your own POF configuration file, in order to register the built-in Coherence Concurrent types.
Coherence Concurrent supports both active and on-demand persistence but, just like in the rest of Coherence, it is set to on-demand by default.
To use active persistence, set the coherence.concurrent.persistence.environment system property to default-active, or to another persistence environment that has active persistence enabled.
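The two system properties named above are normally passed as -D flags on the command line. As a sketch, they could also be set programmatically, assuming this happens before Coherence is started in the same JVM (that ordering requirement is an assumption, and the ConcurrentSettings class is hypothetical).

```java
/** Hypothetical helper that applies the settings described above. */
final class ConcurrentSettings {
    private ConcurrentSettings() {
    }

    /** Selects POF serialization and active persistence for Coherence Concurrent. */
    static void usePofAndActivePersistence() {
        // Assumed to take effect only if set before Coherence starts.
        System.setProperty("coherence.concurrent.serializer", "pof");
        System.setProperty("coherence.concurrent.persistence.environment", "default-active");
    }

    public static void main(String[] args) {
        usePofAndActivePersistence();
        System.out.println(System.getProperty("coherence.concurrent.serializer"));
    }
}
```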
In order to use Coherence Concurrent features, you need to declare it as a dependency in your pom.xml:
<dependency>
  <groupId>{coherence-maven-group-id}</groupId>
  <artifactId>coherence-concurrent</artifactId>
  <version>{version-coherence-maven}</version>
</dependency>
Once the necessary dependency is in place, you can start using the features it provides, as the following sections describe.
Coherence Concurrent provides a facility to dispatch tasks, either a Runnable or a Callable, to a Coherence cluster for execution.
Executors that will actually execute the submitted tasks are configured on each cluster member by defining one or more named executors within a cache configuration resource.
By default, each Coherence cluster with the coherence-concurrent module on the classpath will include a single-threaded executor that may be used to execute dispatched tasks.
Given this, the simplest example would be:
RemoteExecutor remoteExecutor = RemoteExecutor.getDefault();
Future<?> result = remoteExecutor.submit(() -> System.out.println("Executed"));
result.get(); // block until completion
If, for example, an executor named Fixed5 was configured, the code would be:
RemoteExecutor remoteExecutor = RemoteExecutor.get("Fixed5");
If no executor has been configured with the given name, the RemoteExecutor will throw RejectedExecutionException.
Each RemoteExecutor instance may hold local resources that should be released when the RemoteExecutor is no longer needed. Like an ExecutorService, a RemoteExecutor has methods to shut the executor down. Calling these methods has no impact on the executors registered within the cluster.
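The submit, wait and shut down lifecycle described above mirrors that of a plain ExecutorService. The following sketch uses only JDK classes as a local stand-in, so it runs without a cluster; it does not use RemoteExecutor itself, and the class and method names are illustrative.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

/** Local JDK analog of the executor lifecycle; not the RemoteExecutor API. */
final class ExecutorLifecycleSketch {
    private ExecutorLifecycleSketch() {
    }

    /** Submits a Callable, blocks for its result, then releases the executor. */
    static int submitAndAwait() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> result = executor.submit(() -> 6 * 7);
            return result.get(); // block until completion
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown(); // release local resources
        }
    }

    /** Returns true if a task submitted after shutdown is rejected. */
    static boolean rejectsAfterShutdown() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown();
        try {
            executor.submit(() -> { });
            return false;
        } catch (RejectedExecutionException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndAwait());
        System.out.println(rejectsAfterShutdown());
    }
}
```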
Several executor types are available for configuration.
ExecutorService Type | Description |
---|---|
Single thread | Creates an ExecutorService with a single thread |
Fixed thread | Creates an ExecutorService with a fixed number of threads |
Cached | Creates an ExecutorService that will create new threads as needed and reuse existing threads when possible |
Work stealing | Creates a work-stealing thread pool using the number of available processors as its target parallelism level. |
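The four executor types in the table read much like the factory methods on java.util.concurrent.Executors. The sketch below assumes that correspondence (it is not confirmed by the text above) and creates one local executor of each kind; the map keys and class name are illustrative only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** JDK factory methods that appear to match the executor types listed above. */
final class ExecutorTypesSketch {
    private ExecutorTypesSketch() {
    }

    /** Creates one local executor per type, keyed by an illustrative label. */
    static Map<String, ExecutorService> createAll(int fixedThreads) {
        Map<String, ExecutorService> executors = new LinkedHashMap<>();
        executors.put("single", Executors.newSingleThreadExecutor());
        executors.put("fixed", Executors.newFixedThreadPool(fixedThreads));
        executors.put("cached", Executors.newCachedThreadPool());
        executors.put("work-stealing", Executors.newWorkStealingPool());
        return executors;
    }

    /** Runs one task on each executor type and returns how many completed. */
    static int runOnEach(int fixedThreads) {
        Map<String, ExecutorService> executors = createAll(fixedThreads);
        int completed = 0;
        try {
            for (ExecutorService executor : executors.values()) {
                Future<String> result = executor.submit(() -> "done");
                if ("done".equals(result.get())) {
                    completed++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executors.values().forEach(ExecutorService::shutdown);
        }
        return completed;
    }

    public static void main(String[] args) {
        System.out.println(runOnEach(5));
    }
}
```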
Element Name | Required | Expected Type | Description |
---|---|---|---|
single | no | N/A | Defines a single-thread executor |
fixed | no | N/A | Defines a fixed-thread-pool executor |
cached | no | N/A | Defines a cached-thread-pool executor |
work-stealing | no | N/A | Defines a work-stealing-pool executor |
name | yes | java.lang.String | Defines the logical |
thread-count | yes | java.lang.Integer | Defines the thread count for a |
parallelism | no | java.lang.Integer | Defines the parallelism of a |
thread-factory | no | N/A | Defines a java.util.concurrent.ThreadFactory. Used by |
instance | yes | java.util.concurrent.ThreadFactory | Defines how the ThreadFactory will be instantiated. See the docs for details on the |
See the schema for full details.
To define executors, the cache-config root element needs to include the coherence-concurrent NamespaceHandler in order to recognize the configuration elements.
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
              xmlns:c="class://com.oracle.coherence.concurrent.config.NamespaceHandler"
              xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd class://com.oracle.coherence.concurrent.config.NamespaceHandler concurrent.xsd">
  .
  .
  .
</cache-config>
Tip: Executors defined by configuration must precede any other elements in the document. Failing to do so will prevent the document from validating.
The following examples assume the XML namespace defined for the NamespaceHandler is c:
<!-- creates a single-threaded executor named Single -->
<c:single>
  <c:name>Single</c:name>
</c:single>
<!-- creates a single-threaded executor named SingleTF with a thread factory -->
<c:single>
  <c:name>SingleTF</c:name>
  <c:thread-factory>
    <c:instance>
      <c:class-name>my.custom.ThreadFactory</c:class-name>
    </c:instance>
  </c:thread-factory>
</c:single>
<!-- creates a fixed-thread executor named Fixed5 -->
<c:fixed>
  <c:name>Fixed5</c:name>
  <c:thread-count>5</c:thread-count>
</c:fixed>
The ExecutorMBean represents the operational state of a registered executor.
The object name of the MBean is:
type=Executor,name=<executor name>,nodeId=<cluster node>
Attribute | Type | Access | Description |
---|---|---|---|
MemberId | java.lang.String | read-only | The member ID where the executor is running. |
Name | java.lang.String | read-only | The logical name of the executor. |
Id | java.lang.String | read-only | The ID of the registered executor. |
Description | java.lang.String | read-only | The generated description of the executor. |
Location | java.lang.String | read-only | The complete location details of the executor. |
State | java.lang.String | read-only | The current state of the executor. May be one of |
TaskCompletedCount | java.lang.Long | read-only | The number of tasks completed by this executor. |
TaskRejectedCount | java.lang.Long | read-only | The number of tasks rejected by this executor. |
TaskInProgressCount | java.lang.Long | read-only | The number of tasks currently running or pending to be run by this executor. |
TraceLogging | java.lang.Boolean | read-write | Enables executor trace logging (WARNING! VERBOSE). Disabled by default. |
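As a rough illustration of the MBean object name format given earlier, the sketch below builds both a concrete name and a pattern matching an executor on every node, using the standard javax.management.ObjectName class. The Coherence domain prefix is an assumption, since the text lists only the key properties, and the helper class is hypothetical.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Hypothetical helper for building ExecutorMBean object names. */
final class ExecutorMBeanNames {
    /** Assumed JMX domain; the documentation above shows only the key properties. */
    static final String DOMAIN = "Coherence";

    private ExecutorMBeanNames() {
    }

    /** Name of the MBean for one executor on one cluster node. */
    static ObjectName onNode(String executorName, int nodeId) {
        return parse(DOMAIN + ":type=Executor,name=" + executorName + ",nodeId=" + nodeId);
    }

    /** Pattern matching the MBeans for one executor on all cluster nodes. */
    static ObjectName onAllNodes(String executorName) {
        return parse(DOMAIN + ":type=Executor,name=" + executorName + ",*");
    }

    private static ObjectName parse(String name) {
        try {
            return new ObjectName(name);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        ObjectName concrete = onNode("Fixed5", 1);
        // apply() checks whether the concrete name matches the pattern.
        System.out.println(onAllNodes("Fixed5").apply(concrete));
    }
}
```

A pattern of this kind could be passed to MBeanServerConnection.queryNames to find the matching MBeans, and attributes such as TaskCompletedCount read from each result.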
Coherence Management over REST exposes endpoints to query and invoke actions against ExecutorMBean instances.
Description | Method | Path | Produces |
---|---|---|---|
View all Executors | GET | /management/coherence/cluster/executors | JSON |
View all Executors with matching name | GET | /management/coherence/cluster/executors/{name} | JSON |
Reset Executor statistics by name | POST | /management/coherence/cluster/executors/{name}/resetStatistics | JSON |
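The endpoints in the table can be addressed with the JDK HTTP client (Java 11 or later). The sketch below only builds the requests and does not send them. The base URL is a placeholder, and sending the POST without a body is an assumption; check both against your management configuration.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Builds requests for the executor endpoints listed above. */
final class ExecutorRestRequests {
    /** Hypothetical host and port of the Management over REST endpoint. */
    static final String BASE = "http://localhost:30000";
    static final String EXECUTORS = "/management/coherence/cluster/executors";

    private ExecutorRestRequests() {
    }

    /** GET request that views all executors with the given name. */
    static HttpRequest viewByName(String name) {
        return HttpRequest.newBuilder(URI.create(BASE + EXECUTORS + "/" + name))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    /** POST request that resets statistics for the named executor (empty body assumed). */
    static HttpRequest resetStatistics(String name) {
        return HttpRequest.newBuilder(URI.create(BASE + EXECUTORS + "/" + name + "/resetStatistics"))
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        System.out.println(viewByName("Fixed5").uri());
        System.out.println(resetStatistics("Fixed5").uri());
    }
}
```

The requests would be sent with HttpClient.send and a BodyHandlers.ofString() handler to obtain the JSON response.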
Coherence Concurrent provides distributed implementations of atomic types, such as AtomicInteger, AtomicLong and AtomicReference. It also provides local implementations of the same types. The local implementations are just thin wrappers around existing java.util.concurrent.atomic types, which implement the same interface as their distributed variants, in order to be interchangeable.
To create instances of atomic types you need to call the appropriate factory method on the Atomics class:
AtomicInteger localFoo = Atomics.localAtomicInteger("foo"); // (1)
AtomicInteger remoteFoo = Atomics.remoteAtomicInteger("foo"); // (2)
AtomicLong remoteBar = Atomics.remoteAtomicLong("bar", 5L); // (3)
(1) creates a local, in-process instance of a named AtomicInteger with an implicit initial value of 0
(2) creates a remote, distributed instance of a named AtomicInteger, distinct from the local instance foo, with an implicit initial value of 0
(3) creates a remote, distributed instance of a named AtomicLong, with an initial value of 5
Note that the AtomicInteger and AtomicLong types used above are not the types from the java.util.concurrent.atomic package that you are familiar with. They are interfaces defined within the com.oracle.coherence.concurrent.atomic package, which both the LocalAtomicXyz and RemoteAtomicXyz classes implement, and instances of those classes are what the methods above actually return.
That means that the above code could be rewritten as:
LocalAtomicInteger localFoo = Atomics.localAtomicInteger("foo");
RemoteAtomicInteger remoteFoo = Atomics.remoteAtomicInteger("foo");
RemoteAtomicLong remoteBar = Atomics.remoteAtomicLong("bar", 5L);
However, we strongly suggest that you use interfaces instead of concrete types, as they make it easy to switch between local and distributed implementations when necessary.
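The benefit of coding against an interface can be shown with a small JDK-only sketch. The IntCounter interface and LocalIntCounter wrapper are hypothetical stand-ins for the extracted interface and thin local wrapper design described above; they are not the Coherence types.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrates the "extracted interface plus thin wrapper" design with JDK types only. */
final class WrapperSketch {
    private WrapperSketch() {
    }

    /** Hypothetical interface extracted from the JDK class. */
    interface IntCounter {
        int incrementAndGet();

        int get();
    }

    /** Thin local wrapper that delegates every call to the JDK implementation. */
    static final class LocalIntCounter implements IntCounter {
        private final AtomicInteger delegate;

        LocalIntCounter(int initialValue) {
            this.delegate = new AtomicInteger(initialValue);
        }

        @Override
        public int incrementAndGet() {
            return delegate.incrementAndGet();
        }

        @Override
        public int get() {
            return delegate.get();
        }
    }

    /** Depends only on the interface, so any implementation can be passed in. */
    static int incrementTimes(IntCounter counter, int times) {
        for (int i = 0; i < times; i++) {
            counter.incrementAndGet();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(incrementTimes(new LocalIntCounter(0), 3));
    }
}
```

A distributed implementation of the same interface could be handed to incrementTimes without changing the method, which is the flexibility the recommendation above is after.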
Once created, these instances can be used the same way you would use any of the corresponding java.util.concurrent.atomic types:
int counter1 = remoteFoo.incrementAndGet();
long counter5 = remoteBar.addAndGet(5L);
The instances of numeric atomic types, such as AtomicInteger and AtomicLong, are frequently used to represent various counters in the application, where a client may need to increment the value, but doesn’t necessarily need to know what the new value is.
When working with the local atomics, the same API shown above can be used, and the return value simply ignored. However, when using distributed atomics, that would introduce unnecessary blocking on the client while it waits for a response from the server, which would then simply be discarded. This would have a negative impact on both the performance and the throughput of the atomics.
To reduce the impact of remote calls in those situations, Coherence Concurrent also provides non-blocking, asynchronous implementations of all atomic types it supports.
To obtain a non-blocking instance of any supported atomic type, simply call the async method on the blocking instance of that type:
AsyncAtomicInteger asyncFoo = Atomics.remoteAtomicInteger("foo").async(); // (1)
AsyncAtomicLong asyncBar = Atomics.remoteAtomicLong("bar", 5L).async(); // (2)
(1) creates a remote, distributed instance of a named, non-blocking AsyncAtomicInteger, with an implicit initial value of 0
(2) creates a remote, distributed instance of a named, non-blocking AsyncAtomicLong, with an initial value of 5
Once created, these instances can be used the same way you would use any of the corresponding blocking types. The only difference is that they will simply return a CompletableFuture for the result, and will not block:
CompletableFuture<Integer> futureCounter1 = asyncFoo.incrementAndGet();
CompletableFuture<Long> futureCounter5 = asyncBar.addAndGet(5L);
Both the blocking and the non-blocking instance of any distributed atomic type, with the same name, are backed by the same cluster-side atomic instance state, so they can be used interchangeably.
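To illustrate why a CompletableFuture-returning API helps when the new value is not needed, the sketch below uses a local JDK stand-in: increments are issued without waiting on each one, and the caller waits once at the end. The AsyncCounterSketch class is hypothetical and does not use the Coherence async types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Local stand-in for a non-blocking counter; not the Coherence AsyncAtomicInteger. */
final class AsyncCounterSketch {
    private final AtomicInteger value = new AtomicInteger();

    /** Returns immediately; the increment completes on another thread. */
    CompletableFuture<Integer> incrementAndGet() {
        return CompletableFuture.supplyAsync(value::incrementAndGet);
    }

    /** Issues n increments without waiting on each, then waits once for all of them. */
    static int incrementWithoutBlocking(int n) {
        AsyncCounterSketch counter = new AsyncCounterSketch();
        CompletableFuture<?>[] pending = new CompletableFuture<?>[n];
        for (int i = 0; i < n; i++) {
            pending[i] = counter.incrementAndGet();
        }
        CompletableFuture.allOf(pending).join();
        return counter.value.get();
    }

    public static void main(String[] args) {
        System.out.println(incrementWithoutBlocking(100));
    }
}
```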
Atomic types from Coherence Concurrent can also be injected using CDI, which eliminates the need for explicit factory method calls on the Atomics class.
@Inject
@Name("foo")
private AtomicInteger localFoo; // (1)
@Inject
@Remote
@Name("foo")
private AtomicInteger remoteFoo; // (2)
@Inject
@Remote
private AsyncAtomicLong asyncBar; // (3)
(1) injects a local, in-process instance of an AtomicInteger named foo, with an implicit initial value of 0
(2) injects a remote, distributed instance of an AtomicInteger named foo, distinct from the local instance foo, with an implicit initial value of 0
(3) injects a remote, distributed instance of a non-blocking AsyncAtomicLong, with an implicit name of asyncBar
Once an instance of an atomic type is obtained via CDI injection, it can be used the same way as an instance obtained directly from the Atomics factory class.
Coherence Concurrent provides distributed implementations of the Lock and ReadWriteLock interfaces from the java.util.concurrent.locks package, allowing you to implement lock-based concurrency control across cluster members when necessary.
Unlike local JDK implementations, the classes in this package use the cluster member/process ID and thread ID to identify the lock owner, and store shared lock state within a Coherence NamedMap. However, that also implies that the calls to acquire and release locks are remote network calls, as they need to update shared state that is likely stored on a different cluster member, which will have an impact on the performance of lock and unlock operations.
The RemoteLock class provides an implementation of the Lock interface and allows you to ensure that only one thread on one member is running the critical section guarded by the lock at any given time.
To obtain an instance of a RemoteLock, call the Locks.remoteLock factory method:
Lock foo = Locks.remoteLock("foo");
Just like with Atomics, you can also obtain a local Lock instance from the Locks class by calling the localLock factory method, which simply returns an instance of a standard java.util.concurrent.locks.ReentrantLock:
Lock foo = Locks.localLock("foo");
Once you have a Lock instance, you can use it as you normally would:
foo.lock();
try {
// critical section guarded by the exclusive lock `foo`
}
finally {
foo.unlock();
}
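The lock/try/finally idiom above can be exercised locally with a plain ReentrantLock. The sketch below is a JDK-only illustration with hypothetical names: several threads increment an unsynchronized counter, and the lock makes the result deterministic.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Demonstrates the lock/try/finally idiom with a local JDK lock. */
final class GuardedCounterSketch {
    private GuardedCounterSketch() {
    }

    /** Increments a plain int from several threads, guarded by an exclusive lock. */
    static int countWithLock(int threads, int incrementsPerThread) {
        Lock lock = new ReentrantLock();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    lock.lock();
                    try {
                        counter[0]++; // critical section
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithLock(4, 1000));
    }
}
```

With a remote lock the shape of the code would stay the same, but each lock and unlock call would involve a network round trip, so the critical section should do enough work to justify that cost.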
The RemoteReadWriteLock class provides an implementation of the ReadWriteLock interface and allows you to ensure that only one thread on one member is running the critical section guarded by the write lock at any given time, while allowing multiple concurrent readers.
To obtain an instance of a RemoteReadWriteLock, call the Locks.remoteReadWriteLock factory method:
ReadWriteLock bar = Locks.remoteReadWriteLock("bar");
Just like with Atomics, you can also obtain a local ReadWriteLock instance from the Locks class by calling the localReadWriteLock factory method, which simply returns an instance of a standard java.util.concurrent.locks.ReentrantReadWriteLock:
ReadWriteLock bar = Locks.localReadWriteLock("bar");
Once you have a ReadWriteLock instance, you can use it as you normally would:
bar.writeLock().lock();
try {
// critical section guarded by the exclusive write lock `bar`
}
finally {
bar.writeLock().unlock();
}
Or:
bar.readLock().lock();
try {
// critical section guarded by the shared read lock `bar`
}
finally {
bar.readLock().unlock();
}
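A typical use of the read and write locks shown above is guarding a read-mostly data structure. The following sketch uses the JDK ReentrantReadWriteLock as a local stand-in; the ReadMostlyMapSketch class is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** A plain HashMap guarded by a read/write lock: many readers, one writer. */
final class ReadMostlyMapSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, String> data = new HashMap<>();

    /** Writers take the exclusive write lock. */
    void put(String key, String value) {
        lock.writeLock().lock();
        try {
            data.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Readers take the shared read lock and may proceed concurrently. */
    String get(String key) {
        lock.readLock().lock();
        try {
            return data.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        ReadMostlyMapSketch map = new ReadMostlyMapSketch();
        map.put("greeting", "hello");
        System.out.println(map.get("greeting"));
    }
}
```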
You can also use CDI to inject both the exclusive and read/write lock instances into objects that need them:
@Inject
@Remote
@Name("foo")
private Lock lock; // (1)
@Inject
@Remote
private ReadWriteLock bar; // (2)
(1) injects a distributed exclusive lock named foo into the lock field
(2) injects a distributed read/write lock named bar into the bar field
Once a lock instance is obtained via CDI injection, it can be used the same way as an instance obtained directly from the Locks factory class.
Coherence Concurrent also provides distributed implementations of the CountDownLatch and Semaphore classes from the java.util.concurrent package, allowing you to implement synchronization of execution across multiple Coherence cluster members as easily as you can implement it within a single process using those two JDK classes. It also provides interfaces for those two concurrency primitives, which both the remote and the local implementations conform to.
Just like with atomics, the local implementations are nothing more than thin wrappers around corresponding JDK classes.
The RemoteCountDownLatch class provides a distributed implementation of a CountDownLatch, and allows you to ensure that the execution of the code on any cluster member that is waiting for the latch proceeds only when the latch reaches zero. Any cluster member can both wait for a latch, and count down.
To obtain an instance of a RemoteCountDownLatch, call the Latches.remoteCountDownLatch factory method:
CountDownLatch foo = Latches.remoteCountDownLatch("foo", 5); // (1)
(1) creates an instance of a RemoteCountDownLatch with the initial count of 5
Just like with Atomics and Locks, you can also obtain a local CountDownLatch instance from the Latches class by calling the localCountDownLatch factory method:
CountDownLatch foo = Latches.localCountDownLatch("foo", 10); // (1)
(1) creates an instance of a LocalCountDownLatch with the initial count of 10
Once you have a CountDownLatch instance, you can use it as you normally would, by calling the countDown and await methods on it.
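The countDown and await interplay can be tried locally with the JDK latch. In this JDK-only sketch, with hypothetical class and method names, a coordinator waits until every worker has counted down; the timeout is a defensive choice for the example rather than a requirement.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** A coordinator waits for a fixed number of workers using a local JDK latch. */
final class LatchSketch {
    private LatchSketch() {
    }

    /** Starts the workers and returns how many had finished when await returned. */
    static int awaitWorkers(int workers) {
        CountDownLatch latch = new CountDownLatch(workers);
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                finished.incrementAndGet(); // the "work"
                latch.countDown();          // signal completion
            }).start();
        }
        try {
            // Proceeds only once the count reaches zero (or the timeout expires).
            if (!latch.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(awaitWorkers(5));
    }
}
```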
The RemoteSemaphore class provides a distributed implementation of a Semaphore, and allows any cluster member to acquire and release permits from the same semaphore instance.
To obtain an instance of a RemoteSemaphore, call the Semaphores.remoteSemaphore factory method:
Semaphore foo = Semaphores.remoteSemaphore("foo", 5); // (1)
(1) creates an instance of a RemoteSemaphore with 5 permits
Just like with Atomics and Locks, you can also obtain a local Semaphore instance from the Semaphores class by calling the localSemaphore factory method:
Semaphore foo = Semaphores.localSemaphore("foo"); // (1)
(1) creates an instance of a LocalSemaphore with 0 permits
Once you have a Semaphore instance, you can use it as you normally would, by calling the release and acquire methods on it.
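A common reason to pair acquire with release is to cap how many tasks run a section of code at once. The sketch below shows this with the JDK Semaphore as a local stand-in; the class and method names are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/** Uses permits to bound how many threads are inside a section at the same time. */
final class BoundedConcurrencySketch {
    private BoundedConcurrencySketch() {
    }

    /** Runs the given number of tasks and returns the highest concurrency observed. */
    static int maxObservedConcurrency(int permits, int tasks) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        Thread[] workers = new Thread[tasks];
        for (int i = 0; i < tasks; i++) {
            workers[i] = new Thread(() -> {
                semaphore.acquireUninterruptibly(); // wait for a permit
                int now = active.incrementAndGet();
                try {
                    max.accumulateAndGet(now, Math::max);
                } finally {
                    active.decrementAndGet();
                    semaphore.release(); // hand the permit back
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println(maxObservedConcurrency(2, 8) <= 2);
    }
}
```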
You can also use CDI to inject both the CountDownLatch and Semaphore instances into objects that need them:
@Inject
@Name("foo")
@Count(5)
private CountDownLatch localLatchFoo; // (1)
@Inject
@Name("foo")
@Remote
@Count(10)
private CountDownLatch remoteLatchFoo; // (2)
@Inject
@Name("bar")
private Semaphore localSemaphoreBar; // (3)
@Inject
@Name("bar")
@Remote
@Permits(1)
private Semaphore remoteSemaphoreBar; // (4)
(1) injects an instance of a LocalCountDownLatch with the initial count of five
(2) injects an instance of a RemoteCountDownLatch with the initial count of ten
(3) injects an instance of a LocalSemaphore with zero permits available
(4) injects an instance of a RemoteSemaphore with one permit available
Once a latch or a semaphore instance is obtained via CDI injection, it can be used the same way as an instance obtained directly from the Latches or Semaphores factory classes.
The @Name annotation is optional in both cases, as long as the member name (in the examples above, the field name) can be obtained from the injection point, but is required otherwise (such as when using constructor injection).
The @Count annotation specifies the initial latch count and, if omitted, defaults to one. The @Permits annotation specifies the number of available permits for a semaphore and, if omitted, defaults to zero, which means that the first acquire call will block until another thread releases one or more permits.
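The consequence of a zero-permit default can be seen with the JDK Semaphore. This JDK-only sketch, with a hypothetical class name, uses the non-blocking tryAcquire so that it terminates: the first attempt fails because no permits exist yet, and the attempt after a release succeeds.

```java
import java.util.concurrent.Semaphore;

/** Shows that a semaphore created with zero permits needs a release before an acquire. */
final class ZeroPermitSketch {
    private ZeroPermitSketch() {
    }

    /** Returns the outcome of tryAcquire before and after a single release. */
    static boolean[] acquireBeforeAndAfterRelease() {
        Semaphore semaphore = new Semaphore(0);
        boolean before = semaphore.tryAcquire(); // no permits available yet
        semaphore.release();                     // another party adds one permit
        boolean after = semaphore.tryAcquire();  // the permit can now be taken
        return new boolean[] {before, after};
    }

    public static void main(String[] args) {
        boolean[] outcome = acquireBeforeAndAfterRelease();
        System.out.println(outcome[0] + " " + outcome[1]); // prints "false true"
    }
}
```

A blocking acquire call in place of the first tryAcquire would wait until some other thread called release.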