Tibco EMS Publisher Consumer Spring Integration

TIBCO EMS (Enterprise Message Service) is TIBCO's implementation of the Java Message Service (JMS). It is fully JMS compliant and adds a number of enterprise-class enhancements. Many large companies rely heavily on TIBCO EMS for their enterprise messaging, because it makes it easy to write business applications that send and receive events and business data asynchronously.

JMS (Java Message Service) defines a common enterprise messaging API that is designed to be supported easily and efficiently by a wide range of enterprise messaging products. JMS supports both message queuing (queues) and publish-subscribe messaging (topics). In this tutorial you will see how to integrate TIBCO EMS with the Spring framework to send and receive data.
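
To make the difference between the two messaging styles concrete, here is a minimal sketch that uses only the JDK, with no JMS or TIBCO classes. The names MessagingStyles, SimpleQueue and SimpleTopic are hypothetical and exist only for this illustration. A real broker adds persistence, acknowledgement and network transport on top of these basic delivery rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class MessagingStyles {

    /** Queue style (point-to-point): each message is delivered to exactly one receiver. */
    static class SimpleQueue {
        private final BlockingQueue<String> messages = new LinkedBlockingQueue<>();

        void send(String message) {
            messages.add(message);
        }

        /** Returns the next message, or null when the queue is empty. */
        String receive() {
            return messages.poll();
        }
    }

    /** Topic style (publish-subscribe): every current subscriber receives its own copy. */
    static class SimpleTopic {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(String message) {
            subscribers.forEach(s -> s.accept(message));
        }
    }

    public static void main(String[] args) {
        SimpleQueue queue = new SimpleQueue();
        queue.send("order-1");
        System.out.println("first receiver:  " + queue.receive());
        System.out.println("second receiver: " + queue.receive()); // the message is already gone

        SimpleTopic topic = new SimpleTopic();
        List<String> subscriberA = new ArrayList<>();
        List<String> subscriberB = new ArrayList<>();
        topic.subscribe(subscriberA::add);
        topic.subscribe(subscriberB::add);
        topic.publish("price-update");
        System.out.println("A=" + subscriberA + " B=" + subscriberB); // both received a copy
    }
}
```

The rest of this tutorial uses the queue style only.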

  • Enterprise flow diagram: (image not included)

  • Tibco Web messaging flow: (image not included)

  • Precondition: The Tibco EMS queues have already been created by your Tibco administrator, and the users have been given the permissions needed to send and receive messages on these queues. If you get an error while sending or receiving data through these queues, please contact your Tibco admin to sort out the issue.

Generally, the Tibco administrator will create two queues:

  • Input queue — Here you publish your message
  • Output queue — Here you subscribe as consumer to receive the message
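
The two-queue flow can be pictured with a small in-memory sketch. It uses plain JDK collections instead of EMS, and the class InputOutputFlow and its processNext() method, which stands in for whatever downstream system reads the input queue, are hypothetical names for this illustration only.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class InputOutputFlow {

    private final Queue<String> inputQueue = new ArrayDeque<>();
    private final Queue<String> outputQueue = new ArrayDeque<>();

    /** The application publishes its request on the input queue. */
    void publish(String request) {
        inputQueue.add(request);
    }

    /**
     * Stand-in for the downstream system: takes one request from the input
     * queue and puts a reply on the output queue. Returns false if there
     * was nothing to process.
     */
    boolean processNext() {
        String request = inputQueue.poll();
        if (request == null) {
            return false;
        }
        outputQueue.add("processed:" + request);
        return true;
    }

    /** The application consumes replies from the output queue (null if none is waiting). */
    String consume() {
        return outputQueue.poll();
    }

    public static void main(String[] args) {
        InputOutputFlow flow = new InputOutputFlow();
        flow.publish("Test");
        System.out.println("before processing: " + flow.consume());
        flow.processNext();
        System.out.println("after processing:  " + flow.consume());
    }
}
```

The point of the sketch is that the publisher never waits for the reply: it only writes to the input queue, and the reply shows up later on the output queue.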

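Tools needed: Because Tibco EMS is fully JMS compliant, you don’t need any other software apart from the queues set up on Tibco and, of course, the IDE of your choice. The required jars are declared as dependencies in the Maven pom.xml below. Note that the com.tibco:tibjms jar is a proprietary client library, so it will most likely have to come from your company repository or be installed into your local Maven repository by hand.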

  • Create a Maven project named TibcoEMS. The complete project structure: (screenshot not included)

  • pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.javahonk</groupId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>TibcoEMS</name>
	<url>http://maven.apache.org</url>

	<properties>
		<org.springframework.version>4.1.5.RELEASE</org.springframework.version>
		<log4j.version>1.2.16</log4j.version>
		<org.apache.log4j.version>2.1</org.apache.log4j.version>
		<com.tibco.version>5.1.2</com.tibco.version>
		<jms.version>1.1</jms.version>
	</properties>

	<dependencies>

		<!-- Application Context (depends on spring-core, spring-expression, spring-aop, 
			spring-beans) This is the central artifact for Spring's Dependency Injection 
			Container and is generally always defined -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
			<version>${org.springframework.version}</version>
		</dependency>

	   <!-- Log4j -->
		<dependency>
        	<groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${org.apache.log4j.version}</version>
		</dependency>
        
        <dependency>
        	<groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${org.apache.log4j.version}</version>
		</dependency>
		
		<!--TIBCO-->
		<dependency>
			<groupId>com.tibco</groupId>
			<artifactId>tibjms</artifactId>
			<version>${com.tibco.version}</version>
		</dependency>
		
		<dependency>
			<groupId>javax.jms</groupId>
			<artifactId>jms</artifactId>
			<version>${jms.version}</version>
		</dependency>

	</dependencies>


	<artifactId>TibcoEMS</artifactId>
</project>
  • spring-context.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:jdbc="http://www.springframework.org/schema/jdbc"
	xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:util="http://www.springframework.org/schema/util"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
		http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd
		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
		http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.1.xsd">
	
	<context:annotation-config />
	<context:component-scan base-package="com.javahonk, com.javahonk.consume, com.javahonk.messaging, com.javahonk.publisher" />
	
	<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
		<property name="locations">
			<list>
				<value>classpath:messaging/messaging.properties</value>
				<value>main.properties</value>				
			</list>
		</property>
		<property name="ignoreUnresolvablePlaceholders" value="true"/>
	</bean>	

	<import resource="tibco-context.xml"/>
		
</beans>
  • tibco-context.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:lang="http://www.springframework.org/schema/lang"
	xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:util="http://www.springframework.org/schema/util"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
		http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-4.1.xsd
		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
		http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.1.xsd">
		
		
	 <bean id="tibcoConnectionFactory" class="com.tibco.tibjms.TibjmsConnectionFactory">
	 	<constructor-arg value="${tibco.url}"/>
	 </bean>
	 
	 <bean id="connectionFactory" class="com.javahonk.messaging.ConnectionFactory" >
	 	<constructor-arg value="${tibco.user}"/>
	 	<constructor-arg value="${tibco.password}"/>
	 	<constructor-arg ref="tibcoConnectionFactory"/>
	 </bean>
	 
	 <bean id="tibcoPublisherConsumer" class="com.javahonk.messaging.TibcoPublisherConsumer">
		<constructor-arg ref="connectionFactory"/>
		<constructor-arg value="${tibco.msgType}"/>
		<constructor-arg value="${tibco.inputQueueName}"/>
		<constructor-arg value="${tibco.outputQueueName}"/>
	</bean>
	
	<bean id="tibcoPublisher" class="com.javahonk.messaging.TibcoPublisher">
		<constructor-arg ref="connectionFactory"/>
		<constructor-arg value="${tibco.spi.msgType}"/>
		<constructor-arg value="${tibco.spi.inputQueueName}"/>		
	</bean>
	 
</beans>
  • messaging.properties: This configuration is important. Remember to replace the URL, queue names, user name and password in this file with your own values.
#TIBCO values
tibco.url=tcp://wts-uatems.yourcompany.com:8222
tibco.user=TibcoUser
tibco.password=Tibco_pass
tibco.inputQueueName=wf.application.Input
tibco.outputQueueName=wf.application.Output
tibco.msgType=Stock.Input.Data.UAT

#EMS Queue
tibco.spi.inputQueueName=wf.application.position.Input
tibco.spi.outputQueueName=wf.application.position.Output
tibco.spi.msgType=Stock.Input.postions.Data.UAT
  • main.properties: An additional properties file for any global properties your application needs.
#You can keep any global properties of your application
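
Because spring-context.xml sets ignoreUnresolvablePlaceholders to true, a missing or misspelled key in messaging.properties does not fail at startup. The bean simply receives the literal placeholder text such as ${tibco.url}. A small pre-flight check can catch this early. The following is only a sketch: the class MessagingPropertiesCheck is hypothetical, and the list of required keys is taken from the placeholders used in tibco-context.xml.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class MessagingPropertiesCheck {

    // Keys referenced by the ${...} placeholders in tibco-context.xml.
    static final String[] REQUIRED_KEYS = {
        "tibco.url", "tibco.user", "tibco.password",
        "tibco.inputQueueName", "tibco.outputQueueName", "tibco.msgType",
        "tibco.spi.inputQueueName", "tibco.spi.msgType"
    };

    /** Returns every required key that is absent or blank in the given properties. */
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // An inline sample keeps the sketch self-contained. In the project you
        // would load messaging/messaging.properties from the classpath instead.
        String sample = "tibco.url=tcp://wts-uatems.yourcompany.com:8222\n"
                      + "tibco.user=TibcoUser\n";
        Properties props = new Properties();
        props.load(new StringReader(sample));
        System.out.println("Missing keys: " + missingKeys(props));
    }
}
```

Calling missingKeys before the Spring context is created, and aborting when the list is not empty, gives a clearer error than a connection failure against an unresolved URL.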

Interfaces:

  • ITibcoPublisher.java:
package com.javahonk.interfaces;

/**
 * @author Java Honk
 *
 */
public interface ITibcoPublisher {
	
	void sendStockOrderToSPI(String stockOrder);

}
  • TradePublisher.java:
package com.javahonk.interfaces;


/**
 * @author Java Honk
 *
 */
public interface TradePublisher {
	void sendTrade(String values);
	
}

Actual implementation classes:

  • ConnectionFactory.java:
package com.javahonk.messaging;


import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.tibco.tibjms.TibjmsConnectionFactory;

/**
 * @author Java Honk
 *
 */
public class ConnectionFactory {
	
	private static final Logger logger = LogManager.getLogger(ConnectionFactory.class.getName());
	
	private String userName;
	private String userPassword;
	private TibjmsConnectionFactory factory;
	
	public ConnectionFactory(String userName, String userPassword, TibjmsConnectionFactory factory) {
		this.userName = userName;
		this.userPassword = userPassword;
		this.factory = factory;

	}
	
	public Connection createConnection() {
		
		Connection connection = null;

		try {
			connection =  factory.createConnection(userName, userPassword);			
			
		} catch (JMSException e) {
			logger.error("Failed to create connection", e);
		}
		
		return connection;
	}
	
	public Session createSession(Connection connection) {
		Session session = null;
		
		try {
			session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
		} catch (JMSException e) {
			logger.error("Failed to create session", e);
		} 
		
		return session;
		
		
	}

}
  • TibcoPublisher.java: This class only publishes messages to the queue. It is a pure publisher that works on a send-and-forget basis.
package com.javahonk.messaging;

import javax.annotation.PostConstruct;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.javahonk.interfaces.ITibcoPublisher;

/**
 * @author Java Honk
 *
 */
public class TibcoPublisher implements ITibcoPublisher {
	
	private static final Logger logger = LogManager.getLogger(TibcoPublisher.class.getName());


	private ConnectionFactory connectionFactorySPI;
	private String msgType;
	private String inputQueueName;
	
	private Connection connection;
	private Session session;
	private MessageProducer msgProducer;
		
	public TibcoPublisher(ConnectionFactory connectionFactorySPI,
			String msgType, 
			String inputQueueName) {
		this.connectionFactorySPI = connectionFactorySPI;
		this.msgType = msgType;
		this.inputQueueName = inputQueueName;
		
		logger.info("Tibco inputQueue: {}", inputQueueName);
		logger.info("Tibco msgType: {}", msgType);
	}
	
	@PostConstruct
	public void init() {
		
		try {
			connection = connectionFactorySPI.createConnection();			
			session = connectionFactorySPI.createSession(connection);
			
			Queue inputQueue = session.createQueue(inputQueueName);
			msgProducer = session.createProducer(inputQueue);
			
			connection.start();
			
		} catch (JMSException e) {
			// Pass the exception as the last argument without a {} placeholder so the stack trace is logged
			logger.error("Error creating connection to tibco", e);
		} 
	}

	@Override
	public void sendStockOrderToSPI(String stockOrder) {
		
		try {
			TextMessage msg = session.createTextMessage(stockOrder);
			msg.setJMSType(msgType);			
			
			msgProducer.send(msg);
			logger.info("Published Physical Settlement message : {}", stockOrder);
			
		} catch (JMSException e) {
			logger.error("Error sending Physical Settlement to input queue name:{} --> details exceptions: --> ", inputQueueName, e);
		}
		
	}	

}
  • TibcoPublisherConsumer.java: This class is responsible for both publishing and receiving messages.
package com.javahonk.messaging;


import java.util.List;
import java.util.Map;

import javax.annotation.PostConstruct;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.javahonk.interfaces.TradePublisher;

/**
 * @author Java Honk
 *
 */
public class TibcoPublisherConsumer implements TradePublisher, MessageListener {

	private static final Logger logger = LogManager.getLogger(TibcoPublisherConsumer.class.getName());


	private ConnectionFactory connectionFactory;
	private String msgType;
	private String inputQueueName;
	private String outputQueueName;
	
	private Connection connection;
	private Session session;
	private MessageProducer msgProducer;
	private Queue outputQueue;
	
	private MessageConsumer msgConsumer;
		
	public TibcoPublisherConsumer(ConnectionFactory connectionFactory,
			String msgType, 
			String inputQueueName, 
			String outputQueueName) {
		this.connectionFactory = connectionFactory;
		this.msgType = msgType;
		this.inputQueueName = inputQueueName;
		this.outputQueueName = outputQueueName;
		
		logger.info("Tibco inputQueue: {}", inputQueueName);
		logger.info("Tibco outputQueue: {}", outputQueueName);
		logger.info("Tibco msgType: {}", msgType);
	}
	
	@PostConstruct
	public void init() {
		try {
			connection = connectionFactory.createConnection();			
			session = connectionFactory.createSession(connection);
			
			Queue inputQueue = session.createQueue(inputQueueName);
			msgProducer = session.createProducer(inputQueue);
			
			outputQueue = session.createQueue(outputQueueName);			
			msgConsumer = session.createConsumer(outputQueue);
			msgConsumer.setMessageListener(this);
			
			connection.start();
		} catch (JMSException e) {
			logger.error("Error creating connection to tibco: ", e);
		} 
	}

	@Override
	public void sendTrade(String values)  {
		try {
			TextMessage msg = session.createTextMessage(values);
			msg.setJMSType(msgType);

			logger.info("Sending Trade {}", values);
			
			msgProducer.send(msg);
		} catch (JMSException ex) {
			logger.error("Error sending trade to input queue name:{}", inputQueueName, ex);
		}
		
	}
	
	@Override
	public void onMessage(Message msg) {

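		try {
			if (msg instanceof TextMessage) {
				// getText() returns a String, which cannot be cast to a List,
				// so a text response is only logged.
				String text = ((TextMessage) msg).getText();
				logger.info("Received response {}", text);

			} else if (msg instanceof javax.jms.ObjectMessage) {
				// Assumption: structured responses arrive as an ObjectMessage whose
				// payload is a List of Maps containing "status" and "key" entries.
				Object payload = ((javax.jms.ObjectMessage) msg).getObject();
				if (!(payload instanceof List)) {
					logger.error("Unexpected payload type in response: {}", payload);
					return;
				}

				@SuppressWarnings("unchecked")
				List<Map<String, Object>> list = (List<Map<String, Object>>) payload;

				for (Map<String, Object> map : list) {
					if (isValidMap(map)) {
						String successString = (String) map.get("status");
						// Constant-first comparison avoids a NullPointerException on a null status
						String processState = "success".equalsIgnoreCase(successString) ? "EXPIRED" : "ERROR";
						logger.info("Process state:{}", processState);
					} else {
						logger.error("Invalid map received in response or subscriber not set.");
					}
				}
			}

		} catch (Exception e) {
			logger.error("Failed to read response", e);
		}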

	}
	
	@SuppressWarnings("rawtypes")
	private boolean isValidMap(Map responseMap) {
		return responseMap.containsKey("status") 
				&& responseMap.containsKey("key"); 
	}
	
	
}
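
The status handling inside onMessage is easier to test when it is separated from JMS. Here is a minimal sketch of that mapping as a plain class with no JMS dependency. ResponseStateMapper is a hypothetical name, and the "INVALID" marker is introduced only for this example, since the listener above just logs an error in that case.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ResponseStateMapper {

    /** Same rule as isValidMap in the listener: both entries must be present. */
    static boolean isValidMap(Map<String, Object> response) {
        return response.containsKey("status") && response.containsKey("key");
    }

    /** Maps one response to a process state: "success" gives EXPIRED, anything else ERROR. */
    static String processState(Map<String, Object> response) {
        if (!isValidMap(response)) {
            return "INVALID"; // hypothetical marker, not part of the original listener
        }
        String status = String.valueOf(response.get("status"));
        return "success".equalsIgnoreCase(status) ? "EXPIRED" : "ERROR";
    }

    /** Maps a whole response list, keeping the order of the input. */
    static List<String> processStates(List<Map<String, Object>> responses) {
        List<String> states = new ArrayList<>();
        for (Map<String, Object> response : responses) {
            states.add(processState(response));
        }
        return states;
    }

    public static void main(String[] args) {
        Map<String, Object> ok = new HashMap<>();
        ok.put("status", "SUCCESS");
        ok.put("key", "order-1");

        Map<String, Object> failed = new HashMap<>();
        failed.put("status", "rejected");
        failed.put("key", "order-2");

        List<Map<String, Object>> responses = new ArrayList<>();
        responses.add(ok);
        responses.add(failed);
        System.out.println(processStates(responses));
    }
}
```

With the mapping in its own class, the listener only has to extract the payload and delegate, and the rules can be unit tested without a running EMS server.
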
  • MainApplication.java: Main class to test the application. It loads the Spring context, calls both the publisher and the publisher-consumer, and prints the output on the console:
package com.javahonk;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.javahonk.messaging.TibcoPublisher;
import com.javahonk.messaging.TibcoPublisherConsumer;

/**
 * @author Java Honk
 *
 */
public class MainApplication {
	
	private static final Logger logger = LogManager.getLogger(MainApplication.class.getName());
	
	public static void main(String[] args) {
		
		logger.info("Tibco EMS started.");
		
		ApplicationContext context = new ClassPathXmlApplicationContext("spring-context.xml");
		
		TibcoPublisher tibcoPublisher = (TibcoPublisher)context.getBean(TibcoPublisher.class);
		tibcoPublisher.sendStockOrderToSPI("Test");
		
		TibcoPublisherConsumer tibcoTradePublisher = (TibcoPublisherConsumer)context.getBean(TibcoPublisherConsumer.class);
		tibcoTradePublisher.sendTrade("Test");		
		
		System.out.println(context);
			
		
	}
}
  • To test the complete application, right-click MainApplication.java and run it as a Java application. If everything is configured properly, you will see a success message on the console (screenshot not included).

  • That’s it. For more information, please see the Tibco EMS documentation.
