Every Task in a BPMN Process is of a particular type: The type determines the Task's logic. Therefore, if you need a task that will perform actions specific to your business or interact with another system, consider implementing a custom task type.
When a token enters a task, a task instance is created and becomes Alive: At this point, the start() method of the task is executed. The method can return either with Result.FINISHED
or Result.WAITING_FOR_INPUT
:
processInput()
method is executed. If it returns Result.FINISHED
, the task becomes Accomplished; if it returns Result.WAITING_FOR_INPUT
, it remains Alive and waits for the next message.When a task instance is terminated abnormally, for example, by a timeout intermediate event attached to the task, the terminate() method is called.
To create a custom task type, you will need to do the following:
To declare the task type and make it accessible in PDS, do the following:
Under Task Type Details, enter the task-type name and in the Classname field enter the fully qualified name of the class. Mind that the task class must be in the ejb
package of your application.
If you have already implemented the task type, you can copy the qualified name of the task class in its Outline view: right-click the class name and select Copy Qualified Name.
In the Parameters list box, define the task parameters if applicable.
Check Dynamic to wrap the parameter value in a non-parametric closure: Such a parameter is processed as { -> <parameter_value> }
. The task-type implementation has to process the parameter as a closure.
Consider distributing the module as a library.
You can implement a task type that:
To implement a custom task type that starts and finishes in the same transaction and does not wait for any event, do the following:
start()
method and make it return Result.FINISHED
Important: The methods start() must be implemented in such a way that the thread is not blocked.
You can now use the task in processes: make sure the task declaration is available to the process, for example, by the means of module import.
Simple task implementation
public class RecordReturn implements ExecutableTask { @Override public Result processInput(TaskContext arg0, Object arg1) throws ErrorException { return null; } @Override public Result start(TaskContext context) throws ErrorException { //get the value of the param parameter of type String: String message = (String) context.getParameter("param"); //to get the message parameter which is a Record: //RecordHolder message = (RecordHolder) context.getParameter("message"); //save the value of the "text" property of the message Record instance: //String messageText = (String) message.getProperty("text"); //set the "myProcessVariable" of the process to the messageText: context.setVariableValue(QID.create("procVar"), message); //finish and release the token: return Result.FINISHED; } @Override public void terminate(TaskContext context, TerminationReason reason) throws ErrorException { } }
To implement a custom task type that waits for an event and potentially continues in a new transaction, do the following:
@PermitAll
annotation.Result.FINISHED
and Result.WAITING_FOR_INPUT
as appropriate.processInput()
method, which is executed when the task receives a message. The message is sent via the CommunicationService, which requires the task to be implemented as an EJB.Important: The methods start() and processInput() must be implemented in such a way that the thread is not blocked.
@Stateless @PermitAll public class WaitForInput implements ExecutableTask { @Override public Result start(TaskContext context) throws ErrorException { //implement the logic of the start method. return Result.WAITING_FOR_INPUT; } @Override public Result processInput(TaskContext context, Object input) throws ErrorException { return Result.FINISHED; } @Override public void terminate(TaskContext context, TerminationReason reason) throws ErrorException { } }
@EJB(beanName = "WaitForInput") private ExecutableTask waitForInput; @Override protected void registerCustomComponents() { register(waitForInput, WaitForInput.class); }
processInput()
method):start()
method, or as a shared record from the module. You will need the id to address the message. @Override
public Result start(TaskContext context) throws ErrorException {
//saves the task id in process variable taskId
//(taskId is modeled in the process):
context.setVariableValue(QID.create("taskId"), new Decimal(context.getTaskId()));
return Result.WAITING_FOR_INPUT;
}
processInput()
method. //example function declaration
//passing id of the target task as parameter:
public void wakeUpTask(Integer taskId)
native org.eko.taskapp.ejb.WakeUpFunctions.wakeUpWaitingTask;
processInput()
method of the task.Note: In the example, the
wakeUpTask()
call used within the same model instance (as the task context parameter, you get the context tree of the model instance, through the current process instance, to the task. If you want to call the task from another model instance, make the id of the model instance with the task accessible, for example, via database.
Example interface of the function
@Local
public interface WakeUpFunctions {
void wakeUpWaitTask(ExecutionContext context, Decimal taskId)
throws ModelInstanceNotFoundException, InvalidModelInstanceStateException, ErrorException;
}
Example bean that implements the interface of the function
@Stateless
@PermitAll
public class WakeUpFunctionsBean implements WakeUpFunctions {
@EJB
private CommunicationService communicationService;
@Override
public void wakeUpWaitTask(ExecutionContext context, Decimal taskId) throws ModelInstanceNotFoundException, InvalidModelInstanceStateException, ErrorException {
//message from the current model instance to the task passed in the param:
Message myMessage = new Message(Identifier.ofModelInstance(context.getModelInstance().getId()),
Identifier.ofTask(context.getModelInstance().getId(), taskId.longValue()));
communicationService.sendAsync(myMessage);
}
}
Example registration of the EJB
@EJB(beanName = "WaitForInputTask")
private ExecutableTask waitForInput;
@EJB
private WakeUpFunctions wakeUpFunctions;
@Override
protected void registerCustomComponents() {
register(waitForInput, WaitForInputTask.class);
register(wakeUpFunctions, WakeUpFunctions.class);
}
Example task implementation that sends a message to the waiting task
@Stateless
@PermitAll
public class WakeTask implements ExecutableTask {
@EJB
private CommunicationService communicationService;
@Override
public Result start(TaskContext context) throws ErrorException {
context.getVariableValue(QID.create("taskId"));
Message myMessage = new Message(
Identifier.ofModelInstance(
context.getModelInstance().getId()),
Identifier.ofTask(
context.getModelInstance().getId(), ((Decimal) context.getVariableValue(QID.create("taskId"))).longValue()));
communicationService.sendAsync(myMessage);
return Result.FINISHED;
}
...
Asynchronous tasks represent the border of a model transaction: they hold the token but do not block further process execution (other tokens can continue). The task still waits for the result of its implementation and only then finishes.
An asynchronous task type must implement ExecutableTask
and extend the AbstractAsynchronousExecutionTask
which is an EJB.
Note that implementations of asynchronous task types must run in a single transaction: if your task-type implementation requires multiple transactions, split it into multiple task types. For such tasks, if the server is restarted during the task execution, the execution is repeated.
Important: If you still decide to use multiple transactions in the implementation of your asynchronous tasks, make sure that the execution is always consistent: consider handling the scenario when the server is restarted while the task is running: since the asynchronous execution of the task is not governed by the task, on server restart, the execution seizes to exist while the task itself becomes running again and waits for the result from the execution. To solve this problem, you can
- define the timeout on the task in your models to handle the case when the task receives no results after server restart: add an interrupting timer event on the border of your task with a timeout duration. Mind that such handling can cause premature interruption of the task as a side effect.
- persist data about the execution phase in the database and check its status regularly from the running task (heartbeat check).
To implement an asynchronous task type, do the following:
AbstractAsynchronousExecutionTask
and implements ExecutableTask
.AbstractAsynchronousExecutionTask
is implemented as an EJB).executeAsynchronously()
: checks prior to task execution whether the task should be executed asynchronously; this is useful for tasks that can run both as asynchronous or synchronous;collectDataForExecution()
: provides the data from the task context required for the Java implementation, typically parameters of the task typegetImplementationClass()
: returns the class that implements the task type (typically this class)processDataAsynchronously()
: processing logic that returns the result of the actionsprocessExecutionResult()
: processes the result after the asynchronous action
@Stateless
public class GoogleSearchResultStats extends AbstractAsynchronousExecutionTask implements ExecutableTask {
@Override
public Serializable collectDataForExecution(TaskContext context) throws ErrorException {
String collectedData = (String) context.getParameter("queryString");
return collectedData;
}
@Override
public boolean executeAsynchronously(TaskContext context) throws ErrorException {
//the task is always asynchronous:
return true;
}
@Override
public Class<? extends AbstractAsynchronousExecutionTask> getImplementationClass() {
return GoogleSearchResultStats.class;
}
@Override
public Serializable processDataAsynchronously(Serializable data) {
String result;
try {
String keyword = (String) data;
result = readFromUrl(keyword);
} catch (IOException e) {
result = "not found";
}
return result;
}
@Override
public void processExecutionResult(TaskContext context, Serializable result) throws ErrorException {
System.out.println("Number of results: " + result);
}
/**
*
* @param keyword
* @return number of results
* @throws IOException
*/
public String readFromUrl(String keyword) throws IOException {
String url = "https://www.google.sk/search?q=" + keyword;
Document document = Jsoup.connect(url).userAgent("Mozilla").timeout(10000).get();
String question = document.select("#resultStats").text();
String resultCount = question.split(": ")[1];
return resultCount;
}
}
LSPSRuntimeException
.@EJB(beanName = "GoogleSearchResultStats") private ExecutableTask searchResultTask; @Override protected void registerCustomComponents() { // parameter 1 is the ejb, parameter 2 is the implementation class as referenced in the task type definition: register(searchResultTask, GoogleSearchResultStats.class);
To have a task type implementation that is an EJB, you need to register it with the server via the register()
method of the ComponentServiceBean
class.
ComponentServiceBean
class as the ExecutableTask: ComponentServiceBean
class in the registerCustomComponents()
method. Implementation:
@Stateless public class DecimalAddition implements ExecutableTask { @Override public Result processInput(TaskContext context, Object input) throws ErrorException { return null; } @Override public Result start(TaskContext context) throws ErrorException { Decimal a = (Decimal) context.getParameter("a"); Decimal b = (Decimal) context.getParameter("b"); System.out.println("################## result" + a.add(b)); return Result.FINISHED; } @Override public void terminate(TaskContext context, TerminationReason reason) throws ErrorException { } }
Registration:
//Modifications in the ComponentServiceBean class: (beanName = "DecimalAddition") private ExecutableTask decimalAddition;
protected void registerCustomComponents() { register(decimalAddition, DecimalAddition.class); }