This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch branch-panama in repository https://gitbox.apache.org/repos/asf/artemis-native.git
commit ca8cf7fd1b0abf74d99965f86e3874e458049f88 Author: mayankkunwar <[email protected]> AuthorDate: Tue Apr 28 13:08:06 2026 +0100 removing hashMap registry --- .../artemis/nativo/jlibaio/LibaioContext.java | 6 +-- .../nativo/jlibaio/ffm/CallbackRegistry.java | 58 ---------------------- .../nativo/jlibaio/ffm/FFMNativeHelper.java | 57 +++++++++++---------- .../artemis/nativo/jlibaio/ffm/IOControl.java | 26 +++++++++- 4 files changed, 58 insertions(+), 89 deletions(-) diff --git a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java index 78e3f7c..355708f 100644 --- a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java +++ b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java @@ -150,7 +150,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable { /** * the native ioContext including the structure created. */ - private final IOControl ioContext; + private final IOControl<Callback> ioContext; private final AtomicBoolean closed = new AtomicBoolean(false); @@ -376,7 +376,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable { /** * This is the queue for libaio, initialized with queueSize. */ - private IOControl newContext(int queueSize) { + private IOControl<Callback> newContext(int queueSize) { return this.ffmNativeHelper.newContext(queueSize); } @@ -464,7 +464,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable { /** * This method will block as long as the context is open. */ - void blockedPoll(IOControl ioControl, boolean useFdatasync) { + void blockedPoll(IOControl<Callback> ioControl, boolean useFdatasync) { this.ffmNativeHelper.blockedPoll(ioControl, useFdatasync); } diff --git a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/ffm/CallbackRegistry.java b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/ffm/CallbackRegistry.java deleted file mode 100644 index 8d86038..0000000 --- a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/ffm/CallbackRegistry.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.nativo.jlibaio.ffm; - -import org.apache.activemq.artemis.nativo.jlibaio.SubmitInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; - -public class CallbackRegistry<Callback extends SubmitInfo> { -// private static final Logger logger = LoggerFactory.getLogger(CallbackRegistry.class); - private final ConcurrentHashMap<Long, Callback> registry = new ConcurrentHashMap<>(); - private final AtomicLong idGenerator = new AtomicLong(0); - - /* - * <p> - * This method is thread-safe and lock-free. It avoids returning an ID of -1 and 0, - * as -1 and 0 are reserved as a NULL/Invalid marker in the native libaio layer. - * - * @param callback the I/O completion handler to register. - * @return a unique identifier (other than -1 or 0) for this callback. - * */ - public long register(Callback callback) { - Objects.requireNonNull(callback, "Callbacks cannot be null"); - long id; - do { - id = idGenerator.incrementAndGet(); - } while (id == -1 || id == 0); - registry.put(id, callback); -// logger.debug("CallbackRegistry::register id = {}, callback = {}", id, callback); - return id; - } - - public Callback findCallbackById(long id) { - return registry.get(id); - } - - public void removeCallbackById(long id) { - this.registry.remove(id); - } -} diff --git a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/ffm/FFMNativeHelper.java b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/ffm/FFMNativeHelper.java index 9101936..79eed7f 100644 --- a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/ffm/FFMNativeHelper.java +++ b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/ffm/FFMNativeHelper.java @@ -135,12 +135,9 @@ public class FFMNativeHelper<Callback extends SubmitInfo> { } } - private final CallbackRegistry<Callback> registry; - private final ReleaseCallback releaseCallback; public FFMNativeHelper(ReleaseCallback releaseCallback) { - this.registry = new CallbackRegistry<>(); this.releaseCallback = releaseCallback; } @@ -319,10 +316,10 @@ public class FFMNativeHelper<Callback extends SubmitInfo> { return forceSysCall.get() || !RING_REAPER; } - public IOControl newContext(int queueSize) { + public IOControl<Callback> newContext(int queueSize) { // logger.debug("Initializing context with QueueSize={}", queueSize); - IOControl ioControl = new IOControl(); + IOControl<Callback> ioControl = new IOControl<>(); try { MemorySegment ioContext = ioQueueInit(queueSize); ioControl.setIoContext(ioContext); @@ -348,7 +345,7 @@ public class FFMNativeHelper<Callback extends SubmitInfo> { throw new OutOfMemoryError( String.format("Arena memory allocation failed: iocb[%d/%d]", i, queueSize)); } - IOCBInit.setAioData(iocb, 0L); + IOCBInit.setAioData(iocb, i); iocbPool[i] = iocb; } ioControl.setIocbPool(iocbPool); @@ -430,7 +427,8 @@ public class FFMNativeHelper<Callback extends SubmitInfo> { throw new IOException("Not enough space in libaio queue during shutdown"); } ioPrepPOp(dumbIocb, DUMB_WRITE_HANDLER, MemorySegment.NULL, 0L, 0L, 1); - IOCBInit.setAioData(dumbIocb, -1L); + int iocbId = (int) IOCBInit.getAioData(dumbIocb); + ioControl.getIocbState().set(iocbId, -1); if(!submit(ioControl, dumbIocb)) { // logger.warn("deleteContext: submit failed: Continuing cleanup"); @@ -631,21 +629,24 @@ public class FFMNativeHelper<Callback extends SubmitInfo> { throw new IOException("IOCB pool exhausted (used=" + ioControl.used() + "/queueSize=" + ioControl.queueSize() + ")"); } -// logger.trace("submitWrite called!"); - long callbackId = registry.register(callback); + int callbackId = (int) IOCBInit.getAioData(iocb); +// logger.trace("submitWrite called! callbackId: {}", callbackId); boolean submitted = false; try { + if (!ioControl.getIocbState().compareAndSet(callbackId, 0, 1)) { + throw new IOException("submitWrite failed: callbackId=" + callbackId + " already in use"); + } + ioControl.addCallback(callbackId, callback); bufferWrite.clear(); ioPrepPOp(iocb, fd, MemorySegment.ofBuffer(bufferWrite), size, position, 1); - IOCBInit.setAioData(iocb, callbackId); submit(ioControl, iocb); submitted = true; } catch (Throwable e) { throw new IOException("submitWrite failed", e); } finally { if(!submitted) { - registry.removeCallbackById(callbackId); + ioControl.takeCallback(callbackId); } } } @@ -690,25 +691,28 @@ public class FFMNativeHelper<Callback extends SubmitInfo> { } // logger.trace("submitRead called!"); - long callbackId = registry.register(callback); + long callbackId = IOCBInit.getAioData(iocb); boolean submitted = false; try { + if (!ioControl.getIocbState().compareAndSet((int) callbackId, 0, 1)) { + throw new IOException("submitRead failed: callbackId=" + callbackId + " already in use"); + } + ioControl.addCallback((int) callbackId, callback); bufferWrite.clear(); ioPrepPOp(iocb, fd, MemorySegment.ofBuffer(bufferWrite), size, position, 0); - IOCBInit.setAioData(iocb, callbackId); submit(ioControl, iocb); submitted = true; } catch (Throwable e) { throw new IOException("submitRead failed", e); } finally { if(!submitted) { - registry.removeCallbackById(callbackId); + ioControl.takeCallback((int) callbackId); } } } - public int poll(IOControl ioControl, Callback[] callbacks, int min, int max) { + public int poll(IOControl<Callback> ioControl, Callback[] callbacks, int min, int max) { if(ioControl == null || !ioControl.isValid()) { // logger.warn("poll: invalid context"); return 0; @@ -734,20 +738,22 @@ public class FFMNativeHelper<Callback extends SubmitInfo> { MemorySegment iocbp = event.get(ValueLayout.ADDRESS, 8L) .reinterpret(64); int eventResult = (int) event.get(ValueLayout.JAVA_LONG, 16L); -// logger.trace("poll[{}]: res={}, iocbp=0x{}", i, eventResult, Long.toHexString(iocbp.address())); +// logger.trace("poll[{}]: res={}, iocbp=0x{}, AioData: {}", i, eventResult, Long.toHexString(iocbp.address()), +// IOCBInit.getAioData(iocbp)); if(eventResult < 0) { // logger.warn("poll[{}]: I/O error: {}", i, eventResult); } - long callbackIdRaw = IOCBInit.getAioData(iocbp); - if(callbackIdRaw == 0L || callbackIdRaw == -1L) { + int callbackIdRaw = (int) IOCBInit.getAioData(iocbp); + int iocbState = ioControl.getIocbState().get(callbackIdRaw); + if(iocbState == 0 || iocbState == -1) { // logger.warn("poll[{}]: invalid callback=0x{}", i, Long.toHexString(callbackIdRaw)); ioControl.putIOCB(iocbp); continue; } - Callback callback = registry.findCallbackById(callbackIdRaw); + Callback callback = ioControl.takeCallback(callbackIdRaw); if(callback != null) { callbacks[i] = callback; if(eventResult < 0) { @@ -758,11 +764,11 @@ public class FFMNativeHelper<Callback extends SubmitInfo> { if (releaseCallback != null) { releaseCallback.release(); } - registry.removeCallbackById(callbackIdRaw); } else { // logger.warn("poll[{}]: callback not found for id=0x{}", // i, Long.toHexString(callbackIdRaw)); } + ioControl.getIocbState().set(callbackIdRaw, 0); ioControl.putIOCB(iocbp); } return result; @@ -772,7 +778,7 @@ public class FFMNativeHelper<Callback extends SubmitInfo> { } } - public void blockedPoll(IOControl ioControl, boolean useFdatasync) { + public void blockedPoll(IOControl<Callback> ioControl, boolean useFdatasync) { // logger.debug("blockedPoll starting(useFdatasync={})", useFdatasync); if(ioControl == null || !ioControl.isValid()) { // logger.warn("blockedPoll: invalid context"); @@ -830,13 +836,13 @@ public class FFMNativeHelper<Callback extends SubmitInfo> { int eventResult = (int) event.get(ValueLayout.JAVA_LONG, 16L); - long callbackIdRaw = IOCBInit.getAioData(iocbp); + int callbackIdRaw = (int) IOCBInit.getAioData(iocbp); // logger.trace("blockedPoll: callbackIdRaw: {}", callbackIdRaw); - IOCBInit.setAioData(iocbp, 0L); // this is to detect invalid elements on the buffer. - if(callbackIdRaw != 0L) { + // this IOCB state is to detect invalid elements on the buffer. + if(ioControl.getIocbState().compareAndSet(callbackIdRaw, 1, 0)) { ioControl.putIOCB(iocbp); - Callback callback = registry.findCallbackById(callbackIdRaw); + Callback callback = ioControl.takeCallback(callbackIdRaw); if (callback != null) { if (eventResult < 0) { // logger.error("blockedPoll[{}]: I/O error fd={}, {}", i, fd, eventResult); @@ -848,7 +854,6 @@ public class FFMNativeHelper<Callback extends SubmitInfo> { if (releaseCallback != null) { releaseCallback.release(); } - registry.removeCallbackById(callbackIdRaw); } } else { if(!forceSysCall.get()) { diff --git a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/ffm/IOControl.java b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/ffm/IOControl.java index 8777335..8ba9eef 100644 --- a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/ffm/IOControl.java +++ b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/ffm/IOControl.java @@ -16,13 +16,15 @@ */ package org.apache.activemq.artemis.nativo.jlibaio.ffm; +import org.apache.activemq.artemis.nativo.jlibaio.SubmitInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.foreign.MemorySegment; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicIntegerArray; +import java.util.concurrent.atomic.AtomicReferenceArray; -public class IOControl { +public class IOControl<Callback extends SubmitInfo> { // private static final Logger logger = LoggerFactory.getLogger(IOControl.class); private final Object iocbLock = new Object(); @@ -35,6 +37,10 @@ public class IOControl { private int iocbGet; private int used; private MemorySegment[] iocbPool; + private AtomicReferenceArray<Callback> callbackRegistry; + + // -1: delete, 0: free, 1: used + private AtomicIntegerArray iocbState; public MemorySegment ioContext() { return this.ioContext; @@ -55,6 +61,8 @@ public class IOControl { } public void setQueueSize(int size) { this.queueSize = size; + callbackRegistry = new AtomicReferenceArray<>(size); + iocbState = new AtomicIntegerArray(size); } public int iocbPut() { @@ -74,6 +82,20 @@ public class IOControl { this.iocbPool = iocbPool; } + public void addCallback(int idx, Callback callback) { + if (callbackRegistry.get(idx) != null) { + throw new IllegalStateException("callback already registered"); + } + callbackRegistry.set(idx, callback); + } + public Callback takeCallback(int idx) { + return callbackRegistry.getAndSet(idx, null); + } + + public AtomicIntegerArray getIocbState() { + return this.iocbState; + } + public void withIocbLock(Runnable action) { synchronized ( iocbLock){ action.run(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
