Solace Publisher Subscriber Spring Integration Using JMS
You have seen many example of creating Solace Publisher and Consumer in previous tutorials . In this example I will show you how to create Solace Publisher and Consumer using JMS API and we will use MessageListener interface from JMS to listen message on Solace queue for Subscriber.
Tools needed:
1. Eclipse
2. Java 1.8
3. Maven 3.2.1
4. Solace jars already included in project for download (Version used: 7.1.2.230)
Steps:
- First create maven project name: SolaceJMSPubSubSpring belwo is final project structure:
- All dependencies for project: pom.xml:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.javahonk.jms.solaceadapter</groupId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>SolaceJMSPubSubSpring</name> <url>http://maven.apache.org</url> <properties> <org.springframework.version>4.1.5.RELEASE</org.springframework.version> <log4j.version>1.2.16</log4j.version> <org.apache.log4j.version>2.1</org.apache.log4j.version> <com.tibco.version>5.1.2</com.tibco.version> <jms.version>1.1</jms.version> </properties> <dependencies> <!-- Application Context (depends on spring-core, spring-expression, spring-aop, spring-beans) This is the central artifact for Spring's Dependency Injection Container and is generally always defined --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${org.springframework.version}</version> </dependency> <!-- Log4j --> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>${org.apache.log4j.version}</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>${org.apache.log4j.version}</version> </dependency> <!-- Solace jars --> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.6</version> <scope>system</scope> <systemPath>${basedir}/lib/commons-lang-2.6.jar</systemPath> </dependency> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.1.3</version> <scope>system</scope> <systemPath>${basedir}/lib/commons-logging-1.1.3.jar</systemPath> </dependency> <dependency> <groupId>geronimo-jms</groupId> <artifactId>geronimo-jms</artifactId> <version>1.1.1</version> <scope>system</scope> <systemPath>${basedir}/lib/geronimo-jms_1.1_spec-1.1.1.jar</systemPath> </dependency> <dependency> <groupId>sol-common</groupId> <artifactId>sol-common</artifactId> <version>7.1.2.230</version> <scope>system</scope> <systemPath>${basedir}/lib/sol-common-7.1.2.230.jar</systemPath> </dependency> <dependency> <groupId>sol-jcsmp</groupId> <artifactId>sol-jcsmp</artifactId> <version>7.1.2.230</version> <scope>system</scope> <systemPath>${basedir}/lib/sol-jcsmp-7.1.2.230.jar</systemPath> </dependency> <dependency> <groupId>sol-jms</groupId> <artifactId>sol-jms</artifactId> <version>7.1.2.230</version> <scope>system</scope> <systemPath>${basedir}/lib/sol-jms-7.1.2.230.jar</systemPath> </dependency> </dependencies> <build> <resources> <resource> <directory>src/main/config</directory> <excludes> <exclude>**/*.xml</exclude> <exclude>**/*.properties</exclude> </excludes> </resource> <resource> <directory>src/main/resources</directory> <excludes> <exclude>**/*.xml</exclude> <exclude>**/*.properties</exclude> </excludes> </resource> </resources> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> <version>2.8</version> <executions> <execution> <id>copy-dependencies</id> <phase>package</phase> <goals> <goal>copy-dependencies</goal> </goals> <configuration> <outputDirectory>${project.build.directory}/lib</outputDirectory> <overWriteReleases>false</overWriteReleases> <overWriteSnapshots>false</overWriteSnapshots> <overWriteIfNewer>true</overWriteIfNewer> </configuration> </execution> </executions> </plugin> </plugins> </build> <artifactId>SolaceJMSPubSubSpring</artifactId> </project>
- javahonk-solace.properties: Here you will have to mentioned your Solace properites:
#Solace Message properties SOLACE_HOST=tcp://solacehost.com:55555 SOLACE_VPN=VPN_NAME SOLACE_CONNECTION_FACTORY=VPN_CF_NAME SOLACE_OUTPUT_QUEUE_NAME=QUEUENAME #Publisher credential SOLACE_USERNAME_PUB=USERNAME SOLACE_PASSWORD_PUB=PASS #Subscriber credential SOLACE_USERNAME_SUB=USERNAME SOLACE_PASSWORD_SUB=PASS
- For logging use this: log4j2.xml:
<?xml version="1.0" encoding="UTF-8"?> <Configuration status="INFO"> <Properties> <Property name="envrionment.target">DEV</Property> </Properties> <Properties> <Property name="logging.dir">./</Property> </Properties> <Appenders> <Console name="Console" target="SYSTEM_OUT"> <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" /> </Console> <RollingFile name="RollingFile" fileName="./log/rolling-file.log" filePattern="${sys:logging.dir}/logs/rolling-file-%d{yyyy-MM-dd}-%i.log"> <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" /> <!-- TODO:Change to time based policy --> <Policies> <TimeBasedTriggeringPolicy interval="1" modulate="true" /> <SizeBasedTriggeringPolicy size="100 MB" /> </Policies> <DefaultRolloverStrategy max="4" /> </RollingFile> </Appenders> <Loggers> <Root level="info"> <AppenderRef ref="Console" /> <!-- <AppenderRef ref="file" /> --> <AppenderRef ref="RollingFile" /> </Root> </Loggers> </Configuration>
- Spring main framework file to load spring related configuration: spring-context.xml:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:util="http://www.springframework.org/schema/util" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.1.xsd"> <context:annotation-config /> <context:component-scan base-package="com.javahonk.solace, com.javahonk.solace.messaging" /> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <list> <value>classpath:javahonk-solace.properties</value> </list> </property> <property name="ignoreUnresolvablePlaceholders" value="true"/> </bean> <import resource="javahonk-solace-context.xml"/> </beans>
- I have separated out Solace related configuration javahonk-solace-context.xml in this file which is going to import in above spring-context file:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:lang="http://www.springframework.org/schema/lang" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:util="http://www.springframework.org/schema/util" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-4.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.1.xsd"> <bean id="env_publisher" class="java.util.Hashtable"> <constructor-arg> <map key-type="java.lang.Object" value-type="java.lang.String"> <entry key="java.naming.factory.initial" value="com.solacesystems.jndi.SolJNDIInitialContextFactory"/> <entry key="java.naming.provider.url" value="${SOLACE_HOST}"/> <entry key="Solace_JMS_VPN" value="${SOLACE_VPN}"/> <entry key="java.naming.security.principal" value="${SOLACE_USERNAME_PUB}"/> <entry key="java.naming.security.credentials" value="${SOLACE_PASSWORD_PUB}"/> </map> </constructor-arg> </bean> <bean id="initialContext_publisher" class="javax.naming.InitialContext"> <constructor-arg ref="env_publisher"/> </bean> <bean id="env_subscriber" class="java.util.Hashtable"> <constructor-arg> <map key-type="java.lang.Object" value-type="java.lang.String"> <entry key="java.naming.factory.initial" value="com.solacesystems.jndi.SolJNDIInitialContextFactory"/> <entry key="java.naming.provider.url" value="${SOLACE_HOST}"/> <entry key="Solace_JMS_VPN" value="${SOLACE_VPN}"/> <entry key="java.naming.security.principal" value="${SOLACE_USERNAME_PUB}"/> <entry key="java.naming.security.credentials" value="${SOLACE_PASSWORD_PUB}"/> </map> </constructor-arg> </bean> <bean id="initialContext_subscriber" class="javax.naming.InitialContext"> <constructor-arg ref="env_subscriber"/> </bean> <bean id="solaceConnectionFactory" class="com.javahonk.jms.solace.SolaceConnectionFactory"/> <bean id="solaceMessagePublisher" class="com.javahonk.jms.solace.SolaceMessagePublisher"/> <bean id="solaceMessageSubscriber" class="com.javahonk.jms.solace.SolaceMessageSubscriber"/> <bean id="solaceMessagePublisherSubscriber" class="com.javahonk.jms.solace.SolaceMessagePublisherSubscriber"/> </beans>
- Main class SolaceConnectionFactory.java responsible for creating a connection to the Solace and work as factory:
package com.javahonk.jms.solace; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Session; import javax.naming.InitialContext; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; public class SolaceConnectionFactory { private static final Logger logger = LogManager.getLogger(SolaceConnectionFactory.class.getName()); @Autowired private InitialContext initialContext_publisher; @Autowired private InitialContext initialContext_subscriber; @Value("${SOLACE_CONNECTION_FACTORY}") private String solaceConnectionFactory; public Connection createPublisherConnection() { Connection connection = null; try { ConnectionFactory connectionFactory = (ConnectionFactory) initialContext_publisher.lookup(solaceConnectionFactory); connection = connectionFactory.createConnection(); } catch (Exception e) { logger.error("Failed to create solace connection", e); } return connection; } public Connection createSubscriberConnection() { Connection connection = null; try { ConnectionFactory connectionFactory = (ConnectionFactory) initialContext_subscriber.lookup(solaceConnectionFactory); connection = connectionFactory.createConnection(); } catch (Exception e) { logger.error("Failed to create solace connection", e); } return connection; } public Session createSession(Connection connection) { Session session = null; try { session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } catch (Exception e) { logger.error("Failed to create solace session", e); } return session; } }
- SolaceMessagePublisher.java: Publisher class which publish message to Solace queue:
package com.javahonk.jms.solace; import javax.annotation.PostConstruct; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.InitialContext; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; public class SolaceMessagePublisher { private static final Logger logger = LogManager.getLogger(SolaceMessagePublisher.class.getName()); @Autowired private SolaceConnectionFactory solaceConnectionFactory; @Autowired private InitialContext initialContext_publisher; @Value("${SOLACE_OUTPUT_QUEUE_NAME}") private String solaceOutputQueueName; private Session session; private MessageProducer producer; @PostConstruct public void init() { logger.info("Solace outputQueue: {}", solaceOutputQueueName); Connection connection; try { connection = solaceConnectionFactory.createPublisherConnection(); session = solaceConnectionFactory.createSession(connection); Destination destination = (Destination) initialContext_publisher.lookup(solaceOutputQueueName); producer = session.createProducer(destination); } catch (Exception e) { logger.error("Error creating connection to Solace: {}", e); } } public void sendMessage(String message) { try { logger.info("Sending message : {}", message); TextMessage testMessage = session.createTextMessage(message); producer.send(testMessage); logger.info("Message sent successfully : {}", message); } catch (Exception e) { logger.error("Error sending message to output queue name:{} --> details exceptions: --> ", solaceOutputQueueName, e); } } }
- SolaceMessageSubscriber.java: Solace Subscriber class which will subscribe to the queue for new messages:
package com.javahonk.jms.solace; import javax.annotation.PostConstruct; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.Session; import javax.naming.InitialContext; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import com.solacesystems.jms.SolJmsUtility; import com.solacesystems.jms.message.SolBytesMessage; import com.solacesystems.jms.message.SolTextMessage; public class SolaceMessageSubscriber implements MessageListener { private static final Logger logger = LogManager.getLogger(SolaceMessageSubscriber.class.getName()); @Autowired private InitialContext initialContext_subscriber; @Value("${SOLACE_CONNECTION_FACTORY}") private String solaceConnectionFactoryLookup; @Value("${SOLACE_OUTPUT_QUEUE_NAME}") private String solaceOutputQueueName; private MessageConsumer consumer; @PostConstruct public void init() { logger.info("Solace outputQueue: {}", solaceOutputQueueName); try { QueueConnectionFactory cf = (QueueConnectionFactory)initialContext_subscriber.lookup(solaceConnectionFactoryLookup); QueueConnection connection = cf.createQueueConnection(); Session session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = (Queue)initialContext_subscriber.lookup(solaceOutputQueueName); consumer = session.createConsumer(queue); consumer.setMessageListener(this); connection.start(); logger.info("Connection made successfully. Waiting for a message..."); } catch (Exception e) { logger.error("Error creating connection to Solace: {}", e); } } @Override public void onMessage(Message testMessage) { try { logger.info("Message received"); if (testMessage == null) { logger.info("An error has occured. Solace Text message was null"); } if (testMessage instanceof SolBytesMessage) { System.out.println("Message received:"+((SolBytesMessage)testMessage).getMessage()); logger.info("Message size: {} kb",SolJmsUtility.getMessageSize(testMessage)); testMessage.acknowledge(); } if (testMessage instanceof SolTextMessage) { System.out.println("Message received:"+((SolTextMessage)testMessage).getText()); logger.info("Message size: {} kb",SolJmsUtility.getMessageSize(testMessage)); testMessage.acknowledge(); } } catch (JMSException e) { System.out.println("Error processing incoming message."); e.printStackTrace(); } } }
- SolaceMessagePublisherSubscriber.java: In this class I have combined both publisher and Subscriber because this is needed if you have requirement to publish and subscriber both on the queue:
package com.javahonk.jms.solace; import javax.annotation.PostConstruct; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.InitialContext; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import com.solacesystems.jms.SolJmsUtility; import com.solacesystems.jms.message.SolBytesMessage; import com.solacesystems.jms.message.SolTextMessage; public class SolaceMessagePublisherSubscriber implements MessageListener { private static final Logger logger = LogManager.getLogger(SolaceMessagePublisherSubscriber.class.getName()); @Autowired private SolaceConnectionFactory solaceConnectionFactory; @Autowired private InitialContext initialContext_publisher; @Autowired private InitialContext initialContext_subscriber; @Value("${SOLACE_OUTPUT_QUEUE_NAME}") private String solaceOutputQueueName; @Value("${SOLACE_CONNECTION_FACTORY}") private String solaceConnectionFactoryLookup; private Session session; private MessageProducer producer; private MessageConsumer consumer; @PostConstruct public void init() { logger.info("Solace outputQueue: {}", solaceOutputQueueName); Connection connection; try { //Publisher connection = solaceConnectionFactory.createPublisherConnection(); session = solaceConnectionFactory.createSession(connection); Destination destination = (Destination) initialContext_publisher.lookup(solaceOutputQueueName); producer = session.createProducer(destination); //Subscriber QueueConnectionFactory cf = (QueueConnectionFactory)initialContext_subscriber.lookup(solaceConnectionFactoryLookup); QueueConnection subsConnection = cf.createQueueConnection(); Session session = subsConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = (Queue)initialContext_subscriber.lookup(solaceOutputQueueName); consumer = session.createConsumer(queue); consumer.setMessageListener(this); connection.start(); } catch (Exception e) { logger.error("Error creating connection to Solace: {}", e); } } public void sendMessage(String message) { try { logger.info("Sending message : {}", message); TextMessage testMessage = session.createTextMessage(message); producer.send(testMessage); logger.info("Message sent successfully : {}", message); } catch (Exception e) { logger.error("Error sending message to output queue name:{} --> details exceptions: --> ", solaceOutputQueueName, e); } } @Override public void onMessage(Message testMessage) { try { logger.info("Message received"); if (testMessage == null) { logger.info("An error has occured. Solace Text message was null"); } if (testMessage instanceof SolBytesMessage) { System.out.println("Message received:"+((SolBytesMessage)testMessage).getMessage()); logger.info("Message size: {} kb",SolJmsUtility.getMessageSize(testMessage)); testMessage.acknowledge(); } if (testMessage instanceof SolTextMessage) { System.out.println("Message received:"+((SolTextMessage)testMessage).getText()); logger.info("Message size: {} kb",SolJmsUtility.getMessageSize(testMessage)); testMessage.acknowledge(); } } catch (JMSException e) { System.out.println("Error processing incoming message."); e.printStackTrace(); } } }
- That’s it. Now our final JavahonkSolacePublisherTest.java class which will start the application and publish and receive the messages on Solace queue:
package com.javahonk.solace; import javax.jms.JMSException; import javax.naming.NamingException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.context.ApplicationContext; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import com.javahonk.jms.solace.SolaceMessagePublisherSubscriber; public class JavahonkSolacePublisherTest { private static final Logger logger = LogManager.getLogger(JavahonkSolacePublisherTest.class.getName()); public static void main(String[] args) throws NamingException, JMSException, InterruptedException { logger.info("Java Honk Solace Adapter Starting..."); setUncaughtExceptionHandler(); ApplicationContext context = new ClassPathXmlApplicationContext("spring-context.xml"); SolaceMessagePublisherSubscriber solaceMessagePublisher = context.getBean(SolaceMessagePublisherSubscriber.class); for (int i = 0; i < 15; i++) { Thread.sleep(1000); solaceMessagePublisher.sendMessage("Solace test"); } keepThreadAlive(); registerShutdownHook(context); } private static void setUncaughtExceptionHandler() { Thread.currentThread().setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e) { logger.error("Thread " + t + " threw uncaughtexception ", e); } }); } private static void keepThreadAlive() { Thread keepAliveThread = new Thread(new Runnable() { @Override public void run() { while (!Thread.interrupted()) { } } }); keepAliveThread.start(); } private static void registerShutdownHook(ApplicationContext context) { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { logger.info("Java Honk Solace Adapter Exiting."); ((AbstractApplicationContext) context).close(); } })); } }
- Now to run the application please edit and put Solace queue name, Connection factory, vpn, subscriber and publisher user id and password in javahonk-solace.properties then right click JavahonkSolacePublisherTest.java –> Run As –> Java Application and if everything goes will you will see below message on console for publisher and subscriber:
- Download project: SolaceJMSPubSubSpring
For more information please visit Solace official web site here