On the other hand, there are use cases where the guarantee of processing is a must and usually, this comes with the requirement of processing exactly once. Typical examples are sending notification emails (or SMS), post-processing of content (like thumbnail generation of images or documents), workflow steps, etc.
Sling Event takes care of the above use cases. Sling Event has a concept of Job. A Job is also an event that is guaranteed to be executed at least once with the exception being the instance processing the job crashed after the job processing is finished but before the state is persisted. This results in job consumers processing the job twice.
Job Fundamentals:
1) A job has two parts - the topic which describes the nature of the job and payload, the data in the form of key-value pairs.
2) A job consumer is a service consuming and processing a job. It registers itself as an OSGi service together with a property defining which topics this consumer can process.
3) A job executor is a service processing a job. It registers itself as an OSGi service together with a property defining which topics this consumer can process.
Now, let's start by creating an event handler for publishing content:
ActivationEventHandler
package aem.project.core.jobs.handlers;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
import org.osgi.framework.Constants;
import org.osgi.service.event.EventConstants;
import com.day.cq.replication.ReplicationAction;
import com.day.cq.replication.ReplicationActionType;
import java.util.HashMap;
import java.util.Map;
import org.apache.sling.event.jobs.JobManager;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Component(service = EventHandler.class,
immediate = true,
property = {
Constants.SERVICE_DESCRIPTION + "=Demo to listen on changes in the replication",
EventConstants.EVENT_TOPIC + "=" + ReplicationAction.EVENT_TOPIC
})
public class ActivationEventHandler implements EventHandler{
private final Logger logger = LoggerFactory.getLogger(getClass());
@Reference
JobManager jobManager;
@Override
public void handleEvent(Event event) {
try {
logger.debug("Resource event: {} at: {}", event.getTopic());
logger.debug("Replication Event is {}", ReplicationAction.fromEvent(event).getType());
if (ReplicationAction.fromEvent(event).getType().equals(ReplicationActionType.ACTIVATE)) {
logger.debug("Triggered activate on {}", ReplicationAction.fromEvent(event).getPath());
//Create a property map to pass it to the JobConsumer service
Map<String, Object> jobProperties = new HashMap<String, Object>();
jobProperties.put("path", ReplicationAction.fromEvent(event).getPath());
//For some reason if the job fails, but you want to keep retrying ; then in JobConsumer//Set the result as failed . Check the JobConsumer
jobManager.addJob("aem/replication/job", jobProperties); // This can point to you registered Job Consumer Property Topics
logger.debug("the job has been started for: {}", jobProperties);
}
} catch (Exception e) {
logger.error("Exception is " , e);
}
}
}
ActivationEventConsumer
package aem.project.core.jobs.handlers;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.osgi.service.component.annotations.Component;
import org.osgi.framework.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Component(service = JobConsumer.class,
immediate = true,
property = {
Constants.SERVICE_DESCRIPTION + "=Demo to listen on changes in the resource tree",
JobConsumer.PROPERTY_TOPICS + "=aem/replication/job" //topic names like sample.replication.job will NOT WORK
})
public class ActivationEventConsumer implements JobConsumer{
private final Logger logger = LoggerFactory.getLogger(getClass());
@Override
public JobResult process(Job job) {
try {
logger.debug("Processing the JOB *******");
//A Property map will be passed on so we can fetch the values we need here to//Process the request
String path = (String) job.getProperty("path");
logger.debug("The path in which the replication is triggered and passed to the Job is " +
"{}", path);
return JobConsumer.JobResult.OK;
} catch (Exception e) {
logger.error("Exception is ", e);
return JobResult.FAILED;
}
}
}
Deploy the code to your AEM instance and navigate to OSGI Configuration Apache Sling Job Queue Configuration. Define the topic for the job alongside the other configurations such as retry delays, maximum retries, etc
The jobs will be added to var/eventing/slings
Reference:
https://sling.apache.org/documentation/bundles/apache-sling-eventing-and-job-handling.html
No comments:
Post a Comment
If you have any doubts or questions, please let us know.