JMS Publisher Multiple Subscriber Application

JMS Publisher Multiple Subscriber Application

We will use a JMS topic: the publisher publishes messages to the topic, and multiple subscribers each receive every message from it. If you want to see more tutorials on JMS topics, please visit here. All configuration and settings are the same as before, and you can copy them from here. Once all configuration and settings are done, please include the classes below in your project:

  • JMSTopicPublisher.java:
package com.javahonk;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.Context;

/**
 * Command-line JMS publisher. Every line read from standard input is sent as
 * a {@link TextMessage} to the topic bound in JNDI under
 * {@link #destinationName}; typing {@code exit} (any case) or reaching end of
 * input shuts the publisher down.
 */
public class JMSTopicPublisher {
	
	/** JNDI name under which the topic is looked up. */
	String destinationName = "javahonk/topic";
	/** JNDI context used for the lookups; {@code null} until initialized and after {@link #close()}. */
	Context ic = null;
	ConnectionFactory cf = null;
	/** JMS connection; {@code null} until initialized and after {@link #close()}. */
	Connection connection = null;
	Topic topic = null;
	Session session = null;
	MessageProducer publisher = null;

	/**
	 * Connects to the topic and publishes each line typed on the console.
	 * Failures are reported with a stack trace; the JMS connection and the
	 * JNDI context are released on every path.
	 *
	 * @param args unused
	 * @throws Exception declared for compatibility; failures are caught and printed
	 */
	public static void main(String[] args) throws Exception {
		
		JMSTopicPublisher jBossJMSTopicPublisher = new JMSTopicPublisher();
		boolean finished = false;
		try {
			jBossJMSTopicPublisher.initializeConnection();

			// Read from command line
			BufferedReader commandLine = new BufferedReader(new InputStreamReader(System.in));

			// Loop until the word "exit" is typed or input ends; readLine()
			// returns null at end of stream, which must not be dereferenced.
			String s;
			while ((s = commandLine.readLine()) != null && !s.equalsIgnoreCase("exit")) {
				jBossJMSTopicPublisher.sendJMSMessageOnTopic(s);
			}
			finished = true;
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// Release resources whether we finished normally or failed part-way.
			try {
				jBossJMSTopicPublisher.close();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		if (finished) {
			System.exit(0);// exit program
		}
	}

	/**
	 * Builds the JNDI context used to look up the connection factory and topic.
	 *
	 * @return an initial context configured for the JNP provider at {@code jnp://localhost:1099}
	 * @throws javax.naming.NamingException if the context cannot be created
	 */
	public static Context getInitialContext() throws javax.naming.NamingException {

		Properties p = new Properties();
		p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
		// Colon-separated package prefixes; no surrounding whitespace, otherwise
		// the first prefix is not a valid package name.
		p.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
		p.put(Context.PROVIDER_URL, "jnp://localhost:1099");
		return new javax.naming.InitialContext(p);
	}
	
	/**
	 * Looks up the connection factory and topic, then opens a connection, a
	 * non-transacted auto-acknowledge session and a producer for the topic.
	 *
	 * @throws Exception if a JNDI lookup or any JMS call fails; call
	 *         {@link #close()} afterwards to release whatever was opened
	 */
	public void initializeConnection() throws Exception {
		
		ic = getInitialContext();
		cf = (ConnectionFactory) ic.lookup("/ConnectionFactory");
		topic = (Topic) ic.lookup(destinationName);
		connection = cf.createConnection();	
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		publisher = session.createProducer(topic);
		connection.start();
		
	}

	/**
	 * Publishes one text message on the topic.
	 *
	 * @param s text to send as the message body
	 * @throws IllegalStateException if {@link #initializeConnection()} has not
	 *         completed or {@link #close()} has already been called
	 * @throws Exception if the JMS provider fails to create or send the message
	 */
	public void sendJMSMessageOnTopic(String s) throws Exception {
		
		if (session == null || publisher == null) {
			throw new IllegalStateException("Publisher is not connected; call initializeConnection() first");
		}
		TextMessage message = session.createTextMessage(s);
		publisher.send(message);		
	}
	
	/**
	 * Closes the JMS connection and the JNDI context. Safe to call when
	 * initialization never happened or failed part-way, and safe to call more
	 * than once. Both resources are always attempted; the first failure is
	 * rethrown after both attempts.
	 *
	 * @throws Exception the first failure encountered while closing
	 */
	public void close() throws Exception {
		Exception failure = null;

		if (connection != null) {
			try {
				// Closing the connection is the only JMS close call needed here.
				connection.close();
			} catch (JMSException jmse) {
				System.out.println("Could not close connection " + connection + " exception was " + jmse);
				failure = jmse;
			} finally {
				publisher = null;
				session = null;
				connection = null;
			}
		}

		if (ic != null) {
			try {
				ic.close();
			} catch (Exception e) {
				if (failure == null) {
					failure = e;
				}
			} finally {
				ic = null;
			}
		}

		if (failure != null) {
			throw failure;
		}
	}

}
  • JMSTopicSubscriber1.java:
package com.javahonk;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.Context;

public class JMSTopicSubscriber1 implements MessageListener {
	
	String destinationName = "javahonk/topic";
	Context ic = null;
	ConnectionFactory cf = null;
	Connection connection = null;
	Topic topic = null;
	Session session = null;
	MessageProducer publisher = null;
	
	public static void main(String[] args) throws Exception {
		
		try {
			
			JMSTopicSubscriber1 jBossJMSTopicSubscriber = new JMSTopicSubscriber1();
			jBossJMSTopicSubscriber.initializeConnection();

			// Read from command line
			BufferedReader commandLine = new java.io.BufferedReader(new InputStreamReader(System.in));

			// Loop until the word "exit" is typed
			while (true) {
				String s = commandLine.readLine();
				if (s.equalsIgnoreCase("exit")) {
					jBossJMSTopicSubscriber.close(); // close down connection
					System.exit(0);// exit program
				}	
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static Context getInitialContext() throws javax.naming.NamingException {

		Properties p = new Properties();
		p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
		p.put(Context.URL_PKG_PREFIXES, " org.jboss.naming:org.jnp.interfaces");
		p.put(Context.PROVIDER_URL, "jnp://localhost:1099");
		return new javax.naming.InitialContext(p);
	}
	
	public void initializeConnection() throws Exception {
		
		ic = getInitialContext();
		cf = (ConnectionFactory) ic.lookup("/ConnectionFactory");
		topic = (Topic) ic.lookup(destinationName);
		connection = cf.createConnection();	
		session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
		MessageConsumer subscriber = session.createConsumer(topic);
		subscriber.setMessageListener(this);
		connection.start();
		
	}
	
	/* Close the JMS connection */
	public void close() throws Exception {
		try {
			connection.close();
		} finally {
			if (ic != null) {
				try {
					ic.close();
				} catch (Exception e) {
					throw e;
				}
			}

			try {
				if (connection != null) {
					connection.close();
				}
			} catch (JMSException jmse) {
				System.out.println("Could not close connection " + connection + " exception was " + jmse);
			}
		}
	}

	public synchronized void onMessage(Message message) {
		TextMessage text = (TextMessage) message;
		String strMessage = null;
		try {
			strMessage = text.getText();
		} catch (JMSException e) {
			e.printStackTrace();
		}
		System.out.println("Message received: " + strMessage);
	}	
	

}
  • JMSTopicSubscriber2.java:
package com.javahonk;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.Context;

public class JMSTopicSubscriber2 implements MessageListener {
	
	String destinationName = "javahonk/topic";
	Context ic = null;
	ConnectionFactory cf = null;
	Connection connection = null;
	Topic topic = null;
	Session session = null;
	MessageProducer publisher = null;
	
	public static void main(String[] args) throws Exception {
		
		try {
			
			JMSTopicSubscriber2 jBossJMSTopicSubscriber = new JMSTopicSubscriber2();
			jBossJMSTopicSubscriber.initializeConnection();

			// Read from command line
			BufferedReader commandLine = new java.io.BufferedReader(new InputStreamReader(System.in));

			// Loop until the word "exit" is typed
			while (true) {
				String s = commandLine.readLine();
				if (s.equalsIgnoreCase("exit")) {
					jBossJMSTopicSubscriber.close(); // close down connection
					System.exit(0);// exit program
				}	
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static Context getInitialContext() throws javax.naming.NamingException {

		Properties p = new Properties();
		p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
		p.put(Context.URL_PKG_PREFIXES, " org.jboss.naming:org.jnp.interfaces");
		p.put(Context.PROVIDER_URL, "jnp://localhost:1099");
		return new javax.naming.InitialContext(p);
	}
	
	public void initializeConnection() throws Exception {
		
		ic = getInitialContext();
		cf = (ConnectionFactory) ic.lookup("/ConnectionFactory");
		topic = (Topic) ic.lookup(destinationName);
		connection = cf.createConnection();	
		session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
		MessageConsumer subscriber = session.createConsumer(topic);
		subscriber.setMessageListener(this);
		connection.start();
		
	}
	
	/* Close the JMS connection */
	public void close() throws Exception {
		try {
			connection.close();
		} finally {
			if (ic != null) {
				try {
					ic.close();
				} catch (Exception e) {
					throw e;
				}
			}

			try {
				if (connection != null) {
					connection.close();
				}
			} catch (JMSException jmse) {
				System.out.println("Could not close connection " + connection + " exception was " + jmse);
			}
		}
	}

	public synchronized void onMessage(Message message) {
		TextMessage text = (TextMessage) message;
		String strMessage = null;
		try {
			strMessage = text.getText();
		} catch (JMSException e) {
			e.printStackTrace();
		}
		System.out.println("Message received: " + strMessage);
	}	
	

}
  • All three classes have a main method. Run each one as a Java application, then go to the publisher console and type any message; all subscribers will receive the same message:

JMS Publisher Multiple Subscriber Application JMS Publisher Multiple Subscriber Application JMS Publisher Multiple Subscriber Application

  • For more information please visit JMS tutorial here

Leave a Reply

Your email address will not be published. Required fields are marked *