JMS Publish Subscribe Application

Previous tutorials showed how to create standalone Queue and Topic applications using JBoss JMS. This demo shows how to create a publish/subscribe application in which a publisher publishes messages to a topic and a subscriber consumes messages from the same topic.
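
Before wiring up JBoss, it may help to see the publish/subscribe idea in isolation. The following is a minimal in-memory sketch, not JMS code: InMemoryTopic and InMemoryTopicDemo are hypothetical names invented for this illustration. It assumes the behavior of a plain (non-durable) topic subscription, where every subscriber attached at publish time receives its own copy of the message and a subscriber that attaches later does not see earlier messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical demo class; it only exercises the in-memory model below.
class InMemoryTopicDemo {

	public static void main(String[] args) {
		InMemoryTopic topic = new InMemoryTopic();
		List<String> first = new ArrayList<>();
		List<String> second = new ArrayList<>();

		topic.subscribe(first::add);
		topic.publish("hello"); // only the first subscriber is attached
		topic.subscribe(second::add);
		topic.publish("world"); // both subscribers are attached

		System.out.println("first subscriber: " + first);
		System.out.println("second subscriber: " + second);
	}
}

// Hypothetical in-memory stand-in for a topic; not part of the JMS API.
class InMemoryTopic {
	private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

	void subscribe(Consumer<String> subscriber) {
		subscribers.add(subscriber);
	}

	// Every subscriber attached at publish time gets its own copy of the message
	void publish(String message) {
		for (Consumer<String> subscriber : subscribers) {
			subscriber.accept(message);
		}
	}
}
```

In this sketch the first subscriber ends up with both messages and the second only with "world", which mirrors why the subscriber in this tutorial should be running before the publisher sends anything.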

Tools needed:

  • JBoss Application Server jboss-6.1.0.Final, community version. Any version of JBoss Application Server can be used, but on versions other than jboss-6.1.0.Final the JMS configuration differs; in that case, follow the queue and topic configuration tutorial for that version.
  • Eclipse Kepler (any Eclipse version above 4.2 should work)

Steps:

  • Configure the JMS topic destination on the jboss-6.1.0.Final application server: open jboss-6.1.0.Final\server\default\deploy\hornetq\hornetq-jms.xml and add a topic bound to the JNDI name javahonk/topic, which is the name both classes in this tutorial look up.

  • Create a Maven project named JBossJMSPublishSubscribe. The finished project contains pom.xml and two classes in the package com.javahonk: JBossJMSTopicPublisher and JBossJMSTopicSubscriber.

  • Configure the JBoss application server in Eclipse.
  • Add the JBoss server runtime to the project build path so that all JBoss-related jars, including the JMS and naming classes used below, are on the classpath.

  • pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.javahonk</groupId>
  <artifactId>JBossJMSPublishSubscribe</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>JBossJMSPublishSubscribe</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>
  • Create the class JBossJMSTopicPublisher.java, which publishes messages to the topic named “javahonk/topic”:
package com.javahonk;
 
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Properties;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.Context;
 
public class JBossJMSTopicPublisher {
	
	String destinationName = "javahonk/topic";
	Context ic = null;
	ConnectionFactory cf = null;
	Connection connection = null;
	Topic topic = null;
	Session session = null;
	MessageProducer publisher = null;
 
	public static void main(String[] args) throws Exception {
		
		try {
			
			JBossJMSTopicPublisher jBossJMSTopicPublisher = new JBossJMSTopicPublisher();
			jBossJMSTopicPublisher.initializeConnection();
 
			// Read from command line
			BufferedReader commandLine = new BufferedReader(new InputStreamReader(System.in));
 
			// Loop until the word "exit" is typed or input ends
			while (true) {
				String s = commandLine.readLine();
				// readLine() returns null at end of input; treat it like "exit"
				if (s == null || s.equalsIgnoreCase("exit")) {
					jBossJMSTopicPublisher.close(); // close down connection
					System.exit(0);// exit program
				} else{
					jBossJMSTopicPublisher.sendJMSMessageOnTopic(s);					
				}	
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
 
	public static Context getInitialContext() throws javax.naming.NamingException {
 
		Properties p = new Properties();
		p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
		p.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
		p.put(Context.PROVIDER_URL, "jnp://localhost:1099");
		return new javax.naming.InitialContext(p);
	}
	
	public void initializeConnection() throws Exception {
		
		ic = getInitialContext();
		cf = (ConnectionFactory) ic.lookup("/ConnectionFactory");
		topic = (Topic) ic.lookup(destinationName);
		connection = cf.createConnection();	
		session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
		publisher = session.createProducer(topic);
		connection.start();
		
	}
 
	public void sendJMSMessageOnTopic(String s) throws Exception {
		
		TextMessage message = session.createTextMessage(s);
		publisher.send(message);		
	}
	
	/* Close the JMS connection, then the naming context */
	public void close() throws Exception {
		try {
			if (connection != null) {
				// Closing the connection also closes its sessions and producers
				connection.close();
			}
		} catch (JMSException jmse) {
			System.out.println("Could not close connection " + connection + " exception was " + jmse);
		} finally {
			if (ic != null) {
				ic.close();
			}
		}
	}
 
}
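
The publisher hard-codes its JNDI settings in getInitialContext(). As a small sketch of how the same three settings could be built for a different host or port, the helper below assembles the Properties object without contacting a server. JndiEnvironment, JndiEnvironmentDemo, and the system property names jms.host and jms.port are assumptions made for this illustration and are not part of the original project.

```java
import java.util.Properties;
import javax.naming.Context;

// Hypothetical demo class; prints the provider URL that would be used.
class JndiEnvironmentDemo {

	public static void main(String[] args) {
		// Assumed system property names; they default to the tutorial's values
		String host = System.getProperty("jms.host", "localhost");
		int port = Integer.parseInt(System.getProperty("jms.port", "1099"));

		Properties env = JndiEnvironment.forJnp(host, port);
		System.out.println(env.getProperty(Context.PROVIDER_URL));
	}
}

// Hypothetical helper; builds the same settings that getInitialContext() hard-codes.
class JndiEnvironment {

	static Properties forJnp(String host, int port) {
		Properties p = new Properties();
		p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
		p.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
		p.put(Context.PROVIDER_URL, "jnp://" + host + ":" + port);
		return p;
	}
}
```

The returned Properties could then be passed to new javax.naming.InitialContext(...) exactly as getInitialContext() does, which keeps the connection details in one place for both the publisher and the subscriber.
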
  • Create the class JBossJMSTopicSubscriber.java, which consumes messages from the topic named “javahonk/topic”. Whenever the publisher publishes a message to the topic, the subscriber consumes it and prints it to the console:
package com.javahonk;
 
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Properties;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.Context;
 
public class JBossJMSTopicSubscriber implements MessageListener {
	
	String destinationName = "javahonk/topic";
	Context ic = null;
	ConnectionFactory cf = null;
	Connection connection = null;
	Topic topic = null;
	Session session = null;
	
	public static void main(String[] args) throws Exception {
		
		try {
			
			JBossJMSTopicSubscriber jBossJMSTopicSubscriber = new JBossJMSTopicSubscriber();
			jBossJMSTopicSubscriber.initializeConnection();
 
			// Read from command line
			BufferedReader commandLine = new BufferedReader(new InputStreamReader(System.in));
 
			// Loop until the word "exit" is typed or input ends
			while (true) {
				String s = commandLine.readLine();
				// readLine() returns null at end of input; treat it like "exit"
				if (s == null || s.equalsIgnoreCase("exit")) {
					jBossJMSTopicSubscriber.close(); // close down connection
					System.exit(0);// exit program
				}	
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
 
	public static Context getInitialContext() throws javax.naming.NamingException {
 
		Properties p = new Properties();
		p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
		p.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
		p.put(Context.PROVIDER_URL, "jnp://localhost:1099");
		return new javax.naming.InitialContext(p);
	}
	
	public void initializeConnection() throws Exception {
		
		ic = getInitialContext();
		cf = (ConnectionFactory) ic.lookup("/ConnectionFactory");
		topic = (Topic) ic.lookup(destinationName);
		connection = cf.createConnection();	
		session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
		MessageConsumer subscriber = session.createConsumer(topic);
		subscriber.setMessageListener(this);
		connection.start();
		
	}
	
	/* Close the JMS connection, then the naming context */
	public void close() throws Exception {
		try {
			if (connection != null) {
				// Closing the connection also closes its sessions and consumers
				connection.close();
			}
		} catch (JMSException jmse) {
			System.out.println("Could not close connection " + connection + " exception was " + jmse);
		} finally {
			if (ic != null) {
				ic.close();
			}
		}
	}
 
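	public synchronized void onMessage(Message message) {
		// Only text messages are expected on this topic; skip anything else
		if (!(message instanceof TextMessage)) {
			System.out.println("Ignoring non-text message: " + message);
			return;
		}
		try {
			String strMessage = ((TextMessage) message).getText();
			System.out.println("Message received: " + strMessage);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}		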
 
}
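
The subscriber never polls for messages: once setMessageListener has been called and the connection is started, the provider invokes onMessage asynchronously on its own thread while main simply waits for console input. The sketch below imitates that arrangement with plain JDK concurrency classes rather than JMS. AsyncDelivery and AsyncDeliveryDemo are hypothetical names, and the single-threaded executor is only an assumed stand-in for the provider's delivery thread.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical demo class; shows messages arriving through a callback.
class AsyncDeliveryDemo {

	public static void main(String[] args) throws InterruptedException {
		List<String> received = AsyncDelivery.deliver(Arrays.asList("one", "two", "three"));
		System.out.println("Received via callback: " + received);
	}
}

// Hypothetical helper; not part of the JMS API.
class AsyncDelivery {

	// Hands each message to a listener on a separate thread and waits until all have arrived
	static List<String> deliver(List<String> messages) throws InterruptedException {
		List<String> received = new CopyOnWriteArrayList<>();
		CountDownLatch done = new CountDownLatch(messages.size());

		// Plays the role of onMessage: record the message and signal arrival
		Consumer<String> listener = message -> {
			received.add(message);
			done.countDown();
		};

		// A single worker thread keeps delivery in submission order
		ExecutorService deliveryThread = Executors.newSingleThreadExecutor();
		try {
			for (String message : messages) {
				deliveryThread.submit(() -> listener.accept(message));
			}
			// The calling thread is free to do other work; here it just waits
			if (!done.await(5, TimeUnit.SECONDS)) {
				throw new IllegalStateException("Timed out waiting for delivery");
			}
		} finally {
			deliveryThread.shutdown();
		}
		return received;
	}
}
```

Because the callback runs on a different thread from main, shared state touched in a listener needs to be thread-safe, which is one reason the sketch uses CopyOnWriteArrayList.
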
  • Start the JBoss server: go to %JBOSS_HOME%/bin and double-click run.bat. Once the server has started, it picks up the configured topic automatically. To verify that the topic was deployed, look for its name in the server console output.

  • All configuration is now done, and the publish/subscribe application is ready to run. Both classes have a main method, so start the publisher and the subscriber separately: in Eclipse, right-click each class -> Run As -> Java Application. Eclipse opens two consoles, one for the subscriber and one for the publisher, and both are then ready to publish and consume messages.

  • Switch to the JBossJMSTopicPublisher console, type any message, and press Enter to publish it to the topic.

  • Switch to the JBossJMSTopicSubscriber console. The message is consumed from the topic and printed there, prefixed with "Message received: ".

  • That’s it: the JMS publish/subscribe application is complete. For more information, see the JBoss documentation.

Download Project: JBossJMSPublishSubscribe