LMAX Spring Integration Java Example

LMAX is High Performance Inter-Thread Messaging Library which can be use to any application where process of many trades needs low latency. Please read Martin Fowler blog about LMAX architecture and github code for its library.

You could also refer Getting started tutorial example to understand how it can be use.

In this tutorial you will see how to integrate this library with Spring framework. We will create Maven project LMAXSpringIntegration below is its structure:

Tools needed:

  • JDK1.8
  • Eclipse (Any latest version)
  • LMAX jars

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

	<modelVersion>4.0.0</modelVersion>
	<groupId>com.javahonk</groupId>
	<artifactId>LMAXSpringIntegration</artifactId>
	<version>1.0-SNAPSHOT</version>

	<properties>
 		<maven-compiler-plugin.version>3.0</maven-compiler-plugin.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<org.springframework.version>4.2.3.RELEASE</org.springframework.version>	
	</properties>
	
	<dependencies>    

		<!-- Spring -->
		<!-- Core utilities used by other modules. Define this if you use Spring 
			Utility APIs (org.springframework.core.*/org.springframework.util.*) -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>${org.springframework.version}</version>
		</dependency>

		<!-- Expression Language (depends on spring-core) Define this if you use 
			Spring Expression APIs (org.springframework.expression.*) -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-expression</artifactId>
			<version>${org.springframework.version}</version>
		</dependency>

		<!-- Bean Factory and JavaBeans utilities (depends on spring-core) Define 
			this if you use Spring Bean APIs (org.springframework.beans.*) -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-beans</artifactId>
			<version>${org.springframework.version}</version>
		</dependency>

		<!-- Aspect Oriented Programming (AOP) Framework (depends on spring-core, 
			spring-beans) Define this if you use Spring AOP APIs (org.springframework.aop.*) -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-aop</artifactId>
			<version>${org.springframework.version}</version>
		</dependency>

		<!-- Application Context (depends on spring-core, spring-expression, spring-aop, 
			spring-beans) This is the central artifact for Spring's Dependency Injection 
			Container and is generally always defined -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
			<version>${org.springframework.version}</version>
		</dependency>

		<!-- Various Application Context utilities, including EhCache, JavaMail, 
			Quartz, and Freemarker integration Define this if you need any of these integrations -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context-support</artifactId>
			<version>${org.springframework.version}</version>
		</dependency>

		<!-- JDBC Data Access Library (depends on spring-core, spring-beans, spring-context, 
			spring-tx) Define this if you use Spring's JdbcTemplate API (org.springframework.jdbc.*) -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jdbc</artifactId>
			<version>${org.springframework.version}</version>
		</dependency>

		<!-- Object-to-Relation-Mapping (ORM) integration with Hibernate, JPA, 
			and iBatis. (depends on spring-core, spring-beans, spring-context, spring-tx) 
			Define this if you need ORM (org.springframework.orm.*) -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-orm</artifactId>
			<version>${org.springframework.version}</version>
		</dependency>

		<!-- Support for testing Spring applications with tools such as JUnit and 
			TestNG This artifact is generally always defined with a 'test' scope for 
			the integration testing framework and unit testing stubs -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-test</artifactId>
			<version>${org.springframework.version}</version>
			<scope>test</scope>
		</dependency>
		
		<!-- Spring Tx -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-tx</artifactId>
			<version>${org.springframework.version}</version>
		</dependency>

		<!-- dom4j -->
		<dependency>
			<groupId>dom4j</groupId>
			<artifactId>dom4j</artifactId>
			<version>1.6.1</version>
		</dependency>
		
		<!-- apache commons -->
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-lang3</artifactId>
			<version>3.3.2</version>
		</dependency>
		
		<!-- commons-io -->		
		<dependency>	
			<groupId>commons-io</groupId>
  		  	<artifactId>commons-io</artifactId>
  		  	<version>2.4</version>
		</dependency>
			
		<!-- commons collections -->
		<dependency>
			<groupId>commons-collections</groupId>
			<artifactId>commons-collections</artifactId>
			<version>3.2.1</version>
		</dependency>	
		
		<!-- commons logging -->
		<dependency>
			<groupId>commons-logging</groupId>
			<artifactId>commons-logging</artifactId>
			<version>1.2</version>
		</dependency>

		<!-- javassist -->
		<dependency>
			<groupId>org.javassist</groupId>
			<artifactId>javassist</artifactId>
			<version>3.18.1-GA</version>
		</dependency>
		
		<!-- jboss logging -->
		<dependency>
			<groupId>org.jboss.logging</groupId>
			<artifactId>jboss-logging</artifactId>
			<version>3.3.0.Final</version>
		</dependency>
		
		<!-- Log4j -->
        <dependency>
        	<groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.1</version>
		</dependency>
        
        <dependency>
        	<groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.1</version>
		</dependency>
		
		<!-- Jaspyt -->
		<dependency>
			<groupId>org.jasypt</groupId>
			<artifactId>jasypt</artifactId>
			<version>1.9.2</version>
		</dependency>
		
		<dependency>
			<groupId>org.jasypt</groupId>
			<artifactId>jasypt-spring2</artifactId>
			<version>1.9.2</version>
		</dependency>
		
		<!-- Disruptor -->
		<dependency>
		    <groupId>com.lmax</groupId>
		    <artifactId>disruptor</artifactId>
		    <version>3.3.4</version>
		</dependency>
		
		
    </dependencies>

	<build>				
 		
		<resources>
			<resource>
				<directory>src/main/config</directory>
				<excludes>
					<exclude>**/*.xml</exclude>
					<exclude>**/*.properties</exclude>
				</excludes>
			</resource>
		    <resource>
				<directory>src/main/resources</directory>
				<excludes>
					<exclude>**/*.xml</exclude>
					<exclude>**/*.properties</exclude>
				</excludes>
			</resource>
		</resources> 
		
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-dependency-plugin</artifactId>
				<version>2.8</version>
				<executions>
					<execution>
						<id>copy-dependencies</id>
						<phase>package</phase>
						<goals>
							<goal>copy-dependencies</goal>
						</goals>
						<configuration>
							<outputDirectory>${project.build.directory}/lib</outputDirectory>
							<overWriteReleases>false</overWriteReleases>
							<overWriteSnapshots>false</overWriteSnapshots>
							<overWriteIfNewer>true</overWriteIfNewer>
						</configuration>
					</execution>
				</executions>
			</plugin>

			<plugin>
			    <groupId>org.apache.maven.plugins</groupId>
			    <artifactId>maven-jar-plugin</artifactId>
			    <version>2.6</version>
			    <configuration>
			        <archive>
			            <manifest>
			                <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
			            </manifest>
			            <manifestEntries>
			                <git-SHA-1>${buildNumber}</git-SHA-1>
			            </manifestEntries>
			        </archive>
			    </configuration>
			</plugin>
			
			<plugin>
				<artifactId>maven-assembly-plugin</artifactId>
				<executions>
					<execution>
						<id>tar</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
				<configuration>				
					<descriptors>
				        <descriptor>assembly.xml</descriptor>
				    </descriptors>
				</configuration>
			</plugin>

 			<plugin>
			    <groupId>org.codehaus.mojo</groupId>
			    <artifactId>buildnumber-maven-plugin</artifactId>
			    <version>1.3</version>
			    <executions>
			        <execution>
			            <phase>validate</phase>
			            <goals>
			                <goal>create</goal>
			            </goals>
			         </execution>
			    </executions>
			</plugin> 
		</plugins> 
		
	</build>

	<scm>
	    <connection>scm:git:https://github.com/javahonk/LMAXSpringIntegration.git</connection>
	</scm>

	
