May 3, 2020
Estimated Post Reading Time ~

Creating a custom Akamai replication agent in AEM

Replication is central to the AEM experience. AEM is about content and replication is how you move that content across servers. You're most likely using at least two of the out-of-the-box replication agents provided by Adobe: your default agent activates content from an author to publish and your dispatcher flush agent clears your Dispatcher cache. AEM provides several other replication agents for tasks such as replicating in reverse (publish to the author), moving content within the Adobe Marketing Cloud products such as Scene 7 and Test and Target, and static agents for replicating to the file system. This blog post details the steps used to create your own custom replication agents.


The following sample project demonstrates a custom replication agent that purges Akamai CDN (Content Delivery Network) cached content. The full code for this blog is hosted on GitHub. There are three pieces to this project: the transport handler, the content builder, and the replication agent's user interface.

Transport Handler
TransportHandler implementations control the communication with the destination server and determine when to report back a positive ReplicationResult to complete the activation and when to report back a negative ReplicationResult returning the activation to the queue.
AkamaiTransportHandler.java

package com.nateyolles.aem.core.replication;

import java.io.IOException;
import java.util.Arrays;

import com.day.cq.replication.AgentConfig;
import com.day.cq.replication.ReplicationActionType;
import com.day.cq.replication.ReplicationException;
import com.day.cq.replication.ReplicationLog;
import com.day.cq.replication.ReplicationResult;
import com.day.cq.replication.ReplicationTransaction;
import com.day.cq.replication.TransportContext;
import com.day.cq.replication.TransportHandler;

import org.apache.commons.codec.CharEncoding;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.commons.json.JSONArray;
import org.apache.sling.commons.json.JSONException;
import org.apache.sling.commons.json.JSONObject;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.jackrabbit.util.Base64;

/**
 * Transport handler to send test and purge requests to Akamai and handle
 * responses. The handler sets up basic authentication with the user/pass from
 * the replication agent's transport config and sends a GET request as a test
 * and POST as purge request. A valid test response is 200 while a valid purge
 * response is 201.
 * 
 * The transport handler is triggered by setting your replication agent's
 * transport URL's protocol to "akamai://".
 *
 * The transport handler builds the POST request body in accordance with
 * Akamai's CCU REST APIs {@link https://api.ccu.akamai.com/ccu/v2/docs/}
 * using the replication agent properties. 
 */
@Service(TransportHandler.class)
@Component(label = "Akamai Purge Agent", immediate = true)
public class AkamaiTransportHandler implements TransportHandler {

    /** Protocol for replication agent transport URI that triggers this transport handler. */
    private final static String AKAMAI_PROTOCOL = "akamai://";

    /** Akamai CCU REST API URL */
    private final static String AKAMAI_CCU_REST_API_URL = "https://api.ccu.akamai.com/ccu/v2/queues/default";

    /** Replication agent type property name. Valid values are "arl" and "cpcode". */
    private final static String PROPERTY_TYPE = "akamaiType";

    /** Replication agent multifield CP Code property name.*/
    private final static String PROPERTY_CP_CODES = "akamaiCPCodes";

    /** Replication agent domain property name. Valid values are "staging" and "production". */
    private final static String PROPERTY_DOMAIN = "akamaiDomain";

    /** Replication agent action property name. Valid values are "remove" and "invalidate". */
    private final static String PROPERTY_ACTION = "akamaiAction";

    /** Replication agent default type value */
    private final static String PROPERTY_TYPE_DEFAULT = "arl";

    /** Replication agent default domain value */
    private final static String PROPERTY_DOMAIN_DEFAULT = "production";

    /** Replication agent default action value */
    private final static String PROPERTY_ACTION_DEFAULT = "remove";

