Set Custom Properties on a Solace Message in the Publisher and Retrieve Them in the Subscriber
In previous tutorials you saw how to send and receive messages on Solace Messaging using different techniques. In this tutorial I will show you how to set custom properties on a Solace message while publishing it, and how to retrieve them in the subscriber:
- To set custom properties in the publisher, do the following:
/**
 * Publishes {@code message} as a JMS {@code BytesMessage} with a JMS type and two
 * custom string properties ("target" and "command") attached.
 *
 * <p>Any failure is logged together with the output queue name; nothing is rethrown
 * to the caller.
 *
 * @param message text to send; it is encoded to bytes as UTF-8
 */
public void sendBytesMessage(String message) {
    try {
        logger.info("Sending Bytesmessage : {}", message);

        BytesMessage testMessage = session.createBytesMessage();
        // Encode with an explicit charset: the no-arg getBytes() uses the platform
        // default, so the payload would differ between JVMs with different defaults.
        testMessage.writeBytes(message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        testMessage.setJMSType("Java Honk type");

        // Custom properties travel with the message and can be read by the subscriber.
        testMessage.setStringProperty("target", "target");
        testMessage.setStringProperty("command", "command");

        producer.send(testMessage);
        logger.info("Bytesmessage sent successfully : {}", message);
    } catch (Exception e) {
        logger.error("Error sending message to output queue name:{} --> details exceptions: --> ", SOLACE_OUTPUT_QUEUE_NAME, e);
    }
}
- In the subscriber, retrieve the custom properties as shown below:
/**
 * Handles a message delivered to this listener.
 *
 * <p>For a {@code SolBytesMessage}, every property of the message is logged as a
 * key/value pair, the full message dump is logged, and the message is acknowledged.
 * Any other message type is only reported with a warning and is not acknowledged.
 * Exceptions are logged and never propagated to the caller.
 *
 * @param testMessage the received message
 */
@Override
public void onMessage(Message testMessage) {
    try {
        if (testMessage instanceof SolBytesMessage) {
            SolBytesMessage bytesMessage = (SolBytesMessage) testMessage;
            Map<String, Object> properties = bytesMessage.getProperties();
            for (Map.Entry<String, Object> property : properties.entrySet()) {
                logger.info("Key: {}", property.getKey());
                logger.info("Value: {}", property.getValue());
            }
            logger.info("Message: {}", SolJmsUtility.dumpMessage(testMessage));
            // Acknowledge only after the message has been fully processed.
            testMessage.acknowledge();
        } else {
            // Previously such messages were dropped silently; make that visible.
            logger.warn("Ignoring message of unsupported type: {}", testMessage.getClass().getName());
        }
    } catch (Exception e) {
        // Use the logger instead of printStackTrace() so the failure reaches the log.
        logger.error("Error processing received message", e);
    }
}
- Complete publisher class for your reference:
package com.javahonk.jms.solace; import java.util.Hashtable; import javax.annotation.PostConstruct; import javax.jms.BytesMessage; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.beans.factory.annotation.Value; import com.javahonk.proto.ConvertToByteAndObject; import com.solacesystems.jms.SupportedProperty; public class JavaHonkQueuePublisher { @Value("${SOLACE_INITIAL_CONTEXT_FACTORY}") private String SOLACE_INITIAL_CONTEXT_FACTORY; @Value("${SOLACE_HOST}") private String SOLACE_HOST; @Value("${SOLACE_VPN}") private String SOLACE_VPN; @Value("${SOLACE_CONNECTION_FACTORY}") private String SOLACE_CONNECTION_FACTORY; @Value("${SOLACE_OUTPUT_QUEUE_NAME}") private String SOLACE_OUTPUT_QUEUE_NAME; @Value("${SOLACE_USERNAME_QUEUE_PUB}") private String SOLACE_USERNAME_QUEUE_PUB; @Value("${SOLACE_PASSWORD_QUEUE_PUB}") private String SOLACE_PASSWORD_QUEUE_PUB; private Session session; private static MessageProducer producer; private static final Logger logger = LogManager.getLogger(JavaHonkQueuePublisher.class); @PostConstruct public void init() throws JMSException, NamingException{ System.out.println("JavaHonk Solace publisher initializing..."); Hashtable<String, Object> env = new Hashtable<String, Object>(); env.put(InitialContext.INITIAL_CONTEXT_FACTORY, SOLACE_INITIAL_CONTEXT_FACTORY); env.put(InitialContext.PROVIDER_URL, SOLACE_HOST); env.put(SupportedProperty.SOLACE_JMS_VPN, SOLACE_VPN); env.put(Context.SECURITY_PRINCIPAL, SOLACE_USERNAME_QUEUE_PUB); env.put(Context.SECURITY_CREDENTIALS, SOLACE_PASSWORD_QUEUE_PUB); InitialContext initialContext = new InitialContext(env); 
QueueConnectionFactory cf = (QueueConnectionFactory)initialContext.lookup(SOLACE_CONNECTION_FACTORY); QueueConnection connection = cf.createQueueConnection(); session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); Destination destination = (Destination)initialContext.lookup(SOLACE_OUTPUT_QUEUE_NAME); producer = session.createProducer(destination); connection.start(); logger.info("Solace connection made successfully"); } public void sendTextMessage(String message) { try { logger.info("Sending message : {}", message); TextMessage testMessage = session.createTextMessage(message); producer.send(testMessage); logger.info("Message sent successfully : {}", message); } catch (Exception e) { logger.error("Error sending message to output queue name:{} --> details exceptions: --> ", SOLACE_OUTPUT_QUEUE_NAME, e); } } public void sendBytesMessage(String message) { try { logger.info("Sending Bytesmessage : {}", message); BytesMessage testMessage = session.createBytesMessage(); testMessage.writeBytes(message.getBytes()); testMessage.setJMSType("Java Honk type"); testMessage.setStringProperty("target", "target"); testMessage.setStringProperty("command", "command"); producer.send(testMessage); logger.info("Bytesmessage sent successfully : {}", message); } catch (Exception e) { logger.error("Error sending message to output queue name:{} --> details exceptions: --> ", SOLACE_OUTPUT_QUEUE_NAME, e); } } }
- Complete subscriber class for your reference:
package com.javahonk.jms.solace; import java.io.IOException; import java.util.Enumeration; import java.util.Hashtable; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import javax.annotation.PostConstruct; import javax.jms.BytesMessage; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.Session; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.beans.factory.annotation.Value; import com.javahonk.proto.ConvertToByteAndObject; import com.solacesystems.jms.SolJmsUtility; import com.solacesystems.jms.SupportedProperty; import com.solacesystems.jms.message.SolBytesMessage; import com.solacesystems.jms.message.SolTextMessage; public class JavaHonkQueueSubscriber implements MessageListener { @Value("${SOLACE_INITIAL_CONTEXT_FACTORY}") private String SOLACE_INITIAL_CONTEXT_FACTORY; @Value("${SOLACE_HOST}") private String SOLACE_HOST; @Value("${SOLACE_VPN}") private String SOLACE_VPN; @Value("${SOLACE_CONNECTION_FACTORY}") private String SOLACE_CONNECTION_FACTORY; @Value("${SOLACE_OUTPUT_QUEUE_NAME}") private String SOLACE_OUTPUT_QUEUE_NAME; @Value("${SOLACE_USERNAME_QUEUE_SUB}") private String SOLACE_USERNAME_QUEUE_SUB; @Value("${SOLACE_PASSWORD_QUEUE_SUB}") private String SOLACE_PASSWORD_QUEUE_SUB; private static final Logger logger = LogManager.getLogger(JavaHonkQueueSubscriber.class); @PostConstruct public void init() throws JMSException, NamingException{ System.out.println("JavaHonk Solace Subscriber initializing..."); Hashtable<String, Object> env = new Hashtable<String, Object>(); env.put(InitialContext.INITIAL_CONTEXT_FACTORY, SOLACE_INITIAL_CONTEXT_FACTORY); env.put(InitialContext.PROVIDER_URL, 
SOLACE_HOST); env.put(SupportedProperty.SOLACE_JMS_VPN, SOLACE_VPN); env.put(Context.SECURITY_PRINCIPAL, SOLACE_USERNAME_QUEUE_SUB); env.put(Context.SECURITY_CREDENTIALS, SOLACE_PASSWORD_QUEUE_SUB); InitialContext initialContext = new InitialContext(env); QueueConnectionFactory cf = (QueueConnectionFactory)initialContext.lookup(SOLACE_CONNECTION_FACTORY); QueueConnection connection = cf.createQueueConnection(); Session session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = (Queue)initialContext.lookup(SOLACE_OUTPUT_QUEUE_NAME); MessageConsumer consumer = session.createConsumer(queue); consumer.setMessageListener(this); connection.start(); logger.info("Solace connection made successfully"); } public void onMessage(Message testMessage) { try { if (testMessage instanceof SolBytesMessage) { SolBytesMessage bytesMessage2 = ((SolBytesMessage) testMessage); Map<String, Object> properties = bytesMessage2.getProperties(); for(Map.Entry<String, Object> property : properties.entrySet()){ System.out.println("Key: "+property.getKey()); System.out.println("Value: "+property.getValue()); } logger.info("Message: "+SolJmsUtility.dumpMessage(testMessage)); testMessage.acknowledge(); } } catch (Exception e) { e.printStackTrace(); } } }
- For more information, please refer to the official Solace documentation.