</project>
  • log4j2.xml:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO" shutdownHook="disable">

	<Properties>
		<Property name="envrionment.target">DEV</Property>
	</Properties>

	<Properties>
		<Property name="logging.dir">./</Property>
	</Properties>

	<Appenders>
		<Console name="Console" target="SYSTEM_OUT">
			<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" />
		</Console>

		<RollingFile name="RollingFile"
			fileName="${sys:logging.dir}/logs/${sys:environment.target}/LMAX.log"	filePattern="${sys:logging.dir}/logs/Orion-%d{yyyy-MM-dd}-%i.log">
			<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" />
			<!-- TODO:Change to time based policy -->
			<Policies>
				<TimeBasedTriggeringPolicy interval="1"	modulate="true" />
				<SizeBasedTriggeringPolicy size="100 MB" />
			</Policies>
			<DefaultRolloverStrategy max="4" />
		</RollingFile>
	</Appenders>

	<Loggers>
		<Root level="info">
			<AppenderRef ref="Console" />
			<!-- <AppenderRef ref="file" /> -->
			<AppenderRef ref="RollingFile" />
		</Root>
	</Loggers>
</Configuration>
  • javahonk.properties:
pid_file=..\\logs\\pid.txt
smtp_host=localhost
num_available_threads=16
ring_buffer_size=4096
number_lock_stripes=16
  • javahonk-context.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
		http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd
		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
		http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.1.xsd">

    <context:annotation-config />
    <context:component-scan base-package="com.javahonk" />

    <util:list id="propertiesList" value-type="java.lang.String" >
        <value>javahonk.properties</value>
    </util:list>

    <bean class="org.jasypt.spring.properties.EncryptablePropertyPlaceholderConfigurer">
        <constructor-arg ref="encryptor" />
        <property name="locations" ref="propertiesList" />
        <property name="ignoreUnresolvablePlaceholders" value="true"/>
        <property name="ignoreResourceNotFound" value="true"/>
    </bean>

    <bean id="encryptor" class="org.jasypt.encryption.pbe.StandardPBEStringEncryptor">
        <property name="algorithm" value="PBEWithMD5AndDES" />
        <property name="password" value="otc_decryption_key" />
    </bean>

    <bean id="propertiesBean" class="org.springframework.beans.factory.config.PropertiesFactoryBean" >
        <property name="locations" ref="propertiesList" />
        <property name="ignoreResourceNotFound" value="true" />
    </bean>
    
    <bean id="disruptorPoolExceptionHandler" class="com.javahonk.DisruptorPoolExceptionHandler" />
	
	<bean id="messageManager" class="com.javahonk.MessageManager" />
	
	<bean id="messageProcessor" class="com.javahonk.MessageProcessor" scope="prototype"/>
