
Chat Traffic Analysis Example

This page describes a more complete MIF example that utilizes many of the common MIF constructs. The example application is a set of components that are combined to analyze Internet chat messages flowing across the network.

The following describes:

  1. An overview of the chat sample included with the MIF.
  2. How to run the chat sample.
  3. A walkthrough of the code used to construct this sample.

Prerequisites:

Overview of chat

The chat example expands upon the core ideas of the helloWorld example and presents a more realistic sample that uses multiple modules and MIF constructs. This sample should help bring together ideas from other samples into one cohesive application. Key concepts covered are:

  • MIF Components
  • MIF Modules
  • Package structure
  • Aggregators
  • Routing
  • Connectors

Description

When the pipeline starts, the main program initializes the pipeline and then simulates an external process that is pulling chat messages off of a network and inserting them into the pipeline via JMS. From there, the Ingest module takes a line of chat data and parses it into an object (MapWrapper) that is utilized throughout the rest of the pipeline. From the Ingest Module, separate copies of the data (in the form of a MapWrapper) are routed to 3 parallel processing modules. The actual logic of the processing is delegated to the chat-specific code called by the MIF wrapper classes and is beyond the scope of this description. Next, an aggregator combines the 3 resulting data objects into one message that is forwarded outside the pipeline for display, in this example's case, to the console.

Chat component

Running chat

Note: $MIF_HOME refers to the MIF installation directory

Follow these steps:

  • Open a shell, navigate to $MIF_HOME/bin
  • Execute the following
run-mif.bat -c gov.pnnl.mif.samples.chat.ChatComponentDriver
  • MeDICI has fully started when the console displays something similar to:
MeDICI has started
  • The console output then displays all of the various chat messages after they've gone through the analysis. Below is an example:
2007-11-06 14:12:27,371 INFO  [Keyword] 16 : jpherro
18:19:26 to 18:19:26
UNM will get a multihop BGP over Abilene to CHIC.  they want to mcast to the showfloor.  less than 1G of traffic
2
2007-11-06 14:12:27,379 INFO  [Affect] 12 : 140.221.241.248
18:19:26 to 18:19:26
click once on the desktop
3
2007-11-06 14:12:27,384 INFO  [Blackout] 16 : jpherro
18:19:26 to 18:19:26
UNM will get a multihop BGP over Abilene to CHIC.  they want to mcast to the showfloor.  less than 1G of traffic
2
CHAT RESULT:16 : jpherro
18:19:26 to 18:19:26
UNM will get a multihop BGP over Abilene to CHIC.  they want to mcast to the showfloor.  less than 1G of traffic
2

Creating the Chat Pipeline

All the source files for this example can be found in $MIF_HOME/sources/mif-samples/src/gov/pnnl/mif/samples/chat. The following code snippets demonstrate the pertinent portions of the code. The code is presented in a “top-down” manner where we present the higher level code first and move progressively down to the implementation details.

Setup Pipeline (main)

First, we need to create a MifPipeline which is needed to start and stop the processing pipeline. MifPipeline is also used to create and register all the objects that run within the pipeline.

  MifPipeline pipeline = new MifPipeline();

Next, we create and add a JMS connector, giving it the server address and the JMS provider it will use (in this case, ActiveMQ).

  pipeline.addMifJmsConnector("tcp://localhost:61616", JmsProvider.ACTIVEMQ);

Next, we create the ChatComponent object, assign its endpoints, and add it to the pipeline. We then start the pipeline so that it is ready to receive messages. The heavy lifting of pipeline configuration happens inside the component and is discussed below.

  ChatComponent chat = new ChatComponent();
  chat.setInEndpoint("jms://topic:ChatDataTopic");
  chat.setOutEndpoint("stdio://stdio?outputMessage=CHAT RESULT: ");
  pipeline.addMifComponent(chat);
 
  pipeline.start();

Finally, there is a utility method to call that simulates a stream of chat messages flowing into the pipeline over JMS by reading a file of chat messages and sending them into the pipeline:

  simulateChatStream();
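
The body of simulateChatStream() is not reproduced on this page. As a rough sketch only, a replay loop of this kind could look like the following. The ChatStreamReplay class, its replay method, and the Consumer that stands in for the JMS producer are all hypothetical names chosen for illustration; none of them are part of the MIF API or the sample's source.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for simulateChatStream(); not the sample's actual code.
class ChatStreamReplay {

  // Reads chat messages one per line and hands each non-blank line to the sender.
  // In the real sample the sender would publish to the JMS topic; here it is any Consumer.
  static int replay(Reader source, Consumer<String> sender) {
    int sent = 0;
    try (BufferedReader reader = new BufferedReader(source)) {
      String line;
      while ((line = reader.readLine()) != null) {
        if (line.trim().isEmpty()) {
          continue; // skip blank lines between messages
        }
        sender.accept(line);
        sent++;
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return sent;
  }

  public static void main(String[] args) {
    List<String> received = new ArrayList<>();
    int count = replay(new StringReader("first message\n\nsecond message\n"), received::add);
    System.out.println(count + " messages sent: " + received);
  }
}
```

Passing the sender in as a parameter keeps the file-reading loop independent of the transport, so the same loop can be exercised without a running JMS server.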

Chat Component

The Chat Component is the primary mechanism for wiring the application's pieces together to produce a useful result. Below is a walkthrough of the various modules and the “configure” method.

First, the component exposes setters for its endpoints. The calling code (ChatComponentDriver.java in this case) uses them to specify the input and output data flows for this example:

  public void setInEndpoint (String inEndpoint) {
   this.inEndpoint = inEndpoint;
  }
 
  public void setOutEndpoint (String outEndpoint) {
   this.outEndpoint = outEndpoint;
  }

Next, we add the Ingest Module which is responsible for taking a chat message in the form of a String and parsing it into a data structure (MapWrapper) to be used by all of the downstream processing modules. The slightly tricky part of this code is that there are multiple outbound endpoints since the outgoing message will be routed to 3 processing modules (Affect, Blackout, and Keyword).

  //first, set up outbound endpoints for each of the processing modules (this is essentially splitting the incoming message)
  //construct the ingest module and add the extra outbound endpoints
  MifModule ingestModule = pipeline.addMifModule(Ingest.class.getName(), inEndpoint, "vm://ingest.keyword.queue");
  ingestModule.addOutboundEndpoint("vm://ingest.affect.queue");
  ingestModule.addOutboundEndpoint("vm://ingest.blackout.queue");
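
Conceptually, the three outbound endpoints make the ingest module a fan-out point: every parsed message is delivered once to each downstream queue. The following is a minimal plain-Java sketch of that idea and is not MIF code. The FanOut class and its in-memory queues are hypothetical, and the sketch assumes that each branch should receive its own copy of the map so that one processor's changes are not visible to the others.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical illustration of fan-out routing; MIF does this through its endpoints.
class FanOut {
  private final Map<String, Queue<Map<String, Object>>> queues = new LinkedHashMap<>();

  // Registers a named in-memory queue, analogous to adding an outbound endpoint.
  void addOutbound(String name) {
    queues.put(name, new ArrayDeque<>());
  }

  // Delivers a separate shallow copy of the message to every registered queue.
  void send(Map<String, Object> message) {
    for (Queue<Map<String, Object>> queue : queues.values()) {
      queue.add(new HashMap<>(message));
    }
  }

  Queue<Map<String, Object>> queue(String name) {
    return queues.get(name);
  }

  public static void main(String[] args) {
    FanOut ingest = new FanOut();
    ingest.addOutbound("ingest.keyword.queue");
    ingest.addOutbound("ingest.affect.queue");
    ingest.addOutbound("ingest.blackout.queue");

    Map<String, Object> parsed = new HashMap<>();
    parsed.put("user", "jpherro");
    ingest.send(parsed);

    // Changing one branch's copy leaves the other branches untouched.
    ingest.queue("ingest.keyword.queue").peek().put("keywordScore", 2);
    System.out.println(ingest.queue("ingest.affect.queue").peek());
  }
}
```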

Next, the processing modules are wired in by creating inbound endpoints that match the “path” name of the ingest module's outbound endpoints (“ingest.keyword.queue” in this case). The endpoints themselves are created by the addMifModule method of the pipeline class. The Keyword module is shown here; the others work the same way, so they are left out of this description.

  //KEYWORD Module
  pipeline.addMifModule(Keyword.class.getName(), "vm://ingest.keyword.queue", "vm://keyword.queue");
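
Because modules are connected only by matching endpoint strings, a misspelled or malformed name can leave a module disconnected without any compile-time error. As a hedged sketch, one could compare inbound names against the set of outbound names before starting the pipeline. The EndpointCheck class and its methods are hypothetical helpers written for this illustration; they are not part of MIF.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical pre-flight check; not part of the MIF API.
class EndpointCheck {

  // Returns the name after "vm://", or null when the string is not a well-formed vm:// endpoint.
  static String vmPath(String endpoint) {
    try {
      URI uri = URI.create(endpoint);
      return "vm".equals(uri.getScheme()) ? uri.getAuthority() : null;
    } catch (IllegalArgumentException e) {
      return null; // not parseable as a URI at all
    }
  }

  // Lists every inbound endpoint that has no outbound endpoint with the same path.
  static List<String> unmatchedInbound(List<String> outbound, List<String> inbound) {
    Set<String> produced = new HashSet<>();
    for (String endpoint : outbound) {
      String path = vmPath(endpoint);
      if (path != null) {
        produced.add(path);
      }
    }
    List<String> unmatched = new ArrayList<>();
    for (String endpoint : inbound) {
      String path = vmPath(endpoint);
      if (path == null || !produced.contains(path)) {
        unmatched.add(endpoint);
      }
    }
    return unmatched;
  }

  public static void main(String[] args) {
    List<String> outbound = Arrays.asList(
        "vm://ingest.keyword.queue", "vm://ingest.affect.queue", "vm://ingest.blackout.queue");
    // The second entry is misspelled and the third is missing the "//".
    List<String> inbound = Arrays.asList(
        "vm://ingest.keyword.queue", "vm://ingest.afect.queue", "vm:ingest.blackout.queue");
    System.out.println("Unmatched inbound endpoints: " + unmatchedInbound(outbound, inbound));
  }
}
```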

Finally, the last step of the component configuration is to aggregate the results of the processing modules into one message and forward it outside of the component using the outbound endpoint. First, create the aggregate module, which acts as a placeholder for the actual aggregator construct, and set up all of the inbound endpoints to aggregate on. Then add the aggregator to the pipeline and assign it to the module.

  MifModule chatAggregateModule = pipeline.addMifModule(ChatAggregate.class.getName(), "vm://keyword.queue", outEndpoint);
  chatAggregateModule.addInboundEndpoint("vm://affect.queue");
  chatAggregateModule.addInboundEndpoint("vm://blackout.queue");
 
  MifAggregator chatAnalysisAggregator = pipeline.addMifAggregator(new ChatAnalysisAggregator());
  chatAggregateModule.setAggregator(chatAnalysisAggregator);

ChatAnalysisAggregator

The ChatAnalysisAggregator is responsible for coordinating the messages coming out of the 3 processing modules and combining them into one message. This is accomplished by using a correlation ID (i.e. unique ID for each message that was assigned at the Ingest stage) and counting that all 3 messages have been received (1 for each processing module). See using aggregators for details of what is necessary to implement an aggregator.
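
The correlate-and-count idea can be sketched in plain Java as follows. This is an illustration under stated assumptions, not the ChatAnalysisAggregator source: the CorrelationAggregator class, its add method, and the use of strings as partial results are all hypothetical, and the sketch does not use the MIF aggregator interface.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of correlation-ID aggregation; not the ChatAnalysisAggregator source.
class CorrelationAggregator {
  private final int expected;
  private final Map<String, List<String>> pending = new HashMap<>();

  CorrelationAggregator(int expected) {
    this.expected = expected;
  }

  // Stores one partial result. Returns the complete group once all expected parts
  // for that correlation ID have arrived; otherwise returns null.
  synchronized List<String> add(String correlationId, String partialResult) {
    List<String> parts = pending.computeIfAbsent(correlationId, id -> new ArrayList<>());
    parts.add(partialResult);
    if (parts.size() < expected) {
      return null;
    }
    pending.remove(correlationId); // release state so the map does not keep completed groups
    return parts;
  }

  public static void main(String[] args) {
    CorrelationAggregator aggregator = new CorrelationAggregator(3);
    System.out.println(aggregator.add("msg-16", "keyword"));
    System.out.println(aggregator.add("msg-16", "affect"));
    System.out.println(aggregator.add("msg-16", "blackout"));
  }
}
```

A sketch like this holds a group until every part arrives, so a production aggregator would likely also need a timeout or eviction policy for groups that never complete.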

Implementation code (Modules)

For this sample application, all processing modules are written in Java and wrap chat-specific libraries that contain the actual logic of the application. Below is an example of one of the processing modules. The Blackout processor implements the functionality of the Blackout module, and it follows a very common pattern: a wrapper class that calls the “real” logic (usually in a library or jar). In this case, the code simply delegates to an application-specific method, blackout.processContentAnalysis(message). Additionally, this example shows how properties are set (through getters/setters) and how to read data files off of the classpath. Finally, the input/output data types are specified by getInputType() and getOutputType(), which both return “gov.pnl.tvis.sift.MapWrapper” in order to specify the concrete type of the argument and return value of the listen() method.

public class Blackout implements MifInOutProcessor {
  Logger log = Logger.getLogger(Blackout.class);
  private String pathToBlackoutFile = "blackout.txt";
  private static BlackoutId blackout = null;
 
  public Blackout() {
    initBlackout();
  }  
 
  public Serializable listen(Serializable input) {
    MapWrapper data = (MapWrapper) input;
    HashMap message = data.getMap();
 
    if(blackout != null){
      blackout.processContentAnalysis(message);
    }
 
    return new MapWrapper(message);
  }
 
  public String getInputType() {
    return MapWrapper.class.getName();
  }
 
  public String getOutputType() {
    return MapWrapper.class.getName();
  }
 
  /**
   * Used to get/set properties on module.
   */
  public String getPathToBlackoutFile() {
    return pathToBlackoutFile;
  }
  /**
   * Used to get/set properties on module.
   */
  public void setPathToBlackoutFile(String pathToKeywordFile) {
    this.pathToBlackoutFile = pathToKeywordFile;
    initBlackout();
  }
 
  /**
   * Utility method to retrieve the blackout list from a text file
   * located on the classpath.
   */
  private void initBlackout() {
    if(pathToBlackoutFile != null && blackout == null) {
      URL url = Thread.currentThread().getContextClassLoader().getResource(pathToBlackoutFile);
      if(url == null) {
        // getResource returns null when the file is not on the classpath
        log.warn("blackout file not found on classpath: " + pathToBlackoutFile);
        return;
      }
 
      // ASW: this fixes issue of url with spaces being used as a file
      // on windows platform.  Basically, turn back to a real space.
      String fileStr = url.getFile();
      fileStr = fileStr.replaceAll("%20", "\\ ");
 
      log.info("path to blackout file: " + fileStr);
      blackout = new BlackoutId(fileStr);
    }
  }
}
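
The “%20” replacement in initBlackout() handles encoded spaces only. As an alternative sketch, and not the sample's own code, converting the URL to a java.net.URI and calling getPath() decodes every percent-escaped character in the path. The ResourcePaths class and decodedPath method below are hypothetical names, and the example URL is made up for illustration.

```java
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;

// Hypothetical helper; shows one way to turn a resource URL into a usable file path.
class ResourcePaths {

  // Returns the decoded path component of the URL, e.g. "%20" becomes a space.
  static String decodedPath(URL url) {
    try {
      return url.toURI().getPath();
    } catch (URISyntaxException e) {
      // Thrown when the URL contains characters that are illegal in a URI,
      // such as an unencoded space.
      throw new IllegalArgumentException("Not a valid URI: " + url, e);
    }
  }

  public static void main(String[] args) throws MalformedURLException {
    URL url = new URL("file:/opt/my%20app/blackout.txt");
    System.out.println(decodedPath(url));
  }
}
```

This approach assumes the resource is a plain file on disk. A resource packaged inside a jar has no file path at all, so code that must work in both cases would typically read the resource as a stream instead.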

Summary

This page demonstrates many of the constructs of the MIF and the Base Component Model and shows how to combine them in a real-world application.

 
chat_cyber_security_example.txt · Last modified: 2010/06/02 09:44 by adamw