OpenSpaces supports executing tasks asynchronously, collocated with the Space (within the processing unit that started an embedded Space). Tasks can be executed directly on a specific cluster member using the usual routing declarations, or in "broadcast" mode on all the primary cluster members concurrently, with the results reduced to a single result on the client side. Tasks are completely dynamic, both in content and in class definition (the task class does not have to be present on the Space classpath).
The Task interface is defined as follows:
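The interface is small; the following sketch shows its shape (reproduced from memory from the org.openspaces.core.executor package, so verify the exact signature against your OpenSpaces version):

```java
import java.io.Serializable;

// Sketch of the OpenSpaces Task interface: a serializable unit of work
// that is shipped to the Space and returns a serializable result.
public interface Task<T extends Serializable> extends Serializable {

    // Computes the task result; thrown exceptions propagate back to the client.
    T execute() throws Exception;
}
```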
Here is a simple implementation of a task that accepts a value that will be returned in the execute phase.
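For example (MyTask is an illustrative name, not part of the OpenSpaces API):

```java
import org.openspaces.core.executor.Task;

// A trivial task that carries a value and returns it from execute()
public class MyTask implements Task<Integer> {

    private int value;

    public MyTask(int value) {
        this.value = value;
    }

    public Integer execute() throws Exception {
        return value;
    }
}
```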
Executing the task itself uses the GigaSpace API with a routing value of 2 (the second parameter):
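Assuming the illustrative MyTask above, the call might look like this (gigaSpace is an injected GigaSpace instance):

```java
// The second parameter (2) is the routing value; the call returns immediately
AsyncFuture<Integer> future = gigaSpace.execute(new MyTask(2), 2);

// Standard Future semantics: block until the result is available
int result = future.get();
```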
Task execution is asynchronous in nature, returning an AsyncFuture as the result of the execution, which allows the result to be obtained at a later stage in the code. AsyncFuture extends java.util.concurrent.Future.
A listener can be passed either by setting it on the AsyncFuture, or as an additional parameter when executing a task using the GigaSpace API.
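A sketch of both styles, using a hypothetical MyTask (the AsyncFutureListener callback receives an AsyncResult once execution completes):

```java
AsyncFutureListener<Integer> listener = new AsyncFutureListener<Integer>() {
    public void onResult(AsyncResult<Integer> result) {
        // Invoked when the execution completes, successfully or not
        if (result.getException() == null) {
            System.out.println("result: " + result.getResult());
        }
    }
};

// Option 1: set the listener on the returned AsyncFuture
AsyncFuture<Integer> future = gigaSpace.execute(new MyTask(2), 2);
future.setListener(listener);

// Option 2: pass the listener as an additional execute parameter
gigaSpace.execute(new MyTask(2), 2, listener);
```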
AsyncResult can be used to extract the result or the exception of the execution. Here it is:
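A sketch of the interface (two accessors; exactly one of them returns a non-null value for a given execution):

```java
// Sketch of the AsyncResult interface
public interface AsyncResult<T> {

    // The result of the execution, or null if an exception was thrown
    T getResult();

    // The exception raised during execution, or null on success
    Exception getException();
}
```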
When executing a single Task, its routing can be controlled in several ways. The simplest form is passing the routing information as a parameter to the execute command, for example:
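For example (MyTask is illustrative; the value 2 is used purely as routing information):

```java
// Route the task to the partition that owns routing value 2
AsyncFuture<Integer> future = gigaSpace.execute(new MyTask(7), 2);
```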
The routing parameter value is used as the routing value. If the parameter is a POJO with a @SpaceRouting annotation on one of its properties, the value of that property is used as the routing information. For example:
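A sketch, with a hypothetical Order POJO whose accountId property carries the routing value:

```java
public class Order {

    private Integer accountId;

    // The value of this property is used as the routing value when an
    // Order instance is passed as the routing parameter
    @SpaceRouting
    public Integer getAccountId() {
        return accountId;
    }

    public void setAccountId(Integer accountId) {
        this.accountId = accountId;
    }
}

// The task is routed by order.getAccountId()
Order order = new Order();
order.setAccountId(2);
gigaSpace.execute(new MyTask(7), order);
```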
Routing information can also be defined on the Task itself, either using the @SpaceRouting annotation or using the optional TaskRoutingProvider interface (for non-annotation-based configuration). Here is how they can be used:
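Two sketches, one per style (class names are illustrative, and the TaskRoutingProvider method name should be checked against your version's javadoc):

```java
// Style 1: annotate a property of the Task with @SpaceRouting
public class MyRoutedTask implements Task<Integer> {

    private int value;

    public MyRoutedTask(int value) {
        this.value = value;
    }

    @SpaceRouting
    public Integer routing() {
        return value;
    }

    public Integer execute() throws Exception {
        return value;
    }
}

// Style 2: implement the TaskRoutingProvider interface
public class MyProvidedTask implements Task<Integer>, TaskRoutingProvider {

    public Integer getRouting() {
        return 2;
    }

    public Integer execute() throws Exception {
        return 1;
    }
}
```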
Once the routing information is defined on the Task, it can be executed (without the need for additional parameters):
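For example (assuming a hypothetical MyRoutedTask that carries its own routing information):

```java
// No routing parameter needed: routing is derived from the task itself
AsyncFuture<Integer> future = gigaSpace.execute(new MyRoutedTask(2));
```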
A DistributedTask is a task that ends up executing more than once (concurrently), returning a result that is a reduction of all the individual execution results.
Phase 1 - Sending the Tasks to be executed:
Phase 2 - Getting the results back to be reduced:
Here is the DistributedTask API:
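A sketch of the two interfaces involved (package org.openspaces.core.executor; verify against your version):

```java
import java.io.Serializable;
import java.util.List;

// Reduces the individual execution results into a single value
public interface AsyncResultsReducer<T, R> {

    R reduce(List<AsyncResult<T>> results) throws Exception;
}

// A task that is executed several times and whose results are reduced
public interface DistributedTask<T extends Serializable, R>
        extends Task<T>, AsyncResultsReducer<T, R> {
}
```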
The DistributedTask interface extends both Task and AsyncResultsReducer. The Task interface is used for each individual execution of the distributed task (there will be several such executions), and the AsyncResultsReducer is used to reduce the results of all the executions.
Let's write a (very) simple example of a DistributedTask:
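A sketch: each execution returns 1, and the reducer sums the results, so the reduced value equals the number of executions (MyDistTask is an illustrative name):

```java
public class MyDistTask implements DistributedTask<Integer, Long> {

    public Integer execute() throws Exception {
        return 1;
    }

    public Long reduce(List<AsyncResult<Integer>> results) throws Exception {
        long sum = 0;
        for (AsyncResult<Integer> result : results) {
            if (result.getException() != null) {
                // Propagate a failed execution to the caller
                throw result.getException();
            }
            sum += result.getResult();
        }
        return sum;
    }
}
```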
The above task simply returns 1 from its execute operation, and the reducer simply sums all the executions. If an exception was thrown during the execute operation (in our case, that can never happen), the exception is thrown back to the user during the reduce operation.
A distributed task is used when executing a task that is either directed to several nodes based on different routing values, or broadcast to all the primary nodes of the cluster. Executing a distributed task on several nodes based on different routing values can be done as follows:
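For example, with a hypothetical MyDistTask distributed task:

```java
// Execute on the partitions owning routing values 1, 4, 6 and 7;
// the single reduced result arrives on the future
AsyncFuture<Long> future = gigaSpace.execute(new MyDistTask(), 1, 4, 6, 7);
long result = future.get();
```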
In the above case, the distributed task is executed (concurrently and asynchronously) on 4 nodes that correspond to routing values of 1, 4, 6, and 7.
Broadcasting the execution to all current primary nodes can be done by simply executing just the distributed task. Here is an example:
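For example (again with a hypothetical MyDistTask):

```java
// No routing values: broadcast to all primary partitions, then reduce
AsyncFuture<Long> future = gigaSpace.execute(new MyDistTask());
long result = future.get();
```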
In this case, the distributed task will be executed on all the primary spaces of the cluster.
When executing a distributed task, results arrive asynchronously; once all the results have arrived, the AsyncResultsReducer is used to reduce them. The AsyncResultFilter can be used as a callback and filter mechanism, invoked for each result that arrives.
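A sketch of the filter callback (the exact set of Decision values should be verified against your version's javadoc):

```java
public interface AsyncResultFilter<T> {

    enum Decision {
        CONTINUE,       // accept this result and keep waiting for more
        SKIP,           // discard this result and keep waiting
        BREAK,          // accept this result and move to the reduce phase
        SKIP_AND_BREAK  // discard this result and move to the reduce phase
    }

    // Invoked for each arriving result; the event exposes the received result
    Decision onResult(AsyncResultFilterEvent<T> event);
}
```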
The filter can be used to control whether a result should be used or not (the SKIP decision), whether we have enough results and can move on to the reduce phase (the BREAK decision), or whether we should continue accumulating results (the CONTINUE decision).
The filter can also be used to identify that results have arrived, so the application can react to them. Note that in this case, any heavy processing should be performed on a separate (probably pooled) thread.
The ExecutorBuilder API allows several task executions (both distributed and non-distributed) to be combined into a seemingly single execution (with a single reduce phase). Think of the ExecutorBuilder as a cartridge that accumulates all the tasks to be executed, and then executes all of them at once, returning a reduced result (in a concurrent and asynchronous manner). Here is an example of the API:
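A sketch of builder usage, assuming hypothetical MyTask instances returning Integer and the built-in SumReducer (the exact add(...) overloads may vary by version):

```java
AsyncFuture<Integer> future = gigaSpace.executorBuilder(
        new SumReducer<Integer, Integer>(Integer.class))
    .add(new MyTask(1), 2)   // routed to the partition owning routing value 2
    .add(new MyTask(2), 5)   // routed to the partition owning routing value 5
    .add(new MyTask(3), 8)   // routed to the partition owning routing value 8
    .execute();              // executes everything concurrently

// The reduced (summed) result of all three executions
int sum = future.get();
```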
In the above case, there are several tasks that are "added" to the ExecutorBuilder, executed (in a similar manner to a single distributed task) and then reduced using a sum reducer that is provided when building the ExecutorBuilder.
The ExecutorBuilder can also be passed an optional AsyncResultFilter if the reducer also implements it.
The most common scenario for using executors is interacting with the collocated Space the task is executed on. A GigaSpace instance that works against the collocated Space can be easily injected, either using an annotation or using an interface. Here is an example:
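A sketch of both injection styles, the @TaskGigaSpace annotation and the TaskGigaSpaceAware interface (the count(null) call, counting all entries on the node, is an assumption for illustration):

```java
// Style 1: field injection via the @TaskGigaSpace annotation
public class SizeTask implements Task<Integer> {

    // transient so the proxy is not serialized along with the task
    @TaskGigaSpace
    private transient GigaSpace gigaSpace;

    public Integer execute() throws Exception {
        // Work against the collocated Space, e.g. count entries on this node
        return gigaSpace.count(null);
    }
}

// Style 2: implement the TaskGigaSpaceAware interface
public class SizeAwareTask implements Task<Integer>, TaskGigaSpaceAware {

    private transient GigaSpace gigaSpace;

    public void setGigaSpace(GigaSpace gigaSpace) {
        this.gigaSpace = gigaSpace;
    }

    public Integer execute() throws Exception {
        return gigaSpace.count(null);
    }
}
```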
A task might need to use resources defined within the processing unit it is executed in (other than the collocated Space), for example, a bean defined within the collocated processing unit. An executed Task goes through the same lifecycle as a bean defined within a processing unit (except that it is not registered with the processing unit). Thanks to this, resources can be injected using annotations (@Autowired and @Resource) or lifecycle interfaces (such as ApplicationContextAware).
In order to enable resource injection, the Task must either be annotated with @AutowireTask or implement the marker interface AutowireTaskMarker. Here is an example of injecting a resource of type OrderDao, registered under the bean name orderDao. The OrderDao is then used to count the number of orders on each node.
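A sketch (OrderDao and its countOrders() method are hypothetical application classes):

```java
@AutowireTask
public class OrderCountTask implements Task<Integer> {

    // Autowired from the collocated processing unit's application context;
    // transient so the DAO is not serialized with the task
    @Resource(name = "orderDao")
    private transient OrderDao orderDao;

    public Integer execute() throws Exception {
        // Count the orders held on this node
        return orderDao.countOrders();
    }
}
```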
When task autowiring is enabled, OpenSpaces annotation/interface-based injection can also be used, such as ClusterInfo injection.
OpenSpaces comes with several built-in reducers and distributed tasks that can be used to perform common reduce operations (such as min, max, average and sum). For example, suppose you use a simple Task:
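For instance (an illustrative task that returns 1 per execution):

```java
public class MyTask implements Task<Integer> {

    public Integer execute() throws Exception {
        return 1;
    }
}
```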
We can easily make a distributed task out of it that sums all the results using the SumTask:
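Usage might look like this (MyTask stands for any simple Task returning Integer; the SumTask constructor signature should be verified against your version):

```java
// Wrap the simple task in a SumTask: broadcast to all primaries and sum
AsyncFuture<Integer> future = gigaSpace.execute(
        new SumTask<Integer, Integer>(Integer.class, new MyTask()));

// With N primary partitions, each returning 1, the total is N
int total = future.get();
```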
In the above case, SumTask is a distributed task that wraps a simple Task. It automatically implements the reduce operation by summing all the results. This execution will result in executing a distributed task against all the primaries.
SumTask internally uses the SumReducer, which simply implements AsyncResultsReducer. The reducer by itself can be used with APIs that accept just a reducer, for example, the ExecutorBuilder construction.
Executors fully support transactions, similarly to other GigaSpace APIs. When an execute operation is invoked within a declarative transaction, it automatically joins it. The transaction itself is then passed to the node the task executes on and declaratively applied there. This means that any GigaSpace operation performed within the task's execute operation automatically joins the transaction started on the client side.
An exception thrown within the execute operation will not cause the transaction to roll back (since it might be a valid application exception). Transaction commit/rollback is controlled solely by the client that executed the task.
Distributed tasks, or tasks that execute on more than one node within the same execution, should use the distributed transaction manager. Tasks that execute on just a single node can use the distributed transaction manager, but should preferably use the local transaction manager.
OpenSpaces executor support makes it easy to obtain a java.util.concurrent.ExecutorService implementation, allowing Callable and Runnable instances to be executed as tasks within the Space. Here is an example of how to get an ExecutorService implementation based on OpenSpaces executors and use it:
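A sketch (TaskExecutors.newExecutorService is the assumed factory method; in real code the submitted Callable/Runnable must be serializable, since it is shipped to the Space):

```java
// Obtain an ExecutorService backed by Space task executors
ExecutorService executorService = TaskExecutors.newExecutorService(gigaSpace);

// Submit a Callable; it is executed as a task within the Space
// (use a serializable Callable implementation in real code)
Future<Integer> future = executorService.submit(new Callable<Integer>() {
    public Integer call() throws Exception {
        return 42;
    }
});

int result = future.get();
```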
The java.util.concurrent support also comes with built-in adapters from Callable/Runnable to Task/DistributedTask. The adapters are used internally to implement the ExecutorService, but can also be used on their own. They can be constructed easily using the utility methods found in the TaskExecutors factory class. Here is an example:
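A sketch (MyCallable is a hypothetical serializable Callable; the exact TaskExecutors method names may differ between versions, so check the javadoc):

```java
// Adapt a Callable into a single Task
Task<Integer> singleTask = TaskExecutors.task(new MyCallable());

// Adapt a Callable plus a reducer into a DistributedTask
DistributedTask<Integer, Integer> distTask = TaskExecutors.task(
        new MyCallable(), new SumReducer<Integer, Integer>(Integer.class));

// The adapted tasks are executed with the regular GigaSpace API
gigaSpace.execute(singleTask, 2);
gigaSpace.execute(distTask);
```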