</beans>
  • DisruptorPoolExceptionHandler.java:
package com.javahonk;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.lmax.disruptor.ExceptionHandler;

public class DisruptorPoolExceptionHandler implements ExceptionHandler<MessageEvent>{

	private static final Logger logger = LogManager.getLogger(DisruptorPoolExceptionHandler.class);
	
	@Override
	public void handleEventException(Throwable arg0, long arg1,
			MessageEvent arg2) {
		
		logger.error("Exception processing message {}", arg2.getMessage().getMessageID(), arg0);
	}

	@Override
	public void handleOnShutdownException(Throwable arg0) {
		logger.error("Exception On Shutdown", arg0);
		
	}

	@Override
	public void handleOnStartException(Throwable arg0) {
		logger.error("Exception On StartUp", arg0);
	}
	
	

}
  • JavaHonkMessage.java:
package com.javahonk;


public class JavaHonkMessage {

	private String messageID;	
	private String messageType;	
	private String commandTarget;
	
	public JavaHonkMessage(String messageID, String messageType,
			String commandTarget) {
		this.messageID = messageID;
		this.messageType = messageType;
		this.commandTarget = commandTarget;
	}
	public String getMessageID() {
		return messageID;
	}
	public void setMessageID(String messageID) {
		this.messageID = messageID;
	}
	public String getMessageType() {
		return messageType;
	}
	public void setMessageType(String messageType) {
		this.messageType = messageType;
	}
	public String getCommandTarget() {
		return commandTarget;
	}
	public void setCommandTarget(String commandTarget) {
		this.commandTarget = commandTarget;
	}
	@Override
	public String toString() {
		return "JavaHonkMessage [messageID=" + messageID + ", messageType="
				+ messageType + ", commandTarget=" + commandTarget + "]";
	}
	
}
  • MessageEvent.java:
package com.javahonk;

import com.lmax.disruptor.EventFactory;

public class MessageEvent {
	
	private JavaHonkMessage message;	

    public JavaHonkMessage getMessage()
    {
        return message;
    }
    
    public void setMessage(JavaHonkMessage message)
    {
        this.message = message;
    }

    public static final EventFactory<MessageEvent> EVENT_FACTORY = new EventFactory<MessageEvent>()
    {
        public MessageEvent newInstance()
        {
            return new MessageEvent();
        }
    };

}
  • MessageManager.java:
package com.javahonk;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import javax.annotation.PostConstruct;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WorkerPool;

public class MessageManager {

	private WorkerPool<MessageEvent> workerPool;
	
	private RingBuffer<MessageEvent> ringBuffer;
	
	@Value("${num_available_threads}")
	private int threadCount;
	
	@Value("${ring_buffer_size}")
	private int ringBufferSize;
	
	private SequenceBarrier sequenceBarrier;
	
	private ExecutorService executorService;
	
	private MessageProcessor[] processors;
	
	private BlockingQueue<JavaHonkMessage> overflowQ = new LinkedBlockingQueue<>();
	
	private Thread overflowQThread = new Thread(new OverflowManager());
	
	@Autowired
	private DisruptorPoolExceptionHandler disruptorPoolExceptionHandler;
	
	@Autowired
	private ApplicationContext applicationContext;
	
	private static final Logger logger = LogManager.getLogger(MessageManager.class);
	
