Saturday, March 22, 2008

More Workflow - Events and Asynchronous WorkItems

Last week, I expanded upon my original post where I modeled a workflow as a Petri Net, adding in usability features such as an XML configuration and integration with WorkItems defined in Spring configuration. My goal, however, is to have a network of batch jobs be triggered off in response to a state change in my primary datasource. The only dependencies between the jobs is the order in which they must execute, and the order is specified in the workflow.

Previously, I had methods to traverse the network in a depth-first or breadth-first manner. However, the policy can change depending on the mix of jobs, so an event-driven approach is preferable. Specifically, the caller starts the workflow traversal by calling traverse() on the Workflow object. The Workflow then calls the nearest fireable Transitions (as defined by the Edge weights), which spawns batch jobs in background threads through the mapped AsynchronousWorkItem. Once the batch job is complete, it sends back an event to the WorkflowFactory, which will cause the WorkflowFactory to call workflow.traverse() on the nearest neighbors (reachable Places) of the Transition, and so on, until a stop Place is reached. This is illustrated in the diagram below:

I decided to use the event handling built into Spring. For the event publishers, all I needed to do is have the WorkflowItem be ApplicationContextAware (which exposes the getApplicationContext().publishEvent(Object) method to all its subclasses. The WorkflowFactory is the single event subscriber, and it does this by implementing ApplicationListener which mandates an onApplicationEvent(Object) method. This is off whenever the ApplicationContext has an event published into it.

In the interests of not boring you to death, I reproduce only the code that is relevant to this post. If you have been reading my previous posts on this subject, I had to do some minor refactoring, such as the addition of a stop attribute in the XML to specify the end Place(s) in a network, and the removal of the old Workflow.traverse(boolean) method in favor of the event driven ones. You can probably make these connections yourself. In case you haven't read the previous posts, you probably only care about the event-handling stuff anyway, so the rest is unimportant to you.

The first change is the introduction of the WorkItem layer to insulate the actual job from the Transition. This ensures that the programmer who writes the batch job does not have to worry about whether the job will run synchronously or asynchronously, and in any case, its a common concern, so it should be factored out into one place.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// WorkItem.java
package com.mycompany.myapp.workflow;

import java.util.Map;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

public abstract class WorkItem implements ApplicationContextAware {

  private String name;
  private Workflow workflow;
  private String transitionName;
  private IExecutable executable;
  private ApplicationContext applicationContext;

  public WorkItem() {
    super();
  }
  
  public ApplicationContext getApplicationContext() {
    return applicationContext;
  }
  
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.applicationContext = applicationContext;
  }

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public String getTransitionName() {
    return transitionName;
  }

  public void setTransitionName(String transitionName) {
    this.transitionName = transitionName;
  }

  public Workflow getWorkflow() {
    return workflow;
  }

  public void setWorkflow(Workflow workflow) {
    this.workflow = workflow;
  }

  public IExecutable getExecutable() {
    return executable;
  }

  public void setExecutable(IExecutable executable) {
    this.executable = executable;
  }

  public abstract void execute();
}

WorkItem has two implementations, the SynchronousWorkItem (which can be used for testing the workflow itself, among other uses), and the AsynchronousWorkItem (which would probably be the one a client would mostly use).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// SynchronousWorkItem.java
package com.mycompany.myapp.workflow.workitems;

import java.util.Map;

import com.mycompany.myapp.workflow.IExecutable;
import com.mycompany.myapp.workflow.Transition;
import com.mycompany.myapp.workflow.WorkItem;
import com.mycompany.myapp.workflow.Workflow;
import com.mycompany.myapp.workflow.events.TransitionEventSource;
import com.mycompany.myapp.workflow.events.TransitionFailedEvent;
import com.mycompany.myapp.workflow.events.TransitionEvent;

public class SynchronousWorkItem extends WorkItem {

  public SynchronousWorkItem() {
    super();
  }
  
