
Aggregator Sample

This sample demonstrates the use of a MifAggregator which implements the Aggregator integration pattern. An aggregator is a stateful object that receives multiple messages and uses some logic to combine them into a single message. The programmer implements the aggregation logic by extending a class provided by the MIF API.

Sample Overview

In this sample, the client sends two separate strings, representing a person's first and last name, to a MIF pipeline, which combines them into a single name. The pipeline uses a MifAggregator that receives each string from the client and holds it in memory until exactly two strings have arrived. The aggregator then passes the strings in a single message, as a String array, to a MifProcessor, which concatenates the array's elements into a single string.

[Diagram: Aggregator Sample]

In the above diagram, you can see the inner workings of the MIF module in this pipeline. The diagram shows how the aggregator and processor are combined to create the fullNameModule. It also shows how the client sends two separate messages to the module, which are intercepted by the aggregator and stored in memory until both have been received. The aggregator then passes a single object to the processor, which does some additional work and sends the object out on the module's outbound endpoint.

Running the Sample

To run this sample you need to first start the MIF pipeline, then run a client Publisher program that sends the 2 strings in succession.

To start the MIF pipeline, execute the following command from MIF_HOME:

bin/mif.sh -c gov.pnnl.mif.samples.aggregator.NameAggregatorDriver

This will produce the console message “– MIF has started –”.

Then, run the command to start the client:

bin/mif.sh -c gov.pnnl.mif.samples.aggregator.Publisher

This causes the publisher to send the string “John”, then send the string “Doe”. Then look for the following output in the MIF console:

NameAggregator.shouldAggregate? [John] ... false
NameAggregator.shouldAggregate? [John, Doe] ... true
NameArrayProcessor.listen: John Doe 

The first line of output shows that the aggregator received the first name and decided not to aggregate (indicated by it printing “false” to the console) because it had received only one string. The second line shows that the last name has been received, at which point the aggregator decides to combine the two strings into an array. Finally, the third line shows that the NameArrayProcessor has received the array representing the full name and combined it into a single string.

Code Walkthrough

Implementing a MifAggregator

To implement a MifAggregator, you need to extend the class AbstractMifAggregator. This class defines two abstract methods. The first is shouldAggregate, which is called whenever the module that this aggregator is attached to receives a message. The list passed in contains all message payloads that this aggregator has received so far. It is the implementer's job to return true when the list of payload objects should be aggregated.

boolean shouldAggregate(List<Serializable> payloadList)

When shouldAggregate returns true, MIF calls the second method, aggregate, which combines the payloads into a single object and returns it. The returned object is then passed on to the MifProcessor that implements the module.

Serializable aggregate(List<Serializable> payloadList)
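
The contract described above can be sketched in plain Java without MIF on the classpath. This is only an illustration: AggregatorStandIn, deliver, and TripleJoiner are hypothetical names, only the two method signatures come from the MIF API shown above, and the call order (and the clearing of the list after aggregation) is an assumption based on the description rather than MIF's actual implementation.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class AggregatorContractSketch {

    // Hypothetical stand-in for AbstractMifAggregator; only the two abstract
    // method signatures are taken from the text above.
    abstract static class AggregatorStandIn {
        private final List<Serializable> received = new ArrayList<>();

        abstract boolean shouldAggregate(List<Serializable> payloadList);

        abstract Serializable aggregate(List<Serializable> payloadList);

        // Assumed call order: ask shouldAggregate after every message and call
        // aggregate only when it answers true. Returns null while still waiting.
        Serializable deliver(Serializable payload) {
            received.add(payload);
            if (!shouldAggregate(received)) {
                return null;
            }
            Serializable combined = aggregate(received);
            received.clear(); // assumption: start fresh for the next group
            return combined;
        }
    }

    // Example subclass: wait for three payloads, then join them with commas.
    static class TripleJoiner extends AggregatorStandIn {
        @Override
        boolean shouldAggregate(List<Serializable> payloadList) {
            return payloadList.size() == 3;
        }

        @Override
        Serializable aggregate(List<Serializable> payloadList) {
            List<String> parts = new ArrayList<>();
            for (Serializable s : payloadList) {
                parts.add(String.valueOf(s));
            }
            return String.join(",", parts);
        }
    }

    public static void main(String[] args) {
        AggregatorStandIn agg = new TripleJoiner();
        System.out.println(agg.deliver("a")); // still waiting
        System.out.println(agg.deliver("b")); // still waiting
        System.out.println(agg.deliver("c")); // combined result
    }
}
```

The subclass decides only when to aggregate and how; the buffering of payloads between messages is left to the framework, which is what the stand-in's deliver method imitates here.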

Constructing the Pipeline

The code that sets up the MIF pipeline is in the class NameAggregatorDriver. The main method for this class creates the MifPipeline object as usual and creates a MifJmsConnector so that the pipeline can communicate over JMS (for more information on the purpose and usage of connectors see this page). Then the fullNameModule is created with the line:

MifModule fullNameModule = pipeline.addMifModule(NameArrayProcessor.class, "jms://topic:NameTopic", "stdio://stdout");

This creates a module that listens on the specified JMS topic and sends its output to standard out.

Then, the aggregator is created with another call to MifPipeline which specifies the NameAggregator class which we defined to extend AbstractMifAggregator:

MifAggregator nameAggregator = pipeline.addMifAggregator(NameAggregator.class);

Finally, the aggregator is set on the module and the pipeline is started:

fullNameModule.setAggregator(nameAggregator);
pipeline.start();

Below is the implementation of the NameAggregator class's shouldAggregate method. This method checks whether the number of payloads received is equal to 2. If it is not, the method returns false and the list is not aggregated. If the size of the list is equal to 2, it returns true.

public boolean shouldAggregate(List<Serializable> payloadList) {
    System.out.print("NameAggregator.shouldAggregate? " + payloadList);
    if (payloadList.size() == 2) {
        System.out.println(" ... true");
        return true;
    }
    System.out.println(" ... false");
    return false;
}

The implementation of the aggregate method simply takes the list of strings and converts it to an array, then returns the array so that it can be processed by the MifProcessor.

public Serializable aggregate(List<Serializable> payloadList) {
    String[] arr = {};
    return payloadList.toArray(arr);
}
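
The NameArrayProcessor source is not shown on this page, so the following is only a sketch of the step it is described as performing: taking the String array produced by aggregate and joining its elements into one name. NameJoinSketch, toNameArray, and joinNames are hypothetical names, and the single-space separator is an assumption based on the “John Doe” console output above.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class NameJoinSketch {

    // Same conversion as the aggregate method: List<Serializable> to String[].
    // Passing a String[] makes toArray return a String[], which is Serializable.
    static Serializable toNameArray(List<Serializable> payloadList) {
        return payloadList.toArray(new String[0]);
    }

    // Hypothetical equivalent of the processor's concatenation step.
    static String joinNames(Serializable payload) {
        return String.join(" ", (String[]) payload);
    }

    public static void main(String[] args) {
        List<Serializable> payloads = Arrays.<Serializable>asList("John", "Doe");
        System.out.println(joinNames(toNameArray(payloads))); // John Doe
    }
}
```

Note that toArray with a String[] argument throws an ArrayStoreException if any payload is not a String, so this approach relies on the publisher sending only strings.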
 
aggregator_sample.txt · Last modified: 2010/05/28 13:47 by adamw