gemmellr commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499051782


##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/AbstractHashMapPersister.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+public abstract class AbstractHashMapPersister<K, V> implements 
Persister<JournalHashMap.MapRecord<K, V>> {
+
+   @Override
+   public byte getID() {
+      return 0;
+   }
+
+   @Override
+   public final int getEncodeSize(JournalHashMap.MapRecord<K, V> record) {
+      return DataConstants.SIZE_BYTE + // FILLER, could be used for versioning 
in the future
+             DataConstants.SIZE_LONG + // recordID
+             DataConstants.SIZE_LONG + // collectionID
+             getKeySize(record.key) +
+             getValueSize(record.value);
+   }
+
+   protected abstract int getKeySize(K key);
+
+   protected abstract void encodeKey(ActiveMQBuffer buffer, K key);
+
+   protected abstract K decodeKey(ActiveMQBuffer buffer);
+
+   protected abstract int getValueSize(V value);
+
+   protected abstract void encodeValue(ActiveMQBuffer buffer, V value);
+
+   protected abstract V decodeValue(ActiveMQBuffer buffer, K key);
+
+   @Override
+   public final void encode(ActiveMQBuffer buffer, JournalHashMap.MapRecord<K, 
V> record) {
+      buffer.writeByte((byte)0); // filler - could be used for versioning in 
the future.
+      buffer.writeLong(record.id);
+      buffer.writeLong(record.collectionID);
+      encodeKey(buffer, record.key);
+      encodeValue(buffer, record.value);
+   }
+
+   @Override
+   public final JournalHashMap.MapRecord<K, V> decode(ActiveMQBuffer buffer,
+                                                JournalHashMap.MapRecord<K, V> 
record,
+                                                CoreMessageObjectPools pool) {
+      buffer.readByte(); // filler - not used currently - just in case we ever 
need to version this wiring
+      long id = buffer.readLong();
+      long collectionID = buffer.readLong();
+      K key = decodeKey(buffer);
+      V value = decodeValue(buffer, key);
+
+      JournalHashMap.MapRecord<K, V> mapRecord = new 
JournalHashMap.MapRecord<>(collectionID, id, key, value);

Review Comment:
   
   
   Is there a reason the persister favours the 'id' as the first arg, but the 
object representation favours the 'collectionId' as the first?
   
   It would be nice if they were consistent, unless there is a reason they
   shouldn't be.
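
To illustrate why the ordering matters (the call in the PR itself passes the arguments correctly): both values are `long`, so the compiler cannot catch a swap. The sketch below is only an illustration; `ArgumentOrderSketch` and `Rec` are hypothetical stand-ins for the persister and `MapRecord`, not code from the PR.

```java
final class ArgumentOrderSketch {

   // Hypothetical stand-in for MapRecord: the constructor takes collectionID first,
   // while the wire format in the persister writes id first.
   static final class Rec {
      final long collectionID;
      final long id;

      Rec(long collectionID, long id) {
         this.collectionID = collectionID;
         this.id = id;
      }
   }

   // Passing the values in wire order (id first) compiles without any warning,
   // but the two fields end up swapped.
   static Rec builtInWireOrder(long id, long collectionID) {
      return new Rec(id, collectionID);
   }

   // Passing them in constructor order gives the intended record.
   static Rec builtInConstructorOrder(long id, long collectionID) {
      return new Rec(collectionID, id);
   }

   public static void main(String[] args) {
      Rec swapped = builtInWireOrder(7L, 42L);
      Rec correct = builtInConstructorOrder(7L, 42L);
      System.out.println("swapped id=" + swapped.id + ", correct id=" + correct.id);
   }
}
```

If the constructor and the wire format used the same order, the first variant would simply be correct.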



##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/AbstractHashMapPersister.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+public abstract class AbstractHashMapPersister<K, V> implements 
Persister<JournalHashMap.MapRecord<K, V>> {
+
+   @Override
+   public byte getID() {
+      return 0;
+   }
+
+   @Override
+   public final int getEncodeSize(JournalHashMap.MapRecord<K, V> record) {
+      return DataConstants.SIZE_BYTE + // FILLER, could be used for versioning 
in the future
+             DataConstants.SIZE_LONG + // recordID
+             DataConstants.SIZE_LONG + // collectionID
+             getKeySize(record.key) +
+             getValueSize(record.value);
+   }
+
+   protected abstract int getKeySize(K key);
+
+   protected abstract void encodeKey(ActiveMQBuffer buffer, K key);
+
+   protected abstract K decodeKey(ActiveMQBuffer buffer);
+
+   protected abstract int getValueSize(V value);
+
+   protected abstract void encodeValue(ActiveMQBuffer buffer, V value);
+
+   protected abstract V decodeValue(ActiveMQBuffer buffer, K key);
+
+   @Override
+   public final void encode(ActiveMQBuffer buffer, JournalHashMap.MapRecord<K, 
V> record) {
+      buffer.writeByte((byte)0); // filler - could be used for versioning in 
the future.
+      buffer.writeLong(record.id);
+      buffer.writeLong(record.collectionID);
+      encodeKey(buffer, record.key);
+      encodeValue(buffer, record.value);
+   }
+
+   @Override
+   public final JournalHashMap.MapRecord<K, V> decode(ActiveMQBuffer buffer,
+                                                JournalHashMap.MapRecord<K, V> 
record,
+                                                CoreMessageObjectPools pool) {
+      buffer.readByte(); // filler - not used currently - just in case we ever 
need to version this wiring

Review Comment:
   Could we just say it's the version now, and make the code and comments
   (which could perhaps then be deleted) clearer for later?
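
Something along these lines is what I have in mind; a minimal sketch only. It uses `java.nio.ByteBuffer` as a stand-in for `ActiveMQBuffer`, and the `VERSION` constant, the class name and the choice to reject unknown versions are all assumptions for illustration, not part of the PR.

```java
import java.nio.ByteBuffer;

final class VersionedRecordSketch {

   // Hypothetical constant: the byte the PR currently writes as a filler (0),
   // named as the version so the intent is clear without a comment.
   static final byte VERSION = 0;

   static final int ENCODED_SIZE = Byte.BYTES + Long.BYTES + Long.BYTES;

   static void encode(ByteBuffer buffer, long id, long collectionID) {
      buffer.put(VERSION);
      buffer.putLong(id);
      buffer.putLong(collectionID);
   }

   // Returns {id, collectionID}. Rejecting unknown versions is one possible policy;
   // a real reader might instead branch per version.
   static long[] decode(ByteBuffer buffer) {
      byte version = buffer.get();
      if (version != VERSION) {
         throw new IllegalStateException("Unsupported record version: " + version);
      }
      long id = buffer.getLong();
      long collectionID = buffer.getLong();
      return new long[] {id, collectionID};
   }

   public static void main(String[] args) {
      ByteBuffer buffer = ByteBuffer.allocate(ENCODED_SIZE);
      encode(buffer, 7L, 42L);
      buffer.flip();
      long[] decoded = decode(buffer);
      System.out.println("id=" + decoded[0] + ", collectionID=" + decoded[1]);
   }
}
```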



##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public static class MapRecord<K, V> implements Entry<K, V> {
+      final long collectionID;
+      long id;
+      K key;
+      V value;
+
+      MapRecord(long collectionID, long id, K key, V value) {
+         this.collectionID = collectionID;
+         this.id = id;
+         this.key = key;
+         this.value = value;
+      }
+
+      @Override
+      public K getKey() {
+         return key;
+      }
+
+      @Override
+      public V getValue() {
+         return value;
+      }
+
+      @Override
+      public V setValue(V value) {
+         V oldValue = this.value;
+         this.value = value;
+         return oldValue;
+      }
+
+      @Override
+      public String toString() {
+         return "MapRecord{" + "collectionID=" + collectionID + ", id=" + id + 
", key=" + key + ", value=" + value + '}';
+      }
+   }
+
+   public JournalHashMap(long collectionId, Journal journal, LongSupplier 
idGenerator, Persister<MapRecord<K, V>> persister, byte recordType, 
Supplier<IOCompletion> completionSupplier, LongFunction<C> contextProvider, 
IOCriticalErrorListener ioExceptionListener) {
+      this.collectionId = collectionId;
+      this.journal = journal;
+      this.idGenerator = idGenerator;
+      this.persister = persister;
+      this.recordType = recordType;
+      this.exceptionListener = ioExceptionListener;
+      this.completionSupplier = completionSupplier;
+      this.contextProvider = contextProvider;
+   }
+
+   C context;
+
+   LongFunction<C> contextProvider;
+
+   private final Persister<MapRecord<K, V>> persister;
+
+   private final Journal journal;
+
+   private final long collectionId;
+
+   private final byte recordType;
+
+   private final LongSupplier idGenerator;
+
+   private final Supplier<IOCompletion> completionSupplier;
+
+   private final IOCriticalErrorListener exceptionListener;
+
+   private final Map<K, MapRecord<K, V>> map = new HashMap<>();
+
+   public long getCollectionId() {
+      return collectionId;
+   }
+
+   @Override
+   public synchronized int size() {
+      return map.size();
+   }
+
+   public C getContext() {
+      if (context == null && contextProvider != null) {
+         context = contextProvider.apply(this.collectionId);
+      }
+      return context;
+   }
+
+   public JournalHashMap<K, V, C> setContext(C context) {
+      this.context = context;
+      return this;
+   }
+
+   @Override
+   public synchronized boolean isEmpty() {
+      return map.isEmpty();
+   }
+
+   @Override
+   public synchronized boolean containsKey(Object key) {
+      return map.containsKey(key);
+   }
+
+   @Override
+   public synchronized boolean containsValue(Object value) {
+      for (Entry<K, MapRecord<K, V>> entry : map.entrySet()) {
+         if (value.equals(entry.getValue().value)) {
+            return true;
+         }
+      }
+      return false;
+   }
+
+   @Override
+   public synchronized V get(Object key) {
+      MapRecord<K, V> reccord = map.get(key);
+      if (reccord == null) {
+         return null;
+      } else {
+         return reccord.value;
+      }
+   }
+
+   /** This is to be called from a single thread during reload, no need to be 
synchronized */
+   public void reload(MapRecord<K, V> reloadValue) {
+      map.put(reloadValue.getKey(), reloadValue);
+   }
+
+   @Override
+   public synchronized V put(K key, V value) {
+      logger.debug("adding {} = {}", key, value);
+      long id = idGenerator.getAsLong();
+      MapRecord<K, V> record = new MapRecord(collectionId, id, key, value);
+      store(key, record);
+      MapRecord<K, V> oldRecord = map.put(key, record);
+
+      if (oldRecord != null) {
+         removed(oldRecord);
+         return oldRecord.value;
+      } else {
+         return null;
+      }
+
+   }
+
+   private synchronized void store(K key, MapRecord<K, V> record) {
+      try {
+         IOCompletion callback = null;
+         if (completionSupplier != null) {
+            callback = completionSupplier.get();
+         }
+
+         if (callback == null) {
+            journal.appendAddRecord(record.id, recordType, persister, record, 
false);
+         } else {
+            journal.appendAddRecord(record.id, recordType, persister, record, 
true, callback);
+         }
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+         exceptionListener.onIOException(e, e.getMessage(), null);
+      }
+   }
+
+   // callers must be synchronized
+   private void removed(MapRecord reccord) {

Review Comment:
   Typo: 'reccord' should be 'record'. Also, add the type info (MapRecord<K, V>
   rather than the raw MapRecord).



##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public static class MapRecord<K, V> implements Entry<K, V> {
+      final long collectionID;
+      long id;
+      K key;
+      V value;
+
+      MapRecord(long collectionID, long id, K key, V value) {
+         this.collectionID = collectionID;
+         this.id = id;
+         this.key = key;
+         this.value = value;
+      }
+
+      @Override
+      public K getKey() {
+         return key;
+      }
+
+      @Override
+      public V getValue() {
+         return value;
+      }
+
+      @Override
+      public V setValue(V value) {
+         V oldValue = this.value;
+         this.value = value;
+         return oldValue;
+      }
+
+      @Override
+      public String toString() {
+         return "MapRecord{" + "collectionID=" + collectionID + ", id=" + id + 
", key=" + key + ", value=" + value + '}';
+      }
+   }
+
+   public JournalHashMap(long collectionId, Journal journal, LongSupplier 
idGenerator, Persister<MapRecord<K, V>> persister, byte recordType, 
Supplier<IOCompletion> completionSupplier, LongFunction<C> contextProvider, 
IOCriticalErrorListener ioExceptionListener) {
+      this.collectionId = collectionId;
+      this.journal = journal;
+      this.idGenerator = idGenerator;
+      this.persister = persister;
+      this.recordType = recordType;
+      this.exceptionListener = ioExceptionListener;
+      this.completionSupplier = completionSupplier;
+      this.contextProvider = contextProvider;
+   }
+
+   C context;
+
+   LongFunction<C> contextProvider;
+
+   private final Persister<MapRecord<K, V>> persister;
+
+   private final Journal journal;
+
+   private final long collectionId;
+
+   private final byte recordType;
+
+   private final LongSupplier idGenerator;
+
+   private final Supplier<IOCompletion> completionSupplier;
+
+   private final IOCriticalErrorListener exceptionListener;
+
+   private final Map<K, MapRecord<K, V>> map = new HashMap<>();
+
+   public long getCollectionId() {
+      return collectionId;
+   }
+
+   @Override
+   public synchronized int size() {
+      return map.size();
+   }
+
+   public C getContext() {
+      if (context == null && contextProvider != null) {
+         context = contextProvider.apply(this.collectionId);
+      }
+      return context;
+   }
+
+   public JournalHashMap<K, V, C> setContext(C context) {
+      this.context = context;
+      return this;
+   }
+
+   @Override
+   public synchronized boolean isEmpty() {
+      return map.isEmpty();
+   }
+
+   @Override
+   public synchronized boolean containsKey(Object key) {
+      return map.containsKey(key);
+   }
+
+   @Override
+   public synchronized boolean containsValue(Object value) {
+      for (Entry<K, MapRecord<K, V>> entry : map.entrySet()) {
+         if (value.equals(entry.getValue().value)) {
+            return true;
+         }
+      }
+      return false;
+   }
+
+   @Override
+   public synchronized V get(Object key) {
+      MapRecord<K, V> reccord = map.get(key);
+      if (reccord == null) {
+         return null;
+      } else {
+         return reccord.value;
+      }
+   }
+
+   /** This is to be called from a single thread during reload, no need to be 
synchronized */
+   public void reload(MapRecord<K, V> reloadValue) {
+      map.put(reloadValue.getKey(), reloadValue);
+   }
+
+   @Override
+   public synchronized V put(K key, V value) {
+      logger.debug("adding {} = {}", key, value);
+      long id = idGenerator.getAsLong();
+      MapRecord<K, V> record = new MapRecord(collectionId, id, key, value);
+      store(key, record);
+      MapRecord<K, V> oldRecord = map.put(key, record);
+
+      if (oldRecord != null) {
+         removed(oldRecord);
+         return oldRecord.value;
+      } else {
+         return null;
+      }
+
+   }
+
+   private synchronized void store(K key, MapRecord<K, V> record) {
+      try {
+         IOCompletion callback = null;
+         if (completionSupplier != null) {
+            callback = completionSupplier.get();
+         }
+
+         if (callback == null) {
+            journal.appendAddRecord(record.id, recordType, persister, record, 
false);
+         } else {
+            journal.appendAddRecord(record.id, recordType, persister, record, 
true, callback);
+         }
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+         exceptionListener.onIOException(e, e.getMessage(), null);
+      }
+   }
+
+   // callers must be synchronized
+   private void removed(MapRecord reccord) {
+      try {
+         journal.appendDeleteRecord(reccord.id, false);
+      } catch (Exception e) {
+         exceptionListener.onIOException(e, e.getMessage(), null);
+      }
+   }
+
+   // callers must be synchronized
+   private void removed(MapRecord reccord, long txid) {
+      try {
+         journal.appendDeleteRecordTransactional(txid, reccord.id);
+      } catch (Exception e) {
+         exceptionListener.onIOException(e, e.getMessage(), null);
+      }
+   }
+
+   @Override
+   public synchronized V remove(Object key) {
+      MapRecord<K, V> record = map.remove(key);
+      this.removed(record);
+      return record.value;
+   }
+
+   /** This method will remove the element from the HashMap immediately 
however the record is still part of a transaction.
+    *  This is not playing with rollbacks. So a rollback on the transaction 
wouldn't place the elements back.
+    *  This is intended to make sure the operation would be atomic in case of 
a failure, while an appendRollback is not expected. */
+   public synchronized V remove(Object key, long transactionID) {
+      MapRecord<K, V> record = map.remove(key);
+      this.removed(record, transactionID);
+      return record.value;
+   }
+
+   @Override
+   public synchronized void putAll(Map<? extends K, ? extends V> m) {
+      m.forEach(this::put);
+   }
+
+   @Override
+   public synchronized void clear() {
+      map.values().forEach(v -> remove(v));
+      map.clear();
+   }
+
+   @Override
+   public synchronized Set<K> keySet() {
+      HashSet<K> keys = new HashSet(map.size());
+      map.values().forEach(v -> keys.add(v.key));
+      return keys;
+   }

Review Comment:
   Pretty sure this violates the Map contract for keySet(), which says the
   returned Set is backed by the map, so changes to one are reflected in the other.
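
For reference, a small sketch of the difference using plain `java.util.HashMap` (the class and method names here are made up for illustration): removing a key through the view returned by `keySet()` removes the mapping, whereas removing it from a detached copy, which is what the method above builds, leaves the map untouched.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class KeySetViewSketch {

   private static Map<String, Integer> newMap() {
      Map<String, Integer> map = new HashMap<>();
      map.put("a", 1);
      map.put("b", 2);
      return map;
   }

   // The Set returned by HashMap.keySet() is a view: removing from it removes the mapping.
   static int sizeAfterRemovingViaView() {
      Map<String, Integer> map = newMap();
      map.keySet().remove("a");
      return map.size();
   }

   // A copied HashSet is detached: removing from it does not touch the map.
   static int sizeAfterRemovingViaCopy() {
      Map<String, Integer> map = newMap();
      Set<String> copy = new HashSet<>(map.keySet());
      copy.remove("a");
      return map.size();
   }

   public static void main(String[] args) {
      System.out.println("via view: " + sizeAfterRemovingViaView());
      System.out.println("via copy: " + sizeAfterRemovingViaCopy());
   }
}
```

Callers relying on the usual `Map` behaviour would see the second result from the current implementation.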



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java:
##########
@@ -43,6 +43,8 @@ public class AmqpSupport {
    public static final int AMQP_CREDITS_DEFAULT = 1000;
    public static final int AMQP_LOW_CREDITS_DEFAULT = 300;
 
+   public static final int AMQP_MIRROR_ACK_RETRY_INTERVAL = 10_000;
+

Review Comment:
   Is this used any more?



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java:
##########
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.LongSupplier;
+
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.collections.JournalHashMap;
+import 
org.apache.activemq.artemis.core.journal.collections.JournalHashMapProvider;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.codec.AckRetry;
+import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.mirror.MirrorController;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AckManager implements ActiveMQComponent {
+
+   // we first retry on the queue a few tiems
+   public static final short MAX_QUEUE_ATTEMPT = 
Short.parseShort(System.getProperty(AckRetry.class.getName() + 
".MAX_QUEUE_ATTEMPT", "5"));

Review Comment:
   tiems -> times
   
   I'd probably change/remove 'a few times' given it's actually 5
   
   Do we retry on the queue after trying paging?



##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public static class MapRecord<K, V> implements Entry<K, V> {
+      final long collectionID;
+      long id;
+      K key;
+      V value;

Review Comment:
   Why is only the first one final, when apparently only the last one can be 
changed?
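
Roughly what I would expect instead, as a sketch: it assumes `id` and `key` are never reassigned after construction, which is how it looks from the code in this diff but is worth double checking. `FinalFieldsSketch` and `Rec` are hypothetical names, not the PR's classes.

```java
import java.util.Map;

final class FinalFieldsSketch {

   // Hypothetical stand-in for MapRecord: everything fixed at construction is final,
   // only the value, which setValue() replaces, stays mutable.
   static final class Rec<K, V> implements Map.Entry<K, V> {
      final long collectionID;
      final long id;
      final K key;
      V value;

      Rec(long collectionID, long id, K key, V value) {
         this.collectionID = collectionID;
         this.id = id;
         this.key = key;
         this.value = value;
      }

      @Override
      public K getKey() {
         return key;
      }

      @Override
      public V getValue() {
         return value;
      }

      @Override
      public V setValue(V newValue) {
         V oldValue = this.value;
         this.value = newValue;
         return oldValue;
      }
   }

   public static void main(String[] args) {
      Rec<String, String> rec = new Rec<>(42L, 7L, "k", "v1");
      String old = rec.setValue("v2");
      System.out.println(old + " -> " + rec.getValue());
   }
}
```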



##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public static class MapRecord<K, V> implements Entry<K, V> {
+      final long collectionID;
+      long id;
+      K key;
+      V value;
+
+      MapRecord(long collectionID, long id, K key, V value) {
+         this.collectionID = collectionID;
+         this.id = id;
+         this.key = key;
+         this.value = value;
+      }
+
+      @Override
+      public K getKey() {
+         return key;
+      }
+
+      @Override
+      public V getValue() {
+         return value;
+      }
+
+      @Override
+      public V setValue(V value) {
+         V oldValue = this.value;
+         this.value = value;
+         return oldValue;
+      }
+
+      @Override
+      public String toString() {
+         return "MapRecord{" + "collectionID=" + collectionID + ", id=" + id + 
", key=" + key + ", value=" + value + '}';
+      }
+   }
+
+   public JournalHashMap(long collectionId, Journal journal, LongSupplier 
idGenerator, Persister<MapRecord<K, V>> persister, byte recordType, 
Supplier<IOCompletion> completionSupplier, LongFunction<C> contextProvider, 
IOCriticalErrorListener ioExceptionListener) {
+      this.collectionId = collectionId;
+      this.journal = journal;
+      this.idGenerator = idGenerator;
+      this.persister = persister;
+      this.recordType = recordType;
+      this.exceptionListener = ioExceptionListener;
+      this.completionSupplier = completionSupplier;
+      this.contextProvider = contextProvider;
+   }
+
+   C context;
+
+   LongFunction<C> contextProvider;
+
+   private final Persister<MapRecord<K, V>> persister;
+
+   private final Journal journal;
+
+   private final long collectionId;
+
+   private final byte recordType;
+
+   private final LongSupplier idGenerator;
+
+   private final Supplier<IOCompletion> completionSupplier;
+
+   private final IOCriticalErrorListener exceptionListener;
+
+   private final Map<K, MapRecord<K, V>> map = new HashMap<>();
+
+   public long getCollectionId() {
+      return collectionId;
+   }
+
+   @Override
+   public synchronized int size() {
+      return map.size();
+   }
+
+   public C getContext() {
+      if (context == null && contextProvider != null) {
+         context = contextProvider.apply(this.collectionId);
+      }
+      return context;
+   }
+
+   public JournalHashMap<K, V, C> setContext(C context) {
+      this.context = context;
+      return this;
+   }
+
+   @Override
+   public synchronized boolean isEmpty() {
+      return map.isEmpty();
+   }
+
+   @Override
+   public synchronized boolean containsKey(Object key) {
+      return map.containsKey(key);
+   }
+
+   @Override
+   public synchronized boolean containsValue(Object value) {
+      for (Entry<K, MapRecord<K, V>> entry : map.entrySet()) {
+         if (value.equals(entry.getValue().value)) {
+            return true;
+         }
+      }
+      return false;
+   }
+
+   @Override
+   public synchronized V get(Object key) {
+      MapRecord<K, V> reccord = map.get(key);
+      if (reccord == null) {
+         return null;
+      } else {
+         return reccord.value;
+      }
+   }
+
+   /** This is to be called from a single thread during reload, no need to be 
synchronized */
+   public void reload(MapRecord<K, V> reloadValue) {
+      map.put(reloadValue.getKey(), reloadValue);
+   }
+
+   @Override
+   public synchronized V put(K key, V value) {
+      logger.debug("adding {} = {}", key, value);
+      long id = idGenerator.getAsLong();
+      MapRecord<K, V> record = new MapRecord(collectionId, id, key, value);
+      store(key, record);
+      MapRecord<K, V> oldRecord = map.put(key, record);
+
+      if (oldRecord != null) {
+         removed(oldRecord);
+         return oldRecord.value;
+      } else {
+         return null;
+      }
+
+   }
+
+   private synchronized void store(K key, MapRecord<K, V> record) {
+      try {
+         IOCompletion callback = null;
+         if (completionSupplier != null) {
+            callback = completionSupplier.get();
+         }
+
+         if (callback == null) {
+            journal.appendAddRecord(record.id, recordType, persister, record, 
false);
+         } else {
+            journal.appendAddRecord(record.id, recordType, persister, record, 
true, callback);
+         }
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+         exceptionListener.onIOException(e, e.getMessage(), null);
+      }
+   }
+
+   // callers must be synchronized
+   private void removed(MapRecord reccord) {
+      try {
+         journal.appendDeleteRecord(reccord.id, false);
+      } catch (Exception e) {
+         exceptionListener.onIOException(e, e.getMessage(), null);
+      }
+   }
+
+   // callers must be synchronized
+   private void removed(MapRecord reccord, long txid) {

Review Comment:
   Same



##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public static class MapRecord<K, V> implements Entry<K, V> {
+      final long collectionID;
+      long id;
+      K key;
+      V value;
+
+      MapRecord(long collectionID, long id, K key, V value) {
+         this.collectionID = collectionID;
+         this.id = id;
+         this.key = key;
+         this.value = value;
+      }
+
+      @Override
+      public K getKey() {
+         return key;
+      }
+
+      @Override
+      public V getValue() {
+         return value;
+      }
+
+      @Override
+      public V setValue(V value) {
+         V oldValue = this.value;
+         this.value = value;
+         return oldValue;
+      }
+
+      @Override
+      public String toString() {
+         return "MapRecord{" + "collectionID=" + collectionID + ", id=" + id + 
", key=" + key + ", value=" + value + '}';
+      }
+   }
+
+   public JournalHashMap(long collectionId, Journal journal, LongSupplier 
idGenerator, Persister<MapRecord<K, V>> persister, byte recordType, 
Supplier<IOCompletion> completionSupplier, LongFunction<C> contextProvider, 
IOCriticalErrorListener ioExceptionListener) {
+      this.collectionId = collectionId;
+      this.journal = journal;
+      this.idGenerator = idGenerator;
+      this.persister = persister;
+      this.recordType = recordType;
+      this.exceptionListener = ioExceptionListener;
+      this.completionSupplier = completionSupplier;
+      this.contextProvider = contextProvider;
+   }
+
+   C context;
+
+   LongFunction<C> contextProvider;
+
+   private final Persister<MapRecord<K, V>> persister;
+
+   private final Journal journal;
+
+   private final long collectionId;
+
+   private final byte recordType;
+
+   private final LongSupplier idGenerator;
+
+   private final Supplier<IOCompletion> completionSupplier;
+
+   private final IOCriticalErrorListener exceptionListener;
+
+   private final Map<K, MapRecord<K, V>> map = new HashMap<>();
+
+   public long getCollectionId() {
+      return collectionId;
+   }
+
+   @Override
+   public synchronized int size() {
+      return map.size();
+   }
+
+   public C getContext() {
+      if (context == null && contextProvider != null) {
+         context = contextProvider.apply(this.collectionId);
+      }
+      return context;
+   }
+
+   public JournalHashMap<K, V, C> setContext(C context) {
+      this.context = context;
+      return this;
+   }
+
+   @Override
+   public synchronized boolean isEmpty() {
+      return map.isEmpty();
+   }
+
+   @Override
+   public synchronized boolean containsKey(Object key) {
+      return map.containsKey(key);
+   }
+
+   @Override
+   public synchronized boolean containsValue(Object value) {
+      for (Entry<K, MapRecord<K, V>> entry : map.entrySet()) {
+         if (value.equals(entry.getValue().value)) {
+            return true;
+         }
+      }
+      return false;
+   }
+
+   @Override
+   public synchronized V get(Object key) {
+      MapRecord<K, V> reccord = map.get(key);
+      if (reccord == null) {
+         return null;
+      } else {
+         return reccord.value;
+      }
+   }
+
+   /** This is to be called from a single thread during reload, no need to be 
synchronized */
+   public void reload(MapRecord<K, V> reloadValue) {
+      map.put(reloadValue.getKey(), reloadValue);
+   }
+
+   @Override
+   public synchronized V put(K key, V value) {
+      logger.debug("adding {} = {}", key, value);
+      long id = idGenerator.getAsLong();
+      MapRecord<K, V> record = new MapRecord(collectionId, id, key, value);
+      store(key, record);
+      MapRecord<K, V> oldRecord = map.put(key, record);
+
+      if (oldRecord != null) {
+         removed(oldRecord);
+         return oldRecord.value;
+      } else {
+         return null;
+      }
+
+   }
+
+   private synchronized void store(K key, MapRecord<K, V> record) {
+      try {
+         IOCompletion callback = null;
+         if (completionSupplier != null) {
+            callback = completionSupplier.get();
+         }
+
+         if (callback == null) {
+            journal.appendAddRecord(record.id, recordType, persister, record, 
false);
+         } else {
+            journal.appendAddRecord(record.id, recordType, persister, record, 
true, callback);
+         }
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+         exceptionListener.onIOException(e, e.getMessage(), null);
+      }
+   }
+
+   // callers must be synchronized
+   private void removed(MapRecord reccord) {
+      try {
+         journal.appendDeleteRecord(reccord.id, false);
+      } catch (Exception e) {
+         exceptionListener.onIOException(e, e.getMessage(), null);
+      }
+   }
+
+   // callers must be synchronized
+   private void removed(MapRecord reccord, long txid) {
+      try {
+         journal.appendDeleteRecordTransactional(txid, reccord.id);
+      } catch (Exception e) {
+         exceptionListener.onIOException(e, e.getMessage(), null);
+      }
+   }
+
+   @Override
+   public synchronized V remove(Object key) {
+      MapRecord<K, V> record = map.remove(key);
+      this.removed(record);
+      return record.value;
+   }
+
+   /** This method will remove the element from the HashMap immediately 
however the record is still part of a transaction.
+    *  This is not playing with rollbacks. So a rollback on the transaction 
wouldn't place the elements back.
+    *  This is intended to make sure the operation would be atomic in case of 
a failure, while an appendRollback is not expected. */
+   public synchronized V remove(Object key, long transactionID) {
+      MapRecord<K, V> record = map.remove(key);
+      this.removed(record, transactionID);
+      return record.value;
+   }
+
+   @Override
+   public synchronized void putAll(Map<? extends K, ? extends V> m) {
+      m.forEach(this::put);
+   }
+
+   @Override
+   public synchronized void clear() {
+      map.values().forEach(v -> remove(v));
+      map.clear();
+   }
+
+   @Override
+   public synchronized Set<K> keySet() {
+      HashSet<K> keys = new HashSet(map.size());
+      map.values().forEach(v -> keys.add(v.key));
+      return keys;
+   }
+
+   @Override
+   public synchronized Collection<V> values() {
+      ArrayList values = new ArrayList(map.size());
+      map.values().forEach(v -> values.add(v.value));
+      return values;
+   }

Review Comment:
   Similarly violates the Map contract
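
   For context, the `java.util.Map` documentation describes `values()` as a collection view backed by the map, so later changes to the map show up in a collection obtained earlier, whereas the code above returns a detached copy. A minimal sketch of a read-only live view follows. `ValuesViewSketch` and `Holder` are hypothetical stand-ins for `JournalHashMap` and `MapRecord`; synchronization and removal through the view (which would need a journal delete record) are deliberately left out.

```java
import java.util.AbstractCollection;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical stand-in for JournalHashMap, reduced to the values() question.
class ValuesViewSketch<K, V> {

   // Hypothetical wrapper playing the role of MapRecord.
   static final class Holder<T> {
      final T value;

      Holder(T value) {
         this.value = value;
      }
   }

   private final Map<K, Holder<V>> map = new HashMap<>();

   void put(K key, V value) {
      map.put(key, new Holder<>(value));
   }

   // Returns a view over the backing map instead of copying into a new list.
   Collection<V> values() {
      return new AbstractCollection<V>() {
         @Override
         public Iterator<V> iterator() {
            final Iterator<Holder<V>> it = map.values().iterator();
            return new Iterator<V>() {
               @Override
               public boolean hasNext() {
                  return it.hasNext();
               }

               @Override
               public V next() {
                  // Unwrap the holder; remove() is not overridden, so the view is read-only.
                  return it.next().value;
               }
            };
         }

         @Override
         public int size() {
            return map.size();
         }
      };
   }
}
```

   With this shape, a collection obtained before a `put` reflects the new entry afterwards, which is the behaviour callers of a `Map` are entitled to assume.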



##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public static class MapRecord<K, V> implements Entry<K, V> {
+      final long collectionID;
+      long id;
+      K key;
+      V value;
+
+      MapRecord(long collectionID, long id, K key, V value) {
+         this.collectionID = collectionID;
+         this.id = id;
+         this.key = key;
+         this.value = value;
+      }
+
+      @Override
+      public K getKey() {
+         return key;
+      }
+
+      @Override
+      public V getValue() {
+         return value;
+      }
+
+      @Override
+      public V setValue(V value) {
+         V oldValue = this.value;
+         this.value = value;
+         return oldValue;
+      }
+
+      @Override
+      public String toString() {
+         return "MapRecord{" + "collectionID=" + collectionID + ", id=" + id + 
", key=" + key + ", value=" + value + '}';
+      }
+   }
+
+   public JournalHashMap(long collectionId, Journal journal, LongSupplier 
idGenerator, Persister<MapRecord<K, V>> persister, byte recordType, 
Supplier<IOCompletion> completionSupplier, LongFunction<C> contextProvider, 
IOCriticalErrorListener ioExceptionListener) {
+      this.collectionId = collectionId;
+      this.journal = journal;
+      this.idGenerator = idGenerator;
+      this.persister = persister;
+      this.recordType = recordType;
+      this.exceptionListener = ioExceptionListener;
+      this.completionSupplier = completionSupplier;
+      this.contextProvider = contextProvider;
+   }
+
+   C context;
+
+   LongFunction<C> contextProvider;
+
+   private final Persister<MapRecord<K, V>> persister;
+
+   private final Journal journal;
+
+   private final long collectionId;
+
+   private final byte recordType;
+
+   private final LongSupplier idGenerator;
+
+   private final Supplier<IOCompletion> completionSupplier;
+
+   private final IOCriticalErrorListener exceptionListener;
+
+   private final Map<K, MapRecord<K, V>> map = new HashMap<>();
+
+   public long getCollectionId() {
+      return collectionId;
+   }
+
+   @Override
+   public synchronized int size() {
+      return map.size();
+   }
+
+   public C getContext() {
+      if (context == null && contextProvider != null) {
+         context = contextProvider.apply(this.collectionId);
+      }
+      return context;
+   }
+
+   public JournalHashMap<K, V, C> setContext(C context) {
+      this.context = context;
+      return this;
+   }
+
+   @Override
+   public synchronized boolean isEmpty() {
+      return map.isEmpty();
+   }
+
+   @Override
+   public synchronized boolean containsKey(Object key) {
+      return map.containsKey(key);
+   }
+
+   @Override
+   public synchronized boolean containsValue(Object value) {
+      for (Entry<K, MapRecord<K, V>> entry : map.entrySet()) {
+         if (value.equals(entry.getValue().value)) {
+            return true;
+         }
+      }
+      return false;
+   }
+
+   @Override
+   public synchronized V get(Object key) {
+      MapRecord<K, V> reccord = map.get(key);
+      if (reccord == null) {
+         return null;
+      } else {
+         return reccord.value;
+      }
+   }
+
+   /** This is to be called from a single thread during reload, no need to be 
synchronized */
+   public void reload(MapRecord<K, V> reloadValue) {
+      map.put(reloadValue.getKey(), reloadValue);
+   }
+
+   @Override
+   public synchronized V put(K key, V value) {
+      logger.debug("adding {} = {}", key, value);
+      long id = idGenerator.getAsLong();
+      MapRecord<K, V> record = new MapRecord(collectionId, id, key, value);
+      store(key, record);
+      MapRecord<K, V> oldRecord = map.put(key, record);
+
+      if (oldRecord != null) {
+         removed(oldRecord);
+         return oldRecord.value;
+      } else {
+         return null;
+      }
+
+   }
+
+   private synchronized void store(K key, MapRecord<K, V> record) {
+      try {
+         IOCompletion callback = null;
+         if (completionSupplier != null) {
+            callback = completionSupplier.get();
+         }
+
+         if (callback == null) {
+            journal.appendAddRecord(record.id, recordType, persister, record, 
false);
+         } else {
+            journal.appendAddRecord(record.id, recordType, persister, record, 
true, callback);
+         }
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+         exceptionListener.onIOException(e, e.getMessage(), null);
+      }
+   }
+
+   // callers must be synchronized
+   private void removed(MapRecord reccord) {
+      try {
+         journal.appendDeleteRecord(reccord.id, false);
+      } catch (Exception e) {
+         exceptionListener.onIOException(e, e.getMessage(), null);
+      }
+   }
+
+   // callers must be synchronized
+   private void removed(MapRecord reccord, long txid) {
+      try {
+         journal.appendDeleteRecordTransactional(txid, reccord.id);
+      } catch (Exception e) {
+         exceptionListener.onIOException(e, e.getMessage(), null);
+      }
+   }
+
+   @Override
+   public synchronized V remove(Object key) {
+      MapRecord<K, V> record = map.remove(key);
+      this.removed(record);
+      return record.value;
+   }
+
+   /** This method will remove the element from the HashMap immediately 
however the record is still part of a transaction.
+    *  This is not playing with rollbacks. So a rollback on the transaction 
wouldn't place the elements back.
+    *  This is intended to make sure the operation would be atomic in case of 
a failure, while an appendRollback is not expected. */
+   public synchronized V remove(Object key, long transactionID) {
+      MapRecord<K, V> record = map.remove(key);
+      this.removed(record, transactionID);
+      return record.value;
+   }
+
+   @Override
+   public synchronized void putAll(Map<? extends K, ? extends V> m) {
+      m.forEach(this::put);
+   }
+
+   @Override
+   public synchronized void clear() {
+      map.values().forEach(v -> remove(v));
+      map.clear();
+   }
+
+   @Override
+   public synchronized Set<K> keySet() {
+      HashSet<K> keys = new HashSet(map.size());
+      map.values().forEach(v -> keys.add(v.key));
+      return keys;
+   }
+
+   @Override
+   public synchronized Collection<V> values() {
+      ArrayList values = new ArrayList(map.size());
+      map.values().forEach(v -> values.add(v.value));
+      return values;
+   }
+
+   @Override
+   public synchronized Set<Entry<K, V>> entrySet() {
+      HashSet<Entry<K, V>> values = new HashSet<>();
+      map.values().forEach(values::add);
+      return values;
+   }

Review Comment:
   Ditto



##########
artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java:
##########
@@ -288,6 +298,15 @@ public synchronized void stop() {
 
    }
 
+   @Override
+   public void run() {
+      if (scheduledTask == null) {
+         logger.warn("Task not defined on {}", this.getClass());
+      } else {
+         scheduledTask.run();
+      }
+   }
+

Review Comment:
   I think it would probably be better to leave this class abstract and 
implement run in the new Component, as every other use of the class did and 
still does. This class already has too many different ways to use it from what 
I recall of previous looks. This basically just adds another.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -424,6 +453,10 @@ public void preAcknowledge(final Transaction tx, final 
MessageReference ref, fin
 
       MirrorController controllerInUse = getControllerInUse();
 
+      if (controllerInUse != null && !controllerInUse.isAllowACK()) {

Review Comment:
   This comment relates to code just above this change, not the change it is 
against.
   
   The trace log at the start of the method looks wrong: it references 
postACKInternalMessage, which is a separate method further up, whereas this is 
actually preAcknowledge.



##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public static class MapRecord<K, V> implements Entry<K, V> {
+      final long collectionID;
+      long id;
+      K key;
+      V value;
+
+      MapRecord(long collectionID, long id, K key, V value) {
+         this.collectionID = collectionID;
+         this.id = id;
+         this.key = key;
+         this.value = value;
+      }
+
+      @Override
+      public K getKey() {
+         return key;
+      }
+
+      @Override
+      public V getValue() {
+         return value;
+      }
+
+      @Override
+      public V setValue(V value) {
+         V oldValue = this.value;
+         this.value = value;
+         return oldValue;
+      }
+
+      @Override
+      public String toString() {
+         return "MapRecord{" + "collectionID=" + collectionID + ", id=" + id + 
", key=" + key + ", value=" + value + '}';
+      }
+   }
+
+   public JournalHashMap(long collectionId, Journal journal, LongSupplier 
idGenerator, Persister<MapRecord<K, V>> persister, byte recordType, 
Supplier<IOCompletion> completionSupplier, LongFunction<C> contextProvider, 
IOCriticalErrorListener ioExceptionListener) {
+      this.collectionId = collectionId;
+      this.journal = journal;
+      this.idGenerator = idGenerator;
+      this.persister = persister;
+      this.recordType = recordType;
+      this.exceptionListener = ioExceptionListener;
+      this.completionSupplier = completionSupplier;
+      this.contextProvider = contextProvider;
+   }
+
+   C context;
+
+   LongFunction<C> contextProvider;
+
+   private final Persister<MapRecord<K, V>> persister;
+
+   private final Journal journal;
+
+   private final long collectionId;
+
+   private final byte recordType;
+
+   private final LongSupplier idGenerator;
+
+   private final Supplier<IOCompletion> completionSupplier;
+
+   private final IOCriticalErrorListener exceptionListener;
+
+   private final Map<K, MapRecord<K, V>> map = new HashMap<>();
+
+   public long getCollectionId() {
+      return collectionId;
+   }
+
+   @Override
+   public synchronized int size() {
+      return map.size();
+   }
+
+   public C getContext() {
+      if (context == null && contextProvider != null) {
+         context = contextProvider.apply(this.collectionId);
+      }
+      return context;
+   }
+
+   public JournalHashMap<K, V, C> setContext(C context) {
+      this.context = context;
+      return this;
+   }
+
+   @Override
+   public synchronized boolean isEmpty() {
+      return map.isEmpty();
+   }
+
+   @Override
+   public synchronized boolean containsKey(Object key) {
+      return map.containsKey(key);
+   }
+
+   @Override
+   public synchronized boolean containsValue(Object value) {
+      for (Entry<K, MapRecord<K, V>> entry : map.entrySet()) {
+         if (value.equals(entry.getValue().value)) {
+            return true;
+         }
+      }
+      return false;
+   }
+
+   @Override
+   public synchronized V get(Object key) {
+      MapRecord<K, V> reccord = map.get(key);
+      if (reccord == null) {
+         return null;
+      } else {
+         return reccord.value;
+      }
+   }
+
+   /** This is to be called from a single thread during reload, no need to be 
synchronized */
+   public void reload(MapRecord<K, V> reloadValue) {
+      map.put(reloadValue.getKey(), reloadValue);
+   }
+
+   @Override
+   public synchronized V put(K key, V value) {
+      logger.debug("adding {} = {}", key, value);
+      long id = idGenerator.getAsLong();
+      MapRecord<K, V> record = new MapRecord(collectionId, id, key, value);

Review Comment:
   Add diamond <> (or actual full type<K,V>) to suppress warnings. Here and 
lots of other places, it looks like.
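
   As an illustration of the warning in question, here is a small self-contained sketch. `RecordSketch` and `DiamondSketch` are hypothetical names, not classes from the PR; the point is only the difference between a raw constructor call and the diamond form.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical generic holder standing in for MapRecord<K, V>.
class RecordSketch<K, V> {
   final K key;
   final V value;

   RecordSketch(K key, V value) {
      this.key = key;
      this.value = value;
   }
}

class DiamondSketch {

   static Map<String, RecordSketch<String, Long>> index(String key, Long value) {
      // The raw form "new RecordSketch(key, value)" compiles, but with an unchecked warning.
      // With the diamond, the compiler infers <String, Long> from the declared type.
      RecordSketch<String, Long> record = new RecordSketch<>(key, value);
      Map<String, RecordSketch<String, Long>> map = new HashMap<>();
      map.put(key, record);
      return map;
   }
}
```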



##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public static class MapRecord<K, V> implements Entry<K, V> {

Review Comment:
   This Entry should be implementing equals and hashCode, especially given it 
is actually being inserted into a HashMap
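
   A sketch of what that could look like, following the `equals`/`hashCode` definitions documented on `java.util.Map.Entry` (key and value equality, hash as the XOR of the two hashes). `MapRecordSketch` is a hypothetical stand-in for `MapRecord`; whether `id` and `collectionID` should take part in equality is a design decision for the PR, and this sketch assumes they should not, so that the record stays interchangeable with other `Entry` implementations.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for JournalHashMap.MapRecord.
class MapRecordSketch<K, V> implements Map.Entry<K, V> {
   final long collectionID;
   final long id;
   final K key;
   V value;

   MapRecordSketch(long collectionID, long id, K key, V value) {
      this.collectionID = collectionID;
      this.id = id;
      this.key = key;
      this.value = value;
   }

   @Override
   public K getKey() {
      return key;
   }

   @Override
   public V getValue() {
      return value;
   }

   @Override
   public V setValue(V value) {
      V oldValue = this.value;
      this.value = value;
      return oldValue;
   }

   @Override
   public boolean equals(Object o) {
      if (this == o) {
         return true;
      }
      if (!(o instanceof Map.Entry)) {
         return false;
      }
      // Compare against any Entry, as the Map.Entry contract describes.
      Map.Entry<?, ?> other = (Map.Entry<?, ?>) o;
      return Objects.equals(key, other.getKey()) && Objects.equals(value, other.getValue());
   }

   @Override
   public int hashCode() {
      // Same formula Map.Entry documents: hash(key) XOR hash(value), with null hashing to 0.
      return Objects.hashCode(key) ^ Objects.hashCode(value);
   }
}
```

   Note that because `value` is mutable through `setValue`, changing it while the record sits in a `HashSet` would change its hash, which is worth keeping in mind for `entrySet()`.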



##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMapProvider.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.persistence.Persister;
+
+public class JournalHashMapProvider<K, V, C> {
+
+   final Journal journal;
+   final Persister<JournalHashMap.MapRecord<K, V>> persister;
+   final LongObjectHashMap<JournalHashMap<K, V, C>> journalMaps = new 
LongObjectHashMap<>();
+   final LongSupplier idSupplier;
+   final byte recordType;
+   final IOCriticalErrorListener ioExceptionListener;
+   final Supplier<IOCompletion> ioCompletionSupplier;
+   final LongFunction<C> contextProvider;
+
+   public JournalHashMapProvider(LongSupplier idSupplier, Journal journal, 
AbstractHashMapPersister persister, byte recordType, Supplier<IOCompletion> 
ioCompletionSupplier, LongFunction<C> contextProvider, IOCriticalErrorListener 
ioExceptionListener) {
+      this.idSupplier = idSupplier;
+      this.persister = persister;
+      this.journal = journal;
+      this.recordType = recordType;
+      this.ioExceptionListener = ioExceptionListener;
+      this.contextProvider = contextProvider;
+      this.ioCompletionSupplier = ioCompletionSupplier;
+   }
+
+   public List<JournalHashMap<K, V, C>> getMaps() {
+      ArrayList<JournalHashMap<K, V, C>> maps = new ArrayList<>();
+      journalMaps.values().forEach(maps::add);
+      return maps;
+   }
+
+   public void clear() {
+      journalMaps.clear();
+   }
+
+   public void reload(RecordInfo recordInfo) {
+      JournalHashMap.MapRecord<K, V> mapRecord = 
persister.decode(recordInfo.wrapData(), null, null);
+      getMap(mapRecord.collectionID, null).reload(mapRecord);
+   }
+
+   public Iterator<JournalHashMap<K, V, C>> iterMaps() {
+      return journalMaps.values().iterator();
+   }
+
+   public JournalHashMap<K, V, C> getMap(long collectionID, C context) {
+      JournalHashMap<K, V, C> journalHashMap = journalMaps.get(collectionID);
+      if (journalHashMap == null) {
+         journalHashMap = new JournalHashMap<>(collectionID, journal, 
idSupplier, persister, recordType, ioCompletionSupplier, contextProvider, 
ioExceptionListener).setContext(context);
+         journalMaps.put(collectionID, journalHashMap);
+      }
+      return journalHashMap;
+   }

Review Comment:
   Presumably this is only being used from a single thread?



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -617,4 +650,55 @@ public static void routeMirrorCommand(ActiveMQServer 
server, Message message, Tr
       server.getPostOffice().route(message, ctx, false);
    }
 
+   class PagedRouteContext implements RouteContextList {
+
+      private final List<Queue> listRepresentation;
+      private final List<Queue> emptyList = Collections.emptyList();
+
+      PagedRouteContext() {
+         listRepresentation = new ArrayList<>(1);
+         listRepresentation.add(snfQueue);

Review Comment:
   If the snfQueue durability is essentially fixed, can't we pre-determine the 
answers/return values and simplify much of the method code?
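
   Roughly what I have in mind, as a sketch only. It assumes the snfQueue is always durable, and it uses a trimmed-down, hypothetical `SimpleRouteContext` interface rather than the real RouteContextList, so the method set here is illustrative:

```java
import java.util.Collections;
import java.util.List;

// Hypothetical, trimmed-down stand-in for a route context; not the Artemis interface.
interface SimpleRouteContext<Q> {
   int getNumberOfDurableQueues();
   int getNumberOfNonDurableQueues();
   List<Q> getDurableQueues();
   List<Q> getNonDurableQueues();
}

// Every answer is fixed at construction time, assuming the single queue is durable.
final class FixedDurableRouteContext<Q> implements SimpleRouteContext<Q> {

   private final List<Q> durable;

   FixedDurableRouteContext(Q snfQueue) {
      // Immutable one-element list; no per-call durability checks needed.
      this.durable = Collections.singletonList(snfQueue);
   }

   @Override
   public int getNumberOfDurableQueues() {
      return 1;
   }

   @Override
   public int getNumberOfNonDurableQueues() {
      return 0;
   }

   @Override
   public List<Q> getDurableQueues() {
      return durable;
   }

   @Override
   public List<Q> getNonDurableQueues() {
      return Collections.emptyList();
   }
}
```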



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -281,6 +300,15 @@ public void sendMessage(Transaction tx, Message message, 
RoutingContext context)
             return;
          }
 
+         if (snfQueue.getPagingStore().page(message, tx, pagedRouteContext, 
this::copyMessageForPaging)) {
+            return;
+         }
+
+         if (message.isPaged()) {
+            // if the source was paged, we copy the message
+            message = copyMessageForPaging(message);
+         }

Review Comment:
   Why?



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -281,6 +300,15 @@ public void sendMessage(Transaction tx, Message message, 
RoutingContext context)
             return;
          }
 
+         if (snfQueue.getPagingStore().page(message, tx, pagedRouteContext, 
this::copyMessageForPaging)) {
+            return;
+         }

Review Comment:
   What is this doing? What did it do before?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