  @Override
  public void execute() {
    Workflow workflow = getWorkflow();
    Transition transition = workflow.getTransitionByName(getTransitionName());
    TransitionEventSource source = new TransitionEventSource();
    source.setWorkflow(workflow);
    source.setCurrentTransition(transition);
    try {
      Map<gString,Boolean> attributes = 
        getExecutable().execute(getWorkflow().getAttributes());
      workflow.setAttributes(attributes);
      source.setWorkflow(workflow);
      source.setTransitionStatus(true);
      getApplicationContext().publishEvent(new TransitionEvent(source));
    } catch (Exception e) {
      source.setTransitionStatus(false);
      getApplicationContext().publishEvent(new TransitionEvent(source));
    }
  }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// AsynchronousWorkItem.java
package com.mycompany.myapp.workflow.workitems;

import java.util.Map;

import com.mycompany.myapp.workflow.Transition;
import com.mycompany.myapp.workflow.Workflow;
import com.mycompany.myapp.workflow.events.TransitionEventSource;
import com.mycompany.myapp.workflow.events.TransitionFailedEvent;
import com.mycompany.myapp.workflow.events.TransitionEvent;

/**
 * Allows a WorkItem to be fired asynchronously, and send an event back to
 * the Workflow on completion.
 */
public class AsynchronousWorkItem extends SynchronousWorkItem {

  public AsynchronousWorkItem() {
    super();
  }
  
  @Override
  public void execute() {
    Runnable r = new Runnable() {
      public void run() {
        Workflow workflow = getWorkflow();
        Transition transition = workflow.getTransitionByName(getTransitionName());
        TransitionEventSource source = new TransitionEventSource();
        source.setWorkflow(workflow);
        source.setCurrentTransition(transition);
        try {
          Map<String,Boolean> attributes = 
            getExecutable().execute(getWorkflow().getAttributes());
          workflow.setAttributes(attributes);
          source.setWorkflow(workflow);
          source.setTransitionStatus(true);
          getApplicationContext().publishEvent(new TransitionEvent(source));
        } catch (Exception e) {
          source.setTransitionStatus(false);
          getApplicationContext().publishEvent(new TransitionEvent(source));
        }
      }
    };
    new Thread(r).start();
  }
}

A user of the workflow needs to only provide implementations of the IExecutable interface. It has a single method execute(), which takes in the map of workflow attributes, and returns the updated map of workflow attributes. The user must make sure that the correct Javascript variables are updated before returning the attributes.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// IExecutable.java
package com.mycompany.myapp.workflow;

import java.util.Map;

/**
 * Generic interface that all WorkItem Executors must implement.
 */
public interface IExecutable {
  public Map<String,Boolean> execute(Map<String,Boolean> attributes) throws Exception;
}

An example IExecutable implementation is the MockExecutable I use for my tests. The functionality is similar to the MockClosure I have been using in previous iterations.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// MockExecutable.java
package com.mycompany.myapp.workflow.executables;

import java.util.Map;

import org.apache.commons.lang.StringUtils;

import com.mycompany.myapp.workflow.IExecutable;

/**
 * Mock Executor that just prints a message and updates the _OK variable.
 * This executor could (or not) be used as a model for building executors
 * which wrap existing legacy classes.
 */
public class MockExecutable implements IExecutable {

  private String name;

  public String getName() {
    return name;
  }
  
  public void setName(String name) {
    this.name = name;
  }