    /**
     * {@inheritDoc}
     */
    @Override
    public boolean canHandle(AgentConfig config) {
        final String transportURI = config.getTransportURI();

        return (transportURI != null) ? transportURI.toLowerCase().startsWith(AKAMAI_PROTOCOL) : false;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public ReplicationResult deliver(TransportContext ctx, ReplicationTransaction tx)
            throws ReplicationException {

        final ReplicationActionType replicationType = tx.getAction().getType();

        if (replicationType == ReplicationActionType.TEST) {
            return doTest(ctx, tx);
        } else if (replicationType == ReplicationActionType.ACTIVATE ||
                replicationType == ReplicationActionType.DEACTIVATE) {
            return doActivate(ctx, tx);
        } else {
            throw new ReplicationException("Replication action type " + replicationType + " not supported.");
        }
    }

    /**
     * Send test request to Akamai via a GET request.
     *
     * Akamai will respond with a 200 HTTP status code if the request was
     * successfully submitted. The response will have information about the
     * queue length, but we're simply interested in the fact that the request
     * was authenticated.
     *
     * @param ctx Transport Context
     * @param tx Replication Transaction
     * @return ReplicationResult OK if 200 response from Akamai
     * @throws ReplicationException
     */
    private ReplicationResult doTest(TransportContext ctx, ReplicationTransaction tx)
            throws ReplicationException {

        final ReplicationLog log = tx.getLog();
        final HttpGet request = new HttpGet(AKAMAI_CCU_REST_API_URL);
        final HttpResponse response = sendRequest(request, ctx, tx);

        if (response != null) {
            final int statusCode = response.getStatusLine().getStatusCode();

            log.info(response.toString());
            log.info("---------------------------------------");

            if (statusCode == HttpStatus.SC_OK) {
                return ReplicationResult.OK;
            }
        }

        return new ReplicationResult(false, 0, "Replication test failed");
    }

    /**
     * Send purge request to Akamai via a POST request
     *
     * Akamai will respond with a 201 HTTP status code if the purge request was
     * successfully submitted.
     *
     * @param ctx Transport Context
     * @param tx Replication Transaction
     * @return ReplicationResult OK if 201 response from Akamai
     * @throws ReplicationException
     */
    private ReplicationResult doActivate(TransportContext ctx, ReplicationTransaction tx)
            throws ReplicationException {

        final ReplicationLog log = tx.getLog();
        final HttpPost request = new HttpPost(AKAMAI_CCU_REST_API_URL);

        createPostBody(request, ctx, tx);

        final HttpResponse response = sendRequest(request, ctx, tx);

        if (response != null) {       
            final int statusCode = response.getStatusLine().getStatusCode();

            log.info(response.toString());
            log.info("---------------------------------------");
            
            if (statusCode == HttpStatus.SC_CREATED) {
                return ReplicationResult.OK;
            }
        }

        return new ReplicationResult(false, 0, "Replication failed");
    }

    /**
     * Build preemptive basic authentication headers and send request.
     *
     * @param request The request to send to Akamai
     * @param ctx The TransportContext containing the username and password
     * @return HttpResponse The HTTP response from Akamai
     * @throws ReplicationException if a request could not be sent
     */
    private <T extends HttpRequestBase> HttpResponse sendRequest(final T request,
            final TransportContext ctx, final ReplicationTransaction tx)
            throws ReplicationException {

        final ReplicationLog log = tx.getLog();
        final String auth = ctx.getConfig().getTransportUser() + ":" + ctx.getConfig().getTransportPassword();
        final String encodedAuth = Base64.encode(auth);
        
        request.setHeader(HttpHeaders.AUTHORIZATION, "Basic " + encodedAuth);
        request.setHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType());

        HttpClient client = HttpClientBuilder.create().build();
        HttpResponse response;

        try {
            response = client.execute(request);
        } catch (IOException e) {
            throw new ReplicationException("Could not send replication request.", e);
        }

        return response;
    }

    /**
     * Build the Akamai purge request body based on the replication agent
     * settings and append it to the POST request.
     *
     * @param request The HTTP POST request to append the request body
     * @param ctx TransportContext
     * @param tx ReplicationTransaction
     * @throws ReplicationException if errors building the request body 
     */
    private void createPostBody(final HttpPost request, final TransportContext ctx,
            final ReplicationTransaction tx) throws ReplicationException {

        final ValueMap properties = ctx.getConfig().getProperties();
        final String type = PropertiesUtil.toString(properties.get(PROPERTY_TYPE), PROPERTY_TYPE_DEFAULT);
        final String domain = PropertiesUtil.toString(properties.get(PROPERTY_DOMAIN), PROPERTY_DOMAIN_DEFAULT);
        final String action = PropertiesUtil.toString(properties.get(PROPERTY_ACTION), PROPERTY_ACTION_DEFAULT);

        JSONObject json = new JSONObject();
        JSONArray purgeObjects = null;

        /*
         * Get list of CP codes or ARLs/URLs depending on agent setting
         */
        if (type.equals(PROPERTY_TYPE_DEFAULT)) {

            /*
             * Get the content created with the custom content builder class
             * 
             * The list of activated resources (e.g.: ["/content/geometrixx/en/blog"])
             * is available in tx.getAction().getPaths(). For this example, we want the
             * content created in our custom content builder which is available in
             * tx.getContent().getInputStream().
             */
            try {
                final String content = IOUtils.toString(tx.getContent().getInputStream());

                if (StringUtils.isNotBlank(content)) {
                    purgeObjects = new JSONArray(content);
                }
            } catch (IOException | JSONException e) {
                throw new ReplicationException("Could not retrieve content from content builder", e);
            }
        } else {
            final String[] cpCodes = PropertiesUtil.toStringArray(properties.get(PROPERTY_CP_CODES));
            purgeObjects = new JSONArray(Arrays.asList(cpCodes));
        }

        if (purgeObjects != null && purgeObjects.length() > 0) {
            try {
                json.put("type", type)
                    .put("action", action)
                    .put("domain", domain)
                    .put("objects", purgeObjects);
            } catch (JSONException e) {
                throw new ReplicationException("Could not build purge request content", e);
            }

            final StringEntity entity = new StringEntity(json.toString(), CharEncoding.ISO_8859_1);
            request.setEntity(entity);

        } else {
            throw new ReplicationException("No CP codes or pages to purge");
        }
    }
}

