This section discusses some useful, general design guidelines that should be borne in mind when designing MIF components and pipelines. MIF applications are essentially instances of the general pipeline design pattern and are based on asynchronous messaging middleware 1). Hence if you're familiar with designing such applications, you'll know much of the following already.
The MeDICi technology is being designed as a general purpose platform for analytical applications. In this spectrum, we expect there to be two extreme use cases that have somewhat different design issues. These are:
- Streaming applications: Relatively small messages (~1-100KB) arrive frequently (10s-1000s per second), and need processing quickly by the components in the pipeline. Often, arriving messages will be aggregated into groups based on such parameters as arrival time, and processed as a batch by downstream components. Many input messages are processed concurrently, and hence components must execute quickly (milliseconds to seconds) and can be considered fine-grained. Sensor networks and network traffic processing are typical data streaming applications.
- Workflow applications: An application pipeline performs a series of complex tasks, often passing large data sets between the components in the pipeline. Individual components may take anywhere from many seconds to hours or days to execute. Hence the pipeline typically processes only a small number of concurrent input requests, and the individual components are coarse-grained, complex codes that typically execute on their own dedicated hardware resources in a distributed environment. Scientific workflows are typical of this class of application.
Bear these in mind when you think about designing MIF pipelines.
MIF components must be designed to integrate into a pipeline architecture. This means they have one or more inputs, and generate zero or more outputs based on these inputs and potentially external context/data that they access. Some example component design patterns are as follows:
- In the simplest case, many components will wait for a single input message to arrive, process it, and produce a single output. In our experience designing applications, this is a very common design pattern.
- A component may receive a single input message, process it, and produce multiple output messages, either on one or multiple endpoints. If there are multiple endpoints, this component effectively distributes work to multiple downstream components.
- A component receives multiple inputs on either one or multiple endpoints. The component stores inputs until it has enough to process and produce its (one or more) outputs. We commonly see components that act as aggregators, producing a single output (often a statistical signature or some form of compressed summary) from multiple input messages.
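The aggregator pattern in the last bullet can be sketched in plain Java. This is a minimal illustration only: `MeanAggregator`, `onMessage` and the fixed batch size are hypothetical names and choices, not part of the MIF API, and a real component would receive its inputs over MIF endpoints.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical aggregator (not a MIF class): buffers numeric inputs and
// emits their mean once batchSize inputs have arrived.
final class MeanAggregator {
    private final int batchSize;
    private final List<Double> buffer = new ArrayList<>();

    MeanAggregator(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    // Called once per input message. Returns an output only when a
    // complete batch has been collected; otherwise returns empty.
    synchronized Optional<Double> onMessage(double value) {
        buffer.add(value);
        if (buffer.size() < batchSize) {
            return Optional.empty();
        }
        double sum = 0.0;
        for (double v : buffer) {
            sum += v;
        }
        double mean = sum / buffer.size();
        buffer.clear(); // drop the batch so the buffer never exceeds batchSize
        return Optional.of(mean);
    }
}
```

Because the buffer is cleared after every batch, the component's memory use is bounded by the batch size rather than growing with the number of messages seen.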
Component and pipeline designers need to think carefully about the memory that the components in a pipeline consume. In general, there are two types of components, namely:
- stateless: Stateless components simply input messages, process them, and produce outputs. They maintain no memory footprint when they are not active. In general this is a good thing, as their memory utilization is temporary (while doing work) and not accumulative, as all the memory used to process the input message is released as soon as the corresponding output message is sent. Stateless components also scale very well, as they can be trivially replicated and distributed. So if you can make a component stateless, this is a good thing for many reasons.
- stateful: Stateless components aren't always possible. For example, a component that needs to output the most frequently occurring words in a series of text messages needs to maintain some sort of hash table with word counts. And this data structure will grow as new words are detected. The hash table is therefore the state that a component maintains in between processing individual inputs.
Obviously memory is a finite resource, and the memory available to a MIF pipeline depends on the heap configuration of the Java Virtual Machine that hosts the MIF container. If a stateful Java component is running within the MIF container, and its memory usage continually grows, it could eventually consume all the memory available to the MIF container. This would cause the pipeline to fail. And this kind of memory exhaustion might not be caused by a single stateful component, but by a combination of stateful and stateless components composed in the same pipeline. Needless to say, such memory errors are darn tricky to diagnose.
The only way to be certain that such errors will not occur is to 'take the state out' of stateful components that can potentially use large amounts of memory. Doing this means basically putting the state outside of the component's memory. Common ways to do this are to store the state in a database, or a file, or in another, potentially distributed component. Adopting one of these strategies will make your components stateless, with predictable memory usage, and hence greatly reduce the chances of your long-running applications failing unpredictably due to memory exhaustion. (As a side effect, it also makes it easier to scale your application, as your components are stateless and can be replicated. Of course, you have to make sure access to shared state doesn't become a bottleneck, but hey, building scalable high performance applications is never easy!)
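As a rough sketch of 'taking the state out', the word-count example can be written so that the component only talks to a store interface. Everything here is hypothetical and not part of MIF: `WordCountStore`, `InMemoryWordCountStore` and `WordCounter` are illustrative names, and the in-memory store exists only to make the sketch runnable. In practice the store would be backed by a database, a file, or a separate component.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical abstraction over external state (database, file, remote service).
interface WordCountStore {
    void increment(String word);
    long count(String word);
}

// Stand-in implementation used only so the sketch runs. A real deployment
// would keep these counts outside the MIF container's heap.
final class InMemoryWordCountStore implements WordCountStore {
    private final Map<String, Long> counts = new HashMap<>();

    public synchronized void increment(String word) {
        counts.merge(word, 1L, Long::sum);
    }

    public synchronized long count(String word) {
        return counts.getOrDefault(word, 0L);
    }
}

// The component keeps no word counts of its own between messages;
// all accumulated state lives behind the store interface.
final class WordCounter {
    private final WordCountStore store;

    WordCounter(WordCountStore store) {
        this.store = store;
    }

    void onMessage(String text) {
        for (String token : text.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) {
                store.increment(token);
            }
        }
    }
}
```

With this shape, several `WordCounter` instances can share one store, which is what makes replication possible, at the cost of every update becoming a call to the shared store.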
Another strategy is to make a local MIF component a remote component. Remote components execute outside the MIF container in their own address space, which they can use to their heart's content without interfering with the other components in a pipeline.
Components in a pipeline communicate by passing messages. One component creates an output message, and passes it to the MIF over an endpoint. The MIF then passes this message to the corresponding input endpoint of the downstream component. Passing a message through the MIF container incurs object copy overheads at the minimum. If your endpoint is implemented with a JMS topic, you have the JMS overheads too.
Large messages, particularly of the order of multiple megabytes, can create problems. There's obviously a limit to how many large messages you can be passing down a pipeline before memory issues arise. Also, copying large messages, maybe sending them over JMS endpoints or to remote distributed components, all slows down performance and might cause parts of the MIF infrastructure to fail as some limits are exceeded (e.g. every JMS implementation has a limit on the maximum message size it can handle).
It's wise therefore to avoid passing large messages directly between components. One solution is to store the message payload in the MIF cache, and pass a cache reference between components. The cache will provide performance and reliability improvements for large messages, especially if they are read-only (i.e. some or all components in the pipeline only read them and never write to them). Writing large messages to files, or perhaps a database, is another possible strategy.
As always, application design involves making trade-offs between performance, scalability and reliability. There are no free lunches here.
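The idea of passing a reference instead of the payload can be illustrated as follows. This sketch does not use the actual MIF cache API: `PayloadCache`, `ReferenceProducer` and `ReferenceConsumer` are hypothetical stand-ins built on a `ConcurrentHashMap`, shown only to make the message flow concrete.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a shared cache; not the MIF cache API.
final class PayloadCache {
    private final Map<String, byte[]> entries = new ConcurrentHashMap<>();

    // Stores the payload and returns a small key that can travel in a message.
    String put(byte[] payload) {
        String key = UUID.randomUUID().toString();
        entries.put(key, payload);
        return key;
    }

    byte[] get(String key) {
        return entries.get(key);
    }

    void remove(String key) {
        entries.remove(key);
    }
}

// Upstream component: caches the large payload and emits only the key.
final class ReferenceProducer {
    private final PayloadCache cache;

    ReferenceProducer(PayloadCache cache) {
        this.cache = cache;
    }

    String produce(byte[] largePayload) {
        return cache.put(largePayload);
    }
}

// Downstream component: resolves the key, processes the payload (here it
// just measures its size), and evicts the entry when it is the last reader.
final class ReferenceConsumer {
    private final PayloadCache cache;

    ReferenceConsumer(PayloadCache cache) {
        this.cache = cache;
    }

    int consume(String key) {
        byte[] payload = cache.get(key);
        if (payload == null) {
            throw new IllegalStateException("No cached payload for key " + key);
        }
        int size = payload.length;
        cache.remove(key);
        return size;
    }
}
```

The message that crosses the endpoint is a short string regardless of payload size. The cost is that someone must decide when an entry can be evicted; in this sketch the final consumer does it.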
Future plans (we'd happily accept further funding donations!) involve us creating a high performance, asynchronous and intelligent 'data moving service'. This will be used to transfer large messages and very large files between components using some efficient copying mechanism. This will improve application performance and reduce resource utilization.
MIF pipelines are basically instances of queued applications. Any queued application's performance is limited by the slowest step in the pipeline. Many pipelines can be sped up by simply improving the performance of the slowest steps. This is probably the first place to look when you are tuning your application.
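To make the 'slowest step' point concrete, here is a small sketch that finds the bottleneck from measured per-stage processing times. It assumes each stage handles one message at a time and the pipeline runs in steady state; `PipelineBottleneck` and its methods are hypothetical helpers, not MIF features.

```java
// Hypothetical helper for reasoning about pipeline throughput.
final class PipelineBottleneck {

    // Returns the index of the stage with the largest per-message time.
    static int slowestStage(double[] secondsPerMessage) {
        if (secondsPerMessage.length == 0) {
            throw new IllegalArgumentException("at least one stage is required");
        }
        int slowest = 0;
        for (int i = 1; i < secondsPerMessage.length; i++) {
            if (secondsPerMessage[i] > secondsPerMessage[slowest]) {
                slowest = i;
            }
        }
        return slowest;
    }

    // Upper bound on sustained throughput in messages per second,
    // assuming each stage processes one message at a time.
    static double maxThroughput(double[] secondsPerMessage) {
        return 1.0 / secondsPerMessage[slowestStage(secondsPerMessage)];
    }
}
```

For stage times of 2 ms, 50 ms and 10 ms, the second stage is the bottleneck and the pipeline cannot sustain more than about 20 messages per second, however fast the other two stages are.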
The MIF container aims to be as 'frictionless' as possible, but as already noted, it introduces overheads every time a message is passed over an endpoint. Often, when you're using existing components in a pipeline that you didn't create, you can't do much about this - it's the price you pay for not having to create the component yourself.
When you're designing pipelines and components yourself, think about minimizing message passing between components by creating coarse-grained processing units. And remember the points above about memory usage and message size and passing strategies.
If you find your pipeline isn't providing the necessary performance, you can distribute it. Consider first making remote components from compute-intensive local ones and executing the remote components on their own hardware. You can also split or replicate a MIF pipeline and run multiple instances on multiple nodes. We haven't formally put any features into the MIF to support distributed and replicated pipelines, but hope to in the future. However, 'rolling your own' solution is very straightforward, and we've done it several times in applications, i.e.:
- replicated pipelines: using stateless components, run multiple pipeline instances on multiple machines and write a load-balancer to distribute the incoming messages across the instances. If you need to process the outputs of the pipelines in the order they arrived, then you'll need something that takes the replica pipeline outputs and makes sure they are processed in the correct order.
- partitioned pipelines: basically this involves splitting the pipeline into two or more sub-pipelines, and running each sub-pipeline on its own node. The endpoints that connect the sub-pipelines will need to be realized using JMS or some other suitable distributed communications protocol.
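For the replicated case, the two pieces you have to write yourself are a load-balancer and, if output order matters, something that puts replica outputs back in arrival order. A minimal sketch of both, assuming round-robin distribution and sequence numbers assigned on arrival, might look like this. `RoundRobinBalancer` and `Resequencer` are hypothetical names; how messages actually reach the replicas (JMS or otherwise) is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical load-balancer: stamps each incoming message with a sequence
// number and picks a replica pipeline in round-robin order.
final class RoundRobinBalancer {
    private final int replicaCount;
    private long nextSequence = 0;

    RoundRobinBalancer(int replicaCount) {
        if (replicaCount <= 0) {
            throw new IllegalArgumentException("replicaCount must be positive");
        }
        this.replicaCount = replicaCount;
    }

    // Returns the sequence number assigned to the next incoming message.
    synchronized long assignSequence() {
        return nextSequence++;
    }

    // Maps a sequence number to the replica that should process it.
    int replicaFor(long sequence) {
        return (int) (sequence % replicaCount);
    }
}

// Hypothetical resequencer: holds replica outputs until every earlier
// sequence number has been seen, then releases them in order.
final class Resequencer<T> {
    private final Map<Long, T> pending = new HashMap<>();
    private long nextExpected = 0;

    synchronized List<T> accept(long sequence, T output) {
        pending.put(sequence, output);
        List<T> ready = new ArrayList<>();
        while (pending.containsKey(nextExpected)) {
            ready.add(pending.remove(nextExpected));
            nextExpected++;
        }
        return ready;
    }
}
```

Note that the resequencer is itself a stateful component: if one replica stalls, its pending map keeps growing, so the earlier advice about memory applies here too.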