This is an automated email from the ASF dual-hosted git repository.
cmcfarlen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new 90dbc21a54 Remove eventsystem Inline.cc and cleanup private headers.
(#12581)
90dbc21a54 is described below
commit 90dbc21a541986db6b223649cb4f798e190a550f
Author: Chris McFarlen <[email protected]>
AuthorDate: Sat Nov 29 11:20:09 2025 -0600
Remove eventsystem Inline.cc and cleanup private headers. (#12581)
* Remove eventsystem Inline.cc and cleanup private headers.
* include EThread to get this_ethread
* Fix include in RecProcess.cc
* Remove I_VIO_h
* restore DELAY_FOR_RETRY
---------
Co-authored-by: Chris McFarlen <[email protected]>
---
include/iocore/eventsystem/EThread.h | 25 +-
include/iocore/eventsystem/Event.h | 20 +-
include/iocore/eventsystem/Freer.h | 8 +-
include/iocore/eventsystem/IOBuffer.h | 41 +
include/iocore/eventsystem/ProtectedQueue.h | 50 ++
include/iocore/eventsystem/Thread.h | 12 +-
include/iocore/eventsystem/VConnection.h | 56 +-
include/iocore/eventsystem/VIO.h | 66 +-
include/ts/InkAPIPrivateIOCore.h | 9 +-
src/api/InkVConnInternal.cc | 2 +
src/iocore/cache/RegressionSM.cc | 1 +
src/iocore/cache/unit_tests/main.cc | 2 +
src/iocore/eventsystem/CMakeLists.txt | 2 +-
src/iocore/eventsystem/ConfigProcessor.cc | 2 +-
src/iocore/eventsystem/EventSystem.cc | 5 +-
src/iocore/eventsystem/IOBuffer.cc | 965 +++++++++++++++++++-
src/iocore/eventsystem/Inline.cc | 30 -
src/iocore/eventsystem/Lock.cc | 4 +-
src/iocore/eventsystem/PQ-List.cc | 3 +-
src/iocore/eventsystem/P_EventSystem.h | 49 -
src/iocore/eventsystem/P_IOBuffer.h | 1034 ----------------------
src/iocore/eventsystem/P_ProtectedQueue.h | 83 --
src/iocore/eventsystem/P_Thread.h | 49 -
src/iocore/eventsystem/P_UnixEThread.h | 261 ------
src/iocore/eventsystem/P_UnixEventProcessor.h | 246 -----
src/iocore/eventsystem/P_VConnection.h | 95 --
src/iocore/eventsystem/P_VIO.h | 123 ---
src/iocore/eventsystem/Processor.cc | 4 +-
src/iocore/eventsystem/ProtectedQueue.cc | 3 +-
src/iocore/eventsystem/RecProcess.cc | 3 +-
src/iocore/eventsystem/UnixEThread.cc | 207 ++++-
src/iocore/eventsystem/UnixEvent.cc | 4 +-
src/iocore/eventsystem/UnixEventProcessor.cc | 215 +++++
src/iocore/eventsystem/{P_UnixEvent.h => VIO.cc} | 42 +-
src/iocore/net/SSLSessionCache.cc | 2 +-
src/iocore/net/Socks.cc | 1 -
src/iocore/net/TLSEventSupport.cc | 1 +
src/iocore/utils/OneWayMultiTunnel.cc | 15 +-
src/iocore/utils/OneWayTunnel.cc | 2 +-
src/proxy/PluginVC.cc | 1 -
src/proxy/ReverseProxy.cc | 1 -
src/proxy/hdrs/HdrTSOnly.cc | 2 +-
src/proxy/http/HttpDebugNames.cc | 1 -
src/proxy/http/PreWarmManager.cc | 1 -
src/proxy/http/remap/PluginDso.cc | 1 -
src/proxy/http/remap/PluginFactory.cc | 2 +-
src/proxy/http3/Http09App.cc | 2 +-
src/proxy/http3/Http3App.cc | 2 +-
src/proxy/logging/Log.cc | 1 -
src/proxy/logging/LogBuffer.cc | 1 -
src/proxy/logging/LogFile.cc | 1 -
src/proxy/logging/LogObject.cc | 2 +-
src/traffic_server/SocksProxy.cc | 1 -
src/traffic_server/traffic_server.cc | 1 -
54 files changed, 1702 insertions(+), 2060 deletions(-)
diff --git a/include/iocore/eventsystem/EThread.h
b/include/iocore/eventsystem/EThread.h
index 07f1f938e6..99569b9fff 100644
--- a/include/iocore/eventsystem/EThread.h
+++ b/include/iocore/eventsystem/EThread.h
@@ -710,6 +710,29 @@ operator new(size_t, ink_dummy_for_new *p)
}
#define ETHREAD_GET_PTR(thread, offset) ((void *)((char *)(thread) + (offset)))
-extern EThread *this_ethread();
+inline EThread *
+this_ethread()
+{
+ return EThread::this_ethread_ptr;
+}
+
+inline EThread *
+this_event_thread()
+{
+ EThread *ethread = this_ethread();
+ if (ethread != nullptr && ethread->tt == REGULAR) {
+ return ethread;
+ } else {
+ return nullptr;
+ }
+}
+
+inline void
+EThread::free_event(Event *e)
+{
+ ink_assert(!e->in_the_priority_queue && !e->in_the_prot_queue);
+ e->mutex = nullptr;
+ EVENT_FREE(e, eventAllocator, this);
+}
extern int thread_max_heartbeat_mseconds;
diff --git a/include/iocore/eventsystem/Event.h
b/include/iocore/eventsystem/Event.h
index aef3db53ae..a167f1d3ba 100644
--- a/include/iocore/eventsystem/Event.h
+++ b/include/iocore/eventsystem/Event.h
@@ -228,9 +228,18 @@ public:
// Private
- Event();
+ Event() : in_the_prot_queue(false), in_the_priority_queue(false),
immediate(false), globally_allocated(true), in_heap(false) {}
- Event *init(Continuation *c, ink_hrtime atimeout_at = 0, ink_hrtime aperiod
= 0);
+ Event *
+ init(Continuation *c, ink_hrtime atimeout_at = 0, ink_hrtime aperiod = 0)
+ {
+ continuation = c;
+ timeout_at = atimeout_at;
+ period = aperiod;
+ immediate = !period && !atimeout_at;
+ cancelled = false;
+ return this;
+ }
#ifdef ENABLE_TIME_TRACE
ink_hrtime start_time;
@@ -282,6 +291,13 @@ public:
//
extern ClassAllocator<Event> eventAllocator;
+inline void
+Event::free()
+{
+ mutex = nullptr;
+ eventAllocator.free(this);
+}
+
#define EVENT_ALLOC(_a, _t) THREAD_ALLOC(_a, _t)
#define EVENT_FREE(_p, _a, _t) \
_p->mutex = nullptr; \
diff --git a/include/iocore/eventsystem/Freer.h
b/include/iocore/eventsystem/Freer.h
index 51738e036a..f19fcbd95b 100644
--- a/include/iocore/eventsystem/Freer.h
+++ b/include/iocore/eventsystem/Freer.h
@@ -52,7 +52,7 @@ public: // Needed by WinNT compiler (compiler bug)
// 1. Make sure to schedule a delete on an ET_TASK thread
// 2. Delay the delete (this should be used sparingly)
template <class C>
-TS_INLINE void
+void
new_Deleter(C *ap, ink_hrtime t)
{
if (t > 0) {
@@ -78,7 +78,7 @@ public: // Needed by WinNT compiler (compiler bug)
};
template <class C>
-TS_INLINE void
+void
new_FreeCaller(C *ap, ink_hrtime t)
{
eventProcessor.schedule_in(new FreeCallContinuation<C>(ap), t, ET_TASK);
@@ -103,7 +103,7 @@ struct FreerContinuation : public Continuation {
explicit FreerContinuation(void *ap) : Continuation(nullptr), p(ap) {
SET_HANDLER(&FreerContinuation::dieEvent); }
};
-TS_INLINE void
+inline void
new_Freer(void *ap, ink_hrtime t)
{
eventProcessor.schedule_in(new FreerContinuation(ap), t, ET_TASK);
@@ -128,7 +128,7 @@ template <class C> struct DereferContinuation : public
Continuation {
};
template <class C>
-TS_INLINE void
+void
new_Derefer(C *ap, ink_hrtime t)
{
eventProcessor.schedule_in(new DereferContinuation<C>(ap), t, ET_TASK);
diff --git a/include/iocore/eventsystem/IOBuffer.h
b/include/iocore/eventsystem/IOBuffer.h
index de01b59a6a..a8656aa6f7 100644
--- a/include/iocore/eventsystem/IOBuffer.h
+++ b/include/iocore/eventsystem/IOBuffer.h
@@ -1514,3 +1514,44 @@ IOBufferChain::iterator::operator->() const
{
return _b;
}
+
+//////////////////////////////////////////////////////////////
+//
+// returns 0 for DEFAULT_BUFFER_BASE_SIZE,
+// +1 for each power of 2
+//
+//////////////////////////////////////////////////////////////
+inline int64_t
+buffer_size_to_index(int64_t size, int64_t max)
+{
+ int64_t r = max;
+
+ while (r && BUFFER_SIZE_FOR_INDEX(r - 1) >= size) {
+ r--;
+ }
+ return r;
+}
+
+inline int64_t
+iobuffer_size_to_index(int64_t size, int64_t max)
+{
+ if (size > BUFFER_SIZE_FOR_INDEX(max)) {
+ return BUFFER_SIZE_INDEX_FOR_XMALLOC_SIZE(size);
+ }
+ return buffer_size_to_index(size, max);
+}
+
+inline int64_t
+index_to_buffer_size(int64_t idx)
+{
+ if (BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(idx)) {
+ return BUFFER_SIZE_FOR_INDEX(idx);
+ } else if (BUFFER_SIZE_INDEX_IS_XMALLOCED(idx)) {
+ return BUFFER_SIZE_FOR_XMALLOC(idx);
+ // coverity[dead_error_condition]
+ } else if (BUFFER_SIZE_INDEX_IS_CONSTANT(idx)) {
+ return BUFFER_SIZE_FOR_CONSTANT(idx);
+ }
+ // coverity[dead_error_line]
+ return 0;
+}
diff --git a/include/iocore/eventsystem/ProtectedQueue.h
b/include/iocore/eventsystem/ProtectedQueue.h
index bc1b3b2577..b24789e709 100644
--- a/include/iocore/eventsystem/ProtectedQueue.h
+++ b/include/iocore/eventsystem/ProtectedQueue.h
@@ -52,3 +52,53 @@ struct ProtectedQueue {
ProtectedQueue();
};
+
+inline ProtectedQueue::ProtectedQueue()
+{
+ Event e;
+ ink_mutex_init(&lock);
+ ink_atomiclist_init(&al, "ProtectedQueue", (char *)&e.link.next - (char
*)&e);
+ ink_cond_init(&might_have_data);
+}
+
+inline void
+ProtectedQueue::signal()
+{
+ // Need to get the lock before you can signal the thread
+ ink_mutex_acquire(&lock);
+ ink_cond_signal(&might_have_data);
+ ink_mutex_release(&lock);
+}
+
+inline int
+ProtectedQueue::try_signal()
+{
+ // Need to get the lock before you can signal the thread
+ if (ink_mutex_try_acquire(&lock)) {
+ ink_cond_signal(&might_have_data);
+ ink_mutex_release(&lock);
+ return 1;
+ } else {
+ return 0;
+ }
+}
+
+// Called from the same thread (don't need to signal)
+inline void
+ProtectedQueue::enqueue_local(Event *e)
+{
+ ink_assert(!e->in_the_prot_queue && !e->in_the_priority_queue);
+ e->in_the_prot_queue = 1;
+ localQueue.enqueue(e);
+}
+
+inline Event *
+ProtectedQueue::dequeue_local()
+{
+ Event *e = localQueue.dequeue();
+ if (e) {
+ ink_assert(e->in_the_prot_queue);
+ e->in_the_prot_queue = 0;
+ }
+ return e;
+}
diff --git a/include/iocore/eventsystem/Thread.h
b/include/iocore/eventsystem/Thread.h
index 22b006a496..0a34dd633c 100644
--- a/include/iocore/eventsystem/Thread.h
+++ b/include/iocore/eventsystem/Thread.h
@@ -107,7 +107,11 @@ public:
*/
Ptr<ProxyMutex> mutex;
- virtual void set_specific() = 0;
+ virtual void
+ set_specific()
+ {
+ this_thread_ptr = this;
+ }
static thread_local Thread *this_thread_ptr;
@@ -170,4 +174,8 @@ protected:
Thread();
};
-extern Thread *this_thread();
+inline Thread *
+this_thread()
+{
+ return Thread::this_thread_ptr;
+}
diff --git a/include/iocore/eventsystem/VConnection.h
b/include/iocore/eventsystem/VConnection.h
index 9c731181e7..66780a2fda 100644
--- a/include/iocore/eventsystem/VConnection.h
+++ b/include/iocore/eventsystem/VConnection.h
@@ -28,10 +28,6 @@
#include "tscore/PluginUserArgs.h"
#include "iocore/eventsystem/EventSystem.h"
-#if !defined(I_VIO_h)
-#error "include VIO.h"
-#endif
-
//
// Data Types
//
@@ -145,7 +141,7 @@ typedef struct tsapi_vio *TSVIO;
class VConnection : public Continuation
{
public:
- ~VConnection() override;
+ ~VConnection() override {}
/**
Read data from the VConnection.
@@ -306,17 +302,28 @@ public:
*/
virtual void do_io_shutdown(ShutdownHowTo_t howto) = 0;
- explicit VConnection(ProxyMutex *aMutex);
- explicit VConnection(Ptr<ProxyMutex> &aMutex);
+ explicit VConnection(ProxyMutex *aMutex) : Continuation(aMutex), lerrno(0) {
SET_HANDLER(nullptr); }
+ explicit VConnection(Ptr<ProxyMutex> &aMutex) : Continuation(aMutex),
lerrno(0) { SET_HANDLER(nullptr); }
// Private
// Set continuation on a given vio. The public interface
// is through VIO::set_continuation()
- virtual void set_continuation(VIO *vio, Continuation *cont);
+ virtual void
+ set_continuation(VIO *, Continuation *)
+ {
+ }
// Reenable a given vio. The public interface is through VIO::reenable
- virtual void reenable(VIO *vio);
- virtual void reenable_re(VIO *vio);
+ virtual void
+ reenable(VIO *)
+ {
+ }
+
+ virtual void
+ reenable_re(VIO *vio)
+ {
+ reenable(vio);
+ }
/**
Convenience function to retrieve information from VConnection.
@@ -411,3 +418,32 @@ struct DummyVConnection : public VConnection, public
PluginUserArgs<TS_USER_ARGS
explicit DummyVConnection(ProxyMutex *m) : VConnection(m) {}
};
+
+inline const char *
+get_vc_event_name(int event)
+{
+ switch (event) {
+ default:
+ return "unknown event";
+ case VC_EVENT_NONE:
+ return "VC_EVENT_NONE";
+ case VC_EVENT_IMMEDIATE:
+ return "VC_EVENT_IMMEDIATE";
+ case VC_EVENT_READ_READY:
+ return "VC_EVENT_READ_READY";
+ case VC_EVENT_WRITE_READY:
+ return "VC_EVENT_WRITE_READY";
+ case VC_EVENT_READ_COMPLETE:
+ return "VC_EVENT_READ_COMPLETE";
+ case VC_EVENT_WRITE_COMPLETE:
+ return "VC_EVENT_WRITE_COMPLETE";
+ case VC_EVENT_EOS:
+ return "VC_EVENT_EOS";
+ case VC_EVENT_ERROR:
+ return "VC_EVENT_ERROR";
+ case VC_EVENT_INACTIVITY_TIMEOUT:
+ return "VC_EVENT_INACTIVITY_TIMEOUT";
+ case VC_EVENT_ACTIVE_TIMEOUT:
+ return "VC_EVENT_ACTIVE_TIMEOUT";
+ }
+}
diff --git a/include/iocore/eventsystem/VIO.h b/include/iocore/eventsystem/VIO.h
index 9bfd32873f..c8da22b025 100644
--- a/include/iocore/eventsystem/VIO.h
+++ b/include/iocore/eventsystem/VIO.h
@@ -23,7 +23,6 @@
*/
#pragma once
-#define I_VIO_h
#include "IOBuffer.h"
@@ -56,13 +55,17 @@ class ProxyMutex;
class VIO
{
public:
- explicit VIO(int aop);
- VIO();
+ explicit VIO(int aop) : op(aop), buffer(), mutex(nullptr) {}
+ VIO() : buffer(), mutex(nullptr) {}
~VIO() {}
/** Interface for the VConnection that owns this handle. */
- Continuation *get_continuation() const;
- void set_continuation(Continuation *cont);
+ Continuation *
+ get_continuation() const
+ {
+ return cont;
+ }
+ void set_continuation(Continuation *cont);
/**
Set nbytes to be what is current available.
@@ -70,7 +73,15 @@ public:
Interface to set nbytes to be ndone + buffer.reader()->read_avail()
if a reader is set.
*/
- void done();
+ void
+ done()
+ {
+ if (buffer.reader()) {
+ nbytes = ndone + buffer.reader()->read_avail();
+ } else {
+ nbytes = ndone;
+ }
+ }
/**
Determine the number of bytes remaining.
@@ -81,15 +92,36 @@ public:
@return The number of bytes to be processed by the operation.
*/
- int64_t ntodo() const;
+ int64_t
+ ntodo() const
+ {
+ return nbytes - ndone;
+ }
/////////////////////
// buffer settings //
/////////////////////
- void set_writer(MIOBuffer *writer);
- void set_reader(IOBufferReader *reader);
- MIOBuffer *get_writer() const;
- IOBufferReader *get_reader() const;
+ void
+ set_writer(MIOBuffer *writer)
+ {
+ buffer.writer_for(writer);
+ }
+ void
+ set_reader(IOBufferReader *reader)
+ {
+ buffer.reader_for(reader);
+ }
+ MIOBuffer *
+ get_writer() const
+ {
+ return buffer.writer();
+ }
+
+ IOBufferReader *
+ get_reader() const
+ {
+ return (buffer.reader());
+ }
/**
Reenable the IO operation.
@@ -125,8 +157,16 @@ public:
*/
void reenable_re();
- void disable();
- bool is_disabled() const;
+ void
+ disable()
+ {
+ this->_disabled = true;
+ }
+ bool
+ is_disabled() const
+ {
+ return this->_disabled;
+ }
enum {
NONE = 0,
diff --git a/include/ts/InkAPIPrivateIOCore.h b/include/ts/InkAPIPrivateIOCore.h
index ac3b18245c..f87a251253 100644
--- a/include/ts/InkAPIPrivateIOCore.h
+++ b/include/ts/InkAPIPrivateIOCore.h
@@ -22,13 +22,8 @@
*/
#pragma once
-#if !defined(__GNUC__)
-#include "iocore/eventsystem/EventSystem.h"
-#include "iocore/net/Net.h"
-#else
-#include "../../src/iocore/eventsystem/P_EventSystem.h"
-#include "../../src/iocore/net/P_Net.h"
-#endif
+#include "iocore/eventsystem/VConnection.h"
+#include "ts/apidefs.h"
enum INKContInternalMagic_t {
INKCONT_INTERN_MAGIC_ALIVE = 0x00009631,
diff --git a/src/api/InkVConnInternal.cc b/src/api/InkVConnInternal.cc
index c0898f883f..950900e43b 100644
--- a/src/api/InkVConnInternal.cc
+++ b/src/api/InkVConnInternal.cc
@@ -21,8 +21,10 @@
limitations under the License.
*/
+#include "iocore/net/Net.h"
#include "ts/apidefs.h"
#include "ts/InkAPIPrivateIOCore.h"
+#include "tscore/ink_atomic.h"
ClassAllocator<INKVConnInternal> INKVConnAllocator("INKVConnAllocator");
diff --git a/src/iocore/cache/RegressionSM.cc b/src/iocore/cache/RegressionSM.cc
index 8513773cbc..57ad08dd3a 100644
--- a/src/iocore/cache/RegressionSM.cc
+++ b/src/iocore/cache/RegressionSM.cc
@@ -23,6 +23,7 @@
#include "RegressionSM.h"
#include "iocore/eventsystem/EventProcessor.h"
+#include "iocore/eventsystem/EThread.h"
#define REGRESSION_SM_RETRY (100 * HRTIME_MSECOND)
diff --git a/src/iocore/cache/unit_tests/main.cc
b/src/iocore/cache/unit_tests/main.cc
index 56e9e05edc..e01dd8463a 100644
--- a/src/iocore/cache/unit_tests/main.cc
+++ b/src/iocore/cache/unit_tests/main.cc
@@ -23,10 +23,12 @@
#include "../P_CacheInternal.h"
#include "api/HttpAPIHooks.h"
+#include "iocore/net/Net.h"
#include "iocore/net/NetProcessor.h"
#include "records/RecordsConfig.h"
#include "tscore/ink_config.h"
#include "tscore/Layout.h"
+#include "tscore/TSSystemState.h"
#include "main.h"
#include "swoc/swoc_file.h"
diff --git a/src/iocore/eventsystem/CMakeLists.txt
b/src/iocore/eventsystem/CMakeLists.txt
index ab19fdcca6..1eaf5cd29d 100644
--- a/src/iocore/eventsystem/CMakeLists.txt
+++ b/src/iocore/eventsystem/CMakeLists.txt
@@ -19,7 +19,6 @@ add_library(
inkevent STATIC
EventSystem.cc
IOBuffer.cc
- Inline.cc
Lock.cc
MIOBufferWriter.cc
PQ-List.cc
@@ -36,6 +35,7 @@ add_library(
RecRawStatsImpl.cc
RecProcess.cc
Watchdog.cc
+ VIO.cc
)
add_library(ts::inkevent ALIAS inkevent)
diff --git a/src/iocore/eventsystem/ConfigProcessor.cc
b/src/iocore/eventsystem/ConfigProcessor.cc
index 028968ba51..a49335154a 100644
--- a/src/iocore/eventsystem/ConfigProcessor.cc
+++ b/src/iocore/eventsystem/ConfigProcessor.cc
@@ -22,7 +22,7 @@
*/
#include "iocore/eventsystem/ConfigProcessor.h"
-#include "P_EventSystem.h"
+#include "tscore/ink_atomic.h"
#if TS_HAS_TESTS
#include "tscore/TestBox.h"
#endif
diff --git a/src/iocore/eventsystem/EventSystem.cc
b/src/iocore/eventsystem/EventSystem.cc
index 73c10ecd42..3aeeca562a 100644
--- a/src/iocore/eventsystem/EventSystem.cc
+++ b/src/iocore/eventsystem/EventSystem.cc
@@ -28,10 +28,13 @@
****************************************************************************/
-#include "P_EventSystem.h"
+#include "iocore/eventsystem/EventSystem.h"
+#include "tscore/Version.h"
#include "tscore/hugepages.h"
#include "records/RecCore.h"
+static constexpr ts::ModuleVersion
EVENT_SYSTEM_MODULE_INTERNAL_VERSION{EVENT_SYSTEM_MODULE_PUBLIC_VERSION,
+
ts::ModuleVersion::PRIVATE};
void
ink_event_system_init(ts::ModuleVersion v)
{
diff --git a/src/iocore/eventsystem/IOBuffer.cc
b/src/iocore/eventsystem/IOBuffer.cc
index 66decc4bac..9bedf0a6c6 100644
--- a/src/iocore/eventsystem/IOBuffer.cc
+++ b/src/iocore/eventsystem/IOBuffer.cc
@@ -25,13 +25,974 @@
UIOBuffer.cc
**************************************************************************/
+#include "iocore/eventsystem/Thread.h"
#include "tscore/Allocator.h"
-#include "tscore/ink_defs.h"
-#include "P_EventSystem.h"
+#include "iocore/eventsystem/IOBuffer.h"
#include "swoc/Lexicon.h"
+#include "tscore/Diags.h"
+#include "tscore/ink_memory.h"
#include <optional>
+// TODO: I think we're overly aggressive here on making MIOBuffer 64-bit
+// but not sure it's worthwhile changing anything to 32-bit honestly.
+
+IOBufferBlock *
+iobufferblock_clone(IOBufferBlock *src, int64_t offset, int64_t len)
+{
+ IOBufferBlock *start_buf = nullptr;
+ IOBufferBlock *current_buf = nullptr;
+
+ while (src && len >= 0) {
+ char *start = src->_start;
+ char *end = src->_end;
+ int64_t max_bytes = end - start;
+
+ max_bytes -= offset;
+ if (max_bytes <= 0) {
+ offset = -max_bytes;
+ src = src->next.get();
+ continue;
+ }
+
+ int64_t bytes = len;
+ if (bytes >= max_bytes) {
+ bytes = max_bytes;
+ }
+
+ IOBufferBlock *new_buf = src->clone();
+ new_buf->_start += offset;
+ new_buf->_buf_end = new_buf->_end = new_buf->_start + bytes;
+
+ if (!start_buf) {
+ start_buf = new_buf;
+ current_buf = start_buf;
+ } else {
+ current_buf->next = new_buf;
+ current_buf = new_buf;
+ }
+
+ len -= bytes;
+ src = src->next.get();
+ offset = 0;
+ }
+
+ return start_buf;
+}
+
+IOBufferBlock *
+iobufferblock_skip(IOBufferBlock *b, int64_t *poffset, int64_t *plen, int64_t
write)
+{
+ int64_t offset = *poffset;
+ int64_t len = write;
+
+ while (b && len >= 0) {
+ int64_t max_bytes = b->read_avail();
+
+ // If this block ends before the start offset, skip it
+ // and adjust the offset to consume its length.
+ max_bytes -= offset;
+ if (max_bytes <= 0) {
+ offset = -max_bytes;
+ b = b->next.get();
+ continue;
+ }
+
+ if (len >= max_bytes) {
+ b = b->next.get();
+ len -= max_bytes;
+ offset = 0;
+ } else {
+ offset = offset + len;
+ break;
+ }
+ }
+
+ *poffset = offset;
+ *plen -= write;
+ return b;
+}
+
+void
+iobuffer_mem_inc(const char *_loc, int64_t _size_index)
+{
+ if (!res_track_memory) {
+ return;
+ }
+
+ if (!BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(_size_index)) {
+ return;
+ }
+
+ if (!_loc) {
+ _loc = "memory/IOBuffer/UNKNOWN-LOCATION";
+ }
+ ResourceTracker::increment(_loc, index_to_buffer_size(_size_index));
+}
+
+void
+iobuffer_mem_dec(const char *_loc, int64_t _size_index)
+{
+ if (!res_track_memory) {
+ return;
+ }
+
+ if (!BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(_size_index)) {
+ return;
+ }
+ if (!_loc) {
+ _loc = "memory/IOBuffer/UNKNOWN-LOCATION";
+ }
+ ResourceTracker::increment(_loc, -index_to_buffer_size(_size_index));
+}
+
+//////////////////////////////////////////////////////////////////
+//
+// inline functions definitions
+//
+//////////////////////////////////////////////////////////////////
+//////////////////////////////////////////////////////////////////
+//
+// class IOBufferData --
+// inline functions definitions
+//
+//////////////////////////////////////////////////////////////////
+int64_t
+IOBufferData::block_size()
+{
+ return index_to_buffer_size(_size_index);
+}
+
+IOBufferData *
+new_IOBufferData_internal(const char *location, void *b, int64_t size, int64_t
asize_index)
+{
+ (void)size;
+ IOBufferData *d = THREAD_ALLOC(ioDataAllocator, this_thread());
+ d->_size_index = asize_index;
+ ink_assert(BUFFER_SIZE_INDEX_IS_CONSTANT(asize_index) || size <=
d->block_size());
+ d->_location = location;
+ d->_data = static_cast<char *>(b);
+ return d;
+}
+
+IOBufferData *
+new_xmalloc_IOBufferData_internal(const char *location, void *b, int64_t size)
+{
+ return new_IOBufferData_internal(location, b, size,
BUFFER_SIZE_INDEX_FOR_XMALLOC_SIZE(size));
+}
+
+IOBufferData *
+new_IOBufferData_internal(const char *loc, int64_t size_index, AllocType type)
+{
+ IOBufferData *d = THREAD_ALLOC(ioDataAllocator, this_thread());
+ d->_location = loc;
+ d->alloc(size_index, type);
+ return d;
+}
+
+// IRIX has a compiler bug which prevents this function
+// from being compiled correctly at -O3
+// so it is DUPLICATED in IOBuffer.cc
+// ****** IF YOU CHANGE THIS FUNCTION change that one as well.
+void
+IOBufferData::alloc(int64_t size_index, AllocType type)
+{
+ if (_data) {
+ dealloc();
+ }
+ _size_index = size_index;
+ _mem_type = type;
+ iobuffer_mem_inc(_location, size_index);
+ switch (type) {
+ case MEMALIGNED:
+ if (BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(size_index)) {
+ _data = static_cast<char *>(ioBufAllocator[size_index].alloc_void());
+ // coverity[dead_error_condition]
+ } else if (BUFFER_SIZE_INDEX_IS_XMALLOCED(size_index)) {
+ _data = static_cast<char *>(ats_memalign(ats_pagesize(),
index_to_buffer_size(size_index)));
+ }
+ break;
+ default:
+ case DEFAULT_ALLOC:
+ if (BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(size_index)) {
+ _data = static_cast<char *>(ioBufAllocator[size_index].alloc_void());
+ } else if (BUFFER_SIZE_INDEX_IS_XMALLOCED(size_index)) {
+ _data = static_cast<char
*>(ats_malloc(BUFFER_SIZE_FOR_XMALLOC(size_index)));
+ }
+ break;
+ }
+}
+
+// ****** IF YOU CHANGE THIS FUNCTION change that one as well.
+
+void
+IOBufferData::dealloc()
+{
+ iobuffer_mem_dec(_location, _size_index);
+ switch (_mem_type) {
+ case MEMALIGNED:
+ if (BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(_size_index)) {
+ ioBufAllocator[_size_index].free_void(_data);
+ } else if (BUFFER_SIZE_INDEX_IS_XMALLOCED(_size_index)) {
+ ::free(_data);
+ }
+ break;
+ default:
+ case DEFAULT_ALLOC:
+ if (BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(_size_index)) {
+ ioBufAllocator[_size_index].free_void(_data);
+ } else if (BUFFER_SIZE_INDEX_IS_XMALLOCED(_size_index)) {
+ ats_free(_data);
+ }
+ break;
+ }
+ _data = nullptr;
+ _size_index = BUFFER_SIZE_NOT_ALLOCATED;
+ _mem_type = NO_ALLOC;
+}
+
+void
+IOBufferData::free()
+{
+ dealloc();
+ THREAD_FREE(this, ioDataAllocator, this_thread());
+}
+
+//////////////////////////////////////////////////////////////////
+//
+// class IOBufferBlock --
+// inline functions definitions
+//
+//////////////////////////////////////////////////////////////////
+IOBufferBlock *
+new_IOBufferBlock_internal(const char *location)
+{
+ IOBufferBlock *b = THREAD_ALLOC(ioBlockAllocator, this_thread());
+ b->_location = location;
+ return b;
+}
+
+IOBufferBlock *
+new_IOBufferBlock_internal(const char *location, IOBufferData *d, int64_t len,
int64_t offset)
+{
+ IOBufferBlock *b = THREAD_ALLOC(ioBlockAllocator, this_thread());
+ b->_location = location;
+ b->set(d, len, offset);
+ return b;
+}
+
+IOBufferBlock::IOBufferBlock()
+{
+ return;
+}
+
+void
+IOBufferBlock::consume(int64_t len)
+{
+ _start += len;
+ ink_assert(_start <= _end);
+}
+
+void
+IOBufferBlock::fill(int64_t len)
+{
+ _end += len;
+ ink_assert(_end <= _buf_end);
+}
+
+void
+IOBufferBlock::reset()
+{
+ _end = _start = buf();
+ _buf_end = buf() + data->block_size();
+}
+
+void
+IOBufferBlock::alloc(int64_t i)
+{
+ ink_assert(BUFFER_SIZE_ALLOCATED(i));
+ data = new_IOBufferData_internal(_location, i);
+ reset();
+}
+
+void
+IOBufferBlock::clear()
+{
+ data = nullptr;
+
+ IOBufferBlock *p = next.get();
+ while (p) {
+ // If our block pointer refcount dropped to zero,
+ // recursively free the list.
+ if (p->refcount_dec() == 0) {
+ IOBufferBlock *n = p->next.detach();
+ p->free();
+ p = n;
+ } else {
+ // We don't hold the last refcount, so we are done.
+ break;
+ }
+ }
+
+ // Nuke the next pointer without dropping the refcount
+ // because we already manually did that.
+ next.detach();
+
+ _buf_end = _end = _start = nullptr;
+}
+
+IOBufferBlock *
+IOBufferBlock::clone() const
+{
+ IOBufferBlock *b = new_IOBufferBlock_internal(_location);
+ b->data = data;
+ b->_start = _start;
+ b->_end = _end;
+ b->_buf_end = _end;
+ b->_location = _location;
+ return b;
+}
+
+void
+IOBufferBlock::dealloc()
+{
+ clear();
+}
+
+void
+IOBufferBlock::free()
+{
+ dealloc();
+ THREAD_FREE(this, ioBlockAllocator, this_thread());
+}
+
+void
+IOBufferBlock::set_internal(void *b, int64_t len, int64_t asize_index)
+{
+ data = new_IOBufferData_internal(_location,
BUFFER_SIZE_NOT_ALLOCATED);
+ data->_data = static_cast<char *>(b);
+ iobuffer_mem_inc(_location, asize_index);
+ data->_size_index = asize_index;
+ reset();
+ _end = _start + len;
+}
+
+void
+IOBufferBlock::set(IOBufferData *d, int64_t len, int64_t offset)
+{
+ data = d;
+ _start = buf() + offset;
+ _end = _start + len;
+ _buf_end = buf() + d->block_size();
+}
+
+//////////////////////////////////////////////////////////////////
+//
+// class IOBufferReader --
+// inline functions definitions
+//
+//////////////////////////////////////////////////////////////////
+void
+IOBufferReader::skip_empty_blocks()
+{
+ while (block->next && block->next->read_avail() && start_offset >=
block->size()) {
+ start_offset -= block->size();
+ block = block->next;
+ }
+}
+
+bool
+IOBufferReader::low_water()
+{
+ return mbuf->low_water();
+}
+
+bool
+IOBufferReader::high_water()
+{
+ return read_avail() >= mbuf->water_mark;
+}
+
+bool
+IOBufferReader::current_low_water()
+{
+ return mbuf->current_low_water();
+}
+
+IOBufferBlock *
+IOBufferReader::get_current_block()
+{
+ return block.get();
+}
+
+char *
+IOBufferReader::start()
+{
+ if (!block) {
+ return nullptr;
+ }
+
+ skip_empty_blocks();
+ return block->start() + start_offset;
+}
+
+char *
+IOBufferReader::end()
+{
+ if (!block) {
+ return nullptr;
+ }
+
+ skip_empty_blocks();
+ return block->end();
+}
+
+int64_t
+IOBufferReader::block_read_avail()
+{
+ if (!block) {
+ return 0;
+ }
+
+ skip_empty_blocks();
+ return static_cast<int64_t>(block->end() - (block->start() + start_offset));
+}
+
+std::string_view
+IOBufferReader::block_read_view()
+{
+ const char *start = this->start(); // empty blocks are skipped in here.
+ return start ? std::string_view{start, static_cast<size_t>(block->end() -
start)} : std::string_view{};
+}
+
+int
+IOBufferReader::block_count()
+{
+ int count = 0;
+ IOBufferBlock *b = block.get();
+
+ while (b) {
+ count++;
+ b = b->next.get();
+ }
+
+ return count;
+}
+
+int64_t
+IOBufferReader::read_avail()
+{
+ int64_t t = 0;
+ IOBufferBlock *b = block.get();
+
+ while (b) {
+ t += b->read_avail();
+ b = b->next.get();
+ }
+
+ t -= start_offset;
+ if (size_limit != INT64_MAX && t > size_limit) {
+ t = size_limit;
+ }
+
+ return t;
+}
+
+bool
+IOBufferReader::is_read_avail_more_than(int64_t size)
+{
+ int64_t t = -start_offset;
+ IOBufferBlock *b = block.get();
+
+ while (b) {
+ t += b->read_avail();
+ if (t > size) {
+ return true;
+ }
+ b = b->next.get();
+ }
+ return false;
+}
+
+void
+IOBufferReader::consume(int64_t n)
+{
+ ink_assert(read_avail() >= n);
+ start_offset += n;
+ if (size_limit != INT64_MAX) {
+ size_limit -= n;
+ }
+
+ ink_assert(size_limit >= 0);
+ if (!block) {
+ return;
+ }
+
+ int64_t r = block->read_avail();
+ int64_t s = start_offset;
+ while (r <= s && block->next && block->next->read_avail()) {
+ s -= r;
+ start_offset = s;
+ block = block->next;
+ r = block->read_avail();
+ }
+}
+
+char &
+IOBufferReader::operator[](int64_t i)
+{
+ static char default_ret = '\0'; // This is just to avoid compiler
warnings...
+ IOBufferBlock *b = block.get();
+
+ i += start_offset;
+ while (b) {
+ int64_t bytes = b->read_avail();
+ if (bytes > i) {
+ return b->start()[i];
+ }
+ i -= bytes;
+ b = b->next.get();
+ }
+
+ ink_release_assert(!"out of range");
+ // Never used, just to satisfy compilers not understanding the fatality of
ink_release_assert().
+ return default_ret;
+}
+
+void
+IOBufferReader::clear()
+{
+ accessor = nullptr;
+ block = nullptr;
+ mbuf = nullptr;
+ start_offset = 0;
+ size_limit = INT64_MAX;
+}
+
+void
+IOBufferReader::reset()
+{
+ block = mbuf->_writer;
+ start_offset = 0;
+ size_limit = INT64_MAX;
+}
+
+////////////////////////////////////////////////////////////////
+//
+// class MIOBuffer --
+// inline functions definitions
+//
+////////////////////////////////////////////////////////////////
+extern ClassAllocator<MIOBuffer> ioAllocator;
+
+MIOBuffer::MIOBuffer(int64_t default_size_index)
+{
+ clear();
+ size_index = default_size_index;
+ _location = nullptr;
+ return;
+}
+
+MIOBuffer::MIOBuffer()
+{
+ clear();
+ _location = nullptr;
+ return;
+}
+
+MIOBuffer::~MIOBuffer()
+{
+ _writer = nullptr;
+ dealloc_all_readers();
+}
+
+MIOBuffer *
+new_MIOBuffer_internal(const char *location, int64_t size_index)
+{
+ MIOBuffer *b = THREAD_ALLOC(ioAllocator, this_thread());
+ b->_location = location;
+ b->alloc(size_index);
+ b->water_mark = 0;
+ return b;
+}
+
+void
+free_MIOBuffer(MIOBuffer *mio)
+{
+ mio->_writer = nullptr;
+ mio->dealloc_all_readers();
+ THREAD_FREE(mio, ioAllocator, this_thread());
+}
+
+MIOBuffer *
+new_empty_MIOBuffer_internal(const char *location, int64_t size_index)
+{
+ MIOBuffer *b = THREAD_ALLOC(ioAllocator, this_thread());
+ b->size_index = size_index;
+ b->water_mark = 0;
+ b->_location = location;
+ return b;
+}
+
+void
+free_empty_MIOBuffer(MIOBuffer *mio)
+{
+ THREAD_FREE(mio, ioAllocator, this_thread());
+}
+
+IOBufferReader *
+MIOBuffer::alloc_accessor(MIOBufferAccessor *anAccessor)
+{
+ int i;
+ for (i = 0; i < MAX_MIOBUFFER_READERS; i++) {
+ if (!readers[i].allocated()) {
+ break;
+ }
+ }
+
+ // TODO refactor code to return nullptr at some point
+ ink_release_assert(i < MAX_MIOBUFFER_READERS);
+
+ IOBufferReader *e = &readers[i];
+ e->mbuf = this;
+ e->reset();
+ e->accessor = anAccessor;
+
+ return e;
+}
+
+IOBufferReader *
+MIOBuffer::alloc_reader()
+{
+ int i;
+ for (i = 0; i < MAX_MIOBUFFER_READERS; i++) {
+ if (!readers[i].allocated()) {
+ break;
+ }
+ }
+
+ // TODO refactor code to return nullptr at some point
+ ink_release_assert(i < MAX_MIOBUFFER_READERS);
+
+ IOBufferReader *e = &readers[i];
+ e->mbuf = this;
+ e->reset();
+ e->accessor = nullptr;
+
+ return e;
+}
+
+int64_t
+MIOBuffer::block_size()
+{
+ return index_to_buffer_size(size_index);
+}
+IOBufferReader *
+MIOBuffer::clone_reader(IOBufferReader *r)
+{
+ int i;
+ for (i = 0; i < MAX_MIOBUFFER_READERS; i++) {
+ if (!readers[i].allocated()) {
+ break;
+ }
+ }
+
+ // TODO refactor code to return nullptr at some point
+ ink_release_assert(i < MAX_MIOBUFFER_READERS);
+
+ IOBufferReader *e = &readers[i];
+ e->mbuf = this;
+ e->accessor = nullptr;
+ e->block = r->block;
+ e->start_offset = r->start_offset;
+ e->size_limit = r->size_limit;
+ ink_assert(e->size_limit >= 0);
+
+ return e;
+}
+
+int64_t
+MIOBuffer::block_write_avail()
+{
+ IOBufferBlock *b = first_write_block();
+ return b ? b->write_avail() : 0;
+}
+
+////////////////////////////////////////////////////////////////
+//
+// MIOBuffer::append_block()
+//
+// Appends a block to writer->next and make it the current
+// block.
+// Note that the block is not appended to the end of the list.
+// That means that if writer->next was not null before this
+// call then the block that writer->next was pointing to will
+// have its reference count decremented and writer->next
+// will have a new value which is the new block.
+// In any case the new appended block becomes the current
+// block.
+//
+////////////////////////////////////////////////////////////////
+void
+MIOBuffer::append_block_internal(IOBufferBlock *b)
+{
+ // It would be nice to remove an empty buffer at the beginning,
+ // but this breaks HTTP.
+ if (!_writer) {
+ _writer = b;
+ init_readers();
+ } else {
+ ink_assert(!_writer->next || !_writer->next->read_avail());
+ _writer->next = b;
+ while (b->read_avail()) {
+ _writer = b;
+ b = b->next.get();
+ if (!b) {
+ break;
+ }
+ }
+ }
+ while (_writer->next && !_writer->write_avail() &&
_writer->next->read_avail()) {
+ _writer = _writer->next;
+ }
+}
+
+void
+MIOBuffer::append_block(IOBufferBlock *b)
+{
+ ink_assert(b->read_avail());
+ append_block_internal(b);
+}
+
+////////////////////////////////////////////////////////////////
+//
+// MIOBuffer::append_block()
+//
+// Allocate a block, appends it to current->next
+// and make the new block the current block (writer).
+//
+////////////////////////////////////////////////////////////////
+void
+MIOBuffer::append_block(int64_t asize_index)
+{
+ ink_assert(BUFFER_SIZE_ALLOCATED(asize_index));
+ IOBufferBlock *b = new_IOBufferBlock_internal(_location);
+ b->alloc(asize_index);
+ append_block_internal(b);
+ return;
+}
+
+void
+MIOBuffer::add_block()
+{
+ if (this->_writer == nullptr || this->_writer->next == nullptr) {
+ append_block(size_index);
+ }
+}
+
+void
+MIOBuffer::check_add_block()
+{
+ if (!high_water() && current_low_water()) {
+ add_block();
+ }
+}
+
+IOBufferBlock *
+MIOBuffer::get_current_block()
+{
+ return first_write_block();
+}
+
+//////////////////////////////////////////////////////////////////
+//
+// MIOBuffer::current_write_avail()
+//
+// returns the total space available in all blocks.
+// This function is different than write_avail() because
+// it will not append a new block if there is no space
+// or below the watermark space available.
+//
+//////////////////////////////////////////////////////////////////
+int64_t
+MIOBuffer::current_write_avail()
+{
+ int64_t t = 0;
+ IOBufferBlock *b = _writer.get();
+ while (b) {
+ t += b->write_avail();
+ b = b->next.get();
+ }
+ return t;
+}
+
+//////////////////////////////////////////////////////////////////
+//
+// MIOBuffer::write_avail()
+//
+// returns the number of bytes available in the current block.
+// If there is no current block or not enough free space in
+// the current block then a new block is appended.
+//
+//////////////////////////////////////////////////////////////////
+int64_t
+MIOBuffer::write_avail()
+{
+ check_add_block();
+ return current_write_avail();
+}
+
+void
+MIOBuffer::fill(int64_t len)
+{
+ int64_t f = _writer->write_avail();
+ while (f < len) {
+ _writer->fill(f);
+ len -= f;
+ if (len > 0) {
+ _writer = _writer->next;
+ }
+ f = _writer->write_avail();
+ }
+ _writer->fill(len);
+}
+
+int
+MIOBuffer::max_block_count()
+{
+ int maxb = 0;
+ for (auto &reader : readers) {
+ if (reader.allocated()) {
+ int c = reader.block_count();
+ if (c > maxb) {
+ maxb = c;
+ }
+ }
+ }
+ return maxb;
+}
+
+int64_t
+MIOBuffer::max_read_avail()
+{
+ int64_t s = 0;
+ int found = 0;
+ for (auto &reader : readers) {
+ if (reader.allocated()) {
+ int64_t ss = reader.read_avail();
+ if (ss > s) {
+ s = ss;
+ }
+ found = 1;
+ }
+ }
+ if (!found && _writer) {
+ return _writer->read_avail();
+ }
+ return s;
+}
+
+void
+MIOBuffer::set(void *b, int64_t len)
+{
+ _writer = new_IOBufferBlock_internal(_location);
+ _writer->set_internal(b, len, BUFFER_SIZE_INDEX_FOR_CONSTANT_SIZE(len));
+ init_readers();
+}
+
+void
+MIOBuffer::append_xmalloced(void *b, int64_t len)
+{
+ if (len == 0) {
+ return;
+ }
+
+ IOBufferBlock *x = new_IOBufferBlock_internal(_location);
+ x->set_internal(b, len, BUFFER_SIZE_INDEX_FOR_XMALLOC_SIZE(len));
+ append_block_internal(x);
+}
+
+void
+MIOBuffer::append_fast_allocated(void *b, int64_t len, int64_t fast_size_index)
+{
+ IOBufferBlock *x = new_IOBufferBlock_internal(_location);
+ x->set_internal(b, len, fast_size_index);
+ append_block_internal(x);
+}
+
+void
+MIOBuffer::alloc(int64_t i)
+{
+ _writer = new_IOBufferBlock_internal(_location);
+ _writer->alloc(i);
+ size_index = i;
+ init_readers();
+}
+
+void
+MIOBuffer::dealloc_reader(IOBufferReader *e)
+{
+ if (e->accessor) {
+ ink_assert(e->accessor->writer() == this);
+ ink_assert(e->accessor->reader() == e);
+ e->accessor->clear();
+ }
+ e->clear();
+}
+
+IOBufferReader *
+IOBufferReader::clone()
+{
+ return mbuf->clone_reader(this);
+}
+
+void
+IOBufferReader::dealloc()
+{
+ mbuf->dealloc_reader(this);
+}
+
+void
+MIOBuffer::dealloc_all_readers()
+{
+ for (auto &reader : readers) {
+ if (reader.allocated()) {
+ dealloc_reader(&reader);
+ }
+ }
+}
+
+void
+MIOBufferAccessor::reader_for(MIOBuffer *abuf)
+{
+ mbuf = abuf;
+ if (abuf) {
+ entry = mbuf->alloc_accessor(this);
+ } else {
+ entry = nullptr;
+ }
+}
+
+void
+MIOBufferAccessor::reader_for(IOBufferReader *areader)
+{
+ if (entry == areader) {
+ return;
+ }
+ mbuf = areader->mbuf;
+ entry = areader;
+ ink_assert(mbuf);
+}
+
+void
+MIOBufferAccessor::writer_for(MIOBuffer *abuf)
+{
+ mbuf = abuf;
+ entry = nullptr;
+}
+
+MIOBufferAccessor::~MIOBufferAccessor() {}
+
//
// General Buffer Allocator
//
diff --git a/src/iocore/eventsystem/Inline.cc b/src/iocore/eventsystem/Inline.cc
deleted file mode 100644
index dc708c272e..0000000000
--- a/src/iocore/eventsystem/Inline.cc
+++ /dev/null
@@ -1,30 +0,0 @@
-/** @file
-
- A brief file description
-
- @section license License
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
-
-/*
- * Inline Functions as globals for users using the public interface
- *
- */
-
-#define TS_INLINE
-#include "P_EventSystem.h"
diff --git a/src/iocore/eventsystem/Lock.cc b/src/iocore/eventsystem/Lock.cc
index bee54f43a3..e5d14a0bc0 100644
--- a/src/iocore/eventsystem/Lock.cc
+++ b/src/iocore/eventsystem/Lock.cc
@@ -28,8 +28,8 @@
**************************************************************************/
-#include "P_EventSystem.h"
-#include "tscore/Diags.h"
+#include "iocore/eventsystem/Lock.h"
+#include "tsutil/DbgCtl.h"
ClassAllocator<ProxyMutex> mutexAllocator("mutexAllocator");
diff --git a/src/iocore/eventsystem/PQ-List.cc
b/src/iocore/eventsystem/PQ-List.cc
index 35af521a68..de8aad30ef 100644
--- a/src/iocore/eventsystem/PQ-List.cc
+++ b/src/iocore/eventsystem/PQ-List.cc
@@ -21,7 +21,8 @@
limitations under the License.
*/
-#include "P_EventSystem.h"
+#include "iocore/eventsystem/PriorityEventQueue.h"
+#include "iocore/eventsystem/EThread.h"
PriorityEventQueue::PriorityEventQueue()
{
diff --git a/src/iocore/eventsystem/P_EventSystem.h
b/src/iocore/eventsystem/P_EventSystem.h
deleted file mode 100644
index 53b4b463f7..0000000000
--- a/src/iocore/eventsystem/P_EventSystem.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/** @file
-
- A brief file description
-
- @section license License
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
-
-/****************************************************************************
-
- Event Subsystem
-
-
-
-**************************************************************************/
-#pragma once
-#define _P_EventSystem_H
-
-#include "tscore/ink_platform.h"
-
-#include "iocore/eventsystem/EventSystem.h"
-#include "iocore/eventsystem/Freer.h"
-
-#include "P_Thread.h"
-#include "P_VIO.h"
-#include "P_IOBuffer.h"
-#include "P_VConnection.h"
-#include "P_UnixEvent.h"
-#include "P_UnixEThread.h"
-#include "P_ProtectedQueue.h"
-#include "P_UnixEventProcessor.h"
-
-static constexpr ts::ModuleVersion
EVENT_SYSTEM_MODULE_INTERNAL_VERSION{EVENT_SYSTEM_MODULE_PUBLIC_VERSION,
-
ts::ModuleVersion::PRIVATE};
diff --git a/src/iocore/eventsystem/P_IOBuffer.h
b/src/iocore/eventsystem/P_IOBuffer.h
deleted file mode 100644
index e159cbafc5..0000000000
--- a/src/iocore/eventsystem/P_IOBuffer.h
+++ /dev/null
@@ -1,1034 +0,0 @@
-/** @file
-
- A brief file description
-
- @section license License
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
-
-#pragma once
-
-#include "iocore/eventsystem/IOBuffer.h"
-#include "iocore/eventsystem/Thread.h"
-#include "tscore/ink_memory.h"
-#include "tscore/ink_resource.h"
-
-// TODO: I think we're overly aggressive here on making MIOBuffer 64-bit
-// but not sure it's worthwhile changing anything to 32-bit honestly.
-
-//////////////////////////////////////////////////////////////
-//
-// returns 0 for DEFAULT_BUFFER_BASE_SIZE,
-// +1 for each power of 2
-//
-//////////////////////////////////////////////////////////////
-TS_INLINE int64_t
-buffer_size_to_index(int64_t size, int64_t max)
-{
- int64_t r = max;
-
- while (r && BUFFER_SIZE_FOR_INDEX(r - 1) >= size) {
- r--;
- }
- return r;
-}
-
-TS_INLINE int64_t
-iobuffer_size_to_index(int64_t size, int64_t max)
-{
- if (size > BUFFER_SIZE_FOR_INDEX(max)) {
- return BUFFER_SIZE_INDEX_FOR_XMALLOC_SIZE(size);
- }
- return buffer_size_to_index(size, max);
-}
-
-TS_INLINE int64_t
-index_to_buffer_size(int64_t idx)
-{
- if (BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(idx)) {
- return BUFFER_SIZE_FOR_INDEX(idx);
- } else if (BUFFER_SIZE_INDEX_IS_XMALLOCED(idx)) {
- return BUFFER_SIZE_FOR_XMALLOC(idx);
- // coverity[dead_error_condition]
- } else if (BUFFER_SIZE_INDEX_IS_CONSTANT(idx)) {
- return BUFFER_SIZE_FOR_CONSTANT(idx);
- }
- // coverity[dead_error_line]
- return 0;
-}
-
-TS_INLINE IOBufferBlock *
-iobufferblock_clone(IOBufferBlock *src, int64_t offset, int64_t len)
-{
- IOBufferBlock *start_buf = nullptr;
- IOBufferBlock *current_buf = nullptr;
-
- while (src && len >= 0) {
- char *start = src->_start;
- char *end = src->_end;
- int64_t max_bytes = end - start;
-
- max_bytes -= offset;
- if (max_bytes <= 0) {
- offset = -max_bytes;
- src = src->next.get();
- continue;
- }
-
- int64_t bytes = len;
- if (bytes >= max_bytes) {
- bytes = max_bytes;
- }
-
- IOBufferBlock *new_buf = src->clone();
- new_buf->_start += offset;
- new_buf->_buf_end = new_buf->_end = new_buf->_start + bytes;
-
- if (!start_buf) {
- start_buf = new_buf;
- current_buf = start_buf;
- } else {
- current_buf->next = new_buf;
- current_buf = new_buf;
- }
-
- len -= bytes;
- src = src->next.get();
- offset = 0;
- }
-
- return start_buf;
-}
-
-TS_INLINE IOBufferBlock *
-iobufferblock_skip(IOBufferBlock *b, int64_t *poffset, int64_t *plen, int64_t
write)
-{
- int64_t offset = *poffset;
- int64_t len = write;
-
- while (b && len >= 0) {
- int64_t max_bytes = b->read_avail();
-
- // If this block ends before the start offset, skip it
- // and adjust the offset to consume its length.
- max_bytes -= offset;
- if (max_bytes <= 0) {
- offset = -max_bytes;
- b = b->next.get();
- continue;
- }
-
- if (len >= max_bytes) {
- b = b->next.get();
- len -= max_bytes;
- offset = 0;
- } else {
- offset = offset + len;
- break;
- }
- }
-
- *poffset = offset;
- *plen -= write;
- return b;
-}
-
-TS_INLINE void
-iobuffer_mem_inc(const char *_loc, int64_t _size_index)
-{
- if (!res_track_memory) {
- return;
- }
-
- if (!BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(_size_index)) {
- return;
- }
-
- if (!_loc) {
- _loc = "memory/IOBuffer/UNKNOWN-LOCATION";
- }
- ResourceTracker::increment(_loc, index_to_buffer_size(_size_index));
-}
-
-TS_INLINE void
-iobuffer_mem_dec(const char *_loc, int64_t _size_index)
-{
- if (!res_track_memory) {
- return;
- }
-
- if (!BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(_size_index)) {
- return;
- }
- if (!_loc) {
- _loc = "memory/IOBuffer/UNKNOWN-LOCATION";
- }
- ResourceTracker::increment(_loc, -index_to_buffer_size(_size_index));
-}
-
-//////////////////////////////////////////////////////////////////
-//
-// inline functions definitions
-//
-//////////////////////////////////////////////////////////////////
-//////////////////////////////////////////////////////////////////
-//
-// class IOBufferData --
-// inline functions definitions
-//
-//////////////////////////////////////////////////////////////////
-TS_INLINE int64_t
-IOBufferData::block_size()
-{
- return index_to_buffer_size(_size_index);
-}
-
-TS_INLINE IOBufferData *
-new_IOBufferData_internal(const char *location, void *b, int64_t size, int64_t
asize_index)
-{
- (void)size;
- IOBufferData *d = THREAD_ALLOC(ioDataAllocator, this_thread());
- d->_size_index = asize_index;
- ink_assert(BUFFER_SIZE_INDEX_IS_CONSTANT(asize_index) || size <=
d->block_size());
- d->_location = location;
- d->_data = static_cast<char *>(b);
- return d;
-}
-
-TS_INLINE IOBufferData *
-new_xmalloc_IOBufferData_internal(const char *location, void *b, int64_t size)
-{
- return new_IOBufferData_internal(location, b, size,
BUFFER_SIZE_INDEX_FOR_XMALLOC_SIZE(size));
-}
-
-TS_INLINE IOBufferData *
-new_IOBufferData_internal(const char *loc, int64_t size_index, AllocType type)
-{
- IOBufferData *d = THREAD_ALLOC(ioDataAllocator, this_thread());
- d->_location = loc;
- d->alloc(size_index, type);
- return d;
-}
-
-// IRIX has a compiler bug which prevents this function
-// from being compiled correctly at -O3
-// so it is DUPLICATED in IOBuffer.cc
-// ****** IF YOU CHANGE THIS FUNCTION change that one as well.
-TS_INLINE void
-IOBufferData::alloc(int64_t size_index, AllocType type)
-{
- if (_data) {
- dealloc();
- }
- _size_index = size_index;
- _mem_type = type;
- iobuffer_mem_inc(_location, size_index);
- switch (type) {
- case MEMALIGNED:
- if (BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(size_index)) {
- _data = static_cast<char *>(ioBufAllocator[size_index].alloc_void());
- // coverity[dead_error_condition]
- } else if (BUFFER_SIZE_INDEX_IS_XMALLOCED(size_index)) {
- _data = static_cast<char *>(ats_memalign(ats_pagesize(),
index_to_buffer_size(size_index)));
- }
- break;
- default:
- case DEFAULT_ALLOC:
- if (BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(size_index)) {
- _data = static_cast<char *>(ioBufAllocator[size_index].alloc_void());
- } else if (BUFFER_SIZE_INDEX_IS_XMALLOCED(size_index)) {
- _data = static_cast<char
*>(ats_malloc(BUFFER_SIZE_FOR_XMALLOC(size_index)));
- }
- break;
- }
-}
-
-// ****** IF YOU CHANGE THIS FUNCTION change that one as well.
-
-TS_INLINE void
-IOBufferData::dealloc()
-{
- iobuffer_mem_dec(_location, _size_index);
- switch (_mem_type) {
- case MEMALIGNED:
- if (BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(_size_index)) {
- ioBufAllocator[_size_index].free_void(_data);
- } else if (BUFFER_SIZE_INDEX_IS_XMALLOCED(_size_index)) {
- ::free(_data);
- }
- break;
- default:
- case DEFAULT_ALLOC:
- if (BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(_size_index)) {
- ioBufAllocator[_size_index].free_void(_data);
- } else if (BUFFER_SIZE_INDEX_IS_XMALLOCED(_size_index)) {
- ats_free(_data);
- }
- break;
- }
- _data = nullptr;
- _size_index = BUFFER_SIZE_NOT_ALLOCATED;
- _mem_type = NO_ALLOC;
-}
-
-TS_INLINE void
-IOBufferData::free()
-{
- dealloc();
- THREAD_FREE(this, ioDataAllocator, this_thread());
-}
-
-//////////////////////////////////////////////////////////////////
-//
-// class IOBufferBlock --
-// inline functions definitions
-//
-//////////////////////////////////////////////////////////////////
-TS_INLINE IOBufferBlock *
-new_IOBufferBlock_internal(const char *location)
-{
- IOBufferBlock *b = THREAD_ALLOC(ioBlockAllocator, this_thread());
- b->_location = location;
- return b;
-}
-
-TS_INLINE IOBufferBlock *
-new_IOBufferBlock_internal(const char *location, IOBufferData *d, int64_t len,
int64_t offset)
-{
- IOBufferBlock *b = THREAD_ALLOC(ioBlockAllocator, this_thread());
- b->_location = location;
- b->set(d, len, offset);
- return b;
-}
-
-TS_INLINE
-IOBufferBlock::IOBufferBlock()
-{
- return;
-}
-
-TS_INLINE void
-IOBufferBlock::consume(int64_t len)
-{
- _start += len;
- ink_assert(_start <= _end);
-}
-
-TS_INLINE void
-IOBufferBlock::fill(int64_t len)
-{
- _end += len;
- ink_assert(_end <= _buf_end);
-}
-
-TS_INLINE void
-IOBufferBlock::reset()
-{
- _end = _start = buf();
- _buf_end = buf() + data->block_size();
-}
-
-TS_INLINE void
-IOBufferBlock::alloc(int64_t i)
-{
- ink_assert(BUFFER_SIZE_ALLOCATED(i));
- data = new_IOBufferData_internal(_location, i);
- reset();
-}
-
-TS_INLINE void
-IOBufferBlock::clear()
-{
- data = nullptr;
-
- IOBufferBlock *p = next.get();
- while (p) {
- // If our block pointer refcount dropped to zero,
- // recursively free the list.
- if (p->refcount_dec() == 0) {
- IOBufferBlock *n = p->next.detach();
- p->free();
- p = n;
- } else {
- // We don't hold the last refcount, so we are done.
- break;
- }
- }
-
- // Nuke the next pointer without dropping the refcount
- // because we already manually did that.
- next.detach();
-
- _buf_end = _end = _start = nullptr;
-}
-
-TS_INLINE IOBufferBlock *
-IOBufferBlock::clone() const
-{
- IOBufferBlock *b = new_IOBufferBlock_internal(_location);
- b->data = data;
- b->_start = _start;
- b->_end = _end;
- b->_buf_end = _end;
- b->_location = _location;
- return b;
-}
-
-TS_INLINE void
-IOBufferBlock::dealloc()
-{
- clear();
-}
-
-TS_INLINE void
-IOBufferBlock::free()
-{
- dealloc();
- THREAD_FREE(this, ioBlockAllocator, this_thread());
-}
-
-TS_INLINE void
-IOBufferBlock::set_internal(void *b, int64_t len, int64_t asize_index)
-{
- data = new_IOBufferData_internal(_location,
BUFFER_SIZE_NOT_ALLOCATED);
- data->_data = static_cast<char *>(b);
- iobuffer_mem_inc(_location, asize_index);
- data->_size_index = asize_index;
- reset();
- _end = _start + len;
-}
-
-TS_INLINE void
-IOBufferBlock::set(IOBufferData *d, int64_t len, int64_t offset)
-{
- data = d;
- _start = buf() + offset;
- _end = _start + len;
- _buf_end = buf() + d->block_size();
-}
-
-//////////////////////////////////////////////////////////////////
-//
-// class IOBufferReader --
-// inline functions definitions
-//
-//////////////////////////////////////////////////////////////////
-TS_INLINE void
-IOBufferReader::skip_empty_blocks()
-{
- while (block->next && block->next->read_avail() && start_offset >=
block->size()) {
- start_offset -= block->size();
- block = block->next;
- }
-}
-
-TS_INLINE bool
-IOBufferReader::low_water()
-{
- return mbuf->low_water();
-}
-
-TS_INLINE bool
-IOBufferReader::high_water()
-{
- return read_avail() >= mbuf->water_mark;
-}
-
-TS_INLINE bool
-IOBufferReader::current_low_water()
-{
- return mbuf->current_low_water();
-}
-
-TS_INLINE IOBufferBlock *
-IOBufferReader::get_current_block()
-{
- return block.get();
-}
-
-TS_INLINE char *
-IOBufferReader::start()
-{
- if (!block) {
- return nullptr;
- }
-
- skip_empty_blocks();
- return block->start() + start_offset;
-}
-
-TS_INLINE char *
-IOBufferReader::end()
-{
- if (!block) {
- return nullptr;
- }
-
- skip_empty_blocks();
- return block->end();
-}
-
-TS_INLINE int64_t
-IOBufferReader::block_read_avail()
-{
- if (!block) {
- return 0;
- }
-
- skip_empty_blocks();
- return static_cast<int64_t>(block->end() - (block->start() + start_offset));
-}
-
-inline std::string_view
-IOBufferReader::block_read_view()
-{
- const char *start = this->start(); // empty blocks are skipped in here.
- return start ? std::string_view{start, static_cast<size_t>(block->end() -
start)} : std::string_view{};
-}
-
-TS_INLINE int
-IOBufferReader::block_count()
-{
- int count = 0;
- IOBufferBlock *b = block.get();
-
- while (b) {
- count++;
- b = b->next.get();
- }
-
- return count;
-}
-
-TS_INLINE int64_t
-IOBufferReader::read_avail()
-{
- int64_t t = 0;
- IOBufferBlock *b = block.get();
-
- while (b) {
- t += b->read_avail();
- b = b->next.get();
- }
-
- t -= start_offset;
- if (size_limit != INT64_MAX && t > size_limit) {
- t = size_limit;
- }
-
- return t;
-}
-
-TS_INLINE bool
-IOBufferReader::is_read_avail_more_than(int64_t size)
-{
- int64_t t = -start_offset;
- IOBufferBlock *b = block.get();
-
- while (b) {
- t += b->read_avail();
- if (t > size) {
- return true;
- }
- b = b->next.get();
- }
- return false;
-}
-
-TS_INLINE void
-IOBufferReader::consume(int64_t n)
-{
- ink_assert(read_avail() >= n);
- start_offset += n;
- if (size_limit != INT64_MAX) {
- size_limit -= n;
- }
-
- ink_assert(size_limit >= 0);
- if (!block) {
- return;
- }
-
- int64_t r = block->read_avail();
- int64_t s = start_offset;
- while (r <= s && block->next && block->next->read_avail()) {
- s -= r;
- start_offset = s;
- block = block->next;
- r = block->read_avail();
- }
-}
-
-TS_INLINE char &
-IOBufferReader::operator[](int64_t i)
-{
- static char default_ret = '\0'; // This is just to avoid compiler
warnings...
- IOBufferBlock *b = block.get();
-
- i += start_offset;
- while (b) {
- int64_t bytes = b->read_avail();
- if (bytes > i) {
- return b->start()[i];
- }
- i -= bytes;
- b = b->next.get();
- }
-
- ink_release_assert(!"out of range");
- // Never used, just to satisfy compilers not understanding the fatality of
ink_release_assert().
- return default_ret;
-}
-
-TS_INLINE void
-IOBufferReader::clear()
-{
- accessor = nullptr;
- block = nullptr;
- mbuf = nullptr;
- start_offset = 0;
- size_limit = INT64_MAX;
-}
-
-TS_INLINE void
-IOBufferReader::reset()
-{
- block = mbuf->_writer;
- start_offset = 0;
- size_limit = INT64_MAX;
-}
-
-////////////////////////////////////////////////////////////////
-//
-// class MIOBuffer --
-// inline functions definitions
-//
-////////////////////////////////////////////////////////////////
-extern ClassAllocator<MIOBuffer> ioAllocator;
-
-TS_INLINE
-MIOBuffer::MIOBuffer(int64_t default_size_index)
-{
- clear();
- size_index = default_size_index;
- _location = nullptr;
- return;
-}
-
-TS_INLINE
-MIOBuffer::MIOBuffer()
-{
- clear();
- _location = nullptr;
- return;
-}
-
-TS_INLINE
-MIOBuffer::~MIOBuffer()
-{
- _writer = nullptr;
- dealloc_all_readers();
-}
-
-TS_INLINE MIOBuffer *
-new_MIOBuffer_internal(const char *location, int64_t size_index)
-{
- MIOBuffer *b = THREAD_ALLOC(ioAllocator, this_thread());
- b->_location = location;
- b->alloc(size_index);
- b->water_mark = 0;
- return b;
-}
-
-TS_INLINE void
-free_MIOBuffer(MIOBuffer *mio)
-{
- mio->_writer = nullptr;
- mio->dealloc_all_readers();
- THREAD_FREE(mio, ioAllocator, this_thread());
-}
-
-TS_INLINE MIOBuffer *
-new_empty_MIOBuffer_internal(const char *location, int64_t size_index)
-{
- MIOBuffer *b = THREAD_ALLOC(ioAllocator, this_thread());
- b->size_index = size_index;
- b->water_mark = 0;
- b->_location = location;
- return b;
-}
-
-TS_INLINE void
-free_empty_MIOBuffer(MIOBuffer *mio)
-{
- THREAD_FREE(mio, ioAllocator, this_thread());
-}
-
-TS_INLINE IOBufferReader *
-MIOBuffer::alloc_accessor(MIOBufferAccessor *anAccessor)
-{
- int i;
- for (i = 0; i < MAX_MIOBUFFER_READERS; i++) {
- if (!readers[i].allocated()) {
- break;
- }
- }
-
- // TODO refactor code to return nullptr at some point
- ink_release_assert(i < MAX_MIOBUFFER_READERS);
-
- IOBufferReader *e = &readers[i];
- e->mbuf = this;
- e->reset();
- e->accessor = anAccessor;
-
- return e;
-}
-
-TS_INLINE IOBufferReader *
-MIOBuffer::alloc_reader()
-{
- int i;
- for (i = 0; i < MAX_MIOBUFFER_READERS; i++) {
- if (!readers[i].allocated()) {
- break;
- }
- }
-
- // TODO refactor code to return nullptr at some point
- ink_release_assert(i < MAX_MIOBUFFER_READERS);
-
- IOBufferReader *e = &readers[i];
- e->mbuf = this;
- e->reset();
- e->accessor = nullptr;
-
- return e;
-}
-
-TS_INLINE int64_t
-MIOBuffer::block_size()
-{
- return index_to_buffer_size(size_index);
-}
-TS_INLINE IOBufferReader *
-MIOBuffer::clone_reader(IOBufferReader *r)
-{
- int i;
- for (i = 0; i < MAX_MIOBUFFER_READERS; i++) {
- if (!readers[i].allocated()) {
- break;
- }
- }
-
- // TODO refactor code to return nullptr at some point
- ink_release_assert(i < MAX_MIOBUFFER_READERS);
-
- IOBufferReader *e = &readers[i];
- e->mbuf = this;
- e->accessor = nullptr;
- e->block = r->block;
- e->start_offset = r->start_offset;
- e->size_limit = r->size_limit;
- ink_assert(e->size_limit >= 0);
-
- return e;
-}
-
-TS_INLINE int64_t
-MIOBuffer::block_write_avail()
-{
- IOBufferBlock *b = first_write_block();
- return b ? b->write_avail() : 0;
-}
-
-////////////////////////////////////////////////////////////////
-//
-// MIOBuffer::append_block()
-//
-// Appends a block to writer->next and make it the current
-// block.
-// Note that the block is not appended to the end of the list.
-// That means that if writer->next was not null before this
-// call then the block that writer->next was pointing to will
-// have its reference count decremented and writer->next
-// will have a new value which is the new block.
-// In any case the new appended block becomes the current
-// block.
-//
-////////////////////////////////////////////////////////////////
-TS_INLINE void
-MIOBuffer::append_block_internal(IOBufferBlock *b)
-{
- // It would be nice to remove an empty buffer at the beginning,
- // but this breaks HTTP.
- if (!_writer) {
- _writer = b;
- init_readers();
- } else {
- ink_assert(!_writer->next || !_writer->next->read_avail());
- _writer->next = b;
- while (b->read_avail()) {
- _writer = b;
- b = b->next.get();
- if (!b) {
- break;
- }
- }
- }
- while (_writer->next && !_writer->write_avail() &&
_writer->next->read_avail()) {
- _writer = _writer->next;
- }
-}
-
-TS_INLINE void
-MIOBuffer::append_block(IOBufferBlock *b)
-{
- ink_assert(b->read_avail());
- append_block_internal(b);
-}
-
-////////////////////////////////////////////////////////////////
-//
-// MIOBuffer::append_block()
-//
-// Allocate a block, appends it to current->next
-// and make the new block the current block (writer).
-//
-////////////////////////////////////////////////////////////////
-TS_INLINE void
-MIOBuffer::append_block(int64_t asize_index)
-{
- ink_assert(BUFFER_SIZE_ALLOCATED(asize_index));
- IOBufferBlock *b = new_IOBufferBlock_internal(_location);
- b->alloc(asize_index);
- append_block_internal(b);
- return;
-}
-
-TS_INLINE void
-MIOBuffer::add_block()
-{
- if (this->_writer == nullptr || this->_writer->next == nullptr) {
- append_block(size_index);
- }
-}
-
-TS_INLINE void
-MIOBuffer::check_add_block()
-{
- if (!high_water() && current_low_water()) {
- add_block();
- }
-}
-
-TS_INLINE IOBufferBlock *
-MIOBuffer::get_current_block()
-{
- return first_write_block();
-}
-
-//////////////////////////////////////////////////////////////////
-//
-// MIOBuffer::current_write_avail()
-//
-// returns the total space available in all blocks.
-// This function is different than write_avail() because
-// it will not append a new block if there is no space
-// or below the watermark space available.
-//
-//////////////////////////////////////////////////////////////////
-TS_INLINE int64_t
-MIOBuffer::current_write_avail()
-{
- int64_t t = 0;
- IOBufferBlock *b = _writer.get();
- while (b) {
- t += b->write_avail();
- b = b->next.get();
- }
- return t;
-}
-
-//////////////////////////////////////////////////////////////////
-//
-// MIOBuffer::write_avail()
-//
-// returns the number of bytes available in the current block.
-// If there is no current block or not enough free space in
-// the current block then a new block is appended.
-//
-//////////////////////////////////////////////////////////////////
-TS_INLINE int64_t
-MIOBuffer::write_avail()
-{
- check_add_block();
- return current_write_avail();
-}
-
-TS_INLINE void
-MIOBuffer::fill(int64_t len)
-{
- int64_t f = _writer->write_avail();
- while (f < len) {
- _writer->fill(f);
- len -= f;
- if (len > 0) {
- _writer = _writer->next;
- }
- f = _writer->write_avail();
- }
- _writer->fill(len);
-}
-
-TS_INLINE int
-MIOBuffer::max_block_count()
-{
- int maxb = 0;
- for (auto &reader : readers) {
- if (reader.allocated()) {
- int c = reader.block_count();
- if (c > maxb) {
- maxb = c;
- }
- }
- }
- return maxb;
-}
-
-TS_INLINE int64_t
-MIOBuffer::max_read_avail()
-{
- int64_t s = 0;
- int found = 0;
- for (auto &reader : readers) {
- if (reader.allocated()) {
- int64_t ss = reader.read_avail();
- if (ss > s) {
- s = ss;
- }
- found = 1;
- }
- }
- if (!found && _writer) {
- return _writer->read_avail();
- }
- return s;
-}
-
-TS_INLINE void
-MIOBuffer::set(void *b, int64_t len)
-{
- _writer = new_IOBufferBlock_internal(_location);
- _writer->set_internal(b, len, BUFFER_SIZE_INDEX_FOR_CONSTANT_SIZE(len));
- init_readers();
-}
-
-TS_INLINE void
-MIOBuffer::append_xmalloced(void *b, int64_t len)
-{
- if (len == 0) {
- return;
- }
-
- IOBufferBlock *x = new_IOBufferBlock_internal(_location);
- x->set_internal(b, len, BUFFER_SIZE_INDEX_FOR_XMALLOC_SIZE(len));
- append_block_internal(x);
-}
-
-TS_INLINE void
-MIOBuffer::append_fast_allocated(void *b, int64_t len, int64_t fast_size_index)
-{
- IOBufferBlock *x = new_IOBufferBlock_internal(_location);
- x->set_internal(b, len, fast_size_index);
- append_block_internal(x);
-}
-
-TS_INLINE void
-MIOBuffer::alloc(int64_t i)
-{
- _writer = new_IOBufferBlock_internal(_location);
- _writer->alloc(i);
- size_index = i;
- init_readers();
-}
-
-TS_INLINE void
-MIOBuffer::dealloc_reader(IOBufferReader *e)
-{
- if (e->accessor) {
- ink_assert(e->accessor->writer() == this);
- ink_assert(e->accessor->reader() == e);
- e->accessor->clear();
- }
- e->clear();
-}
-
-TS_INLINE IOBufferReader *
-IOBufferReader::clone()
-{
- return mbuf->clone_reader(this);
-}
-
-TS_INLINE void
-IOBufferReader::dealloc()
-{
- mbuf->dealloc_reader(this);
-}
-
-TS_INLINE void
-MIOBuffer::dealloc_all_readers()
-{
- for (auto &reader : readers) {
- if (reader.allocated()) {
- dealloc_reader(&reader);
- }
- }
-}
-
-TS_INLINE void
-MIOBufferAccessor::reader_for(MIOBuffer *abuf)
-{
- mbuf = abuf;
- if (abuf) {
- entry = mbuf->alloc_accessor(this);
- } else {
- entry = nullptr;
- }
-}
-
-TS_INLINE void
-MIOBufferAccessor::reader_for(IOBufferReader *areader)
-{
- if (entry == areader) {
- return;
- }
- mbuf = areader->mbuf;
- entry = areader;
- ink_assert(mbuf);
-}
-
-TS_INLINE void
-MIOBufferAccessor::writer_for(MIOBuffer *abuf)
-{
- mbuf = abuf;
- entry = nullptr;
-}
-
-TS_INLINE
-MIOBufferAccessor::~MIOBufferAccessor() {}
diff --git a/src/iocore/eventsystem/P_ProtectedQueue.h
b/src/iocore/eventsystem/P_ProtectedQueue.h
deleted file mode 100644
index 80c72ddb75..0000000000
--- a/src/iocore/eventsystem/P_ProtectedQueue.h
+++ /dev/null
@@ -1,83 +0,0 @@
-/** @file
-
- A brief file description
-
- @section license License
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
-
-/****************************************************************************
-
- Protected Queue
-
-
- ****************************************************************************/
-#pragma once
-
-#include "iocore/eventsystem/EventSystem.h"
-
-TS_INLINE
-ProtectedQueue::ProtectedQueue()
-{
- Event e;
- ink_mutex_init(&lock);
- ink_atomiclist_init(&al, "ProtectedQueue", (char *)&e.link.next - (char
*)&e);
- ink_cond_init(&might_have_data);
-}
-
-TS_INLINE void
-ProtectedQueue::signal()
-{
- // Need to get the lock before you can signal the thread
- ink_mutex_acquire(&lock);
- ink_cond_signal(&might_have_data);
- ink_mutex_release(&lock);
-}
-
-TS_INLINE int
-ProtectedQueue::try_signal()
-{
- // Need to get the lock before you can signal the thread
- if (ink_mutex_try_acquire(&lock)) {
- ink_cond_signal(&might_have_data);
- ink_mutex_release(&lock);
- return 1;
- } else {
- return 0;
- }
-}
-
-// Called from the same thread (don't need to signal)
-TS_INLINE void
-ProtectedQueue::enqueue_local(Event *e)
-{
- ink_assert(!e->in_the_prot_queue && !e->in_the_priority_queue);
- e->in_the_prot_queue = 1;
- localQueue.enqueue(e);
-}
-
-TS_INLINE Event *
-ProtectedQueue::dequeue_local()
-{
- Event *e = localQueue.dequeue();
- if (e) {
- ink_assert(e->in_the_prot_queue);
- e->in_the_prot_queue = 0;
- }
- return e;
-}
diff --git a/src/iocore/eventsystem/P_Thread.h
b/src/iocore/eventsystem/P_Thread.h
deleted file mode 100644
index c4c1e07e71..0000000000
--- a/src/iocore/eventsystem/P_Thread.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/** @file
-
- A brief file description
-
- @section license License
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
-
-/****************************************************************************
-
- Basic Threads
-
-
-
-****************************************************************************/
-#pragma once
-
-#include "iocore/eventsystem/Thread.h"
-
-///////////////////////////////////////////////
-// Common Interface impl //
-///////////////////////////////////////////////
-
-TS_INLINE void
-Thread::set_specific()
-{
- this_thread_ptr = this;
-}
-
-TS_INLINE Thread *
-this_thread()
-{
- return Thread::this_thread_ptr;
-}
diff --git a/src/iocore/eventsystem/P_UnixEThread.h
b/src/iocore/eventsystem/P_UnixEThread.h
deleted file mode 100644
index 81568ef966..0000000000
--- a/src/iocore/eventsystem/P_UnixEThread.h
+++ /dev/null
@@ -1,261 +0,0 @@
-/** @file
-
- A brief file description
-
- @section license License
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
-
-/****************************************************************************
-
- P_UnixEThread.h
-
-
-
-*****************************************************************************/
-#pragma once
-
-#include "iocore/eventsystem/EThread.h"
-#include "iocore/eventsystem/EventProcessor.h"
-#include "tscore/ink_atomic.h"
-#include <execinfo.h>
-
-const ink_hrtime DELAY_FOR_RETRY = HRTIME_MSECONDS(10);
-
-TS_INLINE Event *
-EThread::schedule_imm(Continuation *cont, int callback_event, void *cookie)
-{
- Event *e = ::eventAllocator.alloc();
-
-#ifdef ENABLE_EVENT_TRACKER
- e->set_location();
-#endif
-
- e->callback_event = callback_event;
- e->cookie = cookie;
- return schedule(e->init(cont, 0, 0));
-}
-
-TS_INLINE Event *
-EThread::schedule_at(Continuation *cont, ink_hrtime t, int callback_event,
void *cookie)
-{
- Event *e = ::eventAllocator.alloc();
-
-#ifdef ENABLE_EVENT_TRACKER
- e->set_location();
-#endif
-
- e->callback_event = callback_event;
- e->cookie = cookie;
- return schedule(e->init(cont, t, 0));
-}
-
-TS_INLINE Event *
-EThread::schedule_in(Continuation *cont, ink_hrtime t, int callback_event,
void *cookie)
-{
- Event *e = ::eventAllocator.alloc();
-
-#ifdef ENABLE_EVENT_TRACKER
- e->set_location();
-#endif
-
- e->callback_event = callback_event;
- e->cookie = cookie;
- return schedule(e->init(cont, ink_get_hrtime() + t, 0));
-}
-
-TS_INLINE Event *
-EThread::schedule_every(Continuation *cont, ink_hrtime t, int callback_event,
void *cookie)
-{
- Event *e = ::eventAllocator.alloc();
-
-#ifdef ENABLE_EVENT_TRACKER
- e->set_location();
-#endif
-
- e->callback_event = callback_event;
- e->cookie = cookie;
- if (t < 0) {
- return schedule(e->init(cont, t, t));
- } else {
- return schedule(e->init(cont, ink_get_hrtime() + t, t));
- }
-}
-
-TS_INLINE Event *
-EThread::schedule(Event *e)
-{
- e->ethread = this;
- if (tt != REGULAR) {
- ink_assert(tt == DEDICATED);
- return eventProcessor.schedule(e, ET_CALL);
- }
- if (e->continuation->mutex) {
- e->mutex = e->continuation->mutex;
- } else {
- e->mutex = e->continuation->mutex = e->ethread->mutex;
- }
- ink_assert(e->mutex.get());
-
- // Make sure client IP debugging works consistently
- // The continuation that gets scheduled later is not always the
- // client VC, it can be HttpCacheSM etc. so save the flags
- e->continuation->control_flags.set_flags(get_cont_flags().get_flags());
-
- if (e->ethread == this_ethread()) {
- EventQueueExternal.enqueue_local(e);
- } else {
- EventQueueExternal.enqueue(e);
- }
-
- return e;
-}
-
-TS_INLINE Event *
-EThread::schedule_imm_local(Continuation *cont, int callback_event, void
*cookie)
-{
- Event *e = EVENT_ALLOC(eventAllocator, this);
-
-#ifdef ENABLE_EVENT_TRACKER
- e->set_location();
-#endif
-
- e->callback_event = callback_event;
- e->cookie = cookie;
- return schedule_local(e->init(cont, 0, 0));
-}
-
-TS_INLINE Event *
-EThread::schedule_at_local(Continuation *cont, ink_hrtime t, int
callback_event, void *cookie)
-{
- Event *e = EVENT_ALLOC(eventAllocator, this);
-
-#ifdef ENABLE_EVENT_TRACKER
- e->set_location();
-#endif
-
- e->callback_event = callback_event;
- e->cookie = cookie;
- return schedule_local(e->init(cont, t, 0));
-}
-
-TS_INLINE Event *
-EThread::schedule_in_local(Continuation *cont, ink_hrtime t, int
callback_event, void *cookie)
-{
- Event *e = EVENT_ALLOC(eventAllocator, this);
-
-#ifdef ENABLE_EVENT_TRACKER
- e->set_location();
-#endif
-
- e->callback_event = callback_event;
- e->cookie = cookie;
- return schedule_local(e->init(cont, ink_get_hrtime() + t, 0));
-}
-
-TS_INLINE Event *
-EThread::schedule_every_local(Continuation *cont, ink_hrtime t, int
callback_event, void *cookie)
-{
- Event *e = EVENT_ALLOC(eventAllocator, this);
-
-#ifdef ENABLE_EVENT_TRACKER
- e->set_location();
-#endif
-
- e->callback_event = callback_event;
- e->cookie = cookie;
- if (t < 0) {
- return schedule_local(e->init(cont, t, t));
- } else {
- return schedule_local(e->init(cont, ink_get_hrtime() + t, t));
- }
-}
-
-TS_INLINE Event *
-EThread::schedule_local(Event *e)
-{
- if (tt != REGULAR) {
- ink_assert(tt == DEDICATED);
- return eventProcessor.schedule(e, ET_CALL);
- }
- if (!e->mutex) {
- e->ethread = this;
- e->mutex = e->continuation->mutex;
- } else {
- ink_assert(e->ethread == this);
- }
- e->globally_allocated = false;
-
- // Make sure client IP debugging works consistently
- // The continuation that gets scheduled later is not always the
- // client VC, it can be HttpCacheSM etc. so save the flags
- e->continuation->control_flags.set_flags(get_cont_flags().get_flags());
-
- // If you need to schedule an event from a different thread, use
Ethread::schedule_imm/at/in/every functions
- ink_release_assert(this == this_ethread());
-
- EventQueueExternal.enqueue_local(e);
- return e;
-}
-
-TS_INLINE Event *
-EThread::schedule_spawn(Continuation *c, int ev, void *cookie)
-{
- ink_assert(this != this_ethread()); // really broken to call this from the
same thread.
- if (start_event) {
- free_event(start_event);
- }
- start_event = EVENT_ALLOC(eventAllocator, this);
- start_event->ethread = this;
- start_event->mutex = this->mutex;
- start_event->init(c);
- start_event->callback_event = ev;
- start_event->cookie = cookie;
- return start_event;
-}
-
-TS_INLINE EThread *
-this_ethread()
-{
- return EThread::this_ethread_ptr;
-}
-
-TS_INLINE EThread *
-this_event_thread()
-{
- EThread *ethread = this_ethread();
- if (ethread != nullptr && ethread->tt == REGULAR) {
- return ethread;
- } else {
- return nullptr;
- }
-}
-
-TS_INLINE void
-EThread::free_event(Event *e)
-{
- ink_assert(!e->in_the_priority_queue && !e->in_the_prot_queue);
- e->mutex = nullptr;
- EVENT_FREE(e, eventAllocator, this);
-}
-
-TS_INLINE void
-EThread::set_tail_handler(LoopTailHandler *handler)
-{
- ink_atomic_swap(&tail_cb, handler);
-}
diff --git a/src/iocore/eventsystem/P_UnixEventProcessor.h
b/src/iocore/eventsystem/P_UnixEventProcessor.h
deleted file mode 100644
index eb4235602a..0000000000
--- a/src/iocore/eventsystem/P_UnixEventProcessor.h
+++ /dev/null
@@ -1,246 +0,0 @@
-/** @file
-
- A brief file description
-
- @section license License
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
-
-#pragma once
-
-#include <vector>
-
-#include <tscore/TSSystemState.h>
-
-#include "tscore/ink_align.h"
-#include "iocore/eventsystem/EventProcessor.h"
-
-const int LOAD_BALANCE_INTERVAL = 1;
-
-TS_INLINE off_t
-EventProcessor::allocate(int size)
-{
- static off_t start = INK_ALIGN(offsetof(EThread, thread_private), 16);
- static off_t loss = start - offsetof(EThread, thread_private);
- size = INK_ALIGN(size, 16); // 16 byte alignment
-
- int old;
- do {
- old = thread_data_used;
- if (old + loss + size > PER_THREAD_DATA) {
- return -1;
- }
- } while (!ink_atomic_cas(&thread_data_used, old, old + size));
-
- return (off_t)(old + start);
-}
-
-TS_INLINE EThread *
-EventProcessor::assign_thread(EventType etype)
-{
- int next;
- ThreadGroupDescriptor *tg = &thread_group[etype];
-
- ink_assert(etype < MAX_EVENT_TYPES);
- if (tg->_count > 1) {
- next = ++tg->_next_round_robin % tg->_count;
- } else {
- next = 0;
- }
- return tg->_thread[next];
-}
-
-// If thread_holding is the correct type, return it.
-//
-// Otherwise check if there is already an affinity associated with the
continuation,
-// return it if the type is the same, return the next available thread of
"etype" if
-// the type is different.
-//
-// Only assign new affinity when there is currently none.
-TS_INLINE EThread *
-EventProcessor::assign_affinity_by_type(Continuation *cont, EventType etype)
-{
- EThread *ethread = cont->mutex->thread_holding;
- if (!ethread->is_event_type(etype)) {
- ethread = cont->getThreadAffinity();
- if (ethread == nullptr || !ethread->is_event_type(etype)) {
- ethread = assign_thread(etype);
- }
- }
-
- if (cont->getThreadAffinity() == nullptr) {
- cont->setThreadAffinity(ethread);
- }
-
- return ethread;
-}
-
-TS_INLINE Event *
-EventProcessor::schedule(Event *e, EventType etype)
-{
- ink_assert(etype < MAX_EVENT_TYPES);
-
- if (TSSystemState::is_event_system_shut_down()) {
- return nullptr;
- }
-
- EThread *affinity_thread = e->continuation->getThreadAffinity();
- EThread *curr_thread = this_ethread();
- if (affinity_thread != nullptr && affinity_thread->is_event_type(etype)) {
- e->ethread = affinity_thread;
- } else {
- // Is the current thread eligible?
- if (curr_thread != nullptr && curr_thread->is_event_type(etype)) {
- e->ethread = curr_thread;
- } else {
- e->ethread = assign_thread(etype);
- }
- if (affinity_thread == nullptr) {
- e->continuation->setThreadAffinity(e->ethread);
- }
- }
-
- if (e->continuation->mutex) {
- e->mutex = e->continuation->mutex;
- }
-
- if (curr_thread != nullptr && e->ethread == curr_thread) {
- e->ethread->EventQueueExternal.enqueue_local(e);
- } else {
- e->ethread->EventQueueExternal.enqueue(e);
- }
-
- return e;
-}
-
-TS_INLINE Event *
-EventProcessor::schedule_imm(Continuation *cont, EventType et, int
callback_event, void *cookie)
-{
- Event *e = eventAllocator.alloc();
-
- ink_assert(et < MAX_EVENT_TYPES);
-#ifdef ENABLE_TIME_TRACE
- e->start_time = ink_get_hrtime();
-#endif
-
-#ifdef ENABLE_EVENT_TRACKER
- e->set_location();
-#endif
-
- e->callback_event = callback_event;
- e->cookie = cookie;
- return schedule(e->init(cont, 0, 0), et);
-}
-
-TS_INLINE Event *
-EventProcessor::schedule_at(Continuation *cont, ink_hrtime t, EventType et,
int callback_event, void *cookie)
-{
- Event *e = eventAllocator.alloc();
-
- ink_assert(t > 0);
- ink_assert(et < MAX_EVENT_TYPES);
-
-#ifdef ENABLE_EVENT_TRACKER
- e->set_location();
-#endif
-
- e->callback_event = callback_event;
- e->cookie = cookie;
- return schedule(e->init(cont, t, 0), et);
-}
-
-TS_INLINE Event *
-EventProcessor::schedule_in(Continuation *cont, ink_hrtime t, EventType et,
int callback_event, void *cookie)
-{
- Event *e = eventAllocator.alloc();
-
- ink_assert(et < MAX_EVENT_TYPES);
-
-#ifdef ENABLE_EVENT_TRACKER
- e->set_location();
-#endif
-
- e->callback_event = callback_event;
- e->cookie = cookie;
- return schedule(e->init(cont, ink_get_hrtime() + t, 0), et);
-}
-
-TS_INLINE Event *
-EventProcessor::schedule_every(Continuation *cont, ink_hrtime t, EventType et,
int callback_event, void *cookie)
-{
- Event *e = eventAllocator.alloc();
-
- ink_assert(t != 0);
- ink_assert(et < MAX_EVENT_TYPES);
-
-#ifdef ENABLE_EVENT_TRACKER
- e->set_location();
-#endif
-
- e->callback_event = callback_event;
- e->cookie = cookie;
- if (t < 0) {
- return schedule(e->init(cont, t, t), et);
- } else {
- return schedule(e->init(cont, ink_get_hrtime() + t, t), et);
- }
-}
-
-TS_INLINE std::vector<TSAction>
-EventProcessor::schedule_entire(Continuation *cont, ink_hrtime t, ink_hrtime
p, EventType et, int callback_event, void *cookie)
-{
- ThreadGroupDescriptor *tg = &thread_group[et];
- EThread *curr_thread = this_ethread();
-
- std::vector<TSAction> actions;
-
- for (int i = 0; i < tg->_count; i++) {
- Event *e = eventAllocator.alloc();
-
- e->ethread = tg->_thread[i];
- e->callback_event = callback_event;
- e->cookie = cookie;
-
- if (t == 0 && p == 0) {
- e->init(cont, 0, 0);
- } else if (t != 0 && p == 0) {
- e->init(cont, ink_get_hrtime() + t, 0);
- } else if (t == 0 && p != 0) {
- if (p < 0) {
- e->init(cont, p, p);
- } else {
- e->init(cont, ink_get_hrtime() + p, p);
- }
- } else {
- ink_assert(!"not reached");
- }
-
- e->mutex = new_ProxyMutex();
-
- if (curr_thread != nullptr && e->ethread == curr_thread) {
- e->ethread->EventQueueExternal.enqueue_local(e);
- } else {
- e->ethread->EventQueueExternal.enqueue(e);
- }
-
- /* This is a hack. Should be handled in ink_types */
- actions.push_back((TSAction)((uintptr_t) reinterpret_cast<TSAction>(e) |
0x1));
- }
-
- return actions;
-}
diff --git a/src/iocore/eventsystem/P_VConnection.h
b/src/iocore/eventsystem/P_VConnection.h
deleted file mode 100644
index ad49800452..0000000000
--- a/src/iocore/eventsystem/P_VConnection.h
+++ /dev/null
@@ -1,95 +0,0 @@
-/** @file
-
- A brief file description
-
- @section license License
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
-
-#pragma once
-#include "iocore/eventsystem/EventSystem.h"
-
-TS_INLINE const char *
-get_vc_event_name(int event)
-{
- switch (event) {
- default:
- return "unknown event";
- case VC_EVENT_NONE:
- return "VC_EVENT_NONE";
- case VC_EVENT_IMMEDIATE:
- return "VC_EVENT_IMMEDIATE";
- case VC_EVENT_READ_READY:
- return "VC_EVENT_READ_READY";
- case VC_EVENT_WRITE_READY:
- return "VC_EVENT_WRITE_READY";
- case VC_EVENT_READ_COMPLETE:
- return "VC_EVENT_READ_COMPLETE";
- case VC_EVENT_WRITE_COMPLETE:
- return "VC_EVENT_WRITE_COMPLETE";
- case VC_EVENT_EOS:
- return "VC_EVENT_EOS";
- case VC_EVENT_ERROR:
- return "VC_EVENT_ERROR";
- case VC_EVENT_INACTIVITY_TIMEOUT:
- return "VC_EVENT_INACTIVITY_TIMEOUT";
- case VC_EVENT_ACTIVE_TIMEOUT:
- return "VC_EVENT_ACTIVE_TIMEOUT";
- }
-}
-
-TS_INLINE
-VConnection::VConnection(ProxyMutex *aMutex) : Continuation(aMutex), lerrno(0)
-{
- SET_HANDLER(nullptr);
-}
-
-TS_INLINE
-VConnection::VConnection(Ptr<ProxyMutex> &aMutex) : Continuation(aMutex),
lerrno(0)
-{
- SET_HANDLER(nullptr);
-}
-
-TS_INLINE
-VConnection::~VConnection() {}
-
-TS_INLINE VIO *
-vc_do_io_write(VConnection *vc, Continuation *cont, int64_t nbytes, MIOBuffer
*buf, int64_t offset)
-{
- IOBufferReader *reader = buf->alloc_reader();
-
- if (offset > 0) {
- reader->consume(offset);
- }
-
- return vc->do_io_write(cont, nbytes, reader, true);
-}
-
-TS_INLINE void
-VConnection::set_continuation(VIO *, Continuation *)
-{
-}
-TS_INLINE void
-VConnection::reenable(VIO *)
-{
-}
-TS_INLINE void
-VConnection::reenable_re(VIO *vio)
-{
- reenable(vio);
-}
diff --git a/src/iocore/eventsystem/P_VIO.h b/src/iocore/eventsystem/P_VIO.h
deleted file mode 100644
index de5f919e95..0000000000
--- a/src/iocore/eventsystem/P_VIO.h
+++ /dev/null
@@ -1,123 +0,0 @@
-/** @file
-
- A brief file description
-
- @section license License
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
-
-#pragma once
-#include "iocore/eventsystem/VIO.h"
-
-TS_INLINE
-VIO::VIO(int aop) : op(aop), buffer(), mutex(nullptr) {}
-
-TS_INLINE
-VIO::VIO() : buffer(), mutex(nullptr) {}
-
-TS_INLINE Continuation *
-VIO::get_continuation() const
-{
- return cont;
-}
-
-TS_INLINE void
-VIO::set_writer(MIOBuffer *writer)
-{
- buffer.writer_for(writer);
-}
-
-TS_INLINE void
-VIO::set_reader(IOBufferReader *reader)
-{
- buffer.reader_for(reader);
-}
-
-TS_INLINE MIOBuffer *
-VIO::get_writer() const
-{
- return buffer.writer();
-}
-
-TS_INLINE IOBufferReader *
-VIO::get_reader() const
-{
- return (buffer.reader());
-}
-
-TS_INLINE int64_t
-VIO::ntodo() const
-{
- return nbytes - ndone;
-}
-
-TS_INLINE void
-VIO::done()
-{
- if (buffer.reader()) {
- nbytes = ndone + buffer.reader()->read_avail();
- } else {
- nbytes = ndone;
- }
-}
-
-TS_INLINE void
-VIO::set_continuation(Continuation *acont)
-{
- if (vc_server) {
- vc_server->set_continuation(this, acont);
- }
- if (acont) {
- mutex = acont->mutex;
- cont = acont;
- } else {
- mutex = nullptr;
- cont = nullptr;
- }
- return;
-}
-
-TS_INLINE void
-VIO::reenable()
-{
- this->_disabled = false;
- if (vc_server) {
- vc_server->reenable(this);
- }
-}
-
-TS_INLINE void
-VIO::reenable_re()
-{
- this->_disabled = false;
- if (vc_server) {
- vc_server->reenable_re(this);
- }
-}
-
-TS_INLINE void
-VIO::disable()
-{
- this->_disabled = true;
-}
-
-TS_INLINE bool
-VIO::is_disabled() const
-{
- return this->_disabled;
-}
diff --git a/src/iocore/eventsystem/Processor.cc
b/src/iocore/eventsystem/Processor.cc
index 18887f2315..93fe5ca84e 100644
--- a/src/iocore/eventsystem/Processor.cc
+++ b/src/iocore/eventsystem/Processor.cc
@@ -40,7 +40,9 @@
****************************************************************************/
-#include "P_EventSystem.h"
+#include "iocore/eventsystem/Processor.h"
+#include "tscore/ink_assert.h"
+
//////////////////////////////////////////////////////////////////////////////
//
// Processor::Processor()
diff --git a/src/iocore/eventsystem/ProtectedQueue.cc
b/src/iocore/eventsystem/ProtectedQueue.cc
index 6be82e388d..02ae16a7e0 100644
--- a/src/iocore/eventsystem/ProtectedQueue.cc
+++ b/src/iocore/eventsystem/ProtectedQueue.cc
@@ -30,7 +30,8 @@
*/
-#include "P_EventSystem.h"
+#include "iocore/eventsystem/ProtectedQueue.h"
+#include "iocore/eventsystem/EThread.h"
// The protected queue is designed to delay signaling of threads
// until some amount of work has been completed on the current thread
diff --git a/src/iocore/eventsystem/RecProcess.cc
b/src/iocore/eventsystem/RecProcess.cc
index 81d698bab5..128154cb3e 100644
--- a/src/iocore/eventsystem/RecProcess.cc
+++ b/src/iocore/eventsystem/RecProcess.cc
@@ -21,13 +21,14 @@
limitations under the License.
*/
+#include "records/RecProcess.h"
+#include "iocore/eventsystem/EventProcessor.h"
#include "tscore/ink_platform.h"
#include "tscore/EventNotify.h"
#include "tsutil/Metrics.h"
#include "iocore/eventsystem/Tasks.h"
-#include "P_EventSystem.h"
#include "../../records/P_RecCore.h"
#include "../../records/P_RecMessage.h"
#include "../../records/P_RecUtils.h"
diff --git a/src/iocore/eventsystem/UnixEThread.cc
b/src/iocore/eventsystem/UnixEThread.cc
index 423b4d31ea..93578784c8 100644
--- a/src/iocore/eventsystem/UnixEThread.cc
+++ b/src/iocore/eventsystem/UnixEThread.cc
@@ -31,9 +31,11 @@
// The EThread Class
//
/////////////////////////////////////////////////////////////////////
-#include "P_EventSystem.h"
+#include "iocore/eventsystem/EThread.h"
+#include "iocore/eventsystem/EventProcessor.h"
#include "iocore/eventsystem/Lock.h"
#include "tscore/ink_hrtime.h"
+#include "tscore/ink_atomic.h"
#if HAVE_EVENTFD
#include <sys/eventfd.h>
@@ -53,8 +55,9 @@ char const *const EThread::Metrics::Slice::STAT_NAME[] = {
"proxy.process.eventloop.io.wait.max", "proxy.process.eventloop.io.work.max",
};
-int thread_max_heartbeat_mseconds = THREAD_MAX_HEARTBEAT_MSECONDS;
-int loop_time_update_probability = 10;
+int thread_max_heartbeat_mseconds = THREAD_MAX_HEARTBEAT_MSECONDS;
+int loop_time_update_probability = 10;
+const ink_hrtime DELAY_FOR_RETRY = HRTIME_MSECONDS(10);
// To define a class inherits from Thread:
// 1) Define an independent thread_local static member
@@ -439,3 +442,201 @@ EThread::Metrics::summarize(Metrics &global)
global._api_timing += _api_timing;
}
}
+
+void
+EThread::set_tail_handler(LoopTailHandler *handler)
+{
+ ink_atomic_swap(&tail_cb, handler);
+}
+
+Event *
+EThread::schedule_imm(Continuation *cont, int callback_event, void *cookie)
+{
+ Event *e = ::eventAllocator.alloc();
+
+#ifdef ENABLE_EVENT_TRACKER
+ e->set_location();
+#endif
+
+ e->callback_event = callback_event;
+ e->cookie = cookie;
+ return schedule(e->init(cont, 0, 0));
+}
+
+Event *
+EThread::schedule_at(Continuation *cont, ink_hrtime t, int callback_event,
void *cookie)
+{
+ Event *e = ::eventAllocator.alloc();
+
+#ifdef ENABLE_EVENT_TRACKER
+ e->set_location();
+#endif
+
+ e->callback_event = callback_event;
+ e->cookie = cookie;
+ return schedule(e->init(cont, t, 0));
+}
+
+Event *
+EThread::schedule_in(Continuation *cont, ink_hrtime t, int callback_event,
void *cookie)
+{
+ Event *e = ::eventAllocator.alloc();
+
+#ifdef ENABLE_EVENT_TRACKER
+ e->set_location();
+#endif
+
+ e->callback_event = callback_event;
+ e->cookie = cookie;
+ return schedule(e->init(cont, ink_get_hrtime() + t, 0));
+}
+
+Event *
+EThread::schedule_every(Continuation *cont, ink_hrtime t, int callback_event,
void *cookie)
+{
+ Event *e = ::eventAllocator.alloc();
+
+#ifdef ENABLE_EVENT_TRACKER
+ e->set_location();
+#endif
+
+ e->callback_event = callback_event;
+ e->cookie = cookie;
+ if (t < 0) {
+ return schedule(e->init(cont, t, t));
+ } else {
+ return schedule(e->init(cont, ink_get_hrtime() + t, t));
+ }
+}
+
+Event *
+EThread::schedule(Event *e)
+{
+ e->ethread = this;
+ if (tt != REGULAR) {
+ ink_assert(tt == DEDICATED);
+ return eventProcessor.schedule(e, ET_CALL);
+ }
+ if (e->continuation->mutex) {
+ e->mutex = e->continuation->mutex;
+ } else {
+ e->mutex = e->continuation->mutex = e->ethread->mutex;
+ }
+ ink_assert(e->mutex.get());
+
+ // Make sure client IP debugging works consistently
+ // The continuation that gets scheduled later is not always the
+ // client VC, it can be HttpCacheSM etc. so save the flags
+ e->continuation->control_flags.set_flags(get_cont_flags().get_flags());
+
+ if (e->ethread == this_ethread()) {
+ EventQueueExternal.enqueue_local(e);
+ } else {
+ EventQueueExternal.enqueue(e);
+ }
+
+ return e;
+}
+
+Event *
+EThread::schedule_imm_local(Continuation *cont, int callback_event, void
*cookie)
+{
+ Event *e = EVENT_ALLOC(eventAllocator, this);
+
+#ifdef ENABLE_EVENT_TRACKER
+ e->set_location();
+#endif
+
+ e->callback_event = callback_event;
+ e->cookie = cookie;
+ return schedule_local(e->init(cont, 0, 0));
+}
+
+Event *
+EThread::schedule_at_local(Continuation *cont, ink_hrtime t, int
callback_event, void *cookie)
+{
+ Event *e = EVENT_ALLOC(eventAllocator, this);
+
+#ifdef ENABLE_EVENT_TRACKER
+ e->set_location();
+#endif
+
+ e->callback_event = callback_event;
+ e->cookie = cookie;
+ return schedule_local(e->init(cont, t, 0));
+}
+
+Event *
+EThread::schedule_in_local(Continuation *cont, ink_hrtime t, int
callback_event, void *cookie)
+{
+ Event *e = EVENT_ALLOC(eventAllocator, this);
+
+#ifdef ENABLE_EVENT_TRACKER
+ e->set_location();
+#endif
+
+ e->callback_event = callback_event;
+ e->cookie = cookie;
+ return schedule_local(e->init(cont, ink_get_hrtime() + t, 0));
+}
+
+Event *
+EThread::schedule_every_local(Continuation *cont, ink_hrtime t, int
callback_event, void *cookie)
+{
+ Event *e = EVENT_ALLOC(eventAllocator, this);
+
+#ifdef ENABLE_EVENT_TRACKER
+ e->set_location();
+#endif
+
+ e->callback_event = callback_event;
+ e->cookie = cookie;
+ if (t < 0) {
+ return schedule_local(e->init(cont, t, t));
+ } else {
+ return schedule_local(e->init(cont, ink_get_hrtime() + t, t));
+ }
+}
+
+Event *
+EThread::schedule_local(Event *e)
+{
+ if (tt != REGULAR) {
+ ink_assert(tt == DEDICATED);
+ return eventProcessor.schedule(e, ET_CALL);
+ }
+ if (!e->mutex) {
+ e->ethread = this;
+ e->mutex = e->continuation->mutex;
+ } else {
+ ink_assert(e->ethread == this);
+ }
+ e->globally_allocated = false;
+
+ // Make sure client IP debugging works consistently
+ // The continuation that gets scheduled later is not always the
+ // client VC, it can be HttpCacheSM etc. so save the flags
+ e->continuation->control_flags.set_flags(get_cont_flags().get_flags());
+
+ // If you need to schedule an event from a different thread, use
Ethread::schedule_imm/at/in/every functions
+ ink_release_assert(this == this_ethread());
+
+ EventQueueExternal.enqueue_local(e);
+ return e;
+}
+
+Event *
+EThread::schedule_spawn(Continuation *c, int ev, void *cookie)
+{
+ ink_assert(this != this_ethread()); // really broken to call this from the
same thread.
+ if (start_event) {
+ free_event(start_event);
+ }
+ start_event = EVENT_ALLOC(eventAllocator, this);
+ start_event->ethread = this;
+ start_event->mutex = this->mutex;
+ start_event->init(c);
+ start_event->callback_event = ev;
+ start_event->cookie = cookie;
+ return start_event;
+}
diff --git a/src/iocore/eventsystem/UnixEvent.cc
b/src/iocore/eventsystem/UnixEvent.cc
index 8043155b76..4a2d0ee381 100644
--- a/src/iocore/eventsystem/UnixEvent.cc
+++ b/src/iocore/eventsystem/UnixEvent.cc
@@ -27,9 +27,9 @@
*****************************************************************************/
-#include "P_EventSystem.h"
-#include "tscore/ink_stack_trace.h"
+#include "iocore/eventsystem/Event.h"
+#include "iocore/eventsystem/EThread.h"
ClassAllocator<Event> eventAllocator("eventAllocator", 256);
diff --git a/src/iocore/eventsystem/UnixEventProcessor.cc
b/src/iocore/eventsystem/UnixEventProcessor.cc
index 4ac2c7f7bb..ea527bfab1 100644
--- a/src/iocore/eventsystem/UnixEventProcessor.cc
+++ b/src/iocore/eventsystem/UnixEventProcessor.cc
@@ -27,6 +27,8 @@
#include "records/RecCore.h"
#include "records/RecProcess.h"
#include "tscore/ink_align.h"
+#include "tscore/ink_atomic.h"
+#include "tscore/TSSystemState.h"
#include <sched.h>
#if TS_USE_HWLOC
#if __has_include(<alloca.h>)
@@ -615,3 +617,216 @@ thread_started(EThread *t)
}
}
}
+
+off_t
+EventProcessor::allocate(int size)
+{
+ static off_t start = INK_ALIGN(offsetof(EThread, thread_private), 16);
+ static off_t loss = start - offsetof(EThread, thread_private);
+ size = INK_ALIGN(size, 16); // 16 byte alignment
+
+ int old;
+ do {
+ old = thread_data_used;
+ if (old + loss + size > PER_THREAD_DATA) {
+ return -1;
+ }
+ } while (!ink_atomic_cas(&thread_data_used, old, old + size));
+
+ return (off_t)(old + start);
+}
+
+EThread *
+EventProcessor::assign_thread(EventType etype)
+{
+ int next;
+ ThreadGroupDescriptor *tg = &thread_group[etype];
+
+ ink_assert(etype < MAX_EVENT_TYPES);
+ if (tg->_count > 1) {
+ next = ++tg->_next_round_robin % tg->_count;
+ } else {
+ next = 0;
+ }
+ return tg->_thread[next];
+}
+
+// If thread_holding is the correct type, return it.
+//
+// Otherwise check if there is already an affinity associated with the
continuation,
+// return it if the type is the same, return the next available thread of
"etype" if
+// the type is different.
+//
+// Only assign new affinity when there is currently none.
+EThread *
+EventProcessor::assign_affinity_by_type(Continuation *cont, EventType etype)
+{
+ EThread *ethread = cont->mutex->thread_holding;
+ if (!ethread->is_event_type(etype)) {
+ ethread = cont->getThreadAffinity();
+ if (ethread == nullptr || !ethread->is_event_type(etype)) {
+ ethread = assign_thread(etype);
+ }
+ }
+
+ if (cont->getThreadAffinity() == nullptr) {
+ cont->setThreadAffinity(ethread);
+ }
+
+ return ethread;
+}
+
+Event *
+EventProcessor::schedule(Event *e, EventType etype)
+{
+ ink_assert(etype < MAX_EVENT_TYPES);
+
+ if (TSSystemState::is_event_system_shut_down()) {
+ return nullptr;
+ }
+
+ EThread *affinity_thread = e->continuation->getThreadAffinity();
+ EThread *curr_thread = this_ethread();
+ if (affinity_thread != nullptr && affinity_thread->is_event_type(etype)) {
+ e->ethread = affinity_thread;
+ } else {
+ // Is the current thread eligible?
+ if (curr_thread != nullptr && curr_thread->is_event_type(etype)) {
+ e->ethread = curr_thread;
+ } else {
+ e->ethread = assign_thread(etype);
+ }
+ if (affinity_thread == nullptr) {
+ e->continuation->setThreadAffinity(e->ethread);
+ }
+ }
+
+ if (e->continuation->mutex) {
+ e->mutex = e->continuation->mutex;
+ }
+
+ if (curr_thread != nullptr && e->ethread == curr_thread) {
+ e->ethread->EventQueueExternal.enqueue_local(e);
+ } else {
+ e->ethread->EventQueueExternal.enqueue(e);
+ }
+
+ return e;
+}
+
+Event *
+EventProcessor::schedule_imm(Continuation *cont, EventType et, int
callback_event, void *cookie)
+{
+ Event *e = eventAllocator.alloc();
+
+ ink_assert(et < MAX_EVENT_TYPES);
+#ifdef ENABLE_TIME_TRACE
+ e->start_time = ink_get_hrtime();
+#endif
+
+#ifdef ENABLE_EVENT_TRACKER
+ e->set_location();
+#endif
+
+ e->callback_event = callback_event;
+ e->cookie = cookie;
+ return schedule(e->init(cont, 0, 0), et);
+}
+
+Event *
+EventProcessor::schedule_at(Continuation *cont, ink_hrtime t, EventType et,
int callback_event, void *cookie)
+{
+ Event *e = eventAllocator.alloc();
+
+ ink_assert(t > 0);
+ ink_assert(et < MAX_EVENT_TYPES);
+
+#ifdef ENABLE_EVENT_TRACKER
+ e->set_location();
+#endif
+
+ e->callback_event = callback_event;
+ e->cookie = cookie;
+ return schedule(e->init(cont, t, 0), et);
+}
+
+Event *
+EventProcessor::schedule_in(Continuation *cont, ink_hrtime t, EventType et,
int callback_event, void *cookie)
+{
+ Event *e = eventAllocator.alloc();
+
+ ink_assert(et < MAX_EVENT_TYPES);
+
+#ifdef ENABLE_EVENT_TRACKER
+ e->set_location();
+#endif
+
+ e->callback_event = callback_event;
+ e->cookie = cookie;
+ return schedule(e->init(cont, ink_get_hrtime() + t, 0), et);
+}
+
+Event *
+EventProcessor::schedule_every(Continuation *cont, ink_hrtime t, EventType et,
int callback_event, void *cookie)
+{
+ Event *e = eventAllocator.alloc();
+
+ ink_assert(t != 0);
+ ink_assert(et < MAX_EVENT_TYPES);
+
+#ifdef ENABLE_EVENT_TRACKER
+ e->set_location();
+#endif
+
+ e->callback_event = callback_event;
+ e->cookie = cookie;
+ if (t < 0) {
+ return schedule(e->init(cont, t, t), et);
+ } else {
+ return schedule(e->init(cont, ink_get_hrtime() + t, t), et);
+ }
+}
+
+std::vector<TSAction>
+EventProcessor::schedule_entire(Continuation *cont, ink_hrtime t, ink_hrtime
p, EventType et, int callback_event, void *cookie)
+{
+ ThreadGroupDescriptor *tg = &thread_group[et];
+ EThread *curr_thread = this_ethread();
+
+ std::vector<TSAction> actions;
+
+ for (int i = 0; i < tg->_count; i++) {
+ Event *e = eventAllocator.alloc();
+
+ e->ethread = tg->_thread[i];
+ e->callback_event = callback_event;
+ e->cookie = cookie;
+
+ if (t == 0 && p == 0) {
+ e->init(cont, 0, 0);
+ } else if (t != 0 && p == 0) {
+ e->init(cont, ink_get_hrtime() + t, 0);
+ } else if (t == 0 && p != 0) {
+ if (p < 0) {
+ e->init(cont, p, p);
+ } else {
+ e->init(cont, ink_get_hrtime() + p, p);
+ }
+ } else {
+ ink_assert(!"not reached");
+ }
+
+ e->mutex = new_ProxyMutex();
+
+ if (curr_thread != nullptr && e->ethread == curr_thread) {
+ e->ethread->EventQueueExternal.enqueue_local(e);
+ } else {
+ e->ethread->EventQueueExternal.enqueue(e);
+ }
+
+ /* This is a hack. Should be handled in ink_types */
+ actions.push_back((TSAction)((uintptr_t) reinterpret_cast<TSAction>(e) |
0x1));
+ }
+
+ return actions;
+}
diff --git a/src/iocore/eventsystem/P_UnixEvent.h
b/src/iocore/eventsystem/VIO.cc
similarity index 61%
rename from src/iocore/eventsystem/P_UnixEvent.h
rename to src/iocore/eventsystem/VIO.cc
index df8cdee3d6..6fb7e1c21a 100644
--- a/src/iocore/eventsystem/P_UnixEvent.h
+++ b/src/iocore/eventsystem/VIO.cc
@@ -21,27 +21,39 @@
limitations under the License.
*/
-#pragma once
+#include "iocore/eventsystem/VIO.h"
+#include "iocore/eventsystem/VConnection.h"
-TS_INLINE Event *
-Event::init(Continuation *c, ink_hrtime atimeout_at, ink_hrtime aperiod)
+void
+VIO::set_continuation(Continuation *acont)
{
- continuation = c;
- timeout_at = atimeout_at;
- period = aperiod;
- immediate = !period && !atimeout_at;
- cancelled = false;
- return this;
+ if (vc_server) {
+ vc_server->set_continuation(this, acont);
+ }
+ if (acont) {
+ mutex = acont->mutex;
+ cont = acont;
+ } else {
+ mutex = nullptr;
+ cont = nullptr;
+ }
+ return;
}
-TS_INLINE void
-Event::free()
+void
+VIO::reenable()
{
- mutex = nullptr;
- eventAllocator.free(this);
+ this->_disabled = false;
+ if (vc_server) {
+ vc_server->reenable(this);
+ }
}
-TS_INLINE
-Event::Event() : in_the_prot_queue(false), in_the_priority_queue(false),
immediate(false), globally_allocated(true), in_heap(false)
+void
+VIO::reenable_re()
{
+ this->_disabled = false;
+ if (vc_server) {
+ vc_server->reenable_re(this);
+ }
}
diff --git a/src/iocore/net/SSLSessionCache.cc
b/src/iocore/net/SSLSessionCache.cc
index 7fe5ae9eaf..7e848b41b4 100644
--- a/src/iocore/net/SSLSessionCache.cc
+++ b/src/iocore/net/SSLSessionCache.cc
@@ -23,7 +23,7 @@
#include "SSLSessionCache.h"
#include "P_SSLUtils.h"
#include "SSLStats.h"
-#include "../eventsystem/P_IOBuffer.h"
+#include "iocore/eventsystem/IOBuffer.h"
#include <cstring>
#include <memory>
diff --git a/src/iocore/net/Socks.cc b/src/iocore/net/Socks.cc
index 06ccd42405..03bbaa30f0 100644
--- a/src/iocore/net/Socks.cc
+++ b/src/iocore/net/Socks.cc
@@ -32,7 +32,6 @@
#include "P_Socks.h"
#include "P_Net.h"
-#include "../eventsystem/P_VConnection.h"
#include "iocore/net/NetProcessor.h"
#include "tscore/InkErrno.h"
#include "swoc/swoc_file.h"
diff --git a/src/iocore/net/TLSEventSupport.cc
b/src/iocore/net/TLSEventSupport.cc
index 3095edeb89..c1009d8c6c 100644
--- a/src/iocore/net/TLSEventSupport.cc
+++ b/src/iocore/net/TLSEventSupport.cc
@@ -23,6 +23,7 @@
*/
#include "iocore/net/TLSEventSupport.h"
+#include "iocore/net/Net.h"
#include "iocore/net/SSLAPIHooks.h"
#include "SSLStats.h"
diff --git a/src/iocore/utils/OneWayMultiTunnel.cc
b/src/iocore/utils/OneWayMultiTunnel.cc
index d439a987f0..7c35353a7d 100644
--- a/src/iocore/utils/OneWayMultiTunnel.cc
+++ b/src/iocore/utils/OneWayMultiTunnel.cc
@@ -27,8 +27,7 @@
****************************************************************************/
#include "iocore/utils/OneWayMultiTunnel.h"
-#include "../eventsystem/P_IOBuffer.h"
-#include "../eventsystem/P_VConnection.h"
+#include "iocore/eventsystem/IOBuffer.h"
// #define TEST
@@ -45,6 +44,18 @@ OneWayMultiTunnel::OneWayMultiTunnel() : OneWayTunnel()
ink_zero(vioTargets);
}
+VIO *
+vc_do_io_write(VConnection *vc, Continuation *cont, int64_t nbytes, MIOBuffer
*buf, int64_t offset)
+{
+ IOBufferReader *reader = buf->alloc_reader();
+
+ if (offset > 0) {
+ reader->consume(offset);
+ }
+
+ return vc->do_io_write(cont, nbytes, reader, true);
+}
+
void
OneWayMultiTunnel::init(VConnection *vcSource, VConnection **vcTargets, int
n_vcTargets, Continuation *aCont, int size_estimate,
int64_t nbytes, bool asingle_buffer, /* = true */
diff --git a/src/iocore/utils/OneWayTunnel.cc b/src/iocore/utils/OneWayTunnel.cc
index 8274fcf517..25a8caef87 100644
--- a/src/iocore/utils/OneWayTunnel.cc
+++ b/src/iocore/utils/OneWayTunnel.cc
@@ -32,7 +32,7 @@
anything to do with HTTP, so it has been renamed to OneWayTunnel.
****************************************************************************/
-#include "../eventsystem/P_IOBuffer.h"
+#include "iocore/eventsystem/IOBuffer.h"
#include "iocore/utils/OneWayTunnel.h"
// #define TEST
diff --git a/src/proxy/PluginVC.cc b/src/proxy/PluginVC.cc
index 7c53d0390b..ab9d684aa6 100644
--- a/src/proxy/PluginVC.cc
+++ b/src/proxy/PluginVC.cc
@@ -72,7 +72,6 @@
****************************************************************************/
#include "proxy/PluginVC.h"
-#include "../iocore/eventsystem/P_EventSystem.h"
#include "../iocore/net/P_Net.h"
#include "tscore/Regression.h"
#if TS_HAS_TESTS
diff --git a/src/proxy/ReverseProxy.cc b/src/proxy/ReverseProxy.cc
index 037474575e..341c2c7de4 100644
--- a/src/proxy/ReverseProxy.cc
+++ b/src/proxy/ReverseProxy.cc
@@ -30,7 +30,6 @@
#include "tscore/ink_platform.h"
#include "tscore/Filenames.h"
#include <dlfcn.h>
-#include "../iocore/eventsystem/P_EventSystem.h"
#include "iocore/eventsystem/ConfigProcessor.h"
#include "proxy/ReverseProxy.h"
#include "tscore/MatcherUtils.h"
diff --git a/src/proxy/hdrs/HdrTSOnly.cc b/src/proxy/hdrs/HdrTSOnly.cc
index dc2fed3924..e603101863 100644
--- a/src/proxy/hdrs/HdrTSOnly.cc
+++ b/src/proxy/hdrs/HdrTSOnly.cc
@@ -36,9 +36,9 @@
****************************************************************************/
+#include "iocore/eventsystem/IOBuffer.h"
#include "tscore/ink_platform.h"
#include "proxy/hdrs/HTTP.h"
-#include "../../iocore/eventsystem/P_EventSystem.h"
/*-------------------------------------------------------------------------
-------------------------------------------------------------------------*/
diff --git a/src/proxy/http/HttpDebugNames.cc b/src/proxy/http/HttpDebugNames.cc
index e06e50159f..66d2bfcd87 100644
--- a/src/proxy/http/HttpDebugNames.cc
+++ b/src/proxy/http/HttpDebugNames.cc
@@ -23,7 +23,6 @@
#include "iocore/dns/DNSProcessor.h"
#include "proxy/http/HttpDebugNames.h"
-#include "../../iocore/eventsystem/P_EventSystem.h"
#include "proxy/http/HttpTunnel.h"
#include "proxy/Transform.h"
#include "proxy/http/HttpSM.h"
diff --git a/src/proxy/http/PreWarmManager.cc b/src/proxy/http/PreWarmManager.cc
index 416e6a55d8..8cb9fea88e 100644
--- a/src/proxy/http/PreWarmManager.cc
+++ b/src/proxy/http/PreWarmManager.cc
@@ -26,7 +26,6 @@
#include "proxy/http/HttpConfig.h"
#include "iocore/net/SSLSNIConfig.h"
-#include "../../iocore/eventsystem/P_VConnection.h"
#include "iocore/net/NetProcessor.h"
#include "iocore/net/PreWarm.h"
diff --git a/src/proxy/http/remap/PluginDso.cc
b/src/proxy/http/remap/PluginDso.cc
index cd0499d8e6..15cd390e3c 100644
--- a/src/proxy/http/remap/PluginDso.cc
+++ b/src/proxy/http/remap/PluginDso.cc
@@ -29,7 +29,6 @@
#include "proxy/http/remap/PluginDso.h"
#include "iocore/eventsystem/Freer.h"
-#include "../../../iocore/eventsystem/P_EventSystem.h"
#ifdef PLUGIN_DSO_TESTS
#include "unit-tests/plugin_testing_common.h"
#else
diff --git a/src/proxy/http/remap/PluginFactory.cc
b/src/proxy/http/remap/PluginFactory.cc
index f95e0c91ba..f5a0132c60 100644
--- a/src/proxy/http/remap/PluginFactory.cc
+++ b/src/proxy/http/remap/PluginFactory.cc
@@ -27,6 +27,7 @@
#include "proxy/http/remap/RemapPluginInfo.h"
#include "records/RecCore.h"
#include "proxy/http/remap/PluginFactory.h"
+#include "tscore/TSSystemState.h"
#ifdef PLUGIN_DSO_TESTS
#include "unit-tests/plugin_testing_common.h"
#else
@@ -34,7 +35,6 @@
#define PluginDbg Dbg
#define PluginError Error
#endif
-#include "../../../iocore/eventsystem/P_EventSystem.h"
#include <algorithm> /* std::swap */
#include <filesystem>
diff --git a/src/proxy/http3/Http09App.cc b/src/proxy/http3/Http09App.cc
index f1d7ce06a5..ac9f1ab2ec 100644
--- a/src/proxy/http3/Http09App.cc
+++ b/src/proxy/http3/Http09App.cc
@@ -26,7 +26,7 @@
#include "tscore/ink_resolver.h"
#include "../../iocore/net/P_Net.h"
-#include "../../iocore/eventsystem/P_VConnection.h"
+#include "iocore/eventsystem/VConnection.h"
#include "iocore/net/quic/QUICStreamManager.h"
#include "iocore/net/quic/QUICDebugNames.h"
#include "iocore/net/quic/QUICStreamVCAdapter.h"
diff --git a/src/proxy/http3/Http3App.cc b/src/proxy/http3/Http3App.cc
index 4ec8a0cfba..09d349a6a6 100644
--- a/src/proxy/http3/Http3App.cc
+++ b/src/proxy/http3/Http3App.cc
@@ -28,7 +28,7 @@
#include "tscore/ink_resolver.h"
#include "../../iocore/net/P_Net.h"
-#include "../../iocore/eventsystem/P_VConnection.h"
+#include "iocore/eventsystem/VConnection.h"
#include "iocore/net/quic/QUICStreamManager.h"
#include "iocore/net/quic/QUICStreamVCAdapter.h"
diff --git a/src/proxy/logging/Log.cc b/src/proxy/logging/Log.cc
index ad9ffd00ad..ca56f27c5a 100644
--- a/src/proxy/logging/Log.cc
+++ b/src/proxy/logging/Log.cc
@@ -34,7 +34,6 @@
***************************************************************************/
#include "tscore/ink_platform.h"
#include "tscore/TSSystemState.h"
-#include "../../iocore/eventsystem/P_EventSystem.h"
#include "../../iocore/net/P_Net.h"
#include "iocore/utils/Machine.h"
#include "proxy/hdrs/HTTP.h"
diff --git a/src/proxy/logging/LogBuffer.cc b/src/proxy/logging/LogBuffer.cc
index 9c0ab667f1..c183bede9e 100644
--- a/src/proxy/logging/LogBuffer.cc
+++ b/src/proxy/logging/LogBuffer.cc
@@ -32,7 +32,6 @@
#include <cstdlib>
#include <cstring>
-#include "../../iocore/eventsystem/P_EventSystem.h"
#include "proxy/logging/LogField.h"
#include "proxy/logging/LogFilter.h"
#include "proxy/logging/LogFormat.h"
diff --git a/src/proxy/logging/LogFile.cc b/src/proxy/logging/LogFile.cc
index f7a7004c60..9252136092 100644
--- a/src/proxy/logging/LogFile.cc
+++ b/src/proxy/logging/LogFile.cc
@@ -41,7 +41,6 @@
#include <fcntl.h>
#include <libgen.h>
-#include "../../iocore/eventsystem/P_EventSystem.h"
#include "iocore/utils/Machine.h"
#include "tscore/BaseLogFile.h"
diff --git a/src/proxy/logging/LogObject.cc b/src/proxy/logging/LogObject.cc
index 37e89598b5..997e394025 100644
--- a/src/proxy/logging/LogObject.cc
+++ b/src/proxy/logging/LogObject.cc
@@ -26,9 +26,9 @@
***************************************************************************/
+#include "iocore/eventsystem/Freer.h"
#include "tscore/ink_platform.h"
#include "tscore/CryptoHash.h"
-#include "../../iocore/eventsystem/P_EventSystem.h"
#include "proxy/logging/LogUtils.h"
#include "proxy/logging/LogField.h"
#include "proxy/logging/LogObject.h"
diff --git a/src/traffic_server/SocksProxy.cc b/src/traffic_server/SocksProxy.cc
index 97aa76ac90..811fedbdb1 100644
--- a/src/traffic_server/SocksProxy.cc
+++ b/src/traffic_server/SocksProxy.cc
@@ -28,7 +28,6 @@
*/
#include "../iocore/net/P_Socks.h"
-#include "../iocore/eventsystem/P_VConnection.h"
#include "iocore/utils/OneWayTunnel.h"
#include "proxy/http/HttpSessionAccept.h"
#include "tsutil/Metrics.h"
diff --git a/src/traffic_server/traffic_server.cc
b/src/traffic_server/traffic_server.cc
index 7ac906af10..61982e7716 100644
--- a/src/traffic_server/traffic_server.cc
+++ b/src/traffic_server/traffic_server.cc
@@ -75,7 +75,6 @@ extern "C" int plock(int);
#include "Crash.h"
#include "tscore/signals.h"
-#include "../iocore/eventsystem/P_EventSystem.h"
#include "../iocore/net/P_Net.h"
#if TS_HAS_QUICHE
#include "../iocore/net/P_QUICNetProcessor.h"