The default HTTP transport handler sends an authenticated request to the destination server specified in the "Transport URI" setting and expects an HTTP response with a status code of "200 OK" in order to mark the activation as successful. The Akamai CCU REST API responds with a status code of "201 Created" for a successful purge request. Therefore, a custom Transport Handler was utilized and given the responsibility for sending the POST requests, looking for 201 responses, and returning the proper ReplicationResult.

The transport handler service determines which transport handler to use based on the overridden canHandle method. You'll notice that any replication agent configured with a "Transport URI" that begins with http:// or https:// will be handled by AEM's HTTP transport handler. The convention is to create a unique URL protocol/scheme and have your transport handler's canHandle method watch for Transport URIs that start with your URL scheme. For example, by navigating to a clean instance's Agents on Author page, you'll find default AEM replication agents using http://, static://, tnt://, s7delivery:// and repo://. In this example, the transport handler is activated on the amakai:// scheme and uses the hardcoded Akamai API REST endpoint. A popular convention is to set your transport handler's Transport URIs that start with something like "foo-", which allows the user to configure their replication agent with either "foo-http://" or "foo-https://" and have your transport handler simply remove the custom prefix before making the HTTP request.


The transport handler also handles other ReplicationActionTypes. The Akamai example implements the standard "Test Connection" feature of replication agents by making a GET request to the Akamai API endpoint which expects a "200 OK" response in return - it's simply testing the replication agent's configured username and password. The example does not implement other replication action types such as deletions, deactivations, and polling (reverse).

When you speak of a replication agent, you're first thought will probably be of the default agent that moves content from an author to publish via HTTP. Likewise, I've been discussing the Akamai transport handler example and its usage of HTTP GET and POST requests. However, it's important to note that your transport handler doesn't need to make HTTP calls. For example, you can write an FTP transport handler or a transport handler that interacts with the server's file system like the static replication agent does. Your transport handler can do anything as long as it returns a positive or negative replication result to update the queue.

Content Builder
ContentBuilder implementations build the body of the replication request. Implementations of the ContentBuilder interface end up as serialization options in the replication agent configuration dialog alongside the Default, Dispatcher Flush, Binary less, and Static Content Builder options.
AkamaiContentBuilder.java

package com.nateyolles.aem.core.replication;

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.nio.file.Files;
import java.nio.file.Path;

import javax.jcr.Session;

import org.apache.commons.lang3.StringUtils;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.json.JSONArray;
import org.apache.sling.jcr.resource.JcrResourceConstants;

import com.day.cq.commons.Externalizer;
import com.day.cq.replication.ContentBuilder;
import com.day.cq.replication.ReplicationAction;
import com.day.cq.replication.ReplicationContent;
import com.day.cq.replication.ReplicationContentFactory;
import com.day.cq.replication.ReplicationException;
import com.day.cq.replication.ReplicationLog;
import com.day.cq.wcm.api.Page;
import com.day.cq.wcm.api.PageManager;

/**
 * Akamai content builder to create replication content containing a JSON array
 * of URLs for Akamai to purge through the Akamai Transport Handler. This class
 * takes the internal resource path and converts it to external URLs as well as
 * adding vanity URLs and pages that may Sling include the activated resource.
 */
@Component(metatype=false)
@Service(ContentBuilder.class)
@Property(name="name", value="akamai")
public class AkamaiContentBuilder implements ContentBuilder {

    @Reference
    private ResourceResolverFactory resolverFactory;

    /** The name of the replication agent */
    public static final String NAME = "akamai";

    /**
     * The serialization type as it will display in the replication
     * agent edit dialog selection field.
     */
    public static final String TITLE = "Akamai Purge Agent";

    /**
     * {@inheritDoc}
     */
    @Override
    public ReplicationContent create(Session session, ReplicationAction action,
            ReplicationContentFactory factory) throws ReplicationException {
        return create(session, action, factory, null);
    }

