Coherence Spring Integration
Oracle Coherence is a widely used in-memory caching solution for both conventional and cloud environments, and it is popular for enterprise-wide caching. It is a key component of Oracle Cloud Application Foundation, which predictably scales applications to meet cloud and mobile computing demand on shared services and infrastructure. Most companies nowadays use some kind of caching mechanism to store data, so a working knowledge of caching is valuable if you work in information technology. In this Coherence Spring Integration demo I will show you how Coherence works: how to store data in the cache, how to retrieve data from the cache, how to create a new cache, and so on.
Note: You will need the Coherence jars to run this application; you can download them from this link
- Coherence in-memory Data Grid sample diagram:
Important configuration: Coherence uses Portable Object Format (POF) to serialize and deserialize the data stored in the cache, so first you have to set up the model class that you want to serialize to the cache.
- Create a Maven project named CoherencePOFSerializationCache. The final project structure is shown below:
- pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.javahonk</groupId>
    <artifactId>CoherencePOFSerializationCache</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>CoherencePOFSerializationCache</name>
    <url>http://maven.apache.org</url>

    <properties>
        <org.springframework.version>4.1.5.RELEASE</org.springframework.version>
        <log4j.version>1.2.16</log4j.version>
        <org.apache.log4j.version>2.1</org.apache.log4j.version>
        <clusterAPI.version>1.0</clusterAPI.version>
        <coherence.version>3.7</coherence.version>
        <coherence-incubator.version>11.3.0</coherence-incubator.version>
        <galaxy.version>v_10_0_20141003</galaxy.version>
        <xercesImpl.version>unknown</xercesImpl.version>
    </properties>

    <dependencies>
        <!-- Application Context (depends on spring-core, spring-expression, spring-aop, spring-beans)
             This is the central artifact for Spring's Dependency Injection Container and is
             generally always defined -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${org.springframework.version}</version>
        </dependency>

        <!-- Log4j -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${org.apache.log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${org.apache.log4j.version}</version>
        </dependency>

        <!-- Coherence Jars -->
        <dependency>
            <groupId>com.oracle.coherence</groupId>
            <artifactId>coherence</artifactId>
            <version>${coherence.version}</version>
        </dependency>
        <dependency>
            <groupId>com.javahonk.clusterAPI</groupId>
            <artifactId>ClusterClient</artifactId>
            <version>${clusterAPI.version}</version>
        </dependency>
        <dependency>
            <groupId>com.javahonk.clusterAPI</groupId>
            <artifactId>ClusterInfoInvocation</artifactId>
            <version>${clusterAPI.version}</version>
        </dependency>
        <dependency>
            <groupId>com.javahonk.galaxy</groupId>
            <artifactId>eqd</artifactId>
            <version>${galaxy.version}</version>
        </dependency>
        <dependency>
            <groupId>com.javahonk.galaxy</groupId>
            <artifactId>eqdframework</artifactId>
            <version>${galaxy.version}</version>
        </dependency>

        <!-- xerces -->
        <dependency>
            <groupId>xerces</groupId>
            <artifactId>xercesImpl</artifactId>
            <version>${xercesImpl.version}</version>
        </dependency>
    </dependencies>
</project>
First, create the model class that will be serialized to the cache:
- Prices.java:
package com.javahonk.pricecacheserializers;

import java.util.Objects;

public class Prices {

    private String symbol;
    private String cusip;
    private String compositeRicCode;
    private double closingPrice;
    private String closingDate;

    public Prices(String symbol, String cusip, String compositeRicCode,
            double closingPrice, String closingDate) {
        this.symbol = symbol;
        this.cusip = cusip;
        this.compositeRicCode = compositeRicCode;
        this.closingPrice = closingPrice;
        this.closingDate = closingDate;
    }

    public String getSymbol() {
        return this.symbol;
    }

    public void setSymbol(String val) {
        this.symbol = val;
    }

    public String getCusip() {
        return this.cusip;
    }

    public void setCusip(String val) {
        this.cusip = val;
    }

    public String getCompositeRicCode() {
        return this.compositeRicCode;
    }

    public void setCompositeRicCode(String val) {
        this.compositeRicCode = val;
    }

    public String getClosingDate() {
        return this.closingDate;
    }

    public void setClosingDate(String val) {
        this.closingDate = val;
    }

    public double getClosingPrice() {
        return this.closingPrice;
    }

    public void setClosingPrice(double val) {
        this.closingPrice = val;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        Prices prices = (Prices) obj;
        return Objects.equals(prices.symbol, this.symbol)
                && Objects.equals(prices.cusip, this.cusip)
                && Objects.equals(prices.compositeRicCode, this.compositeRicCode)
                && Objects.equals(prices.closingDate, this.closingDate)
                && Objects.equals(prices.closingPrice, this.closingPrice);
    }

    @Override
    public int hashCode() {
        return Objects.hash(symbol, cusip, compositeRicCode, closingPrice, closingDate);
    }
}
This is a very simple POJO with attributes related to a stock price, and it is the class we will be serializing to the cache. Next, create PricesSerializer, which implements PofSerializer. This interface has two methods, serialize and deserialize, which you have to implement as below to store data in and read data from the cache.
- PricesSerializer.java:
package com.javahonk.pricecacheserializers;

import java.io.IOException;

import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofSerializer;
import com.tangosol.io.pof.PofWriter;

public class PricesSerializer implements PofSerializer {

    // Reads the properties back using the same indexes (0..4) that serialize() wrote.
    public Object deserialize(PofReader pofReader) throws IOException {
        int count = 0;
        String symbol = pofReader.readString(count++);
        String cusip = pofReader.readString(count++);
        String compositeRicCode = pofReader.readString(count++);
        double closingPrice = pofReader.readDouble(count++);
        String closingDate = pofReader.readString(count++);
        // Must be called once all properties have been read.
        pofReader.readRemainder();

        return new Prices(symbol, cusip, compositeRicCode, closingPrice, closingDate);
    }

    // Writes each property under an increasing index; the order defines the wire layout.
    public void serialize(PofWriter pofWriter, Object object) throws IOException {
        Prices prices = (Prices) object;
        int count = 0;
        pofWriter.writeString(count++, prices.getSymbol());
        pofWriter.writeString(count++, prices.getCusip());
        pofWriter.writeString(count++, prices.getCompositeRicCode());
        pofWriter.writeDouble(count++, prices.getClosingPrice());
        pofWriter.writeString(count++, prices.getClosingDate());
        // Must be called once all properties have been written.
        pofWriter.writeRemainder(null);
    }
}
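If you want to try the idea behind the serializer without a Coherence cluster at hand, here is a minimal sketch that uses only java.io. It is not POF and does not use any Coherence API; the class names FieldOrderRoundTrip and Quote are made up for this illustration. It only shows the rule the serializer above relies on: fields have to be read back in the same order, and with the same types, as they were written.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the Prices/PricesSerializer pair; plain java.io, not Coherence POF.
final class FieldOrderRoundTrip {

    // Minimal value holder mirroring three of the Prices fields.
    static final class Quote {
        final String symbol;
        final double closingPrice;
        final String closingDate;

        Quote(String symbol, double closingPrice, String closingDate) {
            this.symbol = symbol;
            this.closingPrice = closingPrice;
            this.closingDate = closingDate;
        }
    }

    // Writes the fields in a fixed order: symbol, closingPrice, closingDate.
    static byte[] serialize(Quote quote) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(quote.symbol);
            out.writeDouble(quote.closingPrice);
            out.writeUTF(quote.closingDate);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the fields back in exactly the order serialize() wrote them.
    static Quote deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String symbol = in.readUTF();
            double closingPrice = in.readDouble();
            String closingDate = in.readUTF();
            return new Quote(symbol, closingPrice, closingDate);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Quote original = new Quote("Testing3", 42.5, "2015-08-16");
        Quote copy = deserialize(serialize(original));
        System.out.println(copy.symbol + " " + copy.closingPrice + " " + copy.closingDate);
    }
}
```

Swapping two of the read calls in deserialize() is a quick way to see why the order matters: the stream is then interpreted with the wrong types.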
Very important file: Now that we have created our serialization logic, create the POF configuration file and include the following:
- custom-types-pof-config.xml
<?xml version="1.0"?>
<!DOCTYPE pof-config SYSTEM "pof-config.dtd">
<pof-config>
    <user-type-list>
        <include>coherence-pof-config.xml</include>
        <user-type>
            <type-id>2701</type-id>
            <class-name>com.javahonk.pricecacheserializers.Prices</class-name>
            <serializer>
                <class-name>com.javahonk.pricecacheserializers.PricesSerializer</class-name>
            </serializer>
        </user-type>
    </user-type-list>
</pof-config>
- Because we are integrating with Spring let’s create spring-context.xml which will be our IOC container:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.1.xsd">

    <context:annotation-config />
    <context:component-scan base-package="com.javahonk, com.javahonk.consume, com.javahonk.messaging, com.javahonk.publisher" />

    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>main.properties</value>
            </list>
        </property>
        <property name="ignoreUnresolvablePlaceholders" value="true"/>
    </bean>

    <bean id="cacheLoader" class="com.javahonk.cacheloader.CacheLoader" />

</beans>
- main.properties, where we keep the values used by the application:
marketdata_cache=cahceName
date_format=yyyyMMdd
days_to_keep=5
- log4j2.xml to configure logging:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
    <Properties>
        <Property name="envrionment.target">DEV</Property>
        <Property name="logging.dir">./</Property>
    </Properties>
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" />
        </Console>
        <RollingFile name="RollingFile" fileName="./log/rolling-file.log"
            filePattern="${sys:logging.dir}/logs/rolling-file-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" />
            <!-- TODO:Change to time based policy -->
            <Policies>
                <TimeBasedTriggeringPolicy interval="1" modulate="true" />
                <SizeBasedTriggeringPolicy size="100 MB" />
            </Policies>
            <DefaultRolloverStrategy max="4" />
        </RollingFile>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="Console" />
            <!-- <AppenderRef ref="file" /> -->
            <AppenderRef ref="RollingFile" />
        </Root>
    </Loggers>
</Configuration>
- CacheLoader.java is our business class, where we perform all the logic to store data in the cache, load data from the cache, print data from the cache, and clear data from the cache, as you can see below:
package com.javahonk.cacheloader;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;

import javax.annotation.PostConstruct;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;

import com.javahonk.pricecacheserializers.Prices;
import com.tangosol.net.NamedCache;
import com.tangosol.util.Filter;
import com.tangosol.util.filter.AndFilter;
import com.tangosol.util.filter.EqualsFilter;
import com.wachovia.cib.spt.datautil.CacheRegistry;

public class CacheLoader {

    @Value("${marketdata_cache}")
    private String marketDataCacheName;

    @Value("${days_to_keep}")
    private int days_to_keep;

    @Value("${date_format}")
    private String date_format;

    private NamedCache marketdatacache;

    private static final Logger logger = LogManager.getLogger(CacheLoader.class.getName());

    private static int counter = 0;

    @PostConstruct
    public void init() {
        // CacheRegistry is a project-specific helper, not part of Coherence. With plain
        // Coherence, com.tangosol.net.CacheFactory.getCache(marketDataCacheName) is the
        // usual way to obtain a NamedCache.
        marketdatacache = (NamedCache) CacheRegistry.getNamedCache(marketDataCacheName);
    }

    // Stores the sample prices in the cache under the key "<symbol>_<closingDate>".
    public void LoadPrices(LocalDate date) {
        List<Prices> list = getPrice();
        for (Prices prices : list) {
            Prices cachePrice = new Prices(
                    prices.getSymbol(),
                    prices.getCusip(),
                    prices.getCompositeRicCode(),
                    prices.getClosingPrice(),
                    prices.getClosingDate());
            String cacheKey = prices.getSymbol() + "_" + prices.getClosingDate();
            try {
                logger.info("Sending Price with details to Cache:\n" + "Key:" + cacheKey + "\n" + prices.toString());
                marketdatacache.put(cacheKey, cachePrice);
                counter++;
            } catch (Exception e) {
                logger.error("Failed to write following to cache:\n" + cacheKey + ":" + prices.toString(), e);
            }
            logger.info("Number of Symbols Loaded into Cache:" + counter);
        }
        logger.info("CacheLoader Exiting.");
    }

    // Removes the entries whose key date equals today minus days_to_keep.
    @SuppressWarnings("unchecked")
    public void ClearPrices() {
        LocalDate dateToRemove = LocalDate.now().minusDays(days_to_keep);
        // Note: keys are built from LocalDate.toString() (yyyy-MM-dd), so date_format
        // has to produce the same shape for any key to match here.
        String dateString = dateToRemove.format(DateTimeFormatter.ofPattern(date_format));
        Set<String> keySet = marketdatacache.keySet();
        for (String key : keySet) {
            String datePart = key.split("_")[1];
            if (Objects.equals(datePart, dateString)) {
                logger.info("Removing Key:" + key);
                marketdatacache.remove(key);
            }
        }
    }

    // Logs every key in the cache and the closing price of the "Testing3" entries.
    @SuppressWarnings("unchecked")
    public void printAllCahcePrice() {
        Set<String> keySet = marketdatacache.keySet();
        logger.info("Total Cache size:-->" + keySet.size());
        for (String key : keySet) {
            if (key.equalsIgnoreCase("Testing9_2015-08-16")) {
                logger.info("Key found: -->{}", key);
            }
            logger.info("Key name: {}", key);
            String datePart = key.split("_")[1];
            logger.info(key + "-->date part:-->" + datePart);
        }

        Set<Entry<String, Prices>> entries = marketdatacache.entrySet();
        for (Entry<String, Prices> entry : entries) {
            Prices price = entry.getValue();
            if (price != null) {
                if (price.getSymbol().equalsIgnoreCase("Testing3")) {
                    double closingPrice = price.getClosingPrice();
                    logger.info("Closing price: --> {}", closingPrice);
                }
            }
        }
    }

    // Looks up prices by key and by a filter on symbol and closing date.
    @SuppressWarnings({"unchecked"})
    public void getPriceUsingFilter() {
        logger.info("Entered into updateClosingPriceInMarketDataCache class: {} ", getClass().getName());
        try {
            String underlier = "Testing3";
            LocalDateTime dateTime = LocalDateTime.now();
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
            String dateString = dateTime.format(formatter);

            EqualsFilter ricCodeFilter = new EqualsFilter("getSymbol", underlier);
            EqualsFilter closingDateFilter = new EqualsFilter("getClosingDate", dateString);

            // Get by key name; get() returns null when the key is not in the cache
            Prices prices = (Prices) marketdatacache.get("Testing4_2015-08-16");
            if (prices != null) {
                System.out.println(prices.getClosingDate());
            }

            // Both conditions must hold: same symbol and same closing date
            Filter compositeFilter = new AndFilter(ricCodeFilter, closingDateFilter);
            Set<Entry<String, Prices>> entries = marketdatacache.entrySet(compositeFilter);
            for (Entry<String, Prices> entry : entries) {
                Prices price = entry.getValue();
                if (price != null) {
                    double closingPrice = price.getClosingPrice();
                    logger.info("Closing price: -->{}", closingPrice);
                }
            }
        } catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is logged
            logger.error("Error while attempting to get market close price", e);
        }
        logger.info("Exit from updateClosingPriceInMarketDataCache class: {} ", getClass().getName());
    }

    // Builds ten sample prices ("Testing0" .. "Testing9") dated today.
    public List<Prices> getPrice() {
        List<Prices> list = new ArrayList<Prices>();
        for (int i = 0; i < 10; i++) {
            Prices prices = new Prices("Testing" + String.valueOf(i), "123456789",
                    String.valueOf(Math.random()), Math.random(), LocalDate.now().toString());
            list.add(prices);
        }
        return list;
    }
}
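The key scheme and the clean-up step in CacheLoader can be tried out without a cluster. The sketch below is only an illustration under stated assumptions: a plain HashMap stands in for the Coherence NamedCache, and the names RetentionKeySketch, cacheKey and removeExpired are hypothetical. It builds keys the same way LoadPrices does (symbol, underscore, ISO date) and removes entries the same way ClearPrices does, which makes one detail visible: the date pattern used for the cut-off has to produce the same text as the date part of the key, so the yyyyMMdd value from main.properties would not match keys ending in yyyy-MM-dd.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a HashMap stands in for the Coherence NamedCache.
final class RetentionKeySketch {

    // Same key shape as LoadPrices: "<symbol>_<yyyy-MM-dd>".
    static String cacheKey(String symbol, LocalDate closingDate) {
        return symbol + "_" + closingDate;
    }

    // Removes entries whose date part equals (today - daysToKeep) rendered with
    // datePattern, and returns how many entries were removed.
    static int removeExpired(Map<String, Double> cache, LocalDate today,
                             int daysToKeep, String datePattern) {
        String cutoff = today.minusDays(daysToKeep)
                .format(DateTimeFormatter.ofPattern(datePattern));
        List<String> expired = new ArrayList<>();
        for (String key : cache.keySet()) {
            String[] parts = key.split("_");
            if (parts.length == 2 && parts[1].equals(cutoff)) {
                expired.add(key);
            }
        }
        // Remove after the scan so the key set is not modified while iterating.
        for (String key : expired) {
            cache.remove(key);
        }
        return expired.size();
    }

    public static void main(String[] args) {
        LocalDate today = LocalDate.of(2015, 8, 21);
        Map<String, Double> cache = new HashMap<>();
        cache.put(cacheKey("Testing1", today.minusDays(5)), 10.5);
        cache.put(cacheKey("Testing2", today), 11.0);

        System.out.println("Removed with yyyyMMdd: "
                + removeExpired(cache, today, 5, "yyyyMMdd"));
        System.out.println("Removed with yyyy-MM-dd: "
                + removeExpired(cache, today, 5, "yyyy-MM-dd"));
        System.out.println("Remaining keys: " + cache.keySet());
    }
}
```

If you keep the key format from LoadPrices, setting date_format=yyyy-MM-dd in main.properties is one way to make the two sides agree.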
- Finally, write the main class MainCacheApplicationStarter.java to test our application:
package com.javahonk;

import java.time.LocalDate;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.javahonk.cacheloader.CacheLoader;

public class MainCacheApplicationStarter {

    private static final Logger logger = LogManager.getLogger(MainCacheApplicationStarter.class.getName());

    public static void main(String[] args) {
        logger.info("Coherence cache started...");

        ApplicationContext context = new ClassPathXmlApplicationContext("spring-context.xml");
        CacheLoader cacheLoader = context.getBean(CacheLoader.class);

        cacheLoader.LoadPrices(LocalDate.now());
        cacheLoader.printAllCahcePrice();
        cacheLoader.getPriceUsingFilter();
        cacheLoader.ClearPrices();

        logger.info("All cache operations are done...");

        // Close application context
        ((AbstractApplicationContext) context).close();
    }
}
- Very important: Before running the MainCacheApplicationStarter.java class, you have to set the VM arguments as below, and don’t forget to replace the cache URL with your company’s URL. If you are not sure what to pass, please contact your company’s cache administrator, who can provide this URL:
-Dwells.ccu.clientconfig=extend-access.xml -Dcache.ccu.url=http://yourcompany.com/clusterclient/cachelocation -Dcache.cluster.extend.access=true -Dcache.ccu.serializer.config=http://yourcompany.com/clusterclient/cachelocation -Dtangosol.pof.enabled=true -Dlogging.dir=./
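Because a missing or misspelled VM argument is easy to overlook, a small pre-flight check can help. The following is a minimal sketch, not part of the downloadable project: the class name VmArgumentCheck is hypothetical, and it only verifies that each -D argument listed above is visible to the JVM as a system property. It makes no assumption about how the cluster client or Coherence interprets the values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: reports which of the -D arguments are visible to the JVM.
final class VmArgumentCheck {

    // Property names copied from the VM arguments above.
    static final List<String> EXPECTED = Arrays.asList(
            "wells.ccu.clientconfig",
            "cache.ccu.url",
            "cache.cluster.extend.access",
            "cache.ccu.serializer.config",
            "tangosol.pof.enabled",
            "logging.dir");

    // Returns the expected property names that are not set on this JVM.
    static List<String> missing() {
        List<String> result = new ArrayList<>();
        for (String name : EXPECTED) {
            if (System.getProperty(name) == null) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> missing = missing();
        if (missing.isEmpty()) {
            System.out.println("All expected VM arguments are set.");
        } else {
            System.out.println("Missing VM arguments: " + missing);
        }
    }
}
```

Run it with the same VM arguments you plan to use for MainCacheApplicationStarter; any name it prints was not passed to the JVM.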
- That’s it. To run this application, right-click MainCacheApplicationStarter.java and run it as a Java application. If everything goes well, you will see output from your cache. In my case it looked like the following; I am only showing a short screenshot because the console output was very long:
Download Project: CoherencePOFSerializationCache
Nice example,
From the Oracle Coherence example above, how can I find more details about the dependencies below?
com.javahonk.clusterAPI : ClusterClient : ${clusterAPI.version}
com.javahonk.clusterAPI : ClusterInfoInvocation : ${clusterAPI.version}
com.javahonk.galaxy : eqd : ${galaxy.version}
com.javahonk.galaxy : eqdframework : ${galaxy.version}
You don’t need any of those jars. They belong to a custom, project-specific API that I used for my own reference.