Implementing Concurrency in a Distributed Environment
The Coherence Concurrent module provides distributed implementations of the concurrency primitives from the java.util.concurrent package such as executors, atomics, locks, semaphores, and latches.
You can implement concurrent applications using the constructs you are already familiar with and also expand the "scope" of concurrency from a single process to potentially hundreds of processes within a Coherence cluster. You can use executors to submit tasks to be executed somewhere in the cluster; you can use locks, latches, and semaphores to synchronize execution across many cluster members; you can use atomics to implement global counters across many processes, and so on.
While these features are extremely powerful and enable you to reuse the knowledge you already have, they may have a detrimental effect on scalability and/or performance. Whenever you synchronize execution through locks, latches, or semaphores, you are introducing a potential bottleneck into the architecture. Whenever you use a distributed atomic to implement a global counter, you are turning very simple operations that take mere nanoseconds locally, such as increment and decrement, into fairly expensive network calls that could take milliseconds (and potentially block even longer under heavy load).
So, use these features sparingly. In many cases, there is a better, faster, and more scalable way to accomplish the same goal using Coherence primitives such as entry processors, aggregators, and events. These primitives are designed to perform and scale well in a distributed environment.
Note:
- To use the concurrency features, Oracle recommends using the Bootstrap API to start the Coherence cluster members. See Using the Bootstrap API.
- Coherence Concurrent features do not support, and cannot be configured to use, federation. Coherence Federation is asynchronous. Therefore, it would not make sense to federate data that is inherently atomic in nature.
This chapter includes the following sections:
- Using Factory Classes
Each feature (executors, atomics, locks, semaphores, and latches) is backed by one or more Coherence caches, possibly with preconfigured interceptors. All interaction with lower-level Coherence primitives is hidden behind various factory classes that allow you to get the instances of the classes you need.
- Using Local and Remote Instances
In many cases, the factory classes allow you to get both the local and the remote instances of various constructs. For example, Locks.localLock will give you an instance of a standard java.util.concurrent.locks.ReentrantLock, while Locks.remoteLock will return an instance of a RemoteLock.
- Using Serialization
Coherence Concurrent supports both Java serialization and POF serialization out of the box, with Java serialization being the default.
- Using Persistence
Coherence Concurrent supports both active and on-demand persistence, but, just like the rest of Coherence, it is set to on-demand by default.
- Using the Coherence Concurrent Features
You can use the Coherence Concurrent features after declaring them as a dependency in the pom.xml file.
- Using Executors
Coherence Concurrent provides a facility to dispatch tasks, either a Runnable, Callable, or Task, to a Coherence cluster for execution.
- Using Atomics
Coherence Concurrent provides distributed implementations of atomic types, such as AtomicInteger, AtomicLong, and AtomicReference. It also provides local implementations of the same types.
- Using Locks
Coherence Concurrent provides distributed implementations of the Lock and ReadWriteLock interfaces from the java.util.concurrent.locks package, enabling you to implement lock-based concurrency control across cluster members when necessary.
- Using Latches and Semaphores
- Using Blocking Queues
Using Factory Classes
Each feature (executors, atomics, locks, semaphores, and latches) is backed by one or more Coherence caches, possibly with preconfigured interceptors. All interaction with lower-level Coherence primitives is hidden behind various factory classes that allow you to get the instances of the classes you need.
For example, you will use factory methods within the Atomics class to get instances of various atomic types, the Locks class to get lock instances, and the Latches and Semaphores classes to get latches and semaphores.
Using Local and Remote Instances
In many cases, the factory classes allow you to get both the local and the remote instances of various constructs. For example, Locks.localLock will give you an instance of a standard java.util.concurrent.locks.ReentrantLock, while Locks.remoteLock will return an instance of a RemoteLock.
In cases where the JDK does not provide a standard interface, which is the case with atomics, latches, and semaphores, the interface from the existing JDK class has been extracted to create a thin wrapper around the corresponding JDK implementation. For example, Coherence Concurrent provides a Semaphore interface and a LocalSemaphore class that wraps java.util.concurrent.Semaphore. The same is true for CountDownLatch and all atomic types.
The main advantage of using factory classes to construct both the local and the remote instances is that it allows you to name local locks the same way you name the remote locks: calling Locks.localLock("foo") always returns the same Lock instance because the Locks class internally caches both the local and the remote instances it creates. In the case of remote locks, every locally cached remote lock instance is ultimately backed by a shared lock instance somewhere in the cluster, which is used to synchronize lock state across the processes.
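For example, the following minimal sketch illustrates the caching behavior described above (the name "foo" is arbitrary, and the assertion is only there to show that the factory returns the cached instance):
Lock localFoo  = Locks.localLock("foo");   // a standard ReentrantLock, cached by name
Lock sameFoo   = Locks.localLock("foo");   // returns the same cached instance
assert localFoo == sameFoo;

Lock remoteFoo = Locks.remoteLock("foo");  // a distinct RemoteLock, backed by shared state in the cluster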
Using Serialization
Coherence Concurrent supports both Java serialization and POF serialization out of the box, with Java serialization being the default.
If you want to use POF instead, you have to set the coherence.concurrent.serializer system property to pof. You should also include the coherence-concurrent-pof-config.xml file in your own POF configuration file to register the built-in Coherence Concurrent types.
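For example, the property can be passed as a -D JVM argument, or set programmatically before the cluster member starts. The following is a minimal sketch using the Bootstrap API (setting the property in code is just one option and assumes nothing has read the property yet):
System.setProperty("coherence.concurrent.serializer", "pof");

// start the cluster member using the Bootstrap API
Coherence coherence = Coherence.clusterMember();
coherence.start().join();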
Using Persistence
Coherence Concurrent supports both active and on-demand persistence, but, just like the rest of Coherence, it is set to on-demand by default.
To use active persistence, you should set the coherence.concurrent.persistence.environment system property to default-active, or use another persistence environment that has active persistence enabled.
Note:
The caches that store lock and semaphore data are configured as transient, and are not persisted when you use active or on-demand persistence.
Using the Coherence Concurrent Features
You can use the Coherence Concurrent features after declaring the features as a dependency in the pom.xml file.
To declare the dependency, make the following entry in the pom.xml file:
<dependency>
<groupId>${coherence.groupId}</groupId>
<artifactId>coherence-concurrent</artifactId>
<version>${coherence.version}</version>
</dependency>
Using Executors
Coherence Concurrent provides a facility to dispatch tasks, either a Runnable, Callable, or Task to a Coherence cluster for execution. Executors that will run the submitted tasks are configured on each cluster member by defining one or more named executors within a cache configuration resource.
This section includes the following topics:
- Using Executors - Examples
- Advanced Orchestration
- Advanced Orchestration - Examples
- Configuring Executors
- Managing Executors
- Managing Executors Over REST
- Using CDI
Using Executors - Examples
By default, each Coherence cluster with the coherence-concurrent module on the classpath includes a single-threaded executor that may be used to run the dispatched tasks.
Given this, the simplest example would be:
RemoteExecutor remoteExecutor = RemoteExecutor.getDefault();
Future<Void> result = remoteExecutor.submit(() -> System.out.println("Executed"));
result.get(); // block until completion
If an executor has been configured with the name Fixed5, then a reference to the executor may be obtained with:
RemoteExecutor remoteExecutor = RemoteExecutor.get("Fixed5");
If no executor has been configured with the given name, the RemoteExecutor will throw the following exception:
RejectedExecutionException
Each RemoteExecutor instance may hold local resources that should be released when the RemoteExecutor is no longer required. Like an ExecutorService, a RemoteExecutor has similar methods to shut down the executor. Calling these methods has no impact on the executors registered within the cluster.
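For example, the following minimal sketch obtains a named executor, submits a Callable, and then releases the local resources. It assumes an executor named Fixed5 has been configured on the cluster members, and that the shutdown method behaves like its ExecutorService counterpart:
RemoteExecutor fixed5 = RemoteExecutor.get("Fixed5");
try {
    // the Callable is executed by the "Fixed5" executor somewhere in the cluster
    Future<String> future = fixed5.submit(() -> "executed somewhere in the cluster");
    System.out.println(future.get());   // block until the result is available
}
finally {
    fixed5.shutdown();   // releases local resources only; cluster-side executors are unaffected
}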
Advanced Orchestration
While the RemoteExecutor does provide functionality similar to the standard ExecutorService included in the JDK, this may not be enough in the context of Coherence. A task might need to run across multiple Coherence members, produce intermediate results, and remain durable in case a cluster member running the task fails.
In such cases, task orchestration can be used. Before diving into the details of orchestration, the following concepts should be understood.
Table 41-1 Task Orchestration Interfaces
| Interface | Description |
|---|---|
| Task | Tasks are like Callable and Runnable classes in that they are designed to be potentially run by one or more threads. Unlike Callable and Runnable classes, their execution may occur in different Java Virtual Machines, fail and/or recover between different Java Virtual Machine processes. |
| Task.Context | Provides contextual information for a Task as it is executed, including the ability to access and update intermediate results for the Executor executing the said Task. |
| Task.Orchestration | Defines information concerning the orchestration of a Task across a set of executors, defined across multiple Coherence members for a given RemoteExecutor. |
| Task.Coordinator | A publisher of collected Task results that additionally permits coordination of the submitted Task. |
| Task.Subscriber | A receiver of items produced by a Task.Coordinator. |
| Task.Properties | State sharing mechanism for tasks. |
| Task.Collector | A mutable reduction operation that accumulates results into a mutable result container, optionally transforming the accumulated result into a final representation after all results have been processed. |
- Tasks
- Task Context
- Task Orchestration
- Task Collector and Collectable
- Task Coordinator
- Task Subscriber
Tasks
Task implementations define a single method called execute(Context) that performs the task, possibly yielding execution to some later point. After the method has completed execution, by returning a result or throwing an exception (but not a Yield exception), the task is considered completed for the assigned Executor.
A Task may yield execution for a given time by throwing a Yield exception. This exception type signals that the execution of a Task by an Executor is to be suspended and resumed at some later point in time, typically by the same Executor.
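For example, the following is a minimal sketch of a Task implementation (GreetingTask is a hypothetical class; the execute(Context) signature and the isResuming() check follow the descriptions in this section):
import com.oracle.coherence.concurrent.executor.Task;

public class GreetingTask implements Task<String> {
    @Override
    public String execute(Task.Context<String> context) throws Exception {
        if (context.isResuming()) {
            // the task was recovered or previously yielded; intermediate state
            // could be read from the task properties here
        }
        return "Hello";   // returning a result completes the task for this executor
    }
}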
Task Context
When a Task is executed, a Context instance will be passed as an execution argument.
The Context provides access to task properties allowing shared state between tasks running in multiple Java Virtual Machines.
The Context provides details on overall execution status.
Table 41-2 Execution Status
| Execution State | Method | Description |
|---|---|---|
| Complete | Context.isDone() | Allows a Task to determine if the task is complete. Completion may be due to normal termination, an exception, or cancellation. In all of these cases, this method will return true. |
| Cancelled | Context.isCancelled() | Allows a Task to determine if the task is effectively cancelled. |
| Resuming | Context.isResuming() | Determines if a Task execution by an Executor is resuming after being recovered (for example, failover) or due to resumption after a task had previously thrown a Yield exception. |
Task Orchestration
Orchestrations begin by calling RemoteExecutor.orchestrate(Task), which will return a Task.Orchestration instance for the given Task. With the Task.Orchestration, it's possible to configure the aspects of where the task will be run.
Table 41-3 Task Orchestration Methods
| Method | Description |
|---|---|
| concurrently() | Tasks will be run concurrently across all Java Virtual Machines where the named executor is defined/configured. This is the default. |
| sequentially() | Tasks will be run in sequence across all Java Virtual Machines where the named executor is defined/configured. |
| limit(int) | Limit the task to n executors. Use this to limit the number of executors that will be considered for task execution. If not set, the default behavior is to run the task on all Java Virtual Machines where the named executor is defined/configured. |
| filter(Predicate) | Filtering provides an additional way to constrain where a task may be run. The predicates will be applied against metadata associated with each executor on each Java Virtual Machine. Some examples of metadata would be the member in which the executor is running, or the role of a member. Predicates may be chained to provide Boolean logic in determining an appropriate executor. |
| define(String, ) | Define initial state that will be available to all tasks no matter which Java Virtual Machine that task is running on. |
| retain(Duration) | When specified, the task will be retained allowing new subscribers to be notified of the final result of a task computation after it has completed. |
| collect(Collector) | This is the terminal of the orchestration builder returning a Task.Collectable, which defines how results are to be collected and ultimately submits the task to the grid. |
Task Collector and Collectable
The Task.Collector passed to the orchestration will collect results from tasks and optionally transform the collected results into a final format. Collectors are best illustrated by using examples of the collectors that are available in the TaskCollectors class.
Table 41-4 Task Collector Methods
| Method | Description |
|---|---|
| count() | The count of non-null results that have been collected from the executing tasks. |
| firstOf() | Collects and returns the first result provided by the executing tasks. |
| lastOf() | Collects and returns the last result returned by the executing tasks. |
| setOf() | Collects and returns all non-null results as a Set. |
| listOf() | Collects and returns all non-null results as a List. |
The Task.Collectable instance returned by calling collect on the orchestration allows, among other things, setting the condition under which no more results will be collected or published by any registered subscribers. Calling submit() on the Task.Collectable will begin the orchestration of the task.
Task Coordinator
Upon calling submit() on the orchestration Collectable, a Task.Coordinator is returned. Like the Task.Collectable, the Task.Coordinator allows for the registration of subscribers. Additionally, it provides the ability to cancel or check the completion status of the orchestration.
Task Subscriber
The Task.Subscriber receives various events pertaining to the execution status of the orchestration.
Table 41-5 Task Subscriber Events
| Method | Description |
|---|---|
| onComplete() | Signals the completion of the orchestration. |
| onError(Throwable) | Called when an unrecoverable error (given as the argument) has occurred. |
| onNext() | Called when the Task.Coordinator has produced a result. |
| onSubscribe(Task.Subscription) | Called before any calls to onComplete(), onError(Throwable), or onNext(). The Task.Subscription provided gives access to cancelling the subscription or obtaining a reference to the Task.Coordinator. |
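For example, the following is a minimal sketch of a Task.Subscriber that simply collects the published results (CollectingSubscriber is a hypothetical class; the generic parameter and the item argument to onNext are assumptions based on Table 41-5):
import com.oracle.coherence.concurrent.executor.Task;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CollectingSubscriber<T> implements Task.Subscriber<T> {
    private final List<T> results = new CopyOnWriteArrayList<>();

    @Override
    public void onSubscribe(Task.Subscription subscription) {
        // the subscription could be retained here to cancel it later
    }

    @Override
    public void onNext(T item) {
        results.add(item);           // a result produced by the Task.Coordinator
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();         // an unrecoverable error ends the subscription
    }

    @Override
    public void onComplete() {
        System.out.println("Orchestration complete: " + results);
    }

    public List<T> results() {
        return results;
    }
}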
Advanced Orchestration - Examples
To begin, consider the following code common to orchestration examples:
// demonstrate orchestration using the default RemoteExecutor
RemoteExecutor executor = RemoteExecutor.getDefault();

// WaitingSubscriber is an implementation of the
// com.oracle.coherence.concurrent.executor.Task.Subscriber interface
// that has a get() method that blocks until Subscriber.onComplete() is
// called and will return the results received by onNext()
WaitingSubscriber subscriber = new WaitingSubscriber();

// ValueTask is an implementation of the
// com.oracle.coherence.concurrent.executor.Task interface
// that returns the value provided at construction time
ValueTask task = new ValueTask("Hello World");
Given the previous example, the simplest example of an orchestration is:
// orchestrate the task, subscribe, and submit
executor.orchestrate(task)
        .subscribe(subscriber)
        .submit();

// wait for the task to complete
// if this was run on four cluster members running the default executor service,
// the returned Collection will have four results
Collection results = subscriber.get();
Building on the previous example, assume a cluster with two storage and two proxy members. The cluster members are configured with the roles of storage and proxy, respectively. If the task needs to run on storage members only, then the orchestration could look like:
// orchestrate the task, filtering by a role, subscribe, and submit
executor.orchestrate(task)
        .filter(Predicates.role("storage"))
        .subscribe(subscriber)
        .submit();

// wait for the task to complete
// as there are only two storage members in this hypothetical, only two
// results will be returned
Collection results = subscriber.get();
There are several predicates available for use in com.oracle.coherence.concurrent.executor.function.Predicates; however, if none apply to the target use case, simply implement the Remote.Predicate interface.
You can customize the collection of results and how they are presented to the subscriber by using collect(Collector) and until(Predicate):
// orchestrate the task, collecting the first non-null result,
// subscribe, and submit
executor.orchestrate(new MayReturnNullTask())
        .collect(TaskCollectors.firstOf())
        .until(Predicates.nonNullValue())
        .subscribe(subscriber)
        .submit();

// wait for the task to complete
// the first non-null result returned will be the one provided to the
// subscriber
Collection results = subscriber.get();
Several collectors are provided in com.oracle.coherence.concurrent.executor.TaskCollectors; however, if none apply to the target use case, implement the Task.Collector interface.
Configuring Executors
Several executor types are available for configuration.
Table 41-6 Types of Executors
| ExecutorService Type | Description |
|---|---|
| Single thread | Creates an ExecutorService with a single thread. |
| Fixed thread | Creates an ExecutorService with a fixed number of threads. |
| Cached | Creates an ExecutorService that creates new threads as needed and reuses existing threads when possible. |
| Work stealing | Creates a work-stealing thread pool by using the number of available processors as its target parallelism level. |
| Custom | Allows the creation of non-standard executors. |
| VirtualThread | Creates a VirtualThread-per-task ExecutorService. Requires JDK 21 or later. |
Table 41-7 Configuration Elements
| Element Name | Required | Expected Type | Description |
|---|---|---|---|
| single | no | N/A | Defines a single-thread executor. |
| fixed | no | N/A | Defines a fixed-thread pool executor. |
| cached | no | N/A | Defines a cached-thread-pool executor. |
| work-stealing | no | N/A | Defines a work-stealing pool executor. |
| custom-executor | no | java.util.concurrent.ExecutorService | Defines a custom executor. |
| virtual-per-task | no | N/A | Defines a VirtualThread-per-task executor. |
| name | yes | java.lang.String | Defines the logical name of the executor. |
| thread-count | yes | java.lang.Integer | Defines the thread count for a fixed-thread pool executor. |
| parallelism | no | java.lang.Integer | Defines the parallelism of a work-stealing-thread pool executor. If not defined, it defaults to the number of processors available on the system. |
| thread-factory | no | N/A | Defines a java.util.concurrent.ThreadFactory. Used by single, fixed, and cached executors. |
| instance | yes | java.util.concurrent.ThreadFactory | Defines how the ThreadFactory will be instantiated. For information about the instance element, see instance. This element must be a child of the thread-factory element. |
For complete details, see schema.
Configuration Examples
To define executors, the cache-config root element should include the coherence-concurrent NamespaceHandler to recognize the configuration elements.
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
xmlns:c="class://com.oracle.coherence.concurrent.config.NamespaceHandler"
              xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd
                                  class://com.oracle.coherence.concurrent.config.NamespaceHandler concurrent.xsd">
  ...
</cache-config>
Note:
Executors defined by the configuration must precede any other elements in the document. Failing to do so will prevent the document from being validated.
The following examples assume that the XML namespace prefix defined for the NamespaceHandler is c:
<!-- creates a single-threaded executor named Single -->
<c:single>
<c:name>Single</c:name>
</c:single>
<!-- creates a single-threaded executor named SingleTF with a thread factory -->
<c:single>
<c:name>SingleTF</c:name>
<c:thread-factory>
<c:instance>
<c:class-name>my.custom.ThreadFactory</c:class-name>
</c:instance>
</c:thread-factory>
</c:single>

<!-- creates a fixed-thread executor named Fixed5 -->
<c:fixed>
<c:name>Fixed5</c:name>
<c:thread-count>5</c:thread-count>
</c:fixed>
Managing Executors
Managing Executors Over REST
Coherence Management over REST exposes endpoints to query and invoke actions against ExecutorMBean instances.
Table 41-8 REST Endpoints
| Description | Method | Path | Produces |
|---|---|---|---|
| View all Executors | GET | /management/coherence/cluster/executors | JSON |
| View all Executors with matching name | GET | /management/coherence/cluster/executors/{name} | JSON |
| Reset Executor statistics by name | POST | /management/coherence/cluster/executors/{name}/resetStatistics | JSON |
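For example, the following hedged sketch uses the JDK java.net.http.HttpClient (JDK 11 or later) to list all executors; it assumes Management over REST is enabled and listening on the default management port 30000 on localhost:
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder(
        URI.create("http://localhost:30000/management/coherence/cluster/executors"))
        .GET()
        .build();

// the response body is a JSON document describing the ExecutorMBean instances
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
System.out.println(response.body());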
Using CDI
You can inject RemoteExecutors through CDI.
For example:
@Inject
private RemoteExecutor single; // injects a RemoteExecutor named 'single'.
@Inject
@Name("Fixed5")
private RemoteExecutor fixedPoolRemoteExecutor; // injects a RemoteExecutor named 'Fixed5'.
Using Atomics
Coherence Concurrent provides distributed implementations of atomic types, such as AtomicInteger, AtomicLong, and AtomicReference. It also provides local implementations of the same types. The local implementations are just thin wrappers around the existing java.util.concurrent.atomic types, which implement the same interfaces as their distributed variants so they are interchangeable.
To create instances of atomic types, you should call the appropriate factory method on the Atomics class:
// Creates a local, in-process instance of a named 'AtomicInteger', with an implicit initial value of '0'.
AtomicInteger localFoo = Atomics.localAtomicInteger("foo");
// Creates a remote, distributed instance of a named 'AtomicInteger', distinct from the local instance 'foo',
// with an implicit initial value of '0'.
AtomicInteger remoteFoo = Atomics.remoteAtomicInteger("foo");
// Creates a remote, distributed instance of named 'AtomicLong', with an initial value of '5'.
AtomicLong remoteBar = Atomics.remoteAtomicLong("bar", 5L);
Note:
The AtomicInteger and AtomicLong types used in the code above are not types from the java.util.concurrent.atomic package. They are actually interfaces defined within the com.oracle.coherence.concurrent.atomic package that both the LocalAtomicXyz and RemoteAtomicXyz classes implement, and those are the instances that are actually returned by the above methods.
Therefore, you can rewrite the above code as:
LocalAtomicInteger localFoo = Atomics.localAtomicInteger("foo");
RemoteAtomicInteger remoteFoo = Atomics.remoteAtomicInteger("foo");
RemoteAtomicLong remoteBar = Atomics.remoteAtomicLong("bar", 5L);
However, Oracle strongly recommends that you use interfaces instead of concrete types because interfaces make it easy to switch between local and distributed implementations when necessary.
After the instances are created, you can use them the same way you would use any of the corresponding java.util.concurrent.atomic types:
int counter1 = remoteFoo.incrementAndGet();
long counter5 = remoteBar.addAndGet(5L);
This section includes the following topics:
- Asynchronous Implementations of Atomic Types
- Using CDI
Asynchronous Implementations of Atomic Types
The instances of numeric atomic types, such as AtomicInteger andAtomicLong, are frequently used to represent various counters in the application where a client may need to increment the value, but does not necessarily need to know what the new value is.
When working with the local atomics, you can use the same API shown earlier (see Using Atomics) and simply ignore the return value. However, doing so with distributed atomics would introduce unnecessary blocking on the client while waiting for a response from the server that would then simply be discarded. Obviously, this has a negative impact on both the performance and throughput of the atomics.
To reduce the impact of remote calls in those situations, Coherence Concurrent also provides non-blocking, asynchronous implementations of all atomic types it supports.
To obtain a non-blocking instance of any supported atomic type, simply call the async method on the blocking instance of that type:
// Creates a remote, distributed instance of named, non-blocking 'AsyncAtomicInteger', with an implicit initial value of 0.
AsyncAtomicInteger asyncFoo = Atomics.remoteAtomicInteger("foo").async();
// Creates a remote, distributed instance of named, non-blocking 'AsyncAtomicLong', with an initial value of 5.
AsyncAtomicLong asyncBar = Atomics.remoteAtomicLong("bar", 5L).async();
After you create these instances, you can use them the same way you would use any of the corresponding blocking types. The only difference is that the non-blocking instances will simply return a CompletableFuture for the result, and will not block:
CompletableFuture<Integer> futureCounter1 = asyncFoo.incrementAndGet();
CompletableFuture<Long> futureCounter5 = asyncBar.addAndGet(5L);
Both the blocking and non-blocking instances of any distributed atomic type with the same name are backed by the same cluster-side atomic instance, so they can be used interchangeably.
Using CDI
Atomic types from Coherence Concurrent can also be injected using CDI, which eliminates the need for explicit factory method calls on the Atomics class.
// Injects a local, in-process instance of an 'AtomicInteger' named 'foo', with an implicit initial value of '0'.
@Inject
@Name("foo")
private AtomicInteger localFoo;
// Injects a remote, distributed instance of an 'AtomicInteger' named 'foo', distinct from
// the local instance 'foo', with an implicit initial value of '0'.
@Inject
@Remote
@Name("foo")
private AtomicInteger remoteFoo;
// Injects a remote, distributed instance of non-blocking 'AsyncAtomicLong', with an implicit name of 'asyncBar'.
@Inject
@Remote
private AsyncAtomicLong asyncBar;
After you obtain an instance of an atomic type through a CDI injection, you can use it the same way you would use an instance obtained directly from the Atomics factory class.
Using Locks
Coherence Concurrent provides distributed implementations of the Lock and ReadWriteLock interfaces from the java.util.concurrent.locks package, enabling you to implement lock-based concurrency control across cluster members when necessary.
Unlike the local JDK implementations, the classes in this package use the cluster member/process ID and thread ID to identify the lock owner, and store shared lock state within a Coherence NamedMap. However, this also implies that the calls to acquire and release locks are remote network calls, because they need to update shared state that is likely stored on a different cluster member. This may impact the performance of lock and unlock operations.
This section includes the following topics:
- Using Exclusive Locks
- Using Read/Write Locks
- Using CDI
Using Exclusive Locks
The RemoteLock class provides an implementation of the Lock interface and enables you to ensure that only one thread on one member is running a critical section guarded by the lock at any given time.
To obtain an instance of a RemoteLock, call the Locks.remoteLock factory method:
Lock foo = Locks.remoteLock("foo");
As seen with Atomics, you can obtain a local Lock instance from the Locks class, which will simply return an instance of a standard java.util.concurrent.locks.ReentrantLock, by calling the localLock factory method:
Lock foo = Locks.localLock("foo");
After you create a Lock instance, you can use it as you normally would:
foo.lock();
try {
// critical section guarded by the exclusive lock `foo`
}
finally {
foo.unlock();
}
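Because acquiring a remote lock is a network call, it can also be useful to bound how long the caller waits. The following minimal sketch uses the standard tryLock(long, TimeUnit) method from the Lock interface (the five-second timeout is arbitrary, and the InterruptedException it can throw is omitted for brevity):
Lock foo = Locks.remoteLock("foo");
if (foo.tryLock(5, TimeUnit.SECONDS)) {
    try {
        // critical section guarded by the exclusive lock `foo`
    }
    finally {
        foo.unlock();
    }
}
else {
    // the lock could not be acquired within 5 seconds; handle the contention
}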
Using Read/Write Locks
The RemoteReadWriteLock class provides an implementation of the ReadWriteLock interface and enables you to ensure that only one thread on one member is running a critical section guarded by the write lock at any given time, while allowing multiple concurrent readers.
To obtain an instance of a RemoteReadWriteLock, call the Locks.remoteReadWriteLock factory method:
ReadWriteLock bar = Locks.remoteReadWriteLock("bar");
As seen with Atomics, you can obtain a local ReadWriteLock instance from the Locks class, which will simply return an instance of a standard java.util.concurrent.locks.ReentrantReadWriteLock, by calling the localReadWriteLock factory method:
ReadWriteLock bar = Locks.localReadWriteLock("bar");
After you create a ReadWriteLock instance, you can use it as you normally would:
bar.writeLock().lock();
try {
// critical section guarded by the exclusive write lock `bar`
}
finally {
bar.writeLock().unlock();
}
Or:
bar.readLock().lock();
try {
// critical section guarded by the shared read lock `bar`
}
finally {
bar.readLock().unlock();
}
Using CDI
You can also use CDI to inject both the exclusive and read/write lock instances into objects that need them:
// Injects distributed exclusive lock named 'foo' into the 'lock' field.
@Inject
@Remote
@Name("foo")
private Lock lock;
// Injects distributed read/write lock named 'bar' into the 'bar' field.
@Inject
@Remote
@Name("bar")
private ReadWriteLock bar;
After you obtain a lock instance through a CDI injection, you can use it the same way you would use an instance obtained directly from the Locks factory class.
Using Latches and Semaphores
Coherence Concurrent also provides distributed implementations of the CountDownLatch and Semaphore classes from the java.util.concurrent package, enabling you to implement synchronization of execution across multiple Coherence cluster members as easily as you can implement it within a single process using the two JDK classes. It also provides interfaces for those two concurrency primitives, to which both the remote and local implementations conform.
As seen with Atomics, the local implementations are nothing more than thin wrappers around the corresponding JDK classes.
This section includes the following topics:
- Using the Count Down Latch
- Using a Semaphore
- Using CDI
Using the Count Down Latch
The RemoteCountDownLatch class provides a distributed implementation of a CountDownLatch, and enables you to ensure that the execution of the code on any cluster member that is waiting for the latch proceeds only when the latch reaches zero. Any cluster member can both wait for a latch and count down.
To obtain an instance of RemoteCountDownLatch, call the Latches.remoteCountDownLatch factory method:
// Creates an instance of a 'RemoteCountDownLatch' with the initial count of '5'.
CountDownLatch foo = Latches.remoteCountDownLatch("foo", 5);
As seen with Atomics and Locks, you can obtain a local CountDownLatch instance from the Latches class by calling the localCountDownLatch factory method:
// Creates an instance of a 'LocalCountDownLatch' with the initial count of '10'.
CountDownLatch foo = Latches.localCountDownLatch("foo", 10);
After you have a RemoteCountDownLatch instance, you can use it as you normally would, by calling the countDown and await methods on it.
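For example, the following minimal sketch shows one member waiting for three pieces of work to finish (the latch name and count are arbitrary; countDown can be called from any cluster member, and await can throw InterruptedException, which is omitted for brevity):
CountDownLatch jobs = Latches.remoteCountDownLatch("jobs", 3);

// on the waiting member:
jobs.await();                 // blocks until the count reaches zero cluster-wide
System.out.println("All jobs finished");

// on each of the three members performing the work:
jobs.countDown();             // decrements the shared count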
Using a Semaphore
The RemoteSemaphore class provides a distributed implementation of a Semaphore, and enables any cluster member to acquire and release permits from the same semaphore instance.
To obtain an instance of RemoteSemaphore, call the Semaphores.remoteSemaphore factory method:
// Creates an instance of a 'RemoteSemaphore' with '5' permits.
Semaphore foo = Semaphores.remoteSemaphore("foo", 5);
As seen with Atomics and Locks, you can obtain a local Semaphore instance from the Semaphores class by calling the localSemaphore factory method:
// Creates an instance of a 'LocalSemaphore' with '0' permits.
Semaphore foo = Semaphores.localSemaphore("foo");
After you create a Semaphore instance, you can use it as you normally would, by calling the release and acquire methods on it.
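For example, the following minimal sketch limits how many threads across the whole cluster run a section of code at the same time (the semaphore name and permit count are arbitrary; acquire can throw InterruptedException, which is omitted for brevity):
Semaphore connections = Semaphores.remoteSemaphore("connections", 5);

connections.acquire();        // blocks until a permit is available anywhere in the cluster
try {
    // at most five threads across all members execute this section concurrently
}
finally {
    connections.release();    // returns the permit so other members can proceed
}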
Using CDI
You can also use CDI to inject both the CountDownLatch and Semaphore instances into objects that need them:
// Injects an instance of 'LocalCountDownLatch' with the initial count of '5'.
@Inject
@Name("foo")
@Count(5)
private CountDownLatch localLatchFoo;
// Injects an instance of 'RemoteCountDownLatch' with the initial count of '10'.
@Inject
@Name("foo")
@Remote
@Count(10)
private CountDownLatch remoteLatchFoo;
// Inject an instance of 'LocalSemaphore' with '0' (zero) permits available.
@Inject
@Name("bar")
private Semaphore localSemaphoreBar;
// Inject an instance of 'RemoteSemaphore' with '1' permit available.
@Inject
@Name("bar")
@Remote
@Permits(1)
private Semaphore remoteSemaphoreBar;
After you obtain a latch or a semaphore instance through a CDI injection, you can use it the same way as you would use an instance obtained directly from the Latches or Semaphores factory classes.
The @Name annotation is optional in both cases as long as the member name (in the examples above, the field name) can be obtained from the injection point, but is required otherwise (such as when you use a constructor injection).
The @Count annotation specifies the initial latch count, and if omitted, will default to one. The @Permits annotation specifies the number of available permits for a semaphore, and if omitted, will default to zero, which means that the first acquire call will block until another thread releases one or more permits.
Using Blocking Queues
Coherence supports queues as a data structure starting with Coherence CE 24.03. The Coherence NamedQueue is an implementation of java.util.Queue, and NamedDeque is an implementation of java.util.Deque.
Coherence has two implementations of BlockingQueue: one is a simple size limited queue, and the second is a distributed paged queue that has a much larger capacity. The simple queue is available as both a BlockingQueue and a double-ended BlockingDeque. The distributed paged queue is available only as a BlockingQueue implementation.
Note:
Coherence queues are mapped to caches, which take the same name as the queue. If a cache is being used for a queue, then the same cache must not be used as a normal data cache.
Blocking Queue
The Coherence Concurrent module contains an implementation of java.util.concurrent.BlockingQueue called NamedBlockingQueue and an implementation of java.util.concurrent.BlockingDeque called NamedBlockingDeque.
To use a Coherence blocking queue in your application, you must add a dependency on the coherence-concurrent module as follows:
<dependency>
<groupId>com.oracle.coherence</groupId>
<artifactId>coherence-concurrent</artifactId>
<version>14.1.2-0-0</version>
</dependency>
To obtain an instance of a blocking queue, use the com.oracle.coherence.concurrent.Queues factory class.
To obtain a simple size limited BlockingQueue named "my-queue", see the following example:
NamedBlockingQueue<String> queue = Queues.queue("my-queue");
To obtain a simple size limited BlockingDeque named "my-deque", see the following example:
NamedBlockingDeque<String> deque = Queues.deque("my-deque");
To obtain a distributed paged BlockingQueue named "my-queue", see the following example:
NamedBlockingQueue<String> queue = Queues.pagedQueue("my-queue");
The blocking queue implementations work by using Coherence events. When application code calls a blocking method, the calling thread is blocked but the blocking is not on the server. The application code is unblocked when it receives an event from the server.
For example, if the application code calls the NamedBlockingQueue take() method and the queue is empty, this method blocks the calling thread. When an element is put into the queue by another thread (maybe on another JVM), the calling application receives an event. The take() is then retried and, if successful, it returns. If the retry of the take() is unsuccessful (for example, because another thread or another JVM that was also blocked taking from the same queue succeeded with its retry first), the calling thread remains blocked.
Another example is an application calling the NamedBlockingQueue put() method, which blocks when the queue is full (the 2 GB size limit). In this case, the calling thread is blocked until a delete event is received to signal that there is now space in the queue. The put() is then retried and, if successful, control is returned to the calling thread. If the retry is unsuccessful (for example, because another thread or JVM that was also blocked on a put() succeeded with its retry and refilled the queue), the thread remains blocked.
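For example, the following minimal producer/consumer sketch uses a simple blocking queue (the queue name "tasks" is arbitrary; put and take follow the standard java.util.concurrent.BlockingQueue semantics described above and can throw InterruptedException, which is omitted for brevity):
NamedBlockingQueue<String> tasks = Queues.queue("tasks");

// producer, possibly running on one cluster member:
tasks.put("process-order-42");    // blocks if the queue has reached its size limit

// consumer, possibly running on another cluster member:
String next = tasks.take();       // blocks until an element is available
System.out.println("Processing " + next);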
Sizing Queues
It is important to understand how the two Coherence queue implementations store data and how this limits the size of a queue.
- Simple Coherence Queue – The simple queue (and deque) implementation stores data in a single Coherence cache partition. This enforces a size limit of 2 GB because a Coherence cache partition should not exceed 2 GB in size, and in reality, a partition should be a lot smaller than this. Large partitions slow down recovery when a storage-enabled member leaves the cluster. With a modern fast network, 300 MB – 500 MB should be a suitable maximum partition size; on a 10 Gb network, this could even go as high as 1 GB.
- Distributed Paged Queue - The distributed paged queue stores data in pages that are distributed around the Coherence cluster over multiple partitions, the same as normal cache data. This means that the paged queue can store far more than 2 GB. It is still important to be aware of how partition sizes limit the total queue size.
The absolute hard limit of 2 GB per partition gives the following size:
2 GB x 257 = 514 GB
But this is far too big to be reliable in production use. If you use a size limit of 500 MB and the default partition count of 257, then you can see how this affects queue size.
500 MB x 257 = 128 GB
So, by default, a realistic limit for a paged queue is around 128 GB. If the partition count is increased to 1087, then the queue size becomes:
500 MB x 1087 = 543 GB
Of course, all these examples assume that there are enough JVMs with big enough heap sizes in the cluster to store the queue data in memory.
Limitations
The current queue implementation in Coherence has the following limitations:
- As previously mentioned, the simple queue has a hard size limit of 2 GB. When using a simple queue or deque, the Coherence server refuses to accept offers to the queue if its size exceeds 2 GB. The java.util.Queue contract allows for queues to reject offers, so this size limitation conforms to the queue contract. Application developers should check the response from offering data to the queue to determine whether the offer has succeeded or not. We use the term "offer" here to cover all queue and deque methods that add data to the queue. An alternative to checking the return Boolean from an offer() call would be to use a NamedBlockingQueue, where the put() method blocks if the queue is full.
- In normal operation, queues should not get huge, as this would usually mean that the processes reading from the queue are not keeping up with the processes writing to the queue. Application developers should obviously load test their applications using queues to ensure that they are not going to have issues with capacity.
- Queue operations such as offering and polling will contend on certain data structures, and this limits the number of parallel requests and the speed at which requests get processed. To maintain ordering, polling contends on either the head or tail entry, depending on which end of the queue is being polled. This means that poll methods can only be processed sequentially, so even though a poll is efficient and fast, many concurrent poll requests will queue and be processed one at a time. Offer methods do not contend on the head or tail, but will contend on the atomic counters used to maintain the head and tail identifiers. Coherence can process multiple offer requests on different worker threads, but there is minor contention on the AtomicLong updates.
- Queue operations that work on the head and tail, such as offering and polling, are efficient. Some of the other methods in java.util.Queue and java.util.Deque are less efficient, for example, iterator methods, contains(), and so on. These are not frequently used by applications that require basic queue functionality. Some optional methods on the java.util.Queue API that mutate the queue will throw UnsupportedOperationException (this is allowed by the Java Queue contract), for example, retainAll(), removeAll(), and removal using an iterator.