Solace Topic Queue Publisher Subscriber Java Example

Solace Topic Queue Publisher Subscriber Java Example

In recent posts you saw many tutorials on Solace. In this tutorial I will show you how to develop Solace adapter which can be use to publish message on queue/topic and subscriber to get message from queue/topic.

Tools needed:

  • Eclipse (Any updated version)
  • Java 1.8
  • Maven 3.2.5 (It comes with Eclipse)
  • Solace version 7.1.2.230

Maven project SolaceJMSPublisherSubscriber details structure below:

Solace Topic Queue Publisher Subscriber Java Example

  • Dependencies maven pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.javahonk.jms.solaceadapter</groupId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>SolaceJMSPubSubSpring</name>
	<url>http://maven.apache.org</url>

	<properties>
		<org.springframework.version>4.1.5.RELEASE</org.springframework.version>
		<log4j.version>1.2.16</log4j.version>
		<org.apache.log4j.version>2.1</org.apache.log4j.version>
		<com.tibco.version>5.1.2</com.tibco.version>
		<jms.version>1.1</jms.version>
	</properties>

	<dependencies>

		<!-- Application Context (depends on spring-core, spring-expression, spring-aop, 
			spring-beans) This is the central artifact for Spring's Dependency Injection 
			Container and is generally always defined -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
			<version>${org.springframework.version}</version>
		</dependency>

		<!-- Log4j -->
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-api</artifactId>
			<version>${org.apache.log4j.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>${org.apache.log4j.version}</version>
		</dependency>
		
		<!-- Solace jars -->
		
		<dependency>
			<groupId>commons-lang</groupId>
			<artifactId>commons-lang</artifactId>
			<version>2.6</version>
			<scope>system</scope>
			<systemPath>${basedir}/lib/commons-lang-2.6.jar</systemPath>
		</dependency>
		
		<dependency>
			<groupId>commons-logging</groupId>
			<artifactId>commons-logging</artifactId>
			<version>1.1.3</version>
			<scope>system</scope>
			<systemPath>${basedir}/lib/commons-logging-1.1.3.jar</systemPath>
		</dependency>
		
		<dependency>
			<groupId>geronimo-jms</groupId>
			<artifactId>geronimo-jms</artifactId>
			<version>1.1.1</version>
			<scope>system</scope>
			<systemPath>${basedir}/lib/geronimo-jms_1.1_spec-1.1.1.jar</systemPath>
		</dependency>
		
		<dependency>
			<groupId>sol-common</groupId>
			<artifactId>sol-common</artifactId>
			<version>7.1.2.230</version>
			<scope>system</scope>
			<systemPath>${basedir}/lib/sol-common-7.1.2.230.jar</systemPath>
		</dependency>
		
		<dependency>
			<groupId>sol-jcsmp</groupId>
			<artifactId>sol-jcsmp</artifactId>
			<version>7.1.2.230</version>
			<scope>system</scope>
			<systemPath>${basedir}/lib/sol-jcsmp-7.1.2.230.jar</systemPath>
		</dependency>
		<dependency>
			<groupId>sol-jms</groupId>
			<artifactId>sol-jms</artifactId>
			<version>7.1.2.230</version>
			<scope>system</scope>
			<systemPath>${basedir}/lib/sol-jms-7.1.2.230.jar</systemPath>
		</dependency>

	</dependencies>
	
	<build>				
 		
		<resources>
			<resource>
				<directory>src/main/config</directory>
				<excludes>
					<exclude>**/*.xml</exclude>
					<exclude>**/*.properties</exclude>
				</excludes>
			</resource>
		    <resource>
				<directory>src/main/resources</directory>
				<excludes>
					<exclude>**/*.xml</exclude>
					<exclude>**/*.properties</exclude>
				</excludes>
			</resource>
		</resources> 
		
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-dependency-plugin</artifactId>
				<version>2.8</version>
				<executions>
					<execution>
						<id>copy-dependencies</id>
						<phase>package</phase>
						<goals>
							<goal>copy-dependencies</goal>
						</goals>
						<configuration>
							<outputDirectory>${project.build.directory}/lib</outputDirectory>
							<overWriteReleases>false</overWriteReleases>
							<overWriteSnapshots>false</overWriteSnapshots>
							<overWriteIfNewer>true</overWriteIfNewer>
						</configuration>
					</execution>
				</executions>
			</plugin>
						
		</plugins> 
		
	</build>


	<artifactId>SolaceJMSPublisherSubscriber</artifactId>
</project>
  • log4j2.xml:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">

	<Properties>
		<Property name="envrionment.target">DEV</Property>
	</Properties>

	<Properties>
		<Property name="logging.dir">./</Property>
	</Properties>

	<Appenders>
		<Console name="Console" target="SYSTEM_OUT">
			<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" />
		</Console>

		<RollingFile name="RollingFile"
			fileName="./log/rolling-file.log"	filePattern="${sys:logging.dir}/logs/rolling-file-%d{yyyy-MM-dd}-%i.log">
			<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" />
			<!-- TODO:Change to time based policy -->
			<Policies>
				<TimeBasedTriggeringPolicy interval="1"	modulate="true" />
				<SizeBasedTriggeringPolicy size="100 MB" />
			</Policies>
			<DefaultRolloverStrategy max="4" />
		</RollingFile>
	</Appenders>

	<Loggers>
		<Root level="info">
			<AppenderRef ref="Console" />
			<!-- <AppenderRef ref="file" /> -->
			<AppenderRef ref="RollingFile" />
		</Root>
	</Loggers>
</Configuration>
  • javahonk-solace.properties: Here you will enter all solace related properties. Please don’t forget to replace this file value with your own Solace properties:
SOLACE_INITIAL_CONTEXT_FACTORY=com.solacesystems.jndi.SolJNDIInitialContextFactory
SOLACE_HOST=tcp://javahonk.com:55555
SOLACE_VPN=javahonk_VPN

#Solace Queue Message properties

SOLACE_CONNECTION_FACTORY=javahonk_VPN_CF
SOLACE_OUTPUT_QUEUE_NAME=javahonk_LISTENER

#Queue Publisher credential
SOLACE_USERNAME_QUEUE_PUB=javahonk_USER_DEV
SOLACE_PASSWORD_QUEUE_PUB=javahonk_Pass1_@DEV@

#Queue Subscriber credential
SOLACE_USERNAME_QUEUE_SUB=javahonk_USER_DEV
SOLACE_PASSWORD_QUEUE_SUB=javahonk_@DEV@

#Solace Topic Message properties

SOLACE_JMS_DIRECT_CONNECTION_FACTORY=javahonk_VPN_DIRECT_CF
SOLACE_OUTPUT_TOPIC_NAME=javahonk/TOPIC

#Topic Publisher credential
SOLACE_USERNAME_TOPIC_PUB=javahonk_USER_DEV
SOLACE_PASSWORD_TOPIC_PUB=javahonk_Pass1_@DEV@

#Topic Subscriber credential
SOLACE_USERNAME_TOPIC_SUB=javahonk_USER_DEV
SOLACE_PASSWORD_TOPIC_SUB=javahonk_Pass1_@DEV@
  • spring-context.xml: Main spring context file to load all configurations:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:jdbc="http://www.springframework.org/schema/jdbc"
	xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:util="http://www.springframework.org/schema/util"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
		http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd
		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
		http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.1.xsd">
	
	<context:annotation-config />
	<context:component-scan base-package="com.javahonk.solace, com.javahonk.solace.messaging" />
	
	<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
		<property name="locations">
			<list>
				<value>classpath:javahonk-solace.properties</value>							
			</list>
		</property>
		<property name="ignoreUnresolvablePlaceholders" value="true"/>
	</bean>	

	<import resource="javahonk-solace-context.xml"/>
		
</beans>
  • javahonk-solace-context.xml: Seperate properties file to load all Solace related beans:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:lang="http://www.springframework.org/schema/lang"
	xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:util="http://www.springframework.org/schema/util"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
		http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-4.1.xsd
		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
		http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.1.xsd">
	
	<bean id="JavaHonkQueuePublisher" class="com.javahonk.jms.solace.JavaHonkQueuePublisher"></bean>
	
	<bean id="JavaHonkQueueSubscriber" class="com.javahonk.jms.solace.JavaHonkQueueSubscriber"></bean>
	
	<bean id="JavaHonkTopicPublisher" class="com.javahonk.jms.solace.JavaHonkTopicPublisher"></bean>
	
	<bean id="JavaHonkTopicSubscriber" class="com.javahonk.jms.solace.JavaHonkTopicSubscriber"></bean>
		 
</beans>
  • JavaHonkQueuePublisher.java: Solace publisher class to publish the message on Solace queue:
package com.javahonk.jms.solace;

import java.util.Hashtable;

import javax.annotation.PostConstruct;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;

import com.solacesystems.jms.SupportedProperty;

public class JavaHonkQueuePublisher {

	@Value("${SOLACE_INITIAL_CONTEXT_FACTORY}")
	private String SOLACE_INITIAL_CONTEXT_FACTORY;
	
	@Value("${SOLACE_HOST}")
	private String SOLACE_HOST;
	
	@Value("${SOLACE_VPN}")
	private String SOLACE_VPN;
	
	@Value("${SOLACE_CONNECTION_FACTORY}")
	private String SOLACE_CONNECTION_FACTORY;
	
	@Value("${SOLACE_OUTPUT_QUEUE_NAME}")
	private String SOLACE_OUTPUT_QUEUE_NAME;
	
	@Value("${SOLACE_USERNAME_QUEUE_PUB}")
	private String SOLACE_USERNAME_QUEUE_PUB;
	
	@Value("${SOLACE_PASSWORD_QUEUE_PUB}")
	private String SOLACE_PASSWORD_QUEUE_PUB;
	
	private Session session;
	
	private static MessageProducer producer;
	
	private static final Logger logger = LogManager.getLogger(JavaHonkQueuePublisher.class);
	
	@PostConstruct
	public void init() throws JMSException, NamingException{
		
		System.out.println("JavaHonk Solace publisher initializing...");
    	
    	Hashtable<String, Object> env = new Hashtable<String, Object>();
        env.put(InitialContext.INITIAL_CONTEXT_FACTORY, SOLACE_INITIAL_CONTEXT_FACTORY);
        env.put(InitialContext.PROVIDER_URL, SOLACE_HOST);
        env.put(SupportedProperty.SOLACE_JMS_VPN, SOLACE_VPN);
        env.put(Context.SECURITY_PRINCIPAL, SOLACE_USERNAME_QUEUE_PUB);
        env.put(Context.SECURITY_CREDENTIALS, SOLACE_PASSWORD_QUEUE_PUB);

        InitialContext initialContext = new InitialContext(env);
    	QueueConnectionFactory cf = (QueueConnectionFactory)initialContext.lookup(SOLACE_CONNECTION_FACTORY);
    	QueueConnection connection = cf.createQueueConnection();

        session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

        Destination destination = (Destination)initialContext.lookup(SOLACE_OUTPUT_QUEUE_NAME);

        producer = session.createProducer(destination);

        connection.start();  
        
        logger.info("Solace connection made successfully");
	}
    
	
	public void sendTextMessage(String message) {
		
		try {
			
			logger.info("Sending message : {}", message);
			
			TextMessage testMessage = session.createTextMessage(message);
			producer.send(testMessage);
			
			logger.info("Message sent successfully : {}", message);
			
		} catch (Exception e) {
			logger.error("Error sending message to output queue name:{} --> details exceptions: --> ", SOLACE_OUTPUT_QUEUE_NAME, e);
		}
	}
	
	public void sendBytesMessage(String message) {
		
		try {
			
			logger.info("Sending Bytesmessage : {}", message);
			
			BytesMessage testMessage = session.createBytesMessage();
			
			testMessage.writeBytes(message.getBytes());
			testMessage.setJMSType("Java Honk type");
			
			producer.send(testMessage);
			
			logger.info("Bytesmessage sent successfully : {}", message);
			
		} catch (Exception e) {
			logger.error("Error sending message to output queue name:{} --> details exceptions: --> ", SOLACE_OUTPUT_QUEUE_NAME, e);
		}
	}

}
  • JavaHonkQueueSubscriber.java: Solace queue subscriber class to get and process messages from the queue:
package com.javahonk.jms.solace;

import java.util.Hashtable;

import javax.annotation.PostConstruct;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;

import com.solacesystems.jms.SolJmsUtility;
import com.solacesystems.jms.SupportedProperty;
import com.solacesystems.jms.message.SolBytesMessage;
import com.solacesystems.jms.message.SolTextMessage;

public class JavaHonkQueueSubscriber implements MessageListener {
	
	@Value("${SOLACE_INITIAL_CONTEXT_FACTORY}")
	private String SOLACE_INITIAL_CONTEXT_FACTORY;
	
	@Value("${SOLACE_HOST}")
	private String SOLACE_HOST;
	
	@Value("${SOLACE_VPN}")
	private String SOLACE_VPN;
	
	@Value("${SOLACE_CONNECTION_FACTORY}")
	private String SOLACE_CONNECTION_FACTORY;
	
	@Value("${SOLACE_OUTPUT_QUEUE_NAME}")
	private String SOLACE_OUTPUT_QUEUE_NAME;
	
	@Value("${SOLACE_USERNAME_QUEUE_SUB}")
	private String SOLACE_USERNAME_QUEUE_SUB;
	
	@Value("${SOLACE_PASSWORD_QUEUE_SUB}")
	private String SOLACE_PASSWORD_QUEUE_SUB;
	
	private static final Logger logger = LogManager.getLogger(JavaHonkQueueSubscriber.class);
	
	@PostConstruct
	public void init() throws JMSException, NamingException{
		
		System.out.println("JavaHonk Solace Subscriber initializing...");
    	
    	Hashtable<String, Object> env = new Hashtable<String, Object>();
        env.put(InitialContext.INITIAL_CONTEXT_FACTORY, SOLACE_INITIAL_CONTEXT_FACTORY);
        env.put(InitialContext.PROVIDER_URL, SOLACE_HOST);
        env.put(SupportedProperty.SOLACE_JMS_VPN, SOLACE_VPN);
        env.put(Context.SECURITY_PRINCIPAL, SOLACE_USERNAME_QUEUE_SUB);
        env.put(Context.SECURITY_CREDENTIALS, SOLACE_PASSWORD_QUEUE_SUB);

        InitialContext initialContext = new InitialContext(env);
    	QueueConnectionFactory cf = (QueueConnectionFactory)initialContext.lookup(SOLACE_CONNECTION_FACTORY);
    	QueueConnection connection = cf.createQueueConnection();

        Session session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

        Queue queue = (Queue)initialContext.lookup(SOLACE_OUTPUT_QUEUE_NAME);

        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(this);
        
        connection.start();  
        
        logger.info("Solace connection made successfully");
	}
    
   public void onMessage(Message testMessage) {

		try {
			
			if (testMessage instanceof SolBytesMessage) {

				BytesMessage bytesXMLMessage = ((BytesMessage) testMessage);
				byte[] b = new byte[(int) bytesXMLMessage.getBodyLength()];
				bytesXMLMessage.readBytes(b);
				
				//Print Message received as String
				logger.info("Message received:"+new String(b));
				
				//Get JMS type of message
				logger.info(bytesXMLMessage.getJMSType());
				
				//Dump all message info
				logger.info("Message: "+SolJmsUtility.dumpMessage(testMessage));	
				testMessage.acknowledge();

			}
			
			if (testMessage instanceof SolTextMessage) {
				System.out.println(((SolTextMessage) testMessage).getJMSType());
				System.out.println("Message received:"+ ((SolTextMessage) testMessage).getText());
				System.out.println("Message size: {} kb"+ SolJmsUtility.getMessageSize(testMessage));
				testMessage.acknowledge();
			}
			
		} catch (JMSException e) {
			e.printStackTrace();
		}

	}
}
  • JavaHonkTopicPublisher.java: Publisher which publish message on Solace Topic:
package com.javahonk.jms.solace;

import java.util.Hashtable;

import javax.annotation.PostConstruct;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;

import com.solacesystems.jms.SupportedProperty;

public class JavaHonkTopicPublisher {

	@Value("${SOLACE_INITIAL_CONTEXT_FACTORY}")
	private String SOLACE_INITIAL_CONTEXT_FACTORY;
	
	@Value("${SOLACE_HOST}")
	private String SOLACE_HOST;
	
	@Value("${SOLACE_VPN}")
	private String SOLACE_VPN;
	
	@Value("${SOLACE_JMS_DIRECT_CONNECTION_FACTORY}")
	private String SOLACE_JMS_DIRECT_CONNECTION_FACTORY;
	
	@Value("${SOLACE_OUTPUT_TOPIC_NAME}")
	private String SOLACE_OUTPUT_TOPIC_NAME;
	
	@Value("${SOLACE_USERNAME_TOPIC_PUB}")
	private String SOLACE_USERNAME_TOPIC_PUB;
	
	@Value("${SOLACE_PASSWORD_TOPIC_PUB}")
	private String SOLACE_PASSWORD_TOPIC_PUB;
	
	private Session session;
	
	private static MessageProducer producer;
	
	private static final Logger logger = LogManager.getLogger(JavaHonkTopicPublisher.class);
	
	@PostConstruct
	public void init() throws JMSException, NamingException{
		
		System.out.println("JavaHonk Solace publisher initializing...");
    	
    	Hashtable<String, Object> env = new Hashtable<String, Object>();
        env.put(InitialContext.INITIAL_CONTEXT_FACTORY, SOLACE_INITIAL_CONTEXT_FACTORY);
        env.put(InitialContext.PROVIDER_URL, SOLACE_HOST);
        env.put(SupportedProperty.SOLACE_JMS_VPN, SOLACE_VPN);
        env.put(Context.SECURITY_PRINCIPAL, SOLACE_USERNAME_TOPIC_PUB);
        env.put(Context.SECURITY_CREDENTIALS, SOLACE_PASSWORD_TOPIC_PUB);

        InitialContext initialContext = new InitialContext(env);
    	TopicConnectionFactory cf = (TopicConnectionFactory)initialContext.lookup(SOLACE_JMS_DIRECT_CONNECTION_FACTORY);
    	TopicConnection connection = cf.createTopicConnection();

        session = connection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);

        Destination destination = (Destination)initialContext.lookup(SOLACE_OUTPUT_TOPIC_NAME);

        producer = session.createProducer(destination);

        connection.start();  
        
        logger.info("Solace connection made successfully");
	}
    
	
	public void sendTextMessage(String message) {
		
		try {
			
			logger.info("Sending message : {}", message);
			
			TextMessage testMessage = session.createTextMessage(message);
			producer.send(testMessage);
			
			logger.info("Message sent successfully : {}", message);
			
		} catch (Exception e) {
			logger.error("Error sending message to output queue name:{} --> details exceptions: --> ", SOLACE_OUTPUT_TOPIC_NAME, e);
		}
	}
	
	public void sendBytesMessage(String message) {
		
		try {
			
			logger.info("Sending Bytesmessage : {}", message);
			
			BytesMessage testMessage = session.createBytesMessage();
			
			testMessage.writeBytes(message.getBytes());
			testMessage.setJMSType("Java Honk type");
			
			producer.send(testMessage);
			
			logger.info("Bytesmessage sent successfully : {}", message);
			
		} catch (Exception e) {
			logger.error("Error sending message to output queue name:{} --> details exceptions: --> ", SOLACE_OUTPUT_TOPIC_NAME, e);
		}
	}

}
  • JavaHonkTopicSubscriber.java: Subscriber class which subscriber and process messages from the Solace Topic:
