Solace JMS Subscriber Pure Java Spring Integration

Solace JMS Subscriber Pure Java Spring Integration

In the previous tutorial you saw how to create a pure Java Spring Solace publisher. This is the next part of the same series: here I will show you how to create a pure Java Solace subscriber that consumes the messages published to the publisher's queue.

Note: If you are planning to use Spring in your project, I suggest using this tutorial for the Solace JMS integration; otherwise it is up to you to choose whichever approach suits you best.

Solace JMS version: sol-jms-7.1.2.230

  • Final project structure:

Solace JMS Subscriber Pure Java Spring Integration

  • Below Solace jar are needed:

Solace JMS Subscriber Pure Java Spring Integration

  • pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Solace JMS subscriber sample. Spring and Log4j 2 are
	resolved from the repository; the Solace client jars and their helpers are
	taken from the project's local lib directory via system scope. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<!-- Project coordinates, kept together so the artifact identity is visible at a glance -->
	<groupId>com.javahonk.jms.solaceadapter</groupId>
	<artifactId>JMSSolaceSubscriberAdapter</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>JMSSolaceSubscriberAdapter</name>
	<url>http://maven.apache.org</url>

	<properties>
		<org.springframework.version>4.1.5.RELEASE</org.springframework.version>
		<!-- NOTE(review): log4j.version, com.tibco.version and jms.version are not
			referenced anywhere in this POM; confirm nothing external relies on them
			before removing. -->
		<log4j.version>1.2.16</log4j.version>
		<org.apache.log4j.version>2.1</org.apache.log4j.version>
		<com.tibco.version>5.1.2</com.tibco.version>
		<jms.version>1.1</jms.version>
	</properties>

	<dependencies>

		<!-- Application Context (depends on spring-core, spring-expression, spring-aop,
			spring-beans) This is the central artifact for Spring's Dependency Injection
			Container and is generally always defined -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
			<version>${org.springframework.version}</version>
		</dependency>

		<!-- Log4j 2 API and implementation -->
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-api</artifactId>
			<version>${org.apache.log4j.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>${org.apache.log4j.version}</version>
		</dependency>

		<!-- Solace jars and the libraries shipped alongside them. All of these use
			system scope, so each jar must exist under ${basedir}/lib with exactly the
			file name given in systemPath. -->

		<dependency>
			<groupId>commons-lang</groupId>
			<artifactId>commons-lang</artifactId>
			<version>2.6</version>
			<scope>system</scope>
			<systemPath>${basedir}/lib/commons-lang-2.6.jar</systemPath>
		</dependency>

		<dependency>
			<groupId>commons-logging</groupId>
			<artifactId>commons-logging</artifactId>
			<version>1.1.3</version>
			<scope>system</scope>
			<systemPath>${basedir}/lib/commons-logging-1.1.3.jar</systemPath>
		</dependency>

		<!-- JMS 1.1 API (javax.jms) -->
		<dependency>
			<groupId>geronimo-jms</groupId>
			<artifactId>geronimo-jms</artifactId>
			<version>1.1.1</version>
			<scope>system</scope>
			<systemPath>${basedir}/lib/geronimo-jms_1.1_spec-1.1.1.jar</systemPath>
		</dependency>

		<dependency>
			<groupId>sol-common</groupId>
			<artifactId>sol-common</artifactId>
			<version>7.1.2.230</version>
			<scope>system</scope>
			<systemPath>${basedir}/lib/sol-common-7.1.2.230.jar</systemPath>
		</dependency>

		<dependency>
			<groupId>sol-jcsmp</groupId>
			<artifactId>sol-jcsmp</artifactId>
			<version>7.1.2.230</version>
			<scope>system</scope>
			<systemPath>${basedir}/lib/sol-jcsmp-7.1.2.230.jar</systemPath>
		</dependency>

		<dependency>
			<groupId>sol-jms</groupId>
			<artifactId>sol-jms</artifactId>
			<version>7.1.2.230</version>
			<scope>system</scope>
			<systemPath>${basedir}/lib/sol-jms-7.1.2.230.jar</systemPath>
		</dependency>

	</dependencies>

	<build>

		<!-- XML and properties files are excluded from the copied resources in both
			directories. NOTE(review): presumably configuration is meant to be supplied
			on the classpath externally at run time; confirm, because spring-context.xml
			and javahonk-solace.properties are loaded from the classpath. -->
		<resources>
			<resource>
				<directory>src/main/config</directory>
				<excludes>
					<exclude>**/*.xml</exclude>
					<exclude>**/*.properties</exclude>
				</excludes>
			</resource>
			<resource>
				<directory>src/main/resources</directory>
				<excludes>
					<exclude>**/*.xml</exclude>
					<exclude>**/*.properties</exclude>
				</excludes>
			</resource>
		</resources>

		<plugins>
			<!-- During the package phase, copy the project's dependencies into target/lib -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-dependency-plugin</artifactId>
				<version>2.8</version>
				<executions>
					<execution>
						<id>copy-dependencies</id>
						<phase>package</phase>
						<goals>
							<goal>copy-dependencies</goal>
						</goals>
						<configuration>
							<outputDirectory>${project.build.directory}/lib</outputDirectory>
							<overWriteReleases>false</overWriteReleases>
							<overWriteSnapshots>false</overWriteSnapshots>
							<overWriteIfNewer>true</overWriteIfNewer>
						</configuration>
					</execution>
				</executions>
			</plugin>

		</plugins>

	</build>

</project>
  • spring-context.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Root Spring context: enables annotation processing, loads the Solace
	properties file and imports the Solace bean definitions. Only the namespaces
	actually used below are declared, and the schema locations are versionless so
	they resolve against whichever Spring version is on the classpath. -->
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

	<!-- Activates @Autowired, @Value and @PostConstruct on the beans that are
		declared explicitly in the imported context file -->
	<context:annotation-config />

	<!-- com.javahonk.solace.messaging is a sub-package of com.javahonk.solace, so
		scanning the parent package already covers it -->
	<context:component-scan base-package="com.javahonk.solace" />

	<!-- Resolves the ${...} placeholders used in the bean definitions and @Value fields -->
	<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
		<property name="locations">
			<list>
				<value>classpath:javahonk-solace.properties</value>
			</list>
		</property>
		<property name="ignoreUnresolvablePlaceholders" value="true"/>
	</bean>

	<import resource="javahonk-solace-context.xml"/>

</beans>
  • javahonk-solace-context.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Solace bean definitions: the JNDI environment, the JNDI InitialContext built
	from it, and the two application beans. Only the beans namespace is used here,
	so no other namespaces are declared. -->
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

	<!-- JNDI environment for the subscriber; the ${...} values come from
		javahonk-solace.properties -->
	<bean id="env_subscriber" class="java.util.Hashtable">
		<constructor-arg>
			<map key-type="java.lang.Object" value-type="java.lang.String">
				<entry key="java.naming.factory.initial" value="com.solacesystems.jndi.SolJNDIInitialContextFactory"/>
				<entry key="java.naming.provider.url" value="${SOLACE_HOST}"/>
				<entry key="Solace_JMS_VPN" value="${SOLACE_VPN}"/>
				<entry key="java.naming.security.principal" value="${SOLACE_USERNAME_SUB}"/>
				<entry key="java.naming.security.credentials" value="${SOLACE_PASSWORD_SUB}"/>
			</map>
		</constructor-arg>
	</bean>

	<!-- JNDI context used to look up the connection factory and the queue -->
	<bean id="initialContext_subscriber" class="javax.naming.InitialContext">
		<constructor-arg ref="env_subscriber"/>
	</bean>

	<!-- NOTE(review): SolaceConnectionFactory also autowires a field named
		initialContext_publisher, but this is the only InitialContext bean defined
		in this file; confirm that injecting the subscriber context into both fields
		is intended. -->
	<bean id="solaceConnectionFactory" class="com.javahonk.jms.solace.SolaceConnectionFactory"/>

	<bean id="solaceMessageSubscriber" class="com.javahonk.jms.solace.SolaceMessageSubscriber"/>

</beans>
  • javahonk-solace.properties
#Solace Message properties
# All values below are placeholders; replace them with the details of your own
# Solace environment before running the subscriber.

# Used as java.naming.provider.url in the env_subscriber JNDI environment
SOLACE_HOST=tcp://URL:PORT
# Used as Solace_JMS_VPN in the env_subscriber JNDI environment
SOLACE_VPN=VPN
# JNDI name under which the JMS connection factory is looked up
SOLACE_CONNECTION_FACTORY=VPN_CF
# JNDI name of the queue that the subscriber consumes from
SOLACE_OUTPUT_QUEUE_NAME=QUEUE NAME

#Subscriber credential
# Used as java.naming.security.principal / java.naming.security.credentials
SOLACE_USERNAME_SUB=USER NAME
SOLACE_PASSWORD_SUB=PASSWORD
  • SolaceConnectionFactory.java:
package com.javahonk.jms.solace;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
import javax.naming.InitialContext;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

/**
 * Creates JMS connections and sessions from connection factories looked up
 * through JNDI.
 *
 * <p>Every method is best-effort: a failure is logged and {@code null} is
 * returned instead of an exception being thrown, so callers must check the
 * result for {@code null}.
 */
public class SolaceConnectionFactory {

    private static final Logger logger = LogManager.getLogger(SolaceConnectionFactory.class.getName());

    /** JNDI context used for publisher connections. */
    @Autowired
    private InitialContext initialContext_publisher;

    /** JNDI context used for subscriber connections. */
    @Autowired
    private InitialContext initialContext_subscriber;

    /** JNDI name of the JMS connection factory. */
    @Value("${SOLACE_CONNECTION_FACTORY}")
    private String solaceConnectionFactory;

    /**
     * Creates a connection using the publisher JNDI context.
     *
     * @return the new connection, or {@code null} if the lookup or the
     *         connection attempt failed (the failure is logged)
     */
    public Connection createPublisherConnection() {
        return createConnection(initialContext_publisher);
    }

    /**
     * Creates a connection using the subscriber JNDI context.
     *
     * @return the new connection, or {@code null} if the lookup or the
     *         connection attempt failed (the failure is logged)
     */
    public Connection createSubscriberConnection() {
        return createConnection(initialContext_subscriber);
    }

    /**
     * Creates a non-transacted, auto-acknowledging session on the given connection.
     *
     * @param connection the connection to create the session on
     * @return the new session, or {@code null} if creation failed, including
     *         when {@code connection} is {@code null} (the failure is logged)
     */
    public Session createSession(Connection connection) {
        try {
            return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (Exception e) {
            logger.error("Failed to create solace session", e);
            return null;
        }
    }

    /**
     * Looks up the configured connection factory in the given JNDI context and
     * opens a connection from it. Shared by the publisher and subscriber variants.
     */
    private Connection createConnection(InitialContext jndiContext) {
        try {
            ConnectionFactory connectionFactory = (ConnectionFactory) jndiContext.lookup(solaceConnectionFactory);
            return connectionFactory.createConnection();
        } catch (Exception e) {
            // Covers naming failures, JMS failures and a missing (null) JNDI context.
            logger.error("Failed to create solace connection", e);
            return null;
        }
    }
}
  • SolaceMessageSubscriber.java:
package com.javahonk.jms.solace;

import javax.annotation.PostConstruct;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.naming.InitialContext;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

import com.solacesystems.jms.SolJmsUtility;
import com.solacesystems.jms.message.SolBytesMessage;
import com.solacesystems.jms.message.SolTextMessage;

public class SolaceMessageSubscriber {
	
	private static final Logger logger = LogManager.getLogger(SolaceMessageSubscriber.class.getName());

	@Autowired
    private InitialContext initialContext_subscriber;
	
	@Value("${SOLACE_CONNECTION_FACTORY}")
	private String solaceConnectionFactoryLookup;
	
	@Value("${SOLACE_OUTPUT_QUEUE_NAME}")
	private String solaceOutputQueueName;
	
	private MessageConsumer consumer;
	
	@PostConstruct
	public void init() {
		
		logger.info("Solace outputQueue: {}", solaceOutputQueueName);
		
		try {
			
			QueueConnectionFactory cf = (QueueConnectionFactory)initialContext_subscriber.lookup(solaceConnectionFactoryLookup);
	    	QueueConnection connection = cf.createQueueConnection();
	        Session session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
	        Queue queue = (Queue)initialContext_subscriber.lookup(solaceOutputQueueName);
	        consumer = session.createConsumer(queue);	        
	        
	        connection.start();	        
	        onReceive();
	        
	        logger.info("Connection made successfully. Waiting for a message...");
			
		} catch (Exception e) {
			logger.error("Error creating connection to Solace: {}", e);
		} 
	}
	
	public void onReceive() {		
		
		try {
			
			while (true) {
	            
				Message testMessage = consumer.receive();
				
				logger.info("Message received");
	            
	            if (testMessage == null) {	            	
	            	logger.info("An error has occured. Solace Text message was null");	            	
	            }
	            
	            if (testMessage instanceof SolBytesMessage) {
	            	
	            	System.out.println("Message received:"+((SolBytesMessage)testMessage).getMessage());
	            	logger.info("Message size: {} kb",SolJmsUtility.getMessageSize(testMessage));
	            	testMessage.acknowledge();	                 
	            }
	            
	            if (testMessage instanceof SolTextMessage) {
	            	
	            	System.out.println("Message received:"+((SolTextMessage)testMessage).getText());
	            	logger.info("Message size: {} kb",SolJmsUtility.getMessageSize(testMessage));
	            	testMessage.acknowledge();
	            }
	        }
			
		} catch (Exception e) {
			logger.error("Error sending message to output queue name:{} --> details exceptions: --> ", solaceOutputQueueName, e);
		}
		
	}

}
  • Finally, use the JavahonkSolaceSubscriberTest.java test class to test the subscriber application:
package com.javahonk.solace;

import javax.jms.JMSException;
import javax.naming.NamingException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;


/**
 * Command-line launcher for the subscriber: loads the Spring context, which
 * creates the subscriber bean, and then keeps the process alive until it is
 * interrupted or the JVM is shut down.
 */
public class JavahonkSolaceSubscriberTest {

	private static final Logger logger = LogManager.getLogger(JavahonkSolaceSubscriberTest.class.getName());

	/**
	 * Starts the application context from {@code spring-context.xml} and blocks
	 * until the main thread is interrupted.
	 *
	 * @param args unused
	 * @throws InterruptedException if the main thread is interrupted while waiting
	 */
	public static void main(String[] args) throws NamingException, JMSException, InterruptedException {

		logger.info("Java Honk Solace Output Adapter Starting...");

		ApplicationContext context = new ClassPathXmlApplicationContext("spring-context.xml");
		AbstractApplicationContext closeableContext = (AbstractApplicationContext) context;

		// Close the context (and run bean destroy callbacks) when the JVM shuts down,
		// for example on Ctrl+C.
		closeableContext.registerShutdownHook();

		try {
			// Waiting on the current thread never completes by itself, so this parks
			// the main thread without consuming CPU until it is interrupted.
			Thread.currentThread().join();
		} finally {
			closeableContext.close();
		}

	}
}

Reference:

Download source code: JMSSolaceSubscriberAdapter

Leave a Reply

Your email address will not be published. Required fields are marked *