This is an automated email from the ASF dual-hosted git repository.

cmcfarlen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new 90dbc21a54 Remove eventsystem Inline.cc and cleanup private headers. 
(#12581)
90dbc21a54 is described below

commit 90dbc21a541986db6b223649cb4f798e190a550f
Author: Chris McFarlen <[email protected]>
AuthorDate: Sat Nov 29 11:20:09 2025 -0600

    Remove eventsystem Inline.cc and cleanup private headers. (#12581)
    
    * Remove eventsystem Inline.cc and cleanup private headers.
    
    * include EThread to get this_ethread
    
    * Fix include in RecProcess.cc
    
    * Remove I_VIO_h
    
    * restore DELAY_FOR_RETRY
    
    ---------
    
    Co-authored-by: Chris McFarlen <[email protected]>
---
 include/iocore/eventsystem/EThread.h             |   25 +-
 include/iocore/eventsystem/Event.h               |   20 +-
 include/iocore/eventsystem/Freer.h               |    8 +-
 include/iocore/eventsystem/IOBuffer.h            |   41 +
 include/iocore/eventsystem/ProtectedQueue.h      |   50 ++
 include/iocore/eventsystem/Thread.h              |   12 +-
 include/iocore/eventsystem/VConnection.h         |   56 +-
 include/iocore/eventsystem/VIO.h                 |   66 +-
 include/ts/InkAPIPrivateIOCore.h                 |    9 +-
 src/api/InkVConnInternal.cc                      |    2 +
 src/iocore/cache/RegressionSM.cc                 |    1 +
 src/iocore/cache/unit_tests/main.cc              |    2 +
 src/iocore/eventsystem/CMakeLists.txt            |    2 +-
 src/iocore/eventsystem/ConfigProcessor.cc        |    2 +-
 src/iocore/eventsystem/EventSystem.cc            |    5 +-
 src/iocore/eventsystem/IOBuffer.cc               |  965 +++++++++++++++++++-
 src/iocore/eventsystem/Inline.cc                 |   30 -
 src/iocore/eventsystem/Lock.cc                   |    4 +-
 src/iocore/eventsystem/PQ-List.cc                |    3 +-
 src/iocore/eventsystem/P_EventSystem.h           |   49 -
 src/iocore/eventsystem/P_IOBuffer.h              | 1034 ----------------------
 src/iocore/eventsystem/P_ProtectedQueue.h        |   83 --
 src/iocore/eventsystem/P_Thread.h                |   49 -
 src/iocore/eventsystem/P_UnixEThread.h           |  261 ------
 src/iocore/eventsystem/P_UnixEventProcessor.h    |  246 -----
 src/iocore/eventsystem/P_VConnection.h           |   95 --
 src/iocore/eventsystem/P_VIO.h                   |  123 ---
 src/iocore/eventsystem/Processor.cc              |    4 +-
 src/iocore/eventsystem/ProtectedQueue.cc         |    3 +-
 src/iocore/eventsystem/RecProcess.cc             |    3 +-
 src/iocore/eventsystem/UnixEThread.cc            |  207 ++++-
 src/iocore/eventsystem/UnixEvent.cc              |    4 +-
 src/iocore/eventsystem/UnixEventProcessor.cc     |  215 +++++
 src/iocore/eventsystem/{P_UnixEvent.h => VIO.cc} |   42 +-
 src/iocore/net/SSLSessionCache.cc                |    2 +-
 src/iocore/net/Socks.cc                          |    1 -
 src/iocore/net/TLSEventSupport.cc                |    1 +
 src/iocore/utils/OneWayMultiTunnel.cc            |   15 +-
 src/iocore/utils/OneWayTunnel.cc                 |    2 +-
 src/proxy/PluginVC.cc                            |    1 -
 src/proxy/ReverseProxy.cc                        |    1 -
 src/proxy/hdrs/HdrTSOnly.cc                      |    2 +-
 src/proxy/http/HttpDebugNames.cc                 |    1 -
 src/proxy/http/PreWarmManager.cc                 |    1 -
 src/proxy/http/remap/PluginDso.cc                |    1 -
 src/proxy/http/remap/PluginFactory.cc            |    2 +-
 src/proxy/http3/Http09App.cc                     |    2 +-
 src/proxy/http3/Http3App.cc                      |    2 +-
 src/proxy/logging/Log.cc                         |    1 -
 src/proxy/logging/LogBuffer.cc                   |    1 -
 src/proxy/logging/LogFile.cc                     |    1 -
 src/proxy/logging/LogObject.cc                   |    2 +-
 src/traffic_server/SocksProxy.cc                 |    1 -
 src/traffic_server/traffic_server.cc             |    1 -
 54 files changed, 1702 insertions(+), 2060 deletions(-)

diff --git a/include/iocore/eventsystem/EThread.h 
b/include/iocore/eventsystem/EThread.h
index 07f1f938e6..99569b9fff 100644
--- a/include/iocore/eventsystem/EThread.h
+++ b/include/iocore/eventsystem/EThread.h
@@ -710,6 +710,29 @@ operator new(size_t, ink_dummy_for_new *p)
 }
 #define ETHREAD_GET_PTR(thread, offset) ((void *)((char *)(thread) + (offset)))
 
-extern EThread *this_ethread();
+inline EThread *
+this_ethread()
+{
+  return EThread::this_ethread_ptr;
+}
+
+inline EThread *
+this_event_thread()
+{
+  EThread *ethread = this_ethread();
+  if (ethread != nullptr && ethread->tt == REGULAR) {
+    return ethread;
+  } else {
+    return nullptr;
+  }
+}
+
+inline void
+EThread::free_event(Event *e)
+{
+  ink_assert(!e->in_the_priority_queue && !e->in_the_prot_queue);
+  e->mutex = nullptr;
+  EVENT_FREE(e, eventAllocator, this);
+}
 
 extern int thread_max_heartbeat_mseconds;
diff --git a/include/iocore/eventsystem/Event.h 
b/include/iocore/eventsystem/Event.h
index aef3db53ae..a167f1d3ba 100644
--- a/include/iocore/eventsystem/Event.h
+++ b/include/iocore/eventsystem/Event.h
@@ -228,9 +228,18 @@ public:
 
   // Private
 
-  Event();
+  Event() : in_the_prot_queue(false), in_the_priority_queue(false), 
immediate(false), globally_allocated(true), in_heap(false) {}
 
-  Event *init(Continuation *c, ink_hrtime atimeout_at = 0, ink_hrtime aperiod 
= 0);
+  Event *
+  init(Continuation *c, ink_hrtime atimeout_at = 0, ink_hrtime aperiod = 0)
+  {
+    continuation = c;
+    timeout_at   = atimeout_at;
+    period       = aperiod;
+    immediate    = !period && !atimeout_at;
+    cancelled    = false;
+    return this;
+  }
 
 #ifdef ENABLE_TIME_TRACE
   ink_hrtime start_time;
@@ -282,6 +291,13 @@ public:
 //
 extern ClassAllocator<Event> eventAllocator;
 
+inline void
+Event::free()
+{
+  mutex = nullptr;
+  eventAllocator.free(this);
+}
+
 #define EVENT_ALLOC(_a, _t) THREAD_ALLOC(_a, _t)
 #define EVENT_FREE(_p, _a, _t) \
   _p->mutex = nullptr;         \
diff --git a/include/iocore/eventsystem/Freer.h 
b/include/iocore/eventsystem/Freer.h
index 51738e036a..f19fcbd95b 100644
--- a/include/iocore/eventsystem/Freer.h
+++ b/include/iocore/eventsystem/Freer.h
@@ -52,7 +52,7 @@ public: // Needed by WinNT compiler (compiler bug)
 //    1. Make sure to schedule a delete on an ET_TASK thread
 //    2. Delay the delete (this should be used sparingly)
 template <class C>
-TS_INLINE void
+void
 new_Deleter(C *ap, ink_hrtime t)
 {
   if (t > 0) {
@@ -78,7 +78,7 @@ public: // Needed by WinNT compiler (compiler bug)
 };
 
 template <class C>
-TS_INLINE void
+void
 new_FreeCaller(C *ap, ink_hrtime t)
 {
   eventProcessor.schedule_in(new FreeCallContinuation<C>(ap), t, ET_TASK);
@@ -103,7 +103,7 @@ struct FreerContinuation : public Continuation {
   explicit FreerContinuation(void *ap) : Continuation(nullptr), p(ap) { 
SET_HANDLER(&FreerContinuation::dieEvent); }
 };
 
-TS_INLINE void
+inline void
 new_Freer(void *ap, ink_hrtime t)
 {
   eventProcessor.schedule_in(new FreerContinuation(ap), t, ET_TASK);
@@ -128,7 +128,7 @@ template <class C> struct DereferContinuation : public 
Continuation {
 };
 
 template <class C>
-TS_INLINE void
+void
 new_Derefer(C *ap, ink_hrtime t)
 {
   eventProcessor.schedule_in(new DereferContinuation<C>(ap), t, ET_TASK);
diff --git a/include/iocore/eventsystem/IOBuffer.h 
b/include/iocore/eventsystem/IOBuffer.h
index de01b59a6a..a8656aa6f7 100644
--- a/include/iocore/eventsystem/IOBuffer.h
+++ b/include/iocore/eventsystem/IOBuffer.h
@@ -1514,3 +1514,44 @@ IOBufferChain::iterator::operator->() const
 {
   return _b;
 }
+
+//////////////////////////////////////////////////////////////
+//
+// returns 0 for DEFAULT_BUFFER_BASE_SIZE,
+// +1 for each power of 2
+//
+//////////////////////////////////////////////////////////////
+inline int64_t
+buffer_size_to_index(int64_t size, int64_t max)
+{
+  int64_t r = max;
+
+  while (r && BUFFER_SIZE_FOR_INDEX(r - 1) >= size) {
+    r--;
+  }
+  return r;
+}
+
+inline int64_t
+iobuffer_size_to_index(int64_t size, int64_t max)
+{
+  if (size > BUFFER_SIZE_FOR_INDEX(max)) {
+    return BUFFER_SIZE_INDEX_FOR_XMALLOC_SIZE(size);
+  }
+  return buffer_size_to_index(size, max);
+}
+
+inline int64_t
+index_to_buffer_size(int64_t idx)
+{
+  if (BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(idx)) {
+    return BUFFER_SIZE_FOR_INDEX(idx);
+  } else if (BUFFER_SIZE_INDEX_IS_XMALLOCED(idx)) {
+    return BUFFER_SIZE_FOR_XMALLOC(idx);
+    // coverity[dead_error_condition]
+  } else if (BUFFER_SIZE_INDEX_IS_CONSTANT(idx)) {
+    return BUFFER_SIZE_FOR_CONSTANT(idx);
+  }
+  // coverity[dead_error_line]
+  return 0;
+}
diff --git a/include/iocore/eventsystem/ProtectedQueue.h 
b/include/iocore/eventsystem/ProtectedQueue.h
index bc1b3b2577..b24789e709 100644
--- a/include/iocore/eventsystem/ProtectedQueue.h
+++ b/include/iocore/eventsystem/ProtectedQueue.h
@@ -52,3 +52,53 @@ struct ProtectedQueue {
 
   ProtectedQueue();
 };
+
+inline ProtectedQueue::ProtectedQueue()
+{
+  Event e;
+  ink_mutex_init(&lock);
+  ink_atomiclist_init(&al, "ProtectedQueue", (char *)&e.link.next - (char 
*)&e);
+  ink_cond_init(&might_have_data);
+}
+
+inline void
+ProtectedQueue::signal()
+{
+  // Need to get the lock before you can signal the thread
+  ink_mutex_acquire(&lock);
+  ink_cond_signal(&might_have_data);
+  ink_mutex_release(&lock);
+}
+
+inline int
+ProtectedQueue::try_signal()
+{
+  // Need to get the lock before you can signal the thread
+  if (ink_mutex_try_acquire(&lock)) {
+    ink_cond_signal(&might_have_data);
+    ink_mutex_release(&lock);
+    return 1;
+  } else {
+    return 0;
+  }
+}
+
+// Called from the same thread (don't need to signal)
+inline void
+ProtectedQueue::enqueue_local(Event *e)
+{
+  ink_assert(!e->in_the_prot_queue && !e->in_the_priority_queue);
+  e->in_the_prot_queue = 1;
+  localQueue.enqueue(e);
+}
+
+inline Event *
+ProtectedQueue::dequeue_local()
+{
+  Event *e = localQueue.dequeue();
+  if (e) {
+    ink_assert(e->in_the_prot_queue);
+    e->in_the_prot_queue = 0;
+  }
+  return e;
+}
diff --git a/include/iocore/eventsystem/Thread.h 
b/include/iocore/eventsystem/Thread.h
index 22b006a496..0a34dd633c 100644
--- a/include/iocore/eventsystem/Thread.h
+++ b/include/iocore/eventsystem/Thread.h
@@ -107,7 +107,11 @@ public:
   */
   Ptr<ProxyMutex> mutex;
 
-  virtual void set_specific() = 0;
+  virtual void
+  set_specific()
+  {
+    this_thread_ptr = this;
+  }
 
   static thread_local Thread *this_thread_ptr;
 
@@ -170,4 +174,8 @@ protected:
   Thread();
 };
 
-extern Thread *this_thread();
+inline Thread *
+this_thread()
+{
+  return Thread::this_thread_ptr;
+}
diff --git a/include/iocore/eventsystem/VConnection.h 
b/include/iocore/eventsystem/VConnection.h
index 9c731181e7..66780a2fda 100644
--- a/include/iocore/eventsystem/VConnection.h
+++ b/include/iocore/eventsystem/VConnection.h
@@ -28,10 +28,6 @@
 #include "tscore/PluginUserArgs.h"
 #include "iocore/eventsystem/EventSystem.h"
 
-#if !defined(I_VIO_h)
-#error "include VIO.h"
-#endif
-
 //
 // Data Types
 //
@@ -145,7 +141,7 @@ typedef struct tsapi_vio *TSVIO;
 class VConnection : public Continuation
 {
 public:
-  ~VConnection() override;
+  ~VConnection() override {}
 
   /**
     Read data from the VConnection.
@@ -306,17 +302,28 @@ public:
   */
   virtual void do_io_shutdown(ShutdownHowTo_t howto) = 0;
 
-  explicit VConnection(ProxyMutex *aMutex);
-  explicit VConnection(Ptr<ProxyMutex> &aMutex);
+  explicit VConnection(ProxyMutex *aMutex) : Continuation(aMutex), lerrno(0) { 
SET_HANDLER(nullptr); }
+  explicit VConnection(Ptr<ProxyMutex> &aMutex) : Continuation(aMutex), 
lerrno(0) { SET_HANDLER(nullptr); }
 
   // Private
   // Set continuation on a given vio. The public interface
   // is through VIO::set_continuation()
-  virtual void set_continuation(VIO *vio, Continuation *cont);
+  virtual void
+  set_continuation(VIO *, Continuation *)
+  {
+  }
 
   // Reenable a given vio.  The public interface is through VIO::reenable
-  virtual void reenable(VIO *vio);
-  virtual void reenable_re(VIO *vio);
+  virtual void
+  reenable(VIO *)
+  {
+  }
+
+  virtual void
+  reenable_re(VIO *vio)
+  {
+    reenable(vio);
+  }
 
   /**
     Convenience function to retrieve information from VConnection.
@@ -411,3 +418,32 @@ struct DummyVConnection : public VConnection, public 
PluginUserArgs<TS_USER_ARGS
 
   explicit DummyVConnection(ProxyMutex *m) : VConnection(m) {}
 };
+
+inline const char *
+get_vc_event_name(int event)
+{
+  switch (event) {
+  default:
+    return "unknown event";
+  case VC_EVENT_NONE:
+    return "VC_EVENT_NONE";
+  case VC_EVENT_IMMEDIATE:
+    return "VC_EVENT_IMMEDIATE";
+  case VC_EVENT_READ_READY:
+    return "VC_EVENT_READ_READY";
+  case VC_EVENT_WRITE_READY:
+    return "VC_EVENT_WRITE_READY";
+  case VC_EVENT_READ_COMPLETE:
+    return "VC_EVENT_READ_COMPLETE";
+  case VC_EVENT_WRITE_COMPLETE:
+    return "VC_EVENT_WRITE_COMPLETE";
+  case VC_EVENT_EOS:
+    return "VC_EVENT_EOS";
+  case VC_EVENT_ERROR:
+    return "VC_EVENT_ERROR";
+  case VC_EVENT_INACTIVITY_TIMEOUT:
+    return "VC_EVENT_INACTIVITY_TIMEOUT";
+  case VC_EVENT_ACTIVE_TIMEOUT:
+    return "VC_EVENT_ACTIVE_TIMEOUT";
+  }
+}
diff --git a/include/iocore/eventsystem/VIO.h b/include/iocore/eventsystem/VIO.h
index 9bfd32873f..c8da22b025 100644
--- a/include/iocore/eventsystem/VIO.h
+++ b/include/iocore/eventsystem/VIO.h
@@ -23,7 +23,6 @@
  */
 
 #pragma once
-#define I_VIO_h
 
 #include "IOBuffer.h"
 
@@ -56,13 +55,17 @@ class ProxyMutex;
 class VIO
 {
 public:
-  explicit VIO(int aop);
-  VIO();
+  explicit VIO(int aop) : op(aop), buffer(), mutex(nullptr) {}
+  VIO() : buffer(), mutex(nullptr) {}
   ~VIO() {}
 
   /** Interface for the VConnection that owns this handle. */
-  Continuation *get_continuation() const;
-  void          set_continuation(Continuation *cont);
+  Continuation *
+  get_continuation() const
+  {
+    return cont;
+  }
+  void set_continuation(Continuation *cont);
 
   /**
     Set nbytes to be what is current available.
@@ -70,7 +73,15 @@ public:
     Interface to set nbytes to be ndone + buffer.reader()->read_avail()
     if a reader is set.
   */
-  void done();
+  void
+  done()
+  {
+    if (buffer.reader()) {
+      nbytes = ndone + buffer.reader()->read_avail();
+    } else {
+      nbytes = ndone;
+    }
+  }
 
   /**
     Determine the number of bytes remaining.
@@ -81,15 +92,36 @@ public:
     @return The number of bytes to be processed by the operation.
 
   */
-  int64_t ntodo() const;
+  int64_t
+  ntodo() const
+  {
+    return nbytes - ndone;
+  }
 
   /////////////////////
   // buffer settings //
   /////////////////////
-  void            set_writer(MIOBuffer *writer);
-  void            set_reader(IOBufferReader *reader);
-  MIOBuffer      *get_writer() const;
-  IOBufferReader *get_reader() const;
+  void
+  set_writer(MIOBuffer *writer)
+  {
+    buffer.writer_for(writer);
+  }
+  void
+  set_reader(IOBufferReader *reader)
+  {
+    buffer.reader_for(reader);
+  }
+  MIOBuffer *
+  get_writer() const
+  {
+    return buffer.writer();
+  }
+
+  IOBufferReader *
+  get_reader() const
+  {
+    return (buffer.reader());
+  }
 
   /**
     Reenable the IO operation.
@@ -125,8 +157,16 @@ public:
   */
   void reenable_re();
 
-  void disable();
-  bool is_disabled() const;
+  void
+  disable()
+  {
+    this->_disabled = true;
+  }
+  bool
+  is_disabled() const
+  {
+    return this->_disabled;
+  }
 
   enum {
     NONE = 0,
diff --git a/include/ts/InkAPIPrivateIOCore.h b/include/ts/InkAPIPrivateIOCore.h
index ac3b18245c..f87a251253 100644
--- a/include/ts/InkAPIPrivateIOCore.h
+++ b/include/ts/InkAPIPrivateIOCore.h
@@ -22,13 +22,8 @@
  */
 
 #pragma once
-#if !defined(__GNUC__)
-#include "iocore/eventsystem/EventSystem.h"
-#include "iocore/net/Net.h"
-#else
-#include "../../src/iocore/eventsystem/P_EventSystem.h"
-#include "../../src/iocore/net/P_Net.h"
-#endif
+#include "iocore/eventsystem/VConnection.h"
+#include "ts/apidefs.h"
 
 enum INKContInternalMagic_t {
   INKCONT_INTERN_MAGIC_ALIVE = 0x00009631,
diff --git a/src/api/InkVConnInternal.cc b/src/api/InkVConnInternal.cc
index c0898f883f..950900e43b 100644
--- a/src/api/InkVConnInternal.cc
+++ b/src/api/InkVConnInternal.cc
@@ -21,8 +21,10 @@
   limitations under the License.
  */
 
+#include "iocore/net/Net.h"
 #include "ts/apidefs.h"
 #include "ts/InkAPIPrivateIOCore.h"
+#include "tscore/ink_atomic.h"
 
 ClassAllocator<INKVConnInternal> INKVConnAllocator("INKVConnAllocator");
 
diff --git a/src/iocore/cache/RegressionSM.cc b/src/iocore/cache/RegressionSM.cc
index 8513773cbc..57ad08dd3a 100644
--- a/src/iocore/cache/RegressionSM.cc
+++ b/src/iocore/cache/RegressionSM.cc
@@ -23,6 +23,7 @@
 
 #include "RegressionSM.h"
 #include "iocore/eventsystem/EventProcessor.h"
+#include "iocore/eventsystem/EThread.h"
 
 #define REGRESSION_SM_RETRY (100 * HRTIME_MSECOND)
 
diff --git a/src/iocore/cache/unit_tests/main.cc 
b/src/iocore/cache/unit_tests/main.cc
index 56e9e05edc..e01dd8463a 100644
--- a/src/iocore/cache/unit_tests/main.cc
+++ b/src/iocore/cache/unit_tests/main.cc
@@ -23,10 +23,12 @@
 
 #include "../P_CacheInternal.h"
 #include "api/HttpAPIHooks.h"
+#include "iocore/net/Net.h"
 #include "iocore/net/NetProcessor.h"
 #include "records/RecordsConfig.h"
 #include "tscore/ink_config.h"
 #include "tscore/Layout.h"
+#include "tscore/TSSystemState.h"
 
 #include "main.h"
 #include "swoc/swoc_file.h"
diff --git a/src/iocore/eventsystem/CMakeLists.txt 
b/src/iocore/eventsystem/CMakeLists.txt
index ab19fdcca6..1eaf5cd29d 100644
--- a/src/iocore/eventsystem/CMakeLists.txt
+++ b/src/iocore/eventsystem/CMakeLists.txt
@@ -19,7 +19,6 @@ add_library(
   inkevent STATIC
   EventSystem.cc
   IOBuffer.cc
-  Inline.cc
   Lock.cc
   MIOBufferWriter.cc
   PQ-List.cc
@@ -36,6 +35,7 @@ add_library(
   RecRawStatsImpl.cc
   RecProcess.cc
   Watchdog.cc
+  VIO.cc
 )
 add_library(ts::inkevent ALIAS inkevent)
 
diff --git a/src/iocore/eventsystem/ConfigProcessor.cc 
b/src/iocore/eventsystem/ConfigProcessor.cc
index 028968ba51..a49335154a 100644
--- a/src/iocore/eventsystem/ConfigProcessor.cc
+++ b/src/iocore/eventsystem/ConfigProcessor.cc
@@ -22,7 +22,7 @@
  */
 
 #include "iocore/eventsystem/ConfigProcessor.h"
-#include "P_EventSystem.h"
+#include "tscore/ink_atomic.h"
 #if TS_HAS_TESTS
 #include "tscore/TestBox.h"
 #endif
diff --git a/src/iocore/eventsystem/EventSystem.cc 
b/src/iocore/eventsystem/EventSystem.cc
index 73c10ecd42..3aeeca562a 100644
--- a/src/iocore/eventsystem/EventSystem.cc
+++ b/src/iocore/eventsystem/EventSystem.cc
@@ -28,10 +28,13 @@
 
 ****************************************************************************/
 
-#include "P_EventSystem.h"
+#include "iocore/eventsystem/EventSystem.h"
+#include "tscore/Version.h"
 #include "tscore/hugepages.h"
 #include "records/RecCore.h"
 
+static constexpr ts::ModuleVersion 
EVENT_SYSTEM_MODULE_INTERNAL_VERSION{EVENT_SYSTEM_MODULE_PUBLIC_VERSION,
+                                                                        
ts::ModuleVersion::PRIVATE};
 void
 ink_event_system_init(ts::ModuleVersion v)
 {
diff --git a/src/iocore/eventsystem/IOBuffer.cc 
b/src/iocore/eventsystem/IOBuffer.cc
index 66decc4bac..9bedf0a6c6 100644
--- a/src/iocore/eventsystem/IOBuffer.cc
+++ b/src/iocore/eventsystem/IOBuffer.cc
@@ -25,13 +25,974 @@
   UIOBuffer.cc
 
 **************************************************************************/
+#include "iocore/eventsystem/Thread.h"
 #include "tscore/Allocator.h"
-#include "tscore/ink_defs.h"
-#include "P_EventSystem.h"
+#include "iocore/eventsystem/IOBuffer.h"
 #include "swoc/Lexicon.h"
+#include "tscore/Diags.h"
+#include "tscore/ink_memory.h"
 
 #include <optional>
 
+// TODO: I think we're overly aggressive here on making MIOBuffer 64-bit
+// but not sure it's worthwhile changing anything to 32-bit honestly.
+
+IOBufferBlock *
+iobufferblock_clone(IOBufferBlock *src, int64_t offset, int64_t len)
+{
+  IOBufferBlock *start_buf   = nullptr;
+  IOBufferBlock *current_buf = nullptr;
+
+  while (src && len >= 0) {
+    char   *start     = src->_start;
+    char   *end       = src->_end;
+    int64_t max_bytes = end - start;
+
+    max_bytes -= offset;
+    if (max_bytes <= 0) {
+      offset = -max_bytes;
+      src    = src->next.get();
+      continue;
+    }
+
+    int64_t bytes = len;
+    if (bytes >= max_bytes) {
+      bytes = max_bytes;
+    }
+
+    IOBufferBlock *new_buf  = src->clone();
+    new_buf->_start        += offset;
+    new_buf->_buf_end = new_buf->_end = new_buf->_start + bytes;
+
+    if (!start_buf) {
+      start_buf   = new_buf;
+      current_buf = start_buf;
+    } else {
+      current_buf->next = new_buf;
+      current_buf       = new_buf;
+    }
+
+    len    -= bytes;
+    src     = src->next.get();
+    offset  = 0;
+  }
+
+  return start_buf;
+}
+
+IOBufferBlock *
+iobufferblock_skip(IOBufferBlock *b, int64_t *poffset, int64_t *plen, int64_t 
write)
+{
+  int64_t offset = *poffset;
+  int64_t len    = write;
+
+  while (b && len >= 0) {
+    int64_t max_bytes = b->read_avail();
+
+    // If this block ends before the start offset, skip it
+    // and adjust the offset to consume its length.
+    max_bytes -= offset;
+    if (max_bytes <= 0) {
+      offset = -max_bytes;
+      b      = b->next.get();
+      continue;
+    }
+
+    if (len >= max_bytes) {
+      b       = b->next.get();
+      len    -= max_bytes;
+      offset  = 0;
+    } else {
+      offset = offset + len;
+      break;
+    }
+  }
+
+  *poffset  = offset;
+  *plen    -= write;
+  return b;
+}
+
+void
+iobuffer_mem_inc(const char *_loc, int64_t _size_index)
+{
+  if (!res_track_memory) {
+    return;
+  }
+
+  if (!BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(_size_index)) {
+    return;
+  }
+
+  if (!_loc) {
+    _loc = "memory/IOBuffer/UNKNOWN-LOCATION";
+  }
+  ResourceTracker::increment(_loc, index_to_buffer_size(_size_index));
+}
+
+void
+iobuffer_mem_dec(const char *_loc, int64_t _size_index)
+{
+  if (!res_track_memory) {
+    return;
+  }
+
+  if (!BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(_size_index)) {
+    return;
+  }
+  if (!_loc) {
+    _loc = "memory/IOBuffer/UNKNOWN-LOCATION";
+  }
+  ResourceTracker::increment(_loc, -index_to_buffer_size(_size_index));
+}
+
+//////////////////////////////////////////////////////////////////
+//
+// inline functions definitions
+//
+//////////////////////////////////////////////////////////////////
+//////////////////////////////////////////////////////////////////
+//
+//  class IOBufferData --
+//         inline functions definitions
+//
+//////////////////////////////////////////////////////////////////
+int64_t
+IOBufferData::block_size()
+{
+  return index_to_buffer_size(_size_index);
+}
+
+IOBufferData *
+new_IOBufferData_internal(const char *location, void *b, int64_t size, int64_t 
asize_index)
+{
+  (void)size;
+  IOBufferData *d = THREAD_ALLOC(ioDataAllocator, this_thread());
+  d->_size_index  = asize_index;
+  ink_assert(BUFFER_SIZE_INDEX_IS_CONSTANT(asize_index) || size <= 
d->block_size());
+  d->_location = location;
+  d->_data     = static_cast<char *>(b);
+  return d;
+}
+
+IOBufferData *
+new_xmalloc_IOBufferData_internal(const char *location, void *b, int64_t size)
+{
+  return new_IOBufferData_internal(location, b, size, 
BUFFER_SIZE_INDEX_FOR_XMALLOC_SIZE(size));
+}
+
+IOBufferData *
+new_IOBufferData_internal(const char *loc, int64_t size_index, AllocType type)
+{
+  IOBufferData *d = THREAD_ALLOC(ioDataAllocator, this_thread());
+  d->_location    = loc;
+  d->alloc(size_index, type);
+  return d;
+}
+
+// IRIX has a compiler bug which prevents this function
+// from being compiled correctly at -O3
+// so it is DUPLICATED in IOBuffer.cc
+// ****** IF YOU CHANGE THIS FUNCTION change that one as well.
+void
+IOBufferData::alloc(int64_t size_index, AllocType type)
+{
+  if (_data) {
+    dealloc();
+  }
+  _size_index = size_index;
+  _mem_type   = type;
+  iobuffer_mem_inc(_location, size_index);
+  switch (type) {
+  case MEMALIGNED:
+    if (BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(size_index)) {
+      _data = static_cast<char *>(ioBufAllocator[size_index].alloc_void());
+      // coverity[dead_error_condition]
+    } else if (BUFFER_SIZE_INDEX_IS_XMALLOCED(size_index)) {
+      _data = static_cast<char *>(ats_memalign(ats_pagesize(), 
index_to_buffer_size(size_index)));
+    }
+    break;
+  default:
+  case DEFAULT_ALLOC:
+    if (BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(size_index)) {
+      _data = static_cast<char *>(ioBufAllocator[size_index].alloc_void());
+    } else if (BUFFER_SIZE_INDEX_IS_XMALLOCED(size_index)) {
+      _data = static_cast<char 
*>(ats_malloc(BUFFER_SIZE_FOR_XMALLOC(size_index)));
+    }
+    break;
+  }
+}
+
+// ****** IF YOU CHANGE THIS FUNCTION change that one as well.
+
+void
+IOBufferData::dealloc()
+{
+  iobuffer_mem_dec(_location, _size_index);
+  switch (_mem_type) {
+  case MEMALIGNED:
+    if (BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(_size_index)) {
+      ioBufAllocator[_size_index].free_void(_data);
+    } else if (BUFFER_SIZE_INDEX_IS_XMALLOCED(_size_index)) {
+      ::free(_data);
+    }
+    break;
+  default:
+  case DEFAULT_ALLOC:
+    if (BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(_size_index)) {
+      ioBufAllocator[_size_index].free_void(_data);
+    } else if (BUFFER_SIZE_INDEX_IS_XMALLOCED(_size_index)) {
+      ats_free(_data);
+    }
+    break;
+  }
+  _data       = nullptr;
+  _size_index = BUFFER_SIZE_NOT_ALLOCATED;
+  _mem_type   = NO_ALLOC;
+}
+
+void
+IOBufferData::free()
+{
+  dealloc();
+  THREAD_FREE(this, ioDataAllocator, this_thread());
+}
+
+//////////////////////////////////////////////////////////////////
+//
+//  class IOBufferBlock --
+//         inline functions definitions
+//
+//////////////////////////////////////////////////////////////////
+IOBufferBlock *
+new_IOBufferBlock_internal(const char *location)
+{
+  IOBufferBlock *b = THREAD_ALLOC(ioBlockAllocator, this_thread());
+  b->_location     = location;
+  return b;
+}
+
+IOBufferBlock *
+new_IOBufferBlock_internal(const char *location, IOBufferData *d, int64_t len, 
int64_t offset)
+{
+  IOBufferBlock *b = THREAD_ALLOC(ioBlockAllocator, this_thread());
+  b->_location     = location;
+  b->set(d, len, offset);
+  return b;
+}
+
+IOBufferBlock::IOBufferBlock()
+{
+  return;
+}
+
+void
+IOBufferBlock::consume(int64_t len)
+{
+  _start += len;
+  ink_assert(_start <= _end);
+}
+
+void
+IOBufferBlock::fill(int64_t len)
+{
+  _end += len;
+  ink_assert(_end <= _buf_end);
+}
+
+void
+IOBufferBlock::reset()
+{
+  _end = _start = buf();
+  _buf_end      = buf() + data->block_size();
+}
+
+void
+IOBufferBlock::alloc(int64_t i)
+{
+  ink_assert(BUFFER_SIZE_ALLOCATED(i));
+  data = new_IOBufferData_internal(_location, i);
+  reset();
+}
+
+void
+IOBufferBlock::clear()
+{
+  data = nullptr;
+
+  IOBufferBlock *p = next.get();
+  while (p) {
+    // If our block pointer refcount dropped to zero,
+    // recursively free the list.
+    if (p->refcount_dec() == 0) {
+      IOBufferBlock *n = p->next.detach();
+      p->free();
+      p = n;
+    } else {
+      // We don't hold the last refcount, so we are done.
+      break;
+    }
+  }
+
+  // Nuke the next pointer without dropping the refcount
+  // because we already manually did that.
+  next.detach();
+
+  _buf_end = _end = _start = nullptr;
+}
+
+IOBufferBlock *
+IOBufferBlock::clone() const
+{
+  IOBufferBlock *b = new_IOBufferBlock_internal(_location);
+  b->data          = data;
+  b->_start        = _start;
+  b->_end          = _end;
+  b->_buf_end      = _end;
+  b->_location     = _location;
+  return b;
+}
+
+void
+IOBufferBlock::dealloc()
+{
+  clear();
+}
+
+void
+IOBufferBlock::free()
+{
+  dealloc();
+  THREAD_FREE(this, ioBlockAllocator, this_thread());
+}
+
+void
+IOBufferBlock::set_internal(void *b, int64_t len, int64_t asize_index)
+{
+  data        = new_IOBufferData_internal(_location, 
BUFFER_SIZE_NOT_ALLOCATED);
+  data->_data = static_cast<char *>(b);
+  iobuffer_mem_inc(_location, asize_index);
+  data->_size_index = asize_index;
+  reset();
+  _end = _start + len;
+}
+
+void
+IOBufferBlock::set(IOBufferData *d, int64_t len, int64_t offset)
+{
+  data     = d;
+  _start   = buf() + offset;
+  _end     = _start + len;
+  _buf_end = buf() + d->block_size();
+}
+
+//////////////////////////////////////////////////////////////////
+//
+//  class IOBufferReader --
+//         inline functions definitions
+//
+//////////////////////////////////////////////////////////////////
+void
+IOBufferReader::skip_empty_blocks()
+{
+  while (block->next && block->next->read_avail() && start_offset >= 
block->size()) {
+    start_offset -= block->size();
+    block         = block->next;
+  }
+}
+
+bool
+IOBufferReader::low_water()
+{
+  return mbuf->low_water();
+}
+
+bool
+IOBufferReader::high_water()
+{
+  return read_avail() >= mbuf->water_mark;
+}
+
+bool
+IOBufferReader::current_low_water()
+{
+  return mbuf->current_low_water();
+}
+
+IOBufferBlock *
+IOBufferReader::get_current_block()
+{
+  return block.get();
+}
+
+char *
+IOBufferReader::start()
+{
+  if (!block) {
+    return nullptr;
+  }
+
+  skip_empty_blocks();
+  return block->start() + start_offset;
+}
+
+char *
+IOBufferReader::end()
+{
+  if (!block) {
+    return nullptr;
+  }
+
+  skip_empty_blocks();
+  return block->end();
+}
+
+int64_t
+IOBufferReader::block_read_avail()
+{
+  if (!block) {
+    return 0;
+  }
+
+  skip_empty_blocks();
+  return static_cast<int64_t>(block->end() - (block->start() + start_offset));
+}
+
+std::string_view
+IOBufferReader::block_read_view()
+{
+  const char *start = this->start(); // empty blocks are skipped in here.
+  return start ? std::string_view{start, static_cast<size_t>(block->end() - 
start)} : std::string_view{};
+}
+
+int
+IOBufferReader::block_count()
+{
+  int            count = 0;
+  IOBufferBlock *b     = block.get();
+
+  while (b) {
+    count++;
+    b = b->next.get();
+  }
+
+  return count;
+}
+
+int64_t
+IOBufferReader::read_avail()
+{
+  int64_t        t = 0;
+  IOBufferBlock *b = block.get();
+
+  while (b) {
+    t += b->read_avail();
+    b  = b->next.get();
+  }
+
+  t -= start_offset;
+  if (size_limit != INT64_MAX && t > size_limit) {
+    t = size_limit;
+  }
+
+  return t;
+}
+
+bool
+IOBufferReader::is_read_avail_more_than(int64_t size)
+{
+  int64_t        t = -start_offset;
+  IOBufferBlock *b = block.get();
+
+  while (b) {
+    t += b->read_avail();
+    if (t > size) {
+      return true;
+    }
+    b = b->next.get();
+  }
+  return false;
+}
+
+void
+IOBufferReader::consume(int64_t n)
+{
+  ink_assert(read_avail() >= n);
+  start_offset += n;
+  if (size_limit != INT64_MAX) {
+    size_limit -= n;
+  }
+
+  ink_assert(size_limit >= 0);
+  if (!block) {
+    return;
+  }
+
+  int64_t r = block->read_avail();
+  int64_t s = start_offset;
+  while (r <= s && block->next && block->next->read_avail()) {
+    s            -= r;
+    start_offset  = s;
+    block         = block->next;
+    r             = block->read_avail();
+  }
+}
+
+char &
+IOBufferReader::operator[](int64_t i)
+{
+  static char    default_ret = '\0'; // This is just to avoid compiler 
warnings...
+  IOBufferBlock *b           = block.get();
+
+  i += start_offset;
+  while (b) {
+    int64_t bytes = b->read_avail();
+    if (bytes > i) {
+      return b->start()[i];
+    }
+    i -= bytes;
+    b  = b->next.get();
+  }
+
+  ink_release_assert(!"out of range");
+  // Never used, just to satisfy compilers not understanding the fatality of 
ink_release_assert().
+  return default_ret;
+}
+
+void
+IOBufferReader::clear()
+{
+  accessor     = nullptr;
+  block        = nullptr;
+  mbuf         = nullptr;
+  start_offset = 0;
+  size_limit   = INT64_MAX;
+}
+
+void
+IOBufferReader::reset()
+{
+  block        = mbuf->_writer;
+  start_offset = 0;
+  size_limit   = INT64_MAX;
+}
+
+////////////////////////////////////////////////////////////////
+//
+//  class MIOBuffer --
+//      inline functions definitions
+//
+////////////////////////////////////////////////////////////////
+extern ClassAllocator<MIOBuffer> ioAllocator;
+
+MIOBuffer::MIOBuffer(int64_t default_size_index)
+{
+  clear();
+  size_index = default_size_index;
+  _location  = nullptr;
+  return;
+}
+
+MIOBuffer::MIOBuffer()
+{
+  clear();
+  _location = nullptr;
+  return;
+}
+
+MIOBuffer::~MIOBuffer()
+{
+  _writer = nullptr;
+  dealloc_all_readers();
+}
+
+MIOBuffer *
+new_MIOBuffer_internal(const char *location, int64_t size_index)
+{
+  MIOBuffer *b = THREAD_ALLOC(ioAllocator, this_thread());
+  b->_location = location;
+  b->alloc(size_index);
+  b->water_mark = 0;
+  return b;
+}
+
+void
+free_MIOBuffer(MIOBuffer *mio)
+{
+  mio->_writer = nullptr;
+  mio->dealloc_all_readers();
+  THREAD_FREE(mio, ioAllocator, this_thread());
+}
+
+MIOBuffer *
+new_empty_MIOBuffer_internal(const char *location, int64_t size_index)
+{
+  MIOBuffer *b  = THREAD_ALLOC(ioAllocator, this_thread());
+  b->size_index = size_index;
+  b->water_mark = 0;
+  b->_location  = location;
+  return b;
+}
+
+void
+free_empty_MIOBuffer(MIOBuffer *mio)
+{
+  THREAD_FREE(mio, ioAllocator, this_thread());
+}
+
+IOBufferReader *
+MIOBuffer::alloc_accessor(MIOBufferAccessor *anAccessor)
+{
+  int i;
+  for (i = 0; i < MAX_MIOBUFFER_READERS; i++) {
+    if (!readers[i].allocated()) {
+      break;
+    }
+  }
+
+  // TODO refactor code to return nullptr at some point
+  ink_release_assert(i < MAX_MIOBUFFER_READERS);
+
+  IOBufferReader *e = &readers[i];
+  e->mbuf           = this;
+  e->reset();
+  e->accessor = anAccessor;
+
+  return e;
+}
+
+IOBufferReader *
+MIOBuffer::alloc_reader()
+{
+  int i;
+  for (i = 0; i < MAX_MIOBUFFER_READERS; i++) {
+    if (!readers[i].allocated()) {
+      break;
+    }
+  }
+
+  // TODO refactor code to return nullptr at some point
+  ink_release_assert(i < MAX_MIOBUFFER_READERS);
+
+  IOBufferReader *e = &readers[i];
+  e->mbuf           = this;
+  e->reset();
+  e->accessor = nullptr;
+
+  return e;
+}
+
+int64_t
+MIOBuffer::block_size()
+{
+  return index_to_buffer_size(size_index);
+}
+IOBufferReader *
+MIOBuffer::clone_reader(IOBufferReader *r)
+{
+  int i;
+  for (i = 0; i < MAX_MIOBUFFER_READERS; i++) {
+    if (!readers[i].allocated()) {
+      break;
+    }
+  }
+
+  // TODO refactor code to return nullptr at some point
+  ink_release_assert(i < MAX_MIOBUFFER_READERS);
+
+  IOBufferReader *e = &readers[i];
+  e->mbuf           = this;
+  e->accessor       = nullptr;
+  e->block          = r->block;
+  e->start_offset   = r->start_offset;
+  e->size_limit     = r->size_limit;
+  ink_assert(e->size_limit >= 0);
+
+  return e;
+}
+
+int64_t
+MIOBuffer::block_write_avail()
+{
+  IOBufferBlock *b = first_write_block();
+  return b ? b->write_avail() : 0;
+}
+
+////////////////////////////////////////////////////////////////
+//
+//  MIOBuffer::append_block()
+//
+//  Appends a block to writer->next and make it the current
+//  block.
+//  Note that the block is not appended to the end of the list.
+//  That means that if writer->next was not null before this
+//  call then the block that writer->next was pointing to will
+//  have its reference count decremented and writer->next
+//  will have a new value which is the new block.
+//  In any case the new appended block becomes the current
+//  block.
+//
+////////////////////////////////////////////////////////////////
+void
+MIOBuffer::append_block_internal(IOBufferBlock *b)
+{
+  // It would be nice to remove an empty buffer at the beginning,
+  // but this breaks HTTP.
+  if (!_writer) {
+    _writer = b;
+    init_readers();
+  } else {
+    ink_assert(!_writer->next || !_writer->next->read_avail());
+    _writer->next = b;
+    while (b->read_avail()) {
+      _writer = b;
+      b       = b->next.get();
+      if (!b) {
+        break;
+      }
+    }
+  }
+  while (_writer->next && !_writer->write_avail() && 
_writer->next->read_avail()) {
+    _writer = _writer->next;
+  }
+}
+
+void
+MIOBuffer::append_block(IOBufferBlock *b)
+{
+  ink_assert(b->read_avail());
+  append_block_internal(b);
+}
+
+////////////////////////////////////////////////////////////////
+//
+//  MIOBuffer::append_block()
+//
+//  Allocate a block, appends it to current->next
+//  and make the new block the current block (writer).
+//
+////////////////////////////////////////////////////////////////
+void
+MIOBuffer::append_block(int64_t asize_index)
+{
+  ink_assert(BUFFER_SIZE_ALLOCATED(asize_index));
+  IOBufferBlock *b = new_IOBufferBlock_internal(_location);
+  b->alloc(asize_index);
+  append_block_internal(b);
+  return;
+}
+
+void
+MIOBuffer::add_block()
+{
+  if (this->_writer == nullptr || this->_writer->next == nullptr) {
+    append_block(size_index);
+  }
+}
+
+void
+MIOBuffer::check_add_block()
+{
+  if (!high_water() && current_low_water()) {
+    add_block();
+  }
+}
+
+IOBufferBlock *
+MIOBuffer::get_current_block()
+{
+  return first_write_block();
+}
+
+//////////////////////////////////////////////////////////////////
+//
+//  MIOBuffer::current_write_avail()
+//
+//  returns the total space available in all blocks.
+//  This function is different than write_avail() because
+//  it will not append a new block if there is no space
+//  or below the watermark space available.
+//
+//////////////////////////////////////////////////////////////////
+int64_t
+MIOBuffer::current_write_avail()
+{
+  int64_t        t = 0;
+  IOBufferBlock *b = _writer.get();
+  while (b) {
+    t += b->write_avail();
+    b  = b->next.get();
+  }
+  return t;
+}
+
+//////////////////////////////////////////////////////////////////
+//
+//  MIOBuffer::write_avail()
+//
+//  returns the number of bytes available in the current block.
+//  If there is no current block or not enough free space in
+//  the current block then a new block is appended.
+//
+//////////////////////////////////////////////////////////////////
+int64_t
+MIOBuffer::write_avail()
+{
+  check_add_block();
+  return current_write_avail();
+}
+
+void
+MIOBuffer::fill(int64_t len)
+{
+  int64_t f = _writer->write_avail();
+  while (f < len) {
+    _writer->fill(f);
+    len -= f;
+    if (len > 0) {
+      _writer = _writer->next;
+    }
+    f = _writer->write_avail();
+  }
+  _writer->fill(len);
+}
+
+int
+MIOBuffer::max_block_count()
+{
+  int maxb = 0;
+  for (auto &reader : readers) {
+    if (reader.allocated()) {
+      int c = reader.block_count();
+      if (c > maxb) {
+        maxb = c;
+      }
+    }
+  }
+  return maxb;
+}
+
+int64_t
+MIOBuffer::max_read_avail()
+{
+  int64_t s     = 0;
+  int     found = 0;
+  for (auto &reader : readers) {
+    if (reader.allocated()) {
+      int64_t ss = reader.read_avail();
+      if (ss > s) {
+        s = ss;
+      }
+      found = 1;
+    }
+  }
+  if (!found && _writer) {
+    return _writer->read_avail();
+  }
+  return s;
+}
+
+void
+MIOBuffer::set(void *b, int64_t len)
+{
+  _writer = new_IOBufferBlock_internal(_location);
+  _writer->set_internal(b, len, BUFFER_SIZE_INDEX_FOR_CONSTANT_SIZE(len));
+  init_readers();
+}
+
+void
+MIOBuffer::append_xmalloced(void *b, int64_t len)
+{
+  if (len == 0) {
+    return;
+  }
+
+  IOBufferBlock *x = new_IOBufferBlock_internal(_location);
+  x->set_internal(b, len, BUFFER_SIZE_INDEX_FOR_XMALLOC_SIZE(len));
+  append_block_internal(x);
+}
+
+void
+MIOBuffer::append_fast_allocated(void *b, int64_t len, int64_t fast_size_index)
+{
+  IOBufferBlock *x = new_IOBufferBlock_internal(_location);
+  x->set_internal(b, len, fast_size_index);
+  append_block_internal(x);
+}
+
+void
+MIOBuffer::alloc(int64_t i)
+{
+  _writer = new_IOBufferBlock_internal(_location);
+  _writer->alloc(i);
+  size_index = i;
+  init_readers();
+}
+
+void
+MIOBuffer::dealloc_reader(IOBufferReader *e)
+{
+  if (e->accessor) {
+    ink_assert(e->accessor->writer() == this);
+    ink_assert(e->accessor->reader() == e);
+    e->accessor->clear();
+  }
+  e->clear();
+}
+
+IOBufferReader *
+IOBufferReader::clone()
+{
+  return mbuf->clone_reader(this);
+}
+
+void
+IOBufferReader::dealloc()
+{
+  mbuf->dealloc_reader(this);
+}
+
+void
+MIOBuffer::dealloc_all_readers()
+{
+  for (auto &reader : readers) {
+    if (reader.allocated()) {
+      dealloc_reader(&reader);
+    }
+  }
+}
+
+void
+MIOBufferAccessor::reader_for(MIOBuffer *abuf)
+{
+  mbuf = abuf;
+  if (abuf) {
+    entry = mbuf->alloc_accessor(this);
+  } else {
+    entry = nullptr;
+  }
+}
+
+void
+MIOBufferAccessor::reader_for(IOBufferReader *areader)
+{
+  if (entry == areader) {
+    return;
+  }
+  mbuf  = areader->mbuf;
+  entry = areader;
+  ink_assert(mbuf);
+}
+
+void
+MIOBufferAccessor::writer_for(MIOBuffer *abuf)
+{
+  mbuf  = abuf;
+  entry = nullptr;
+}
+
+MIOBufferAccessor::~MIOBufferAccessor() {}
+
 //
 // General Buffer Allocator
 //
diff --git a/src/iocore/eventsystem/Inline.cc b/src/iocore/eventsystem/Inline.cc
deleted file mode 100644
index dc708c272e..0000000000
--- a/src/iocore/eventsystem/Inline.cc
+++ /dev/null
@@ -1,30 +0,0 @@
-/** @file
-
-  A brief file description
-
-  @section license License
-
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
- */
-
-/*
- * Inline Functions as globals for users using the public interface
- *
- */
-
-#define TS_INLINE
-#include "P_EventSystem.h"
diff --git a/src/iocore/eventsystem/Lock.cc b/src/iocore/eventsystem/Lock.cc
index bee54f43a3..e5d14a0bc0 100644
--- a/src/iocore/eventsystem/Lock.cc
+++ b/src/iocore/eventsystem/Lock.cc
@@ -28,8 +28,8 @@
 
 
 **************************************************************************/
-#include "P_EventSystem.h"
-#include "tscore/Diags.h"
+#include "iocore/eventsystem/Lock.h"
+#include "tsutil/DbgCtl.h"
 
 ClassAllocator<ProxyMutex> mutexAllocator("mutexAllocator");
 
diff --git a/src/iocore/eventsystem/PQ-List.cc 
b/src/iocore/eventsystem/PQ-List.cc
index 35af521a68..de8aad30ef 100644
--- a/src/iocore/eventsystem/PQ-List.cc
+++ b/src/iocore/eventsystem/PQ-List.cc
@@ -21,7 +21,8 @@
   limitations under the License.
  */
 
-#include "P_EventSystem.h"
+#include "iocore/eventsystem/PriorityEventQueue.h"
+#include "iocore/eventsystem/EThread.h"
 
 PriorityEventQueue::PriorityEventQueue()
 {
diff --git a/src/iocore/eventsystem/P_EventSystem.h 
b/src/iocore/eventsystem/P_EventSystem.h
deleted file mode 100644
index 53b4b463f7..0000000000
--- a/src/iocore/eventsystem/P_EventSystem.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/** @file
-
-  A brief file description
-
-  @section license License
-
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
- */
-
-/****************************************************************************
-
-  Event Subsystem
-
-
-
-**************************************************************************/
-#pragma once
-#define _P_EventSystem_H
-
-#include "tscore/ink_platform.h"
-
-#include "iocore/eventsystem/EventSystem.h"
-#include "iocore/eventsystem/Freer.h"
-
-#include "P_Thread.h"
-#include "P_VIO.h"
-#include "P_IOBuffer.h"
-#include "P_VConnection.h"
-#include "P_UnixEvent.h"
-#include "P_UnixEThread.h"
-#include "P_ProtectedQueue.h"
-#include "P_UnixEventProcessor.h"
-
-static constexpr ts::ModuleVersion 
EVENT_SYSTEM_MODULE_INTERNAL_VERSION{EVENT_SYSTEM_MODULE_PUBLIC_VERSION,
-                                                                        
ts::ModuleVersion::PRIVATE};
diff --git a/src/iocore/eventsystem/P_IOBuffer.h 
b/src/iocore/eventsystem/P_IOBuffer.h
deleted file mode 100644
index e159cbafc5..0000000000
--- a/src/iocore/eventsystem/P_IOBuffer.h
+++ /dev/null
@@ -1,1034 +0,0 @@
-/** @file
-
-  A brief file description
-
-  @section license License
-
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
- */
-
-#pragma once
-
-#include "iocore/eventsystem/IOBuffer.h"
-#include "iocore/eventsystem/Thread.h"
-#include "tscore/ink_memory.h"
-#include "tscore/ink_resource.h"
-
-// TODO: I think we're overly aggressive here on making MIOBuffer 64-bit
-// but not sure it's worthwhile changing anything to 32-bit honestly.
-
-//////////////////////////////////////////////////////////////
-//
-// returns 0 for DEFAULT_BUFFER_BASE_SIZE,
-// +1 for each power of 2
-//
-//////////////////////////////////////////////////////////////
-TS_INLINE int64_t
-buffer_size_to_index(int64_t size, int64_t max)
-{
-  int64_t r = max;
-
-  while (r && BUFFER_SIZE_FOR_INDEX(r - 1) >= size) {
-    r--;
-  }
-  return r;
-}
-
-TS_INLINE int64_t
-iobuffer_size_to_index(int64_t size, int64_t max)
-{
-  if (size > BUFFER_SIZE_FOR_INDEX(max)) {
-    return BUFFER_SIZE_INDEX_FOR_XMALLOC_SIZE(size);
-  }
-  return buffer_size_to_index(size, max);
-}
-
-TS_INLINE int64_t
-index_to_buffer_size(int64_t idx)
-{
-  if (BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(idx)) {
-    return BUFFER_SIZE_FOR_INDEX(idx);
-  } else if (BUFFER_SIZE_INDEX_IS_XMALLOCED(idx)) {
-    return BUFFER_SIZE_FOR_XMALLOC(idx);
-    // coverity[dead_error_condition]
-  } else if (BUFFER_SIZE_INDEX_IS_CONSTANT(idx)) {
-    return BUFFER_SIZE_FOR_CONSTANT(idx);
-  }
-  // coverity[dead_error_line]
-  return 0;
-}
-
-TS_INLINE IOBufferBlock *
-iobufferblock_clone(IOBufferBlock *src, int64_t offset, int64_t len)
-{
-  IOBufferBlock *start_buf   = nullptr;
-  IOBufferBlock *current_buf = nullptr;
-
-  while (src && len >= 0) {
-    char   *start     = src->_start;
-    char   *end       = src->_end;
-    int64_t max_bytes = end - start;
-
-    max_bytes -= offset;
-    if (max_bytes <= 0) {
-      offset = -max_bytes;
-      src    = src->next.get();
-      continue;
-    }
-
-    int64_t bytes = len;
-    if (bytes >= max_bytes) {
-      bytes = max_bytes;
-    }
-
-    IOBufferBlock *new_buf  = src->clone();
-    new_buf->_start        += offset;
-    new_buf->_buf_end = new_buf->_end = new_buf->_start + bytes;
-
-    if (!start_buf) {
-      start_buf   = new_buf;
-      current_buf = start_buf;
-    } else {
-      current_buf->next = new_buf;
-      current_buf       = new_buf;
-    }
-
-    len    -= bytes;
-    src     = src->next.get();
-    offset  = 0;
-  }
-
-  return start_buf;
-}
-
-TS_INLINE IOBufferBlock *
-iobufferblock_skip(IOBufferBlock *b, int64_t *poffset, int64_t *plen, int64_t 
write)
-{
-  int64_t offset = *poffset;
-  int64_t len    = write;
-
-  while (b && len >= 0) {
-    int64_t max_bytes = b->read_avail();
-
-    // If this block ends before the start offset, skip it
-    // and adjust the offset to consume its length.
-    max_bytes -= offset;
-    if (max_bytes <= 0) {
-      offset = -max_bytes;
-      b      = b->next.get();
-      continue;
-    }
-
-    if (len >= max_bytes) {
-      b       = b->next.get();
-      len    -= max_bytes;
-      offset  = 0;
-    } else {
-      offset = offset + len;
-      break;
-    }
-  }
-
-  *poffset  = offset;
-  *plen    -= write;
-  return b;
-}
-
-TS_INLINE void
-iobuffer_mem_inc(const char *_loc, int64_t _size_index)
-{
-  if (!res_track_memory) {
-    return;
-  }
-
-  if (!BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(_size_index)) {
-    return;
-  }
-
-  if (!_loc) {
-    _loc = "memory/IOBuffer/UNKNOWN-LOCATION";
-  }
-  ResourceTracker::increment(_loc, index_to_buffer_size(_size_index));
-}
-
-TS_INLINE void
-iobuffer_mem_dec(const char *_loc, int64_t _size_index)
-{
-  if (!res_track_memory) {
-    return;
-  }
-
-  if (!BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(_size_index)) {
-    return;
-  }
-  if (!_loc) {
-    _loc = "memory/IOBuffer/UNKNOWN-LOCATION";
-  }
-  ResourceTracker::increment(_loc, -index_to_buffer_size(_size_index));
-}
-
-//////////////////////////////////////////////////////////////////
-//
-// inline functions definitions
-//
-//////////////////////////////////////////////////////////////////
-//////////////////////////////////////////////////////////////////
-//
-//  class IOBufferData --
-//         inline functions definitions
-//
-//////////////////////////////////////////////////////////////////
-TS_INLINE int64_t
-IOBufferData::block_size()
-{
-  return index_to_buffer_size(_size_index);
-}
-
-TS_INLINE IOBufferData *
-new_IOBufferData_internal(const char *location, void *b, int64_t size, int64_t 
asize_index)
-{
-  (void)size;
-  IOBufferData *d = THREAD_ALLOC(ioDataAllocator, this_thread());
-  d->_size_index  = asize_index;
-  ink_assert(BUFFER_SIZE_INDEX_IS_CONSTANT(asize_index) || size <= 
d->block_size());
-  d->_location = location;
-  d->_data     = static_cast<char *>(b);
-  return d;
-}
-
-TS_INLINE IOBufferData *
-new_xmalloc_IOBufferData_internal(const char *location, void *b, int64_t size)
-{
-  return new_IOBufferData_internal(location, b, size, 
BUFFER_SIZE_INDEX_FOR_XMALLOC_SIZE(size));
-}
-
-TS_INLINE IOBufferData *
-new_IOBufferData_internal(const char *loc, int64_t size_index, AllocType type)
-{
-  IOBufferData *d = THREAD_ALLOC(ioDataAllocator, this_thread());
-  d->_location    = loc;
-  d->alloc(size_index, type);
-  return d;
-}
-
-// IRIX has a compiler bug which prevents this function
-// from being compiled correctly at -O3
-// so it is DUPLICATED in IOBuffer.cc
-// ****** IF YOU CHANGE THIS FUNCTION change that one as well.
-TS_INLINE void
-IOBufferData::alloc(int64_t size_index, AllocType type)
-{
-  if (_data) {
-    dealloc();
-  }
-  _size_index = size_index;
-  _mem_type   = type;
-  iobuffer_mem_inc(_location, size_index);
-  switch (type) {
-  case MEMALIGNED:
-    if (BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(size_index)) {
-      _data = static_cast<char *>(ioBufAllocator[size_index].alloc_void());
-      // coverity[dead_error_condition]
-    } else if (BUFFER_SIZE_INDEX_IS_XMALLOCED(size_index)) {
-      _data = static_cast<char *>(ats_memalign(ats_pagesize(), 
index_to_buffer_size(size_index)));
-    }
-    break;
-  default:
-  case DEFAULT_ALLOC:
-    if (BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(size_index)) {
-      _data = static_cast<char *>(ioBufAllocator[size_index].alloc_void());
-    } else if (BUFFER_SIZE_INDEX_IS_XMALLOCED(size_index)) {
-      _data = static_cast<char 
*>(ats_malloc(BUFFER_SIZE_FOR_XMALLOC(size_index)));
-    }
-    break;
-  }
-}
-
-// ****** IF YOU CHANGE THIS FUNCTION change that one as well.
-
-TS_INLINE void
-IOBufferData::dealloc()
-{
-  iobuffer_mem_dec(_location, _size_index);
-  switch (_mem_type) {
-  case MEMALIGNED:
-    if (BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(_size_index)) {
-      ioBufAllocator[_size_index].free_void(_data);
-    } else if (BUFFER_SIZE_INDEX_IS_XMALLOCED(_size_index)) {
-      ::free(_data);
-    }
-    break;
-  default:
-  case DEFAULT_ALLOC:
-    if (BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(_size_index)) {
-      ioBufAllocator[_size_index].free_void(_data);
-    } else if (BUFFER_SIZE_INDEX_IS_XMALLOCED(_size_index)) {
-      ats_free(_data);
-    }
-    break;
-  }
-  _data       = nullptr;
-  _size_index = BUFFER_SIZE_NOT_ALLOCATED;
-  _mem_type   = NO_ALLOC;
-}
-
-TS_INLINE void
-IOBufferData::free()
-{
-  dealloc();
-  THREAD_FREE(this, ioDataAllocator, this_thread());
-}
-
-//////////////////////////////////////////////////////////////////
-//
-//  class IOBufferBlock --
-//         inline functions definitions
-//
-//////////////////////////////////////////////////////////////////
-TS_INLINE IOBufferBlock *
-new_IOBufferBlock_internal(const char *location)
-{
-  IOBufferBlock *b = THREAD_ALLOC(ioBlockAllocator, this_thread());
-  b->_location     = location;
-  return b;
-}
-
-TS_INLINE IOBufferBlock *
-new_IOBufferBlock_internal(const char *location, IOBufferData *d, int64_t len, 
int64_t offset)
-{
-  IOBufferBlock *b = THREAD_ALLOC(ioBlockAllocator, this_thread());
-  b->_location     = location;
-  b->set(d, len, offset);
-  return b;
-}
-
-TS_INLINE
-IOBufferBlock::IOBufferBlock()
-{
-  return;
-}
-
-TS_INLINE void
-IOBufferBlock::consume(int64_t len)
-{
-  _start += len;
-  ink_assert(_start <= _end);
-}
-
-TS_INLINE void
-IOBufferBlock::fill(int64_t len)
-{
-  _end += len;
-  ink_assert(_end <= _buf_end);
-}
-
-TS_INLINE void
-IOBufferBlock::reset()
-{
-  _end = _start = buf();
-  _buf_end      = buf() + data->block_size();
-}
-
-TS_INLINE void
-IOBufferBlock::alloc(int64_t i)
-{
-  ink_assert(BUFFER_SIZE_ALLOCATED(i));
-  data = new_IOBufferData_internal(_location, i);
-  reset();
-}
-
-TS_INLINE void
-IOBufferBlock::clear()
-{
-  data = nullptr;
-
-  IOBufferBlock *p = next.get();
-  while (p) {
-    // If our block pointer refcount dropped to zero,
-    // recursively free the list.
-    if (p->refcount_dec() == 0) {
-      IOBufferBlock *n = p->next.detach();
-      p->free();
-      p = n;
-    } else {
-      // We don't hold the last refcount, so we are done.
-      break;
-    }
-  }
-
-  // Nuke the next pointer without dropping the refcount
-  // because we already manually did that.
-  next.detach();
-
-  _buf_end = _end = _start = nullptr;
-}
-
-TS_INLINE IOBufferBlock *
-IOBufferBlock::clone() const
-{
-  IOBufferBlock *b = new_IOBufferBlock_internal(_location);
-  b->data          = data;
-  b->_start        = _start;
-  b->_end          = _end;
-  b->_buf_end      = _end;
-  b->_location     = _location;
-  return b;
-}
-
-TS_INLINE void
-IOBufferBlock::dealloc()
-{
-  clear();
-}
-
-TS_INLINE void
-IOBufferBlock::free()
-{
-  dealloc();
-  THREAD_FREE(this, ioBlockAllocator, this_thread());
-}
-
-TS_INLINE void
-IOBufferBlock::set_internal(void *b, int64_t len, int64_t asize_index)
-{
-  data        = new_IOBufferData_internal(_location, 
BUFFER_SIZE_NOT_ALLOCATED);
-  data->_data = static_cast<char *>(b);
-  iobuffer_mem_inc(_location, asize_index);
-  data->_size_index = asize_index;
-  reset();
-  _end = _start + len;
-}
-
-TS_INLINE void
-IOBufferBlock::set(IOBufferData *d, int64_t len, int64_t offset)
-{
-  data     = d;
-  _start   = buf() + offset;
-  _end     = _start + len;
-  _buf_end = buf() + d->block_size();
-}
-
-//////////////////////////////////////////////////////////////////
-//
-//  class IOBufferReader --
-//         inline functions definitions
-//
-//////////////////////////////////////////////////////////////////
-TS_INLINE void
-IOBufferReader::skip_empty_blocks()
-{
-  while (block->next && block->next->read_avail() && start_offset >= 
block->size()) {
-    start_offset -= block->size();
-    block         = block->next;
-  }
-}
-
-TS_INLINE bool
-IOBufferReader::low_water()
-{
-  return mbuf->low_water();
-}
-
-TS_INLINE bool
-IOBufferReader::high_water()
-{
-  return read_avail() >= mbuf->water_mark;
-}
-
-TS_INLINE bool
-IOBufferReader::current_low_water()
-{
-  return mbuf->current_low_water();
-}
-
-TS_INLINE IOBufferBlock *
-IOBufferReader::get_current_block()
-{
-  return block.get();
-}
-
-TS_INLINE char *
-IOBufferReader::start()
-{
-  if (!block) {
-    return nullptr;
-  }
-
-  skip_empty_blocks();
-  return block->start() + start_offset;
-}
-
-TS_INLINE char *
-IOBufferReader::end()
-{
-  if (!block) {
-    return nullptr;
-  }
-
-  skip_empty_blocks();
-  return block->end();
-}
-
-TS_INLINE int64_t
-IOBufferReader::block_read_avail()
-{
-  if (!block) {
-    return 0;
-  }
-
-  skip_empty_blocks();
-  return static_cast<int64_t>(block->end() - (block->start() + start_offset));
-}
-
-inline std::string_view
-IOBufferReader::block_read_view()
-{
-  const char *start = this->start(); // empty blocks are skipped in here.
-  return start ? std::string_view{start, static_cast<size_t>(block->end() - 
start)} : std::string_view{};
-}
-
-TS_INLINE int
-IOBufferReader::block_count()
-{
-  int            count = 0;
-  IOBufferBlock *b     = block.get();
-
-  while (b) {
-    count++;
-    b = b->next.get();
-  }
-
-  return count;
-}
-
-TS_INLINE int64_t
-IOBufferReader::read_avail()
-{
-  int64_t        t = 0;
-  IOBufferBlock *b = block.get();
-
-  while (b) {
-    t += b->read_avail();
-    b  = b->next.get();
-  }
-
-  t -= start_offset;
-  if (size_limit != INT64_MAX && t > size_limit) {
-    t = size_limit;
-  }
-
-  return t;
-}
-
-TS_INLINE bool
-IOBufferReader::is_read_avail_more_than(int64_t size)
-{
-  int64_t        t = -start_offset;
-  IOBufferBlock *b = block.get();
-
-  while (b) {
-    t += b->read_avail();
-    if (t > size) {
-      return true;
-    }
-    b = b->next.get();
-  }
-  return false;
-}
-
-TS_INLINE void
-IOBufferReader::consume(int64_t n)
-{
-  ink_assert(read_avail() >= n);
-  start_offset += n;
-  if (size_limit != INT64_MAX) {
-    size_limit -= n;
-  }
-
-  ink_assert(size_limit >= 0);
-  if (!block) {
-    return;
-  }
-
-  int64_t r = block->read_avail();
-  int64_t s = start_offset;
-  while (r <= s && block->next && block->next->read_avail()) {
-    s            -= r;
-    start_offset  = s;
-    block         = block->next;
-    r             = block->read_avail();
-  }
-}
-
-TS_INLINE char &
-IOBufferReader::operator[](int64_t i)
-{
-  static char    default_ret = '\0'; // This is just to avoid compiler 
warnings...
-  IOBufferBlock *b           = block.get();
-
-  i += start_offset;
-  while (b) {
-    int64_t bytes = b->read_avail();
-    if (bytes > i) {
-      return b->start()[i];
-    }
-    i -= bytes;
-    b  = b->next.get();
-  }
-
-  ink_release_assert(!"out of range");
-  // Never used, just to satisfy compilers not understanding the fatality of 
ink_release_assert().
-  return default_ret;
-}
-
-TS_INLINE void
-IOBufferReader::clear()
-{
-  accessor     = nullptr;
-  block        = nullptr;
-  mbuf         = nullptr;
-  start_offset = 0;
-  size_limit   = INT64_MAX;
-}
-
-TS_INLINE void
-IOBufferReader::reset()
-{
-  block        = mbuf->_writer;
-  start_offset = 0;
-  size_limit   = INT64_MAX;
-}
-
-////////////////////////////////////////////////////////////////
-//
-//  class MIOBuffer --
-//      inline functions definitions
-//
-////////////////////////////////////////////////////////////////
-extern ClassAllocator<MIOBuffer> ioAllocator;
-
-TS_INLINE
-MIOBuffer::MIOBuffer(int64_t default_size_index)
-{
-  clear();
-  size_index = default_size_index;
-  _location  = nullptr;
-  return;
-}
-
-TS_INLINE
-MIOBuffer::MIOBuffer()
-{
-  clear();
-  _location = nullptr;
-  return;
-}
-
-TS_INLINE
-MIOBuffer::~MIOBuffer()
-{
-  _writer = nullptr;
-  dealloc_all_readers();
-}
-
-TS_INLINE MIOBuffer *
-new_MIOBuffer_internal(const char *location, int64_t size_index)
-{
-  MIOBuffer *b = THREAD_ALLOC(ioAllocator, this_thread());
-  b->_location = location;
-  b->alloc(size_index);
-  b->water_mark = 0;
-  return b;
-}
-
-TS_INLINE void
-free_MIOBuffer(MIOBuffer *mio)
-{
-  mio->_writer = nullptr;
-  mio->dealloc_all_readers();
-  THREAD_FREE(mio, ioAllocator, this_thread());
-}
-
-TS_INLINE MIOBuffer *
-new_empty_MIOBuffer_internal(const char *location, int64_t size_index)
-{
-  MIOBuffer *b  = THREAD_ALLOC(ioAllocator, this_thread());
-  b->size_index = size_index;
-  b->water_mark = 0;
-  b->_location  = location;
-  return b;
-}
-
-TS_INLINE void
-free_empty_MIOBuffer(MIOBuffer *mio)
-{
-  THREAD_FREE(mio, ioAllocator, this_thread());
-}
-
-TS_INLINE IOBufferReader *
-MIOBuffer::alloc_accessor(MIOBufferAccessor *anAccessor)
-{
-  int i;
-  for (i = 0; i < MAX_MIOBUFFER_READERS; i++) {
-    if (!readers[i].allocated()) {
-      break;
-    }
-  }
-
-  // TODO refactor code to return nullptr at some point
-  ink_release_assert(i < MAX_MIOBUFFER_READERS);
-
-  IOBufferReader *e = &readers[i];
-  e->mbuf           = this;
-  e->reset();
-  e->accessor = anAccessor;
-
-  return e;
-}
-
-TS_INLINE IOBufferReader *
-MIOBuffer::alloc_reader()
-{
-  int i;
-  for (i = 0; i < MAX_MIOBUFFER_READERS; i++) {
-    if (!readers[i].allocated()) {
-      break;
-    }
-  }
-
-  // TODO refactor code to return nullptr at some point
-  ink_release_assert(i < MAX_MIOBUFFER_READERS);
-
-  IOBufferReader *e = &readers[i];
-  e->mbuf           = this;
-  e->reset();
-  e->accessor = nullptr;
-
-  return e;
-}
-
-TS_INLINE int64_t
-MIOBuffer::block_size()
-{
-  return index_to_buffer_size(size_index);
-}
-TS_INLINE IOBufferReader *
-MIOBuffer::clone_reader(IOBufferReader *r)
-{
-  int i;
-  for (i = 0; i < MAX_MIOBUFFER_READERS; i++) {
-    if (!readers[i].allocated()) {
-      break;
-    }
-  }
-
-  // TODO refactor code to return nullptr at some point
-  ink_release_assert(i < MAX_MIOBUFFER_READERS);
-
-  IOBufferReader *e = &readers[i];
-  e->mbuf           = this;
-  e->accessor       = nullptr;
-  e->block          = r->block;
-  e->start_offset   = r->start_offset;
-  e->size_limit     = r->size_limit;
-  ink_assert(e->size_limit >= 0);
-
-  return e;
-}
-
-TS_INLINE int64_t
-MIOBuffer::block_write_avail()
-{
-  IOBufferBlock *b = first_write_block();
-  return b ? b->write_avail() : 0;
-}
-
-////////////////////////////////////////////////////////////////
-//
-//  MIOBuffer::append_block()
-//
-//  Appends a block to writer->next and make it the current
-//  block.
-//  Note that the block is not appended to the end of the list.
-//  That means that if writer->next was not null before this
-//  call then the block that writer->next was pointing to will
-//  have its reference count decremented and writer->next
-//  will have a new value which is the new block.
-//  In any case the new appended block becomes the current
-//  block.
-//
-////////////////////////////////////////////////////////////////
-TS_INLINE void
-MIOBuffer::append_block_internal(IOBufferBlock *b)
-{
-  // It would be nice to remove an empty buffer at the beginning,
-  // but this breaks HTTP.
-  if (!_writer) {
-    _writer = b;
-    init_readers();
-  } else {
-    ink_assert(!_writer->next || !_writer->next->read_avail());
-    _writer->next = b;
-    while (b->read_avail()) {
-      _writer = b;
-      b       = b->next.get();
-      if (!b) {
-        break;
-      }
-    }
-  }
-  while (_writer->next && !_writer->write_avail() && 
_writer->next->read_avail()) {
-    _writer = _writer->next;
-  }
-}
-
-TS_INLINE void
-MIOBuffer::append_block(IOBufferBlock *b)
-{
-  ink_assert(b->read_avail());
-  append_block_internal(b);
-}
-
-////////////////////////////////////////////////////////////////
-//
-//  MIOBuffer::append_block()
-//
-//  Allocate a block, appends it to current->next
-//  and make the new block the current block (writer).
-//
-////////////////////////////////////////////////////////////////
-TS_INLINE void
-MIOBuffer::append_block(int64_t asize_index)
-{
-  ink_assert(BUFFER_SIZE_ALLOCATED(asize_index));
-  IOBufferBlock *b = new_IOBufferBlock_internal(_location);
-  b->alloc(asize_index);
-  append_block_internal(b);
-  return;
-}
-
-TS_INLINE void
-MIOBuffer::add_block()
-{
-  if (this->_writer == nullptr || this->_writer->next == nullptr) {
-    append_block(size_index);
-  }
-}
-
-TS_INLINE void
-MIOBuffer::check_add_block()
-{
-  if (!high_water() && current_low_water()) {
-    add_block();
-  }
-}
-
-TS_INLINE IOBufferBlock *
-MIOBuffer::get_current_block()
-{
-  return first_write_block();
-}
-
-//////////////////////////////////////////////////////////////////
-//
-//  MIOBuffer::current_write_avail()
-//
-//  returns the total space available in all blocks.
-//  This function is different than write_avail() because
-//  it will not append a new block if there is no space
-//  or below the watermark space available.
-//
-//////////////////////////////////////////////////////////////////
-TS_INLINE int64_t
-MIOBuffer::current_write_avail()
-{
-  int64_t        t = 0;
-  IOBufferBlock *b = _writer.get();
-  while (b) {
-    t += b->write_avail();
-    b  = b->next.get();
-  }
-  return t;
-}
-
-//////////////////////////////////////////////////////////////////
-//
-//  MIOBuffer::write_avail()
-//
-//  returns the number of bytes available in the current block.
-//  If there is no current block or not enough free space in
-//  the current block then a new block is appended.
-//
-//////////////////////////////////////////////////////////////////
-TS_INLINE int64_t
-MIOBuffer::write_avail()
-{
-  check_add_block();
-  return current_write_avail();
-}
-
-TS_INLINE void
-MIOBuffer::fill(int64_t len)
-{
-  int64_t f = _writer->write_avail();
-  while (f < len) {
-    _writer->fill(f);
-    len -= f;
-    if (len > 0) {
-      _writer = _writer->next;
-    }
-    f = _writer->write_avail();
-  }
-  _writer->fill(len);
-}
-
-TS_INLINE int
-MIOBuffer::max_block_count()
-{
-  int maxb = 0;
-  for (auto &reader : readers) {
-    if (reader.allocated()) {
-      int c = reader.block_count();
-      if (c > maxb) {
-        maxb = c;
-      }
-    }
-  }
-  return maxb;
-}
-
-TS_INLINE int64_t
-MIOBuffer::max_read_avail()
-{
-  int64_t s     = 0;
-  int     found = 0;
-  for (auto &reader : readers) {
-    if (reader.allocated()) {
-      int64_t ss = reader.read_avail();
-      if (ss > s) {
-        s = ss;
-      }
-      found = 1;
-    }
-  }
-  if (!found && _writer) {
-    return _writer->read_avail();
-  }
-  return s;
-}
-
-TS_INLINE void
-MIOBuffer::set(void *b, int64_t len)
-{
-  _writer = new_IOBufferBlock_internal(_location);
-  _writer->set_internal(b, len, BUFFER_SIZE_INDEX_FOR_CONSTANT_SIZE(len));
-  init_readers();
-}
-
-TS_INLINE void
-MIOBuffer::append_xmalloced(void *b, int64_t len)
-{
-  if (len == 0) {
-    return;
-  }
-
-  IOBufferBlock *x = new_IOBufferBlock_internal(_location);
-  x->set_internal(b, len, BUFFER_SIZE_INDEX_FOR_XMALLOC_SIZE(len));
-  append_block_internal(x);
-}
-
-TS_INLINE void
-MIOBuffer::append_fast_allocated(void *b, int64_t len, int64_t fast_size_index)
-{
-  IOBufferBlock *x = new_IOBufferBlock_internal(_location);
-  x->set_internal(b, len, fast_size_index);
-  append_block_internal(x);
-}
-
-TS_INLINE void
-MIOBuffer::alloc(int64_t i)
-{
-  _writer = new_IOBufferBlock_internal(_location);
-  _writer->alloc(i);
-  size_index = i;
-  init_readers();
-}
-
-TS_INLINE void
-MIOBuffer::dealloc_reader(IOBufferReader *e)
-{
-  if (e->accessor) {
-    ink_assert(e->accessor->writer() == this);
-    ink_assert(e->accessor->reader() == e);
-    e->accessor->clear();
-  }
-  e->clear();
-}
-
-TS_INLINE IOBufferReader *
-IOBufferReader::clone()
-{
-  return mbuf->clone_reader(this);
-}
-
-TS_INLINE void
-IOBufferReader::dealloc()
-{
-  mbuf->dealloc_reader(this);
-}
-
-TS_INLINE void
-MIOBuffer::dealloc_all_readers()
-{
-  for (auto &reader : readers) {
-    if (reader.allocated()) {
-      dealloc_reader(&reader);
-    }
-  }
-}
-
-TS_INLINE void
-MIOBufferAccessor::reader_for(MIOBuffer *abuf)
-{
-  mbuf = abuf;
-  if (abuf) {
-    entry = mbuf->alloc_accessor(this);
-  } else {
-    entry = nullptr;
-  }
-}
-
-TS_INLINE void
-MIOBufferAccessor::reader_for(IOBufferReader *areader)
-{
-  if (entry == areader) {
-    return;
-  }
-  mbuf  = areader->mbuf;
-  entry = areader;
-  ink_assert(mbuf);
-}
-
-TS_INLINE void
-MIOBufferAccessor::writer_for(MIOBuffer *abuf)
-{
-  mbuf  = abuf;
-  entry = nullptr;
-}
-
-TS_INLINE
-MIOBufferAccessor::~MIOBufferAccessor() {}
diff --git a/src/iocore/eventsystem/P_ProtectedQueue.h 
b/src/iocore/eventsystem/P_ProtectedQueue.h
deleted file mode 100644
index 80c72ddb75..0000000000
--- a/src/iocore/eventsystem/P_ProtectedQueue.h
+++ /dev/null
@@ -1,83 +0,0 @@
-/** @file
-
-  A brief file description
-
-  @section license License
-
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
- */
-
-/****************************************************************************
-
-  Protected Queue
-
-
- ****************************************************************************/
-#pragma once
-
-#include "iocore/eventsystem/EventSystem.h"
-
-TS_INLINE
-ProtectedQueue::ProtectedQueue()
-{
-  Event e;
-  ink_mutex_init(&lock);
-  ink_atomiclist_init(&al, "ProtectedQueue", (char *)&e.link.next - (char 
*)&e);
-  ink_cond_init(&might_have_data);
-}
-
-TS_INLINE void
-ProtectedQueue::signal()
-{
-  // Need to get the lock before you can signal the thread
-  ink_mutex_acquire(&lock);
-  ink_cond_signal(&might_have_data);
-  ink_mutex_release(&lock);
-}
-
-TS_INLINE int
-ProtectedQueue::try_signal()
-{
-  // Need to get the lock before you can signal the thread
-  if (ink_mutex_try_acquire(&lock)) {
-    ink_cond_signal(&might_have_data);
-    ink_mutex_release(&lock);
-    return 1;
-  } else {
-    return 0;
-  }
-}
-
-// Called from the same thread (don't need to signal)
-TS_INLINE void
-ProtectedQueue::enqueue_local(Event *e)
-{
-  ink_assert(!e->in_the_prot_queue && !e->in_the_priority_queue);
-  e->in_the_prot_queue = 1;
-  localQueue.enqueue(e);
-}
-
-TS_INLINE Event *
-ProtectedQueue::dequeue_local()
-{
-  Event *e = localQueue.dequeue();
-  if (e) {
-    ink_assert(e->in_the_prot_queue);
-    e->in_the_prot_queue = 0;
-  }
-  return e;
-}
diff --git a/src/iocore/eventsystem/P_Thread.h 
b/src/iocore/eventsystem/P_Thread.h
deleted file mode 100644
index c4c1e07e71..0000000000
--- a/src/iocore/eventsystem/P_Thread.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/** @file
-
-  A brief file description
-
-  @section license License
-
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
- */
-
-/****************************************************************************
-
-  Basic Threads
-
-
-
-****************************************************************************/
-#pragma once
-
-#include "iocore/eventsystem/Thread.h"
-
-///////////////////////////////////////////////
-// Common Interface impl                     //
-///////////////////////////////////////////////
-
-TS_INLINE void
-Thread::set_specific()
-{
-  this_thread_ptr = this;
-}
-
-TS_INLINE Thread *
-this_thread()
-{
-  return Thread::this_thread_ptr;
-}
diff --git a/src/iocore/eventsystem/P_UnixEThread.h 
b/src/iocore/eventsystem/P_UnixEThread.h
deleted file mode 100644
index 81568ef966..0000000000
--- a/src/iocore/eventsystem/P_UnixEThread.h
+++ /dev/null
@@ -1,261 +0,0 @@
-/** @file
-
-  A brief file description
-
-  @section license License
-
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
- */
-
-/****************************************************************************
-
-  P_UnixEThread.h
-
-
-
-*****************************************************************************/
-#pragma once
-
-#include "iocore/eventsystem/EThread.h"
-#include "iocore/eventsystem/EventProcessor.h"
-#include "tscore/ink_atomic.h"
-#include <execinfo.h>
-
-const ink_hrtime DELAY_FOR_RETRY = HRTIME_MSECONDS(10);
-
-TS_INLINE Event *
-EThread::schedule_imm(Continuation *cont, int callback_event, void *cookie)
-{
-  Event *e = ::eventAllocator.alloc();
-
-#ifdef ENABLE_EVENT_TRACKER
-  e->set_location();
-#endif
-
-  e->callback_event = callback_event;
-  e->cookie         = cookie;
-  return schedule(e->init(cont, 0, 0));
-}
-
-TS_INLINE Event *
-EThread::schedule_at(Continuation *cont, ink_hrtime t, int callback_event, 
void *cookie)
-{
-  Event *e = ::eventAllocator.alloc();
-
-#ifdef ENABLE_EVENT_TRACKER
-  e->set_location();
-#endif
-
-  e->callback_event = callback_event;
-  e->cookie         = cookie;
-  return schedule(e->init(cont, t, 0));
-}
-
-TS_INLINE Event *
-EThread::schedule_in(Continuation *cont, ink_hrtime t, int callback_event, 
void *cookie)
-{
-  Event *e = ::eventAllocator.alloc();
-
-#ifdef ENABLE_EVENT_TRACKER
-  e->set_location();
-#endif
-
-  e->callback_event = callback_event;
-  e->cookie         = cookie;
-  return schedule(e->init(cont, ink_get_hrtime() + t, 0));
-}
-
-TS_INLINE Event *
-EThread::schedule_every(Continuation *cont, ink_hrtime t, int callback_event, 
void *cookie)
-{
-  Event *e = ::eventAllocator.alloc();
-
-#ifdef ENABLE_EVENT_TRACKER
-  e->set_location();
-#endif
-
-  e->callback_event = callback_event;
-  e->cookie         = cookie;
-  if (t < 0) {
-    return schedule(e->init(cont, t, t));
-  } else {
-    return schedule(e->init(cont, ink_get_hrtime() + t, t));
-  }
-}
-
-TS_INLINE Event *
-EThread::schedule(Event *e)
-{
-  e->ethread = this;
-  if (tt != REGULAR) {
-    ink_assert(tt == DEDICATED);
-    return eventProcessor.schedule(e, ET_CALL);
-  }
-  if (e->continuation->mutex) {
-    e->mutex = e->continuation->mutex;
-  } else {
-    e->mutex = e->continuation->mutex = e->ethread->mutex;
-  }
-  ink_assert(e->mutex.get());
-
-  // Make sure client IP debugging works consistently
-  // The continuation that gets scheduled later is not always the
-  // client VC, it can be HttpCacheSM etc. so save the flags
-  e->continuation->control_flags.set_flags(get_cont_flags().get_flags());
-
-  if (e->ethread == this_ethread()) {
-    EventQueueExternal.enqueue_local(e);
-  } else {
-    EventQueueExternal.enqueue(e);
-  }
-
-  return e;
-}
-
-TS_INLINE Event *
-EThread::schedule_imm_local(Continuation *cont, int callback_event, void 
*cookie)
-{
-  Event *e = EVENT_ALLOC(eventAllocator, this);
-
-#ifdef ENABLE_EVENT_TRACKER
-  e->set_location();
-#endif
-
-  e->callback_event = callback_event;
-  e->cookie         = cookie;
-  return schedule_local(e->init(cont, 0, 0));
-}
-
-TS_INLINE Event *
-EThread::schedule_at_local(Continuation *cont, ink_hrtime t, int 
callback_event, void *cookie)
-{
-  Event *e = EVENT_ALLOC(eventAllocator, this);
-
-#ifdef ENABLE_EVENT_TRACKER
-  e->set_location();
-#endif
-
-  e->callback_event = callback_event;
-  e->cookie         = cookie;
-  return schedule_local(e->init(cont, t, 0));
-}
-
-TS_INLINE Event *
-EThread::schedule_in_local(Continuation *cont, ink_hrtime t, int 
callback_event, void *cookie)
-{
-  Event *e = EVENT_ALLOC(eventAllocator, this);
-
-#ifdef ENABLE_EVENT_TRACKER
-  e->set_location();
-#endif
-
-  e->callback_event = callback_event;
-  e->cookie         = cookie;
-  return schedule_local(e->init(cont, ink_get_hrtime() + t, 0));
-}
-
-TS_INLINE Event *
-EThread::schedule_every_local(Continuation *cont, ink_hrtime t, int 
callback_event, void *cookie)
-{
-  Event *e = EVENT_ALLOC(eventAllocator, this);
-
-#ifdef ENABLE_EVENT_TRACKER
-  e->set_location();
-#endif
-
-  e->callback_event = callback_event;
-  e->cookie         = cookie;
-  if (t < 0) {
-    return schedule_local(e->init(cont, t, t));
-  } else {
-    return schedule_local(e->init(cont, ink_get_hrtime() + t, t));
-  }
-}
-
-TS_INLINE Event *
-EThread::schedule_local(Event *e)
-{
-  if (tt != REGULAR) {
-    ink_assert(tt == DEDICATED);
-    return eventProcessor.schedule(e, ET_CALL);
-  }
-  if (!e->mutex) {
-    e->ethread = this;
-    e->mutex   = e->continuation->mutex;
-  } else {
-    ink_assert(e->ethread == this);
-  }
-  e->globally_allocated = false;
-
-  // Make sure client IP debugging works consistently
-  // The continuation that gets scheduled later is not always the
-  // client VC, it can be HttpCacheSM etc. so save the flags
-  e->continuation->control_flags.set_flags(get_cont_flags().get_flags());
-
-  // If you need to schedule an event from a different thread, use 
Ethread::schedule_imm/at/in/every functions
-  ink_release_assert(this == this_ethread());
-
-  EventQueueExternal.enqueue_local(e);
-  return e;
-}
-
-TS_INLINE Event *
-EThread::schedule_spawn(Continuation *c, int ev, void *cookie)
-{
-  ink_assert(this != this_ethread()); // really broken to call this from the 
same thread.
-  if (start_event) {
-    free_event(start_event);
-  }
-  start_event          = EVENT_ALLOC(eventAllocator, this);
-  start_event->ethread = this;
-  start_event->mutex   = this->mutex;
-  start_event->init(c);
-  start_event->callback_event = ev;
-  start_event->cookie         = cookie;
-  return start_event;
-}
-
-TS_INLINE EThread *
-this_ethread()
-{
-  return EThread::this_ethread_ptr;
-}
-
-TS_INLINE EThread *
-this_event_thread()
-{
-  EThread *ethread = this_ethread();
-  if (ethread != nullptr && ethread->tt == REGULAR) {
-    return ethread;
-  } else {
-    return nullptr;
-  }
-}
-
-TS_INLINE void
-EThread::free_event(Event *e)
-{
-  ink_assert(!e->in_the_priority_queue && !e->in_the_prot_queue);
-  e->mutex = nullptr;
-  EVENT_FREE(e, eventAllocator, this);
-}
-
-TS_INLINE void
-EThread::set_tail_handler(LoopTailHandler *handler)
-{
-  ink_atomic_swap(&tail_cb, handler);
-}
diff --git a/src/iocore/eventsystem/P_UnixEventProcessor.h 
b/src/iocore/eventsystem/P_UnixEventProcessor.h
deleted file mode 100644
index eb4235602a..0000000000
--- a/src/iocore/eventsystem/P_UnixEventProcessor.h
+++ /dev/null
@@ -1,246 +0,0 @@
-/** @file
-
-  A brief file description
-
-  @section license License
-
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
- */
-
-#pragma once
-
-#include <vector>
-
-#include <tscore/TSSystemState.h>
-
-#include "tscore/ink_align.h"
-#include "iocore/eventsystem/EventProcessor.h"
-
-const int LOAD_BALANCE_INTERVAL = 1;
-
-TS_INLINE off_t
-EventProcessor::allocate(int size)
-{
-  static off_t start = INK_ALIGN(offsetof(EThread, thread_private), 16);
-  static off_t loss  = start - offsetof(EThread, thread_private);
-  size               = INK_ALIGN(size, 16); // 16 byte alignment
-
-  int old;
-  do {
-    old = thread_data_used;
-    if (old + loss + size > PER_THREAD_DATA) {
-      return -1;
-    }
-  } while (!ink_atomic_cas(&thread_data_used, old, old + size));
-
-  return (off_t)(old + start);
-}
-
-TS_INLINE EThread *
-EventProcessor::assign_thread(EventType etype)
-{
-  int                    next;
-  ThreadGroupDescriptor *tg = &thread_group[etype];
-
-  ink_assert(etype < MAX_EVENT_TYPES);
-  if (tg->_count > 1) {
-    next = ++tg->_next_round_robin % tg->_count;
-  } else {
-    next = 0;
-  }
-  return tg->_thread[next];
-}
-
-// If thread_holding is the correct type, return it.
-//
-// Otherwise check if there is already an affinity associated with the 
continuation,
-// return it if the type is the same, return the next available thread of 
"etype" if
-// the type is different.
-//
-// Only assign new affinity when there is currently none.
-TS_INLINE EThread *
-EventProcessor::assign_affinity_by_type(Continuation *cont, EventType etype)
-{
-  EThread *ethread = cont->mutex->thread_holding;
-  if (!ethread->is_event_type(etype)) {
-    ethread = cont->getThreadAffinity();
-    if (ethread == nullptr || !ethread->is_event_type(etype)) {
-      ethread = assign_thread(etype);
-    }
-  }
-
-  if (cont->getThreadAffinity() == nullptr) {
-    cont->setThreadAffinity(ethread);
-  }
-
-  return ethread;
-}
-
-TS_INLINE Event *
-EventProcessor::schedule(Event *e, EventType etype)
-{
-  ink_assert(etype < MAX_EVENT_TYPES);
-
-  if (TSSystemState::is_event_system_shut_down()) {
-    return nullptr;
-  }
-
-  EThread *affinity_thread = e->continuation->getThreadAffinity();
-  EThread *curr_thread     = this_ethread();
-  if (affinity_thread != nullptr && affinity_thread->is_event_type(etype)) {
-    e->ethread = affinity_thread;
-  } else {
-    // Is the current thread eligible?
-    if (curr_thread != nullptr && curr_thread->is_event_type(etype)) {
-      e->ethread = curr_thread;
-    } else {
-      e->ethread = assign_thread(etype);
-    }
-    if (affinity_thread == nullptr) {
-      e->continuation->setThreadAffinity(e->ethread);
-    }
-  }
-
-  if (e->continuation->mutex) {
-    e->mutex = e->continuation->mutex;
-  }
-
-  if (curr_thread != nullptr && e->ethread == curr_thread) {
-    e->ethread->EventQueueExternal.enqueue_local(e);
-  } else {
-    e->ethread->EventQueueExternal.enqueue(e);
-  }
-
-  return e;
-}
-
-TS_INLINE Event *
-EventProcessor::schedule_imm(Continuation *cont, EventType et, int 
callback_event, void *cookie)
-{
-  Event *e = eventAllocator.alloc();
-
-  ink_assert(et < MAX_EVENT_TYPES);
-#ifdef ENABLE_TIME_TRACE
-  e->start_time = ink_get_hrtime();
-#endif
-
-#ifdef ENABLE_EVENT_TRACKER
-  e->set_location();
-#endif
-
-  e->callback_event = callback_event;
-  e->cookie         = cookie;
-  return schedule(e->init(cont, 0, 0), et);
-}
-
-TS_INLINE Event *
-EventProcessor::schedule_at(Continuation *cont, ink_hrtime t, EventType et, 
int callback_event, void *cookie)
-{
-  Event *e = eventAllocator.alloc();
-
-  ink_assert(t > 0);
-  ink_assert(et < MAX_EVENT_TYPES);
-
-#ifdef ENABLE_EVENT_TRACKER
-  e->set_location();
-#endif
-
-  e->callback_event = callback_event;
-  e->cookie         = cookie;
-  return schedule(e->init(cont, t, 0), et);
-}
-
-TS_INLINE Event *
-EventProcessor::schedule_in(Continuation *cont, ink_hrtime t, EventType et, 
int callback_event, void *cookie)
-{
-  Event *e = eventAllocator.alloc();
-
-  ink_assert(et < MAX_EVENT_TYPES);
-
-#ifdef ENABLE_EVENT_TRACKER
-  e->set_location();
-#endif
-
-  e->callback_event = callback_event;
-  e->cookie         = cookie;
-  return schedule(e->init(cont, ink_get_hrtime() + t, 0), et);
-}
-
-TS_INLINE Event *
-EventProcessor::schedule_every(Continuation *cont, ink_hrtime t, EventType et, 
int callback_event, void *cookie)
-{
-  Event *e = eventAllocator.alloc();
-
-  ink_assert(t != 0);
-  ink_assert(et < MAX_EVENT_TYPES);
-
-#ifdef ENABLE_EVENT_TRACKER
-  e->set_location();
-#endif
-
-  e->callback_event = callback_event;
-  e->cookie         = cookie;
-  if (t < 0) {
-    return schedule(e->init(cont, t, t), et);
-  } else {
-    return schedule(e->init(cont, ink_get_hrtime() + t, t), et);
-  }
-}
-
-TS_INLINE std::vector<TSAction>
-EventProcessor::schedule_entire(Continuation *cont, ink_hrtime t, ink_hrtime 
p, EventType et, int callback_event, void *cookie)
-{
-  ThreadGroupDescriptor *tg          = &thread_group[et];
-  EThread               *curr_thread = this_ethread();
-
-  std::vector<TSAction> actions;
-
-  for (int i = 0; i < tg->_count; i++) {
-    Event *e = eventAllocator.alloc();
-
-    e->ethread        = tg->_thread[i];
-    e->callback_event = callback_event;
-    e->cookie         = cookie;
-
-    if (t == 0 && p == 0) {
-      e->init(cont, 0, 0);
-    } else if (t != 0 && p == 0) {
-      e->init(cont, ink_get_hrtime() + t, 0);
-    } else if (t == 0 && p != 0) {
-      if (p < 0) {
-        e->init(cont, p, p);
-      } else {
-        e->init(cont, ink_get_hrtime() + p, p);
-      }
-    } else {
-      ink_assert(!"not reached");
-    }
-
-    e->mutex = new_ProxyMutex();
-
-    if (curr_thread != nullptr && e->ethread == curr_thread) {
-      e->ethread->EventQueueExternal.enqueue_local(e);
-    } else {
-      e->ethread->EventQueueExternal.enqueue(e);
-    }
-
-    /* This is a hack. Should be handled in ink_types */
-    actions.push_back((TSAction)((uintptr_t) reinterpret_cast<TSAction>(e) | 
0x1));
-  }
-
-  return actions;
-}
diff --git a/src/iocore/eventsystem/P_VConnection.h 
b/src/iocore/eventsystem/P_VConnection.h
deleted file mode 100644
index ad49800452..0000000000
--- a/src/iocore/eventsystem/P_VConnection.h
+++ /dev/null
@@ -1,95 +0,0 @@
-/** @file
-
-  A brief file description
-
-  @section license License
-
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
- */
-
-#pragma once
-#include "iocore/eventsystem/EventSystem.h"
-
-TS_INLINE const char *
-get_vc_event_name(int event)
-{
-  switch (event) {
-  default:
-    return "unknown event";
-  case VC_EVENT_NONE:
-    return "VC_EVENT_NONE";
-  case VC_EVENT_IMMEDIATE:
-    return "VC_EVENT_IMMEDIATE";
-  case VC_EVENT_READ_READY:
-    return "VC_EVENT_READ_READY";
-  case VC_EVENT_WRITE_READY:
-    return "VC_EVENT_WRITE_READY";
-  case VC_EVENT_READ_COMPLETE:
-    return "VC_EVENT_READ_COMPLETE";
-  case VC_EVENT_WRITE_COMPLETE:
-    return "VC_EVENT_WRITE_COMPLETE";
-  case VC_EVENT_EOS:
-    return "VC_EVENT_EOS";
-  case VC_EVENT_ERROR:
-    return "VC_EVENT_ERROR";
-  case VC_EVENT_INACTIVITY_TIMEOUT:
-    return "VC_EVENT_INACTIVITY_TIMEOUT";
-  case VC_EVENT_ACTIVE_TIMEOUT:
-    return "VC_EVENT_ACTIVE_TIMEOUT";
-  }
-}
-
-TS_INLINE
-VConnection::VConnection(ProxyMutex *aMutex) : Continuation(aMutex), lerrno(0)
-{
-  SET_HANDLER(nullptr);
-}
-
-TS_INLINE
-VConnection::VConnection(Ptr<ProxyMutex> &aMutex) : Continuation(aMutex), 
lerrno(0)
-{
-  SET_HANDLER(nullptr);
-}
-
-TS_INLINE
-VConnection::~VConnection() {}
-
-TS_INLINE VIO *
-vc_do_io_write(VConnection *vc, Continuation *cont, int64_t nbytes, MIOBuffer 
*buf, int64_t offset)
-{
-  IOBufferReader *reader = buf->alloc_reader();
-
-  if (offset > 0) {
-    reader->consume(offset);
-  }
-
-  return vc->do_io_write(cont, nbytes, reader, true);
-}
-
-TS_INLINE void
-VConnection::set_continuation(VIO *, Continuation *)
-{
-}
-TS_INLINE void
-VConnection::reenable(VIO *)
-{
-}
-TS_INLINE void
-VConnection::reenable_re(VIO *vio)
-{
-  reenable(vio);
-}
diff --git a/src/iocore/eventsystem/P_VIO.h b/src/iocore/eventsystem/P_VIO.h
deleted file mode 100644
index de5f919e95..0000000000
--- a/src/iocore/eventsystem/P_VIO.h
+++ /dev/null
@@ -1,123 +0,0 @@
-/** @file
-
-  A brief file description
-
-  @section license License
-
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
- */
-
-#pragma once
-#include "iocore/eventsystem/VIO.h"
-
-TS_INLINE
-VIO::VIO(int aop) : op(aop), buffer(), mutex(nullptr) {}
-
-TS_INLINE
-VIO::VIO() : buffer(), mutex(nullptr) {}
-
-TS_INLINE Continuation *
-VIO::get_continuation() const
-{
-  return cont;
-}
-
-TS_INLINE void
-VIO::set_writer(MIOBuffer *writer)
-{
-  buffer.writer_for(writer);
-}
-
-TS_INLINE void
-VIO::set_reader(IOBufferReader *reader)
-{
-  buffer.reader_for(reader);
-}
-
-TS_INLINE MIOBuffer *
-VIO::get_writer() const
-{
-  return buffer.writer();
-}
-
-TS_INLINE IOBufferReader *
-VIO::get_reader() const
-{
-  return (buffer.reader());
-}
-
-TS_INLINE int64_t
-VIO::ntodo() const
-{
-  return nbytes - ndone;
-}
-
-TS_INLINE void
-VIO::done()
-{
-  if (buffer.reader()) {
-    nbytes = ndone + buffer.reader()->read_avail();
-  } else {
-    nbytes = ndone;
-  }
-}
-
-TS_INLINE void
-VIO::set_continuation(Continuation *acont)
-{
-  if (vc_server) {
-    vc_server->set_continuation(this, acont);
-  }
-  if (acont) {
-    mutex = acont->mutex;
-    cont  = acont;
-  } else {
-    mutex = nullptr;
-    cont  = nullptr;
-  }
-  return;
-}
-
-TS_INLINE void
-VIO::reenable()
-{
-  this->_disabled = false;
-  if (vc_server) {
-    vc_server->reenable(this);
-  }
-}
-
-TS_INLINE void
-VIO::reenable_re()
-{
-  this->_disabled = false;
-  if (vc_server) {
-    vc_server->reenable_re(this);
-  }
-}
-
-TS_INLINE void
-VIO::disable()
-{
-  this->_disabled = true;
-}
-
-TS_INLINE bool
-VIO::is_disabled() const
-{
-  return this->_disabled;
-}
diff --git a/src/iocore/eventsystem/Processor.cc 
b/src/iocore/eventsystem/Processor.cc
index 18887f2315..93fe5ca84e 100644
--- a/src/iocore/eventsystem/Processor.cc
+++ b/src/iocore/eventsystem/Processor.cc
@@ -40,7 +40,9 @@
 
  ****************************************************************************/
 
-#include "P_EventSystem.h"
+#include "iocore/eventsystem/Processor.h"
+#include "tscore/ink_assert.h"
+
 //////////////////////////////////////////////////////////////////////////////
 //
 //      Processor::Processor()
diff --git a/src/iocore/eventsystem/ProtectedQueue.cc 
b/src/iocore/eventsystem/ProtectedQueue.cc
index 6be82e388d..02ae16a7e0 100644
--- a/src/iocore/eventsystem/ProtectedQueue.cc
+++ b/src/iocore/eventsystem/ProtectedQueue.cc
@@ -30,7 +30,8 @@
 
 */
 
-#include "P_EventSystem.h"
+#include "iocore/eventsystem/ProtectedQueue.h"
+#include "iocore/eventsystem/EThread.h"
 
 // The protected queue is designed to delay signaling of threads
 // until some amount of work has been completed on the current thread
diff --git a/src/iocore/eventsystem/RecProcess.cc 
b/src/iocore/eventsystem/RecProcess.cc
index 81d698bab5..128154cb3e 100644
--- a/src/iocore/eventsystem/RecProcess.cc
+++ b/src/iocore/eventsystem/RecProcess.cc
@@ -21,13 +21,14 @@
   limitations under the License.
  */
 
+#include "records/RecProcess.h"
+#include "iocore/eventsystem/EventProcessor.h"
 #include "tscore/ink_platform.h"
 #include "tscore/EventNotify.h"
 #include "tsutil/Metrics.h"
 
 #include "iocore/eventsystem/Tasks.h"
 
-#include "P_EventSystem.h"
 #include "../../records/P_RecCore.h"
 #include "../../records/P_RecMessage.h"
 #include "../../records/P_RecUtils.h"
diff --git a/src/iocore/eventsystem/UnixEThread.cc 
b/src/iocore/eventsystem/UnixEThread.cc
index 423b4d31ea..93578784c8 100644
--- a/src/iocore/eventsystem/UnixEThread.cc
+++ b/src/iocore/eventsystem/UnixEThread.cc
@@ -31,9 +31,11 @@
 // The EThread Class
 //
 /////////////////////////////////////////////////////////////////////
-#include "P_EventSystem.h"
+#include "iocore/eventsystem/EThread.h"
+#include "iocore/eventsystem/EventProcessor.h"
 #include "iocore/eventsystem/Lock.h"
 #include "tscore/ink_hrtime.h"
+#include "tscore/ink_atomic.h"
 
 #if HAVE_EVENTFD
 #include <sys/eventfd.h>
@@ -53,8 +55,9 @@ char const *const EThread::Metrics::Slice::STAT_NAME[] = {
   "proxy.process.eventloop.io.wait.max", "proxy.process.eventloop.io.work.max",
 };
 
-int thread_max_heartbeat_mseconds = THREAD_MAX_HEARTBEAT_MSECONDS;
-int loop_time_update_probability  = 10;
+int              thread_max_heartbeat_mseconds = THREAD_MAX_HEARTBEAT_MSECONDS;
+int              loop_time_update_probability  = 10;
+const ink_hrtime DELAY_FOR_RETRY               = HRTIME_MSECONDS(10);
 
 // To define a class inherits from Thread:
 //   1) Define an independent thread_local static member
@@ -439,3 +442,201 @@ EThread::Metrics::summarize(Metrics &global)
     global._api_timing  += _api_timing;
   }
 }
+
+void
+EThread::set_tail_handler(LoopTailHandler *handler)
+{
+  ink_atomic_swap(&tail_cb, handler);
+}
+
+Event *
+EThread::schedule_imm(Continuation *cont, int callback_event, void *cookie)
+{
+  Event *e = ::eventAllocator.alloc();
+
+#ifdef ENABLE_EVENT_TRACKER
+  e->set_location();
+#endif
+
+  e->callback_event = callback_event;
+  e->cookie         = cookie;
+  return schedule(e->init(cont, 0, 0));
+}
+
+Event *
+EThread::schedule_at(Continuation *cont, ink_hrtime t, int callback_event, 
void *cookie)
+{
+  Event *e = ::eventAllocator.alloc();
+
+#ifdef ENABLE_EVENT_TRACKER
+  e->set_location();
+#endif
+
+  e->callback_event = callback_event;
+  e->cookie         = cookie;
+  return schedule(e->init(cont, t, 0));
+}
+
+Event *
+EThread::schedule_in(Continuation *cont, ink_hrtime t, int callback_event, 
void *cookie)
+{
+  Event *e = ::eventAllocator.alloc();
+
+#ifdef ENABLE_EVENT_TRACKER
+  e->set_location();
+#endif
+
+  e->callback_event = callback_event;
+  e->cookie         = cookie;
+  return schedule(e->init(cont, ink_get_hrtime() + t, 0));
+}
+
+Event *
+EThread::schedule_every(Continuation *cont, ink_hrtime t, int callback_event, 
void *cookie)
+{
+  Event *e = ::eventAllocator.alloc();
+
+#ifdef ENABLE_EVENT_TRACKER
+  e->set_location();
+#endif
+
+  e->callback_event = callback_event;
+  e->cookie         = cookie;
+  if (t < 0) {
+    return schedule(e->init(cont, t, t));
+  } else {
+    return schedule(e->init(cont, ink_get_hrtime() + t, t));
+  }
+}
+
+Event *
+EThread::schedule(Event *e)
+{
+  e->ethread = this;
+  if (tt != REGULAR) {
+    ink_assert(tt == DEDICATED);
+    return eventProcessor.schedule(e, ET_CALL);
+  }
+  if (e->continuation->mutex) {
+    e->mutex = e->continuation->mutex;
+  } else {
+    e->mutex = e->continuation->mutex = e->ethread->mutex;
+  }
+  ink_assert(e->mutex.get());
+
+  // Make sure client IP debugging works consistently
+  // The continuation that gets scheduled later is not always the
+  // client VC, it can be HttpCacheSM etc. so save the flags
+  e->continuation->control_flags.set_flags(get_cont_flags().get_flags());
+
+  if (e->ethread == this_ethread()) {
+    EventQueueExternal.enqueue_local(e);
+  } else {
+    EventQueueExternal.enqueue(e);
+  }
+
+  return e;
+}
+
+Event *
+EThread::schedule_imm_local(Continuation *cont, int callback_event, void 
*cookie)
+{
+  Event *e = EVENT_ALLOC(eventAllocator, this);
+
+#ifdef ENABLE_EVENT_TRACKER
+  e->set_location();
+#endif
+
+  e->callback_event = callback_event;
+  e->cookie         = cookie;
+  return schedule_local(e->init(cont, 0, 0));
+}
+
+Event *
+EThread::schedule_at_local(Continuation *cont, ink_hrtime t, int 
callback_event, void *cookie)
+{
+  Event *e = EVENT_ALLOC(eventAllocator, this);
+
+#ifdef ENABLE_EVENT_TRACKER
+  e->set_location();
+#endif
+
+  e->callback_event = callback_event;
+  e->cookie         = cookie;
+  return schedule_local(e->init(cont, t, 0));
+}
+
+Event *
+EThread::schedule_in_local(Continuation *cont, ink_hrtime t, int 
callback_event, void *cookie)
+{
+  Event *e = EVENT_ALLOC(eventAllocator, this);
+
+#ifdef ENABLE_EVENT_TRACKER
+  e->set_location();
+#endif
+
+  e->callback_event = callback_event;
+  e->cookie         = cookie;
+  return schedule_local(e->init(cont, ink_get_hrtime() + t, 0));
+}
+
+Event *
+EThread::schedule_every_local(Continuation *cont, ink_hrtime t, int 
callback_event, void *cookie)
+{
+  Event *e = EVENT_ALLOC(eventAllocator, this);
+
+#ifdef ENABLE_EVENT_TRACKER
+  e->set_location();
+#endif
+
+  e->callback_event = callback_event;
+  e->cookie         = cookie;
+  if (t < 0) {
+    return schedule_local(e->init(cont, t, t));
+  } else {
+    return schedule_local(e->init(cont, ink_get_hrtime() + t, t));
+  }
+}
+
+Event *
+EThread::schedule_local(Event *e)
+{
+  if (tt != REGULAR) {
+    ink_assert(tt == DEDICATED);
+    return eventProcessor.schedule(e, ET_CALL);
+  }
+  if (!e->mutex) {
+    e->ethread = this;
+    e->mutex   = e->continuation->mutex;
+  } else {
+    ink_assert(e->ethread == this);
+  }
+  e->globally_allocated = false;
+
+  // Make sure client IP debugging works consistently
+  // The continuation that gets scheduled later is not always the
+  // client VC, it can be HttpCacheSM etc. so save the flags
+  e->continuation->control_flags.set_flags(get_cont_flags().get_flags());
+
+  // If you need to schedule an event from a different thread, use 
Ethread::schedule_imm/at/in/every functions
+  ink_release_assert(this == this_ethread());
+
+  EventQueueExternal.enqueue_local(e);
+  return e;
+}
+
+Event *
+EThread::schedule_spawn(Continuation *c, int ev, void *cookie)
+{
+  ink_assert(this != this_ethread()); // really broken to call this from the 
same thread.
+  if (start_event) {
+    free_event(start_event);
+  }
+  start_event          = EVENT_ALLOC(eventAllocator, this);
+  start_event->ethread = this;
+  start_event->mutex   = this->mutex;
+  start_event->init(c);
+  start_event->callback_event = ev;
+  start_event->cookie         = cookie;
+  return start_event;
+}
diff --git a/src/iocore/eventsystem/UnixEvent.cc 
b/src/iocore/eventsystem/UnixEvent.cc
index 8043155b76..4a2d0ee381 100644
--- a/src/iocore/eventsystem/UnixEvent.cc
+++ b/src/iocore/eventsystem/UnixEvent.cc
@@ -27,9 +27,9 @@
 
 
 *****************************************************************************/
-#include "P_EventSystem.h"
 
-#include "tscore/ink_stack_trace.h"
+#include "iocore/eventsystem/Event.h"
+#include "iocore/eventsystem/EThread.h"
 
 ClassAllocator<Event> eventAllocator("eventAllocator", 256);
 
diff --git a/src/iocore/eventsystem/UnixEventProcessor.cc 
b/src/iocore/eventsystem/UnixEventProcessor.cc
index 4ac2c7f7bb..ea527bfab1 100644
--- a/src/iocore/eventsystem/UnixEventProcessor.cc
+++ b/src/iocore/eventsystem/UnixEventProcessor.cc
@@ -27,6 +27,8 @@
 #include "records/RecCore.h"
 #include "records/RecProcess.h"
 #include "tscore/ink_align.h"
+#include "tscore/ink_atomic.h"
+#include "tscore/TSSystemState.h"
 #include <sched.h>
 #if TS_USE_HWLOC
 #if __has_include(<alloca.h>)
@@ -615,3 +617,216 @@ thread_started(EThread *t)
     }
   }
 }
+
+off_t
+EventProcessor::allocate(int size)
+{
+  static off_t start = INK_ALIGN(offsetof(EThread, thread_private), 16);
+  static off_t loss  = start - offsetof(EThread, thread_private);
+  size               = INK_ALIGN(size, 16); // 16 byte alignment
+
+  int old;
+  do {
+    old = thread_data_used;
+    if (old + loss + size > PER_THREAD_DATA) {
+      return -1;
+    }
+  } while (!ink_atomic_cas(&thread_data_used, old, old + size));
+
+  return (off_t)(old + start);
+}
+
+EThread *
+EventProcessor::assign_thread(EventType etype)
+{
+  int                    next;
+  ThreadGroupDescriptor *tg = &thread_group[etype];
+
+  ink_assert(etype < MAX_EVENT_TYPES);
+  if (tg->_count > 1) {
+    next = ++tg->_next_round_robin % tg->_count;
+  } else {
+    next = 0;
+  }
+  return tg->_thread[next];
+}
+
+// If thread_holding is the correct type, return it.
+//
+// Otherwise check if there is already an affinity associated with the 
continuation,
+// return it if the type is the same, return the next available thread of 
"etype" if
+// the type is different.
+//
+// Only assign new affinity when there is currently none.
+EThread *
+EventProcessor::assign_affinity_by_type(Continuation *cont, EventType etype)
+{
+  EThread *ethread = cont->mutex->thread_holding;
+  if (!ethread->is_event_type(etype)) {
+    ethread = cont->getThreadAffinity();
+    if (ethread == nullptr || !ethread->is_event_type(etype)) {
+      ethread = assign_thread(etype);
+    }
+  }
+
+  if (cont->getThreadAffinity() == nullptr) {
+    cont->setThreadAffinity(ethread);
+  }
+
+  return ethread;
+}
+
+Event *
+EventProcessor::schedule(Event *e, EventType etype)
+{
+  ink_assert(etype < MAX_EVENT_TYPES);
+
+  if (TSSystemState::is_event_system_shut_down()) {
+    return nullptr;
+  }
+
+  EThread *affinity_thread = e->continuation->getThreadAffinity();
+  EThread *curr_thread     = this_ethread();
+  if (affinity_thread != nullptr && affinity_thread->is_event_type(etype)) {
+    e->ethread = affinity_thread;
+  } else {
+    // Is the current thread eligible?
+    if (curr_thread != nullptr && curr_thread->is_event_type(etype)) {
+      e->ethread = curr_thread;
+    } else {
+      e->ethread = assign_thread(etype);
+    }
+    if (affinity_thread == nullptr) {
+      e->continuation->setThreadAffinity(e->ethread);
+    }
+  }
+
+  if (e->continuation->mutex) {
+    e->mutex = e->continuation->mutex;
+  }
+
+  if (curr_thread != nullptr && e->ethread == curr_thread) {
+    e->ethread->EventQueueExternal.enqueue_local(e);
+  } else {
+    e->ethread->EventQueueExternal.enqueue(e);
+  }
+
+  return e;
+}
+
+Event *
+EventProcessor::schedule_imm(Continuation *cont, EventType et, int 
callback_event, void *cookie)
+{
+  Event *e = eventAllocator.alloc();
+
+  ink_assert(et < MAX_EVENT_TYPES);
+#ifdef ENABLE_TIME_TRACE
+  e->start_time = ink_get_hrtime();
+#endif
+
+#ifdef ENABLE_EVENT_TRACKER
+  e->set_location();
+#endif
+
+  e->callback_event = callback_event;
+  e->cookie         = cookie;
+  return schedule(e->init(cont, 0, 0), et);
+}
+
+Event *
+EventProcessor::schedule_at(Continuation *cont, ink_hrtime t, EventType et, 
int callback_event, void *cookie)
+{
+  Event *e = eventAllocator.alloc();
+
+  ink_assert(t > 0);
+  ink_assert(et < MAX_EVENT_TYPES);
+
+#ifdef ENABLE_EVENT_TRACKER
+  e->set_location();
+#endif
+
+  e->callback_event = callback_event;
+  e->cookie         = cookie;
+  return schedule(e->init(cont, t, 0), et);
+}
+
+Event *
+EventProcessor::schedule_in(Continuation *cont, ink_hrtime t, EventType et, 
int callback_event, void *cookie)
+{
+  Event *e = eventAllocator.alloc();
+
+  ink_assert(et < MAX_EVENT_TYPES);
+
+#ifdef ENABLE_EVENT_TRACKER
+  e->set_location();
+#endif
+
+  e->callback_event = callback_event;
+  e->cookie         = cookie;
+  return schedule(e->init(cont, ink_get_hrtime() + t, 0), et);
+}
+
+Event *
+EventProcessor::schedule_every(Continuation *cont, ink_hrtime t, EventType et, 
int callback_event, void *cookie)
+{
+  Event *e = eventAllocator.alloc();
+
+  ink_assert(t != 0);
+  ink_assert(et < MAX_EVENT_TYPES);
+
+#ifdef ENABLE_EVENT_TRACKER
+  e->set_location();
+#endif
+
+  e->callback_event = callback_event;
+  e->cookie         = cookie;
+  if (t < 0) {
+    return schedule(e->init(cont, t, t), et);
+  } else {
+    return schedule(e->init(cont, ink_get_hrtime() + t, t), et);
+  }
+}
+
+std::vector<TSAction>
+EventProcessor::schedule_entire(Continuation *cont, ink_hrtime t, ink_hrtime 
p, EventType et, int callback_event, void *cookie)
+{
+  ThreadGroupDescriptor *tg          = &thread_group[et];
+  EThread               *curr_thread = this_ethread();
+
+  std::vector<TSAction> actions;
+
+  for (int i = 0; i < tg->_count; i++) {
+    Event *e = eventAllocator.alloc();
+
+    e->ethread        = tg->_thread[i];
+    e->callback_event = callback_event;
+    e->cookie         = cookie;
+
+    if (t == 0 && p == 0) {
+      e->init(cont, 0, 0);
+    } else if (t != 0 && p == 0) {
+      e->init(cont, ink_get_hrtime() + t, 0);
+    } else if (t == 0 && p != 0) {
+      if (p < 0) {
+        e->init(cont, p, p);
+      } else {
+        e->init(cont, ink_get_hrtime() + p, p);
+      }
+    } else {
+      ink_assert(!"not reached");
+    }
+
+    e->mutex = new_ProxyMutex();
+
+    if (curr_thread != nullptr && e->ethread == curr_thread) {
+      e->ethread->EventQueueExternal.enqueue_local(e);
+    } else {
+      e->ethread->EventQueueExternal.enqueue(e);
+    }
+
+    /* This is a hack. Should be handled in ink_types */
+    actions.push_back((TSAction)((uintptr_t) reinterpret_cast<TSAction>(e) | 
0x1));
+  }
+
+  return actions;
+}
diff --git a/src/iocore/eventsystem/P_UnixEvent.h 
b/src/iocore/eventsystem/VIO.cc
similarity index 61%
rename from src/iocore/eventsystem/P_UnixEvent.h
rename to src/iocore/eventsystem/VIO.cc
index df8cdee3d6..6fb7e1c21a 100644
--- a/src/iocore/eventsystem/P_UnixEvent.h
+++ b/src/iocore/eventsystem/VIO.cc
@@ -21,27 +21,39 @@
   limitations under the License.
  */
 
-#pragma once
+#include "iocore/eventsystem/VIO.h"
+#include "iocore/eventsystem/VConnection.h"
 
-TS_INLINE Event *
-Event::init(Continuation *c, ink_hrtime atimeout_at, ink_hrtime aperiod)
+void
+VIO::set_continuation(Continuation *acont)
 {
-  continuation = c;
-  timeout_at   = atimeout_at;
-  period       = aperiod;
-  immediate    = !period && !atimeout_at;
-  cancelled    = false;
-  return this;
+  if (vc_server) {
+    vc_server->set_continuation(this, acont);
+  }
+  if (acont) {
+    mutex = acont->mutex;
+    cont  = acont;
+  } else {
+    mutex = nullptr;
+    cont  = nullptr;
+  }
+  return;
 }
 
-TS_INLINE void
-Event::free()
+void
+VIO::reenable()
 {
-  mutex = nullptr;
-  eventAllocator.free(this);
+  this->_disabled = false;
+  if (vc_server) {
+    vc_server->reenable(this);
+  }
 }
 
-TS_INLINE
-Event::Event() : in_the_prot_queue(false), in_the_priority_queue(false), 
immediate(false), globally_allocated(true), in_heap(false)
+void
+VIO::reenable_re()
 {
+  this->_disabled = false;
+  if (vc_server) {
+    vc_server->reenable_re(this);
+  }
 }
diff --git a/src/iocore/net/SSLSessionCache.cc 
b/src/iocore/net/SSLSessionCache.cc
index 7fe5ae9eaf..7e848b41b4 100644
--- a/src/iocore/net/SSLSessionCache.cc
+++ b/src/iocore/net/SSLSessionCache.cc
@@ -23,7 +23,7 @@
 #include "SSLSessionCache.h"
 #include "P_SSLUtils.h"
 #include "SSLStats.h"
-#include "../eventsystem/P_IOBuffer.h"
+#include "iocore/eventsystem/IOBuffer.h"
 
 #include <cstring>
 #include <memory>
diff --git a/src/iocore/net/Socks.cc b/src/iocore/net/Socks.cc
index 06ccd42405..03bbaa30f0 100644
--- a/src/iocore/net/Socks.cc
+++ b/src/iocore/net/Socks.cc
@@ -32,7 +32,6 @@
 
 #include "P_Socks.h"
 #include "P_Net.h"
-#include "../eventsystem/P_VConnection.h"
 #include "iocore/net/NetProcessor.h"
 #include "tscore/InkErrno.h"
 #include "swoc/swoc_file.h"
diff --git a/src/iocore/net/TLSEventSupport.cc 
b/src/iocore/net/TLSEventSupport.cc
index 3095edeb89..c1009d8c6c 100644
--- a/src/iocore/net/TLSEventSupport.cc
+++ b/src/iocore/net/TLSEventSupport.cc
@@ -23,6 +23,7 @@
  */
 
 #include "iocore/net/TLSEventSupport.h"
+#include "iocore/net/Net.h"
 #include "iocore/net/SSLAPIHooks.h"
 #include "SSLStats.h"
 
diff --git a/src/iocore/utils/OneWayMultiTunnel.cc 
b/src/iocore/utils/OneWayMultiTunnel.cc
index d439a987f0..7c35353a7d 100644
--- a/src/iocore/utils/OneWayMultiTunnel.cc
+++ b/src/iocore/utils/OneWayMultiTunnel.cc
@@ -27,8 +27,7 @@
  ****************************************************************************/
 
 #include "iocore/utils/OneWayMultiTunnel.h"
-#include "../eventsystem/P_IOBuffer.h"
-#include "../eventsystem/P_VConnection.h"
+#include "iocore/eventsystem/IOBuffer.h"
 
 // #define TEST
 
@@ -45,6 +44,18 @@ OneWayMultiTunnel::OneWayMultiTunnel() : OneWayTunnel()
   ink_zero(vioTargets);
 }
 
+VIO *
+vc_do_io_write(VConnection *vc, Continuation *cont, int64_t nbytes, MIOBuffer 
*buf, int64_t offset)
+{
+  IOBufferReader *reader = buf->alloc_reader();
+
+  if (offset > 0) {
+    reader->consume(offset);
+  }
+
+  return vc->do_io_write(cont, nbytes, reader, true);
+}
+
 void
 OneWayMultiTunnel::init(VConnection *vcSource, VConnection **vcTargets, int 
n_vcTargets, Continuation *aCont, int size_estimate,
                         int64_t nbytes, bool asingle_buffer, /* = true */
diff --git a/src/iocore/utils/OneWayTunnel.cc b/src/iocore/utils/OneWayTunnel.cc
index 8274fcf517..25a8caef87 100644
--- a/src/iocore/utils/OneWayTunnel.cc
+++ b/src/iocore/utils/OneWayTunnel.cc
@@ -32,7 +32,7 @@
    anything to do with HTTP, so it has been renamed to OneWayTunnel.
  ****************************************************************************/
 
-#include "../eventsystem/P_IOBuffer.h"
+#include "iocore/eventsystem/IOBuffer.h"
 #include "iocore/utils/OneWayTunnel.h"
 
 // #define TEST
diff --git a/src/proxy/PluginVC.cc b/src/proxy/PluginVC.cc
index 7c53d0390b..ab9d684aa6 100644
--- a/src/proxy/PluginVC.cc
+++ b/src/proxy/PluginVC.cc
@@ -72,7 +72,6 @@
  ****************************************************************************/
 
 #include "proxy/PluginVC.h"
-#include "../iocore/eventsystem/P_EventSystem.h"
 #include "../iocore/net/P_Net.h"
 #include "tscore/Regression.h"
 #if TS_HAS_TESTS
diff --git a/src/proxy/ReverseProxy.cc b/src/proxy/ReverseProxy.cc
index 037474575e..341c2c7de4 100644
--- a/src/proxy/ReverseProxy.cc
+++ b/src/proxy/ReverseProxy.cc
@@ -30,7 +30,6 @@
 #include "tscore/ink_platform.h"
 #include "tscore/Filenames.h"
 #include <dlfcn.h>
-#include "../iocore/eventsystem/P_EventSystem.h"
 #include "iocore/eventsystem/ConfigProcessor.h"
 #include "proxy/ReverseProxy.h"
 #include "tscore/MatcherUtils.h"
diff --git a/src/proxy/hdrs/HdrTSOnly.cc b/src/proxy/hdrs/HdrTSOnly.cc
index dc2fed3924..e603101863 100644
--- a/src/proxy/hdrs/HdrTSOnly.cc
+++ b/src/proxy/hdrs/HdrTSOnly.cc
@@ -36,9 +36,9 @@
 
  ****************************************************************************/
 
+#include "iocore/eventsystem/IOBuffer.h"
 #include "tscore/ink_platform.h"
 #include "proxy/hdrs/HTTP.h"
-#include "../../iocore/eventsystem/P_EventSystem.h"
 
 /*-------------------------------------------------------------------------
   -------------------------------------------------------------------------*/
diff --git a/src/proxy/http/HttpDebugNames.cc b/src/proxy/http/HttpDebugNames.cc
index e06e50159f..66d2bfcd87 100644
--- a/src/proxy/http/HttpDebugNames.cc
+++ b/src/proxy/http/HttpDebugNames.cc
@@ -23,7 +23,6 @@
 
 #include "iocore/dns/DNSProcessor.h"
 #include "proxy/http/HttpDebugNames.h"
-#include "../../iocore/eventsystem/P_EventSystem.h"
 #include "proxy/http/HttpTunnel.h"
 #include "proxy/Transform.h"
 #include "proxy/http/HttpSM.h"
diff --git a/src/proxy/http/PreWarmManager.cc b/src/proxy/http/PreWarmManager.cc
index 416e6a55d8..8cb9fea88e 100644
--- a/src/proxy/http/PreWarmManager.cc
+++ b/src/proxy/http/PreWarmManager.cc
@@ -26,7 +26,6 @@
 
 #include "proxy/http/HttpConfig.h"
 #include "iocore/net/SSLSNIConfig.h"
-#include "../../iocore/eventsystem/P_VConnection.h"
 #include "iocore/net/NetProcessor.h"
 #include "iocore/net/PreWarm.h"
 
diff --git a/src/proxy/http/remap/PluginDso.cc 
b/src/proxy/http/remap/PluginDso.cc
index cd0499d8e6..15cd390e3c 100644
--- a/src/proxy/http/remap/PluginDso.cc
+++ b/src/proxy/http/remap/PluginDso.cc
@@ -29,7 +29,6 @@
 
 #include "proxy/http/remap/PluginDso.h"
 #include "iocore/eventsystem/Freer.h"
-#include "../../../iocore/eventsystem/P_EventSystem.h"
 #ifdef PLUGIN_DSO_TESTS
 #include "unit-tests/plugin_testing_common.h"
 #else
diff --git a/src/proxy/http/remap/PluginFactory.cc 
b/src/proxy/http/remap/PluginFactory.cc
index f95e0c91ba..f5a0132c60 100644
--- a/src/proxy/http/remap/PluginFactory.cc
+++ b/src/proxy/http/remap/PluginFactory.cc
@@ -27,6 +27,7 @@
 #include "proxy/http/remap/RemapPluginInfo.h"
 #include "records/RecCore.h"
 #include "proxy/http/remap/PluginFactory.h"
+#include "tscore/TSSystemState.h"
 #ifdef PLUGIN_DSO_TESTS
 #include "unit-tests/plugin_testing_common.h"
 #else
@@ -34,7 +35,6 @@
 #define PluginDbg   Dbg
 #define PluginError Error
 #endif
-#include "../../../iocore/eventsystem/P_EventSystem.h"
 
 #include <algorithm> /* std::swap */
 #include <filesystem>
diff --git a/src/proxy/http3/Http09App.cc b/src/proxy/http3/Http09App.cc
index f1d7ce06a5..ac9f1ab2ec 100644
--- a/src/proxy/http3/Http09App.cc
+++ b/src/proxy/http3/Http09App.cc
@@ -26,7 +26,7 @@
 #include "tscore/ink_resolver.h"
 
 #include "../../iocore/net/P_Net.h"
-#include "../../iocore/eventsystem/P_VConnection.h"
+#include "iocore/eventsystem/VConnection.h"
 #include "iocore/net/quic/QUICStreamManager.h"
 #include "iocore/net/quic/QUICDebugNames.h"
 #include "iocore/net/quic/QUICStreamVCAdapter.h"
diff --git a/src/proxy/http3/Http3App.cc b/src/proxy/http3/Http3App.cc
index 4ec8a0cfba..09d349a6a6 100644
--- a/src/proxy/http3/Http3App.cc
+++ b/src/proxy/http3/Http3App.cc
@@ -28,7 +28,7 @@
 #include "tscore/ink_resolver.h"
 
 #include "../../iocore/net/P_Net.h"
-#include "../../iocore/eventsystem/P_VConnection.h"
+#include "iocore/eventsystem/VConnection.h"
 #include "iocore/net/quic/QUICStreamManager.h"
 #include "iocore/net/quic/QUICStreamVCAdapter.h"
 
diff --git a/src/proxy/logging/Log.cc b/src/proxy/logging/Log.cc
index ad9ffd00ad..ca56f27c5a 100644
--- a/src/proxy/logging/Log.cc
+++ b/src/proxy/logging/Log.cc
@@ -34,7 +34,6 @@
  ***************************************************************************/
 #include "tscore/ink_platform.h"
 #include "tscore/TSSystemState.h"
-#include "../../iocore/eventsystem/P_EventSystem.h"
 #include "../../iocore/net/P_Net.h"
 #include "iocore/utils/Machine.h"
 #include "proxy/hdrs/HTTP.h"
diff --git a/src/proxy/logging/LogBuffer.cc b/src/proxy/logging/LogBuffer.cc
index 9c0ab667f1..c183bede9e 100644
--- a/src/proxy/logging/LogBuffer.cc
+++ b/src/proxy/logging/LogBuffer.cc
@@ -32,7 +32,6 @@
 #include <cstdlib>
 #include <cstring>
 
-#include "../../iocore/eventsystem/P_EventSystem.h"
 #include "proxy/logging/LogField.h"
 #include "proxy/logging/LogFilter.h"
 #include "proxy/logging/LogFormat.h"
diff --git a/src/proxy/logging/LogFile.cc b/src/proxy/logging/LogFile.cc
index f7a7004c60..9252136092 100644
--- a/src/proxy/logging/LogFile.cc
+++ b/src/proxy/logging/LogFile.cc
@@ -41,7 +41,6 @@
 #include <fcntl.h>
 #include <libgen.h>
 
-#include "../../iocore/eventsystem/P_EventSystem.h"
 #include "iocore/utils/Machine.h"
 
 #include "tscore/BaseLogFile.h"
diff --git a/src/proxy/logging/LogObject.cc b/src/proxy/logging/LogObject.cc
index 37e89598b5..997e394025 100644
--- a/src/proxy/logging/LogObject.cc
+++ b/src/proxy/logging/LogObject.cc
@@ -26,9 +26,9 @@
 
 
  ***************************************************************************/
+#include "iocore/eventsystem/Freer.h"
 #include "tscore/ink_platform.h"
 #include "tscore/CryptoHash.h"
-#include "../../iocore/eventsystem/P_EventSystem.h"
 #include "proxy/logging/LogUtils.h"
 #include "proxy/logging/LogField.h"
 #include "proxy/logging/LogObject.h"
diff --git a/src/traffic_server/SocksProxy.cc b/src/traffic_server/SocksProxy.cc
index 97aa76ac90..811fedbdb1 100644
--- a/src/traffic_server/SocksProxy.cc
+++ b/src/traffic_server/SocksProxy.cc
@@ -28,7 +28,6 @@
 
 */
 #include "../iocore/net/P_Socks.h"
-#include "../iocore/eventsystem/P_VConnection.h"
 #include "iocore/utils/OneWayTunnel.h"
 #include "proxy/http/HttpSessionAccept.h"
 #include "tsutil/Metrics.h"
diff --git a/src/traffic_server/traffic_server.cc 
b/src/traffic_server/traffic_server.cc
index 7ac906af10..61982e7716 100644
--- a/src/traffic_server/traffic_server.cc
+++ b/src/traffic_server/traffic_server.cc
@@ -75,7 +75,6 @@ extern "C" int plock(int);
 
 #include "Crash.h"
 #include "tscore/signals.h"
-#include "../iocore/eventsystem/P_EventSystem.h"
 #include "../iocore/net/P_Net.h"
 #if TS_HAS_QUICHE
 #include "../iocore/net/P_QUICNetProcessor.h"

Reply via email to