package com.javahonk.jms.solace;

import java.util.Hashtable;

import javax.annotation.PostConstruct;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;

import com.solacesystems.jms.SolJmsUtility;
import com.solacesystems.jms.SupportedProperty;
import com.solacesystems.jms.message.SolBytesMessage;
import com.solacesystems.jms.message.SolTextMessage;

public class JavaHonkTopicSubscriber implements MessageListener {
	
	@Value("${SOLACE_INITIAL_CONTEXT_FACTORY}")
	private String SOLACE_INITIAL_CONTEXT_FACTORY;
	
	@Value("${SOLACE_HOST}")
	private String SOLACE_HOST;
	
	@Value("${SOLACE_VPN}")
	private String SOLACE_VPN;
	
	@Value("${SOLACE_JMS_DIRECT_CONNECTION_FACTORY}")
	private String SOLACE_JMS_DIRECT_CONNECTION_FACTORY;
	
	@Value("${SOLACE_OUTPUT_TOPIC_NAME}")
	private String SOLACE_OUTPUT_TOPIC_NAME;
	
	@Value("${SOLACE_USERNAME_TOPIC_SUB}")
	private String SOLACE_USERNAME_TOPIC_SUB;
	
	@Value("${SOLACE_PASSWORD_TOPIC_SUB}")
	private String SOLACE_PASSWORD_TOPIC_SUB;
	
