This is an automated email from the ASF dual-hosted git repository.
cmcfarlen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new aebe166655 Cleanup aio inline and remove P_AIO header (#11792)
aebe166655 is described below
commit aebe16665532e8c208deb7cccc7b68bc5dbe008b
Author: Chris McFarlen <[email protected]>
AuthorDate: Thu Oct 3 08:41:24 2024 -0500
Cleanup aio inline and remove P_AIO header (#11792)
* Cleanup aio inline and remove P_AIO header
* Fixes for io_uring
* Fix cache tests that behave strangely
---------
Co-authored-by: Chris McFarlen <[email protected]>
---
include/iocore/aio/AIO.h | 36 ++++++-
include/iocore/cache/CacheVC.h | 6 +-
src/iocore/aio/AIO.cc | 150 +++++++++++++++++++++--------
src/iocore/aio/CMakeLists.txt | 2 +-
src/iocore/aio/Inline.cc | 30 ------
src/iocore/aio/P_AIO.h | 142 ---------------------------
src/iocore/aio/test_AIO.cc | 1 -
src/iocore/cache/CacheEvacuateDocVC.cc | 2 +-
src/iocore/cache/P_Cache.h | 1 -
src/iocore/cache/P_CacheDir.h | 21 ++--
src/iocore/cache/P_CacheDisk.h | 38 ++++----
src/iocore/cache/P_CacheStats.h | 74 +++++++-------
src/iocore/cache/Stripe.cc | 6 +-
src/iocore/cache/StripeSM.cc | 6 +-
src/iocore/cache/StripeSM.h | 6 +-
src/iocore/cache/unit_tests/main.h | 2 +-
src/iocore/cache/unit_tests/test_doubles.h | 14 +++
src/iocore/io_uring/IOUringEventIO.cc | 2 +-
18 files changed, 239 insertions(+), 300 deletions(-)
diff --git a/include/iocore/aio/AIO.h b/include/iocore/aio/AIO.h
index fba6925faa..73cd5b3d3c 100644
--- a/include/iocore/aio/AIO.h
+++ b/include/iocore/aio/AIO.h
@@ -30,10 +30,15 @@
****************************************************************************/
#pragma once
+#include "iocore/eventsystem/Continuation.h"
#include "tscore/ink_platform.h"
#include "iocore/eventsystem/EventSystem.h"
#include "records/RecProcess.h"
+#if TS_USE_LINUX_IO_URING
+#include "iocore/io_uring/IO_URING.h"
+#endif
+
static constexpr ts::ModuleVersion AIO_MODULE_PUBLIC_VERSION(1, 0,
ts::ModuleVersion::PUBLIC);
#define AIO_EVENT_DONE (AIO_EVENT_EVENTS_START + 0)
@@ -63,17 +68,40 @@ bool ink_aio_thread_num_set(int thread_num);
#define AIO_CALLBACK_THREAD_ANY ((EThread *)0) // any regular event thread
#define AIO_CALLBACK_THREAD_AIO ((EThread *)-1)
+struct AIO_Reqs;
+
+#if TS_USE_LINUX_IO_URING
+struct AIOCallback : public Continuation, public IOUringCompletionHandler {
+#else
struct AIOCallback : public Continuation {
+#endif
// set before calling aio_read/aio_write
ink_aiocb aiocb;
Action action;
EThread *thread = AIO_CALLBACK_THREAD_ANY;
AIOCallback *then = nullptr;
// set on return from aio_read/aio_write
- int64_t aio_result = 0;
-
- int ok();
- AIOCallback() {}
+ int64_t aio_result = 0;
+ AIO_Reqs *aio_req = nullptr;
+ ink_hrtime sleep_time = 0;
+ SLINK(AIOCallback, alink); /* for AIO_Reqs::aio_temp_list */
+#if TS_USE_LINUX_IO_URING
+ iovec iov = {}; // this is to support older kernels that only
support readv/writev
+ AIOCallback *this_op = nullptr;
+ AIOCallback *aio_op = nullptr;
+
+ void handle_complete(io_uring_cqe *) override;
+#endif
+
+ int io_complete(int event, void *data);
+
+ int
+ ok()
+ {
+ return (aiocb.aio_nbytes == static_cast<size_t>(aio_result)) &&
(aio_result >= 0);
+ }
+
+ AIOCallback() { SET_HANDLER(&AIOCallback::io_complete); }
};
void ink_aio_init(ts::ModuleVersion version, AIOBackend backend =
AIO_BACKEND_AUTO);
diff --git a/include/iocore/cache/CacheVC.h b/include/iocore/cache/CacheVC.h
index 5b862065de..4ff28ad2a7 100644
--- a/include/iocore/cache/CacheVC.h
+++ b/include/iocore/cache/CacheVC.h
@@ -237,9 +237,9 @@ struct CacheVC : public CacheVConnection {
Ptr<IOBufferBlock> blocks; // data available to write
Ptr<IOBufferBlock> writer_buf;
- OpenDirEntry *od = nullptr;
- AIOCallbackInternal io;
- int alternate_index = CACHE_ALT_INDEX_DEFAULT; // preferred
position in vector
+ OpenDirEntry *od = nullptr;
+ AIOCallback io;
+ int alternate_index = CACHE_ALT_INDEX_DEFAULT; // preferred
position in vector
LINK(CacheVC, opendir_link);
// end Region B
diff --git a/src/iocore/aio/AIO.cc b/src/iocore/aio/AIO.cc
index c499057103..4f6ba934db 100644
--- a/src/iocore/aio/AIO.cc
+++ b/src/iocore/aio/AIO.cc
@@ -25,12 +25,14 @@
* Async Disk IO operations.
*/
+#include "iocore/aio/AIO.h"
#include "tscore/TSSystemState.h"
+#include "tscore/ink_atomic.h"
#include "tscore/ink_hw.h"
-#include <atomic>
-
-#include "P_AIO.h"
+#if TS_USE_LINUX_IO_URING
+#include "iocore/io_uring/IO_URING.h"
+#endif
#ifdef AIO_FAULT_INJECTION
#include "iocore/aio/AIO_fault_injection.h"
@@ -39,6 +41,11 @@
#define MAX_DISKS_POSSIBLE 100
// globals
+static constexpr ts::ModuleVersion
AIO_MODULE_INTERNAL_VERSION{AIO_MODULE_PUBLIC_VERSION,
ts::ModuleVersion::PRIVATE};
+
+// for debugging
+// #define AIO_STATS 1
+
#if TS_USE_LINUX_IO_URING
static bool use_io_uring = false;
@@ -49,6 +56,7 @@ void setup_prep_ops(IOUringContext *);
#endif
/* structure to hold information about each file descriptor */
+struct AIO_Reqs;
AIO_Reqs *aio_reqs[MAX_DISKS_POSSIBLE];
/* number of unique file descriptors in the aio_reqs array */
int num_filedes = 1;
@@ -62,9 +70,73 @@ int thread_is_created = 0;
RecInt cache_config_threads_per_disk = 12;
RecInt api_config_threads_per_disk = 12;
-AIOStatsBlock aio_rsb;
Continuation *aio_err_callback = nullptr;
+/* internal definitions */
+int
+AIOCallback::io_complete(int event, void *data)
+{
+ (void)event;
+ (void)data;
+ if (aio_err_callback && !ok()) {
+ AIOCallback *err_op = new AIOCallback();
+ err_op->aiocb.aio_fildes = this->aiocb.aio_fildes;
+ err_op->aiocb.aio_lio_opcode = this->aiocb.aio_lio_opcode;
+ err_op->mutex = aio_err_callback->mutex;
+ err_op->action = aio_err_callback;
+
+ // Take this lock in-line because we want to stop other I/O operations on this disk ASAP
+ SCOPED_MUTEX_LOCK(lock, aio_err_callback->mutex, this_ethread());
+ err_op->action.continuation->handleEvent(EVENT_NONE, err_op);
+ }
+ if (!action.cancelled && action.continuation) {
+ action.continuation->handleEvent(AIO_EVENT_DONE, this);
+ }
+ return EVENT_DONE;
+}
+
+struct AIO_Reqs {
+ Que(AIOCallback, link) aio_todo; /* queue for AIO operations */
+ /* Atomic list to temporarily hold the request if the
+ lock for a particular queue cannot be acquired */
+ ASLL(AIOCallback, alink) aio_temp_list;
+ ink_mutex aio_mutex;
+ ink_cond aio_cond;
+ int index = 0; /* position of this struct in the aio_reqs
array */
+ int pending = 0; /* number of outstanding requests on the
disk */
+ int queued = 0; /* total number of aio_todo requests */
+ int filedes = -1; /* the file descriptor for the requests or
status IO_NOT_IN_PROGRESS */
+ int requests_queued = 0;
+};
+
+#ifdef AIO_STATS
+class AIOTestData : public Continuation
+{
+public:
+ int num_req;
+ int num_temp;
+ int num_queue;
+ ink_hrtime start;
+
+ int ink_aio_stats(int event, void *data);
+
+ AIOTestData() : Continuation(new_ProxyMutex()), num_req(0), num_temp(0),
num_queue(0)
+ {
+ start = ink_get_hrtime();
+ SET_HANDLER(&AIOTestData::ink_aio_stats);
+ }
+};
+#endif
+
+struct AIOStatsBlock {
+ ts::Metrics::Counter::AtomicType *read_count;
+ ts::Metrics::Counter::AtomicType *kb_read;
+ ts::Metrics::Counter::AtomicType *write_count;
+ ts::Metrics::Counter::AtomicType *kb_write;
+};
+
+AIOStatsBlock aio_rsb;
+
#ifdef AIO_STATS
/* total number of requests received - for debugging */
static int num_requests = 0;
@@ -93,7 +165,7 @@ AIOTestData::ink_aio_stats(int event, void *d)
AIOCallback *
new_AIOCallback()
{
- return new AIOCallbackInternal;
+ return new AIOCallback;
}
void
@@ -107,10 +179,10 @@ ink_aio_init(ts::ModuleVersion v, [[maybe_unused]]
AIOBackend backend)
{
ink_release_assert(v.check(AIO_MODULE_INTERNAL_VERSION));
- aio_rsb.read_count =
Metrics::Counter::createPtr("proxy.process.cache.aio.read_count");
- aio_rsb.write_count =
Metrics::Counter::createPtr("proxy.process.cache.aio.write_count");
- aio_rsb.kb_read =
Metrics::Counter::createPtr("proxy.process.cache.aio.KB_read");
- aio_rsb.kb_write =
Metrics::Counter::createPtr("proxy.process.cache.aio.KB_write");
+ aio_rsb.read_count =
ts::Metrics::Counter::createPtr("proxy.process.cache.aio.read_count");
+ aio_rsb.write_count =
ts::Metrics::Counter::createPtr("proxy.process.cache.aio.write_count");
+ aio_rsb.kb_read =
ts::Metrics::Counter::createPtr("proxy.process.cache.aio.KB_read");
+ aio_rsb.kb_write =
ts::Metrics::Counter::createPtr("proxy.process.cache.aio.KB_write");
memset(&aio_reqs, 0, MAX_DISKS_POSSIBLE * sizeof(AIO_Reqs *));
ink_mutex_init(&insert_mutex);
@@ -276,8 +348,8 @@ aio_move(AIO_Reqs *req)
return;
}
- AIOCallbackInternal *cbi;
- SList(AIOCallbackInternal, alink) aq(req->aio_temp_list.popall());
+ AIOCallback *cbi;
+ SList(AIOCallback, alink) aq(req->aio_temp_list.popall());
// flip the list
Queue<AIOCallback> cbq;
@@ -293,7 +365,7 @@ aio_move(AIO_Reqs *req)
/* queue the new request */
static void
-aio_queue_req(AIOCallbackInternal *op, int fromAPI = 0)
+aio_queue_req(AIOCallback *op, int fromAPI = 0)
{
int thread_ndx = 1;
AIO_Reqs *req = op->aio_req;
@@ -365,10 +437,10 @@ aio_queue_req(AIOCallbackInternal *op, int fromAPI = 0)
}
static inline int
-cache_op(AIOCallbackInternal *op)
+cache_op(AIOCallback *op)
{
bool read = (op->aiocb.aio_lio_opcode == LIO_READ);
- for (; op; op = (AIOCallbackInternal *)op->then) {
+ for (; op; op = op->then) {
ink_aiocb *a = &op->aiocb;
ssize_t err, res = 0;
@@ -440,13 +512,13 @@ AIOThreadInfo::aio_thread_main(AIOThreadInfo *thr_info)
// update the stats;
if (op->aiocb.aio_lio_opcode == LIO_WRITE) {
- Metrics::Counter::increment(aio_rsb.write_count);
- Metrics::Counter::increment(aio_rsb.kb_write, op->aiocb.aio_nbytes >>
10);
+ ts::Metrics::Counter::increment(aio_rsb.write_count);
+ ts::Metrics::Counter::increment(aio_rsb.kb_write, op->aiocb.aio_nbytes
>> 10);
} else {
- Metrics::Counter::increment(aio_rsb.read_count);
- Metrics::Counter::increment(aio_rsb.kb_read, op->aiocb.aio_nbytes >>
10);
+ ts::Metrics::Counter::increment(aio_rsb.read_count);
+ ts::Metrics::Counter::increment(aio_rsb.kb_read, op->aiocb.aio_nbytes
>> 10);
}
- cache_op(reinterpret_cast<AIOCallbackInternal *>(op));
+ cache_op(reinterpret_cast<AIOCallback *>(op));
ink_atomic_increment(&my_aio_req->requests_queued, -1);
#ifdef AIO_STATS
ink_atomic_increment(&my_aio_req->pending, -1);
@@ -477,13 +549,13 @@ namespace
{
void
-prep_read(io_uring_sqe *sqe, AIOCallbackInternal *op)
+prep_read(io_uring_sqe *sqe, AIOCallback *op)
{
io_uring_prep_read(sqe, op->aiocb.aio_fildes, op->aiocb.aio_buf,
op->aiocb.aio_nbytes, op->aiocb.aio_offset);
}
void
-prep_readv(io_uring_sqe *sqe, AIOCallbackInternal *op)
+prep_readv(io_uring_sqe *sqe, AIOCallback *op)
{
op->iov.iov_len = op->aiocb.aio_nbytes;
op->iov.iov_base = op->aiocb.aio_buf;
@@ -491,20 +563,20 @@ prep_readv(io_uring_sqe *sqe, AIOCallbackInternal *op)
}
void
-prep_write(io_uring_sqe *sqe, AIOCallbackInternal *op)
+prep_write(io_uring_sqe *sqe, AIOCallback *op)
{
io_uring_prep_write(sqe, op->aiocb.aio_fildes, op->aiocb.aio_buf,
op->aiocb.aio_nbytes, op->aiocb.aio_offset);
}
void
-prep_writev(io_uring_sqe *sqe, AIOCallbackInternal *op)
+prep_writev(io_uring_sqe *sqe, AIOCallback *op)
{
op->iov.iov_len = op->aiocb.aio_nbytes;
op->iov.iov_base = op->aiocb.aio_buf;
io_uring_prep_writev(sqe, op->aiocb.aio_fildes, &op->iov, 1,
op->aiocb.aio_offset);
}
-using prep_op = void (*)(io_uring_sqe *, AIOCallbackInternal *);
+using prep_op = void (*)(io_uring_sqe *, AIOCallback *);
prep_op prep_ops[] = {
nullptr,
@@ -526,10 +598,10 @@ setup_prep_ops(IOUringContext *ur)
}
void
-io_uring_prep_ops_internal(AIOCallbackInternal *op_in, int op_type)
+io_uring_prep_ops_internal(AIOCallback *op_in, int op_type)
{
- IOUringContext *ur = IOUringContext::local_context();
- AIOCallbackInternal *op = op_in;
+ IOUringContext *ur = IOUringContext::local_context();
+ AIOCallback *op = op_in;
while (op) {
op->this_op = op;
io_uring_sqe *sqe = ur->next_sqe(op);
@@ -542,19 +614,19 @@ io_uring_prep_ops_internal(AIOCallbackInternal *op_in,
int op_type)
if (op->then) {
sqe->flags |= IOSQE_IO_LINK;
} else if (op->aio_op == nullptr) { // This condition leaves an existing
aio_op in place if there is one. (EAGAIN)
- op->aio_op = static_cast<AIOCallbackInternal *>(op_in);
+ op->aio_op = op_in;
}
- op = static_cast<AIOCallbackInternal *>(op->then);
+ op = op->then;
}
}
} // namespace
void
-AIOCallbackInternal::handle_complete(io_uring_cqe *cqe)
+AIOCallback::handle_complete(io_uring_cqe *cqe)
{
- AIOCallbackInternal *op = this_op;
+ AIOCallback *op = this_op;
// Re-submit the request on EAGAIN.
// we might need to re-submit the entire rest of the chain, so just call prep again
@@ -576,11 +648,11 @@ AIOCallbackInternal::handle_complete(io_uring_cqe *cqe)
if (op->aio_result > 0) {
if (op->aiocb.aio_lio_opcode == LIO_WRITE) {
- Metrics::Counter::increment(aio_rsb.write_count);
- Metrics::Counter::increment(aio_rsb.kb_write, op->aiocb.aio_nbytes >>
10);
+ ts::Metrics::Counter::increment(aio_rsb.write_count);
+ ts::Metrics::Counter::increment(aio_rsb.kb_write, op->aiocb.aio_nbytes
>> 10);
} else {
- Metrics::Counter::increment(aio_rsb.read_count);
- Metrics::Counter::increment(aio_rsb.kb_read, op->aiocb.aio_nbytes >> 10);
+ ts::Metrics::Counter::increment(aio_rsb.read_count);
+ ts::Metrics::Counter::increment(aio_rsb.kb_read, op->aiocb.aio_nbytes >>
10);
}
}
@@ -605,12 +677,12 @@ ink_aio_read(AIOCallback *op_in, int fromAPI)
{
#if TS_USE_LINUX_IO_URING
if (use_io_uring) {
- io_uring_prep_ops_internal(static_cast<AIOCallbackInternal *>(op_in),
LIO_READ);
+ io_uring_prep_ops_internal(op_in, LIO_READ);
return 1;
}
#endif
op_in->aiocb.aio_lio_opcode = LIO_READ;
- aio_queue_req(static_cast<AIOCallbackInternal *>(op_in), fromAPI);
+ aio_queue_req(op_in, fromAPI);
return 1;
}
@@ -620,12 +692,12 @@ ink_aio_write(AIOCallback *op_in, int fromAPI)
{
#if TS_USE_LINUX_IO_URING
if (use_io_uring) {
- io_uring_prep_ops_internal(static_cast<AIOCallbackInternal *>(op_in),
LIO_WRITE);
+ io_uring_prep_ops_internal(op_in, LIO_WRITE);
return 1;
}
#endif
op_in->aiocb.aio_lio_opcode = LIO_WRITE;
- aio_queue_req(static_cast<AIOCallbackInternal *>(op_in), fromAPI);
+ aio_queue_req(op_in, fromAPI);
return 1;
}
diff --git a/src/iocore/aio/CMakeLists.txt b/src/iocore/aio/CMakeLists.txt
index 0930cc8dee..4f6f4bc0b8 100644
--- a/src/iocore/aio/CMakeLists.txt
+++ b/src/iocore/aio/CMakeLists.txt
@@ -18,7 +18,7 @@
add_library(aio STATIC)
add_library(ts::aio ALIAS aio)
-target_sources(aio PRIVATE AIO.cc Inline.cc AIO_fault_injection.cc)
+target_sources(aio PRIVATE AIO.cc AIO_fault_injection.cc)
target_link_libraries(aio PUBLIC ts::inkevent ts::tscore)
if(TS_USE_LINUX_IO_URING)
diff --git a/src/iocore/aio/Inline.cc b/src/iocore/aio/Inline.cc
deleted file mode 100644
index 8e9b6d32c5..0000000000
--- a/src/iocore/aio/Inline.cc
+++ /dev/null
@@ -1,30 +0,0 @@
-/** @file
-
- A brief file description
-
- @section license License
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
-
-/*
- * Inline Functions as globals for users using the public interface
- *
- */
-
-#define TS_INLINE
-#include "P_AIO.h"
diff --git a/src/iocore/aio/P_AIO.h b/src/iocore/aio/P_AIO.h
deleted file mode 100644
index 0e1e665487..0000000000
--- a/src/iocore/aio/P_AIO.h
+++ /dev/null
@@ -1,142 +0,0 @@
-/** @file
-
- A brief file description
-
- @section license License
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
-
-/****************************************************************************
-
- Async Disk IO operations.
-
-
-
- ****************************************************************************/
-#pragma once
-
-#include "../eventsystem/P_EventSystem.h"
-#include "iocore/aio/AIO.h"
-
-#if TS_USE_LINUX_IO_URING
-#include "iocore/io_uring/IO_URING.h"
-#endif
-
-#include "tsutil/Metrics.h"
-
-using ts::Metrics;
-
-// for debugging
-// #define AIO_STATS 1
-
-static constexpr ts::ModuleVersion
AIO_MODULE_INTERNAL_VERSION{AIO_MODULE_PUBLIC_VERSION,
ts::ModuleVersion::PRIVATE};
-
-TS_INLINE int
-AIOCallback::ok()
-{
- return (aiocb.aio_nbytes == static_cast<size_t>(aio_result)) && (aio_result
>= 0);
-}
-
-extern Continuation *aio_err_callback;
-
-struct AIO_Reqs;
-
-#if TS_USE_LINUX_IO_URING
-struct AIOCallbackInternal : public AIOCallback, public
IOUringCompletionHandler {
-#else
-struct AIOCallbackInternal : public AIOCallback {
-#endif
- AIO_Reqs *aio_req = nullptr;
- ink_hrtime sleep_time = 0;
- SLINK(AIOCallbackInternal, alink); /* for AIO_Reqs::aio_temp_list */
-#if TS_USE_LINUX_IO_URING
- iovec iov = {}; // this is to support older kernels that
only support readv/writev
- AIOCallbackInternal *this_op = nullptr;
- AIOCallbackInternal *aio_op = nullptr;
-
- void handle_complete(io_uring_cqe *) override;
-#endif
-
- int io_complete(int event, void *data);
-
- AIOCallbackInternal() { SET_HANDLER(&AIOCallbackInternal::io_complete); }
-};
-
-struct AIO_Reqs {
- Que(AIOCallback, link) aio_todo; /* queue for AIO operations */
- /* Atomic list to temporarily hold the
request if the
- lock for a particular queue cannot be
acquired */
- ASLL(AIOCallbackInternal, alink) aio_temp_list;
- ink_mutex aio_mutex;
- ink_cond aio_cond;
- int index = 0; /* position of this struct in the aio_reqs
array */
- int pending = 0; /* number of outstanding requests on the
disk */
- int queued = 0; /* total number of aio_todo requests */
- int filedes = -1; /* the file descriptor for the requests or
status IO_NOT_IN_PROGRESS */
- int requests_queued = 0;
-};
-
-TS_INLINE int
-AIOCallbackInternal::io_complete(int event, void *data)
-{
- (void)event;
- (void)data;
- if (aio_err_callback && !ok()) {
- AIOCallback *err_op = new AIOCallbackInternal();
- err_op->aiocb.aio_fildes = this->aiocb.aio_fildes;
- err_op->aiocb.aio_lio_opcode = this->aiocb.aio_lio_opcode;
- err_op->mutex = aio_err_callback->mutex;
- err_op->action = aio_err_callback;
-
- // Take this lock in-line because we want to stop other I/O operations on this disk ASAP
- SCOPED_MUTEX_LOCK(lock, aio_err_callback->mutex, this_ethread());
- err_op->action.continuation->handleEvent(EVENT_NONE, err_op);
- }
- if (!action.cancelled && action.continuation) {
- action.continuation->handleEvent(AIO_EVENT_DONE, this);
- }
- return EVENT_DONE;
-}
-
-#ifdef AIO_STATS
-class AIOTestData : public Continuation
-{
-public:
- int num_req;
- int num_temp;
- int num_queue;
- ink_hrtime start;
-
- int ink_aio_stats(int event, void *data);
-
- AIOTestData() : Continuation(new_ProxyMutex()), num_req(0), num_temp(0),
num_queue(0)
- {
- start = ink_get_hrtime();
- SET_HANDLER(&AIOTestData::ink_aio_stats);
- }
-};
-#endif
-
-struct AIOStatsBlock {
- Metrics::Counter::AtomicType *read_count;
- Metrics::Counter::AtomicType *kb_read;
- Metrics::Counter::AtomicType *write_count;
- Metrics::Counter::AtomicType *kb_write;
-};
-
-extern AIOStatsBlock aio_rsb;
diff --git a/src/iocore/aio/test_AIO.cc b/src/iocore/aio/test_AIO.cc
index 065407bd9e..379e9fe3c7 100644
--- a/src/iocore/aio/test_AIO.cc
+++ b/src/iocore/aio/test_AIO.cc
@@ -21,7 +21,6 @@
limitations under the License.
*/
-#include "P_AIO.h"
#include "api/InkAPIInternal.h"
#include "tscore/ink_hw.h"
#include "tscore/Layout.h"
diff --git a/src/iocore/cache/CacheEvacuateDocVC.cc
b/src/iocore/cache/CacheEvacuateDocVC.cc
index d9f0b4a4ba..a525c76675 100644
--- a/src/iocore/cache/CacheEvacuateDocVC.cc
+++ b/src/iocore/cache/CacheEvacuateDocVC.cc
@@ -24,7 +24,7 @@
// make sure there are no incomplete types
// aio
-#include "../aio/P_AIO.h"
+#include "iocore/aio/AIO.h"
// inkcache
#include "iocore/cache/CacheDefs.h"
diff --git a/src/iocore/cache/P_Cache.h b/src/iocore/cache/P_Cache.h
index bc33c9819d..057e2165b6 100644
--- a/src/iocore/cache/P_Cache.h
+++ b/src/iocore/cache/P_Cache.h
@@ -25,7 +25,6 @@
#include "tscore/ink_platform.h"
#include "../eventsystem/P_EventSystem.h"
-#include "../aio/P_AIO.h"
#include "proxy/hdrs/HTTP.h"
#include "proxy/hdrs/MIME.h"
diff --git a/src/iocore/cache/P_CacheDir.h b/src/iocore/cache/P_CacheDir.h
index a1efff1961..07cbc5007b 100644
--- a/src/iocore/cache/P_CacheDir.h
+++ b/src/iocore/cache/P_CacheDir.h
@@ -31,7 +31,6 @@
#include "iocore/eventsystem/Continuation.h"
// aio
-#include "../aio/P_AIO.h"
#include "iocore/aio/AIO.h"
class Stripe;
@@ -244,16 +243,16 @@ struct OpenDir : public Continuation {
};
struct CacheSync : public Continuation {
- int stripe_index = 0;
- char *buf = nullptr;
- size_t buflen = 0;
- bool buf_huge = false;
- off_t writepos = 0;
- AIOCallbackInternal io;
- Event *trigger = nullptr;
- ink_hrtime start_time = 0;
- int mainEvent(int event, Event *e);
- void aio_write(int fd, char *b, int n, off_t o);
+ int stripe_index = 0;
+ char *buf = nullptr;
+ size_t buflen = 0;
+ bool buf_huge = false;
+ off_t writepos = 0;
+ AIOCallback io;
+ Event *trigger = nullptr;
+ ink_hrtime start_time = 0;
+ int mainEvent(int event, Event *e);
+ void aio_write(int fd, char *b, int n, off_t o);
CacheSync() : Continuation(new_ProxyMutex()) {
SET_HANDLER(&CacheSync::mainEvent); }
};
diff --git a/src/iocore/cache/P_CacheDisk.h b/src/iocore/cache/P_CacheDisk.h
index 047926e880..e9a7782b82 100644
--- a/src/iocore/cache/P_CacheDisk.h
+++ b/src/iocore/cache/P_CacheDisk.h
@@ -25,8 +25,6 @@
#include "iocore/cache/Cache.h"
-#include "../aio/P_AIO.h"
-
extern int cache_config_max_disk_errors;
#define DISK_BAD(_x) ((_x)->num_errors >=
cache_config_max_disk_errors)
@@ -74,24 +72,24 @@ struct DiskHeader {
};
struct CacheDisk : public Continuation {
- DiskHeader *header = nullptr;
- char *path = nullptr;
- int header_len = 0;
- AIOCallbackInternal io;
- off_t len = 0; // in blocks (STORE_BLOCK)
- off_t start = 0;
- off_t skip = 0;
- off_t num_usable_blocks = 0;
- int hw_sector_size = 0;
- int fd = -1;
- off_t free_space = 0;
- off_t wasted_space = 0;
- DiskStripe **disk_stripes = nullptr;
- DiskStripe *free_blocks = nullptr;
- int num_errors = 0;
- int cleared = 0;
- bool read_only_p = false;
- bool online = true; /* flag marking cache disk online or offline (because of
too many failures or by the operator). */
+ DiskHeader *header = nullptr;
+ char *path = nullptr;
+ int header_len = 0;
+ AIOCallback io;
+ off_t len = 0; // in blocks (STORE_BLOCK)
+ off_t start = 0;
+ off_t skip = 0;
+ off_t num_usable_blocks = 0;
+ int hw_sector_size = 0;
+ int fd = -1;
+ off_t free_space = 0;
+ off_t wasted_space = 0;
+ DiskStripe **disk_stripes = nullptr;
+ DiskStripe *free_blocks = nullptr;
+ int num_errors = 0;
+ int cleared = 0;
+ bool read_only_p = false;
+ bool online = true; /* flag marking cache disk online or offline
(because of too many failures or by the operator). */
// Extra configuration values
int forced_volume_num = -1; ///< Volume number for this disk.
diff --git a/src/iocore/cache/P_CacheStats.h b/src/iocore/cache/P_CacheStats.h
index f849d2da54..1ac4ba7dea 100644
--- a/src/iocore/cache/P_CacheStats.h
+++ b/src/iocore/cache/P_CacheStats.h
@@ -23,48 +23,50 @@
#pragma once
+#include "tsutil/Metrics.h"
+
// cache stats definitions, for both global cache metrics, as well as per volume metrics.
enum class CacheOpType { Lookup = 0, Read, Write, Update, Remove, Evacuate,
Scan, Last };
struct CacheStatsBlock {
struct {
- Metrics::Gauge::AtomicType *active = nullptr;
- Metrics::Counter::AtomicType *success = nullptr;
- Metrics::Counter::AtomicType *failure = nullptr;
+ ts::Metrics::Gauge::AtomicType *active = nullptr;
+ ts::Metrics::Counter::AtomicType *success = nullptr;
+ ts::Metrics::Counter::AtomicType *failure = nullptr;
} status[static_cast<int>(CacheOpType::Last)];
- Metrics::Counter::AtomicType *fragment_document_count[3] = {nullptr,
nullptr, nullptr}; // For 1, 2 and 3+ fragments
+ ts::Metrics::Counter::AtomicType *fragment_document_count[3] = {nullptr,
nullptr, nullptr}; // For 1, 2 and 3+ fragments
- Metrics::Gauge::AtomicType *bytes_used = nullptr;
- Metrics::Gauge::AtomicType *bytes_total = nullptr;
- Metrics::Gauge::AtomicType *stripes = nullptr;
- Metrics::Gauge::AtomicType *ram_cache_bytes = nullptr;
- Metrics::Gauge::AtomicType *ram_cache_bytes_total = nullptr;
- Metrics::Gauge::AtomicType *direntries_total = nullptr;
- Metrics::Gauge::AtomicType *direntries_used = nullptr;
- Metrics::Counter::AtomicType *ram_cache_hits = nullptr;
- Metrics::Counter::AtomicType *ram_cache_misses = nullptr;
- Metrics::Counter::AtomicType *pread_count = nullptr;
- Metrics::Gauge::AtomicType *percent_full = nullptr;
- Metrics::Counter::AtomicType *read_seek_fail = nullptr;
- Metrics::Counter::AtomicType *read_invalid = nullptr;
- Metrics::Counter::AtomicType *write_backlog_failure = nullptr;
- Metrics::Counter::AtomicType *directory_collision = nullptr;
- Metrics::Counter::AtomicType *read_busy_success = nullptr;
- Metrics::Counter::AtomicType *read_busy_failure = nullptr;
- Metrics::Counter::AtomicType *gc_bytes_evacuated = nullptr;
- Metrics::Counter::AtomicType *gc_frags_evacuated = nullptr;
- Metrics::Counter::AtomicType *write_bytes = nullptr;
- Metrics::Counter::AtomicType *hdr_vector_marshal = nullptr;
- Metrics::Counter::AtomicType *hdr_marshal = nullptr;
- Metrics::Counter::AtomicType *hdr_marshal_bytes = nullptr;
- Metrics::Counter::AtomicType *directory_wrap = nullptr;
- Metrics::Counter::AtomicType *directory_sync_count = nullptr;
- Metrics::Counter::AtomicType *directory_sync_time = nullptr;
- Metrics::Counter::AtomicType *directory_sync_bytes = nullptr;
- Metrics::Counter::AtomicType *span_errors_read = nullptr;
- Metrics::Counter::AtomicType *span_errors_write = nullptr;
- Metrics::Gauge::AtomicType *span_offline = nullptr;
- Metrics::Gauge::AtomicType *span_online = nullptr;
- Metrics::Gauge::AtomicType *span_failing = nullptr;
+ ts::Metrics::Gauge::AtomicType *bytes_used = nullptr;
+ ts::Metrics::Gauge::AtomicType *bytes_total = nullptr;
+ ts::Metrics::Gauge::AtomicType *stripes = nullptr;
+ ts::Metrics::Gauge::AtomicType *ram_cache_bytes = nullptr;
+ ts::Metrics::Gauge::AtomicType *ram_cache_bytes_total = nullptr;
+ ts::Metrics::Gauge::AtomicType *direntries_total = nullptr;
+ ts::Metrics::Gauge::AtomicType *direntries_used = nullptr;
+ ts::Metrics::Counter::AtomicType *ram_cache_hits = nullptr;
+ ts::Metrics::Counter::AtomicType *ram_cache_misses = nullptr;
+ ts::Metrics::Counter::AtomicType *pread_count = nullptr;
+ ts::Metrics::Gauge::AtomicType *percent_full = nullptr;
+ ts::Metrics::Counter::AtomicType *read_seek_fail = nullptr;
+ ts::Metrics::Counter::AtomicType *read_invalid = nullptr;
+ ts::Metrics::Counter::AtomicType *write_backlog_failure = nullptr;
+ ts::Metrics::Counter::AtomicType *directory_collision = nullptr;
+ ts::Metrics::Counter::AtomicType *read_busy_success = nullptr;
+ ts::Metrics::Counter::AtomicType *read_busy_failure = nullptr;
+ ts::Metrics::Counter::AtomicType *gc_bytes_evacuated = nullptr;
+ ts::Metrics::Counter::AtomicType *gc_frags_evacuated = nullptr;
+ ts::Metrics::Counter::AtomicType *write_bytes = nullptr;
+ ts::Metrics::Counter::AtomicType *hdr_vector_marshal = nullptr;
+ ts::Metrics::Counter::AtomicType *hdr_marshal = nullptr;
+ ts::Metrics::Counter::AtomicType *hdr_marshal_bytes = nullptr;
+ ts::Metrics::Counter::AtomicType *directory_wrap = nullptr;
+ ts::Metrics::Counter::AtomicType *directory_sync_count = nullptr;
+ ts::Metrics::Counter::AtomicType *directory_sync_time = nullptr;
+ ts::Metrics::Counter::AtomicType *directory_sync_bytes = nullptr;
+ ts::Metrics::Counter::AtomicType *span_errors_read = nullptr;
+ ts::Metrics::Counter::AtomicType *span_errors_write = nullptr;
+ ts::Metrics::Gauge::AtomicType *span_offline = nullptr;
+ ts::Metrics::Gauge::AtomicType *span_online = nullptr;
+ ts::Metrics::Gauge::AtomicType *span_failing = nullptr;
};
diff --git a/src/iocore/cache/Stripe.cc b/src/iocore/cache/Stripe.cc
index 88e6bf000e..334c28083a 100644
--- a/src/iocore/cache/Stripe.cc
+++ b/src/iocore/cache/Stripe.cc
@@ -44,9 +44,9 @@ compare_ushort(void const *a, void const *b)
} // namespace
struct StripeInitInfo {
- off_t recover_pos;
- AIOCallbackInternal vol_aio[4];
- char *vol_h_f;
+ off_t recover_pos;
+ AIOCallback vol_aio[4];
+ char *vol_h_f;
StripeInitInfo()
{
diff --git a/src/iocore/cache/StripeSM.cc b/src/iocore/cache/StripeSM.cc
index b360e7dc81..f5861df320 100644
--- a/src/iocore/cache/StripeSM.cc
+++ b/src/iocore/cache/StripeSM.cc
@@ -88,9 +88,9 @@ static void update_header_info(CacheVC *vc, Doc *doc);
static int evacuate_fragments(CacheKey *key, CacheKey *earliest_key, int
force, StripeSM *stripe);
struct StripeInitInfo {
- off_t recover_pos;
- AIOCallbackInternal vol_aio[4];
- char *vol_h_f;
+ off_t recover_pos;
+ AIOCallback vol_aio[4];
+ char *vol_h_f;
StripeInitInfo()
{
diff --git a/src/iocore/cache/StripeSM.h b/src/iocore/cache/StripeSM.h
index 415dc9f1f6..7ee9515392 100644
--- a/src/iocore/cache/StripeSM.h
+++ b/src/iocore/cache/StripeSM.h
@@ -68,9 +68,9 @@ public:
int hit_evacuate_window{};
- off_t recover_pos = 0;
- off_t prev_recover_pos = 0;
- AIOCallbackInternal io;
+ off_t recover_pos = 0;
+ off_t prev_recover_pos = 0;
+ AIOCallback io;
Queue<CacheVC, Continuation::Link_link> sync;
diff --git a/src/iocore/cache/unit_tests/main.h
b/src/iocore/cache/unit_tests/main.h
index 57d59c2288..6663d5ec10 100644
--- a/src/iocore/cache/unit_tests/main.h
+++ b/src/iocore/cache/unit_tests/main.h
@@ -33,7 +33,7 @@
#include "records/RecordsConfig.h"
#include "records/RecProcess.h"
-#include "../../aio/P_AIO.h"
+#include "iocore/aio/AIO.h"
#include "../P_CacheDisk.h"
#include "../../net/P_Net.h"
#include "CacheTestHandler.h"
diff --git a/src/iocore/cache/unit_tests/test_doubles.h
b/src/iocore/cache/unit_tests/test_doubles.h
index 65337efba2..f76efbbd84 100644
--- a/src/iocore/cache/unit_tests/test_doubles.h
+++ b/src/iocore/cache/unit_tests/test_doubles.h
@@ -26,6 +26,12 @@
#include "main.h"
#include "tscore/EventNotify.h"
+#include "tscore/ink_config.h"
+#include "tscore/ink_hrtime.h"
+
+#if TS_USE_LINUX_IO_URING
+#include "iocore/io_uring/IO_URING.h"
+#endif
#include <cstdint>
#include <cstring>
@@ -106,7 +112,15 @@ public:
{
this->_notifier.lock();
while (!this->_got_callback) {
+#if TS_USE_LINUX_IO_URING
+ // The cache tests assume that AIO is done on other threads and don't go
+ // through the event loop; they call this instead. This means
+ // that io_uring ops aren't submitted or serviced. Having this here
+ // gets io_uring going again.
+ IOUringContext::local_context()->submit_and_wait(HRTIME_MSECONDS(100));
+#else
this->_notifier.wait();
+#endif
}
this->_notifier.unlock();
}
diff --git a/src/iocore/io_uring/IOUringEventIO.cc
b/src/iocore/io_uring/IOUringEventIO.cc
index d36d38873c..15180c5180 100644
--- a/src/iocore/io_uring/IOUringEventIO.cc
+++ b/src/iocore/io_uring/IOUringEventIO.cc
@@ -34,7 +34,7 @@ IOUringEventIO::start(EventLoop l, IOUringContext *h)
}
void
-IOUringEventIO::process_event(int flags)
+IOUringEventIO::process_event(int /* flags ATS_UNUSED */)
{
_h->service();
}