Tibco EMS Publisher Consumer Spring Integration
TIBCO EMS (Enterprise Message Service) is fully compliant famous Java Message Service (JMS) implementation from TIBCO where they have done some enterprise class sort of enhancements. Most big companies uses havily TIBCO EMS for their enterprise messaging system which makes it easy to write business applications which asynchronously send and receive events and business data.
As JMS (Java Message Service) defines common enterprise messaging API which is designed to efficiently and easily supported through wide range of enterprise messaging products. JMS supports both message queuing and publish-subscribe styles of messaging (topics). In this tutorial you will see how to integrate TIBCO EMS with Spring framework to send and receive data.
- Enterprise flow diagram:
- Tibco Web messaging flow:
- Precondition: Tibco EMS queues is already created by Tibco adminstrator and proper pemissions given to the users to send and receive message on this queue. If you get an error while sending and receiving data thorugh this queue please contact your Tibco admin to sort out the issue.
Generally Tibco administrator will create two queues which is
- Input queue — Here you publish your message
- Output queue — Here you subscribe as consumer to receive the message
Tools needed: As Tibco ESM is fully JSM compliant so you don’t need any other software except setting up queue on Tibco and ofcourse IDE whatever your preference is. Jars list is already there in maven.
- Create maven project name: TibcoEMS below is complete project structure:
- pom.xml:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.javahonk</groupId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>TibcoEMS</name> <url>http://maven.apache.org</url> <properties> <org.springframework.version>4.1.5.RELEASE</org.springframework.version> <log4j.version>1.2.16</log4j.version> <org.apache.log4j.version>2.1</org.apache.log4j.version> <com.tibco.version>5.1.2</com.tibco.version> <jms.version>1.1</jms.version> </properties> <dependencies> <!-- Application Context (depends on spring-core, spring-expression, spring-aop, spring-beans) This is the central artifact for Spring's Dependency Injection Container and is generally always defined --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${org.springframework.version}</version> </dependency> <!-- Log4j --> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>${org.apache.log4j.version}</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>${org.apache.log4j.version}</version> </dependency> <!--TIBCO--> <dependency> <groupId>com.tibco</groupId> <artifactId>tibjms</artifactId> <version>${com.tibco.version}</version> </dependency> <dependency> <groupId>javax.jms</groupId> <artifactId>jms</artifactId> <version>${jms.version}</version> </dependency> </dependencies> <artifactId>TibcoEMS</artifactId> </project>
- spring-context.xml:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:util="http://www.springframework.org/schema/util" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.1.xsd"> <context:annotation-config /> <context:component-scan base-package="com.javahonk, com.javahonk.consume, com.javahonk.messaging, com.javahonk.publisher" /> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <list> <value>classpath:messaging/messaging.properties</value> <value>main.properties</value> </list> </property> <property name="ignoreUnresolvablePlaceholders" value="true"/> </bean> <import resource="tibco-context.xml"/> </beans>
- tibco-context.xml:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:lang="http://www.springframework.org/schema/lang" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:util="http://www.springframework.org/schema/util" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-4.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.1.xsd"> <bean id="tibcoConnectionFactory" class="com.tibco.tibjms.TibjmsConnectionFactory"> <constructor-arg value="${tibco.url}"/> </bean> <bean id="connectionFactory" class="com.javahonk.messaging.ConnectionFactory" > <constructor-arg value="${tibco.user}"/> <constructor-arg value="${tibco.password}"/> <constructor-arg ref="tibcoConnectionFactory"/> </bean> <bean id="tibcoPublisherConsumer" class="com.javahonk.messaging.TibcoPublisherConsumer"> <constructor-arg ref="connectionFactory"/> <constructor-arg value="${tibco.msgType}"/> <constructor-arg value="${tibco.inputQueueName}"/> <constructor-arg value="${tibco.outputQueueName}"/> </bean> <bean id="tibcoPublisher" class="com.javahonk.messaging.TibcoPublisher"> <constructor-arg ref="connectionFactory"/> <constructor-arg value="${tibco.spi.msgType}"/> <constructor-arg value="${tibco.spi.inputQueueName}"/> </bean> </beans>
- messaging.properties: This is very important configuration please don’t forget to replace queue name and user name, password in this file.
#TIBCO values tibco.url=tcp://wts-uatems.yourcompany.com:8222 tibco.user=TibcoUser tibco.password=Tibco_pass tibco.inputQueueName=wf.application.Input tibco.outputQueueName=wf.application.Output tibco.msgType=Stock.Input.Data.UAT #EMS Queue tibco.spi.inputQueueName=wf.application.position.Input tibco.spi.outputQueueName=wf.application.position.Output tibco.spi.msgType=Stock.Input.postions.Data.UAT
- main.properties: I have created one more properties file in case you need to keep some global properties then please use this file
#You can keep any global properties of your application
Interfaces:
- ITibcoPublisher.java:
package com.javahonk.interfaces; /** * @author Java Honk * */ public interface ITibcoPublisher { void sendStockOrderToSPI(String stockOrder); }
- TradePublisher.java:
package com.javahonk.interfaces; /** * @author Java Honk * */ public interface TradePublisher { void sendTrade(String values); }
Actual implementation classes:
- ConnectionFactory.java:
package com.javahonk.messaging; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Session; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import com.tibco.tibjms.TibjmsConnectionFactory; /** * @author Java Honk * */ public class ConnectionFactory { private static final Logger logger = LogManager.getLogger(ConnectionFactory.class.getName()); private String userName; private String userPassword; private TibjmsConnectionFactory factory; public ConnectionFactory(String userName, String userPassword, TibjmsConnectionFactory factory) { this.userName = userName; this.userPassword = userPassword; this.factory = factory; } public Connection createConnection() { Connection connection = null; try { connection = factory.createConnection(userName, userPassword); } catch (JMSException e) { logger.error("Failed to create connection", e); } return connection; } public Session createSession(Connection connection) { Session session = null; try { session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) { logger.error("Failed to create session", e); } return session; } }
- TibcoPublisher.java: This class only publish message on the queue. Means its just publisher works as send and forget.
package com.javahonk.messaging; import javax.annotation.PostConstruct; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import com.javahonk.interfaces.ITibcoPublisher; /** * @author Java Honk * */ public class TibcoPublisher implements ITibcoPublisher { private static final Logger logger = LogManager.getLogger(TibcoPublisher.class.getName()); private ConnectionFactory connectionFactorySPI; private String msgType; private String inputQueueName; private Connection connection; private Session session; private MessageProducer msgProducer; public TibcoPublisher(ConnectionFactory connectionFactorySPI, String msgType, String inputQueueName) { this.connectionFactorySPI = connectionFactorySPI; this.msgType = msgType; this.inputQueueName = inputQueueName; logger.info("Tibco inputQueue: {}", inputQueueName); logger.info("Tibco msgType: {}", msgType); } @PostConstruct public void init() { try { connection = connectionFactorySPI.createConnection(); session = connectionFactorySPI.createSession(connection); Queue inputQueue = session.createQueue(inputQueueName); msgProducer = session.createProducer(inputQueue); connection.start(); } catch (JMSException e) { logger.error("Error creating connection to tibco: {}", e); } } @Override public void sendStockOrderToSPI(String stockOrder) { try { TextMessage msg = session.createTextMessage(stockOrder); msg.setJMSType(msgType); msgProducer.send(msg); logger.info("Published Physical Settlement message : {}", stockOrder); } catch (JMSException e) { logger.error("Error sending Physical Settlement to input queue name:{} --> details exceptions: --> ", inputQueueName, e); } } }
- TibcoPublisherConsumer.java: This class is responsible for publishing and receiving message.
package com.javahonk.messaging; import java.util.List; import java.util.Map; import javax.annotation.PostConstruct; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import com.javahonk.interfaces.TradePublisher; /** * @author Java Honk * */ public class TibcoPublisherConsumer implements TradePublisher, MessageListener { private static final Logger logger = LogManager.getLogger(TibcoPublisherConsumer.class.getName()); private ConnectionFactory connectionFactory; private String msgType; private String inputQueueName; private String outputQueueName; private Connection connection; private Session session; private MessageProducer msgProducer; private Queue outputQueue; private MessageConsumer msgConsumer; public TibcoPublisherConsumer(ConnectionFactory connectionFactory, String msgType, String inputQueueName, String outputQueueName) { this.connectionFactory = connectionFactory; this.msgType = msgType; this.inputQueueName = inputQueueName; this.outputQueueName = outputQueueName; logger.info("Tibco inputQueue: {}", inputQueueName); logger.info("Tibco outputQueue: {}", outputQueueName); logger.info("Tibco msgType: {}", msgType); } @PostConstruct public void init() { try { connection = connectionFactory.createConnection(); session = connectionFactory.createSession(connection); Queue inputQueue = session.createQueue(inputQueueName); msgProducer = session.createProducer(inputQueue); outputQueue = session.createQueue(outputQueueName); msgConsumer = session.createConsumer(outputQueue); msgConsumer.setMessageListener(this); connection.start(); } catch (JMSException e) { logger.error("Error creating connection to tibco: ", e); } } @Override public void sendTrade(String values) { try { TextMessage msg = session.createTextMessage(values); msg.setJMSType(msgType); logger.info("Sending Trade {}", values); msgProducer.send(msg); } catch (JMSException ex) { ex.printStackTrace(); } } @Override public void onMessage(Message msg) { try { if (msg instanceof TextMessage) { TextMessage txtMsg = (TextMessage) msg; logger.info("Received response {}", msg); Object msgTextObj = txtMsg.getText(); logger.info("Received response {}", msgTextObj.toString()); System.out.println(msgTextObj.toString()); @SuppressWarnings("unchecked") List<Map<String, Object>> list = (List<Map<String, Object>>)msgTextObj; for(Map<String, Object> map: list) { if (isValidMap(map)) { String successString = (String) map.get("status"); String processState = successString.equalsIgnoreCase("success") ? "EXPIRED" : "ERROR"; logger.info("Process state:{}", processState); } else { logger.error("Invalid map received in response or subscriber not set."); } } } } catch (Exception e) { logger.error("Failed to read response", e); } } @SuppressWarnings("rawtypes") private boolean isValidMap(Map responseMap) { return responseMap.containsKey("status") && responseMap.containsKey("key"); } }
- MainApplication.java: Main class to test our appliction. This class will load all context and will make call to both publisher and sender and print output on the console:
package com.javahonk; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import com.javahonk.messaging.TibcoPublisher; import com.javahonk.messaging.TibcoPublisherConsumer; /** * @author Java Honk * */ public class MainApplication { private static final Logger logger = LogManager.getLogger(MainApplication.class.getName()); public static void main(String[] args) { logger.info("Tibco EMS started."); ApplicationContext context = new ClassPathXmlApplicationContext("spring-context.xml"); TibcoPublisher tibcoPublisher = (TibcoPublisher)context.getBean(TibcoPublisher.class); tibcoPublisher.sendStockOrderToSPI("Test"); TibcoPublisherConsumer tibcoTradePublisher = (TibcoPublisherConsumer)context.getBean(TibcoPublisherConsumer.class); tibcoTradePublisher.sendTrade("Test"); System.out.println(context); } }
- To test this complete application. Right click MainApplication.java as java application and if everything is configured properly you will see below success message on the console:
- That’s it. For more information please visit Tibco EMS here
Download Project: TibcoEMS