Using MIF Aggregators

Aggregators combine multiple messages from one or more sources into a single message. This can be used to collate the results of modules working in parallel or to reduce a high volume of messages into a single object.

Usage

To create a MIF aggregator, write a class that extends the AbstractMifAggregator abstract class and implements its two methods, shouldAggregate and aggregate. Both methods take a single argument, a list of serializable objects called “payloadList”. This list represents the pool of MifEvent objects still waiting to be aggregated, grouped by the correlation ID set on each one.
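The grouping by correlation ID described above can be pictured with a small sketch. This is not MIF code: SketchEvent and CorrelationPool are hypothetical stand-ins invented for illustration, and the way MIF itself stores pending events may differ.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a MIF event: a payload tagged with a correlation ID.
final class SketchEvent {
    final String correlationId;
    final Serializable payload;

    SketchEvent(String correlationId, Serializable payload) {
        this.correlationId = correlationId;
        this.payload = payload;
    }
}

// Hypothetical pool that keeps one list of pending payloads per correlation ID.
final class CorrelationPool {
    private final Map<String, List<Serializable>> groups = new LinkedHashMap<>();

    // Adds the event to its group and returns that group's pending payloads.
    List<Serializable> add(SketchEvent event) {
        List<Serializable> group =
            groups.computeIfAbsent(event.correlationId, id -> new ArrayList<>());
        group.add(event.payload);
        return group;
    }

    // Drops a group once it has been aggregated.
    void clear(String correlationId) {
        groups.remove(correlationId);
    }

    // Number of groups that still have pending payloads.
    int pendingGroups() {
        return groups.size();
    }
}
```

In this sketch, the list returned by add plays the role of the payloadList handed to the two aggregator methods.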

The first method, shouldAggregate, is called each time the aggregator receives a new message. It returns a boolean indicating whether that group of events contains a complete set that is ready to be aggregated into a single message. Typical implementations count the messages in the event group (to aggregate the results of a known number of parallel processes) or look for the presence of a particular message (to digest the messages that arrive over a certain period of time).

The second method, aggregate, is called on a group of messages whenever shouldAggregate returns true for that group. It returns a single object representing the aggregated value of the objects in that group. In an application distributing requests for airline fares, for instance, the return value might represent the lowest fare returned.
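As a minimal sketch of the two methods working together, the following counts fare quotes and, once the expected number has arrived, returns the lowest one. The SketchAggregator interface is a hypothetical stand-in that mirrors the method names and the payloadList argument described above; it is not the MIF base class, whose exact signatures may differ. Representing fares as Double values is also an assumption made for illustration.

```java
import java.io.Serializable;
import java.util.List;

// Hypothetical stand-in for the aggregator contract described in the text.
interface SketchAggregator {
    boolean shouldAggregate(List<Serializable> payloadList);
    Object aggregate(List<Serializable> payloadList);
}

// Waits for a fixed number of fare quotes, then reports the lowest one.
final class LowestFareAggregator implements SketchAggregator {
    private final int expectedQuotes;

    LowestFareAggregator(int expectedQuotes) {
        this.expectedQuotes = expectedQuotes;
    }

    // The group is complete once every parallel request has answered.
    @Override
    public boolean shouldAggregate(List<Serializable> payloadList) {
        return payloadList.size() >= expectedQuotes;
    }

    // Returns the smallest Double in the group, or null if there is none.
    @Override
    public Object aggregate(List<Serializable> payloadList) {
        Double lowest = null;
        for (Serializable payload : payloadList) {
            if (payload instanceof Double) {
                Double fare = (Double) payload;
                if (lowest == null || fare < lowest) {
                    lowest = fare;
                }
            }
        }
        return lowest;
    }
}
```

Counting suits the case where the number of parallel workers is known in advance; when it is not, a marker message such as the heartbeat used in the example below is the more natural completion signal.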

Example

This example shows a very simple aggregator that collects objects until a special TimerHeartBeat object is received, and at that point returns a count of the objects. In many applications, the aggregator might instead return the complete list of messages it has received as a single collection object.

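First, we need to extend the MifAggregator abstract class, so the object can be included in a MIF pipeline. Note that the code in this example uses the names MifAggregator, shouldAggregateEvents, and doAggregateEvents, and takes a MifEventGroup argument, rather than the names given under Usage: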

import java.util.Iterator;

public class TimedMessageCounter extends MifAggregator {

Next we need to create the shouldAggregateEvents method. This will cycle through all of the messages sent to the aggregator so far, looking for a TimerHeartBeat object. If one is found, it will return true because its arrival signifies that it is time to count the messages. If none is found, it will return false.

public boolean shouldAggregateEvents(MifEventGroup mifEvents) {
  Iterator<MifEvent> itr = mifEvents.iterator();
  while(itr.hasNext()) {
    MifEvent e = itr.next();
    // A TimerHeartBeat payload signals that it is time to count.
    if(e.getPayload() instanceof TimerHeartBeat)
      return true;
  }
  // No heartbeat yet, so keep collecting.
  return false;
}

Finally, we need a doAggregateEvents method to count all of the objects received and return the tally. A little extra logic is needed to skip the TimerHeartBeat object itself, as it is only an artifact of controlling the aggregator.

public Object doAggregateEvents(MifEventGroup mifEvents) {
  Iterator<MifEvent> itr = mifEvents.iterator();
  int count = 0;
  while(itr.hasNext()) {
    MifEvent e = itr.next();
    // Skip the TimerHeartBeat itself; it only triggers the count.
    if(!(e.getPayload() instanceof TimerHeartBeat))
      count++;
  }
  // Return the tally only after every event has been examined.
  return Integer.valueOf(count);
}
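
The variant mentioned at the start of this example, returning the complete list of messages rather than a count, might look like the following sketch. It is written against plain Java lists so that it can be run without MIF: SketchHeartBeat is a hypothetical stand-in for TimerHeartBeat, and TimedMessageCollector does not extend the real MIF base class, so the method signatures are assumptions.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical marker payload standing in for TimerHeartBeat.
final class SketchHeartBeat implements Serializable {
    private static final long serialVersionUID = 1L;
}

// Collects payloads until a heartbeat arrives, then returns them all.
final class TimedMessageCollector {

    // Ready as soon as any payload in the group is a heartbeat.
    boolean shouldAggregate(List<Serializable> payloads) {
        for (Serializable payload : payloads) {
            if (payload instanceof SketchHeartBeat) {
                return true;
            }
        }
        return false;
    }

    // Returns every non-heartbeat payload, in arrival order.
    List<Serializable> aggregate(List<Serializable> payloads) {
        List<Serializable> collected = new ArrayList<>();
        for (Serializable payload : payloads) {
            if (!(payload instanceof SketchHeartBeat)) {
                collected.add(payload);
            }
        }
        return collected;
    }
}
```

As in the counter above, the heartbeat is filtered out of the result because it carries no data of its own.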
 
using_aggregators.txt · Last modified: 2009/08/27 15:39 by d3x072