Integrating External Services

Imagine you have an existing, non-Java executable that you need to integrate into a MIF pipeline. It can't easily interface with Java, and certainly can't run as a component in the MIF container. You could custom-build a solution, but fortunately you don't need to: the MIF provides a simple API to support this.

This page describes how to integrate an existing long-running executable (written in any language) into a MIF pipeline. Here, long-running means that the program runs for as long as the MIF application is active: the executable starts when the MIF starts, exchanges data with MIF components via sockets or another suitable protocol, and terminates when the MIF terminates. An example from an actual MIF application illustrates how the API is used.

Overview

The basic element needed to add an existing executable to a pipeline is a MifExecModule. This module encapsulates the commands needed to start and stop the executable, as well as a description of the MifExecModule's endpoints. These endpoints are treated slightly differently from those on a regular MifModule: because the communication channels used by the existing executable are fixed, the endpoints cannot be changed. The endpoints on a MifExecModule are therefore said to be static.

These static endpoints need to be described in a standard way so that MIF components in the pipeline know how to exchange data with the executable. To describe a static endpoint, we use a StaticEndpointDescriptor. Once a MifExecModule has been created, it can be added to a component by extending the class AbstractMifExecComponent (which extends the regular component type) and implementing the configure() and deploy() methods as usual.

Components vs. Executable Components

A regular MIF component's endpoints are interchangeable: the person building a pipeline from existing components may attach any endpoint type. This is possible because the endpoints on a MifModule can be changed. The image below represents a regular MIF component.

However, since a MifExecModule's endpoints are fixed, we have added convenience methods to the class AbstractMifExecComponent that create special lightweight endpoint adapters, which are simply pass-through MifModules. These adapters send data straight through from the inbound endpoint to the outbound endpoint without performing any transformation or processing on the data. This maintains the flexibility of a MIF pipeline, in which any component can be connected to any other component using any communication protocol. In the image below, the black boxes represent static endpoints which cannot be changed.

Note that the use of AbstractMifExecComponent is not required. You could also use a regular AbstractMifComponent as described in Base Component Model. However, you would have to manually set up the equivalent of the pass-through modules to allow the component's external endpoints to be set from outside the component.
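To make the pass-through idea concrete, here is a minimal sketch of a relay that forwards data unchanged. The class PassThroughSketch and its relay() method are hypothetical stand-ins written for this page; they are not part of the MIF API, and the real adapters operate on MIF endpoints rather than on Java streams.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a pass-through adapter; NOT a MIF class.
class PassThroughSketch {

    // Copies every byte from the inbound side to the outbound side
    // without transforming it, and returns the number of bytes relayed.
    static long relay(InputStream inbound, OutputStream outbound) {
        byte[] buffer = new byte[4096];
        long total = 0;
        try {
            int read;
            while ((read = inbound.read(buffer)) != -1) {
                outbound.write(buffer, 0, read);
                total += read;
            }
            outbound.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }

    public static void main(String[] args) {
        byte[] payload = "binary flo data".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long copied = relay(new ByteArrayInputStream(payload), out);
        System.out.println(copied + " bytes relayed unchanged");
    }
}
```

Because the relay never inspects the payload, the same adapter shape works for any data type, which is what lets the external endpoint type vary while the static endpoint stays fixed.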

Definitions

  • MifExecModule - A module whose implementation is an existing long-running executable and whose endpoints are static (unchangeable).
  • StaticEndpointDescriptor - Describes a static endpoint with the following fields:
    • tag: a handle for the endpoint which is used to match the static endpoint to the actual component endpoint being passed in by the pipeline builder.
    • dataType: identifies the type of data entering or exiting the endpoint.
    • description: a free-form description of what kind of data is expected on the endpoint.
    • direction: whether the endpoint is inbound or outbound.
    • uri: describes the protocol and address of the endpoint. For example, a tcp endpoint may have the uri: tcp://servername.domain.com:12345.
  • AbstractMifExecComponent - A component which helps the programmer to adapt the static endpoints of an exec module so that the component's external endpoints can be set by the pipeline designer. Methods are provided in this class to automatically create these adapters.
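The descriptor fields listed above can be modelled as a small value class. The sketch below is only an illustration: EndpointDescriptorSketch and SketchDirection are hypothetical names, not the MIF classes, and the argument order simply mirrors the constructor calls shown later on this page. It uses java.net.URI to show how the uri field separates into a protocol and an address.

```java
import java.net.URI;

// Hypothetical stand-in for the direction field; NOT the MIF Direction type.
enum SketchDirection { IN, OUT }

// Hypothetical model of the five descriptor fields; NOT the MIF class.
final class EndpointDescriptorSketch {
    final String tag;                 // handle matched against external endpoints
    final SketchDirection direction;  // inbound or outbound
    final String dataType;            // type of data on the endpoint
    final String description;         // free-form text
    final URI uri;                    // protocol and address

    EndpointDescriptorSketch(String tag, SketchDirection direction,
                             String dataType, String description, String uri) {
        this.tag = tag;
        this.direction = direction;
        this.dataType = dataType;
        this.description = description;
        this.uri = URI.create(uri);   // throws IllegalArgumentException if malformed
    }

    String protocol() { return uri.getScheme(); }
    String host()     { return uri.getHost(); }
    int port()        { return uri.getPort(); }

    public static void main(String[] args) {
        EndpointDescriptorSketch d = new EndpointDescriptorSketch(
                "flo-in", SketchDirection.IN, "ByteArray",
                "Receives binary flo data", "tcp://servername.domain.com:12345");
        System.out.println(d.protocol() + " " + d.host() + " " + d.port());
    }
}
```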

Code Example

The following example is a single component taken from a distributed cyber-security application that we have built. In this application, the component, NuanceModelGenComponent, is responsible for generating a model of the received data, which is passed on for display by a visualization program. The component encapsulates the functionality of a standalone program written in the functional programming language Haskell. This program continuously consumes a stream of data from a network sensor and emits a string representing the model for display.

In this example, JMS endpoints are used to integrate the component into a MIF pipeline, but any other kind of endpoint could be used instead. The following image illustrates how tags are used by the MifExecComponent to match external endpoints with the static internal endpoints. Then, pass-through modules are automatically created to adapt the static endpoints to the endpoint types set by the pipeline creator.
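The tag matching described above amounts to a lookup by key. The following sketch shows the idea with plain strings standing in for endpoints; TagMatchSketch is a hypothetical class, not MIF code, and throwing on an unmatched tag is an assumption made for the sketch, since this page does not say how MIF reacts in that case.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of tag-based matching; NOT a MIF class.
class TagMatchSketch {

    // Pairs each static endpoint tag with the external endpoint that the
    // pipeline builder registered under the same tag.
    static Map<String, String> match(List<String> staticTags,
                                     Map<String, String> externalByTag) {
        Map<String, String> pairs = new LinkedHashMap<>();
        for (String tag : staticTags) {
            String external = externalByTag.get(tag);
            if (external == null) {
                // Assumed behaviour for this sketch only.
                throw new IllegalArgumentException(
                        "No external endpoint supplied for tag: " + tag);
            }
            pairs.put(tag, external);
        }
        return pairs;
    }

    public static void main(String[] args) {
        Map<String, String> external = new HashMap<>();
        external.put("flo-in", "topic/FloInboundTopic");
        external.put("models-out", "topic/NuanceModelTopic");
        System.out.println(match(Arrays.asList("flo-in", "models-out"), external));
    }
}
```

The practical consequence is that the key used when registering an external endpoint has to be spelled exactly like the tag in the corresponding descriptor.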

Creating an executable component involves implementing the configure() and deploy() methods, just as for a regular component (see Base Component Model).

configure()

The configure method is responsible for setting up all the modules and internal endpoints for the component and has the following signature:

public void configure(MifPipeline pipeline) throws MifException {

First, a list of StaticEndpointDescriptors is created. This list will be passed to the MifExecModule when it is created and added to the pipeline.

  List<StaticEndpointDescriptor> endpDescriptors = new ArrayList<StaticEndpointDescriptor>();

Next, the StaticEndpointDescriptors are created and added to this list. These explicitly describe the TCP sockets used by the Haskell program. The first descriptor describes an inbound endpoint identified by the tag “flo-in”. It accepts data from the flo sensor as type ByteArray and listens on a TCP socket bound to the IP address of the host grove-1 on port 12000.

  StaticEndpointDescriptor inEndpDesc = new StaticEndpointDescriptor("flo-in", Direction.IN, "ByteArray", "Receives binary flo data", "tcp://grove-1:12000");
  endpDescriptors.add(inEndpDesc);

The next descriptor represents an outbound endpoint identified by the tag “models-out”. It sends strings, which should be interpreted as model data, to a TCP socket bound to the IP address of grove-1 on port 32000.

  StaticEndpointDescriptor respEndpDesc = new StaticEndpointDescriptor("models-out", Direction.OUT, "string", "Sends model data", "tcp://grove-1:32000");
  endpDescriptors.add(respEndpDesc);

Then the module is created and added to the pipeline with the following method call. The arguments startCommand and stopCommand are class fields which hold the commands to start and stop the executable. When the pipeline is started, MIF runs startCommand to start the model generator. When the pipeline is stopped, MIF runs stopCommand to terminate it.

  pipeline.addMifExecModule("NuanceModelGenerator", startCommand, stopCommand, endpDescriptors);

Finally, we call buildAdapterModule() to automatically create the pass-through adapters described above. The first call creates an adapter for the inbound endpoint by supplying its descriptor; the second does the same for the outbound endpoint. Behind the scenes, MIF does this by matching the tag from the descriptor to the tag on the actual endpoint passed into the component.

  buildAdapterModule(pipeline, inEndpDesc); 
  buildAdapterModule(pipeline, respEndpDesc); 

}

deploy()

Now, a deploy() method must also be written which sets the external endpoints and adds the component to the pipeline.

public void deploy(MifPipeline pipeline) {

Since we are using TCP sockets, we must define a TCP connector by calling the appropriate pipeline method. The second argument to the method call below indicates that any socket using this connector should stay open after sending. Otherwise, the default behavior applies, and a socket is opened and closed for each send. Since we have a streaming application, we set this value to true to improve performance.

  pipeline.addMifTcpConnector("nuanceTcpConnector", true);
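The cost difference between the two modes can be seen with plain Java sockets. The sketch below is independent of MIF: KeepOpenSketch is a hypothetical class that sends a few one-byte messages over the loopback interface and counts how many TCP connections were needed. How the MIF TCP connector manages its sockets internally is not described on this page, so treat this only as an illustration of the general trade-off.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical illustration of keep-open versus open-per-send; NOT MIF code.
class KeepOpenSketch {

    // Sends `messages` one-byte messages to a local server socket and
    // returns the number of TCP connections that had to be established.
    static int connectionsUsed(int messages, boolean keepOpen) {
        int connections = 0;
        Socket client = null;
        Socket accepted = null;
        try (ServerSocket server =
                     new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            for (int i = 0; i < messages; i++) {
                if (client == null) {
                    // Connection setup: the step that keep-open avoids repeating.
                    client = new Socket(server.getInetAddress(), server.getLocalPort());
                    accepted = server.accept();
                    connections++;
                }
                client.getOutputStream().write(i & 0xFF);
                client.getOutputStream().flush();
                if (accepted.getInputStream().read() != (i & 0xFF)) {
                    throw new IllegalStateException("message " + i + " was not received intact");
                }
                if (!keepOpen) {
                    client.close();
                    accepted.close();
                    client = null;
                    accepted = null;
                }
            }
            return connections;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            closeQuietly(client);
            closeQuietly(accepted);
        }
    }

    private static void closeQuietly(Socket socket) {
        if (socket != null) {
            try {
                socket.close();
            } catch (IOException ignored) {
                // Nothing useful to do while cleaning up.
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("keep open:     " + connectionsUsed(5, true) + " connection(s)");
        System.out.println("open per send: " + connectionsUsed(5, false) + " connection(s)");
    }
}
```

For a continuous stream of sensor data, repeating the connection setup for every message adds avoidable overhead, which is why the example passes true.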

Next, a mapping is created so that tags can be matched with endpoints that are passed into the component.

  Map<String,MifEndpoint> modelGenEndps = new HashMap<String,MifEndpoint>();
	

Then the inbound and outbound endpoints are created. In this case, JMS endpoints are used to integrate the component into a MIF pipeline.

  MifEndpoint modelGenFloIn = pipeline.addMifEndpoint("modelGenFloIn", EndpointType.JMS, "topic/FloInboundTopic");
  modelGenEndps.put("flo-in", modelGenFloIn);
  
  MifEndpoint modelResponseEndp = pipeline.addMifEndpoint("NuanceModelsOut", EndpointType.JMS, "topic/NuanceModelTopic");
  // The key must equal the tag of the outbound descriptor ("models-out") so that the two can be matched.
  modelGenEndps.put("models-out", modelResponseEndp);

Finally, the endpoint mapping is set on the component and it is added to the pipeline.

  setEndpoints(modelGenEndps);
  pipeline.addMifComponent(this);
  
}
 
long_running_executables.txt · Last modified: 2008/02/11 12:12 by adamw