	@PostConstruct
	private void setupMessageManager(){
		
		ringBuffer = RingBuffer.createMultiProducer(MessageEvent.EVENT_FACTORY, ringBufferSize);
		
		sequenceBarrier = ringBuffer.newBarrier();
		
		processors =  new MessageProcessor[threadCount];
		
		for(int i = 0; i<threadCount; i++){
			processors[i] = applicationContext.getBean(MessageProcessor.class);
		}
		
		workerPool = new WorkerPool<MessageEvent>(ringBuffer, sequenceBarrier, 
				disruptorPoolExceptionHandler, processors);
		
		ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
		
		executorService = Executors.newFixedThreadPool(threadCount);
		
	}
	
	public void startMessageManager(){
		
		overflowQThread.start();
		
		ringBuffer = workerPool.start(executorService);
	}
	
	public void process(JavaHonkMessage message){
		
		try{
			if(ringBuffer.remainingCapacity() > 0){
				publishToBuffer(message);
			}
			else{
				overflowQ.put(message);
			}
		}
		catch(Exception e){
			logger.error("Exception processing message {}", message.getMessageID(), e);
		}
	}
	
	private void publishToBuffer(JavaHonkMessage message){
		
		long sequence = ringBuffer.next();
		
		MessageEvent messageEvent = ringBuffer.get(sequence);
		messageEvent.setMessage(message);
		
		logger.info("Publishing message to buffer of type {}", message.getMessageType());
		ringBuffer.publish(sequence);
	}
	
	class OverflowManager implements Runnable{
		
		@Override
		public void run() {
			
			do{
				
				try {
					
					//Thread will sleep until overflowQ notifies
					JavaHonkMessage message = overflowQ.take();
					
					while(!ringBuffer.hasAvailableCapacity(1)){
						//Busy spin until capacity frees up
						//This WILL burn CPU cycles.
					}
					
					publishToBuffer(message);
				} 
				catch (InterruptedException e) {
					logger.error("InterruptedException...", e);
				}
				
			}
			while(true);
		}
	}
}
  • MessageProcessor.java:
package com.javahonk;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.lmax.disruptor.WorkHandler;


public class MessageProcessor implements WorkHandler<MessageEvent> {
	
	private static final Logger logger = LogManager.getLogger(MessageProcessor.class);
	
	@Override
	public void onEvent(MessageEvent event) throws Exception {
		
		try {
			
			JavaHonkMessage message = event.getMessage();
			logger.info(message.getCommandTarget());
			logger.info(message.getMessageID());
			logger.info(message.getMessageType());
			
		} 
		catch (Exception e) {
			logger.error("Exception Processing Message ", e);
		}
		
	}

}
  • LMAXSpringTest.java: Main test program which will start application and send one sample message:
package com.javahonk;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class LMAXSpringTest {

	private static final Logger logger = LogManager.getLogger(LMAXSpringTest.class);

	public static void main(String args[]){
		
		logger.info("Starting LMAXSpringIntegration...");
		
		setUncaughtExceptionHandler();
		
		ApplicationContext context = new ClassPathXmlApplicationContext("javahonk-context.xml");
		
		registerShutdownHook(context);
		
		context.getBean(MessageManager.class).startMessageManager();
		
		//Now send some message which will be received by MessageProcessor class for processing
		
		JavaHonkMessage message = new JavaHonkMessage("messageID", "messageType", "commandTarget");
		
		context.getBean(MessageManager.class).process(message);
		
	}
	
	private static void setUncaughtExceptionHandler() {
		Thread.currentThread().setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 
            public void uncaughtException(Thread t, Throwable e) { 
                  logger.error("Thread " + t +  " threw uncaughtexception ", e); 
            }
        });  
	}
	
	private static void registerShutdownHook(ApplicationContext context) {
		Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

			@Override
			public void run() {
				logger.info("LMAXSpringIntegration is Exiting.");
				((AbstractApplicationContext) context).close();

			}
		}));
	}
}
  • To run this example right click LMAXSpringTest.java –> Run As –> Java Application you will see below message printed on screen:

Note: Please don’t forget to setup VM argument:

-Denvironment.target=DEV –> This set environment and you could change accordingly
-Dlogging.dir=./ –> It sets logging directory path

INFO: Loading properties file from class path resource [javahonk.properties]
2016-11-24 23:21:06.811 [main] INFO  com.javahonk.MessageManager - Publishing message to buffer of type messageType
2016-11-24 23:21:06.812 [pool-3-thread-1] INFO  com.javahonk.MessageProcessor - commandTarget
2016-11-24 23:21:06.812 [pool-3-thread-1] INFO  com.javahonk.MessageProcessor - messageID
2016-11-24 23:21:06.812 [pool-3-thread-1] INFO  com.javahonk.MessageProcessor - messageType

Leave a Reply

Your email address will not be published. Required fields are marked *