The Problem
Recently I was tasked with building a workflow execution engine. After reviewing the product requirements, I formulated a short list of design requirements I wished to achieve:
- Each "step" in the workflow could communicate with an external system and could take an indeterminate amount of time.
- Each "step" could interact with persistent objects. This customer was already using Hibernate and Spring for this purpose.
- A failed "step" should be able to be replayed. For example, an update conflict in the database should result in the step being retried after fetching a current copy of the persistent objects.
- The API for writing steps should be as free of any framework or API requirements as possible. The workflow mechanism should be transparently managed.
- It should be possible to plug in different execution strategies for workflow execution.
- Workflow state transitions should look like normal Java method calls. Specifically, an interface should not reveal that its implementation is a state machine.
The Solution
Java lacks built-in features for asynchronous method invocation. With this in mind, I set out to find a way to modify Java's normal calling convention to introduce asynchronous behavior.
Here is an example interface of a state machine. By design, this is just a plain Java interface. Nothing is revealed about the implementation, whether it is an asynchronous state machine or a normal synchronous method:
public interface ExampleStateMachine {
public void execute();
}
However, the implementation has new annotations to define the state machine behavior:
@StateMachine
@Configurable("exampleStateMachine")
public class ExampleStateMachineImpl implements ExampleStateMachine {
static private Log log = LogFactory.getLog(StateMachine.class.getName());
private int numExecutions = 0;
static int NumberExecutions = 3;
@StateTransition(from = { "" }, to = "started")
public void execute() {
log.info("state machine started");
// Invoke a state transition on self. AspectJ allows such
// method invocations to be intercepted; Spring AOP does not
running();
}
@StateTransition(from = { "started", "running" }, to = "running")
public void running() {
log.info("state machine running");
numExecutions++;
if (numExecutions >= NumberExecutions) {
finished();
} else {
running();
}
}
@StateTransition(from = { "running" }, to = "finished")
public void finished() {
log.info("state machine finished");
}
}
I've introduced two new annotations:
- @StateMachine - this is used to indicate a class should be implemented as a state machine.
- @StateTransition - this is used to indicate a method should be used as a state transition. The "to" argument is the name of the new state. The "from" argument is the list of states from which a transition to this new state is valid.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface StateMachine {
}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface StateTransition {
String to();
String[] from();
}
@StateMachine is valid only on a class definition, and @StateTransition is valid only on methods. Note that both have runtime retention, which allows me to read these annotations at runtime.
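Runtime retention is what makes the annotation values visible to reflection and, later, to the aspect. As a rough illustration only, the following sketch reads transition metadata with plain reflection. The names Transition, DemoMachine and TransitionReader are made up for this example; Transition is a stand-in for @StateTransition so that the snippet compiles on its own.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for @StateTransition so the sketch is self-contained.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Transition {
    String to();
    String[] from();
}

// Hypothetical state machine with two annotated transitions.
class DemoMachine {
    @Transition(from = { "" }, to = "started")
    public void execute() {}

    @Transition(from = { "started", "running" }, to = "running")
    public void running() {}

    public void helper() {} // not a transition, so it is skipped below
}

class TransitionReader {
    // Maps each target state to the list of states it may be entered from.
    static Map<String, List<String>> describe(Class<?> type) {
        Map<String, List<String>> result = new TreeMap<>();
        for (Method m : type.getDeclaredMethods()) {
            Transition t = m.getAnnotation(Transition.class);
            if (t != null) { // visible here only because retention is RUNTIME
                result.put(t.to(), Arrays.asList(t.from()));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(describe(DemoMachine.class));
    }
}
```

With RetentionPolicy.CLASS or SOURCE, getAnnotation() would return null and the map would stay empty.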
The next challenge is implementing something that will process these new annotations. For this, I use AspectJ. AspectJ allows me to use annotations in pointcut expressions. We will also see that AspectJ allows me to give the state machine a more intuitive behavior compared to the proxy-based AOP provided by Spring.
Here is the implementation for the State Machine aspects:
@Aspect(value="pertarget(stateMachine())")
@Configurable("stateMachineAdvice")
public class StateMachineAspect {
static private Log log = LogFactory.getLog(StateMachine.class.getName());
private StateTransitionStrategy stateTransitionStrategy;
private String currentState = "";
public void setStateTransitionStrategy(
StateTransitionStrategy stateTransitionStrategy) {
this.stateTransitionStrategy = stateTransitionStrategy;
}
/**
* Pointcut for state machines. This reads "where any class annotated
* with @StateMachine is the target of the invocation". This is used
* to tie the lifecycle of this advice to individual state machines.
* See the @Aspect annotation on this class.
*/
@Pointcut(value="@target(com.example.statemachine.StateMachine)")
public void stateMachine() {}
/**
* Pointcut for state transition. This reads: "for any class annotated
* with @StateMachine, for any method annotated with @StateTransition, for
* execution of that method".
*
* @param stateTransition @StateTransition annotation on method, used to
* provide values at runtime to validate state transition.
*
* @see StateMachineAspect#queueStateTransition
*/
@Pointcut(value="@target(com.example.statemachine.StateMachine) && @annotation(stateTransition) && execution(* *(..))", argNames="stateTransition")
public void stateTransition(StateTransition stateTransition) {}
/**
* Around advice for state transitions. First the state transition
* is validated based upon the valid transitions indicated in the
* @StateTransition annotation arguments. Then, the state transition
* strategy is invoked to run the transition. The strategy is pluggable
* and will manage queuing and transactions.
*
* @param joinPoint The method being advised
* @param stateTransition The @StateTransition annotation
* @return Not used, all state transitions should return void
* @throws Throwable
*/
@Around(value="stateTransition(stateTransition)")
public Object queueStateTransition(ProceedingJoinPoint joinPoint,
StateTransition stateTransition) throws Throwable {
for (String s : stateTransition.from()) {
if (s.equals(currentState)) {
currentState=stateTransition.to();
stateTransitionStrategy.transition(joinPoint, stateTransition);
return null;
}
}
// Report the advised state machine class, not the join point's own class
IllegalStateException e =
new IllegalStateException(joinPoint.getTarget().getClass().getName() + ": illegal transition from state \"" +
currentState + "\" to state \"" + stateTransition.to() +"\"");
log.fatal("",e);
throw e;
}
}
There is a lot going on here; let's walk through the code:
- First, I am using the annotation form of input for AspectJ. There are also ways to use XML or AspectJ's own domain specific language. My customer was already making heavy use of annotations for Hibernate and Spring, so I chose the annotation approach.
- @Aspect declares this as an aspect to AspectJ
- @Configurable is a Spring annotation that will cause instances of this class ("bean") to be configured from XML. We'll look at that more in a bit.
- Next, we define the pointcuts. The empty methods stateMachine() and stateTransition() are simply marker methods that serve to give the pointcuts a name. I think AspectJ is pushing the limit of annotations here, but there you go. The stateMachine pointcut will advise any class with a @StateMachine annotation. The stateTransition pointcut will advise any method with a @StateTransition annotation in a class annotated with @StateMachine, and only during the execution of that method.
- The @Around annotation declares the advice, or code that will be run for the advised @StateTransition annotated methods.
The @Around advice does two things. First it validates the state transition by looking at the allowed state transitions ("from", on the @StateTransition annotation) and comparing those to the current state, which this class tracks. If the transition is valid, this advice delegates execution to a pluggable strategy class. If the transition is not valid, an IllegalStateException is thrown.
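Stripped of the AspectJ plumbing, the validation step is small. The sketch below is a framework-free model of that check, not the aspect itself; TransitionGuard and its methods are hypothetical names chosen for this illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that mirrors the check the advice performs.
class TransitionGuard {
    private String currentState = ""; // same initial state as the aspect

    // Moves to 'to' if the current state appears in 'from', otherwise throws
    // and leaves the current state unchanged.
    void transition(List<String> from, String to) {
        if (!from.contains(currentState)) {
            throw new IllegalStateException("illegal transition from state \""
                    + currentState + "\" to state \"" + to + "\"");
        }
        currentState = to;
    }

    String currentState() {
        return currentState;
    }

    public static void main(String[] args) {
        TransitionGuard guard = new TransitionGuard();
        guard.transition(Arrays.asList(""), "started");
        guard.transition(Arrays.asList("started", "running"), "running");
        try {
            // "started" may only be entered from "", so this is rejected
            guard.transition(Arrays.asList(""), "started");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the real aspect the "from" list and the "to" name come from the @StateTransition annotation bound in the pointcut, so the state machine class never sees this bookkeeping.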
Note how the actual state machine implementation class ExampleStateMachineImpl does not know its current state. The aspect transparently tracks and manages the state. To do this, we need to ensure exactly one instance of this class, StateMachineAspect, is created and associated with each instance of a class annotated with @StateMachine. This is the reason for the arguments on @Aspect:
@Aspect(value="pertarget(stateMachine())")
The pertarget(stateMachine()) clause tells AspectJ to create a new aspect instance for each unique target object matched by the stateMachine pointcut. Because this pointcut matches every instance of a class annotated with @StateMachine, we end up with exactly one aspect instance per state machine instance.
The @Configurable annotation allows us to use Spring to inject the transition strategy through the setStateTransitionStrategy() setter.
One last thing before discussing the transition strategy. Remember from our example that state transitions are normal Java method invocations. It is even possible to invoke a state transition on yourself:
@StateTransition(from = { "" }, to = "started")
public void execute() {
log.info("state machine started");
running(); // Call state transition on self
}
@StateTransition(from = { "started", "running" }, to = "running")
public void running() {...
One of the big reasons to use AspectJ is that it will intercept these calls. Spring uses a proxy mechanism for its AOP, meaning invocations on self are never advised. Using Spring AOP would require that implementations always call through a proxy, something that isn't convenient or intuitive. Note that when using AspectJ you must not mix in Spring AOP. Fortunately Spring has excellent AspectJ support and provides AspectJ equivalents for all its AOP features. Just add "spring-aspects.jar" to your AspectJ build path.
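The proxy limitation is easy to reproduce without Spring. The following sketch uses a plain JDK dynamic proxy as a stand-in for proxy-based AOP; Steps, StepsImpl and ProxyDemo are hypothetical names, and this is only meant to show the mechanism, not how Spring builds its proxies internally.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface and implementation for the demonstration.
interface Steps {
    void execute();
    void running();
}

class StepsImpl implements Steps {
    public void execute() {
        running(); // self-invocation: dispatched on 'this', bypassing the proxy
    }

    public void running() {}
}

class ProxyDemo {
    // Wraps 'target' in a JDK dynamic proxy that records each intercepted call.
    static Steps intercept(Steps target, List<String> seen) {
        InvocationHandler handler = (proxy, method, args) -> {
            seen.add(method.getName());
            return method.invoke(target, args);
        };
        return (Steps) Proxy.newProxyInstance(
                Steps.class.getClassLoader(), new Class<?>[] { Steps.class }, handler);
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        intercept(new StepsImpl(), seen).execute();
        // Only the outer call went through the proxy; running() was not seen.
        System.out.println(seen);
    }
}
```

Because AspectJ weaves the advice into the class itself rather than wrapping it, the inner running() call is intercepted as well, which is exactly what the state machine needs.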
Finally we arrive at the core of the state machine infrastructure. The transition strategy is pluggable: the aspect depends only on the StateTransitionStrategy interface. Here is one implementation:
public class StateTransitionStrategyImpl extends Thread implements
StateTransitionStrategy, TransactionSynchronization {
static private Log log = LogFactory.getLog(StateMachine.class.getName());
private SessionFactory sessionFactory;
private long threadId;
private class ExecutionContext {
boolean replay = false;
ProceedingJoinPoint joinPoint;
StateTransition stateTransition;
}
// Queue of pending committed state transitions - ready to execute
private BlockingQueue<ExecutionContext> committedQueue = new LinkedBlockingQueue<ExecutionContext>();
// Queue of uncommitted state transitions - awaiting transaction commit.
private Queue<ExecutionContext> openTransactionQueue = new ConcurrentLinkedQueue<ExecutionContext>();
// Test synchronization for whitebox test
private SynchronousQueue<ExecutionContext> testQueue;
/**
* Start a new service thread as a daemon thread. The JVM will not
* wait for daemon threads upon exit.
*/
public StateTransitionStrategyImpl(SessionFactory sessionFactory) {
this.sessionFactory = sessionFactory;
setDaemon(true);
start();
}
/**
* This is a test method for unit tests. It enables the test queue
* that waitForState() uses to synchronize with the service thread.
*/
public void setTestMode() {
this.testQueue = new SynchronousQueue<ExecutionContext>();
}
/**
* This is a test method for unit tests
*/
public void waitForState(Class<?> testExpectedClass, String testExpectedState) {
if (!this.isAlive()) {
throw new IllegalStateException("Service thread is not running");
}
log.debug("Unittest: waiting for state " +
testExpectedClass.getSimpleName() +
":" + testExpectedState);
while (true) {
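ExecutionContext ctx;
try {
ctx = testQueue.take();
} catch (InterruptedException e) {
// Restore the interrupt flag and stop waiting; continuing
// with a null context would cause a NullPointerException
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted while waiting for state", e);
}
log.debug("Unittest: observed state \"" + getStateName(ctx) + "\"");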
if (testExpectedClass.isInstance(ctx.joinPoint.getTarget())
&& testExpectedState.equals(ctx.stateTransition.to())) {
log.debug("Unittest: found expected state \"" + testExpectedState
+ "\", unit test proceeds from here");
break;
}
}
}
/**
* Queue this state transition. If there is an open transaction,
* the transition is placed on a pending queue and copied to the committed
* queue only when the transaction completes. If there is no transaction,
* the transition is copied directly to the committed queue. This
* behavior is necessary because state machines may receive external
* transition events from non-transactional sources.
*/
@Override
public void transition(ProceedingJoinPoint joinPoint,
StateTransition stateTransition) {
ExecutionContext executionContext = new ExecutionContext();
executionContext.joinPoint = joinPoint;
executionContext.stateTransition = stateTransition;
// We know that any call from our daemon thread has a transaction
// so we can short-circuit this test
if (Thread.currentThread().getId() == this.threadId) {
openTransactionQueue.add(executionContext);
if (log.isDebugEnabled()) {
log.debug("Queued local transactional state transition: " +
getStateName(executionContext));
}
// If the caller has a transaction we want to enlist in it.
} else {
if (TransactionSynchronizationManager.isActualTransactionActive()) {
if (!TransactionSynchronizationManager.getSynchronizations().contains(this)) {
TransactionSynchronizationManager.registerSynchronization(this);
}
openTransactionQueue.add(executionContext);
if (log.isDebugEnabled()) {
log.debug("Queued external transactional state transition: " +
getStateName(executionContext));
}
} else {
committedQueue.add(executionContext);
if (log.isDebugEnabled()) {
log.debug("Queued external non-transactional state transition: " +
getStateName(executionContext));
}
}
}
}
/**
* Run a state.
*
* Normally Spring would require us to call through a proxy to get
* a transactional advice. However AspectJ correctly advises private
* methods. But the method must be public!
*
* @param executionContext
* @throws Throwable
*/
@Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Throwable.class)
public void runState(ExecutionContext executionContext) throws Throwable {
// Install a transaction listener on this thread.
// Do this here, after spring has bound the session to this thread.
TransactionSynchronizationManager.registerSynchronization(this);
try {
if (log.isDebugEnabled()) {
log.debug("Start execution of state: " +
getStateName(executionContext));
}
executionContext.joinPoint.proceed();
} finally {
if (log.isDebugEnabled()) {
log.debug("Finished execution of state: " +
getStateName(executionContext));
}
}
}
/**
* State transition methods are transactional. Thus, if there is a
* recoverable DB problem, we can simply replay the transition and try
* again. Currently, we try to replay only once. Other strategies are
* possible.
*
* @param executionContext
* Transition
* @param e
* Exception that was caught
*/
private void replay(ExecutionContext executionContext, Exception e) {
if (!executionContext.replay) {
log.warn("State " + getStateName(executionContext) +
" threw an exception. This was the first attempt; the state will be replayed",e);
executionContext.replay = true;
committedQueue.add(executionContext);
} else {
log.error("State " + getStateName(executionContext) +
" threw an exception. This was the second attempt; the state will not be replayed",e);
}
}
/**
* Start and run service queue.
*/
@Override
public void run() {
try {
log.debug("Service thread started");
this.threadId = Thread.currentThread().getId();
while (!isInterrupted()) {
ExecutionContext executionContext = null;
try {
log.debug("Service thread is waiting for a state transition");
executionContext = committedQueue.take();
} catch (InterruptedException e) {
log.warn("State transition queue thread interrupted, service thread terminated");
return; // exit thread
}
try {
runState(executionContext);
} catch (java.lang.AssertionError e) {
log.error("State transition queue thread assertion, service thread terminated", e);
return;
} catch (HibernateException e) {
replay(executionContext, e);
} catch (Throwable e) {
log.error("State " + getStateName(executionContext) +
" threw an exception. This exception is not recoverable; the state will not be replayed", e);
} finally {
// Whitebox test method. This puts state machine in
// lockstep with unit tests.
if (testQueue != null) {
log.debug("Unittest: state machine is synchronizing with unit test: " +
getStateName(executionContext));
testQueue.put(executionContext);
}
}
} // while thread not interrupted
} catch (Throwable e) {
log.error("Service thread terminated", e);
return;
} finally {
}
}
/**
* Implementation of TransactionSynchronization.
* This allows us to intercept the commit and move the pending transitions
* to the committed queue.
*/
@Override
public void afterCommit() {
for (ExecutionContext ctx : openTransactionQueue) {
committedQueue.add(ctx);
}
openTransactionQueue.clear();
}
@Override
public void afterCompletion(int arg0) {
}
@Override
public void beforeCommit(boolean arg0) {
}
@Override
public void beforeCompletion() {
}
private String getStateName(ExecutionContext executionContext) {
return executionContext.joinPoint.getTarget().getClass().getSimpleName() +
":" + executionContext.stateTransition.to();
}
}
Let's walk through the salient points of this class:
- This class extends Thread. This strategy uses a daemon thread to process and run state transitions.
- A BlockingQueue is used to hold pending state transitions.
- Another Queue is used to hold state transitions as they are executed in a transaction. If the transaction commits, these are copied to the main "committed" queue.
- There is a bit of whitebox test instrumentation here to facilitate unit tests that would otherwise be difficult to write. setTestMode() and waitForState() are test only methods.
- transition(), runState(), replay() and run() are the meat of this class, which I will discuss in detail below.
- afterCommit() and friends are implementations of the Spring transaction callbacks. I use this to copy pending state transitions to the committed queue upon transaction commit.
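The two-queue arrangement described in the list above can be modeled without Spring. This is a simplified sketch under my own assumptions: StagedQueue and its method names are hypothetical, commit() plays the role of afterCommit(), and rollback() is an addition of the sketch (the class above leaves afterCompletion() empty).

```java
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, framework-free model of the open and committed queues.
class StagedQueue<T> {
    private final Queue<T> open = new ConcurrentLinkedQueue<>();
    private final BlockingQueue<T> committed = new LinkedBlockingQueue<>();

    // Inside a transaction, work is only staged.
    void stage(T item) {
        open.add(item);
    }

    // Outside a transaction, work is runnable immediately.
    void submit(T item) {
        committed.add(item);
    }

    // On commit, staged work becomes runnable in its original order.
    void commit() {
        T item;
        while ((item = open.poll()) != null) {
            committed.add(item);
        }
    }

    // Assumption of this sketch: on rollback, staged work is discarded.
    void rollback() {
        open.clear();
    }

    // Number of items ready for the service thread to take.
    int runnable() {
        return committed.size();
    }
}
```

The point of the staging queue is that a transition requested inside a transaction never becomes visible to the service thread before the data it depends on has been committed.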
The run() method is the body of the daemon thread; it simply calls take() on the committed queue over and over. The rest of this method is primarily exception handling. Note that for exceptions we deemed "retry-able" (here, HibernateException) we call replay(), which puts the transition back on the committed queue for one more attempt.
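To make the replay-once policy concrete, here is a single-threaded sketch of the same idea. It is an illustration under stated assumptions rather than the strategy class itself: ReplayDemo, Work and drain() are hypothetical names, and IllegalStateException stands in for a recoverable HibernateException.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class ReplayDemo {
    // Hypothetical unit of work; 'replayed' plays the role of ExecutionContext.replay.
    static class Work {
        final String name;
        final Runnable body;
        boolean replayed = false;

        Work(String name, Runnable body) {
            this.name = name;
            this.body = body;
        }
    }

    // Drains the queue, re-queuing a failed item exactly once.
    static List<String> drain(Queue<Work> queue) {
        List<String> events = new ArrayList<>();
        Work work;
        while ((work = queue.poll()) != null) {
            try {
                work.body.run();
                events.add(work.name + ":ok");
            } catch (IllegalStateException e) { // stand-in for HibernateException
                if (!work.replayed) {
                    work.replayed = true;
                    queue.add(work); // second and final attempt
                    events.add(work.name + ":replay");
                } else {
                    events.add(work.name + ":gave-up");
                }
            }
        }
        return events;
    }

    public static void main(String[] args) {
        int[] calls = { 0 };
        Queue<Work> queue = new ArrayDeque<>();
        // Fails on the first call only, like a transient update conflict.
        queue.add(new Work("flaky", () -> {
            if (calls[0]++ == 0) throw new IllegalStateException("conflict");
        }));
        // Fails every time, so it is abandoned after one replay.
        queue.add(new Work("broken", () -> {
            throw new IllegalStateException("always");
        }));
        System.out.println(drain(queue));
    }
}
```

Note that a replayed item goes to the back of the queue, so other pending transitions may run before the retry. The same is true of the strategy class, since replay() calls committedQueue.add().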
The runState() method actually calls the state implementation, using AspectJ's ProceedingJoinPoint API. Note, however, the @Transactional annotation on this method. There is a lot of behavior packed in this simple Spring annotation, and it is the key to having a transactional state machine:
- Somewhat obviously, this annotation indicates the method will run in a new transaction. The Propagation.REQUIRES_NEW ensures a new transaction and the rollbackFor = Throwable.class ensures we will roll back on any thrown exception.
- Less obvious is the fact that this annotation provides us with a new Hibernate session. It is important to have a new session per state versus one session for the entire state machine. Remember that states take a long time to execute. Hibernate caches a lot of things in the session, and sessions consume database resources, so holding a session open a long time isn't a good idea. This annotation frees us from having to manage the session ourselves.
Testing state machines is possible using JUnit. The whitebox test methods help test state transitions:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext-stateMachine.xml" })
public class ExampleStateMachineTest {
static protected Log log = LogFactory.getLog(ExampleStateMachineTest.class.getName());
@Autowired
protected StateTransitionStrategy stateTransitionStrategy;
@BeforeClass
public static void initLogging() {
BasicConfigurator.configure();
String curDir = System.getProperty("user.dir");
System.out.println("Current working directory: " + curDir);
String classPath = System.getProperty("java.class.path");
System.out.println("Class path: " + classPath);
Logger.getLogger("com.example").setLevel(Level.DEBUG);
}
@Before
public void setUp() throws Exception {
log.debug("tid: " + Thread.currentThread().getId() + " setUp");
stateTransitionStrategy.setTestMode();
}
@Test
public void testExampleStateMachine() {
ExampleStateMachineImpl sm = new ExampleStateMachineImpl();
sm.execute();
stateTransitionStrategy.waitForState(sm.getClass(), "started");
stateTransitionStrategy.waitForState(sm.getClass(), "running");
stateTransitionStrategy.waitForState(sm.getClass(), "running");
stateTransitionStrategy.waitForState(sm.getClass(), "running");
stateTransitionStrategy.waitForState(sm.getClass(), "finished");
}
}
Here I am using Spring's JUnit support, something I recommend. The key to testing the actual state machine is the method stateTransitionStrategy.setTestMode(), which places the transition strategy in a special test mode that keeps state execution in lockstep with the waitForState() method.
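The lockstep behavior comes from SynchronousQueue, whose put() blocks until another thread calls take(). Below is a minimal sketch of that handoff in isolation; LockstepDemo and observe() are hypothetical names, and the worker thread only pretends to execute states.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

class LockstepDemo {
    // Starts a worker that hands over each state and blocks in put() until
    // the observer has taken it; returns the states the observer saw.
    static List<String> observe(List<String> states) {
        SynchronousQueue<String> handoff = new SynchronousQueue<>();
        Thread worker = new Thread(() -> {
            try {
                for (String state : states) {
                    handoff.put(state); // blocks until the observer calls take()
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();

        List<String> seen = new ArrayList<>();
        try {
            for (int i = 0; i < states.size(); i++) {
                seen.add(handoff.take()); // plays the role of waitForState()
            }
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observe(Arrays.asList("started", "running", "finished")));
    }
}
```

Since the worker cannot move on until each state has been taken, the test sees every transition in order and never races ahead of or behind the state machine.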
Conclusions
Using AspectJ and Spring, it is possible to create natural feeling and intuitive support for asynchronous state machines in Java. The state transition strategy could be extended for parallel execution or to support priority transitions. Shlaer-Mellor "expedited events to self" could also be implemented where it would help make state machines more deterministic.
Using AspectJ does require that you use the AspectJ compiler in place of javac, or that you use special loader support at runtime. I prefer using the compiler as it is less prone to creating deployment issues.
I strongly recommend the Eclipse AspectJ developer tools, as writing pointcut expressions is very difficult without them. With this plugin, Eclipse will show you exactly what will be intercepted. The plugin also adds compile support for AspectJ to Eclipse.
In my customer's application, state machines manipulated Hibernate persistent objects. If you store a persistent object in your state machine, take care to reattach it to the Hibernate session at the start of each state. Remember each state runs in a separate transaction, with a separate Hibernate session. Hibernate will not reattach or refresh your persistent objects automatically. However, you can use AspectJ to do this for you as well. I will discuss this in a future blog post.