JMS Publisher Multiple Subscriber Application
We will use a JMS topic: the publisher publishes messages to the topic, and multiple subscribers receive those messages from the topic. If you want to see more tutorials on JMS topics, please visit here. All configuration and settings are the same, and you can copy them from here. Once the configuration and settings are done, please add the classes below to your project:
- JMSTopicPublisher.java:
package com.javahonk; import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.Properties; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.naming.Context; public class JMSTopicPublisher { String destinationName = "javahonk/topic"; Context ic = null; ConnectionFactory cf = null; Connection connection = null; Topic topic = null; Session session = null; MessageProducer publisher = null; public static void main(String[] args) throws Exception { try { JMSTopicPublisher jBossJMSTopicPublisher = new JMSTopicPublisher(); jBossJMSTopicPublisher.initializeConnection(); // Read from command line BufferedReader commandLine = new java.io.BufferedReader(new InputStreamReader(System.in)); // Loop until the word "exit" is typed while (true) { String s = commandLine.readLine(); if (s.equalsIgnoreCase("exit")) { jBossJMSTopicPublisher.close(); // close down connection System.exit(0);// exit program } else{ jBossJMSTopicPublisher.sendJMSMessageOnTopic(s); } } } catch (Exception e) { e.printStackTrace(); } } public static Context getInitialContext() throws javax.naming.NamingException { Properties p = new Properties(); p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"); p.put(Context.URL_PKG_PREFIXES, " org.jboss.naming:org.jnp.interfaces"); p.put(Context.PROVIDER_URL, "jnp://localhost:1099"); return new javax.naming.InitialContext(p); } public void initializeConnection() throws Exception { ic = getInitialContext(); cf = (ConnectionFactory) ic.lookup("/ConnectionFactory"); topic = (Topic) ic.lookup(destinationName); connection = cf.createConnection(); session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); publisher = session.createProducer(topic); connection.start(); } public void sendJMSMessageOnTopic(String s) throws Exception { TextMessage 
message = session.createTextMessage(s); publisher.send(message); } /* Close the JMS connection */ public void close() throws Exception { try { connection.close(); } finally { if (ic != null) { try { ic.close(); } catch (Exception e) { throw e; } } try { if (connection != null) { connection.close(); } } catch (JMSException jmse) { System.out.println("Could not close connection " + connection + " exception was " + jmse); } } } }
- JMSTopicSubscriber1.java:
package com.javahonk; import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.Properties; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.naming.Context; public class JMSTopicSubscriber1 implements MessageListener { String destinationName = "javahonk/topic"; Context ic = null; ConnectionFactory cf = null; Connection connection = null; Topic topic = null; Session session = null; MessageProducer publisher = null; public static void main(String[] args) throws Exception { try { JMSTopicSubscriber1 jBossJMSTopicSubscriber = new JMSTopicSubscriber1(); jBossJMSTopicSubscriber.initializeConnection(); // Read from command line BufferedReader commandLine = new java.io.BufferedReader(new InputStreamReader(System.in)); // Loop until the word "exit" is typed while (true) { String s = commandLine.readLine(); if (s.equalsIgnoreCase("exit")) { jBossJMSTopicSubscriber.close(); // close down connection System.exit(0);// exit program } } } catch (Exception e) { e.printStackTrace(); } } public static Context getInitialContext() throws javax.naming.NamingException { Properties p = new Properties(); p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"); p.put(Context.URL_PKG_PREFIXES, " org.jboss.naming:org.jnp.interfaces"); p.put(Context.PROVIDER_URL, "jnp://localhost:1099"); return new javax.naming.InitialContext(p); } public void initializeConnection() throws Exception { ic = getInitialContext(); cf = (ConnectionFactory) ic.lookup("/ConnectionFactory"); topic = (Topic) ic.lookup(destinationName); connection = cf.createConnection(); session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); MessageConsumer subscriber = session.createConsumer(topic); 
subscriber.setMessageListener(this); connection.start(); } /* Close the JMS connection */ public void close() throws Exception { try { connection.close(); } finally { if (ic != null) { try { ic.close(); } catch (Exception e) { throw e; } } try { if (connection != null) { connection.close(); } } catch (JMSException jmse) { System.out.println("Could not close connection " + connection + " exception was " + jmse); } } } public synchronized void onMessage(Message message) { TextMessage text = (TextMessage) message; String strMessage = null; try { strMessage = text.getText(); } catch (JMSException e) { e.printStackTrace(); } System.out.println("Message received: " + strMessage); } }
- JMSTopicSubscriber2.java:
package com.javahonk; import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.Properties; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.naming.Context; public class JMSTopicSubscriber2 implements MessageListener { String destinationName = "javahonk/topic"; Context ic = null; ConnectionFactory cf = null; Connection connection = null; Topic topic = null; Session session = null; MessageProducer publisher = null; public static void main(String[] args) throws Exception { try { JMSTopicSubscriber2 jBossJMSTopicSubscriber = new JMSTopicSubscriber2(); jBossJMSTopicSubscriber.initializeConnection(); // Read from command line BufferedReader commandLine = new java.io.BufferedReader(new InputStreamReader(System.in)); // Loop until the word "exit" is typed while (true) { String s = commandLine.readLine(); if (s.equalsIgnoreCase("exit")) { jBossJMSTopicSubscriber.close(); // close down connection System.exit(0);// exit program } } } catch (Exception e) { e.printStackTrace(); } } public static Context getInitialContext() throws javax.naming.NamingException { Properties p = new Properties(); p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"); p.put(Context.URL_PKG_PREFIXES, " org.jboss.naming:org.jnp.interfaces"); p.put(Context.PROVIDER_URL, "jnp://localhost:1099"); return new javax.naming.InitialContext(p); } public void initializeConnection() throws Exception { ic = getInitialContext(); cf = (ConnectionFactory) ic.lookup("/ConnectionFactory"); topic = (Topic) ic.lookup(destinationName); connection = cf.createConnection(); session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); MessageConsumer subscriber = session.createConsumer(topic); 
subscriber.setMessageListener(this); connection.start(); } /* Close the JMS connection */ public void close() throws Exception { try { connection.close(); } finally { if (ic != null) { try { ic.close(); } catch (Exception e) { throw e; } } try { if (connection != null) { connection.close(); } } catch (JMSException jmse) { System.out.println("Could not close connection " + connection + " exception was " + jmse); } } } public synchronized void onMessage(Message message) { TextMessage text = (TextMessage) message; String strMessage = null; try { strMessage = text.getText(); } catch (JMSException e) { e.printStackTrace(); } System.out.println("Message received: " + strMessage); } }
- All classes have a main method. Run each one as a Java application, then go to the publisher console and type any message; all subscribers will receive the same message:
- For more information, please visit the JMS tutorial here.