Solace JMS Subscriber Pure Java Spring Integration
In the previous tutorial you saw how to create a pure Java Spring Solace publisher. This tutorial is the next part of the same series and shows how to create a pure Java Solace subscriber that consumes the messages published to the publisher's queue.
Note: If you are planning to use Spring in your project, I suggest using this tutorial for Solace JMS integration; otherwise, choose whichever approach suits you best.
Solace JMS version: sol-jms-7.1.2.230
- Final project structure:
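- The following jars are needed in the project's lib folder (they are referenced from there in pom.xml): commons-lang-2.6.jar, commons-logging-1.1.3.jar, geronimo-jms_1.1_spec-1.1.1.jar, sol-common-7.1.2.230.jar, sol-jcsmp-7.1.2.230.jar, sol-jms-7.1.2.230.jar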
- pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.javahonk.jms.solaceadapter</groupId>
    <artifactId>JMSSolaceSubscriberAdapter</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>JMSSolaceSubscriberAdapter</name>
    <url>http://maven.apache.org</url>

    <properties>
        <org.springframework.version>4.1.5.RELEASE</org.springframework.version>
        <log4j.version>1.2.16</log4j.version>
        <org.apache.log4j.version>2.1</org.apache.log4j.version>
        <com.tibco.version>5.1.2</com.tibco.version>
        <jms.version>1.1</jms.version>
    </properties>

    <dependencies>
        <!-- Application Context (depends on spring-core, spring-expression, spring-aop, spring-beans)
             This is the central artifact for Spring's Dependency Injection Container and is generally always defined -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${org.springframework.version}</version>
        </dependency>

        <!-- Log4j -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${org.apache.log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${org.apache.log4j.version}</version>
        </dependency>

        <!-- Solace jars -->
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
            <scope>system</scope>
            <systemPath>${basedir}/lib/commons-lang-2.6.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.1.3</version>
            <scope>system</scope>
            <systemPath>${basedir}/lib/commons-logging-1.1.3.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>geronimo-jms</groupId>
            <artifactId>geronimo-jms</artifactId>
            <version>1.1.1</version>
            <scope>system</scope>
            <systemPath>${basedir}/lib/geronimo-jms_1.1_spec-1.1.1.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>sol-common</groupId>
            <artifactId>sol-common</artifactId>
            <version>7.1.2.230</version>
            <scope>system</scope>
            <systemPath>${basedir}/lib/sol-common-7.1.2.230.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>sol-jcsmp</groupId>
            <artifactId>sol-jcsmp</artifactId>
            <version>7.1.2.230</version>
            <scope>system</scope>
            <systemPath>${basedir}/lib/sol-jcsmp-7.1.2.230.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>sol-jms</groupId>
            <artifactId>sol-jms</artifactId>
            <version>7.1.2.230</version>
            <scope>system</scope>
            <systemPath>${basedir}/lib/sol-jms-7.1.2.230.jar</systemPath>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/config</directory>
                <excludes>
                    <exclude>**/*.xml</exclude>
                    <exclude>**/*.properties</exclude>
                </excludes>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <excludes>
                    <exclude>**/*.xml</exclude>
                    <exclude>**/*.properties</exclude>
                </excludes>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.8</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            <overWriteReleases>false</overWriteReleases>
                            <overWriteSnapshots>false</overWriteSnapshots>
                            <overWriteIfNewer>true</overWriteIfNewer>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
- spring-context.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.1.xsd">

    <context:annotation-config />
    <context:component-scan base-package="com.javahonk.solace, com.javahonk.solace.messaging" />

    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath:javahonk-solace.properties</value>
            </list>
        </property>
        <property name="ignoreUnresolvablePlaceholders" value="true"/>
    </bean>

    <import resource="javahonk-solace-context.xml"/>

</beans>
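The PropertyPlaceholderConfigurer bean above is what replaces placeholders such as ${SOLACE_HOST} with values from javahonk-solace.properties, and ignoreUnresolvablePlaceholders=true tells it to leave an unknown placeholder untouched instead of failing. The sketch below only illustrates that idea with plain JDK classes. It is not Spring's implementation (Spring also supports default values, nested placeholders and more), and the class name PlaceholderSketch and the sample host value are made up for the example.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustration only: a simplified stand-in for what a placeholder configurer does.
class PlaceholderSketch {

    // Matches ${NAME} and captures NAME.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces each ${key} with its property value; unknown keys are left as they are. */
    public static String resolve(String text, Properties props) {
        Matcher matcher = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = props.getProperty(matcher.group(1));
            // Behaves like ignoreUnresolvablePlaceholders=true: keep the original token.
            String replacement = (value != null) ? value : matcher.group(0);
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("SOLACE_HOST", "tcp://localhost:55555"); // hypothetical value
        System.out.println(resolve("${SOLACE_HOST}", props));
        System.out.println(resolve("${NOT_DEFINED}", props));
    }
}
```

With the flag set to false, the second call would correspond to a startup failure in Spring rather than an untouched placeholder.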
- javahonk-solace-context.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:lang="http://www.springframework.org/schema/lang"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
        http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-4.1.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.1.xsd">

    <bean id="env_subscriber" class="java.util.Hashtable">
        <constructor-arg>
            <map key-type="java.lang.Object" value-type="java.lang.String">
                <entry key="java.naming.factory.initial" value="com.solacesystems.jndi.SolJNDIInitialContextFactory"/>
                <entry key="java.naming.provider.url" value="${SOLACE_HOST}"/>
                <entry key="Solace_JMS_VPN" value="${SOLACE_VPN}"/>
                <entry key="java.naming.security.principal" value="${SOLACE_USERNAME_SUB}"/>
                <entry key="java.naming.security.credentials" value="${SOLACE_PASSWORD_SUB}"/>
            </map>
        </constructor-arg>
    </bean>

    <bean id="initialContext_subscriber" class="javax.naming.InitialContext">
        <constructor-arg ref="env_subscriber"/>
    </bean>

    <bean id="solaceConnectionFactory" class="com.javahonk.jms.solace.SolaceConnectionFactory"/>

    <bean id="solaceMessageSubscriber" class="com.javahonk.jms.solace.SolaceMessageSubscriber"/>

</beans>
- javahonk-solace.properties:
#Solace Message properties
SOLACE_HOST=tcp://URL:PORT
SOLACE_VPN=VPN
SOLACE_CONNECTION_FACTORY=VPN_CF
SOLACE_OUTPUT_QUEUE_NAME=QUEUE NAME
#Subscriber credential
SOLACE_USERNAME_SUB=USER NAME
SOLACE_PASSWORD_SUB=PASSWORD
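To see how these properties end up in the JNDI environment, here is a small sketch that builds the same key/value pairs as the env_subscriber bean, using only JDK classes. It is an illustration, not part of the project: the class name SolaceJndiEnvSketch, the helper required() and all sample values in main are assumptions. It stops before creating the InitialContext, because that step needs the Solace jars on the classpath.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Hashtable;
import java.util.Properties;
import javax.naming.Context;

// Illustration only: mirrors the entries of the env_subscriber bean in plain Java.
class SolaceJndiEnvSketch {

    /** Builds the JNDI environment from the keys used in javahonk-solace.properties. */
    public static Hashtable<String, String> buildEnv(Properties props) {
        Hashtable<String, String> env = new Hashtable<>();
        // Context.INITIAL_CONTEXT_FACTORY is the constant for "java.naming.factory.initial".
        env.put(Context.INITIAL_CONTEXT_FACTORY, "com.solacesystems.jndi.SolJNDIInitialContextFactory");
        env.put(Context.PROVIDER_URL, required(props, "SOLACE_HOST"));
        env.put("Solace_JMS_VPN", required(props, "SOLACE_VPN"));
        env.put(Context.SECURITY_PRINCIPAL, required(props, "SOLACE_USERNAME_SUB"));
        env.put(Context.SECURITY_CREDENTIALS, required(props, "SOLACE_PASSWORD_SUB"));
        return env;
    }

    /** Hypothetical helper: fails early when a property is missing or blank. */
    public static String required(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing property: " + key);
        }
        return value.trim();
    }

    public static void main(String[] args) throws IOException {
        // Sample values are placeholders, not real connection details.
        String sample = "SOLACE_HOST=tcp://localhost:55555\n"
                + "SOLACE_VPN=default\n"
                + "SOLACE_USERNAME_SUB=demo_user\n"
                + "SOLACE_PASSWORD_SUB=demo_password\n";
        Properties props = new Properties();
        props.load(new StringReader(sample));

        Hashtable<String, String> env = buildEnv(props);
        // Print only non-secret entries.
        System.out.println("Provider URL: " + env.get(Context.PROVIDER_URL));
        System.out.println("VPN: " + env.get("Solace_JMS_VPN"));
    }
}
```

In the real project the resulting table is passed to new InitialContext(env), which is what the initialContext_subscriber bean does through its constructor argument.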
- SolaceConnectionFactory.java:
package com.javahonk.jms.solace;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
import javax.naming.InitialContext;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

public class SolaceConnectionFactory {

    private static final Logger logger = LogManager.getLogger(SolaceConnectionFactory.class.getName());

    // Only initialContext_subscriber is defined in this project's context. Because
    // @Autowired matches by type, Spring injects that single InitialContext bean here too.
    @Autowired
    private InitialContext initialContext_publisher;

    @Autowired
    private InitialContext initialContext_subscriber;

    // JNDI name of the connection factory configured on the Solace side
    @Value("${SOLACE_CONNECTION_FACTORY}")
    private String solaceConnectionFactory;

    // Looks up the connection factory through JNDI and opens a connection; returns null on failure
    public Connection createPublisherConnection() {
        Connection connection = null;
        try {
            ConnectionFactory connectionFactory = (ConnectionFactory) initialContext_publisher.lookup(solaceConnectionFactory);
            connection = connectionFactory.createConnection();
        } catch (Exception e) {
            logger.error("Failed to create solace connection", e);
        }
        return connection;
    }

    // Same as above, using the subscriber's JNDI context; returns null on failure
    public Connection createSubscriberConnection() {
        Connection connection = null;
        try {
            ConnectionFactory connectionFactory = (ConnectionFactory) initialContext_subscriber.lookup(solaceConnectionFactory);
            connection = connectionFactory.createConnection();
        } catch (Exception e) {
            logger.error("Failed to create solace connection", e);
        }
        return connection;
    }

    // Creates a non-transacted session that acknowledges messages automatically; returns null on failure
    public Session createSession(Connection connection) {
        Session session = null;
        try {
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (Exception e) {
            logger.error("Failed to create solace session", e);
        }
        return session;
    }
}
- SolaceMessageSubscriber.java:
package com.javahonk.jms.solace;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.naming.InitialContext;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

import com.solacesystems.jms.SolJmsUtility;
import com.solacesystems.jms.message.SolBytesMessage;
import com.solacesystems.jms.message.SolTextMessage;

public class SolaceMessageSubscriber {

    private static final Logger logger = LogManager.getLogger(SolaceMessageSubscriber.class.getName());

    @Autowired
    private InitialContext initialContext_subscriber;

    @Value("${SOLACE_CONNECTION_FACTORY}")
    private String solaceConnectionFactoryLookup;

    @Value("${SOLACE_OUTPUT_QUEUE_NAME}")
    private String solaceOutputQueueName;

    private QueueConnection connection;
    private MessageConsumer consumer;

    @PostConstruct
    public void init() {
        logger.info("Solace outputQueue: {}", solaceOutputQueueName);
        try {
            QueueConnectionFactory cf = (QueueConnectionFactory) initialContext_subscriber.lookup(solaceConnectionFactoryLookup);
            connection = cf.createQueueConnection();
            // CLIENT_ACKNOWLEDGE: a message is only acknowledged when acknowledge() is called on it
            Session session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
            Queue queue = (Queue) initialContext_subscriber.lookup(solaceOutputQueueName);
            consumer = session.createConsumer(queue);
            connection.start();
            logger.info("Connection made successfully. Waiting for a message...");
            // Receive on a separate thread so that Spring can finish starting the context
            new Thread(new Runnable() {
                @Override
                public void run() {
                    onReceive();
                }
            }, "solace-subscriber").start();
        } catch (Exception e) {
            logger.error("Error creating connection to Solace", e);
        }
    }

    public void onReceive() {
        try {
            while (true) {
                Message testMessage = consumer.receive();
                if (testMessage == null) {
                    // A blocking receive() returns null only when the consumer has been closed
                    logger.info("Consumer closed. No more messages will be received");
                    break;
                }
                logger.info("Message received");
                if (testMessage instanceof SolBytesMessage) {
                    System.out.println("Message received:" + ((SolBytesMessage) testMessage).getMessage());
                    logger.info("Message size: {} kb", SolJmsUtility.getMessageSize(testMessage));
                    testMessage.acknowledge();
                } else if (testMessage instanceof SolTextMessage) {
                    System.out.println("Message received:" + ((SolTextMessage) testMessage).getText());
                    logger.info("Message size: {} kb", SolJmsUtility.getMessageSize(testMessage));
                    testMessage.acknowledge();
                }
            }
        } catch (Exception e) {
            logger.error("Error receiving message from queue name:{} --> exception details: --> ", solaceOutputQueueName, e);
        }
    }

    // Closing the connection also ends the receive loop above
    @PreDestroy
    public void close() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            logger.error("Error closing Solace connection", e);
        }
    }
}
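The heart of the subscriber is a loop that blocks until a message arrives, checks what kind of message it is and handles it. If you want to play with that loop shape without a Solace broker, the sketch below reproduces it with a JDK BlockingQueue standing in for the JMS consumer. Everything in it is a stand-in chosen for the example: the class name ReceiveLoopSketch, the CLOSED sentinel (playing the role of the null that a closed JMS consumer returns from receive()) and the use of String and byte[] in place of text and bytes messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustration only: a BlockingQueue stands in for the JMS MessageConsumer.
class ReceiveLoopSketch {

    /** Sentinel that ends the loop, similar to receive() returning null after close. */
    public static final Object CLOSED = new Object();

    /** Blocks for each message, handles it by type and stops when CLOSED arrives. */
    public static List<String> receiveAll(BlockingQueue<Object> source) {
        List<String> handled = new ArrayList<>();
        try {
            while (true) {
                Object message = source.take(); // blocks until something is available
                if (message == CLOSED) {
                    break;
                }
                if (message instanceof String) {
                    handled.add("text:" + message);
                } else if (message instanceof byte[]) {
                    handled.add("bytes:" + ((byte[]) message).length);
                } else {
                    handled.add("ignored:" + message.getClass().getSimpleName());
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop receiving.
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        queue.add("hello");
        queue.add(new byte[] {1, 2, 3});
        queue.add(CLOSED);
        System.out.println(receiveAll(queue));
    }
}
```

The point of the explicit stop condition is that the loop has a clean way to end when the source is closed, instead of spinning or waiting forever.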
- Finally, use the JavahonkSolaceSubscriberTest.java test class to test the subscriber application:
package com.javahonk.solace;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class JavahonkSolaceSubscriberTest {

    private static final Logger logger = LogManager.getLogger(JavahonkSolaceSubscriberTest.class.getName());

    public static void main(String[] args) throws InterruptedException {
        logger.info("Java Honk Solace Output Adapter Starting...");
        // Loading the context creates SolaceMessageSubscriber, whose @PostConstruct method connects to Solace
        AbstractApplicationContext context = new ClassPathXmlApplicationContext("spring-context.xml");
        // Close the context cleanly when the JVM shuts down (for example on Ctrl+C)
        context.registerShutdownHook();
        // Keep the main thread alive without spinning the CPU
        Thread.currentThread().join();
    }
}
Download source code: JMSSolaceSubscriberAdapter