Set Solace Custom Properties Retrieve Publisher Subscriber

Set Solace Custom Properties Retrieve Publisher Subscriber

In the previous tutorials you saw how to send and receive messages on Solace Messaging using different techniques. In this tutorial I will show you how to set custom properties on a Solace message while publishing it, and how to retrieve those properties in the subscriber:

  • To set custom properties in the publisher, do the following:
/**
 * Publishes {@code message} as a JMS {@link BytesMessage} that carries the
 * custom string properties {@code "target"} and {@code "command"} in addition
 * to its JMS type.
 *
 * <p>Failures are logged and not rethrown, so the caller is not notified when
 * the send fails.
 *
 * @param message text to send; it is encoded as UTF-8 bytes for the payload
 */
public void sendBytesMessage(String message) {

	try {

		logger.info("Sending Bytesmessage : {}", message);

		BytesMessage testMessage = session.createBytesMessage();

		// Encode with an explicit charset: the no-arg getBytes() uses the JVM's
		// platform default, which can differ between publisher and subscriber hosts.
		testMessage.writeBytes(message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
		testMessage.setJMSType("Java Honk type");

		// Custom (user-defined) properties travel with the message.
		testMessage.setStringProperty("target", "target");
		testMessage.setStringProperty("command", "command");

		producer.send(testMessage);

		logger.info("Bytesmessage sent successfully : {}", message);

	} catch (Exception e) {
		// Best-effort send: log with the full stack trace (last argument) and continue.
		logger.error("Error sending message to output queue name:{} --> details exceptions: --> ", SOLACE_OUTPUT_QUEUE_NAME, e);
	}
}
  • In the subscriber, retrieve the custom properties as shown below:
/**
 * Handles a received message: for a {@link SolBytesMessage} it logs every
 * message property (key and value), logs a dump of the whole message and then
 * acknowledges it. Any other message type is logged and left unacknowledged.
 *
 * <p>Exceptions are logged and not rethrown.
 *
 * @param testMessage the message delivered to this listener
 */
public void onMessage(Message testMessage) {

	try {

		if (!(testMessage instanceof SolBytesMessage)) {
			// Make the skip visible instead of dropping the message silently.
			logger.warn("Ignoring message that is not a SolBytesMessage: {}",
					testMessage == null ? null : testMessage.getClass().getName());
			return;
		}

		SolBytesMessage bytesMessage = (SolBytesMessage) testMessage;
		Map<String, Object> properties = bytesMessage.getProperties();

		for (Map.Entry<String, Object> property : properties.entrySet()) {
			logger.info("Key: {}", property.getKey());
			logger.info("Value: {}", property.getValue());
		}

		logger.info("Message: {}", SolJmsUtility.dumpMessage(testMessage));
		testMessage.acknowledge();

	} catch (Exception e) {
		// Route the failure through the logger rather than printStackTrace().
		logger.error("Error processing received message", e);
	}

}
  • Complete publisher class for your reference:
package com.javahonk.jms.solace;

import java.util.Hashtable;

import javax.annotation.PostConstruct;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;

import com.javahonk.proto.ConvertToByteAndObject;
import com.solacesystems.jms.SupportedProperty;

/**
 * Publishes text and bytes messages to a Solace queue through JMS.
 *
 * <p>Connection settings are injected from properties via {@code @Value}.
 * {@link #init()} performs the JNDI lookups, opens the connection and creates
 * the session and producer; {@link #close()} releases them.
 *
 * <p>NOTE(review): Spring is expected to call {@code close()} on
 * {@link AutoCloseable} beans when the context shuts down - confirm for the
 * bean configuration in use, otherwise call {@code close()} explicitly.
 */
public class JavaHonkQueuePublisher implements AutoCloseable {

	@Value("${SOLACE_INITIAL_CONTEXT_FACTORY}")
	private String SOLACE_INITIAL_CONTEXT_FACTORY;

	@Value("${SOLACE_HOST}")
	private String SOLACE_HOST;

	@Value("${SOLACE_VPN}")
	private String SOLACE_VPN;

	@Value("${SOLACE_CONNECTION_FACTORY}")
	private String SOLACE_CONNECTION_FACTORY;

	@Value("${SOLACE_OUTPUT_QUEUE_NAME}")
	private String SOLACE_OUTPUT_QUEUE_NAME;

	@Value("${SOLACE_USERNAME_QUEUE_PUB}")
	private String SOLACE_USERNAME_QUEUE_PUB;

	@Value("${SOLACE_PASSWORD_QUEUE_PUB}")
	private String SOLACE_PASSWORD_QUEUE_PUB;

	private static final Logger logger = LogManager.getLogger(JavaHonkQueuePublisher.class);

	/** Kept as a field so the connection can be closed in {@link #close()}. */
	private QueueConnection connection;

	private Session session;

	/** Per-instance producer; it is created from this instance's session. */
	private MessageProducer producer;

	/**
	 * Looks up the connection factory and the destination through JNDI, then
	 * creates the connection, session and producer and starts the connection.
	 *
	 * @throws JMSException    if the connection, session or producer cannot be created
	 * @throws NamingException if a JNDI lookup fails
	 */
	@PostConstruct
	public void init() throws JMSException, NamingException {

		logger.info("JavaHonk Solace publisher initializing...");

		Hashtable<String, Object> env = new Hashtable<String, Object>();
		env.put(InitialContext.INITIAL_CONTEXT_FACTORY, SOLACE_INITIAL_CONTEXT_FACTORY);
		env.put(InitialContext.PROVIDER_URL, SOLACE_HOST);
		env.put(SupportedProperty.SOLACE_JMS_VPN, SOLACE_VPN);
		env.put(Context.SECURITY_PRINCIPAL, SOLACE_USERNAME_QUEUE_PUB);
		env.put(Context.SECURITY_CREDENTIALS, SOLACE_PASSWORD_QUEUE_PUB);

		InitialContext initialContext = new InitialContext(env);
		QueueConnectionFactory cf = (QueueConnectionFactory) initialContext.lookup(SOLACE_CONNECTION_FACTORY);
		connection = cf.createQueueConnection();

		session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

		Destination destination = (Destination) initialContext.lookup(SOLACE_OUTPUT_QUEUE_NAME);

		producer = session.createProducer(destination);

		connection.start();

		logger.info("Solace connection made successfully");
	}

	/**
	 * Publishes {@code message} as a JMS {@link TextMessage}.
	 *
	 * <p>Failures are logged and not rethrown.
	 *
	 * @param message text to send
	 */
	public void sendTextMessage(String message) {

		try {

			logger.info("Sending message : {}", message);

			TextMessage testMessage = session.createTextMessage(message);
			producer.send(testMessage);

			logger.info("Message sent successfully : {}", message);

		} catch (Exception e) {
			logger.error("Error sending message to output queue name:{} --> details exceptions: --> ", SOLACE_OUTPUT_QUEUE_NAME, e);
		}
	}

	/**
	 * Publishes {@code message} as a JMS {@link BytesMessage} that carries the
	 * custom string properties {@code "target"} and {@code "command"} in
	 * addition to its JMS type.
	 *
	 * <p>Failures are logged and not rethrown.
	 *
	 * @param message text to send; it is encoded as UTF-8 bytes for the payload
	 */
	public void sendBytesMessage(String message) {

		try {

			logger.info("Sending Bytesmessage : {}", message);

			BytesMessage testMessage = session.createBytesMessage();

			// Explicit charset: the no-arg getBytes() depends on the platform default.
			testMessage.writeBytes(message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
			testMessage.setJMSType("Java Honk type");

			// Custom (user-defined) properties travel with the message.
			testMessage.setStringProperty("target", "target");
			testMessage.setStringProperty("command", "command");

			producer.send(testMessage);

			logger.info("Bytesmessage sent successfully : {}", message);

		} catch (Exception e) {
			logger.error("Error sending message to output queue name:{} --> details exceptions: --> ", SOLACE_OUTPUT_QUEUE_NAME, e);
		}
	}

	/**
	 * Closes the JMS connection opened by {@link #init()}, if any. Calling it
	 * again, or before {@code init()}, does nothing.
	 *
	 * @throws JMSException if the provider fails to close the connection
	 */
	@Override
	public void close() throws JMSException {

		if (connection == null) {
			return;
		}

		try {
			// Per the JMS API, closing a connection also closes its sessions and producers.
			connection.close();
		} finally {
			connection = null;
			session = null;
			producer = null;
		}
	}

}
  • Complete subscriber class for your reference:
package com.javahonk.jms.solace;

import java.io.IOException;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import javax.annotation.PostConstruct;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;

import com.javahonk.proto.ConvertToByteAndObject;
import com.solacesystems.jms.SolJmsUtility;
import com.solacesystems.jms.SupportedProperty;
import com.solacesystems.jms.message.SolBytesMessage;
import com.solacesystems.jms.message.SolTextMessage;

/**
 * Consumes messages from a Solace queue through JMS and logs the properties of
 * every bytes message it receives.
 *
 * <p>Connection settings are injected from properties via {@code @Value}.
 * {@link #init()} performs the JNDI lookups, opens the connection and registers
 * this object as the message listener; {@link #close()} releases the connection.
 *
 * <p>NOTE(review): Spring is expected to call {@code close()} on
 * {@link AutoCloseable} beans when the context shuts down - confirm for the
 * bean configuration in use, otherwise call {@code close()} explicitly.
 */
public class JavaHonkQueueSubscriber implements MessageListener, AutoCloseable {

	@Value("${SOLACE_INITIAL_CONTEXT_FACTORY}")
	private String SOLACE_INITIAL_CONTEXT_FACTORY;

	@Value("${SOLACE_HOST}")
	private String SOLACE_HOST;

	@Value("${SOLACE_VPN}")
	private String SOLACE_VPN;

	@Value("${SOLACE_CONNECTION_FACTORY}")
	private String SOLACE_CONNECTION_FACTORY;

	@Value("${SOLACE_OUTPUT_QUEUE_NAME}")
	private String SOLACE_OUTPUT_QUEUE_NAME;

	@Value("${SOLACE_USERNAME_QUEUE_SUB}")
	private String SOLACE_USERNAME_QUEUE_SUB;

	@Value("${SOLACE_PASSWORD_QUEUE_SUB}")
	private String SOLACE_PASSWORD_QUEUE_SUB;

	private static final Logger logger = LogManager.getLogger(JavaHonkQueueSubscriber.class);

	/** Kept as a field so the connection can be closed in {@link #close()}. */
	private QueueConnection connection;

	/**
	 * Looks up the connection factory and the queue through JNDI, creates a
	 * client-acknowledge session with a consumer on that queue, registers this
	 * object as its listener and starts the connection.
	 *
	 * @throws JMSException    if the connection, session or consumer cannot be created
	 * @throws NamingException if a JNDI lookup fails
	 */
	@PostConstruct
	public void init() throws JMSException, NamingException {

		logger.info("JavaHonk Solace Subscriber initializing...");

		Hashtable<String, Object> env = new Hashtable<String, Object>();
		env.put(InitialContext.INITIAL_CONTEXT_FACTORY, SOLACE_INITIAL_CONTEXT_FACTORY);
		env.put(InitialContext.PROVIDER_URL, SOLACE_HOST);
		env.put(SupportedProperty.SOLACE_JMS_VPN, SOLACE_VPN);
		env.put(Context.SECURITY_PRINCIPAL, SOLACE_USERNAME_QUEUE_SUB);
		env.put(Context.SECURITY_CREDENTIALS, SOLACE_PASSWORD_QUEUE_SUB);

		InitialContext initialContext = new InitialContext(env);
		QueueConnectionFactory cf = (QueueConnectionFactory) initialContext.lookup(SOLACE_CONNECTION_FACTORY);
		connection = cf.createQueueConnection();

		Session session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

		Queue queue = (Queue) initialContext.lookup(SOLACE_OUTPUT_QUEUE_NAME);

		MessageConsumer consumer = session.createConsumer(queue);
		consumer.setMessageListener(this);

		connection.start();

		logger.info("Solace connection made successfully");
	}

	/**
	 * Handles a received message: for a {@link SolBytesMessage} it logs every
	 * message property (key and value), logs a dump of the whole message and
	 * then acknowledges it. Any other message type is logged and left
	 * unacknowledged.
	 *
	 * <p>Exceptions are logged and not rethrown.
	 *
	 * @param testMessage the message delivered to this listener
	 */
	public void onMessage(Message testMessage) {

		try {

			if (!(testMessage instanceof SolBytesMessage)) {
				// Make the skip visible instead of dropping the message silently.
				logger.warn("Ignoring message that is not a SolBytesMessage: {}",
						testMessage == null ? null : testMessage.getClass().getName());
				return;
			}

			SolBytesMessage bytesMessage = (SolBytesMessage) testMessage;
			Map<String, Object> properties = bytesMessage.getProperties();

			for (Map.Entry<String, Object> property : properties.entrySet()) {
				logger.info("Key: {}", property.getKey());
				logger.info("Value: {}", property.getValue());
			}

			logger.info("Message: {}", SolJmsUtility.dumpMessage(testMessage));
			testMessage.acknowledge();

		} catch (Exception e) {
			// Route the failure through the logger rather than printStackTrace().
			logger.error("Error processing received message", e);
		}

	}

	/**
	 * Closes the JMS connection opened by {@link #init()}, if any. Calling it
	 * again, or before {@code init()}, does nothing.
	 *
	 * @throws JMSException if the provider fails to close the connection
	 */
	@Override
	public void close() throws JMSException {

		if (connection == null) {
			return;
		}

		try {
			// Per the JMS API, closing a connection also closes its sessions and consumers.
			connection.close();
		} finally {
			connection = null;
		}
	}

}
  • For more information, please refer to the official Solace documentation here

Leave a Reply

Your email address will not be published. Required fields are marked *