    /**
     * Create the replication content containing the public facing URLs for
     * Akamai to purge.
     */
    @Override
    public ReplicationContent create(Session session, ReplicationAction action,
            ReplicationContentFactory factory, Map<String, Object> parameters)
            throws ReplicationException {

        final String path = action.getPath();
        final ReplicationLog log = action.getLog();

        ResourceResolver resolver = null;
        PageManager pageManager = null;
        JSONArray jsonArray = new JSONArray();

        if (StringUtils.isNotBlank(path)) {
            try {
                HashMap<String, Object> sessionMap = new HashMap<>();
                sessionMap.put(JcrResourceConstants.AUTHENTICATION_INFO_SESSION, session);
                resolver = resolverFactory.getResourceResolver(sessionMap);

                if (resolver != null) {
                    pageManager = resolver.adaptTo(PageManager.class);
                }
            } catch (LoginException e) {
                log.error("Could not retrieve Page Manager", e);
            }

            if (pageManager != null) {
                Page purgedPage = pageManager.getPage(path);

                /*
                 * Get the external URL if the resource is a page. Otherwise, use the
                 * provided resource path.
                 */
                if (purgedPage != null) {
                    /* 
                     * Use the Externalizer, Sling mappings, Resource Resolver mapping and/or
                     * string manipulation to transform "/content/my-site/foo/bar" into
                     * "https://www.my-site.com/foo/bar.html". This example assumes a custom
                     * "production" externalizer setting.
                     */
                    Externalizer externalizer = resolver.adaptTo(Externalizer.class);
                    final String link = externalizer.externalLink(resolver, "production", path) + ".html";

                    jsonArray.put(link);
                    log.info("Page link added: " + link);

                    /*
                     * Add page's vanity URL if it exists.
                     */
                    final String vanityUrl = purgedPage.getVanityUrl();

                    if (StringUtils.isNotBlank(vanityUrl)) {
                        jsonArray.put(vanityUrl);
                        log.info("Vanity URL added: " + vanityUrl);
                    }

                    /*
                     * Get containing pages that includes the resource.
                     */
                    // Run project specific query

                } else {
                    jsonArray.put(path);
                    log.info("Resource path added: " + path);
                }

                return createContent(factory, jsonArray);
            }
        }

        return ReplicationContent.VOID;
    }

    /**
     * Create the replication content containing 
     *
     * @param factory Factory to create replication content
     * @param jsonArray JSON array of URLS to include in replication content
     * @return replication content
     *
     * @throws ReplicationException if an error occurs
     */
    private ReplicationContent createContent(final ReplicationContentFactory factory,
            final JSONArray jsonArray) throws ReplicationException {

        Path tempFile;

        try {
            tempFile = Files.createTempFile("akamai_purge_agent", ".tmp");
        } catch (IOException e) {
            throw new ReplicationException("Could not create temporary file", e);
        }

        try (BufferedWriter writer = Files.newBufferedWriter(tempFile, Charset.forName("UTF-8"))) {
            writer.write(jsonArray.toString());
            writer.flush();

            return factory.create("text/plain", tempFile.toFile(), true);
        } catch (IOException e) {
            throw new ReplicationException("Could not write to temporary file", e);
        }
    }

    /**
     * {@inheritDoc}
     *
     * @return {@value #NAME}
     */
    @Override
    public String getName() {
        return NAME;
    }

    /**
     * {@inheritDoc}
     *
     * @return {@value #TITLE}
     */
    @Override
    public String getTitle() {
        return TITLE;
    }
}

The provided Akamai example project could have been done without implementing a content builder; the logic in the content builder could have been completed in the transport handler as the transport handler created it's own request anyways. Another thing to consider is whether you need the session as ContentBuilder implementations give you that while TransportHandler implementations do not.

A perfect example of utilizing the content builder is in Andrew Khoury's Dispatcher refetching flush agent where the default HTTP transport handler is still used to communicate with the Dispatcher and only the HTTP request body needed to be built out in order for Dispatcher to fetch and re-cache content.

User Interface
By implementing a content builder, the user can simply use a default replication agent and choose the custom serialization type. However, the Akamai replication agent requires the following custom configurations:
  1. an option to remove versus invalidate content
  2. an option to purge resources versus purging via CP Codes
  3. an option to purge the production versus staging domain
  4. the reverse replication option removed

A clean user interface was provided in order for users to implement and configure the Akamai replication agent. To accomplish this, a custom cq:Template as well as a corresponding cq:Component including the view and dialog was made. The easiest way is to copy the default replication agent from /libs/cq/replication/templates/agent and /libs/cq/replication/components/agent to /apps/your-project/replication and update the agent like any other AEM component.

To keep things clean and simple, the Akamai replication agent component inherits from the default replication agent by setting the sling:resourceSuperType to cq/replication/components/agent. The only update needed to the copied component was the dialog options and the agent.jsp file as it contains JavaScript to open the dialog for which you need to update the path. Any additions to the dialog can be retrieved through the TransportContext's getConfig().getProperties() ValueMap.

Download the Akamai Replication Agent project from GitHub.



By aem4beginner

No comments:

Post a Comment

If you have any doubts or questions, please let us know.