LMAX Spring Integration Java Example
The LMAX Disruptor is a high-performance inter-thread messaging library that can be used in any application where processing many trades requires low latency. Please read Martin Fowler's blog post about the LMAX architecture and the library's code on GitHub.
You can also refer to the Getting Started tutorial example to understand how it can be used.
In this tutorial you will see how to integrate this library with the Spring framework. We will create a Maven project named LMAXSpringIntegration; its structure is shown below:
Tools needed:
- JDK 1.8
- Eclipse (any recent version)
- LMAX Disruptor jars
- pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.javahonk</groupId>
    <artifactId>LMAXSpringIntegration</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven-compiler-plugin.version>3.0</maven-compiler-plugin.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <org.springframework.version>4.2.3.RELEASE</org.springframework.version>
    </properties>

    <dependencies>
        <!-- Spring -->
        <!-- Core utilities used by other modules. Define this if you use Spring
            Utility APIs (org.springframework.core.*/org.springframework.util.*) -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${org.springframework.version}</version>
        </dependency>
        <!-- Expression Language (depends on spring-core) Define this if you use
            Spring Expression APIs (org.springframework.expression.*) -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-expression</artifactId>
            <version>${org.springframework.version}</version>
        </dependency>
        <!-- Bean Factory and JavaBeans utilities (depends on spring-core) Define
            this if you use Spring Bean APIs (org.springframework.beans.*) -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>${org.springframework.version}</version>
        </dependency>
        <!-- Aspect Oriented Programming (AOP) Framework (depends on spring-core,
            spring-beans) Define this if you use Spring AOP APIs (org.springframework.aop.*) -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>${org.springframework.version}</version>
        </dependency>
        <!-- Application Context (depends on spring-core, spring-expression, spring-aop,
            spring-beans) This is the central artifact for Spring's Dependency Injection
            Container and is generally always defined -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${org.springframework.version}</version>
        </dependency>
        <!-- Various Application Context utilities, including EhCache, JavaMail,
            Quartz, and Freemarker integration Define this if you need any of these integrations -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${org.springframework.version}</version>
        </dependency>
        <!-- JDBC Data Access Library (depends on spring-core, spring-beans, spring-context,
            spring-tx) Define this if you use Spring's JdbcTemplate API (org.springframework.jdbc.*) -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>${org.springframework.version}</version>
        </dependency>
        <!-- Object-to-Relation-Mapping (ORM) integration with Hibernate, JPA, and iBatis.
            (depends on spring-core, spring-beans, spring-context, spring-tx) Define
            this if you need ORM (org.springframework.orm.*) -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-orm</artifactId>
            <version>${org.springframework.version}</version>
        </dependency>
        <!-- Support for testing Spring applications with tools such as JUnit and
            TestNG This artifact is generally always defined with a 'test' scope for
            the integration testing framework and unit testing stubs -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${org.springframework.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- Spring Tx -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>${org.springframework.version}</version>
        </dependency>
        <!-- dom4j -->
        <dependency>
            <groupId>dom4j</groupId>
            <artifactId>dom4j</artifactId>
            <version>1.6.1</version>
        </dependency>
        <!-- apache commons -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.3.2</version>
        </dependency>
        <!-- commons-io -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.4</version>
        </dependency>
        <!-- commons collections -->
        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <version>3.2.1</version>
        </dependency>
        <!-- commons logging -->
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>
        <!-- javassist -->
        <dependency>
            <groupId>org.javassist</groupId>
            <artifactId>javassist</artifactId>
            <version>3.18.1-GA</version>
        </dependency>
        <!-- jboss logging -->
        <dependency>
            <groupId>org.jboss.logging</groupId>
            <artifactId>jboss-logging</artifactId>
            <version>3.3.0.Final</version>
        </dependency>
        <!-- Log4j -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.1</version>
        </dependency>
        <!-- Jasypt -->
        <dependency>
            <groupId>org.jasypt</groupId>
            <artifactId>jasypt</artifactId>
            <version>1.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.jasypt</groupId>
            <artifactId>jasypt-spring2</artifactId>
            <version>1.9.2</version>
        </dependency>
        <!-- Disruptor -->
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.3.4</version>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/config</directory>
                <excludes>
                    <exclude>**/*.xml</exclude>
                    <exclude>**/*.properties</exclude>
                </excludes>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <excludes>
                    <exclude>**/*.xml</exclude>
                    <exclude>**/*.properties</exclude>
                </excludes>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.8</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            <overWriteReleases>false</overWriteReleases>
                            <overWriteSnapshots>false</overWriteSnapshots>
                            <overWriteIfNewer>true</overWriteIfNewer>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
                        </manifest>
                        <manifestEntries>
                            <git-SHA-1>${buildNumber}</git-SHA-1>
                        </manifestEntries>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <id>tar</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <descriptors>
                        <descriptor>assembly.xml</descriptor>
                    </descriptors>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>buildnumber-maven-plugin</artifactId>
                <version>1.3</version>
                <executions>
                    <execution>
                        <phase>validate</phase>
                        <goals>
                            <goal>create</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <scm>
        <connection>scm:git:https://github.com/javahonk/LMAXSpringIntegration.git</connection>
    </scm>
</project>
- log4j2.xml:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO" shutdownHook="disable">
    <Properties>
        <Property name="environment.target">DEV</Property>
        <Property name="logging.dir">./</Property>
    </Properties>
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" />
        </Console>
        <RollingFile name="RollingFile"
            fileName="${sys:logging.dir}/logs/${sys:environment.target}/LMAX.log"
            filePattern="${sys:logging.dir}/logs/Orion-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" />
            <!-- TODO:Change to time based policy -->
            <Policies>
                <TimeBasedTriggeringPolicy interval="1" modulate="true" />
                <SizeBasedTriggeringPolicy size="100 MB" />
            </Policies>
            <DefaultRolloverStrategy max="4" />
        </RollingFile>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="Console" />
            <!-- <AppenderRef ref="file" /> -->
            <AppenderRef ref="RollingFile" />
        </Root>
    </Loggers>
</Configuration>
- javahonk.properties:
pid_file=..\\logs\\pid.txt
smtp_host=localhost
num_available_threads=16
ring_buffer_size=4096
number_lock_stripes=16
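The Disruptor's RingBuffer only accepts a buffer size that is a power of two (4096 here) and throws an IllegalArgumentException for anything else. The following is a minimal sketch of checking the ring_buffer_size property early, before the ring buffer is built. The class and method names are hypothetical and not part of this project.

```java
final class RingBufferSizeCheck {

    // True when n is a positive power of two (exactly one bit set).
    static boolean isPowerOfTwo(int n) {
        return n > 0 && Integer.bitCount(n) == 1;
    }

    // Hypothetical helper: parse the ring_buffer_size value and fail fast on bad input.
    static int parseRingBufferSize(String raw) {
        int size = Integer.parseInt(raw.trim());
        if (!isPowerOfTwo(size)) {
            throw new IllegalArgumentException(
                    "ring_buffer_size must be a power of 2, got " + size);
        }
        return size;
    }

    public static void main(String[] args) {
        System.out.println(parseRingBufferSize("4096"));
    }
}
```

A check like this could run in a @PostConstruct method so that a bad value is reported with the property name rather than as a generic constructor failure.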
- javahonk-context.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.1.xsd">

    <context:annotation-config />
    <context:component-scan base-package="com.javahonk" />

    <util:list id="propertiesList" value-type="java.lang.String">
        <value>javahonk.properties</value>
    </util:list>

    <bean class="org.jasypt.spring.properties.EncryptablePropertyPlaceholderConfigurer">
        <constructor-arg ref="encryptor" />
        <property name="locations" ref="propertiesList" />
        <property name="ignoreUnresolvablePlaceholders" value="true" />
        <property name="ignoreResourceNotFound" value="true" />
    </bean>

    <bean id="encryptor" class="org.jasypt.encryption.pbe.StandardPBEStringEncryptor">
        <property name="algorithm" value="PBEWithMD5AndDES" />
        <property name="password" value="otc_decryption_key" />
    </bean>

    <bean id="propertiesBean" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="locations" ref="propertiesList" />
        <property name="ignoreResourceNotFound" value="true" />
    </bean>

    <bean id="disruptorPoolExceptionHandler" class="com.javahonk.DisruptorPoolExceptionHandler" />
    <bean id="messageManager" class="com.javahonk.MessageManager" />
    <bean id="messageProcessor" class="com.javahonk.MessageProcessor" scope="prototype" />
</beans>
- DisruptorPoolExceptionHandler.java:
package com.javahonk;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.lmax.disruptor.ExceptionHandler;

public class DisruptorPoolExceptionHandler implements ExceptionHandler<MessageEvent> {

    private static final Logger logger = LogManager.getLogger(DisruptorPoolExceptionHandler.class);

    // Called when a handler throws while processing the event at the given sequence.
    @Override
    public void handleEventException(Throwable ex, long sequence, MessageEvent event) {
        logger.error("Exception processing message {}", event.getMessage().getMessageID(), ex);
    }

    @Override
    public void handleOnShutdownException(Throwable ex) {
        logger.error("Exception On Shutdown", ex);
    }

    @Override
    public void handleOnStartException(Throwable ex) {
        logger.error("Exception On StartUp", ex);
    }
}
- JavaHonkMessage.java:
package com.javahonk;

public class JavaHonkMessage {

    private String messageID;
    private String messageType;
    private String commandTarget;

    public JavaHonkMessage(String messageID, String messageType, String commandTarget) {
        this.messageID = messageID;
        this.messageType = messageType;
        this.commandTarget = commandTarget;
    }

    public String getMessageID() {
        return messageID;
    }

    public void setMessageID(String messageID) {
        this.messageID = messageID;
    }

    public String getMessageType() {
        return messageType;
    }

    public void setMessageType(String messageType) {
        this.messageType = messageType;
    }

    public String getCommandTarget() {
        return commandTarget;
    }

    public void setCommandTarget(String commandTarget) {
        this.commandTarget = commandTarget;
    }

    @Override
    public String toString() {
        return "JavaHonkMessage [messageID=" + messageID + ", messageType=" + messageType
                + ", commandTarget=" + commandTarget + "]";
    }
}
- MessageEvent.java:
package com.javahonk;

import com.lmax.disruptor.EventFactory;

public class MessageEvent {

    private JavaHonkMessage message;

    public JavaHonkMessage getMessage() {
        return message;
    }

    public void setMessage(JavaHonkMessage message) {
        this.message = message;
    }

    public static final EventFactory<MessageEvent> EVENT_FACTORY = new EventFactory<MessageEvent>() {
        public MessageEvent newInstance() {
            return new MessageEvent();
        }
    };
}
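The EVENT_FACTORY is used to fill every ring buffer slot with a MessageEvent once, when the buffer is created. Publishing then only overwrites the message field of an existing slot instead of allocating a new event. The sketch below illustrates that slot-reuse idea with a plain array. It is a simplified, single-threaded illustration rather than the Disruptor's implementation, and SlotRing is a hypothetical name.

```java
import java.util.function.Supplier;

final class SlotRing<T> {

    private final Object[] slots;
    private final int mask;

    // Pre-allocates one instance per slot; size must be a power of two
    // so that a sequence can be mapped to an index with a bit mask.
    SlotRing(Supplier<T> factory, int size) {
        if (size <= 0 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        slots = new Object[size];
        for (int i = 0; i < size; i++) {
            slots[i] = factory.get();
        }
        mask = size - 1;
    }

    // Returns the pre-allocated instance for this sequence; sequences wrap around.
    @SuppressWarnings("unchecked")
    T get(long sequence) {
        return (T) slots[(int) (sequence & mask)];
    }

    public static void main(String[] args) {
        SlotRing<StringBuilder> ring = new SlotRing<>(StringBuilder::new, 4);
        // Sequence 4 wraps around to the same slot object as sequence 0.
        System.out.println(ring.get(0) == ring.get(4));
    }
}
```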
- MessageManager.java:
package com.javahonk;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import javax.annotation.PostConstruct;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WorkerPool;

public class MessageManager {

    private static final Logger logger = LogManager.getLogger(MessageManager.class);

    private WorkerPool<MessageEvent> workerPool;
    private RingBuffer<MessageEvent> ringBuffer;

    @Value("${num_available_threads}")
    private int threadCount;

    @Value("${ring_buffer_size}")
    private int ringBufferSize;

    private SequenceBarrier sequenceBarrier;
    private ExecutorService executorService;
    private MessageProcessor[] processors;

    // Messages that arrive while the ring buffer is full wait here.
    private BlockingQueue<JavaHonkMessage> overflowQ = new LinkedBlockingQueue<>();
    private Thread overflowQThread = new Thread(new OverflowManager());

    @Autowired
    private DisruptorPoolExceptionHandler disruptorPoolExceptionHandler;

    @Autowired
    private ApplicationContext applicationContext;

    @PostConstruct
    private void setupMessageManager() {
        ringBuffer = RingBuffer.createMultiProducer(MessageEvent.EVENT_FACTORY, ringBufferSize);
        sequenceBarrier = ringBuffer.newBarrier();

        // MessageProcessor is a prototype bean, so each getBean call returns a new instance.
        processors = new MessageProcessor[threadCount];
        for (int i = 0; i < threadCount; i++) {
            processors[i] = applicationContext.getBean(MessageProcessor.class);
        }

        workerPool = new WorkerPool<MessageEvent>(ringBuffer, sequenceBarrier,
                disruptorPoolExceptionHandler, processors);
        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
        executorService = Executors.newFixedThreadPool(threadCount);
    }

    public void startMessageManager() {
        overflowQThread.start();
        ringBuffer = workerPool.start(executorService);
    }

    public void process(JavaHonkMessage message) {
        try {
            if (ringBuffer.remainingCapacity() > 0) {
                publishToBuffer(message);
            } else {
                overflowQ.put(message);
            }
        } catch (Exception e) {
            logger.error("Exception processing message {}", message.getMessageID(), e);
        }
    }

    private void publishToBuffer(JavaHonkMessage message) {
        // Claim the next slot; this call waits if the ring buffer is full.
        long sequence = ringBuffer.next();
        try {
            MessageEvent messageEvent = ringBuffer.get(sequence);
            messageEvent.setMessage(message);
            logger.info("Publishing message to buffer of type {}", message.getMessageType());
        } finally {
            // Always publish a claimed sequence, otherwise consumers stall behind it.
            ringBuffer.publish(sequence);
        }
    }

    class OverflowManager implements Runnable {

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Blocks until a message arrives on overflowQ.
                    JavaHonkMessage message = overflowQ.take();
                    while (!ringBuffer.hasAvailableCapacity(1)) {
                        // Busy spin until capacity frees up.
                        // This WILL burn CPU cycles.
                    }
                    publishToBuffer(message);
                } catch (InterruptedException e) {
                    logger.error("InterruptedException...", e);
                    // Restore the interrupt flag so the loop ends instead of retrying forever.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
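In process, checking remainingCapacity() and then claiming a slot with next() are two separate steps. With several producer threads, another thread can take the last free slot in between, and next() then waits for capacity. A claim that reports failure instead of waiting closes that gap. The Disruptor's RingBuffer offers tryNext() and tryPublishEvent(...) for this purpose. The sketch below shows only the shape of the pattern, using java.util.concurrent.ArrayBlockingQueue as a stand-in for the ring buffer. OverflowingPublisher is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class OverflowingPublisher<T> {

    // Bounded buffer standing in for the ring buffer (assumption for this sketch).
    private final BlockingQueue<T> buffer;
    // Unbounded queue that absorbs messages when the bounded buffer is full.
    private final BlockingQueue<T> overflow = new LinkedBlockingQueue<>();

    OverflowingPublisher(int capacity) {
        buffer = new ArrayBlockingQueue<>(capacity);
    }

    // Tries the bounded buffer first in a single atomic step; falls back to overflow.
    // Returns true if the message went into the bounded buffer.
    boolean publish(T message) {
        if (buffer.offer(message)) {
            return true;
        }
        overflow.add(message);
        return false;
    }

    int buffered() {
        return buffer.size();
    }

    int overflowed() {
        return overflow.size();
    }

    public static void main(String[] args) {
        OverflowingPublisher<String> publisher = new OverflowingPublisher<>(2);
        publisher.publish("a");
        publisher.publish("b");
        publisher.publish("c"); // buffer is full, so this one goes to overflow
        System.out.println(publisher.buffered() + " buffered, " + publisher.overflowed() + " overflowed");
    }
}
```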
- MessageProcessor.java:
package com.javahonk;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.lmax.disruptor.WorkHandler;

public class MessageProcessor implements WorkHandler<MessageEvent> {

    private static final Logger logger = LogManager.getLogger(MessageProcessor.class);

    @Override
    public void onEvent(MessageEvent event) throws Exception {
        try {
            JavaHonkMessage message = event.getMessage();
            logger.info(message.getCommandTarget());
            logger.info(message.getMessageID());
            logger.info(message.getMessageType());
        } catch (Exception e) {
            logger.error("Exception Processing Message ", e);
        }
    }
}
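Because the MessageProcessor instances run inside a WorkerPool, each published event is handled by exactly one of them; the work is shared between the processors, not broadcast to all of them. The following sketch illustrates that work-sharing behaviour with plain JDK classes: several workers drain one shared queue and every message is handled once. It does not use the Disruptor, and WorkSharingDemo is a hypothetical name.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class WorkSharingDemo {

    // Runs the given number of workers over the messages and returns how many were handled.
    static int run(int workers, int messages) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messages; i++) {
            queue.add(i);
        }
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        AtomicInteger handled = new AtomicInteger();

        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.execute(() -> {
                Integer id;
                // poll() hands each queued message to exactly one worker.
                while ((id = queue.poll()) != null) {
                    seen.add(id);
                    handled.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        // If any message had been handled twice, the counter would exceed the distinct ids.
        if (seen.size() != handled.get()) {
            throw new IllegalStateException("a message was handled more than once");
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000));
    }
}
```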
- LMAXSpringTest.java: the main test program, which starts the application and sends one sample message:
package com.javahonk;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class LMAXSpringTest {

    private static final Logger logger = LogManager.getLogger(LMAXSpringTest.class);

    public static void main(String[] args) {
        logger.info("Starting LMAXSpringIntegration...");
        setUncaughtExceptionHandler();

        ApplicationContext context = new ClassPathXmlApplicationContext("javahonk-context.xml");
        registerShutdownHook(context);

        context.getBean(MessageManager.class).startMessageManager();

        // Now send a message, which will be received by the MessageProcessor class for processing
        JavaHonkMessage message = new JavaHonkMessage("messageID", "messageType", "commandTarget");
        context.getBean(MessageManager.class).process(message);
    }

    private static void setUncaughtExceptionHandler() {
        Thread.currentThread().setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            public void uncaughtException(Thread t, Throwable e) {
                logger.error("Thread " + t + " threw uncaughtexception ", e);
            }
        });
    }

    private static void registerShutdownHook(ApplicationContext context) {
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                logger.info("LMAXSpringIntegration is Exiting.");
                ((AbstractApplicationContext) context).close();
            }
        }));
    }
}
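Note that the threads created by Executors.newFixedThreadPool are non-daemon threads, so the JVM keeps running after main returns, and the shutdown hook above closes the Spring context without stopping the worker threads. If a clean stop is wanted, the executor has to be shut down explicitly (the Disruptor's WorkerPool also provides halt() and drainAndHalt() for stopping its workers). The sketch below shows one common way to stop an ExecutorService with a timeout. ExecutorShutdown and its stop method are hypothetical and not part of this project.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ExecutorShutdown {

    // Stops accepting new tasks and waits up to timeoutMillis for running tasks to finish.
    // Returns true if the executor terminated within the timeout.
    static boolean stop(ExecutorService executor, long timeoutMillis) {
        executor.shutdown();
        try {
            if (executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
            // Tasks are still running: interrupt them as a last resort.
            executor.shutdownNow();
            return false;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.execute(() -> System.out.println("working"));
        System.out.println(stop(executor, 1000));
    }
}
```

In this project, a call like this could live in a method on MessageManager annotated with @PreDestroy, so that closing the context in the shutdown hook also stops the pool; that wiring is an assumption and is not shown in the original code.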
- To run this example, right-click LMAXSpringTest.java -> Run As -> Java Application.
Note: Don't forget to set these VM arguments:
-Denvironment.target=DEV -> sets the environment; change it as needed
-Dlogging.dir=./ -> sets the logging directory path
You will see the following messages printed on screen:
INFO: Loading properties file from class path resource [javahonk.properties]
2016-11-24 23:21:06.811 [main] INFO com.javahonk.MessageManager - Publishing message to buffer of type messageType
2016-11-24 23:21:06.812 [pool-3-thread-1] INFO com.javahonk.MessageProcessor - commandTarget
2016-11-24 23:21:06.812 [pool-3-thread-1] INFO com.javahonk.MessageProcessor - messageID
2016-11-24 23:21:06.812 [pool-3-thread-1] INFO com.javahonk.MessageProcessor - messageType
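The two VM arguments are JVM system properties, which log4j2.xml reads through its ${sys:...} lookups to build the RollingFile fileName. As a rough illustration of how that path resolves, the sketch below builds the same string from system properties. LogPathPreview is a hypothetical name, and the fallback values passed to System.getProperty are assumptions for this sketch only; log4j2 does not apply them.

```java
final class LogPathPreview {

    // Mirrors the fileName pattern ${sys:logging.dir}/logs/${sys:environment.target}/LMAX.log
    static String resolve(String loggingDir, String environment) {
        return loggingDir + "/logs/" + environment + "/LMAX.log";
    }

    public static void main(String[] args) {
        // Fallbacks are for this sketch only (assumption).
        String dir = System.getProperty("logging.dir", "./");
        String env = System.getProperty("environment.target", "DEV");
        System.out.println(resolve(dir, env));
    }
}
```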
- Reference: Getting started with LMAX Disruptor