	private static final Logger logger = LogManager.getLogger(JavaHonkTopicSubscriber.class);
	
	@PostConstruct
	public void init() throws JMSException, NamingException{
		
		System.out.println("JavaHonk Solace Subscriber initializing...");
    	
    	Hashtable<String, Object> env = new Hashtable<String, Object>();
        env.put(InitialContext.INITIAL_CONTEXT_FACTORY, SOLACE_INITIAL_CONTEXT_FACTORY);
        env.put(InitialContext.PROVIDER_URL, SOLACE_HOST);
        env.put(SupportedProperty.SOLACE_JMS_VPN, SOLACE_VPN);
        env.put(Context.SECURITY_PRINCIPAL, SOLACE_USERNAME_TOPIC_SUB);
        env.put(Context.SECURITY_CREDENTIALS, SOLACE_PASSWORD_TOPIC_SUB);

        InitialContext initialContext = new InitialContext(env);
        TopicConnectionFactory cf = (TopicConnectionFactory)initialContext.lookup(SOLACE_JMS_DIRECT_CONNECTION_FACTORY);
        TopicConnection connection = cf.createTopicConnection();

        Session session = connection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);

        Topic queue = (Topic)initialContext.lookup(SOLACE_OUTPUT_TOPIC_NAME);

        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(this);
        
