How to Write a Source

Source SystemObjects receive events from outside the system, often asynchronously. Because events arrive at an unpredictable rate, a source is normally threaded; whether it is depends on the event processor you pass to setEventProcessor() in the SystemObject’s setVars() method. A source is responsible for converting a given protocol to EventGnosis’ proprietary, normalized Event format (essentially a small XML document or property sheet). This event is then routed through each successive SystemObject in the ECA component topology, possibly modified along the way, and finally sent out of the system.

The following steps need to be performed when writing a typical EventGnosis Source. It is assumed that you understand, in detail, the protocol for the Source you are writing.

  1. Extend the abstract class EventSourceListenerBase, which itself extends SystemObject and implements the InputEvents interface. Since a Source is a SystemObject, you inherit significant functionality, and your new object will automatically be part of the startup and runtime object hierarchy, provided that the source is active and configured into an ECA. This behind-the-scenes work is performed by the framework; your job is to plug your source into it correctly.
  2. Exception: for some protocols (for example, TCP/IP sockets), a single server instance can have multiple listeners that share the same configuration values for your input protocol. In that case, you will need to add a second class that acts as a slave of this object (see the examples XMLTCPIPSocketListenerSlave and XmlTCPIPSocketListener). The parent object is responsible for receiving connection requests and instantiating a new slave object for each new connection on the same port. The parent must also implement the Runnable interface with a run() method and start its own thread in doWork(). Its getNextEvent() method should return null; actual event processing is deferred to the slave implementation.
    Typical case: when there is only one connection instance per unique set of configuration parameters, no slave object is needed. For example, a FromLogListener needs no slave: it may have multiple instances, but each one listens to a different file based on its own configuration. In this (typical) case, the getNextEvent() method from the InputEvents interface must be implemented, and a ThreadedEventGenerator must be set as the event processor in setVars().
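    The parent/slave arrangement described in the exception case can be sketched roughly as follows. This is only an illustration under stated assumptions: ParentListenerSketch and SlaveSketch are hypothetical stand-ins that do not extend the real EventGnosis base classes, events are assumed to be newline-terminated strings, and doWork() is shown with an assumed no-argument signature.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical parent: accepts connections and creates one slave per connection.
class ParentListenerSketch implements Runnable {
    private final ServerSocket server;
    private final List<SlaveSketch> slaves = new CopyOnWriteArrayList<>();
    private volatile boolean running = true;

    ParentListenerSketch(int port) throws IOException {
        server = new ServerSocket(port); // port 0 lets the OS pick a free port
    }

    int getPort() { return server.getLocalPort(); }
    List<SlaveSketch> getSlaves() { return slaves; }

    // Assumed signature: starts the parent's own accept thread.
    void doWork() {
        Thread acceptor = new Thread(this, "parent-listener");
        acceptor.setDaemon(true);
        acceptor.start();
    }

    // The parent produces no events itself; the slaves do.
    public String getNextEvent() { return null; }

    @Override
    public void run() {
        while (running) {
            try {
                Socket socket = server.accept();   // blocks until a client connects
                slaves.add(new SlaveSketch(socket));
            } catch (IOException e) {
                if (running) {
                    System.err.println("err: accept failed: " + e.getMessage());
                }
            }
        }
    }

    void shutdown() throws IOException {
        running = false;
        server.close(); // unblocks accept() so run() can exit
    }
}

// Hypothetical slave: owns one connection and does the actual event reading.
class SlaveSketch {
    private final BufferedReader in;

    SlaveSketch(Socket socket) throws IOException {
        in = new BufferedReader(
                new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
    }

    // Assumption for this sketch: one line of input is one event.
    String getNextEvent() throws IOException { return in.readLine(); }
}
```

    In a real Source, each slave would presumably be registered with the framework rather than kept in a plain list; the list here only makes the sketch observable.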

  3. Write a terse English description of your source, with each configuration parameter enclosed in ‘%’ characters, modeled after the sourceType examples in EV_HOME/config/extra/emptyECA.xml. Any text surrounded by ‘%’ is treated as a parameter by the GUI: it is presented to the user as a configuration parameter, validated, and stored as a configuration value. Every parameter should have a corresponding parameterType definition in the EMML (XML) file. At startup, the parameter is parsed by the ECS Configuration Manager and used by the particular source to set its configuration in its setVars() method.
    Example XML:
    <sourceType description="Receive EventGnosis ECS events on network interface %Host% using %Port%." objectId="EcsTcpEventReceiver" schema="">
         <implement class="com.eventgnosis.sources.XmlTCPIPSocketListener" source="ecs.jar" type="Java" /> 
    </sourceType>

    This XML snippet should be placed in EV_HOME/config/extra/emptyECA.xml, the source of configuration information for the GUI and the ECS. Note that this example has two parameters (Host and Port), which will be set and validated by the GUI when a user selects an “EcsTcpEventReceiver” source. You will also need to set the path to your implementation (here com.eventgnosis.sources.XmlTCPIPSocketListener) and its jar. Typically, you would create an add-on jar that is separate from ecs.jar and specific to your company. This new jar must, of course, be in the classpath of the ECS for it to be recognized.

  4. Add to the implementing class any attributes (private class variables) needed to retain state information for your protocol, as in the private attributes at the top of the class com.eventgnosis.sources.XmlTCPIPSocketListener.java.

  5. Implement setVars(). First call the superclass method, then extract each ECA configuration parameter into your Source attributes for this instance of the protocol. Examples of configuration parameters for the XmlTCPIPSocketListener are port number and interface name. Each parameter set by the GUI can be extracted from the EmmlConfig object (eCfg) using the method getParameter( name, instance# ), where name is the name of the parameter from its XML description and instance# is a zero-based index identifying which configuration instance is wanted (there can be more than one with the same name): the first instance is 0, the next is 1, and so on. Note that getParameter(…) returns a String. If the Source’s class attribute is not of type java.lang.String, the value must be converted to the requisite type. Conversion utilities for the most common types can be found in com.eventgnosis.util.Util.java. Furthermore, some types set by the GUI are not simple primitives but composite types, such as LoginInfo, which includes a user name and password. A composite type can be referred to in a SystemObject description, but it must then be defined with a parameterType XML snippet in the ECA so that the GUI can understand its composition:
    <parameterType description="%Login%" objectId="Login">
         <implement class="com.eventgnosis.sources.LoginInfo" source="ecs.jar" type="Java" /> 
    </parameterType>

    If composite types are used, they must of course be implemented and added to your jar, and the usual information must be specified in the “implement” section so that the ECS can locate your code. Examples of such implementations are com.eventgnosis.sources.LoginInfo, and its use in setVars() in com.eventgnosis.sources.EmailSlave. In summary, each Source attribute must be carefully processed in setVars() using the following steps:

    1. Get the parameter instance as a string specified by name and index.
    2. Convert to the requisite class attribute type, if it’s not a Java String.
    3. Check for conversion errors. If any occur, log them and either apply a default value or, for fatal values, consider disabling the entire object.
    4. Enforce the consistency rules between parameters, and report an error if there are serious inconsistencies.
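
    The four setVars() steps above might look roughly like the following sketch. It is self-contained and therefore makes several assumptions: ParamLookup is a hypothetical stand-in for the EmmlConfig object's getParameter( name, instance# ) method, Integer.parseInt replaces the conversion utilities in com.eventgnosis.util.Util, the default port and the in-memory log list are invented for illustration, and the call to the superclass setVars() is omitted because the real base class is not available here.

```java
import java.util.ArrayList;
import java.util.List;

class SetVarsSketch {
    static final int DEFAULT_PORT = 9000; // hypothetical default value

    // Hypothetical stand-in for eCfg.getParameter(name, instance#).
    interface ParamLookup {
        String getParameter(String name, int instance);
    }

    String host;
    int port;
    boolean enabled = true;                       // false once a fatal value is seen
    final List<String> log = new ArrayList<>();   // stand-in for the real logger

    void setVars(ParamLookup eCfg) {
        // A real Source would call super.setVars(...) first.

        // Step 1: get each parameter instance as a String, by name and index.
        host = eCfg.getParameter("Host", 0);
        String rawPort = eCfg.getParameter("Port", 0);

        // Steps 2 and 3: convert, and fall back to a default on conversion errors.
        try {
            port = Integer.parseInt(rawPort == null ? "" : rawPort.trim());
        } catch (NumberFormatException e) {
            log.add("warn: invalid Port '" + rawPort + "', using " + DEFAULT_PORT);
            port = DEFAULT_PORT;
        }

        // Step 4: consistency rules; serious problems disable the object.
        if (host == null || host.isEmpty()) {
            log.add("err: Host is missing, disabling source");
            enabled = false;
        }
        if (port < 1 || port > 65535) {
            log.add("err: Port " + port + " is out of range, disabling source");
            enabled = false;
        }
    }
}
```

    Whether a bad value deserves a default or should disable the object is a per-parameter decision; here a malformed port is treated as recoverable and a missing host as fatal, purely as an example.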
  6. Implement connect() using the class attributes you just set. Implement this method according to the needs of your particular protocol, knowing that this method alone is responsible for initiating the connection to the external event source. Log errors using logging conventions such as err, warn or info to indicate severity; these messages are very useful for debugging problems. Finally, note that connect() is not called explicitly by the framework. The suggested approach is to use a class attribute that holds an illegal value (such as null) until the connection sequence has succeeded. Place the call to connect() in the getNextEvent() method, guarded by a check for this illegal value, so that the connection logic runs only when there is no connection, or when a prior connection has gone bad and a new connection sequence is needed.
    Example:
     
    if( connection == null ) {
        connect();   // performs the connection logic and sets connection on success
    }
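
    A slightly fuller, self-contained sketch of this guard is shown below. The class, the Object-typed connection attribute, the connectCalls counter and markConnectionBad() are all hypothetical and exist only to make the behavior visible; a real Source would hold a socket, file handle or similar, and connect() may have a different signature.

```java
class LazyConnectSketch {
    private Object connection = null; // null is the "illegal" not-connected value
    int connectCalls = 0;             // only here so the sketch can be observed

    // Hypothetical connect(): sets the connection attribute on success.
    private void connect() {
        connectCalls++;
        connection = new Object();    // stand-in for opening a real connection
    }

    // Called repeatedly; connects only when no usable connection exists.
    public String getNextEvent() {
        if (connection == null) {
            connect();
            if (connection == null) {
                return null;          // connect failed; retry on the next call
            }
        }
        return "<event/>";            // stand-in for reading one real event
    }

    // Called when the connection is found to be bad; forces a reconnect next time.
    void markConnectionBad() {
        connection = null;
    }
}
```

    Resetting the attribute to null is what triggers a fresh connection sequence on the following getNextEvent() call.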
  7. Implement disconnect() using the class and connection attributes. Implement this method according to the needs of your particular protocol, knowing that this method is responsible for terminating the connection to the external event source and releasing system resources. This method should also be called if the ECS exits in a controlled manner.

  8. Implement getNextEvent(), probably the most important method in a source. It reads your specific protocol stream, converting it to the EventGnosis-specific internal format. This method is continually called by the framework, which has the effect of pulling events into the ECS, given your particular protocol configuration instance. For an example of a good getNextEvent() implementation see com.eventgnosis.sources.XMLTCPIPSocketListenerSlave.java.

    Overall, its responsibilities are the following:
    • Monitor the health of the inbound connection (may not be necessary in many circumstances), forcing reconnection if necessary.
    • Format the incoming protocol-specific stream into discrete EventGnosis events, returning one for each getNextEvent() call.
    • Continue to check if the ECS is still running via the SystemObject.continueOperation() method.
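
    The responsibilities above can be illustrated with a small, self-contained sketch. The protocol is invented for the example: events are assumed to be blocks of text lines separated by a blank line. FramingSketch is hypothetical and does not extend the real base class; its continueOperation() is a local stand-in for the SystemObject method, and events are returned as plain Strings rather than the real Event type.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;

class FramingSketch {
    private final BufferedReader in;
    private volatile boolean running = true; // stand-in for the ECS running state

    FramingSketch(Reader source) {
        in = new BufferedReader(source);
    }

    // Local stand-in for SystemObject.continueOperation().
    boolean continueOperation() { return running; }

    void stop() { running = false; }

    // Returns one discrete event per call, or null when none is available.
    public String getNextEvent() {
        StringBuilder event = new StringBuilder();
        try {
            String line;
            while (continueOperation() && (line = in.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    if (event.length() > 0) {
                        return event.toString(); // blank line closes the event
                    }
                    continue;                     // skip leading blank lines
                }
                if (event.length() > 0) {
                    event.append('\n');
                }
                event.append(line);
            }
        } catch (IOException e) {
            // A real Source would log this and force a reconnection here.
            System.err.println("err: read failed: " + e.getMessage());
            return null;
        }
        // End of stream: return a trailing event if one was collected.
        return (continueOperation() && event.length() > 0) ? event.toString() : null;
    }
}
```

    For example, wrapping the text "a=1\nb=2\n\nc=3\n" in a java.io.StringReader yields two events on successive calls and null afterwards.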
  9. Implement shutdown() and kill(), which perform cleanup operations when the ECS is shut down gracefully or killed.

  10. Implement getProtocolName(), a simple method that returns a string naming the particular protocol.

  11. Implement getConfigXml(), which is a snapshot of configuration state.

  12. Implement getRuntimeXml(), which is a snapshot of runtime state.

  13. Implement toString(), another method that shows object state, which is useful for debugging.
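
  The reporting methods in steps 10 through 13 could be sketched as below. Everything specific here is an assumption: the String return types, the XML element and attribute names, and the example field values are invented for illustration and are not taken from the EventGnosis API. Values are not XML-escaped in this sketch.

```java
class StatusSketch {
    // Hypothetical configuration and runtime state for the example.
    private final String host = "localhost";
    private final int port = 9000;
    private long eventsReceived = 0;
    private boolean connected = false;

    void recordEvent() {
        connected = true;
        eventsReceived++;
    }

    // Step 10: a short name for the protocol.
    public String getProtocolName() {
        return "XML over TCP/IP";
    }

    // Step 11: snapshot of configuration state (format is an assumption).
    public String getConfigXml() {
        return "<config host=\"" + host + "\" port=\"" + port + "\"/>";
    }

    // Step 12: snapshot of runtime state (format is an assumption).
    public String getRuntimeXml() {
        return "<runtime connected=\"" + connected
                + "\" eventsReceived=\"" + eventsReceived + "\"/>";
    }

    // Step 13: compact state dump for debugging.
    @Override
    public String toString() {
        return getProtocolName() + " source " + host + ":" + port
                + " connected=" + connected + " events=" + eventsReceived;
    }
}
```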