To put the above into context, let's assume we have already developed a custom CQ workflow process step (a.k.a. automated step), PostPageActivationWorkflow.java. A custom step is normally part of some workflow. This one was designed to 'replicate' the payload (the entity upon which a workflow instance acts) to a hypothetical remote place via HTTP POST. To run it, you normally create a workflow and drop the custom process step into it. When the workflow reaches this step, its execute() is called, which in turn calls PostPageActivationWorkflow.replicate(), a method that I named and implemented. However, what if we also want the same 'replication' to be triggered by a CQ event (specifically, by the action type of a ReplicationAction event) instead of by the workflow? The following code sample shows how.
First, let's take a quick look at the custom workflow process step, PostPageActivationWorkflow.java. I'm not going into the details of how to code a custom workflow process step; if you're interested, this one was implemented according to an article titled Extending Workflow Functionality. The only things I'd point out are that the class is registered as an OSGi service, and that the method we want to start when the event is triggered is the one we named replicate(). As you'll see later, it is called by a job processor's process() to process a job in the background.
package com.test.service;
import ...
/**
* Workflow process step that sends the activated payload to a remote server via
* HTTP POST.
*
* Declared as an OSGi component with immediate activation; metatype = true
* exposes its properties in the OSGi configuration console.
*/
@Component(metatype = true, immediate = true, label = "Post Page Activation Workflow", description = "A workflow process step implementation to be executed after a page is activated.")
@Service
@Properties( {
@Property(name = Constants.SERVICE_DESCRIPTION, value = "Replicate to remote workflow process implementation."),
@Property(name = Constants.SERVICE_VENDOR, value = "Test, Inc."),
@Property(name = "process.label", value = "Post Page Activation Workflow Process")
} )
public class PostPageActivationWorkflow implements WorkflowProcess {
...
/**
* Called when the custom process step is executed.
* A WorkflowProcess must implement execute().
*/
public void execute(WorkItem item, WorkflowSession session, MetaDataMap args) throws WorkflowException { // 1
...
}
/**
* A custom-named method that transforms a given JCR node into XML and
* replicates (posts) it to the remote place via HTTP.
*
* @param path full path (jcr:content excluded) of the JCR node to be published
* to the remote place. This must be the node under which the jcr:content node resides.
* @throws WorkflowException if the resolver cannot be obtained or the repository access fails
*/
public void replicate(String path) throws WorkflowException {
String result = null;
String contentPath = path + "/jcr:content";
ResourceResolver resourceResolver = null;
try {
// within an OSGi service, get a Resource based on the content path
resourceResolver = resolverFactory.getAdministrativeResourceResolver(null);
Resource res = resourceResolver.getResource(contentPath);
// getResource() returns null when nothing exists at the path
final Node node = (res != null) ? res.adaptTo(Node.class) : null;
if (node != null) {
// ...
// transform node to XML and post it to the remote server
} else {
result = "Node was null.";
log.error("calling replicate() failed: no node at " + contentPath);
}
} catch (LoginException e) {
throw new WorkflowException(e.getMessage(), e);
} catch (RepositoryException e) {
throw new WorkflowException(e.getMessage(), e);
} finally {
// always release the resolver, even when an exception is thrown
if (resourceResolver != null && resourceResolver.isLive()) {
resourceResolver.close();
}
}
}
}
[1] When the workflow reaches this step, its execute() is called.
Below is the actual hook to the event. When the event happens, a job is created for it and put in a queue, and it is processed as soon as resources are available. The combination of event, job, and queue ensures that the job gets processed and isn't lost after the event was triggered.
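To make the "won't be lost" idea a bit more concrete, here is a minimal in-memory sketch of the pattern: the handler only enqueues, and a worker keeps retrying a job until the processor reports success. RetryingJobQueue and its methods are hypothetical names of mine, not Sling API, and the retry limit is an assumption for illustration. Sling's own job handling does much more than this (as far as I understand, it also persists jobs), so treat this only as a picture of the idea.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

/** Hypothetical in-memory stand-in for a job queue; this is not the Sling API. */
final class RetryingJobQueue {
    private final Deque<String> pending = new ArrayDeque<>();
    private final int maxAttempts; // assumed retry limit, for illustration only

    RetryingJobQueue(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Called from the event handler: record the job and return immediately. */
    void enqueue(String payloadPath) {
        pending.addLast(payloadPath);
    }

    /** Number of jobs still waiting to be processed. */
    int size() {
        return pending.size();
    }

    /**
     * Drains the queue. A job whose processor returns false is retried,
     * up to maxAttempts times, before it is given up on.
     *
     * @return the number of jobs that completed successfully
     */
    int drain(Predicate<String> processor) {
        int succeeded = 0;
        while (!pending.isEmpty()) {
            String job = pending.removeFirst();
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                if (processor.test(job)) {
                    succeeded++;
                    break;
                }
            }
        }
        return succeeded;
    }
}
```

The processor's boolean result plays the same role as the return value of JobProcessor.process() in the handler below: true means the job is done, false means try again.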
At the Sling level, you hook into events by:
developing a service class that implements the EventHandler interface and is registered with the service property EventConstants.EVENT_TOPIC (e.g. ReplicationAction.EVENT_TOPIC, PageEvent.EVENT_TOPIC), whose value is the topic or list of topics (a string or an array of strings) the event handler is interested in, and
implementing the handleEvent(Event) method to trigger the job.
package com.test.service;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Service;
import org.osgi.framework.FrameworkUtil;
import org.osgi.framework.ServiceReference;
import org.osgi.framework.BundleContext;
import org.osgi.service.event.EventHandler;
import org.osgi.service.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.day.cq.wcm.api.Page;
import com.day.cq.wcm.api.PageManager;
import com.day.cq.workflow.event.WorkflowEvent;
import com.day.cq.replication.ReplicationAction;
import com.day.cq.replication.ReplicationActionType;
import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.JobProcessor;
import org.apache.sling.event.EventUtil;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.jcr.resource.JcrResourceResolverFactory;
@Component(metatype = false, immediate = true)
@Service
@Property(name = "event.topics", value = { ReplicationAction.EVENT_TOPIC }) // 1
public class PageActivationEventHandler implements
EventHandler, // 2
JobProcessor{ // 3
@Reference // 4
private JcrResourceResolverFactory resolverFactory;
Logger log = LoggerFactory.getLogger(this.getClass());
public void handleEvent(Event event) { // 5
if (event.getTopic().equals(ReplicationAction.EVENT_TOPIC)) {
// use Sling's job model, which guarantees execution
if (EventUtil.isLocal(event)) {
JobUtil.processJob(event, this); // 6
}
// alternative: handle the event directly, which does not guarantee execution
// ReplicationAction action = ReplicationAction.fromEvent(event);
// log.info("********User {} has tried to replicate {} to the remote place",
// action.getUserId(), action.getPath());
}
}
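/**
* process(Event) is required by the JobProcessor interface. It does the
* actual work of publishing the page to the remote place.
*
* @param event the replication event that triggered the job
* @return true when the job is finished; returning false would ask Sling to reschedule it
*/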
@Override
public boolean process(Event event) { // 7
log.info("******** start processing event job");
// get a handle of the PostPageActivationWorkflow service
BundleContext bundleContext = FrameworkUtil.getBundle(PageActivationEventHandler.class).getBundleContext(); // 8
String filter = "(process.label=Post Page Activation Workflow Process)"; // 9
ServiceReference[] serviceReferences = null;
try {
serviceReferences =
bundleContext.getServiceReferences(com.day.cq.workflow.exec.WorkflowProcess.class.getName(), filter); // 10
} catch (Exception e) {
log.error("looking up the workflow process service failed", e);
}
// getServiceReferences() returns null when no service matches the filter
if (serviceReferences == null || serviceReferences.length == 0) {
log.error("no WorkflowProcess service matches {}", filter);
return false; // not processed; asks Sling to reschedule the job
}
PostPageActivationWorkflow postPageActivationWorkflow =
(PostPageActivationWorkflow) bundleContext.getService(serviceReferences[0]); // 11
// call PostPageActivationWorkflow.replicate() to replicate the node to
// the remote place
ReplicationAction action = ReplicationAction.fromEvent(event);
ReplicationActionType actionType = action.getType();
ResourceResolver resourceResolver = null;
try {
if (actionType.equals(ReplicationActionType.ACTIVATE)) { // 12
resourceResolver = resolverFactory.getAdministrativeResourceResolver(null);
final PageManager pm = resourceResolver.adaptTo(PageManager.class);
String path = action.getPath(); // 13
final Page page = pm.getContainingPage(path);
if (page != null) {
log.info("********activating page {}, path {}", page.getTitle(), path);
postPageActivationWorkflow.replicate(path); // 14
}
} else if (actionType.equals(ReplicationActionType.DEACTIVATE) ||
actionType.equals(ReplicationActionType.DELETE)) {
// to-do
}
} catch (Exception e) {
log.error("replicating to the remote place failed", e);
} finally {
if (resourceResolver != null && resourceResolver.isLive()) {
resourceResolver.close();
}
}
return true;
}
}
[1] Registers the event handler to listen for replication actions. handleEvent(Event) is called each time a replication action is applied to a payload. A replication action can be of different types: activate, deactivate, reverse, delete. Declaring the property on a field inside the class, as follows, may (not tested) serve the same purpose.
@Property(value = ReplicationAction.EVENT_TOPIC)
static final String EVENT_TOPICS = "event.topics";
[2] You must implement the EventHandler interface to hook a handler to an event.
[3] You must implement the JobProcessor interface so that the class can be passed as the second argument to JobUtil.processJob(Event, JobProcessor).
[4] Inject a registered OSGi service by class name using Felix SCR annotation.
[5] Implicitly called when a registered event happens. This event listener handles the event by calling JobUtil.processJob(Event, JobProcessor), which in turn calls JobProcessor.process().
[6] Start processing a job (calls JobProcessor.process()) in the background.
[7] The job processor's process() processes a job in the background; it is called by JobUtil.processJob(event, this).
[8] Get the OSGi bundle context, which is used to get a reference to a specific OSGi service.
[9] An LDAP-style filter used to locate a specific OSGi service registered under the WorkflowProcess interface.
[10] Get references to the registered OSGi service(s) using the interface class name and a filter string.
[11] Get the reference to the registered OSGi service (the first one).
[12] Do different things based on the replication action type: activate, deactivate, delete, reverse.
[13] Full path (jcr:content excluded) of the content node the action was applied to.
[14] The core of the job.
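One thing worth adding to notes [9] to [11]: BundleContext.getServiceReferences() returns null when no service matches the filter, so taking element [0] without a check can fail. The plain-Java sketch below shows the same "find the first service with a given process.label, or nothing" idea with an explicit empty result. LabelledServiceLookup and its methods are hypothetical names of mine that only stand in for the service registry; they are not part of OSGi or CQ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for a service registry; this is not the OSGi API. */
final class LabelledServiceLookup {

    /** One registered service together with its service properties. */
    private static final class Registration {
        final Object service;
        final Map<String, String> properties;

        Registration(Object service, Map<String, String> properties) {
            this.service = service;
            this.properties = properties;
        }
    }

    private final List<Registration> registrations = new ArrayList<>();

    /** Registers a service under the given properties. */
    void register(Object service, Map<String, String> properties) {
        registrations.add(new Registration(service, properties));
    }

    /**
     * Returns the first service of the given type whose property 'key'
     * equals 'value', or an empty Optional when nothing matches.
     */
    <T> Optional<T> find(Class<T> type, String key, String value) {
        for (Registration r : registrations) {
            if (type.isInstance(r.service) && value.equals(r.properties.get(key))) {
                return Optional.of(type.cast(r.service));
            }
        }
        return Optional.empty();
    }
}
```

In process(), the equivalent is to check the ServiceReference array for null or zero length before reading serviceReferences[0], and to decide what the job should do when the workflow process service is not available.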
References
Event Handling in CQ
How to Manage Job in Sling
Eventing, Jobs, and Scheduling
Extending Workflow Functionality
Listening for Workflow Event