        connection.start();  
        
        logger.info("Solace connection made successfully");
	}
    
   public void onMessage(Message testMessage) {

		try {
			if (testMessage instanceof SolBytesMessage) {

				BytesMessage bytesMessage = ((BytesMessage) testMessage);
				byte[] b = new byte[(int) bytesMessage.getBodyLength()];
				bytesMessage.readBytes(b);
				
				//Print Message received as String
				logger.info("Message received:"+new String(b));
				
				//Get JMS type of message
				logger.info(bytesMessage.getJMSType());
				
				//Dump all message info
				logger.info("Message: "+SolJmsUtility.dumpMessage(testMessage));	
				testMessage.acknowledge();

			}
			
			if (testMessage instanceof SolTextMessage) {
				System.out.println(((SolTextMessage) testMessage).getJMSType());
				System.out.println("Message received:"+ ((SolTextMessage) testMessage).getText());
				System.out.println("Message size: {} kb"+ SolJmsUtility.getMessageSize(testMessage));
				testMessage.acknowledge();
			}
			
		} catch (JMSException e) {
			e.printStackTrace();
		}

	}
}
  • QueuePublisherMainApp.java: Test class to publish and subscriber message from the queue:
package com.javahonk.solace;

import javax.jms.JMSException;
import javax.naming.NamingException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.javahonk.jms.solace.JavaHonkQueuePublisher;

