Solace Topic Queue Publisher Subscriber Java Example
In recent posts you saw many tutorials on Solace. In this tutorial I will show you how to develop a Solace adapter that can be used to publish messages to a queue or topic, together with a subscriber that receives messages from that queue or topic.
Tools needed:
- Eclipse (Any updated version)
- Java 1.8
- Maven 3.2.5 (It comes with Eclipse)
- Solace version 7.1.2.230
The files of the Maven project SolaceJMSPublisherSubscriber are detailed below:
- Maven dependencies in pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.javahonk.jms.solaceadapter</groupId>
    <artifactId>SolaceJMSPublisherSubscriber</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>SolaceJMSPubSubSpring</name>
    <url>http://maven.apache.org</url>

    <properties>
        <org.springframework.version>4.1.5.RELEASE</org.springframework.version>
        <log4j.version>1.2.16</log4j.version>
        <org.apache.log4j.version>2.1</org.apache.log4j.version>
        <com.tibco.version>5.1.2</com.tibco.version>
        <jms.version>1.1</jms.version>
    </properties>

    <dependencies>
        <!-- Application Context (depends on spring-core, spring-expression, spring-aop, spring-beans)
             This is the central artifact for Spring's Dependency Injection Container and is generally always defined -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${org.springframework.version}</version>
        </dependency>

        <!-- Log4j -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${org.apache.log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${org.apache.log4j.version}</version>
        </dependency>

        <!-- Solace jars -->
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
            <scope>system</scope>
            <systemPath>${basedir}/lib/commons-lang-2.6.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.1.3</version>
            <scope>system</scope>
            <systemPath>${basedir}/lib/commons-logging-1.1.3.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>geronimo-jms</groupId>
            <artifactId>geronimo-jms</artifactId>
            <version>1.1.1</version>
            <scope>system</scope>
            <systemPath>${basedir}/lib/geronimo-jms_1.1_spec-1.1.1.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>sol-common</groupId>
            <artifactId>sol-common</artifactId>
            <version>7.1.2.230</version>
            <scope>system</scope>
            <systemPath>${basedir}/lib/sol-common-7.1.2.230.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>sol-jcsmp</groupId>
            <artifactId>sol-jcsmp</artifactId>
            <version>7.1.2.230</version>
            <scope>system</scope>
            <systemPath>${basedir}/lib/sol-jcsmp-7.1.2.230.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>sol-jms</groupId>
            <artifactId>sol-jms</artifactId>
            <version>7.1.2.230</version>
            <scope>system</scope>
            <systemPath>${basedir}/lib/sol-jms-7.1.2.230.jar</systemPath>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/config</directory>
                <excludes>
                    <exclude>**/*.xml</exclude>
                    <exclude>**/*.properties</exclude>
                </excludes>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <excludes>
                    <exclude>**/*.xml</exclude>
                    <exclude>**/*.properties</exclude>
                </excludes>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.8</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            <overWriteReleases>false</overWriteReleases>
                            <overWriteSnapshots>false</overWriteSnapshots>
                            <overWriteIfNewer>true</overWriteIfNewer>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
- log4j2.xml:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
    <Properties>
        <Property name="envrionment.target">DEV</Property>
    </Properties>
    <Properties>
        <Property name="logging.dir">./</Property>
    </Properties>
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" />
        </Console>
        <RollingFile name="RollingFile" fileName="./log/rolling-file.log"
            filePattern="${sys:logging.dir}/logs/rolling-file-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" />
            <!-- TODO:Change to time based policy -->
            <Policies>
                <TimeBasedTriggeringPolicy interval="1" modulate="true" />
                <SizeBasedTriggeringPolicy size="100 MB" />
            </Policies>
            <DefaultRolloverStrategy max="4" />
        </RollingFile>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="Console" />
            <!-- <AppenderRef ref="file" /> -->
            <AppenderRef ref="RollingFile" />
        </Root>
    </Loggers>
</Configuration>
- javahonk-solace.properties: This file holds all Solace-related properties. Please don’t forget to replace the values in this file with your own Solace settings:
SOLACE_INITIAL_CONTEXT_FACTORY=com.solacesystems.jndi.SolJNDIInitialContextFactory
SOLACE_HOST=tcp://javahonk.com:55555
SOLACE_VPN=javahonk_VPN

#Solace Queue Message properties
SOLACE_CONNECTION_FACTORY=javahonk_VPN_CF
SOLACE_OUTPUT_QUEUE_NAME=javahonk_LISTENER

#Queue Publisher credential
SOLACE_USERNAME_QUEUE_PUB=javahonk_USER_DEV
SOLACE_PASSWORD_QUEUE_PUB=javahonk_Pass1_@DEV@

#Queue Subscriber credential
SOLACE_USERNAME_QUEUE_SUB=javahonk_USER_DEV
SOLACE_PASSWORD_QUEUE_SUB=javahonk_@DEV@

#Solace Topic Message properties
SOLACE_JMS_DIRECT_CONNECTION_FACTORY=javahonk_VPN_DIRECT_CF
SOLACE_OUTPUT_TOPIC_NAME=javahonk/TOPIC

#Topic Publisher credential
SOLACE_USERNAME_TOPIC_PUB=javahonk_USER_DEV
SOLACE_PASSWORD_TOPIC_PUB=javahonk_Pass1_@DEV@

#Topic Subscriber credential
SOLACE_USERNAME_TOPIC_SUB=javahonk_USER_DEV
SOLACE_PASSWORD_TOPIC_SUB=javahonk_Pass1_@DEV@
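Because every class in this project reads its settings from this file, it can help to check that the required keys are present before starting the application. The following is only a minimal sketch using the standard java.util.Properties class. The class name SolacePropertiesCheck and its methods are hypothetical and are not part of the downloadable project, and the list of required keys is an assumption based on the queue publisher settings shown in this tutorial.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; it is not part of the downloadable project.
final class SolacePropertiesCheck {

    // Keys used by the queue publisher class; extend the list for the other classes.
    static final List<String> REQUIRED_KEYS = Arrays.asList(
            "SOLACE_INITIAL_CONTEXT_FACTORY",
            "SOLACE_HOST",
            "SOLACE_VPN",
            "SOLACE_CONNECTION_FACTORY",
            "SOLACE_OUTPUT_QUEUE_NAME",
            "SOLACE_USERNAME_QUEUE_PUB",
            "SOLACE_PASSWORD_QUEUE_PUB");

    private SolacePropertiesCheck() {
    }

    // Parses properties-file text; lines starting with '#' are treated as comments.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns every required key that is absent or has a blank value.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<String>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Deliberately incomplete sample so that some keys are reported as missing.
        String sample = "SOLACE_HOST=tcp://javahonk.com:55555\n"
                + "#Solace Queue Message properties\n"
                + "SOLACE_VPN=javahonk_VPN\n";
        System.out.println("Missing keys: " + missingKeys(parse(sample)));
    }
}
```

In a real run the text would come from javahonk-solace.properties on the classpath rather than from an inline string; the inline sample only keeps the sketch self-contained.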
- spring-context.xml: Main Spring context file that loads all configuration:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.1.xsd">

    <context:annotation-config />
    <context:component-scan base-package="com.javahonk.solace, com.javahonk.solace.messaging" />

    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath:javahonk-solace.properties</value>
            </list>
        </property>
        <property name="ignoreUnresolvablePlaceholders" value="true"/>
    </bean>

    <import resource="javahonk-solace-context.xml"/>
</beans>
- javahonk-solace-context.xml: Separate Spring context file that declares all Solace-related beans:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:lang="http://www.springframework.org/schema/lang"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
        http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-4.1.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.1.xsd">

    <bean id="JavaHonkQueuePublisher" class="com.javahonk.jms.solace.JavaHonkQueuePublisher"></bean>
    <bean id="JavaHonkQueueSubscriber" class="com.javahonk.jms.solace.JavaHonkQueueSubscriber"></bean>
    <bean id="JavaHonkTopicPublisher" class="com.javahonk.jms.solace.JavaHonkTopicPublisher"></bean>
    <bean id="JavaHonkTopicSubscriber" class="com.javahonk.jms.solace.JavaHonkTopicSubscriber"></bean>
</beans>
- JavaHonkQueuePublisher.java: Solace publisher class that publishes messages to a Solace queue:
package com.javahonk.jms.solace;

import java.util.Hashtable;

import javax.annotation.PostConstruct;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;

import com.solacesystems.jms.SupportedProperty;

public class JavaHonkQueuePublisher {

    @Value("${SOLACE_INITIAL_CONTEXT_FACTORY}")
    private String SOLACE_INITIAL_CONTEXT_FACTORY;

    @Value("${SOLACE_HOST}")
    private String SOLACE_HOST;

    @Value("${SOLACE_VPN}")
    private String SOLACE_VPN;

    @Value("${SOLACE_CONNECTION_FACTORY}")
    private String SOLACE_CONNECTION_FACTORY;

    @Value("${SOLACE_OUTPUT_QUEUE_NAME}")
    private String SOLACE_OUTPUT_QUEUE_NAME;

    @Value("${SOLACE_USERNAME_QUEUE_PUB}")
    private String SOLACE_USERNAME_QUEUE_PUB;

    @Value("${SOLACE_PASSWORD_QUEUE_PUB}")
    private String SOLACE_PASSWORD_QUEUE_PUB;

    private Session session;
    private static MessageProducer producer;

    private static final Logger logger = LogManager.getLogger(JavaHonkQueuePublisher.class);

    @PostConstruct
    public void init() throws JMSException, NamingException {
        System.out.println("JavaHonk Solace publisher initializing...");

        Hashtable<String, Object> env = new Hashtable<String, Object>();
        env.put(InitialContext.INITIAL_CONTEXT_FACTORY, SOLACE_INITIAL_CONTEXT_FACTORY);
        env.put(InitialContext.PROVIDER_URL, SOLACE_HOST);
        env.put(SupportedProperty.SOLACE_JMS_VPN, SOLACE_VPN);
        env.put(Context.SECURITY_PRINCIPAL, SOLACE_USERNAME_QUEUE_PUB);
        env.put(Context.SECURITY_CREDENTIALS, SOLACE_PASSWORD_QUEUE_PUB);

        InitialContext initialContext = new InitialContext(env);
        QueueConnectionFactory cf = (QueueConnectionFactory) initialContext.lookup(SOLACE_CONNECTION_FACTORY);
        QueueConnection connection = cf.createQueueConnection();
        session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
        Destination destination = (Destination) initialContext.lookup(SOLACE_OUTPUT_QUEUE_NAME);
        producer = session.createProducer(destination);
        connection.start();

        logger.info("Solace connection made successfully");
    }

    public void sendTextMessage(String message) {
        try {
            logger.info("Sending message : {}", message);
            TextMessage testMessage = session.createTextMessage(message);
            producer.send(testMessage);
            logger.info("Message sent successfully : {}", message);
        } catch (Exception e) {
            logger.error("Error sending message to output queue name:{} --> details exceptions: --> ", SOLACE_OUTPUT_QUEUE_NAME, e);
        }
    }

    public void sendBytesMessage(String message) {
        try {
            logger.info("Sending Bytesmessage : {}", message);
            BytesMessage testMessage = session.createBytesMessage();
            testMessage.writeBytes(message.getBytes());
            testMessage.setJMSType("Java Honk type");
            producer.send(testMessage);
            logger.info("Bytesmessage sent successfully : {}", message);
        } catch (Exception e) {
            logger.error("Error sending message to output queue name:{} --> details exceptions: --> ", SOLACE_OUTPUT_QUEUE_NAME, e);
        }
    }
}
- JavaHonkQueueSubscriber.java: Solace queue subscriber class to get and process messages from the queue:
package com.javahonk.jms.solace;

import java.util.Hashtable;

import javax.annotation.PostConstruct;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;

import com.solacesystems.jms.SolJmsUtility;
import com.solacesystems.jms.SupportedProperty;
import com.solacesystems.jms.message.SolBytesMessage;
import com.solacesystems.jms.message.SolTextMessage;

public class JavaHonkQueueSubscriber implements MessageListener {

    @Value("${SOLACE_INITIAL_CONTEXT_FACTORY}")
    private String SOLACE_INITIAL_CONTEXT_FACTORY;

    @Value("${SOLACE_HOST}")
    private String SOLACE_HOST;

    @Value("${SOLACE_VPN}")
    private String SOLACE_VPN;

    @Value("${SOLACE_CONNECTION_FACTORY}")
    private String SOLACE_CONNECTION_FACTORY;

    @Value("${SOLACE_OUTPUT_QUEUE_NAME}")
    private String SOLACE_OUTPUT_QUEUE_NAME;

    @Value("${SOLACE_USERNAME_QUEUE_SUB}")
    private String SOLACE_USERNAME_QUEUE_SUB;

    @Value("${SOLACE_PASSWORD_QUEUE_SUB}")
    private String SOLACE_PASSWORD_QUEUE_SUB;

    private static final Logger logger = LogManager.getLogger(JavaHonkQueueSubscriber.class);

    @PostConstruct
    public void init() throws JMSException, NamingException {
        System.out.println("JavaHonk Solace Subscriber initializing...");

        Hashtable<String, Object> env = new Hashtable<String, Object>();
        env.put(InitialContext.INITIAL_CONTEXT_FACTORY, SOLACE_INITIAL_CONTEXT_FACTORY);
        env.put(InitialContext.PROVIDER_URL, SOLACE_HOST);
        env.put(SupportedProperty.SOLACE_JMS_VPN, SOLACE_VPN);
        env.put(Context.SECURITY_PRINCIPAL, SOLACE_USERNAME_QUEUE_SUB);
        env.put(Context.SECURITY_CREDENTIALS, SOLACE_PASSWORD_QUEUE_SUB);

        InitialContext initialContext = new InitialContext(env);
        QueueConnectionFactory cf = (QueueConnectionFactory) initialContext.lookup(SOLACE_CONNECTION_FACTORY);
        QueueConnection connection = cf.createQueueConnection();
        Session session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
        Queue queue = (Queue) initialContext.lookup(SOLACE_OUTPUT_QUEUE_NAME);
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(this);
        connection.start();

        logger.info("Solace connection made successfully");
    }

    public void onMessage(Message testMessage) {
        try {
            if (testMessage instanceof SolBytesMessage) {
                BytesMessage bytesXMLMessage = ((BytesMessage) testMessage);
                byte[] b = new byte[(int) bytesXMLMessage.getBodyLength()];
                bytesXMLMessage.readBytes(b);
                // Print message received as String
                logger.info("Message received:" + new String(b));
                // Get JMS type of message
                logger.info(bytesXMLMessage.getJMSType());
                // Dump all message info
                logger.info("Message: " + SolJmsUtility.dumpMessage(testMessage));
                testMessage.acknowledge();
            }
            if (testMessage instanceof SolTextMessage) {
                System.out.println(((SolTextMessage) testMessage).getJMSType());
                System.out.println("Message received:" + ((SolTextMessage) testMessage).getText());
                // String concatenation does not expand "{}" placeholders, so print the value directly
                System.out.println("Message size: " + SolJmsUtility.getMessageSize(testMessage));
                testMessage.acknowledge();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
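The queue publisher turns text into a payload with message.getBytes(), and the subscriber rebuilds it with new String(b). Both calls use the JVM default charset, so the text may not survive intact if publisher and subscriber run with different defaults. The sketch below assumes both sides agree on UTF-8; PayloadCodec is a hypothetical helper and is not part of the downloadable project.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; it is not part of the downloadable project.
final class PayloadCodec {

    private PayloadCodec() {
    }

    // Publisher side: could replace message.getBytes() before writeBytes(...).
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Subscriber side: could replace new String(b) after readBytes(b).
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "JavaHonk Solace BytesMessage test \u00e9\u20ac";
        byte[] payload = encode(original);
        // The round trip is lossless because both sides use the same charset.
        System.out.println(decode(payload).equals(original)); // prints true
    }
}
```

Naming the charset explicitly on both sides keeps the byte payload independent of the platform each JVM runs on.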
- JavaHonkTopicPublisher.java: Publisher class that publishes messages to a Solace topic:
package com.javahonk.jms.solace;

import java.util.Hashtable;

import javax.annotation.PostConstruct;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;

import com.solacesystems.jms.SupportedProperty;

public class JavaHonkTopicPublisher {

    @Value("${SOLACE_INITIAL_CONTEXT_FACTORY}")
    private String SOLACE_INITIAL_CONTEXT_FACTORY;

    @Value("${SOLACE_HOST}")
    private String SOLACE_HOST;

    @Value("${SOLACE_VPN}")
    private String SOLACE_VPN;

    @Value("${SOLACE_JMS_DIRECT_CONNECTION_FACTORY}")
    private String SOLACE_JMS_DIRECT_CONNECTION_FACTORY;

    @Value("${SOLACE_OUTPUT_TOPIC_NAME}")
    private String SOLACE_OUTPUT_TOPIC_NAME;

    @Value("${SOLACE_USERNAME_TOPIC_PUB}")
    private String SOLACE_USERNAME_TOPIC_PUB;

    @Value("${SOLACE_PASSWORD_TOPIC_PUB}")
    private String SOLACE_PASSWORD_TOPIC_PUB;

    private Session session;
    private static MessageProducer producer;

    private static final Logger logger = LogManager.getLogger(JavaHonkTopicPublisher.class);

    @PostConstruct
    public void init() throws JMSException, NamingException {
        System.out.println("JavaHonk Solace publisher initializing...");

        Hashtable<String, Object> env = new Hashtable<String, Object>();
        env.put(InitialContext.INITIAL_CONTEXT_FACTORY, SOLACE_INITIAL_CONTEXT_FACTORY);
        env.put(InitialContext.PROVIDER_URL, SOLACE_HOST);
        env.put(SupportedProperty.SOLACE_JMS_VPN, SOLACE_VPN);
        env.put(Context.SECURITY_PRINCIPAL, SOLACE_USERNAME_TOPIC_PUB);
        env.put(Context.SECURITY_CREDENTIALS, SOLACE_PASSWORD_TOPIC_PUB);

        InitialContext initialContext = new InitialContext(env);
        TopicConnectionFactory cf = (TopicConnectionFactory) initialContext.lookup(SOLACE_JMS_DIRECT_CONNECTION_FACTORY);
        TopicConnection connection = cf.createTopicConnection();
        session = connection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
        Destination destination = (Destination) initialContext.lookup(SOLACE_OUTPUT_TOPIC_NAME);
        producer = session.createProducer(destination);
        connection.start();

        logger.info("Solace connection made successfully");
    }

    public void sendTextMessage(String message) {
        try {
            logger.info("Sending message : {}", message);
            TextMessage testMessage = session.createTextMessage(message);
            producer.send(testMessage);
            logger.info("Message sent successfully : {}", message);
        } catch (Exception e) {
            logger.error("Error sending message to output topic name:{} --> details exceptions: --> ", SOLACE_OUTPUT_TOPIC_NAME, e);
        }
    }

    public void sendBytesMessage(String message) {
        try {
            logger.info("Sending Bytesmessage : {}", message);
            BytesMessage testMessage = session.createBytesMessage();
            testMessage.writeBytes(message.getBytes());
            testMessage.setJMSType("Java Honk type");
            producer.send(testMessage);
            logger.info("Bytesmessage sent successfully : {}", message);
        } catch (Exception e) {
            logger.error("Error sending message to output topic name:{} --> details exceptions: --> ", SOLACE_OUTPUT_TOPIC_NAME, e);
        }
    }
}
- JavaHonkTopicSubscriber.java: Subscriber class that subscribes to the Solace topic and processes its messages:
package com.javahonk.jms.solace;

import java.util.Hashtable;

import javax.annotation.PostConstruct;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;

import com.solacesystems.jms.SolJmsUtility;
import com.solacesystems.jms.SupportedProperty;
import com.solacesystems.jms.message.SolBytesMessage;
import com.solacesystems.jms.message.SolTextMessage;

public class JavaHonkTopicSubscriber implements MessageListener {

    @Value("${SOLACE_INITIAL_CONTEXT_FACTORY}")
    private String SOLACE_INITIAL_CONTEXT_FACTORY;

    @Value("${SOLACE_HOST}")
    private String SOLACE_HOST;

    @Value("${SOLACE_VPN}")
    private String SOLACE_VPN;

    @Value("${SOLACE_JMS_DIRECT_CONNECTION_FACTORY}")
    private String SOLACE_JMS_DIRECT_CONNECTION_FACTORY;

    @Value("${SOLACE_OUTPUT_TOPIC_NAME}")
    private String SOLACE_OUTPUT_TOPIC_NAME;

    @Value("${SOLACE_USERNAME_TOPIC_SUB}")
    private String SOLACE_USERNAME_TOPIC_SUB;

    @Value("${SOLACE_PASSWORD_TOPIC_SUB}")
    private String SOLACE_PASSWORD_TOPIC_SUB;

    private static final Logger logger = LogManager.getLogger(JavaHonkTopicSubscriber.class);

    @PostConstruct
    public void init() throws JMSException, NamingException {
        System.out.println("JavaHonk Solace Subscriber initializing...");

        Hashtable<String, Object> env = new Hashtable<String, Object>();
        env.put(InitialContext.INITIAL_CONTEXT_FACTORY, SOLACE_INITIAL_CONTEXT_FACTORY);
        env.put(InitialContext.PROVIDER_URL, SOLACE_HOST);
        env.put(SupportedProperty.SOLACE_JMS_VPN, SOLACE_VPN);
        env.put(Context.SECURITY_PRINCIPAL, SOLACE_USERNAME_TOPIC_SUB);
        env.put(Context.SECURITY_CREDENTIALS, SOLACE_PASSWORD_TOPIC_SUB);

        InitialContext initialContext = new InitialContext(env);
        TopicConnectionFactory cf = (TopicConnectionFactory) initialContext.lookup(SOLACE_JMS_DIRECT_CONNECTION_FACTORY);
        TopicConnection connection = cf.createTopicConnection();
        Session session = connection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
        Topic topic = (Topic) initialContext.lookup(SOLACE_OUTPUT_TOPIC_NAME);
        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(this);
        connection.start();

        logger.info("Solace connection made successfully");
    }

    public void onMessage(Message testMessage) {
        try {
            if (testMessage instanceof SolBytesMessage) {
                BytesMessage bytesMessage = ((BytesMessage) testMessage);
                byte[] b = new byte[(int) bytesMessage.getBodyLength()];
                bytesMessage.readBytes(b);
                // Print message received as String
                logger.info("Message received:" + new String(b));
                // Get JMS type of message
                logger.info(bytesMessage.getJMSType());
                // Dump all message info
                logger.info("Message: " + SolJmsUtility.dumpMessage(testMessage));
                testMessage.acknowledge();
            }
            if (testMessage instanceof SolTextMessage) {
                System.out.println(((SolTextMessage) testMessage).getJMSType());
                System.out.println("Message received:" + ((SolTextMessage) testMessage).getText());
                // String concatenation does not expand "{}" placeholders, so print the value directly
                System.out.println("Message size: " + SolJmsUtility.getMessageSize(testMessage));
                testMessage.acknowledge();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
- QueuePublisherMainApp.java: Test class that publishes messages to the queue and subscribes to receive them:
package com.javahonk.solace;

import javax.jms.JMSException;
import javax.naming.NamingException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.javahonk.jms.solace.JavaHonkQueuePublisher;

public class QueuePublisherMainApp {

    private static final Logger logger = LogManager.getLogger(QueuePublisherMainApp.class.getName());

    public static void main(String[] args) throws NamingException, JMSException, InterruptedException {
        logger.info("Java Honk Solace queue adapter Starting...");
        setUncaughtExceptionHandler();

        ApplicationContext context = new ClassPathXmlApplicationContext("spring-context.xml");
        JavaHonkQueuePublisher javaHonkQueuePublisher = context.getBean(JavaHonkQueuePublisher.class);

        for (int i = 0; i < 15; i++) {
            Thread.sleep(1000);
            javaHonkQueuePublisher.sendBytesMessage("JavaHonk Solace BytesMessage test");
            javaHonkQueuePublisher.sendTextMessage("JavaHonk Solace Text Message test");
        }

        keepThreadAlive();
        registerShutdownHook(context);
    }

    private static void setUncaughtExceptionHandler() {
        Thread.currentThread().setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            public void uncaughtException(Thread t, Throwable e) {
                logger.error("Thread " + t + " threw uncaughtexception ", e);
            }
        });
    }

    private static void keepThreadAlive() {
        Thread keepAliveThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!Thread.interrupted()) {
                    try {
                        // Sleep instead of spinning so the thread does not consume a CPU core
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        });
        keepAliveThread.start();
    }

    private static void registerShutdownHook(ApplicationContext context) {
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                logger.info("Java Honk Solace queue adapter Exiting.");
                ((AbstractApplicationContext) context).close();
            }
        }));
    }
}
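The main class uses a dedicated keep-alive thread so the JVM stays up while the subscriber listens. One alternative, sketched here under assumptions, is to block on a java.util.concurrent.CountDownLatch that is released when the application should stop. ShutdownLatch is a hypothetical helper and is not part of the downloadable project; in the demo a short-lived thread stands in for the shutdown hook so that the example terminates on its own.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; it is not part of the downloadable project.
final class ShutdownLatch {

    private final CountDownLatch latch = new CountDownLatch(1);

    // Wakes up every thread blocked in await(); safe to call more than once.
    void release() {
        latch.countDown();
    }

    // Blocks without spinning until release() is called or the thread is interrupted.
    void await() {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
        }
    }

    // Timed variant: returns true if released, false on timeout or interrupt.
    boolean await(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        final ShutdownLatch shutdown = new ShutdownLatch();

        // Stand-in for a shutdown hook: releases the latch after a short delay.
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                shutdown.release();
            }
        }).start();

        System.out.println("Waiting for the shutdown signal...");
        shutdown.await();
        System.out.println("Released, exiting.");
    }
}
```

In the real application, release() would be called from the hook passed to Runtime.getRuntime().addShutdownHook(...), and main would call await() after the publishing loop instead of starting a keep-alive thread.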
- TopicPublisherMainApp.java: Main test class that publishes messages to the Solace topic and subscribes to receive them:
package com.javahonk.solace;

import javax.jms.JMSException;
import javax.naming.NamingException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.javahonk.jms.solace.JavaHonkTopicPublisher;

public class TopicPublisherMainApp {

    private static final Logger logger = LogManager.getLogger(TopicPublisherMainApp.class.getName());

    public static void main(String[] args) throws NamingException, JMSException, InterruptedException {
        logger.info("Java Honk Solace topic adapter Starting...");
        setUncaughtExceptionHandler();

        ApplicationContext context = new ClassPathXmlApplicationContext("spring-context.xml");
        JavaHonkTopicPublisher javaHonkTopicPublisher = context.getBean(JavaHonkTopicPublisher.class);

        for (int i = 0; i < 15; i++) {
            Thread.sleep(1000);
            javaHonkTopicPublisher.sendBytesMessage("JavaHonk Solace BytesMessage test");
            javaHonkTopicPublisher.sendTextMessage("JavaHonk Solace Text Message test");
        }

        keepThreadAlive();
        registerShutdownHook(context);
    }

    private static void setUncaughtExceptionHandler() {
        Thread.currentThread().setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            public void uncaughtException(Thread t, Throwable e) {
                logger.error("Thread " + t + " threw uncaughtexception ", e);
            }
        });
    }

    private static void keepThreadAlive() {
        Thread keepAliveThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!Thread.interrupted()) {
                    try {
                        // Sleep instead of spinning so the thread does not consume a CPU core
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        });
        keepAliveThread.start();
    }

    private static void registerShutdownHook(ApplicationContext context) {
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                logger.info("Java Honk Solace topic adapter Exiting.");
                ((AbstractApplicationContext) context).close();
            }
        }));
    }
}
- To run the application and publish to and consume from the queue, right-click QueuePublisherMainApp.java –> Run As –> Java Application. If everything goes well, you will see the details of the sent and received messages on the console.
- You can do the same to publish to and subscribe to the topic: right-click TopicPublisherMainApp.java –> Run As –> Java Application.
- For more information, please read the official Solace documentation.
Download project: SolaceJMSPublisherSubscriber