March 23, 2021

Sling Jobs

One of the most powerful and underused features of Sling is Sling Jobs. OSGi events are the usual way to handle events, but the general event mechanism has no knowledge about the content of events, so it can't decide whether an event is important and must be processed by someone. Because the event mechanism is "fire and forget", the Event Admin has no way to tell whether anyone has actually processed an event. Processing could fail, or the server or bundle could be stopped before the event is handled.

On the other hand, there are use cases where guaranteed processing is a must, and this usually comes with the requirement that processing happens exactly once. Typical examples are sending notification emails (or SMS), post-processing of content (like thumbnail generation for images or documents), workflow steps, etc.

Sling Event takes care of these use cases through the concept of a job. A job is a special event that is guaranteed to be processed at least once. The one case in which it is processed more than once is when the instance processing the job crashes after processing has finished but before that state is persisted; the job consumer then processes the same job a second time.
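Because a job can occasionally be delivered twice, it helps if the work done for a job is safe to repeat. The following is a minimal plain-Java sketch of that idea, not part of the Sling API: `IdempotentJobGuard` and `runOnce` are hypothetical names, and the in-memory set is an assumption made for illustration (it would not survive the very crash described above, so a real consumer would need a persistent marker). In a consumer, the key could be the value of `job.getId()`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical helper (not part of Sling): remembers which job ids were
 * already handled so a redelivered job does not repeat its side effect.
 */
final class IdempotentJobGuard {

    // In-memory only, for illustration; a real consumer needs a store that survives restarts
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

    /**
     * Runs the side effect only the first time a given job id is seen.
     *
     * @return true if the side effect ran, false if the id was seen before
     */
    boolean runOnce(String jobId, Runnable sideEffect) {
        // Set.add returns false when the id is already present
        if (!processedIds.add(jobId)) {
            return false;
        }
        try {
            sideEffect.run();
            return true;
        } catch (RuntimeException e) {
            // Forget the id again so that a retry can run the side effect
            processedIds.remove(jobId);
            throw e;
        }
    }
}
```

With a guard like this, sending the same notification email twice for one job id is skipped on the second delivery, while a failed attempt can still be retried.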

Job Fundamentals:

1) A job has two parts: the topic, which describes the nature of the job, and the payload, which carries the data in the form of key-value pairs.

2) A job consumer is a service consuming and processing a job. It registers itself as an OSGi service together with a property defining which topics this consumer can process.

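3) A job executor is also a service processing a job. It registers itself as an OSGi service together with a property defining which topics this executor can process. Unlike a job consumer, it additionally receives a JobExecutionContext, which it can use to report progress or to react when the job is stopped.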
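Since jobs are persisted, the Sling documentation describes the payload as a map of serializable values. A small plain-Java sketch of checking a payload before adding the job could look like this; `JobPayloads` and `nonSerializableKeys` are hypothetical names introduced here for illustration, not Sling API.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of Sling) for checking a job payload. */
final class JobPayloads {

    /**
     * Returns the keys whose values cannot be serialized.
     * A null value is also reported, because null is not an instance of Serializable.
     */
    static List<String> nonSerializableKeys(Map<String, Object> payload) {
        List<String> badKeys = new ArrayList<>();
        for (Map.Entry<String, Object> entry : payload.entrySet()) {
            if (!(entry.getValue() instanceof Serializable)) {
                badKeys.add(entry.getKey());
            }
        }
        return badKeys;
    }
}
```

A payload such as `{"path": "/content/site/page"}` passes this check, because a String is serializable; putting a live object such as a stream or a session into the map would be flagged.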

Now, let's start by creating an event handler that listens for replication events and adds a job whenever content is activated (published):

ActivationEventHandler

package aem.project.core.jobs.handlers;

import org.osgi.service.component.annotations.Component;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
import org.osgi.framework.Constants;
import org.osgi.service.event.EventConstants;
import com.day.cq.replication.ReplicationAction;
import com.day.cq.replication.ReplicationActionType;

import java.util.HashMap;
import java.util.Map;

import org.apache.sling.event.jobs.JobManager;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(service = EventHandler.class,
        immediate = true,
        property = {
                Constants.SERVICE_DESCRIPTION + "=Demo to listen on changes in the replication",
                EventConstants.EVENT_TOPIC + "=" + ReplicationAction.EVENT_TOPIC
        })
public class ActivationEventHandler implements EventHandler {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Reference
    JobManager jobManager;

    @Override
    public void handleEvent(Event event) {
        try {
            logger.debug("Replication event topic: {}", event.getTopic());

            // Convert the OSGi event once; fromEvent returns null if it carries no replication action
            ReplicationAction action = ReplicationAction.fromEvent(event);
            if (action == null) {
                return;
            }

            logger.debug("Replication event type is {}", action.getType());
            if (action.getType() == ReplicationActionType.ACTIVATE) {
                logger.debug("Triggered activate on {}", action.getPath());

                // Create a property map to pass to the JobConsumer service
                Map<String, Object> jobProperties = new HashMap<>();
                jobProperties.put("path", action.getPath());

                // If processing fails and the job should be retried, the JobConsumer
                // returns JobResult.FAILED (see ActivationEventConsumer).
                // The topic must match the topic registered by the JobConsumer.
                jobManager.addJob("aem/replication/job", jobProperties);

                logger.debug("The job has been added for: {}", jobProperties);
            }
        } catch (Exception e) {
            logger.error("Exception while handling the replication event", e);
        }
    }

}

ActivationEventConsumer

package aem.project.core.jobs.handlers;

import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.osgi.service.component.annotations.Component;
import org.osgi.framework.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(service = JobConsumer.class,
        immediate = true,
        property = {
                Constants.SERVICE_DESCRIPTION + "=Demo job consumer for replication jobs",
                // Topic segments are separated by "/"; names like sample.replication.job will NOT WORK
                JobConsumer.PROPERTY_TOPICS + "=aem/replication/job"
        })
public class ActivationEventConsumer implements JobConsumer {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public JobResult process(Job job) {
        try {
            logger.debug("Processing the job");

            // The property map set by the event handler is available on the job,
            // so we can fetch the values needed to process the request
            String path = (String) job.getProperty("path");
            logger.debug("The path on which the replication was triggered and passed to the job is {}", path);
            return JobResult.OK;
        } catch (Exception e) {
            logger.error("Exception while processing the job", e);
            // FAILED marks the job for a retry, subject to the queue's retry settings
            return JobResult.FAILED;
        }
    }

}
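The note about topic names comes from the OSGi event topic format, where a topic is a sequence of tokens separated by "/" and each token may contain only letters, digits, "_" and "-". As a rough plain-Java sketch of that rule (the class `JobTopics` and its method are hypothetical names, and the regular expression is my reading of the grammar rather than Sling's own validation code):

```java
import java.util.regex.Pattern;

/** Hypothetical helper (not part of Sling) that checks the shape of a job topic. */
final class JobTopics {

    // One or more tokens of letters, digits, '_' or '-', separated by single '/' characters
    private static final Pattern TOPIC =
            Pattern.compile("[A-Za-z0-9_-]+(/[A-Za-z0-9_-]+)*");

    static boolean isValid(String topic) {
        return topic != null && TOPIC.matcher(topic).matches();
    }
}
```

Under this check, `aem/replication/job` is accepted, while `sample.replication.job` is rejected because "." is not an allowed character.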


Deploy the code to your AEM instance, open the OSGi configuration console, and navigate to Apache Sling Job Queue Configuration. Define the topic for the job alongside the other settings, such as the retry delay and the maximum number of retries.


By default, the jobs will be added under /var/eventing/jobs.


Reference:
https://sling.apache.org/documentation/bundles/apache-sling-eventing-and-job-handling.html


By aem4beginner