public class QueuePublisherMainApp {
	
	private static final Logger logger = LogManager.getLogger(QueuePublisherMainApp.class.getName());
	
	public static void main(String[] args) throws NamingException, JMSException, InterruptedException {
		
		logger.info("Java Honk Solace queue adapter Starting...");
		
		setUncaughtExceptionHandler();
		
		ApplicationContext context = new ClassPathXmlApplicationContext("spring-context.xml");	
		
		JavaHonkQueuePublisher javaHonkQueuePublisher = context.getBean(JavaHonkQueuePublisher.class);
		
		for (int i = 0; i < 15; i++) {
			Thread.sleep(1000);
			javaHonkQueuePublisher.sendBytesMessage("JavaHonk Solace BytesMessage test");
			javaHonkQueuePublisher.sendTextMessage("JavaHonk Solace Text Message test");
		}
		
		keepThreadAlive();	
		
		registerShutdownHook(context);
		
	}

	private static void setUncaughtExceptionHandler() {
		Thread.currentThread().setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 
            public void uncaughtException(Thread t, Throwable e) { 
                  logger.error("Thread " + t +  " threw uncaughtexception ", e); 
            }
        });  
	}
	
	private static void keepThreadAlive() {
		
		Thread keepAliveThread = new Thread(new Runnable() {
 
			@Override
			public void run() {
				while (!Thread.interrupted()) {
				}
			}
		});
    	
		keepAliveThread.start();
	}
	
	
	private static void registerShutdownHook(ApplicationContext context) {
		Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

			@Override
			public void run() {
				logger.info("Java Honk Solace queue adapter Exiting.");
				((AbstractApplicationContext) context).close();

			}
		}));
	}
}
  • TopicPublisherMainApp.java: Main test class which publish and subscriber message from Solace Topic:
