This is an automated email from the ASF dual-hosted git repository.

cmcfarlen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new aebe166655 Cleanup aio inline and remove P_AIO header (#11792)
aebe166655 is described below

commit aebe16665532e8c208deb7cccc7b68bc5dbe008b
Author: Chris McFarlen <[email protected]>
AuthorDate: Thu Oct 3 08:41:24 2024 -0500

    Cleanup aio inline and remove P_AIO header (#11792)
    
    * Cleanup aio inline and remove P_AIO header
    
    * Fixes for io_uring
    
    * Fix cache tests that behave strangely
    
    ---------
    
    Co-authored-by: Chris McFarlen <[email protected]>
---
 include/iocore/aio/AIO.h                   |  36 ++++++-
 include/iocore/cache/CacheVC.h             |   6 +-
 src/iocore/aio/AIO.cc                      | 150 +++++++++++++++++++++--------
 src/iocore/aio/CMakeLists.txt              |   2 +-
 src/iocore/aio/Inline.cc                   |  30 ------
 src/iocore/aio/P_AIO.h                     | 142 ---------------------------
 src/iocore/aio/test_AIO.cc                 |   1 -
 src/iocore/cache/CacheEvacuateDocVC.cc     |   2 +-
 src/iocore/cache/P_Cache.h                 |   1 -
 src/iocore/cache/P_CacheDir.h              |  21 ++--
 src/iocore/cache/P_CacheDisk.h             |  38 ++++----
 src/iocore/cache/P_CacheStats.h            |  74 +++++++-------
 src/iocore/cache/Stripe.cc                 |   6 +-
 src/iocore/cache/StripeSM.cc               |   6 +-
 src/iocore/cache/StripeSM.h                |   6 +-
 src/iocore/cache/unit_tests/main.h         |   2 +-
 src/iocore/cache/unit_tests/test_doubles.h |  14 +++
 src/iocore/io_uring/IOUringEventIO.cc      |   2 +-
 18 files changed, 239 insertions(+), 300 deletions(-)

diff --git a/include/iocore/aio/AIO.h b/include/iocore/aio/AIO.h
index fba6925faa..73cd5b3d3c 100644
--- a/include/iocore/aio/AIO.h
+++ b/include/iocore/aio/AIO.h
@@ -30,10 +30,15 @@
  ****************************************************************************/
 #pragma once
 
+#include "iocore/eventsystem/Continuation.h"
 #include "tscore/ink_platform.h"
 #include "iocore/eventsystem/EventSystem.h"
 #include "records/RecProcess.h"
 
+#if TS_USE_LINUX_IO_URING
+#include "iocore/io_uring/IO_URING.h"
+#endif
+
 static constexpr ts::ModuleVersion AIO_MODULE_PUBLIC_VERSION(1, 0, 
ts::ModuleVersion::PUBLIC);
 
 #define AIO_EVENT_DONE (AIO_EVENT_EVENTS_START + 0)
@@ -63,17 +68,40 @@ bool ink_aio_thread_num_set(int thread_num);
 #define AIO_CALLBACK_THREAD_ANY ((EThread *)0) // any regular event thread
 #define AIO_CALLBACK_THREAD_AIO ((EThread *)-1)
 
+struct AIO_Reqs;
+
+#if TS_USE_LINUX_IO_URING
+struct AIOCallback : public Continuation, public IOUringCompletionHandler {
+#else
 struct AIOCallback : public Continuation {
+#endif
   // set before calling aio_read/aio_write
   ink_aiocb    aiocb;
   Action       action;
   EThread     *thread = AIO_CALLBACK_THREAD_ANY;
   AIOCallback *then   = nullptr;
   // set on return from aio_read/aio_write
-  int64_t aio_result = 0;
-
-  int ok();
-  AIOCallback() {}
+  int64_t    aio_result = 0;
+  AIO_Reqs  *aio_req    = nullptr;
+  ink_hrtime sleep_time = 0;
+  SLINK(AIOCallback, alink); /* for AIO_Reqs::aio_temp_list */
+#if TS_USE_LINUX_IO_URING
+  iovec        iov     = {}; // this is to support older kernels that only 
support readv/writev
+  AIOCallback *this_op = nullptr;
+  AIOCallback *aio_op  = nullptr;
+
+  void handle_complete(io_uring_cqe *) override;
+#endif
+
+  int io_complete(int event, void *data);
+
+  int
+  ok()
+  {
+    return (aiocb.aio_nbytes == static_cast<size_t>(aio_result)) && 
(aio_result >= 0);
+  }
+
+  AIOCallback() { SET_HANDLER(&AIOCallback::io_complete); }
 };
 
 void ink_aio_init(ts::ModuleVersion version, AIOBackend backend = 
AIO_BACKEND_AUTO);
diff --git a/include/iocore/cache/CacheVC.h b/include/iocore/cache/CacheVC.h
index 5b862065de..4ff28ad2a7 100644
--- a/include/iocore/cache/CacheVC.h
+++ b/include/iocore/cache/CacheVC.h
@@ -237,9 +237,9 @@ struct CacheVC : public CacheVConnection {
   Ptr<IOBufferBlock>  blocks; // data available to write
   Ptr<IOBufferBlock>  writer_buf;
 
-  OpenDirEntry       *od = nullptr;
-  AIOCallbackInternal io;
-  int                 alternate_index = CACHE_ALT_INDEX_DEFAULT; // preferred 
position in vector
+  OpenDirEntry *od = nullptr;
+  AIOCallback   io;
+  int           alternate_index = CACHE_ALT_INDEX_DEFAULT; // preferred 
position in vector
   LINK(CacheVC, opendir_link);
   // end Region B
 
diff --git a/src/iocore/aio/AIO.cc b/src/iocore/aio/AIO.cc
index c499057103..4f6ba934db 100644
--- a/src/iocore/aio/AIO.cc
+++ b/src/iocore/aio/AIO.cc
@@ -25,12 +25,14 @@
  * Async Disk IO operations.
  */
 
+#include "iocore/aio/AIO.h"
 #include "tscore/TSSystemState.h"
+#include "tscore/ink_atomic.h"
 #include "tscore/ink_hw.h"
 
-#include <atomic>
-
-#include "P_AIO.h"
+#if TS_USE_LINUX_IO_URING
+#include "iocore/io_uring/IO_URING.h"
+#endif
 
 #ifdef AIO_FAULT_INJECTION
 #include "iocore/aio/AIO_fault_injection.h"
@@ -39,6 +41,11 @@
 #define MAX_DISKS_POSSIBLE 100
 
 // globals
+static constexpr ts::ModuleVersion 
AIO_MODULE_INTERNAL_VERSION{AIO_MODULE_PUBLIC_VERSION, 
ts::ModuleVersion::PRIVATE};
+
+// for debugging
+// #define AIO_STATS 1
+
 #if TS_USE_LINUX_IO_URING
 static bool use_io_uring = false;
 
@@ -49,6 +56,7 @@ void setup_prep_ops(IOUringContext *);
 #endif
 
 /* structure to hold information about each file descriptor */
+struct AIO_Reqs;
 AIO_Reqs *aio_reqs[MAX_DISKS_POSSIBLE];
 /* number of unique file descriptors in the aio_reqs array */
 int num_filedes = 1;
@@ -62,9 +70,73 @@ int thread_is_created = 0;
 RecInt cache_config_threads_per_disk = 12;
 RecInt api_config_threads_per_disk   = 12;
 
-AIOStatsBlock aio_rsb;
 Continuation *aio_err_callback = nullptr;
 
+/* internal definitions */
+int
+AIOCallback::io_complete(int event, void *data)
+{
+  (void)event;
+  (void)data;
+  if (aio_err_callback && !ok()) {
+    AIOCallback *err_op          = new AIOCallback();
+    err_op->aiocb.aio_fildes     = this->aiocb.aio_fildes;
+    err_op->aiocb.aio_lio_opcode = this->aiocb.aio_lio_opcode;
+    err_op->mutex                = aio_err_callback->mutex;
+    err_op->action               = aio_err_callback;
+
+    // Take this lock in-line because we want to stop other I/O operations on 
this disk ASAP
+    SCOPED_MUTEX_LOCK(lock, aio_err_callback->mutex, this_ethread());
+    err_op->action.continuation->handleEvent(EVENT_NONE, err_op);
+  }
+  if (!action.cancelled && action.continuation) {
+    action.continuation->handleEvent(AIO_EVENT_DONE, this);
+  }
+  return EVENT_DONE;
+}
+
+struct AIO_Reqs {
+  Que(AIOCallback, link) aio_todo; /* queue for AIO operations */
+                                   /* Atomic list to temporarily hold the 
request if the
+                                      lock for a particular queue cannot be 
acquired */
+  ASLL(AIOCallback, alink) aio_temp_list;
+  ink_mutex aio_mutex;
+  ink_cond  aio_cond;
+  int       index           = 0;  /* position of this struct in the aio_reqs 
array */
+  int       pending         = 0;  /* number of outstanding requests on the 
disk */
+  int       queued          = 0;  /* total number of aio_todo requests */
+  int       filedes         = -1; /* the file descriptor for the requests or 
status IO_NOT_IN_PROGRESS */
+  int       requests_queued = 0;
+};
+
+#ifdef AIO_STATS
+class AIOTestData : public Continuation
+{
+public:
+  int        num_req;
+  int        num_temp;
+  int        num_queue;
+  ink_hrtime start;
+
+  int ink_aio_stats(int event, void *data);
+
+  AIOTestData() : Continuation(new_ProxyMutex()), num_req(0), num_temp(0), 
num_queue(0)
+  {
+    start = ink_get_hrtime();
+    SET_HANDLER(&AIOTestData::ink_aio_stats);
+  }
+};
+#endif
+
+struct AIOStatsBlock {
+  ts::Metrics::Counter::AtomicType *read_count;
+  ts::Metrics::Counter::AtomicType *kb_read;
+  ts::Metrics::Counter::AtomicType *write_count;
+  ts::Metrics::Counter::AtomicType *kb_write;
+};
+
+AIOStatsBlock aio_rsb;
+
 #ifdef AIO_STATS
 /* total number of requests received - for debugging */
 static int num_requests = 0;
@@ -93,7 +165,7 @@ AIOTestData::ink_aio_stats(int event, void *d)
 AIOCallback *
 new_AIOCallback()
 {
-  return new AIOCallbackInternal;
+  return new AIOCallback;
 }
 
 void
@@ -107,10 +179,10 @@ ink_aio_init(ts::ModuleVersion v, [[maybe_unused]] 
AIOBackend backend)
 {
   ink_release_assert(v.check(AIO_MODULE_INTERNAL_VERSION));
 
-  aio_rsb.read_count  = 
Metrics::Counter::createPtr("proxy.process.cache.aio.read_count");
-  aio_rsb.write_count = 
Metrics::Counter::createPtr("proxy.process.cache.aio.write_count");
-  aio_rsb.kb_read     = 
Metrics::Counter::createPtr("proxy.process.cache.aio.KB_read");
-  aio_rsb.kb_write    = 
Metrics::Counter::createPtr("proxy.process.cache.aio.KB_write");
+  aio_rsb.read_count  = 
ts::Metrics::Counter::createPtr("proxy.process.cache.aio.read_count");
+  aio_rsb.write_count = 
ts::Metrics::Counter::createPtr("proxy.process.cache.aio.write_count");
+  aio_rsb.kb_read     = 
ts::Metrics::Counter::createPtr("proxy.process.cache.aio.KB_read");
+  aio_rsb.kb_write    = 
ts::Metrics::Counter::createPtr("proxy.process.cache.aio.KB_write");
 
   memset(&aio_reqs, 0, MAX_DISKS_POSSIBLE * sizeof(AIO_Reqs *));
   ink_mutex_init(&insert_mutex);
@@ -276,8 +348,8 @@ aio_move(AIO_Reqs *req)
     return;
   }
 
-  AIOCallbackInternal *cbi;
-  SList(AIOCallbackInternal, alink) aq(req->aio_temp_list.popall());
+  AIOCallback *cbi;
+  SList(AIOCallback, alink) aq(req->aio_temp_list.popall());
 
   // flip the list
   Queue<AIOCallback> cbq;
@@ -293,7 +365,7 @@ aio_move(AIO_Reqs *req)
 
 /* queue the new request */
 static void
-aio_queue_req(AIOCallbackInternal *op, int fromAPI = 0)
+aio_queue_req(AIOCallback *op, int fromAPI = 0)
 {
   int       thread_ndx = 1;
   AIO_Reqs *req        = op->aio_req;
@@ -365,10 +437,10 @@ aio_queue_req(AIOCallbackInternal *op, int fromAPI = 0)
 }
 
 static inline int
-cache_op(AIOCallbackInternal *op)
+cache_op(AIOCallback *op)
 {
   bool read = (op->aiocb.aio_lio_opcode == LIO_READ);
-  for (; op; op = (AIOCallbackInternal *)op->then) {
+  for (; op; op = op->then) {
     ink_aiocb *a = &op->aiocb;
     ssize_t    err, res = 0;
 
@@ -440,13 +512,13 @@ AIOThreadInfo::aio_thread_main(AIOThreadInfo *thr_info)
 
       // update the stats;
       if (op->aiocb.aio_lio_opcode == LIO_WRITE) {
-        Metrics::Counter::increment(aio_rsb.write_count);
-        Metrics::Counter::increment(aio_rsb.kb_write, op->aiocb.aio_nbytes >> 
10);
+        ts::Metrics::Counter::increment(aio_rsb.write_count);
+        ts::Metrics::Counter::increment(aio_rsb.kb_write, op->aiocb.aio_nbytes 
>> 10);
       } else {
-        Metrics::Counter::increment(aio_rsb.read_count);
-        Metrics::Counter::increment(aio_rsb.kb_read, op->aiocb.aio_nbytes >> 
10);
+        ts::Metrics::Counter::increment(aio_rsb.read_count);
+        ts::Metrics::Counter::increment(aio_rsb.kb_read, op->aiocb.aio_nbytes 
>> 10);
       }
-      cache_op(reinterpret_cast<AIOCallbackInternal *>(op));
+      cache_op(reinterpret_cast<AIOCallback *>(op));
       ink_atomic_increment(&my_aio_req->requests_queued, -1);
 #ifdef AIO_STATS
       ink_atomic_increment(&my_aio_req->pending, -1);
@@ -477,13 +549,13 @@ namespace
 {
 
 void
-prep_read(io_uring_sqe *sqe, AIOCallbackInternal *op)
+prep_read(io_uring_sqe *sqe, AIOCallback *op)
 {
   io_uring_prep_read(sqe, op->aiocb.aio_fildes, op->aiocb.aio_buf, 
op->aiocb.aio_nbytes, op->aiocb.aio_offset);
 }
 
 void
-prep_readv(io_uring_sqe *sqe, AIOCallbackInternal *op)
+prep_readv(io_uring_sqe *sqe, AIOCallback *op)
 {
   op->iov.iov_len  = op->aiocb.aio_nbytes;
   op->iov.iov_base = op->aiocb.aio_buf;
@@ -491,20 +563,20 @@ prep_readv(io_uring_sqe *sqe, AIOCallbackInternal *op)
 }
 
 void
-prep_write(io_uring_sqe *sqe, AIOCallbackInternal *op)
+prep_write(io_uring_sqe *sqe, AIOCallback *op)
 {
   io_uring_prep_write(sqe, op->aiocb.aio_fildes, op->aiocb.aio_buf, 
op->aiocb.aio_nbytes, op->aiocb.aio_offset);
 }
 
 void
-prep_writev(io_uring_sqe *sqe, AIOCallbackInternal *op)
+prep_writev(io_uring_sqe *sqe, AIOCallback *op)
 {
   op->iov.iov_len  = op->aiocb.aio_nbytes;
   op->iov.iov_base = op->aiocb.aio_buf;
   io_uring_prep_writev(sqe, op->aiocb.aio_fildes, &op->iov, 1, 
op->aiocb.aio_offset);
 }
 
-using prep_op = void (*)(io_uring_sqe *, AIOCallbackInternal *);
+using prep_op = void (*)(io_uring_sqe *, AIOCallback *);
 
 prep_op prep_ops[] = {
   nullptr,
@@ -526,10 +598,10 @@ setup_prep_ops(IOUringContext *ur)
 }
 
 void
-io_uring_prep_ops_internal(AIOCallbackInternal *op_in, int op_type)
+io_uring_prep_ops_internal(AIOCallback *op_in, int op_type)
 {
-  IOUringContext      *ur = IOUringContext::local_context();
-  AIOCallbackInternal *op = op_in;
+  IOUringContext *ur = IOUringContext::local_context();
+  AIOCallback    *op = op_in;
   while (op) {
     op->this_op       = op;
     io_uring_sqe *sqe = ur->next_sqe(op);
@@ -542,19 +614,19 @@ io_uring_prep_ops_internal(AIOCallbackInternal *op_in, 
int op_type)
     if (op->then) {
       sqe->flags |= IOSQE_IO_LINK;
     } else if (op->aio_op == nullptr) { // This condition leaves an existing 
aio_op in place if there is one. (EAGAIN)
-      op->aio_op = static_cast<AIOCallbackInternal *>(op_in);
+      op->aio_op = op_in;
     }
 
-    op = static_cast<AIOCallbackInternal *>(op->then);
+    op = op->then;
   }
 }
 
 } // namespace
 
 void
-AIOCallbackInternal::handle_complete(io_uring_cqe *cqe)
+AIOCallback::handle_complete(io_uring_cqe *cqe)
 {
-  AIOCallbackInternal *op = this_op;
+  AIOCallback *op = this_op;
 
   // Re-submit the request on EAGAIN.
   // we might need to re-submit the entire rest of the chain, so just call 
prep again
@@ -576,11 +648,11 @@ AIOCallbackInternal::handle_complete(io_uring_cqe *cqe)
 
   if (op->aio_result > 0) {
     if (op->aiocb.aio_lio_opcode == LIO_WRITE) {
-      Metrics::Counter::increment(aio_rsb.write_count);
-      Metrics::Counter::increment(aio_rsb.kb_write, op->aiocb.aio_nbytes >> 
10);
+      ts::Metrics::Counter::increment(aio_rsb.write_count);
+      ts::Metrics::Counter::increment(aio_rsb.kb_write, op->aiocb.aio_nbytes 
>> 10);
     } else {
-      Metrics::Counter::increment(aio_rsb.read_count);
-      Metrics::Counter::increment(aio_rsb.kb_read, op->aiocb.aio_nbytes >> 10);
+      ts::Metrics::Counter::increment(aio_rsb.read_count);
+      ts::Metrics::Counter::increment(aio_rsb.kb_read, op->aiocb.aio_nbytes >> 
10);
     }
   }
 
@@ -605,12 +677,12 @@ ink_aio_read(AIOCallback *op_in, int fromAPI)
 {
 #if TS_USE_LINUX_IO_URING
   if (use_io_uring) {
-    io_uring_prep_ops_internal(static_cast<AIOCallbackInternal *>(op_in), 
LIO_READ);
+    io_uring_prep_ops_internal(op_in, LIO_READ);
     return 1;
   }
 #endif
   op_in->aiocb.aio_lio_opcode = LIO_READ;
-  aio_queue_req(static_cast<AIOCallbackInternal *>(op_in), fromAPI);
+  aio_queue_req(op_in, fromAPI);
 
   return 1;
 }
@@ -620,12 +692,12 @@ ink_aio_write(AIOCallback *op_in, int fromAPI)
 {
 #if TS_USE_LINUX_IO_URING
   if (use_io_uring) {
-    io_uring_prep_ops_internal(static_cast<AIOCallbackInternal *>(op_in), 
LIO_WRITE);
+    io_uring_prep_ops_internal(op_in, LIO_WRITE);
     return 1;
   }
 #endif
   op_in->aiocb.aio_lio_opcode = LIO_WRITE;
-  aio_queue_req(static_cast<AIOCallbackInternal *>(op_in), fromAPI);
+  aio_queue_req(op_in, fromAPI);
 
   return 1;
 }
diff --git a/src/iocore/aio/CMakeLists.txt b/src/iocore/aio/CMakeLists.txt
index 0930cc8dee..4f6f4bc0b8 100644
--- a/src/iocore/aio/CMakeLists.txt
+++ b/src/iocore/aio/CMakeLists.txt
@@ -18,7 +18,7 @@
 add_library(aio STATIC)
 add_library(ts::aio ALIAS aio)
 
-target_sources(aio PRIVATE AIO.cc Inline.cc AIO_fault_injection.cc)
+target_sources(aio PRIVATE AIO.cc AIO_fault_injection.cc)
 
 target_link_libraries(aio PUBLIC ts::inkevent ts::tscore)
 if(TS_USE_LINUX_IO_URING)
diff --git a/src/iocore/aio/Inline.cc b/src/iocore/aio/Inline.cc
deleted file mode 100644
index 8e9b6d32c5..0000000000
--- a/src/iocore/aio/Inline.cc
+++ /dev/null
@@ -1,30 +0,0 @@
-/** @file
-
-  A brief file description
-
-  @section license License
-
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
- */
-
-/*
- * Inline Functions as globals for users using the public interface
- *
- */
-
-#define TS_INLINE
-#include "P_AIO.h"
diff --git a/src/iocore/aio/P_AIO.h b/src/iocore/aio/P_AIO.h
deleted file mode 100644
index 0e1e665487..0000000000
--- a/src/iocore/aio/P_AIO.h
+++ /dev/null
@@ -1,142 +0,0 @@
-/** @file
-
-  A brief file description
-
-  @section license License
-
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
- */
-
-/****************************************************************************
-
-  Async Disk IO operations.
-
-
-
- ****************************************************************************/
-#pragma once
-
-#include "../eventsystem/P_EventSystem.h"
-#include "iocore/aio/AIO.h"
-
-#if TS_USE_LINUX_IO_URING
-#include "iocore/io_uring/IO_URING.h"
-#endif
-
-#include "tsutil/Metrics.h"
-
-using ts::Metrics;
-
-// for debugging
-// #define AIO_STATS 1
-
-static constexpr ts::ModuleVersion 
AIO_MODULE_INTERNAL_VERSION{AIO_MODULE_PUBLIC_VERSION, 
ts::ModuleVersion::PRIVATE};
-
-TS_INLINE int
-AIOCallback::ok()
-{
-  return (aiocb.aio_nbytes == static_cast<size_t>(aio_result)) && (aio_result 
>= 0);
-}
-
-extern Continuation *aio_err_callback;
-
-struct AIO_Reqs;
-
-#if TS_USE_LINUX_IO_URING
-struct AIOCallbackInternal : public AIOCallback, public 
IOUringCompletionHandler {
-#else
-struct AIOCallbackInternal : public AIOCallback {
-#endif
-  AIO_Reqs  *aio_req    = nullptr;
-  ink_hrtime sleep_time = 0;
-  SLINK(AIOCallbackInternal, alink); /* for AIO_Reqs::aio_temp_list */
-#if TS_USE_LINUX_IO_URING
-  iovec                iov     = {}; // this is to support older kernels that 
only support readv/writev
-  AIOCallbackInternal *this_op = nullptr;
-  AIOCallbackInternal *aio_op  = nullptr;
-
-  void handle_complete(io_uring_cqe *) override;
-#endif
-
-  int io_complete(int event, void *data);
-
-  AIOCallbackInternal() { SET_HANDLER(&AIOCallbackInternal::io_complete); }
-};
-
-struct AIO_Reqs {
-  Que(AIOCallback, link) aio_todo; /* queue for AIO operations */
-                                   /* Atomic list to temporarily hold the 
request if the
-                                      lock for a particular queue cannot be 
acquired */
-  ASLL(AIOCallbackInternal, alink) aio_temp_list;
-  ink_mutex aio_mutex;
-  ink_cond  aio_cond;
-  int       index           = 0;  /* position of this struct in the aio_reqs 
array */
-  int       pending         = 0;  /* number of outstanding requests on the 
disk */
-  int       queued          = 0;  /* total number of aio_todo requests */
-  int       filedes         = -1; /* the file descriptor for the requests or 
status IO_NOT_IN_PROGRESS */
-  int       requests_queued = 0;
-};
-
-TS_INLINE int
-AIOCallbackInternal::io_complete(int event, void *data)
-{
-  (void)event;
-  (void)data;
-  if (aio_err_callback && !ok()) {
-    AIOCallback *err_op          = new AIOCallbackInternal();
-    err_op->aiocb.aio_fildes     = this->aiocb.aio_fildes;
-    err_op->aiocb.aio_lio_opcode = this->aiocb.aio_lio_opcode;
-    err_op->mutex                = aio_err_callback->mutex;
-    err_op->action               = aio_err_callback;
-
-    // Take this lock in-line because we want to stop other I/O operations on 
this disk ASAP
-    SCOPED_MUTEX_LOCK(lock, aio_err_callback->mutex, this_ethread());
-    err_op->action.continuation->handleEvent(EVENT_NONE, err_op);
-  }
-  if (!action.cancelled && action.continuation) {
-    action.continuation->handleEvent(AIO_EVENT_DONE, this);
-  }
-  return EVENT_DONE;
-}
-
-#ifdef AIO_STATS
-class AIOTestData : public Continuation
-{
-public:
-  int        num_req;
-  int        num_temp;
-  int        num_queue;
-  ink_hrtime start;
-
-  int ink_aio_stats(int event, void *data);
-
-  AIOTestData() : Continuation(new_ProxyMutex()), num_req(0), num_temp(0), 
num_queue(0)
-  {
-    start = ink_get_hrtime();
-    SET_HANDLER(&AIOTestData::ink_aio_stats);
-  }
-};
-#endif
-
-struct AIOStatsBlock {
-  Metrics::Counter::AtomicType *read_count;
-  Metrics::Counter::AtomicType *kb_read;
-  Metrics::Counter::AtomicType *write_count;
-  Metrics::Counter::AtomicType *kb_write;
-};
-
-extern AIOStatsBlock aio_rsb;
diff --git a/src/iocore/aio/test_AIO.cc b/src/iocore/aio/test_AIO.cc
index 065407bd9e..379e9fe3c7 100644
--- a/src/iocore/aio/test_AIO.cc
+++ b/src/iocore/aio/test_AIO.cc
@@ -21,7 +21,6 @@
   limitations under the License.
  */
 
-#include "P_AIO.h"
 #include "api/InkAPIInternal.h"
 #include "tscore/ink_hw.h"
 #include "tscore/Layout.h"
diff --git a/src/iocore/cache/CacheEvacuateDocVC.cc 
b/src/iocore/cache/CacheEvacuateDocVC.cc
index d9f0b4a4ba..a525c76675 100644
--- a/src/iocore/cache/CacheEvacuateDocVC.cc
+++ b/src/iocore/cache/CacheEvacuateDocVC.cc
@@ -24,7 +24,7 @@
 // make sure there are no incomplete types
 
 // aio
-#include "../aio/P_AIO.h"
+#include "iocore/aio/AIO.h"
 
 // inkcache
 #include "iocore/cache/CacheDefs.h"
diff --git a/src/iocore/cache/P_Cache.h b/src/iocore/cache/P_Cache.h
index bc33c9819d..057e2165b6 100644
--- a/src/iocore/cache/P_Cache.h
+++ b/src/iocore/cache/P_Cache.h
@@ -25,7 +25,6 @@
 
 #include "tscore/ink_platform.h"
 #include "../eventsystem/P_EventSystem.h"
-#include "../aio/P_AIO.h"
 
 #include "proxy/hdrs/HTTP.h"
 #include "proxy/hdrs/MIME.h"
diff --git a/src/iocore/cache/P_CacheDir.h b/src/iocore/cache/P_CacheDir.h
index a1efff1961..07cbc5007b 100644
--- a/src/iocore/cache/P_CacheDir.h
+++ b/src/iocore/cache/P_CacheDir.h
@@ -31,7 +31,6 @@
 #include "iocore/eventsystem/Continuation.h"
 
 // aio
-#include "../aio/P_AIO.h"
 #include "iocore/aio/AIO.h"
 
 class Stripe;
@@ -244,16 +243,16 @@ struct OpenDir : public Continuation {
 };
 
 struct CacheSync : public Continuation {
-  int                 stripe_index = 0;
-  char               *buf          = nullptr;
-  size_t              buflen       = 0;
-  bool                buf_huge     = false;
-  off_t               writepos     = 0;
-  AIOCallbackInternal io;
-  Event              *trigger    = nullptr;
-  ink_hrtime          start_time = 0;
-  int                 mainEvent(int event, Event *e);
-  void                aio_write(int fd, char *b, int n, off_t o);
+  int         stripe_index = 0;
+  char       *buf          = nullptr;
+  size_t      buflen       = 0;
+  bool        buf_huge     = false;
+  off_t       writepos     = 0;
+  AIOCallback io;
+  Event      *trigger    = nullptr;
+  ink_hrtime  start_time = 0;
+  int         mainEvent(int event, Event *e);
+  void        aio_write(int fd, char *b, int n, off_t o);
 
   CacheSync() : Continuation(new_ProxyMutex()) { 
SET_HANDLER(&CacheSync::mainEvent); }
 };
diff --git a/src/iocore/cache/P_CacheDisk.h b/src/iocore/cache/P_CacheDisk.h
index 047926e880..e9a7782b82 100644
--- a/src/iocore/cache/P_CacheDisk.h
+++ b/src/iocore/cache/P_CacheDisk.h
@@ -25,8 +25,6 @@
 
 #include "iocore/cache/Cache.h"
 
-#include "../aio/P_AIO.h"
-
 extern int cache_config_max_disk_errors;
 
 #define DISK_BAD(_x)           ((_x)->num_errors >= 
cache_config_max_disk_errors)
@@ -74,24 +72,24 @@ struct DiskHeader {
 };
 
 struct CacheDisk : public Continuation {
-  DiskHeader         *header     = nullptr;
-  char               *path       = nullptr;
-  int                 header_len = 0;
-  AIOCallbackInternal io;
-  off_t               len               = 0; // in blocks (STORE_BLOCK)
-  off_t               start             = 0;
-  off_t               skip              = 0;
-  off_t               num_usable_blocks = 0;
-  int                 hw_sector_size    = 0;
-  int                 fd                = -1;
-  off_t               free_space        = 0;
-  off_t               wasted_space      = 0;
-  DiskStripe        **disk_stripes      = nullptr;
-  DiskStripe         *free_blocks       = nullptr;
-  int                 num_errors        = 0;
-  int                 cleared           = 0;
-  bool                read_only_p       = false;
-  bool online = true; /* flag marking cache disk online or offline (because of 
too many failures or by the operator). */
+  DiskHeader  *header     = nullptr;
+  char        *path       = nullptr;
+  int          header_len = 0;
+  AIOCallback  io;
+  off_t        len               = 0; // in blocks (STORE_BLOCK)
+  off_t        start             = 0;
+  off_t        skip              = 0;
+  off_t        num_usable_blocks = 0;
+  int          hw_sector_size    = 0;
+  int          fd                = -1;
+  off_t        free_space        = 0;
+  off_t        wasted_space      = 0;
+  DiskStripe **disk_stripes      = nullptr;
+  DiskStripe  *free_blocks       = nullptr;
+  int          num_errors        = 0;
+  int          cleared           = 0;
+  bool         read_only_p       = false;
+  bool         online = true; /* flag marking cache disk online or offline 
(because of too many failures or by the operator). */
 
   // Extra configuration values
   int            forced_volume_num = -1; ///< Volume number for this disk.
diff --git a/src/iocore/cache/P_CacheStats.h b/src/iocore/cache/P_CacheStats.h
index f849d2da54..1ac4ba7dea 100644
--- a/src/iocore/cache/P_CacheStats.h
+++ b/src/iocore/cache/P_CacheStats.h
@@ -23,48 +23,50 @@
 
 #pragma once
 
+#include "tsutil/Metrics.h"
+
 // cache stats definitions, for both global cache metrics, as well as per 
volume metrics.
 enum class CacheOpType { Lookup = 0, Read, Write, Update, Remove, Evacuate, 
Scan, Last };
 
 struct CacheStatsBlock {
   struct {
-    Metrics::Gauge::AtomicType   *active  = nullptr;
-    Metrics::Counter::AtomicType *success = nullptr;
-    Metrics::Counter::AtomicType *failure = nullptr;
+    ts::Metrics::Gauge::AtomicType   *active  = nullptr;
+    ts::Metrics::Counter::AtomicType *success = nullptr;
+    ts::Metrics::Counter::AtomicType *failure = nullptr;
   } status[static_cast<int>(CacheOpType::Last)];
 
-  Metrics::Counter::AtomicType *fragment_document_count[3] = {nullptr, 
nullptr, nullptr}; // For 1, 2 and 3+ fragments
+  ts::Metrics::Counter::AtomicType *fragment_document_count[3] = {nullptr, 
nullptr, nullptr}; // For 1, 2 and 3+ fragments
 
-  Metrics::Gauge::AtomicType   *bytes_used            = nullptr;
-  Metrics::Gauge::AtomicType   *bytes_total           = nullptr;
-  Metrics::Gauge::AtomicType   *stripes               = nullptr;
-  Metrics::Gauge::AtomicType   *ram_cache_bytes       = nullptr;
-  Metrics::Gauge::AtomicType   *ram_cache_bytes_total = nullptr;
-  Metrics::Gauge::AtomicType   *direntries_total      = nullptr;
-  Metrics::Gauge::AtomicType   *direntries_used       = nullptr;
-  Metrics::Counter::AtomicType *ram_cache_hits        = nullptr;
-  Metrics::Counter::AtomicType *ram_cache_misses      = nullptr;
-  Metrics::Counter::AtomicType *pread_count           = nullptr;
-  Metrics::Gauge::AtomicType   *percent_full          = nullptr;
-  Metrics::Counter::AtomicType *read_seek_fail        = nullptr;
-  Metrics::Counter::AtomicType *read_invalid          = nullptr;
-  Metrics::Counter::AtomicType *write_backlog_failure = nullptr;
-  Metrics::Counter::AtomicType *directory_collision   = nullptr;
-  Metrics::Counter::AtomicType *read_busy_success     = nullptr;
-  Metrics::Counter::AtomicType *read_busy_failure     = nullptr;
-  Metrics::Counter::AtomicType *gc_bytes_evacuated    = nullptr;
-  Metrics::Counter::AtomicType *gc_frags_evacuated    = nullptr;
-  Metrics::Counter::AtomicType *write_bytes           = nullptr;
-  Metrics::Counter::AtomicType *hdr_vector_marshal    = nullptr;
-  Metrics::Counter::AtomicType *hdr_marshal           = nullptr;
-  Metrics::Counter::AtomicType *hdr_marshal_bytes     = nullptr;
-  Metrics::Counter::AtomicType *directory_wrap        = nullptr;
-  Metrics::Counter::AtomicType *directory_sync_count  = nullptr;
-  Metrics::Counter::AtomicType *directory_sync_time   = nullptr;
-  Metrics::Counter::AtomicType *directory_sync_bytes  = nullptr;
-  Metrics::Counter::AtomicType *span_errors_read      = nullptr;
-  Metrics::Counter::AtomicType *span_errors_write     = nullptr;
-  Metrics::Gauge::AtomicType   *span_offline          = nullptr;
-  Metrics::Gauge::AtomicType   *span_online           = nullptr;
-  Metrics::Gauge::AtomicType   *span_failing          = nullptr;
+  ts::Metrics::Gauge::AtomicType   *bytes_used            = nullptr;
+  ts::Metrics::Gauge::AtomicType   *bytes_total           = nullptr;
+  ts::Metrics::Gauge::AtomicType   *stripes               = nullptr;
+  ts::Metrics::Gauge::AtomicType   *ram_cache_bytes       = nullptr;
+  ts::Metrics::Gauge::AtomicType   *ram_cache_bytes_total = nullptr;
+  ts::Metrics::Gauge::AtomicType   *direntries_total      = nullptr;
+  ts::Metrics::Gauge::AtomicType   *direntries_used       = nullptr;
+  ts::Metrics::Counter::AtomicType *ram_cache_hits        = nullptr;
+  ts::Metrics::Counter::AtomicType *ram_cache_misses      = nullptr;
+  ts::Metrics::Counter::AtomicType *pread_count           = nullptr;
+  ts::Metrics::Gauge::AtomicType   *percent_full          = nullptr;
+  ts::Metrics::Counter::AtomicType *read_seek_fail        = nullptr;
+  ts::Metrics::Counter::AtomicType *read_invalid          = nullptr;
+  ts::Metrics::Counter::AtomicType *write_backlog_failure = nullptr;
+  ts::Metrics::Counter::AtomicType *directory_collision   = nullptr;
+  ts::Metrics::Counter::AtomicType *read_busy_success     = nullptr;
+  ts::Metrics::Counter::AtomicType *read_busy_failure     = nullptr;
+  ts::Metrics::Counter::AtomicType *gc_bytes_evacuated    = nullptr;
+  ts::Metrics::Counter::AtomicType *gc_frags_evacuated    = nullptr;
+  ts::Metrics::Counter::AtomicType *write_bytes           = nullptr;
+  ts::Metrics::Counter::AtomicType *hdr_vector_marshal    = nullptr;
+  ts::Metrics::Counter::AtomicType *hdr_marshal           = nullptr;
+  ts::Metrics::Counter::AtomicType *hdr_marshal_bytes     = nullptr;
+  ts::Metrics::Counter::AtomicType *directory_wrap        = nullptr;
+  ts::Metrics::Counter::AtomicType *directory_sync_count  = nullptr;
+  ts::Metrics::Counter::AtomicType *directory_sync_time   = nullptr;
+  ts::Metrics::Counter::AtomicType *directory_sync_bytes  = nullptr;
+  ts::Metrics::Counter::AtomicType *span_errors_read      = nullptr;
+  ts::Metrics::Counter::AtomicType *span_errors_write     = nullptr;
+  ts::Metrics::Gauge::AtomicType   *span_offline          = nullptr;
+  ts::Metrics::Gauge::AtomicType   *span_online           = nullptr;
+  ts::Metrics::Gauge::AtomicType   *span_failing          = nullptr;
 };
diff --git a/src/iocore/cache/Stripe.cc b/src/iocore/cache/Stripe.cc
index 88e6bf000e..334c28083a 100644
--- a/src/iocore/cache/Stripe.cc
+++ b/src/iocore/cache/Stripe.cc
@@ -44,9 +44,9 @@ compare_ushort(void const *a, void const *b)
 } // namespace
 
 struct StripeInitInfo {
-  off_t               recover_pos;
-  AIOCallbackInternal vol_aio[4];
-  char               *vol_h_f;
+  off_t       recover_pos;
+  AIOCallback vol_aio[4];
+  char       *vol_h_f;
 
   StripeInitInfo()
   {
diff --git a/src/iocore/cache/StripeSM.cc b/src/iocore/cache/StripeSM.cc
index b360e7dc81..f5861df320 100644
--- a/src/iocore/cache/StripeSM.cc
+++ b/src/iocore/cache/StripeSM.cc
@@ -88,9 +88,9 @@ static void update_header_info(CacheVC *vc, Doc *doc);
 static int  evacuate_fragments(CacheKey *key, CacheKey *earliest_key, int 
force, StripeSM *stripe);
 
 struct StripeInitInfo {
-  off_t               recover_pos;
-  AIOCallbackInternal vol_aio[4];
-  char               *vol_h_f;
+  off_t       recover_pos;
+  AIOCallback vol_aio[4];
+  char       *vol_h_f;
 
   StripeInitInfo()
   {
diff --git a/src/iocore/cache/StripeSM.h b/src/iocore/cache/StripeSM.h
index 415dc9f1f6..7ee9515392 100644
--- a/src/iocore/cache/StripeSM.h
+++ b/src/iocore/cache/StripeSM.h
@@ -68,9 +68,9 @@ public:
 
   int hit_evacuate_window{};
 
-  off_t               recover_pos      = 0;
-  off_t               prev_recover_pos = 0;
-  AIOCallbackInternal io;
+  off_t       recover_pos      = 0;
+  off_t       prev_recover_pos = 0;
+  AIOCallback io;
 
   Queue<CacheVC, Continuation::Link_link> sync;
 
diff --git a/src/iocore/cache/unit_tests/main.h 
b/src/iocore/cache/unit_tests/main.h
index 57d59c2288..6663d5ec10 100644
--- a/src/iocore/cache/unit_tests/main.h
+++ b/src/iocore/cache/unit_tests/main.h
@@ -33,7 +33,7 @@
 
 #include "records/RecordsConfig.h"
 #include "records/RecProcess.h"
-#include "../../aio/P_AIO.h"
+#include "iocore/aio/AIO.h"
 #include "../P_CacheDisk.h"
 #include "../../net/P_Net.h"
 #include "CacheTestHandler.h"
diff --git a/src/iocore/cache/unit_tests/test_doubles.h 
b/src/iocore/cache/unit_tests/test_doubles.h
index 65337efba2..f76efbbd84 100644
--- a/src/iocore/cache/unit_tests/test_doubles.h
+++ b/src/iocore/cache/unit_tests/test_doubles.h
@@ -26,6 +26,12 @@
 #include "main.h"
 
 #include "tscore/EventNotify.h"
+#include "tscore/ink_config.h"
+#include "tscore/ink_hrtime.h"
+
+#if TS_USE_LINUX_IO_URING
+#include "iocore/io_uring/IO_URING.h"
+#endif
 
 #include <cstdint>
 #include <cstring>
@@ -106,7 +112,15 @@ public:
   {
     this->_notifier.lock();
     while (!this->_got_callback) {
+#if TS_USE_LINUX_IO_URING
+      // The cache tests make some assumptions that AIO is done on other 
threads
+      // and don't go through the event loop but instead call this.  This means
+      // that io_uring ops aren't submitted or serviced.  Having this here
+      // gets io_uring going again.
+      IOUringContext::local_context()->submit_and_wait(HRTIME_MSECONDS(100));
+#else
       this->_notifier.wait();
+#endif
     }
     this->_notifier.unlock();
   }
diff --git a/src/iocore/io_uring/IOUringEventIO.cc 
b/src/iocore/io_uring/IOUringEventIO.cc
index d36d38873c..15180c5180 100644
--- a/src/iocore/io_uring/IOUringEventIO.cc
+++ b/src/iocore/io_uring/IOUringEventIO.cc
@@ -34,7 +34,7 @@ IOUringEventIO::start(EventLoop l, IOUringContext *h)
 }
 
 void
-IOUringEventIO::process_event(int flags)
+IOUringEventIO::process_event(int /* flags ATS_UNUSED */)
 {
   _h->service();
 }


Reply via email to