  public Map<String,Boolean> execute(Map<String,Boolean> attributes) 
      throws Exception {
    System.out.println("Executing:" + name);
    attributes.put(StringUtils.upperCase(name) + "_OK", Boolean.TRUE);
    return attributes;
  }
}

And here is how I hook all these things into the Spring Application Context. In the interest of brevity, only one definition is shown, the others are similar. The workflow-conf.xml is unchanged as a result of this change.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
<beans ..>
  ...
  <bean id="workItem_t01" 
      class="com.mycompany.myapp.workflow.workitems.AsynchronousWorkItem">
    <property name="name" value="t01"/>
    <property name="executable">
      <bean class="com.mycompany.myapp.workflow.executables.MockExecutable">
        <property name="name" value="t01"/>
      </bean>
    </property>
  </bean>
  ...
</beans>

As you can see, both WorkItem implementations take care of extracting the current attributes from the Workflow object, pass it into the mapped IExecutable, retrieve and update the attributes into the Workflow, then call ApplicationContext.publishEvent() to publish the changes. We need an Event class that the WorkflowFactory will listen for (so we don't react to other events Spring may be throwing). We also need an Event source object, which will contain the information to be passed from the Event publisher to the subscriber. Both are shown below:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// TransitionEvent.java
package com.mycompany.myapp.workflow.events;

import org.springframework.context.ApplicationEvent;

public class TransitionEvent extends ApplicationEvent {

  private static final long serialVersionUID = 2221611924011056575L;

  public TransitionEvent(Object source) {
    super(source);
  }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// TransitionEventSource.java
package com.mycompany.myapp.workflow.events;

import com.mycompany.myapp.workflow.Transition;
import com.mycompany.myapp.workflow.Workflow;

public class TransitionEventSource {
  
  private Workflow workflow;
  private Transition currentTransition;
  private boolean transitionStatus;
  
  public Workflow getWorkflow() {
    return workflow;
  }
  
  public void setWorkflow(Workflow workflow) {
    this.workflow = workflow;
  }
  
  public Transition getCurrentTransition() {
    return currentTransition;
  }
  
  public void setCurrentTransition(Transition currentTransition) {
    this.currentTransition = currentTransition;
  }
  
  public boolean isTransitionStatus() {
    return transitionStatus;
  }

  public void setTransitionStatus(boolean transitionStatus) {
    this.transitionStatus = transitionStatus;
  }
}

And finally, the listener in the onApplicationEvent() method of WorkflowFactory.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// WorkflowFactory.java
package com.mycompany.myapp.workflow;

public class WorkflowFactory implements BeanFactoryAware, InitializingBean, ApplicationListener {
  ...
  public void onApplicationEvent(ApplicationEvent event) {
    try {
      if (TransitionEvent.class.isInstance(event)) {
        TransitionEventSource source = TransitionEventSource.class.cast(event.getSource());
        if (source.isTransitionStatus()) {
          Workflow workflow = source.getWorkflow();
          Transition transition = source.getCurrentTransition();
          List<Place> reachablePlaces = workflow.getNextReachablePlaces(transition);
          for (Place reachablePlace : reachablePlaces) {
            workflow.traverse(reachablePlace);
          }
        } else {
          Workflow workflow = source.getWorkflow();
          Transition transition = source.getCurrentTransition();
          log.error("Workflow:[" + workflow.getName() + 
            "] failed Transition:[" + transition.getName() + 
            "], current attributes:" + workflow.getAttributes());
        }
      }
    } catch (Exception e) {
      log.error("Error in Event listener", e);
    }
  }
}

Unlike the caller, who calls the no-args version of the traverse() method on the Workflow object, the onApplicationEvent() method will find the nearest reachable neighbors of the workflow given the current transition and the attributes, and call the overloaded version of the traverse() method. These methods are shown below:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Workflow.java
package com.mycompany.myapp.workflow;

public class Workflow {

  ...
  public void traverse() throws Exception {
    traverse(places.get(startPlaceName));
  }
  
  public void traverse(Place place) throws Exception {
    if (place.isStopPlace()) {
      return;
    }
    List<Transition> fireableTransitions = getNextFireableTransitions(place);
    for (Transition fireableTransition : fireableTransitions) {
      if (alreadyFiredTransitions.contains(fireableTransition)) {
        continue;
      }
      WorkItem workItem = fireableTransition.getWorkItem();
      workItem.execute();
      alreadyFiredTransitions.add(fireableTransition);
    }
  }
  ...
}

Finally, to tie all this together (and to show how a typical client would call this thing), I have the following JUnit test, which does the event driven traversal.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// WorkflowFactoryTest.java
package com.mycompany.myapp.workflow;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class WorkflowFactoryTest {
  
  private final Log log = LogFactory.getLog(getClass());
  
  private static WorkflowFactory factory;
  private Workflow workflow;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ApplicationContext context = new ClassPathXmlApplicationContext(
      "classpath:applicationContext.xml");
    factory = (WorkflowFactory) context.getBean("workflowFactory");
  }
  
  @Test
  public void testEventDrivenTraversalWithSyncWorkItem() throws Exception {
    workflow = factory.take("myapp-sync-wf");
    log.debug("Event driven workflow traversal with synchronous work items");
    workflow.traverse();
    factory.give("myapp-sync-wf");
  }
  
  @Test
  public void testEventDrivenTraversalWithAsyncItems() throws Exception {
    workflow = factory.take("myapp-async-wf");
    log.debug("Event driven workflow traversal with asynchronous work items");
    workflow.traverse();
    factory.give("myapp-async-wf");
  }
}

In this case, both tests return the same results (similar to the results from depth first traversal in my previous version of this application). However, if I create a SleepyMockExecutable class like so:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// SleepyMockExecutable.java
package com.mycompany.myapp.workflow.executables;

import java.util.Map;

import org.apache.commons.lang.StringUtils;

public class SleepyMockExecutable extends MockExecutable {

  @Override
  public Map<String,Boolean> execute(Map<String,Boolean> attributes) 
      throws Exception {
    System.out.println("Executing:" + getName());
    Thread.sleep(10000L);
    attributes.put(StringUtils.upperCase(getName()) + "_OK", Boolean.TRUE);
    return attributes;
  }
}

and plug this into Transition t23 like so, thus simulating a batch job that runs slower than the other batch jobs:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
<beans ...>
  ...
  <bean id="async_workItem_t23" 
      class="com.mycompany.myapp.workflow.workitems.AsynchronousWorkItem">
    <property name="name" value="t23"/>
    <property name="executable">
      <bean class="com.mycompany.myapp.workflow.executables.SleepyMockExecutable">
        <property name="name" value="t23"/>
      </bean>
    </property>
  </bean>
  ...
</beans>

The traversal changes as expected. Since the traversal is now event-driven, there is now no need to tell the workflow which traversal algorithm to use, it is dictated by the completion events and the edge weights. The outputs (before and after introducing SleepyMockExecutable into the chain) are shown below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// all executables are MockExecutable
Executing:t01
Executing:t12
Executing:t23
Executing:t24
Executing:t345
Executing:t16
Executing:t67
Executing:t578
// t23 is a SleepyMockExecutable, all others MockExecutable
Executing:t01
Executing:t12
Executing:t16
Executing:t23
Executing:t24
Executing:t67
Executing:t345
Executing:t578

As I mentioned before, I used the built-in Spring event handling because I was using Spring anyway. In case you not using Spring, a very good alternative is EventBus, that provides an event handling mechanism similar to Spring and is annotation driven as well. Some sample code that I tried using EventBus is shown below, you can adapt it to use in the application above if you want.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// EventBusBasedPublisher.java
package com.mycompany.myapp.events;

import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bushe.swing.event.EventBus;
import org.bushe.swing.event.annotation.AnnotationProcessor;
import org.bushe.swing.event.annotation.EventPublisher;

@EventPublisher
public class EventBusBasedPublisher {

  private final Log log = LogFactory.getLog(getClass());
  
  private String name;
  private Map<String,Boolean> attributes = new HashMap<String,Boolean>();
  
  public EventBusBasedPublisher() {
    super();
    AnnotationProcessor.process(this);
  }
  
  public void setName(String name) {
    this.name = name;
  }
  
  public void setAttributes(Map<String,Boolean> attributes) {
    this.attributes = attributes;
  }
  
  public void execute() throws Exception {
    log.debug("Executing:" + name);
    attributes.put(name, Boolean.TRUE);
    EventBus.publish(attributes);
  }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// EventBusBasedSubscriber.java
package com.mycompany.myapp.events;

import java.util.Map;

import org.bushe.swing.event.annotation.AnnotationProcessor;
import org.bushe.swing.event.annotation.EventSubscriber;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class EventBusBasedSubscriber {

  private final Log log = LogFactory.getLog(getClass());
  
  public EventBusBasedSubscriber() {
    super();
    AnnotationProcessor.process(this);
  }
  
  @EventSubscriber
  public void onEvent(Map<String,Boolean> attributes) {
    log.debug("Caught an event");
    log.debug(attributes.toString());
  }
}

And to test this, the following code:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// EventBusEventsTest.java
package com.mycompany.myapp.events;

import org.bushe.swing.event.EventBus;
import org.junit.Test;

public class EventBusEventsTest {

  @Test
  public void testEvents() throws Exception {
    EventBusBasedSubscriber subscriber = new EventBusBasedSubscriber();
    EventBusBasedPublisher publisher = new EventBusBasedPublisher();
    for (int i = 0; i < 10; i++) {
      publisher.setName("PUB_" + i);
      publisher.execute();
    }
  }
}

So what's next for me in the Workflow world? Surprisingly, not much. I was talking with a colleague, and he expressed (a valid, IMO) concern that I was burning cycles trying to build something for which stable, mature versions are already widely available in the open source world. So figuring out how to adapt one of these to my application would probably make more sense in the long run. I agree with his assessment, and I will do as he suggests. However, it was a fun three weeks, and what I have got out of it is a better understanding of how workflow engines work, and the issues that have to be dealt with. Hopefully this will help me make a good decision in selecting the right workflow engine for my application.

9 comments (moderated to prevent spam):

eman said...

Hi,
i need to use bossa GUI tool, but i don't know where can i find it?

Sujit Pal said...

You will find it in their downloads page, (bossa-pnk-0.1). Its called the "PNK WFNet Editor", and is described in the release notes as: "This is a customized version of the fantastic Petri Net Kernel (PNK) that can be used to graphically create and edit Petri net workflow case types for the Bossa engine." Keep in mind, however, that this is for building Bossa XML configurations for Petri Nets. The XML format I used is not related in any way to the format this tool will generate.

eman said...

many thanks for ur reply,
as i know than Bossa is embedded library, i want to know if the Bossa GUI is also embedded (i.e. can i add this GUI into a java application?)

eman said...

many thanks for ur reply.
i downloaded bossa-pnk-0.1, & i run it using the command
java -jar PNK2classes.jar

but i don't know how i can use it with Bossa workflow?

Sujit Pal said...

Hi eman, I am probably going to give you unhelpful answers to both your comments. Since I don't know Bossa well enough, these are just educated guesses.

To the first comment, I am not sure why you would want to embed the GUI tool into a java application. Usually in a corporate environment, you would have a few workflow designers who would be ok with using a desktop GUI to build them. In case you want your end users to design their own workflows (a very unlikely scenario, although it may occur), then you probably want to embed it into the same web application that runs these workflows. In that case, I guess you can expose it as an applet, but can't say for sure.

To the second comment, my understanding is that you would use the tool to design your workflow visually, and then generated the XML configuration that be consumed by the Bossa library and your application code.

eman said...

Hi Dr.sujit,
actually, i need the end user to create his own work flow, so i need to embed a work flow tool inside my application. u told me to use applet,
actually i don't know how i can allow user to build his work flow using applet, can u tell me how i can do this? or if u have any illustrated example?

Sujit Pal said...

Hi Eman, I am not a Dr., neither the medical kind nor a Ph.D. :-). And no, I don't have an example to illustrate what you need, this was just a guess as to maybe how you could do it if at all you want to support the kind of user-designed workflow scenario you mention.

kannan said...

Hi Salmon ,

I went through your post on work flow using spring events , its really very useful for me. iam new to spring and trying to use spring events for one of our projects .
can u please help by giving the source code of the project or this tutorial.

thanks
anithkan

Sujit Pal said...

Hi Kannan, I wrote this stuff to self-train for a project that would use Spring events heavily...unfortunately, that project never worked out, so I didn't end up checking it in anywhere, and now couple of disk crashes and machines later, I don't think I have it anymore. However, you should be able to piece together the entire application from the source code on the page (and neighboring pages, I believe I wrote more than 1 blog post about this stuff). Sorry couldn't be of more help.