Author: davsclaus Date: Tue Feb 23 16:35:05 2010 New Revision: 915418 URL: http://svn.apache.org/viewvc?rev=915418&view=rev Log: CAMEL-217: Working on a persistent aggregator.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/package.html camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java?rev=915418&r1=915417&r2=915418&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java Tue Feb 23 16:35:05 2010 @@ -28,7 +28,7 @@ * Holder object for sending an exchange over a remote wire as a serialized object. * This is usually configured using the <tt>transferExchange=true</tt> option on the endpoint. 
* <p/> - * As opposed to normal usage where only the body part of the exchange is transfered over the wire, + * As opposed to normal usage where only the body part of the exchange is transferred over the wire, * this holder object serializes the following fields over the wire: * <ul> * <li>in body</li> @@ -52,9 +52,9 @@ private Object inBody; private Object outBody; private Boolean outFaultFlag = Boolean.FALSE; - private final Map<String, Object> inHeaders = new LinkedHashMap<String, Object>(); - private final Map<String, Object> outHeaders = new LinkedHashMap<String, Object>(); - private final Map<String, Object> properties = new LinkedHashMap<String, Object>(); + private Map<String, Object> inHeaders; + private Map<String, Object> outHeaders; + private Map<String, Object> properties; private Exception exception; /** @@ -65,16 +65,30 @@ * @return the holder object with information copied form the exchange */ public static DefaultExchangeHolder marshal(Exchange exchange) { + return marshal(exchange, true); + } + + /** + * Creates a payload object with the information from the given exchange. 
+ * Only marshal the Serializable object + * + * @param exchange the exchange + * @param includeProperties whether or not to include exchange properties + * @return the holder object with information copied form the exchange + */ + public static DefaultExchangeHolder marshal(Exchange exchange, boolean includeProperties) { DefaultExchangeHolder payload = new DefaultExchangeHolder(); payload.inBody = checkSerializableObject("in body", exchange, exchange.getIn().getBody()); - payload.inHeaders.putAll(checkMapSerializableObjects("in headers", exchange, exchange.getIn().getHeaders())); + payload.safeSetInHeaders(exchange); if (exchange.hasOut()) { payload.outBody = checkSerializableObject("out body", exchange, exchange.getOut().getBody()); - payload.outHeaders.putAll(checkMapSerializableObjects("out headers", exchange, exchange.getOut().getHeaders())); payload.outFaultFlag = exchange.getOut().isFault(); + payload.safeSetOutHeaders(exchange); + } + if (includeProperties) { + payload.safeSetProperties(exchange); } - payload.properties.putAll(checkMapSerializableObjects("exchange properties", exchange, exchange.getProperties())); payload.exception = exchange.getException(); return payload; @@ -88,14 +102,20 @@ */ public static void unmarshal(Exchange exchange, DefaultExchangeHolder payload) { exchange.getIn().setBody(payload.inBody); - exchange.getIn().setHeaders(payload.inHeaders); + if (payload.inHeaders != null) { + exchange.getIn().setHeaders(payload.inHeaders); + } if (payload.outBody != null) { exchange.getOut().setBody(payload.outBody); - exchange.getOut().setHeaders(payload.outHeaders); + if (payload.outHeaders != null) { + exchange.getOut().setHeaders(payload.outHeaders); + } exchange.getOut().setFault(payload.outFaultFlag.booleanValue()); } - for (String key : payload.properties.keySet()) { - exchange.setProperty(key, payload.properties.get(key)); + if (payload.properties != null) { + for (String key : payload.properties.keySet()) { + exchange.setProperty(key, 
payload.properties.get(key)); + } } exchange.setException(payload.exception); } @@ -108,6 +128,36 @@ return sb.append(']').toString(); } + private Map<String, Object> safeSetInHeaders(Exchange exchange) { + if (exchange.getIn().hasHeaders()) { + Map<String, Object> map = checkMapSerializableObjects("in headers", exchange, exchange.getIn().getHeaders()); + if (map != null && !map.isEmpty()) { + inHeaders = new LinkedHashMap<String, Object>(map); + } + } + return null; + } + + private Map<String, Object> safeSetOutHeaders(Exchange exchange) { + if (exchange.hasOut() && exchange.getOut().hasHeaders()) { + Map<String, Object> map = checkMapSerializableObjects("out headers", exchange, exchange.getOut().getHeaders()); + if (map != null && !map.isEmpty()) { + outHeaders = new LinkedHashMap<String, Object>(map); + } + } + return null; + } + + private Map<String, Object> safeSetProperties(Exchange exchange) { + if (exchange.hasProperties()) { + Map<String, Object> map = checkMapSerializableObjects("properties", exchange, exchange.getProperties()); + if (map != null && !map.isEmpty()) { + properties = new LinkedHashMap<String, Object>(map); + } + } + return null; + } + private static Object checkSerializableObject(String type, Exchange exchange, Object object) { if (object == null) { return null; Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java?rev=915418&r1=915417&r2=915418&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java (original) +++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java Tue Feb 23 16:35:05 2010 @@ -18,7 +18,10 
@@ import java.io.IOException; +import org.apache.camel.CamelContext; import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.impl.DefaultExchangeHolder; import org.apache.camel.spi.AggregationRepository; import org.fusesource.hawtdb.api.Index; import org.fusesource.hawtdb.api.Transaction; @@ -30,15 +33,13 @@ /** * An instance of AggregationRepository which is backed by a HawtDB. - * - * @author <a href="http://hiramchirino.com">Hiram Chirino</a> */ public class HawtDBAggregationRepository<K> implements AggregationRepository<K> { - + private HawtDBFile file; private String name; - private Marshaller<K> keyMarshaller = new ObjectMarshaller<K>(); - private Marshaller<Exchange> exchangeMarshaller = new ObjectMarshaller<Exchange>(); + private Marshaller<K> keyMarshaller = new ObjectMarshaller<K>(); + private Marshaller<DefaultExchangeHolder> exchangeMarshaller = new ObjectMarshaller<DefaultExchangeHolder>(); public Exchange add(K key, Exchange exchange) { try { @@ -55,7 +56,7 @@ return index.put(keyBuffer, exchangeBuffer); } }); - if( rc ==null ) { + if (rc == null) { return null; } return unmarshallExchange(rc); @@ -74,7 +75,7 @@ return index.get(keyBuffer); } }); - if( rc==null ) { + if (rc == null) { return null; } return unmarshallExchange(rc); @@ -102,57 +103,40 @@ keyMarshaller.writePayload(key, baos); return baos.toBuffer(); } - + protected Buffer marshallExchange(Exchange exchange) throws IOException { DataByteArrayOutputStream baos = new DataByteArrayOutputStream(); - exchangeMarshaller.writePayload(exchange, baos); + DefaultExchangeHolder pe = DefaultExchangeHolder.marshal(exchange, false); + exchangeMarshaller.writePayload(pe, baos); return baos.toBuffer(); } - + protected Exchange unmarshallExchange(Buffer buffer) throws IOException { DataByteArrayInputStream bais = new DataByteArrayInputStream(buffer); - return exchangeMarshaller.readPayload(bais); - } + DefaultExchangeHolder pe = 
exchangeMarshaller.readPayload(bais); + + // create a new dummy default exchange which the aggregator must + // set the CamelContext + Exchange answer = new DefaultExchange((CamelContext) null); + DefaultExchangeHolder.unmarshal(answer, pe); + return answer; + } public HawtDBFile getFile() { return file; } - public void setFile(HawtDBFile file) { this.file = file; } - public String getName() { return name; } - public void setName(String name) { this.name = name; } - - public Marshaller<K> getKeyMarshaller() { - return keyMarshaller; - } - - - public void setKeyMarshaller(Marshaller<K> keyMarshaller) { - this.keyMarshaller = keyMarshaller; - } - - - public Marshaller<Exchange> getExchangeMarshaller() { - return exchangeMarshaller; - } - - - public void setExchangeMarshaller(Marshaller<Exchange> exchangeMarshaller) { - this.exchangeMarshaller = exchangeMarshaller; - } - - } Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java?rev=915418&r1=915417&r2=915418&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java (original) +++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java Tue Feb 23 16:35:05 2010 @@ -17,6 +17,8 @@ package org.apache.camel.component.hawtdb; import org.apache.camel.Service; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.fusesource.hawtdb.api.BTreeIndexFactory; import org.fusesource.hawtdb.api.Index; import org.fusesource.hawtdb.api.Transaction; @@ -29,18 +31,18 @@ /** * Manages access to a shared HawtDB file from multiple HawtDBAggregationRepository objects. 
- * - * @author <a href="http://hiramchirino.com">Hiram Chirino</a> */ public class HawtDBFile extends HawtPageFileFactory implements Service { - private final static BTreeIndexFactory<String,Integer> indexesFactory = new BTreeIndexFactory<String,Integer>(); - private final static BTreeIndexFactory<Buffer,Buffer> indexFactory = new BTreeIndexFactory<Buffer,Buffer>(); - + private static final transient Log LOG = LogFactory.getLog(HawtDBFile.class); + + private final static BTreeIndexFactory<String, Integer> indexesFactory = new BTreeIndexFactory<String, Integer>(); + private final static BTreeIndexFactory<Buffer, Buffer> indexFactory = new BTreeIndexFactory<Buffer, Buffer>(); + public HawtDBFile() { setSync(false); } - + static { indexesFactory.setKeyMarshaller(StringMarshaller.INSTANCE); indexesFactory.setValueMarshaller(IntegerMarshaller.INSTANCE); @@ -51,23 +53,28 @@ } private HawtPageFile pageFile; - + public void start() { + if (LOG.isDebugEnabled()) { + LOG.debug("Starting HawtDB using file: " + getFile()); + } + final boolean initialize = !file.exists(); open(); pageFile = getConcurrentPageFile(); - + execute(new Work<Boolean>() { public Boolean execute(Transaction tx) { - if( initialize ) { + if (initialize) { int page = tx.allocator().alloc(1); // if we just created the file, first allocated page should be 0 assert page == 0; indexesFactory.create(tx, 0); - System.out.println("Aggregation repository data store created."); + LOG.info("Aggregation repository data store created using file: " + getFile()); } else { Index<String, Integer> indexes = indexesFactory.open(tx, 0); - System.out.println("You have "+indexes.size()+" aggregation repositories stored."); + LOG.info("Aggregation repository data store loaded using file: " + getFile() + + " containing " + indexes.size() + " repositories."); } return true; } @@ -75,17 +82,21 @@ } public void stop() { + if (LOG.isDebugEnabled()) { + LOG.debug("Stopping HawtDB using file: " + getFile()); + } + close(); pageFile 
= null; } - + public <T> T execute(Work<T> work) { Transaction tx = pageFile.tx(); try { T rc = work.execute(tx); tx.commit(); return rc; - } catch (RuntimeException e){ + } catch (RuntimeException e) { tx.rollback(); throw e; } @@ -94,12 +105,18 @@ public Index<Buffer, Buffer> getRepositoryIndex(Transaction tx, String name) { Index<String, Integer> indexes = indexesFactory.open(tx, 0); Integer location = indexes.get(name); - if( location == null ) { + if (location == null) { // create it.. - return indexFactory.create(tx, tx.allocator().alloc(1)); - } else { + int page = tx.allocator().alloc(1); + Index<Buffer, Buffer> created = indexFactory.create(tx, page); + + // add it to indexes so we can find it the next time + indexes.put(name, page); + + return created; + } else { return indexFactory.open(tx, location); } } - + } Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java?rev=915418&r1=915417&r2=915418&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java (original) +++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java Tue Feb 23 16:35:05 2010 @@ -21,9 +21,15 @@ /** * Demarcates the statements that need to be performed as a * HawtDB transactional unit of work. - * - * @author <a href="http://hiramchirino.com">Hiram Chirino</a> */ interface Work<T> { + + /** + * Executes the work within the bounds of the given transaction + * + * @param transaction the transaction + * @return result of the work, can be <tt>null</tt> if no result to return. 
+ */ T execute(Transaction transaction); + } \ No newline at end of file Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/package.html URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/package.html?rev=915418&r1=915417&r2=915418&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/package.html (original) +++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/package.html Tue Feb 23 16:35:05 2010 @@ -18,7 +18,7 @@ <head> </head> <body> - +Camel HawtDB support </body> </html> Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java?rev=915418&r1=915417&r2=915418&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java Tue Feb 23 16:35:05 2010 @@ -19,37 +19,39 @@ import java.io.File; import org.apache.camel.Exchange; -import org.apache.camel.ExchangeTestSupport; import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; /** * Tests the HawtDBAggregationRepository implementation. 
- * - * @author <a href="http://hiramchirino.com">Hiram Chirino</a> */ -public class HawtDBAggregationRepositoryTest extends ExchangeTestSupport { - +public class HawtDBAggregationRepositoryTest extends CamelTestSupport { + private HawtDBFile hawtDBFile; @Override - protected void setUp() throws Exception { - File file = new File("target/test-data/"+getClass().getName()+"-"+getName()); + public void setUp() throws Exception { + super.setUp(); + deleteDirectory("target/data"); + File file = new File("target/data/hawtdb.dat"); hawtDBFile = new HawtDBFile(); hawtDBFile.setFile(file); hawtDBFile.start(); } - + @Override - protected void tearDown() throws Exception { + public void tearDown() throws Exception { hawtDBFile.stop(); + super.tearDown(); } - + + @Test public void testOperations() { - HawtDBAggregationRepository<String> repo = new HawtDBAggregationRepository<String>(); repo.setFile(hawtDBFile); repo.setName("repo1"); - + // Can't get something we have not put in... Exchange actual = repo.get("missing"); assertEquals(null, actual); @@ -59,20 +61,21 @@ exchange1.getIn().setBody("counter:1"); actual = repo.add("foo", exchange1); assertEquals(null, actual); - + // Get it back.. actual = repo.get("foo"); - assertEquals(exchange1, actual); - + assertEquals("counter:1", actual.getIn().getBody()); + // Change it.. Exchange exchange2 = new DefaultExchange(context); exchange2.getIn().setBody("counter:2"); actual = repo.add("foo", exchange2); - assertEquals(exchange1, actual); - + // the old one + assertEquals("counter:1", actual.getIn().getBody()); + // Get it back.. 
actual = repo.get("foo"); - assertEquals(exchange2, actual); + assertEquals("counter:2", actual.getIn().getBody()); } } Modified: camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties?rev=915418&r1=915417&r2=915418&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties (original) +++ camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties Tue Feb 23 16:35:05 2010 @@ -33,4 +33,4 @@ log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n -log4j.appender.file.file=target/camel-ognl-test.log \ No newline at end of file +log4j.appender.file.file=target/camel-hawtdb-test.log \ No newline at end of file