package com.javahonk.solace;

import javax.jms.JMSException;
import javax.naming.NamingException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.javahonk.jms.solace.JavaHonkTopicPublisher;

public class TopicPublisherMainApp {
	
	private static final Logger logger = LogManager.getLogger(TopicPublisherMainApp.class.getName());
	
	public static void main(String[] args) throws NamingException, JMSException, InterruptedException {
		
		logger.info("Java Honk Solace tpic adapter Starting...");
		
		setUncaughtExceptionHandler();
		
		ApplicationContext context = new ClassPathXmlApplicationContext("spring-context.xml");	
		
		JavaHonkTopicPublisher javaHonkTopicPublisher = context.getBean(JavaHonkTopicPublisher.class);
		
		for (int i = 0; i < 15; i++) {
			Thread.sleep(1000);
			javaHonkTopicPublisher.sendBytesMessage("JavaHonk Solace BytesMessage test");
			javaHonkTopicPublisher.sendTextMessage("JavaHonk Solace Text Message test");		
		}
		
		keepThreadAlive();	
		
		registerShutdownHook(context);
		
	}

	private static void setUncaughtExceptionHandler() {
		Thread.currentThread().setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 
            public void uncaughtException(Thread t, Throwable e) { 
                  logger.error("Thread " + t +  " threw uncaughtexception ", e); 
            }
        });  
	}
	
	private static void keepThreadAlive() {
		
		Thread keepAliveThread = new Thread(new Runnable() {
 
			@Override
			public void run() {
				while (!Thread.interrupted()) {
				}
			}
		});
    	
		keepAliveThread.start();
	}
	
	
	private static void registerShutdownHook(ApplicationContext context) {
		Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

			@Override
			public void run() {
				logger.info("Java Honk Solace topic adapter Exiting.");
				((AbstractApplicationContext) context).close();

			}
		}));
	}
}
  • To run the application to consume and publish message on queue, right click QueuePublisherMainApp.java –> Run As –> Java Application and if everything goes well you will see message sent & receive details on the console:

Solace Topic Queue Publisher Subscriber Java Example

  • Same you could to if you want to publish and subscriber message form the topic, right click TopicPublisherMainApp.java –> Run As –> Java Application
  • For more information please read Solace official documentation here

Download project: SolaceJMSPublisherSubscriber

Leave a Reply

Your email address will not be published. Required fields are marked *