This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 6ece3f8 Changes in HazelcastAggregationRepository to use replicated maps (#3841) 6ece3f8 is described below commit 6ece3f804e5f3694d12df3b71ad225f3e34bfe0a Author: Marco Longobardi <mrclng...@gmail.com> AuthorDate: Wed May 20 14:19:55 2020 +0200 Changes in HazelcastAggregationRepository to use replicated maps (#3841) * Create ReplicatedHazelcastAggregationRepository.java Changes to use replicated maps * Update HazelcastAggregationRepository.java Changes to use replicated maps --- .../hazelcast/HazelcastAggregationRepository.java | 22 +-- ... ReplicatedHazelcastAggregationRepository.java} | 205 +++++---------------- 2 files changed, 60 insertions(+), 167 deletions(-) diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java b/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java index 58fde58..a3409a2 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java @@ -57,20 +57,20 @@ public class HazelcastAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository, OptimisticLockingAggregationRepository { private static final Logger LOG = LoggerFactory.getLogger(HazelcastAggregationRepository.class.getName()); - private static final String COMPLETED_SUFFIX = "-completed"; + protected static final String COMPLETED_SUFFIX = "-completed"; - private boolean optimistic; - private boolean useLocalHzInstance; - private boolean useRecovery = true; + protected boolean optimistic; + protected boolean useLocalHzInstance; + protected boolean useRecovery = true; private IMap<String, DefaultExchangeHolder> cache; private IMap<String, DefaultExchangeHolder> persistedCache; - private HazelcastInstance hzInstance; - private String mapName; - private String persistenceMapName; - private String deadLetterChannel; - private long recoveryInterval = 5000; - private int maximumRedeliveries = 3; - private boolean allowSerializedHeaders; + protected HazelcastInstance hzInstance; + protected String mapName; + protected String persistenceMapName; + protected String deadLetterChannel; + protected long recoveryInterval = 5000; + protected int maximumRedeliveries = 3; + protected boolean allowSerializedHeaders; /** * Creates new {@link HazelcastAggregationRepository} that defaults to non-optimistic locking diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java b/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/ReplicatedHazelcastAggregationRepository.java similarity index 65% copy from components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java copy to components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/ReplicatedHazelcastAggregationRepository.java index 58fde58..21cec1a 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/ReplicatedHazelcastAggregationRepository.java @@ -18,6 +18,7 @@ package org.apache.camel.processor.aggregate.hazelcast; import java.util.Collections; import java.util.Set; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @@ -46,132 +47,100 @@ import org.slf4j.LoggerFactory; * {@link RecoverableAggregationRepository} and {@link OptimisticLockingAggregationRepository}. * Defaults to thread-safe (non-optimistic) locking and recoverable strategy. * Hazelcast settings are given to an end-user and can be controlled with repositoryName and persistentRespositoryName, - * both are {@link com.hazelcast.map.IMap} <String, Exchange>. However HazelcastAggregationRepository + * both are {@link com.hazelcast.map.IMap} <String, Exchange>. However ReplicatedHazelcastAggregationRepository * can run it's own Hazelcast instance, but obviously no benefits of Hazelcast clustering are gained this way. - * If the {@link HazelcastAggregationRepository} uses it's own local {@link HazelcastInstance} it will DESTROY this + * If the {@link ReplicatedHazelcastAggregationRepository} uses it's own local {@link HazelcastInstance} it will DESTROY this * instance on {@link #doStop()}. You should control {@link HazelcastInstance} lifecycle yourself whenever you instantiate - * {@link HazelcastAggregationRepository} passing a reference to the instance. + * {@link ReplicatedHazelcastAggregationRepository} passing a reference to the instance. * */ -public class HazelcastAggregationRepository extends ServiceSupport - implements RecoverableAggregationRepository, - OptimisticLockingAggregationRepository { - private static final Logger LOG = LoggerFactory.getLogger(HazelcastAggregationRepository.class.getName()); - private static final String COMPLETED_SUFFIX = "-completed"; - - private boolean optimistic; - private boolean useLocalHzInstance; - private boolean useRecovery = true; - private IMap<String, DefaultExchangeHolder> cache; - private IMap<String, DefaultExchangeHolder> persistedCache; - private HazelcastInstance hzInstance; - private String mapName; - private String persistenceMapName; - private String deadLetterChannel; - private long recoveryInterval = 5000; - private int maximumRedeliveries = 3; - private boolean allowSerializedHeaders; +public class ReplicatedHazelcastAggregationRepository extends HazelcastAggregationRepository { + private static final Logger LOG = LoggerFactory.getLogger(ReplicatedHazelcastAggregationRepository.class.getName()); + protected Map<String, DefaultExchangeHolder> replicatedCache; + protected Map<String, DefaultExchangeHolder> replicatedPersistedCache; /** - * Creates new {@link HazelcastAggregationRepository} that defaults to non-optimistic locking + * Creates new {@link ReplicatedHazelcastAggregationRepository} that defaults to non-optimistic locking * with recoverable behavior and a local Hazelcast instance. Recoverable repository name defaults to * {@code repositoryName} + "-compeleted". * @param repositoryName {@link IMap} repository name; */ - public HazelcastAggregationRepository(final String repositoryName) { - mapName = repositoryName; - persistenceMapName = String.format("%s%s", mapName, COMPLETED_SUFFIX); - optimistic = false; - useLocalHzInstance = true; + public ReplicatedHazelcastAggregationRepository(final String repositoryName) { + super(repositoryName); } /** - * Creates new {@link HazelcastAggregationRepository} that defaults to non-optimistic locking + * Creates new {@link ReplicatedHazelcastAggregationRepository} that defaults to non-optimistic locking * with recoverable behavior and a local Hazelcast instance. * @param repositoryName {@link IMap} repository name; * @param persistentRepositoryName {@link IMap} recoverable repository name; */ - public HazelcastAggregationRepository(final String repositoryName, final String persistentRepositoryName) { - mapName = repositoryName; - persistenceMapName = persistentRepositoryName; - optimistic = false; - useLocalHzInstance = true; + public ReplicatedHazelcastAggregationRepository(final String repositoryName, final String persistentRepositoryName) { + super(repositoryName,persistentRepositoryName); } /** - * Creates new {@link HazelcastAggregationRepository} with recoverable behavior and a local Hazelcast instance. + * Creates new {@link ReplicatedHazelcastAggregationRepository} with recoverable behavior and a local Hazelcast instance. * Recoverable repository name defaults to {@code repositoryName} + "-compeleted". * @param repositoryName {@link IMap} repository name; * @param optimistic whether to use optimistic locking manner. */ - public HazelcastAggregationRepository(final String repositoryName, boolean optimistic) { - this(repositoryName); - this.optimistic = optimistic; - useLocalHzInstance = true; + public ReplicatedHazelcastAggregationRepository(final String repositoryName, boolean optimistic) { + super(repositoryName,optimistic); } /** - * Creates new {@link HazelcastAggregationRepository} with recoverable behavior and a local Hazelcast instance. + * Creates new {@link ReplicatedHazelcastAggregationRepository} with recoverable behavior and a local Hazelcast instance. * @param repositoryName {@link IMap} repository name; * @param persistentRepositoryName {@link IMap} recoverable repository name; * @param optimistic whether to use optimistic locking manner. */ - public HazelcastAggregationRepository(final String repositoryName, final String persistentRepositoryName, boolean optimistic) { - this(repositoryName, persistentRepositoryName); - this.optimistic = optimistic; - useLocalHzInstance = true; + public ReplicatedHazelcastAggregationRepository(final String repositoryName, final String persistentRepositoryName, boolean optimistic) { + super(repositoryName,persistentRepositoryName,optimistic); } /** - * Creates new {@link HazelcastAggregationRepository} that defaults to non-optimistic locking + * Creates new {@link ReplicatedHazelcastAggregationRepository} that defaults to non-optimistic locking * with recoverable behavior. Recoverable repository name defaults to * {@code repositoryName} + "-compeleted". * @param repositoryName {@link IMap} repository name; * @param hzInstanse externally configured {@link HazelcastInstance}. */ - public HazelcastAggregationRepository(final String repositoryName, HazelcastInstance hzInstanse) { - this (repositoryName, false); - this.hzInstance = hzInstanse; - useLocalHzInstance = false; + public ReplicatedHazelcastAggregationRepository(final String repositoryName, HazelcastInstance hzInstanse) { + super(repositoryName,hzInstanse); } /** - * Creates new {@link HazelcastAggregationRepository} that defaults to non-optimistic locking + * Creates new {@link ReplicatedHazelcastAggregationRepository} that defaults to non-optimistic locking * with recoverable behavior. * @param repositoryName {@link IMap} repository name; * @param persistentRepositoryName {@link IMap} recoverable repository name; * @param hzInstanse externally configured {@link HazelcastInstance}. */ - public HazelcastAggregationRepository(final String repositoryName, final String persistentRepositoryName, HazelcastInstance hzInstanse) { - this (repositoryName, persistentRepositoryName, false); - this.hzInstance = hzInstanse; - useLocalHzInstance = false; + public ReplicatedHazelcastAggregationRepository(final String repositoryName, final String persistentRepositoryName, HazelcastInstance hzInstanse) { + super(repositoryName, persistentRepositoryName, hzInstanse); } /** - * Creates new {@link HazelcastAggregationRepository} with recoverable behavior. + * Creates new {@link ReplicatedHazelcastAggregationRepository} with recoverable behavior. * Recoverable repository name defaults to {@code repositoryName} + "-compeleted". * @param repositoryName {@link IMap} repository name; * @param optimistic whether to use optimistic locking manner; * @param hzInstance externally configured {@link HazelcastInstance}. */ - public HazelcastAggregationRepository(final String repositoryName, boolean optimistic, HazelcastInstance hzInstance) { - this(repositoryName, optimistic); - this.hzInstance = hzInstance; - useLocalHzInstance = false; + public ReplicatedHazelcastAggregationRepository(final String repositoryName, boolean optimistic, HazelcastInstance hzInstance) { + super(repositoryName,optimistic,hzInstance); } /** - * Creates new {@link HazelcastAggregationRepository} with recoverable behavior. + * Creates new {@link ReplicatedHazelcastAggregationRepository} with recoverable behavior. * @param repositoryName {@link IMap} repository name; * @param optimistic whether to use optimistic locking manner; * @param persistentRepositoryName {@link IMap} recoverable repository name; * @param hzInstance externally configured {@link HazelcastInstance}. */ - public HazelcastAggregationRepository(final String repositoryName, final String persistentRepositoryName, boolean optimistic, HazelcastInstance hzInstance) { - this(repositoryName, persistentRepositoryName, optimistic); - this.hzInstance = hzInstance; - useLocalHzInstance = false; + public ReplicatedHazelcastAggregationRepository(final String repositoryName, final String persistentRepositoryName, boolean optimistic, HazelcastInstance hzInstance) { + super(repositoryName,persistentRepositoryName,optimistic,hzInstance); } @Override @@ -182,7 +151,7 @@ public class HazelcastAggregationRepository extends ServiceSupport LOG.trace("Adding an Exchange with ID {} for key {} in an optimistic manner.", newExchange.getExchangeId(), key); if (oldExchange == null) { DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(newExchange, true, allowSerializedHeaders); - final DefaultExchangeHolder misbehaviorHolder = cache.putIfAbsent(key, holder); + final DefaultExchangeHolder misbehaviorHolder = replicatedCache.putIfAbsent(key, holder); if (misbehaviorHolder != null) { Exchange misbehaviorEx = unmarshallExchange(camelContext, misbehaviorHolder); LOG.error("Optimistic locking failed for exchange with key {}: IMap#putIfAbsend returned Exchange with ID {}, while it's expected no exchanges to be returned", @@ -192,7 +161,7 @@ public class HazelcastAggregationRepository extends ServiceSupport } else { DefaultExchangeHolder oldHolder = DefaultExchangeHolder.marshal(oldExchange, true, allowSerializedHeaders); DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(newExchange, true, allowSerializedHeaders); - if (!cache.replace(key, oldHolder, newHolder)) { + if (!replicatedCache.replace(key, oldHolder, newHolder)) { LOG.error("Optimistic locking failed for exchange with key {}: IMap#replace returned no Exchanges, while it's expected to replace one", key); throw new OptimisticLockingException(); @@ -212,7 +181,7 @@ public class HazelcastAggregationRepository extends ServiceSupport try { l.lock(); DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders); - DefaultExchangeHolder oldHolder = cache.put(key, newHolder); + DefaultExchangeHolder oldHolder = replicatedCache.put(key, newHolder); return unmarshallExchange(camelContext, oldHolder); } finally { LOG.trace("Added an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key); @@ -224,7 +193,7 @@ public class HazelcastAggregationRepository extends ServiceSupport public Set<String> scan(CamelContext camelContext) { if (useRecovery) { LOG.trace("Scanning for exchanges to recover in {} context", camelContext.getName()); - Set<String> scanned = Collections.unmodifiableSet(persistedCache.keySet()); + Set<String> scanned = Collections.unmodifiableSet(replicatedPersistedCache.keySet()); LOG.trace("Found {} keys for exchanges to recover in {} context", scanned.size(), camelContext.getName()); return scanned; } else { @@ -237,57 +206,12 @@ public class HazelcastAggregationRepository extends ServiceSupport @Override public Exchange recover(CamelContext camelContext, String exchangeId) { LOG.trace("Recovering an Exchange with ID {}.", exchangeId); - return useRecovery ? unmarshallExchange(camelContext, persistedCache.get(exchangeId)) : null; - } - - @Override - public void setRecoveryInterval(long interval, TimeUnit timeUnit) { - this.recoveryInterval = timeUnit.toMillis(interval); - } - - @Override - public void setRecoveryInterval(long interval) { - this.recoveryInterval = interval; - } - - @Override - public long getRecoveryIntervalInMillis() { - return recoveryInterval; - } - - @Override - public void setUseRecovery(boolean useRecovery) { - this.useRecovery = useRecovery; - } - - @Override - public boolean isUseRecovery() { - return useRecovery; - } - - @Override - public void setDeadLetterUri(String deadLetterUri) { - this.deadLetterChannel = deadLetterUri; - } - - @Override - public String getDeadLetterUri() { - return deadLetterChannel; - } - - @Override - public void setMaximumRedeliveries(int maximumRedeliveries) { - this.maximumRedeliveries = maximumRedeliveries; - } - - @Override - public int getMaximumRedeliveries() { - return maximumRedeliveries; + return useRecovery ? unmarshallExchange(camelContext, replicatedPersistedCache.get(exchangeId)) : null; } @Override public Exchange get(CamelContext camelContext, String key) { - return unmarshallExchange(camelContext, cache.get(key)); + return unmarshallExchange(camelContext, replicatedCache.get(key)); } /** @@ -296,21 +220,13 @@ public class HazelcastAggregationRepository extends ServiceSupport * @param key Object - key in question */ public boolean containsKey(Object key) { - if (cache != null) { - return cache.containsKey(key); + if (replicatedCache != null) { + return replicatedCache.containsKey(key); } else { return false; } } - public boolean isAllowSerializedHeaders() { - return allowSerializedHeaders; - } - - public void setAllowSerializedHeaders(boolean allowSerializedHeaders) { - this.allowSerializedHeaders = allowSerializedHeaders; - } - /** * This method performs transactional operation on removing the {@code exchange} * from the operational storage and moving it into the persistent one if the {@link HazelcastAggregationRepository} @@ -324,7 +240,7 @@ public class HazelcastAggregationRepository extends ServiceSupport DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders); if (optimistic) { LOG.trace("Removing an exchange with ID {} for key {} in an optimistic manner.", exchange.getExchangeId(), key); - if (!cache.remove(key, holder)) { + if (!replicatedCache.remove(key, holder)) { LOG.error("Optimistic locking failed for exchange with key {}: IMap#remove removed no Exchanges, while it's expected to remove one.", key); throw new OptimisticLockingException(); @@ -333,7 +249,7 @@ public class HazelcastAggregationRepository extends ServiceSupport if (useRecovery) { LOG.trace("Putting an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.", exchange.getExchangeId(), key); - persistedCache.put(exchange.getExchangeId(), holder); + replicatedPersistedCache.put(exchange.getExchangeId(), holder); LOG.trace("Put an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.", exchange.getExchangeId(), key); } @@ -351,8 +267,8 @@ public class HazelcastAggregationRepository extends ServiceSupport try { tCtx.beginTransaction(); - TransactionalMap<String, DefaultExchangeHolder> tCache = tCtx.getMap(cache.getName()); - TransactionalMap<String, DefaultExchangeHolder> tPersistentCache = tCtx.getMap(persistedCache.getName()); + TransactionalMap<String, DefaultExchangeHolder> tCache = tCtx.getMap(mapName); + TransactionalMap<String, DefaultExchangeHolder> tPersistentCache = tCtx.getMap(persistenceMapName); DefaultExchangeHolder removedHolder = tCache.remove(key); LOG.trace("Putting an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.", @@ -372,7 +288,7 @@ public class HazelcastAggregationRepository extends ServiceSupport throw new RuntimeException(msg, throwable); } } else { - cache.remove(key); + replicatedCache.remove(key); } } } @@ -381,20 +297,13 @@ public class HazelcastAggregationRepository extends ServiceSupport public void confirm(CamelContext camelContext, String exchangeId) { LOG.trace("Confirming an exchange with ID {}.", exchangeId); if (useRecovery) { - persistedCache.remove(exchangeId); + replicatedPersistedCache.remove(exchangeId); } } @Override public Set<String> getKeys() { - return Collections.unmodifiableSet(cache.keySet()); - } - - /** - * @return Persistent repository {@link IMap} name; - */ - public String getPersistentRepositoryName() { - return persistenceMapName; + return Collections.unmodifiableSet(replicatedCache.keySet()); } @Override @@ -413,26 +322,10 @@ public class HazelcastAggregationRepository extends ServiceSupport } else { ObjectHelper.notNull(hzInstance, "hzInstanse"); } - cache = hzInstance.getMap(mapName); + replicatedCache = hzInstance.getReplicatedMap(mapName); if (useRecovery) { - persistedCache = hzInstance.getMap(persistenceMapName); - } - } - - @Override - protected void doStop() throws Exception { - //noop - if (useLocalHzInstance) { - hzInstance.getLifecycleService().shutdown(); + replicatedPersistedCache = hzInstance.getReplicatedMap(persistenceMapName); } } - protected Exchange unmarshallExchange(CamelContext camelContext, DefaultExchangeHolder holder) { - Exchange exchange = null; - if (holder != null) { - exchange = new DefaultExchange(camelContext); - DefaultExchangeHolder.unmarshal(exchange, holder); - } - return exchange; - } }