This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ece3f8  Changes in HazelcastAggregationRepository to use replicated maps (#3841)
6ece3f8 is described below

commit 6ece3f804e5f3694d12df3b71ad225f3e34bfe0a
Author: Marco Longobardi <mrclng...@gmail.com>
AuthorDate: Wed May 20 14:19:55 2020 +0200

    Changes in HazelcastAggregationRepository to use replicated maps (#3841)
    
    * Create ReplicatedHazelcastAggregationRepository.java
    
    Changes to use replicated maps
    
    * Update HazelcastAggregationRepository.java
    
    Changes to use replicated maps
---
 .../hazelcast/HazelcastAggregationRepository.java  |  22 +--
 ... ReplicatedHazelcastAggregationRepository.java} | 205 +++++----------------
 2 files changed, 60 insertions(+), 167 deletions(-)

diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java b/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
index 58fde58..a3409a2 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
@@ -57,20 +57,20 @@ public class HazelcastAggregationRepository extends ServiceSupport
                                                   implements RecoverableAggregationRepository,
                                                              OptimisticLockingAggregationRepository {
     private static final Logger LOG = LoggerFactory.getLogger(HazelcastAggregationRepository.class.getName());
-    private static final String COMPLETED_SUFFIX = "-completed";
+    protected static final String COMPLETED_SUFFIX = "-completed";
     
-    private boolean optimistic;
-    private boolean useLocalHzInstance;
-    private boolean useRecovery = true;
+    protected boolean optimistic;
+    protected boolean useLocalHzInstance;
+    protected boolean useRecovery = true;
     private IMap<String, DefaultExchangeHolder> cache;
     private IMap<String, DefaultExchangeHolder> persistedCache;
-    private HazelcastInstance hzInstance;
-    private String mapName;
-    private String persistenceMapName;
-    private String deadLetterChannel;
-    private long recoveryInterval = 5000;
-    private int maximumRedeliveries = 3;
-    private boolean allowSerializedHeaders;
+    protected HazelcastInstance hzInstance;
+    protected String mapName;
+    protected String persistenceMapName;
+    protected String deadLetterChannel;
+    protected long recoveryInterval = 5000;
+    protected int maximumRedeliveries = 3;
+    protected boolean allowSerializedHeaders;
 
     /**
      * Creates new {@link HazelcastAggregationRepository} that defaults to non-optimistic locking
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java b/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/ReplicatedHazelcastAggregationRepository.java
similarity index 65%
copy from components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
copy to components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/ReplicatedHazelcastAggregationRepository.java
index 58fde58..21cec1a 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/ReplicatedHazelcastAggregationRepository.java
@@ -18,6 +18,7 @@ package org.apache.camel.processor.aggregate.hazelcast;
 
 import java.util.Collections;
 import java.util.Set;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 
@@ -46,132 +47,100 @@ import org.slf4j.LoggerFactory;
  * {@link RecoverableAggregationRepository} and {@link 
OptimisticLockingAggregationRepository}.
  * Defaults to thread-safe (non-optimistic) locking and recoverable strategy.
  * Hazelcast settings are given to an end-user and can be controlled with 
repositoryName and persistentRespositoryName,
- * both are {@link com.hazelcast.map.IMap} &lt;String, Exchange&gt;. However 
HazelcastAggregationRepository
+ * both are {@link com.hazelcast.map.IMap} &lt;String, Exchange&gt;. However 
ReplicatedHazelcastAggregationRepository
  * can run it's own Hazelcast instance, but obviously no benefits of Hazelcast 
clustering are gained this way.
- * If the {@link HazelcastAggregationRepository} uses it's own local {@link 
HazelcastInstance} it will DESTROY this
+ * If the {@link ReplicatedHazelcastAggregationRepository} uses it's own local 
{@link HazelcastInstance} it will DESTROY this
  * instance on {@link #doStop()}. You should control {@link HazelcastInstance} 
lifecycle yourself whenever you instantiate
- * {@link HazelcastAggregationRepository} passing a reference to the instance.
+ * {@link ReplicatedHazelcastAggregationRepository} passing a reference to the 
instance.
  *
  */
-public class HazelcastAggregationRepository extends ServiceSupport
-                                                  implements 
RecoverableAggregationRepository,
-                                                             
OptimisticLockingAggregationRepository {
-    private static final Logger LOG = 
LoggerFactory.getLogger(HazelcastAggregationRepository.class.getName());
-    private static final String COMPLETED_SUFFIX = "-completed";
-    
-    private boolean optimistic;
-    private boolean useLocalHzInstance;
-    private boolean useRecovery = true;
-    private IMap<String, DefaultExchangeHolder> cache;
-    private IMap<String, DefaultExchangeHolder> persistedCache;
-    private HazelcastInstance hzInstance;
-    private String mapName;
-    private String persistenceMapName;
-    private String deadLetterChannel;
-    private long recoveryInterval = 5000;
-    private int maximumRedeliveries = 3;
-    private boolean allowSerializedHeaders;
+public class ReplicatedHazelcastAggregationRepository extends 
HazelcastAggregationRepository {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ReplicatedHazelcastAggregationRepository.class.getName());
+    protected Map<String, DefaultExchangeHolder> replicatedCache;
+    protected Map<String, DefaultExchangeHolder> replicatedPersistedCache;
 
     /**
-     * Creates new {@link HazelcastAggregationRepository} that defaults to 
non-optimistic locking
+     * Creates new {@link ReplicatedHazelcastAggregationRepository} that 
defaults to non-optimistic locking
      * with recoverable behavior and a local Hazelcast instance. Recoverable 
repository name defaults to
      * {@code repositoryName} + "-compeleted".
      * @param repositoryName {@link IMap} repository name;
      */
-    public HazelcastAggregationRepository(final String repositoryName) {
-        mapName = repositoryName;
-        persistenceMapName = String.format("%s%s", mapName, COMPLETED_SUFFIX);
-        optimistic = false;
-        useLocalHzInstance = true;
+    public ReplicatedHazelcastAggregationRepository(final String 
repositoryName) {
+        super(repositoryName);
     }
 
     /**
-    * Creates new {@link HazelcastAggregationRepository} that defaults to 
non-optimistic locking
+    * Creates new {@link ReplicatedHazelcastAggregationRepository} that 
defaults to non-optimistic locking
     * with recoverable behavior and a local Hazelcast instance.
     * @param repositoryName {@link IMap} repository name;
     * @param  persistentRepositoryName {@link IMap} recoverable repository 
name;
     */
-    public HazelcastAggregationRepository(final String repositoryName, final 
String persistentRepositoryName) {
-        mapName = repositoryName;
-        persistenceMapName = persistentRepositoryName;
-        optimistic = false;
-        useLocalHzInstance = true;
+    public ReplicatedHazelcastAggregationRepository(final String 
repositoryName, final String persistentRepositoryName) {
+        super(repositoryName,persistentRepositoryName);
     }
 
     /**
-     * Creates new {@link HazelcastAggregationRepository} with recoverable 
behavior and a local Hazelcast instance.
+     * Creates new {@link ReplicatedHazelcastAggregationRepository} with 
recoverable behavior and a local Hazelcast instance.
      * Recoverable repository name defaults to {@code repositoryName} + 
"-compeleted".
      * @param repositoryName {@link IMap} repository name;
      * @param  optimistic whether to use optimistic locking manner.
      */
-    public HazelcastAggregationRepository(final String repositoryName, boolean 
optimistic) {
-        this(repositoryName);
-        this.optimistic = optimistic;
-        useLocalHzInstance = true;
+    public ReplicatedHazelcastAggregationRepository(final String 
repositoryName, boolean optimistic) {
+        super(repositoryName,optimistic);
     }
 
     /**
-     * Creates new {@link HazelcastAggregationRepository} with recoverable 
behavior and a local Hazelcast instance.
+     * Creates new {@link ReplicatedHazelcastAggregationRepository} with 
recoverable behavior and a local Hazelcast instance.
      * @param repositoryName {@link IMap} repository name;
      * @param  persistentRepositoryName {@link IMap} recoverable repository 
name;
      * @param optimistic whether to use optimistic locking manner.
      */
-    public HazelcastAggregationRepository(final String repositoryName, final 
String persistentRepositoryName, boolean optimistic) {
-        this(repositoryName, persistentRepositoryName);
-        this.optimistic = optimistic;
-        useLocalHzInstance = true;
+    public ReplicatedHazelcastAggregationRepository(final String 
repositoryName, final String persistentRepositoryName, boolean optimistic) {
+        super(repositoryName,persistentRepositoryName,optimistic);
     }
 
     /**
-     * Creates new {@link HazelcastAggregationRepository} that defaults to 
non-optimistic locking
+     * Creates new {@link ReplicatedHazelcastAggregationRepository} that 
defaults to non-optimistic locking
      * with recoverable behavior. Recoverable repository name defaults to
      * {@code repositoryName} + "-compeleted".
      * @param repositoryName {@link IMap} repository name;
      * @param hzInstanse externally configured {@link HazelcastInstance}.
      */
-    public HazelcastAggregationRepository(final String repositoryName, 
HazelcastInstance hzInstanse) {
-        this (repositoryName, false);
-        this.hzInstance = hzInstanse;
-        useLocalHzInstance = false;
+    public ReplicatedHazelcastAggregationRepository(final String 
repositoryName, HazelcastInstance hzInstanse) {
+        super(repositoryName,hzInstanse);
     }
 
     /**
-     * Creates new {@link HazelcastAggregationRepository} that defaults to 
non-optimistic locking
+     * Creates new {@link ReplicatedHazelcastAggregationRepository} that 
defaults to non-optimistic locking
      * with recoverable behavior.
      * @param repositoryName {@link IMap} repository name;
      * @param  persistentRepositoryName {@link IMap} recoverable repository 
name;
      * @param hzInstanse externally configured {@link HazelcastInstance}.
      */
-    public HazelcastAggregationRepository(final String repositoryName, final 
String persistentRepositoryName, HazelcastInstance hzInstanse) {
-        this (repositoryName, persistentRepositoryName, false);
-        this.hzInstance = hzInstanse;
-        useLocalHzInstance = false;
+    public ReplicatedHazelcastAggregationRepository(final String 
repositoryName, final String persistentRepositoryName, HazelcastInstance 
hzInstanse) {
+        super(repositoryName, persistentRepositoryName, hzInstanse);
     }
 
     /**
-     * Creates new {@link HazelcastAggregationRepository} with recoverable 
behavior.
+     * Creates new {@link ReplicatedHazelcastAggregationRepository} with 
recoverable behavior.
      * Recoverable repository name defaults to {@code repositoryName} + 
"-compeleted".
      * @param repositoryName {@link IMap} repository name;
      * @param  optimistic whether to use optimistic locking manner;
      * @param hzInstance  externally configured {@link HazelcastInstance}.
      */
-    public HazelcastAggregationRepository(final String repositoryName, boolean 
optimistic, HazelcastInstance hzInstance) {
-        this(repositoryName, optimistic);
-        this.hzInstance = hzInstance;
-        useLocalHzInstance = false;
+    public ReplicatedHazelcastAggregationRepository(final String 
repositoryName, boolean optimistic, HazelcastInstance hzInstance) {
+        super(repositoryName,optimistic,hzInstance);
     }
 
     /**
-     * Creates new {@link HazelcastAggregationRepository} with recoverable 
behavior.
+     * Creates new {@link ReplicatedHazelcastAggregationRepository} with 
recoverable behavior.
      * @param repositoryName {@link IMap} repository name;
      * @param optimistic whether to use optimistic locking manner;
      * @param persistentRepositoryName {@link IMap} recoverable repository 
name;
      * @param hzInstance  externally configured {@link HazelcastInstance}.
      */
-    public HazelcastAggregationRepository(final String repositoryName, final 
String persistentRepositoryName, boolean optimistic, HazelcastInstance 
hzInstance) {
-        this(repositoryName, persistentRepositoryName, optimistic);
-        this.hzInstance = hzInstance;
-        useLocalHzInstance = false;
+    public ReplicatedHazelcastAggregationRepository(final String 
repositoryName, final String persistentRepositoryName, boolean optimistic, 
HazelcastInstance hzInstance) {
+        super(repositoryName,persistentRepositoryName,optimistic,hzInstance);
     }
 
     @Override
@@ -182,7 +151,7 @@ public class HazelcastAggregationRepository extends 
ServiceSupport
         LOG.trace("Adding an Exchange with ID {} for key {} in an optimistic 
manner.", newExchange.getExchangeId(), key);
         if (oldExchange == null) {
             DefaultExchangeHolder holder = 
DefaultExchangeHolder.marshal(newExchange, true, allowSerializedHeaders);
-            final DefaultExchangeHolder misbehaviorHolder = 
cache.putIfAbsent(key, holder);
+            final DefaultExchangeHolder misbehaviorHolder = 
replicatedCache.putIfAbsent(key, holder);
             if (misbehaviorHolder != null) {
                 Exchange misbehaviorEx = unmarshallExchange(camelContext, 
misbehaviorHolder);
                 LOG.error("Optimistic locking failed for exchange with key {}: 
IMap#putIfAbsend returned Exchange with ID {}, while it's expected no exchanges 
to be returned",
@@ -192,7 +161,7 @@ public class HazelcastAggregationRepository extends 
ServiceSupport
         } else {
             DefaultExchangeHolder oldHolder = 
DefaultExchangeHolder.marshal(oldExchange, true, allowSerializedHeaders);
             DefaultExchangeHolder newHolder = 
DefaultExchangeHolder.marshal(newExchange, true, allowSerializedHeaders);
-            if (!cache.replace(key, oldHolder, newHolder)) {
+            if (!replicatedCache.replace(key, oldHolder, newHolder)) {
                 LOG.error("Optimistic locking failed for exchange with key {}: 
IMap#replace returned no Exchanges, while it's expected to replace one",
                         key);
                 throw new OptimisticLockingException();
@@ -212,7 +181,7 @@ public class HazelcastAggregationRepository extends 
ServiceSupport
         try {
             l.lock();
             DefaultExchangeHolder newHolder = 
DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders);
-            DefaultExchangeHolder oldHolder = cache.put(key, newHolder);
+            DefaultExchangeHolder oldHolder = replicatedCache.put(key, 
newHolder);
             return unmarshallExchange(camelContext, oldHolder);
         } finally {
             LOG.trace("Added an Exchange with ID {} for key {} in a 
thread-safe manner.", exchange.getExchangeId(), key);
@@ -224,7 +193,7 @@ public class HazelcastAggregationRepository extends 
ServiceSupport
     public Set<String> scan(CamelContext camelContext) {
         if (useRecovery) {
             LOG.trace("Scanning for exchanges to recover in {} context", 
camelContext.getName());
-            Set<String> scanned = 
Collections.unmodifiableSet(persistedCache.keySet());
+            Set<String> scanned = 
Collections.unmodifiableSet(replicatedPersistedCache.keySet());
             LOG.trace("Found {} keys for exchanges to recover in {} context", 
scanned.size(), camelContext.getName());
             return scanned;
         } else {
@@ -237,57 +206,12 @@ public class HazelcastAggregationRepository extends 
ServiceSupport
     @Override
     public Exchange recover(CamelContext camelContext, String exchangeId) {
         LOG.trace("Recovering an Exchange with ID {}.", exchangeId);
-        return useRecovery ? unmarshallExchange(camelContext, 
persistedCache.get(exchangeId)) : null;
-    }
-
-    @Override
-    public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
-        this.recoveryInterval = timeUnit.toMillis(interval);
-    }
-
-    @Override
-    public void setRecoveryInterval(long interval) {
-        this.recoveryInterval = interval;
-    }
-
-    @Override
-    public long getRecoveryIntervalInMillis() {
-        return recoveryInterval;
-    }
-
-    @Override
-    public void setUseRecovery(boolean useRecovery) {
-        this.useRecovery = useRecovery;
-    }
-
-    @Override
-    public boolean isUseRecovery() {
-        return useRecovery;
-    }
-
-    @Override
-    public void setDeadLetterUri(String deadLetterUri) {
-        this.deadLetterChannel = deadLetterUri;
-    }
-
-    @Override
-    public String getDeadLetterUri() {
-        return deadLetterChannel;
-    }
-
-    @Override
-    public void setMaximumRedeliveries(int maximumRedeliveries) {
-        this.maximumRedeliveries = maximumRedeliveries;
-    }
-
-    @Override
-    public int getMaximumRedeliveries() {
-        return maximumRedeliveries;
+        return useRecovery ? unmarshallExchange(camelContext, 
replicatedPersistedCache.get(exchangeId)) : null;
     }
 
     @Override
     public Exchange get(CamelContext camelContext, String key) {
-        return unmarshallExchange(camelContext, cache.get(key));
+        return unmarshallExchange(camelContext, replicatedCache.get(key));
     }
     
     /**
@@ -296,21 +220,13 @@ public class HazelcastAggregationRepository extends 
ServiceSupport
      * @param key Object - key in question
      */
     public boolean containsKey(Object key) {
-        if (cache != null) {
-            return cache.containsKey(key);
+        if (replicatedCache != null) {
+            return replicatedCache.containsKey(key);
         } else {
             return false;
         }
     }
     
-    public boolean isAllowSerializedHeaders() {
-        return allowSerializedHeaders;
-    }
-
-    public void setAllowSerializedHeaders(boolean allowSerializedHeaders) {
-        this.allowSerializedHeaders = allowSerializedHeaders;
-    }
-
     /**
      * This method performs transactional operation on removing the {@code 
exchange}
      * from the operational storage and moving it into the persistent one if 
the {@link HazelcastAggregationRepository}
@@ -324,7 +240,7 @@ public class HazelcastAggregationRepository extends 
ServiceSupport
         DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(exchange, 
true, allowSerializedHeaders);
         if (optimistic) {
             LOG.trace("Removing an exchange with ID {} for key {} in an 
optimistic manner.", exchange.getExchangeId(), key);
-            if (!cache.remove(key, holder)) {
+            if (!replicatedCache.remove(key, holder)) {
                 LOG.error("Optimistic locking failed for exchange with key {}: 
IMap#remove removed no Exchanges, while it's expected to remove one.",
                         key);
                 throw new OptimisticLockingException();
@@ -333,7 +249,7 @@ public class HazelcastAggregationRepository extends 
ServiceSupport
             if (useRecovery) {
                 LOG.trace("Putting an exchange with ID {} for key {} into a 
recoverable storage in an optimistic manner.",
                         exchange.getExchangeId(), key);
-                persistedCache.put(exchange.getExchangeId(), holder);
+                replicatedPersistedCache.put(exchange.getExchangeId(), holder);
                 LOG.trace("Put an exchange with ID {} for key {} into a 
recoverable storage in an optimistic manner.",
                         exchange.getExchangeId(), key);
             }
@@ -351,8 +267,8 @@ public class HazelcastAggregationRepository extends 
ServiceSupport
                 try {
                     tCtx.beginTransaction();
 
-                    TransactionalMap<String, DefaultExchangeHolder> tCache = 
tCtx.getMap(cache.getName());
-                    TransactionalMap<String, DefaultExchangeHolder> 
tPersistentCache = tCtx.getMap(persistedCache.getName());
+                    TransactionalMap<String, DefaultExchangeHolder> tCache = 
tCtx.getMap(mapName);
+                    TransactionalMap<String, DefaultExchangeHolder> 
tPersistentCache = tCtx.getMap(persistenceMapName);
 
                     DefaultExchangeHolder removedHolder = tCache.remove(key);
                     LOG.trace("Putting an exchange with ID {} for key {} into 
a recoverable storage in a thread-safe manner.",
@@ -372,7 +288,7 @@ public class HazelcastAggregationRepository extends 
ServiceSupport
                     throw new RuntimeException(msg, throwable);
                 }
             } else {
-                cache.remove(key);
+                replicatedCache.remove(key);
             }
         }
     }
@@ -381,20 +297,13 @@ public class HazelcastAggregationRepository extends 
ServiceSupport
     public void confirm(CamelContext camelContext, String exchangeId) {
         LOG.trace("Confirming an exchange with ID {}.", exchangeId);
         if (useRecovery) {
-            persistedCache.remove(exchangeId);
+            replicatedPersistedCache.remove(exchangeId);
         }
     }
 
     @Override
     public Set<String> getKeys() {
-        return Collections.unmodifiableSet(cache.keySet());
-    }
-
-    /**
-     * @return Persistent repository {@link IMap} name;
-     */
-    public String getPersistentRepositoryName() {
-        return persistenceMapName;
+        return Collections.unmodifiableSet(replicatedCache.keySet());
     }
 
     @Override
@@ -413,26 +322,10 @@ public class HazelcastAggregationRepository extends 
ServiceSupport
         } else {
             ObjectHelper.notNull(hzInstance, "hzInstanse");
         }
-        cache = hzInstance.getMap(mapName);
+        replicatedCache = hzInstance.getReplicatedMap(mapName);
         if (useRecovery) {
-            persistedCache = hzInstance.getMap(persistenceMapName);
-        }
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        //noop
-        if (useLocalHzInstance) {
-            hzInstance.getLifecycleService().shutdown();
+            replicatedPersistedCache = 
hzInstance.getReplicatedMap(persistenceMapName);
         }
     }
 
-    protected Exchange unmarshallExchange(CamelContext camelContext, 
DefaultExchangeHolder holder) {
-        Exchange exchange = null;
-        if (holder != null) {
-            exchange = new DefaultExchange(camelContext);
-            DefaultExchangeHolder.unmarshal(exchange, holder);
-        }
-        return exchange;
-    }
 }

Reply via email to