This is an automated email from the ASF dual-hosted git repository.

maskit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new b9779fd8a4 Refactor the read and write functions in Unix and SSL NetVC 
(#11794)
b9779fd8a4 is described below

commit b9779fd8a4ece23783fff59f96b725979b7d340d
Author: Masakazu Kitajo <[email protected]>
AuthorDate: Mon Nov 4 14:30:07 2024 -0700

    Refactor the read and write functions in Unix and SSL NetVC (#11794)
---
 include/iocore/net/NetEvent.h         |   6 +-
 src/iocore/net/NetHandler.cc          |   4 +-
 src/iocore/net/P_QUICNetVConnection.h |   2 +-
 src/iocore/net/P_SSLNetVConnection.h  |   4 +-
 src/iocore/net/P_UnixNetVConnection.h |  14 +-
 src/iocore/net/QUICNetVConnection.cc  |   6 +-
 src/iocore/net/SSLNetVConnection.cc   |  16 +-
 src/iocore/net/UnixNetVConnection.cc  | 998 +++++++++++++++++-----------------
 8 files changed, 508 insertions(+), 542 deletions(-)

diff --git a/include/iocore/net/NetEvent.h b/include/iocore/net/NetEvent.h
index 08b2ff8c8a..35c7dabd22 100644
--- a/include/iocore/net/NetEvent.h
+++ b/include/iocore/net/NetEvent.h
@@ -55,9 +55,9 @@ class NetEvent
 public:
   NetEvent() = default;
   virtual ~NetEvent() {}
-  virtual void net_read_io(NetHandler *nh, EThread *lthread)  = 0;
-  virtual void net_write_io(NetHandler *nh, EThread *lthread) = 0;
-  virtual void free_thread(EThread *t)                        = 0;
+  virtual void net_read_io(NetHandler *nh)  = 0;
+  virtual void net_write_io(NetHandler *nh) = 0;
+  virtual void free_thread(EThread *t)      = 0;
 
  // since we want this class to be independent from VConnection, Continuation. There should be
  // a pure virtual function which connects sub class and NetHandler.
diff --git a/src/iocore/net/NetHandler.cc b/src/iocore/net/NetHandler.cc
index 208728636a..0f7d519d2b 100644
--- a/src/iocore/net/NetHandler.cc
+++ b/src/iocore/net/NetHandler.cc
@@ -283,7 +283,7 @@ NetHandler::process_ready_list()
     if (ne->closed) {
       free_netevent(ne);
     } else if (ne->read.enabled && ne->read.triggered) {
-      ne->net_read_io(this, this->thread);
+      ne->net_read_io(this);
     } else if (!ne->read.enabled) {
       read_ready_list.remove(ne);
     }
@@ -293,7 +293,7 @@ NetHandler::process_ready_list()
     if (ne->closed) {
       free_netevent(ne);
     } else if (ne->write.enabled && ne->write.triggered) {
-      ne->net_write_io(this, this->thread);
+      ne->net_write_io(this);
     } else if (!ne->write.enabled) {
       write_ready_list.remove(ne);
     }
diff --git a/src/iocore/net/P_QUICNetVConnection.h 
b/src/iocore/net/P_QUICNetVConnection.h
index 9797c24984..353d3aed22 100644
--- a/src/iocore/net/P_QUICNetVConnection.h
+++ b/src/iocore/net/P_QUICNetVConnection.h
@@ -108,7 +108,7 @@ public:
   bool    getSSLHandShakeComplete() const override;
 
   // NetEvent
-  virtual void net_read_io(NetHandler *nh, EThread *lthread) override;
+  virtual void net_read_io(NetHandler *nh) override;
 
   // NetVConnection
   int         populate_protocol(std::string_view *results, int n) const 
override;
diff --git a/src/iocore/net/P_SSLNetVConnection.h 
b/src/iocore/net/P_SSLNetVConnection.h
index 9b437a6d20..2babaeb0d6 100644
--- a/src/iocore/net/P_SSLNetVConnection.h
+++ b/src/iocore/net/P_SSLNetVConnection.h
@@ -140,7 +140,7 @@ public:
 
   int     sslServerHandShakeEvent(int &err);
   int     sslClientHandShakeEvent(int &err);
-  void    net_read_io(NetHandler *nh, EThread *lthread) override;
+  void    net_read_io(NetHandler *nh) override;
   int64_t load_buffer_and_write(int64_t towrite, MIOBufferAccessor &buf, 
int64_t &total_written, int &needs) override;
   void    do_io_close(int lerrno = -1) override;
 
@@ -376,7 +376,7 @@ private:
   UnixNetVConnection *_migrateFromSSL();
   void                _propagateHandShakeBuffer(UnixNetVConnection *target, 
EThread *t);
 
-  int         _ssl_read_from_net(EThread *lthread, int64_t &ret);
+  int         _ssl_read_from_net(int64_t &ret);
   ssl_error_t _ssl_read_buffer(void *buf, int64_t nbytes, int64_t &nread);
   ssl_error_t _ssl_write_buffer(const void *buf, int64_t nbytes, int64_t 
&nwritten);
   ssl_error_t _ssl_connect();
diff --git a/src/iocore/net/P_UnixNetVConnection.h 
b/src/iocore/net/P_UnixNetVConnection.h
index a6ce3ab89e..5ff97aee16 100644
--- a/src/iocore/net/P_UnixNetVConnection.h
+++ b/src/iocore/net/P_UnixNetVConnection.h
@@ -150,8 +150,8 @@ public:
   }
 
   // NetEvent
-  virtual void net_read_io(NetHandler *nh, EThread *lthread) override;
-  virtual void net_write_io(NetHandler *nh, EThread *lthread) override;
+  virtual void net_read_io(NetHandler *nh) override;
+  virtual void net_write_io(NetHandler *nh) override;
   virtual void free_thread(EThread *t) override;
   virtual int
   close() override
@@ -195,7 +195,7 @@ public:
   int             readSignalAndUpdate(int event);
   void            readReschedule(NetHandler *nh);
   void            writeReschedule(NetHandler *nh);
-  void            netActivity(EThread *lthread);
+  void            netActivity();
   /**
    * If the current object's thread does not match the t argument, create a new
    * NetVC in the thread t context based on the socket and ssl information in 
the
@@ -234,8 +234,6 @@ public:
   int          set_tcp_congestion_control(int side) override;
   void         apply_options() override;
 
-  friend void write_to_net_io(NetHandler *, UnixNetVConnection *, EThread *);
-
   // set_context() should be called before calling this member function.
   void mark_as_tunnel_endpoint() override;
 
@@ -376,9 +374,3 @@ UnixNetVConnection::get_action() const
 {
   return &action_;
 }
-
-// declarations for local use (within the net module)
-
-void write_to_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread);
-void write_to_net_io(NetHandler *nh, UnixNetVConnection *vc, EThread *thread);
-void net_activity(UnixNetVConnection *vc, EThread *thread);
diff --git a/src/iocore/net/QUICNetVConnection.cc 
b/src/iocore/net/QUICNetVConnection.cc
index 2fd225df24..5154c34950 100644
--- a/src/iocore/net/QUICNetVConnection.cc
+++ b/src/iocore/net/QUICNetVConnection.cc
@@ -401,7 +401,7 @@ QUICNetVConnection::handle_received_packet(UDPPacket 
*packet)
 {
   size_t   buf_len{0};
   uint8_t *buf = packet->get_entire_chain_buffer(&buf_len);
-  net_activity(this, this_ethread());
+  this->netActivity();
   quiche_recv_info recv_info = {
     &packet->from.sa,
     static_cast<socklen_t>(packet->from.isIp4() ? sizeof(packet->from.sin) : 
sizeof(packet->from.sin6)),
@@ -522,7 +522,7 @@ QUICNetVConnection::is_handshake_completed() const
 }
 
 void
-QUICNetVConnection::net_read_io(NetHandler * /* nh ATS_UNUSED */, EThread * /* 
lthread ATS_UNUSED */)
+QUICNetVConnection::net_read_io(NetHandler * /* nh ATS_UNUSED */)
 {
   SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
   this->handleEvent(QUIC_EVENT_PACKET_READ_READY, nullptr);
@@ -714,7 +714,7 @@ QUICNetVConnection::_handle_write_ready()
       segment_size = max_udp_payload_size;
     }
     this->_packet_handler->send_packet(this->_udp_con, this->con.addr, 
udp_payload, segment_size, &send_at_hint);
-    net_activity(this, this_ethread());
+    this->netActivity();
   }
 }
 
diff --git a/src/iocore/net/SSLNetVConnection.cc 
b/src/iocore/net/SSLNetVConnection.cc
index d49c79ec28..88805cf32e 100644
--- a/src/iocore/net/SSLNetVConnection.cc
+++ b/src/iocore/net/SSLNetVConnection.cc
@@ -178,7 +178,7 @@ debug_certificate_name(const char *msg, X509_NAME *name)
 }
 
 int
-SSLNetVConnection::_ssl_read_from_net(EThread *lthread, int64_t &ret)
+SSLNetVConnection::_ssl_read_from_net(int64_t &ret)
 {
   NetState          *s          = &this->read;
   MIOBufferAccessor &buf        = s->vio.buffer;
@@ -221,7 +221,7 @@ SSLNetVConnection::_ssl_read_from_net(EThread *lthread, 
int64_t &ret)
       bytes_read += nread;
       if (nread > 0) {
         buf.writer()->fill(nread); // Tell the buffer, we've used the bytes
-        this->netActivity(lthread);
+        this->netActivity();
       }
       break;
     case SSL_ERROR_WANT_WRITE:
@@ -275,7 +275,7 @@ SSLNetVConnection::_ssl_read_from_net(EThread *lthread, 
int64_t &ret)
     Dbg(dbg_ctl_ssl, "bytes_read=%" PRId64, bytes_read);
 
     s->vio.ndone += bytes_read;
-    this->netActivity(lthread);
+    this->netActivity();
 
     ret = bytes_read;
 
@@ -454,7 +454,7 @@ SSLNetVConnection::update_rbio(bool move_to_socket)
 
 // changed by YTS Team, yamsat
 void
-SSLNetVConnection::net_read_io(NetHandler *nh, EThread *lthread)
+SSLNetVConnection::net_read_io(NetHandler *nh)
 {
   int       ret;
   int64_t   r     = 0;
@@ -462,11 +462,11 @@ SSLNetVConnection::net_read_io(NetHandler *nh, EThread 
*lthread)
   NetState *s     = &this->read;
 
   if (HttpProxyPort::TRANSPORT_BLIND_TUNNEL == this->attributes) {
-    this->super::net_read_io(nh, lthread);
+    this->super::net_read_io(nh);
     return;
   }
 
-  MUTEX_TRY_LOCK(lock, s->vio.mutex, lthread);
+  MUTEX_TRY_LOCK(lock, s->vio.mutex, nh->thread);
   if (!lock.is_locked()) {
     readReschedule(nh);
     return;
@@ -475,7 +475,7 @@ SSLNetVConnection::net_read_io(NetHandler *nh, EThread 
*lthread)
   // The closed flag should be stable once we get the s->vio.mutex in that case
   // (the global session pool mutex).
   if (this->closed) {
-    this->super::net_read_io(nh, lthread);
+    this->super::net_read_io(nh);
     return;
   }
   // If the key renegotiation failed it's over, just signal the error and 
finish.
@@ -614,7 +614,7 @@ SSLNetVConnection::net_read_io(NetHandler *nh, EThread 
*lthread)
   // this comment if you know
   int ssl_read_errno = 0;
   do {
-    ret = this->_ssl_read_from_net(lthread, r);
+    ret = this->_ssl_read_from_net(r);
     if (ret == SSL_READ_READY || ret == SSL_READ_ERROR_NONE) {
       bytes += r;
     }
diff --git a/src/iocore/net/UnixNetVConnection.cc 
b/src/iocore/net/UnixNetVConnection.cc
index 46b568999b..6bd0331d54 100644
--- a/src/iocore/net/UnixNetVConnection.cc
+++ b/src/iocore/net/UnixNetVConnection.cc
@@ -75,18 +75,6 @@ write_reschedule(NetHandler *nh, UnixNetVConnection *vc)
   }
 }
 
-void
-net_activity(UnixNetVConnection *vc, EThread *thread)
-{
-  Dbg(dbg_ctl_socket, "net_activity updating inactivity %" PRId64 ", 
NetVC=%p", vc->inactivity_timeout_in, vc);
-  (void)thread;
-  if (vc->inactivity_timeout_in) {
-    vc->next_inactivity_timeout_at = ink_get_hrtime() + 
vc->inactivity_timeout_in;
-  } else {
-    vc->next_inactivity_timeout_at = 0;
-  }
-}
-
 //
 // Signal an event
 //
@@ -188,394 +176,12 @@ read_signal_error(NetHandler *nh, UnixNetVConnection 
*vc, int lerrno)
   vc->lerrno = lerrno;
   return read_signal_done(VC_EVENT_ERROR, nh, vc);
 }
-
-static inline int
-write_signal_error(NetHandler *nh, UnixNetVConnection *vc, int lerrno)
-{
-  vc->lerrno = lerrno;
-  return write_signal_done(VC_EVENT_ERROR, nh, vc);
-}
-
-// Read the data for a UnixNetVConnection.
-// Rescheduling the UnixNetVConnection by moving the VC
-// onto or off of the ready_list.
-// Had to wrap this function with net_read_io for SSL.
-static void
-read_from_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
-{
-  NetState *s = &vc->read;
-  int64_t   r = 0;
-
-  MUTEX_TRY_LOCK(lock, s->vio.mutex, thread);
-
-  if (!lock.is_locked()) {
-    read_reschedule(nh, vc);
-    return;
-  }
-
-  // It is possible that the closed flag got set from HttpSessionManager in the
-  // global session pool case.  If so, the closed flag should be stable once 
we get the
-  // s->vio.mutex (the global session pool mutex).
-  if (vc->closed) {
-    vc->nh->free_netevent(vc);
-    return;
-  }
-  // if it is not enabled.
-  if (!s->enabled || s->vio.op != VIO::READ || s->vio.is_disabled()) {
-    read_disable(nh, vc);
-    return;
-  }
-
-  MIOBufferAccessor &buf = s->vio.buffer;
-  ink_assert(buf.writer());
-
-  // if there is nothing to do, disable connection
-  int64_t ntodo = s->vio.ntodo();
-  if (ntodo <= 0) {
-    read_disable(nh, vc);
-    return;
-  }
-  int64_t toread = buf.writer()->write_avail();
-  if (toread > ntodo) {
-    toread = ntodo;
-  }
-
-  // read data
-  int64_t  rattempted = 0, total_read = 0;
-  unsigned niov = 0;
-  IOVec    tiovec[NET_MAX_IOV];
-  if (toread) {
-    IOBufferBlock *b = buf.writer()->first_write_block();
-    do {
-      niov       = 0;
-      rattempted = 0;
-      while (b && niov < NET_MAX_IOV) {
-        int64_t a = b->write_avail();
-        if (a > 0) {
-          tiovec[niov].iov_base = b->_end;
-          int64_t togo          = toread - total_read - rattempted;
-          if (a > togo) {
-            a = togo;
-          }
-          tiovec[niov].iov_len  = a;
-          rattempted           += a;
-          niov++;
-          if (a >= togo) {
-            break;
-          }
-        }
-        b = b->next.get();
-      }
-
-      ink_assert(niov > 0);
-      ink_assert(niov <= countof(tiovec));
-      struct msghdr msg;
-
-      ink_zero(msg);
-      msg.msg_name    = const_cast<sockaddr *>(vc->get_remote_addr());
-      msg.msg_namelen = ats_ip_size(vc->get_remote_addr());
-      msg.msg_iov     = &tiovec[0];
-      msg.msg_iovlen  = niov;
-      r               = vc->con.sock.recvmsg(&msg, 0);
-
-      Metrics::Counter::increment(net_rsb.calls_to_read);
-
-      total_read += rattempted;
-    } while (rattempted && r == rattempted && total_read < toread);
-
-    // if we have already moved some bytes successfully, summarize in r
-    if (total_read != rattempted) {
-      if (r <= 0) {
-        r = total_read - rattempted;
-      } else {
-        r = total_read - rattempted + r;
-      }
-    }
-    // check for errors
-    if (r <= 0) {
-      if (r == -EAGAIN || r == -ENOTCONN) {
-        Metrics::Counter::increment(net_rsb.calls_to_read_nodata);
-        vc->read.triggered = 0;
-        nh->read_ready_list.remove(vc);
-        return;
-      }
-
-      if (!r || r == -ECONNRESET) {
-        vc->read.triggered = 0;
-        nh->read_ready_list.remove(vc);
-        read_signal_done(VC_EVENT_EOS, nh, vc);
-        return;
-      }
-      vc->read.triggered = 0;
-      read_signal_error(nh, vc, static_cast<int>(-r));
-      return;
-    }
-    Metrics::Counter::increment(net_rsb.read_bytes, r);
-    Metrics::Counter::increment(net_rsb.read_bytes_count);
-
-    // Add data to buffer and signal continuation.
-    buf.writer()->fill(r);
-#ifdef DEBUG
-    if (buf.writer()->write_avail() <= 0) {
-      Dbg(dbg_ctl_iocore_net, "read_from_net, read buffer full");
-    }
-#endif
-    s->vio.ndone += r;
-    net_activity(vc, thread);
-  } else {
-    r = 0;
-  }
-
-  // Signal read ready, check if user is not done
-  if (r) {
-    // If there are no more bytes to read, signal read complete
-    ink_assert(ntodo >= 0);
-    if (s->vio.ntodo() <= 0) {
-      read_signal_done(VC_EVENT_READ_COMPLETE, nh, vc);
-      Dbg(dbg_ctl_iocore_net, "read_from_net, read finished - signal done");
-      return;
-    } else {
-      if (read_signal_and_update(VC_EVENT_READ_READY, vc) != EVENT_CONT) {
-        return;
-      }
-
-      // change of lock... don't look at shared variables!
-      if (lock.get_mutex() != s->vio.mutex.get()) {
-        read_reschedule(nh, vc);
-        return;
-      }
-    }
-  }
-
-  // If here are is no more room, or nothing to do, disable the connection
-  if (s->vio.ntodo() <= 0 || !s->enabled || !buf.writer()->write_avail()) {
-    read_disable(nh, vc);
-    return;
-  }
-
-  read_reschedule(nh, vc);
-}
-
-//
-// Write the data for a UnixNetVConnection.
-// Rescheduling the UnixNetVConnection when necessary.
-//
-void
-write_to_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
-{
-  Metrics::Counter::increment(net_rsb.calls_to_writetonet);
-  write_to_net_io(nh, vc, thread);
-}
-
-void
-write_to_net_io(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
-{
-  NetState     *s = &vc->write;
-  Continuation *c = vc->write.vio.cont;
-
-  MUTEX_TRY_LOCK(lock, s->vio.mutex, thread);
-
-  if (!lock.is_locked() || lock.get_mutex() != s->vio.mutex.get()) {
-    write_reschedule(nh, vc);
-    return;
-  }
-
-  if (vc->has_error()) {
-    vc->lerrno = vc->error;
-    write_signal_and_update(VC_EVENT_ERROR, vc);
-    return;
-  }
-
-  // This function will always return true unless
-  // vc is an SSLNetVConnection.
-  if (!vc->getSSLHandShakeComplete()) {
-    if (vc->trackFirstHandshake()) {
-      // Eat the first write-ready.  Until the TLS handshake is complete,
-      // we should still be under the connect timeout and shouldn't bother
-      // the state machine until the TLS handshake is complete
-      vc->write.triggered = 0;
-      nh->write_ready_list.remove(vc);
-    }
-
-    int err, ret;
-
-    if (vc->get_context() == NET_VCONNECTION_OUT) {
-      ret = vc->sslStartHandShake(SSL_EVENT_CLIENT, err);
-    } else {
-      ret = vc->sslStartHandShake(SSL_EVENT_SERVER, err);
-    }
-
-    if (ret == EVENT_ERROR) {
-      vc->write.triggered = 0;
-      write_signal_error(nh, vc, err);
-    } else if (ret == SSL_HANDSHAKE_WANT_READ || ret == 
SSL_HANDSHAKE_WANT_ACCEPT) {
-      vc->read.triggered = 0;
-      nh->read_ready_list.remove(vc);
-      read_reschedule(nh, vc);
-    } else if (ret == SSL_HANDSHAKE_WANT_CONNECT || ret == 
SSL_HANDSHAKE_WANT_WRITE) {
-      vc->write.triggered = 0;
-      nh->write_ready_list.remove(vc);
-      write_reschedule(nh, vc);
-    } else if (ret == EVENT_DONE) {
-      vc->write.triggered = 1;
-      if (vc->write.enabled) {
-        nh->write_ready_list.in_or_enqueue(vc);
-      }
-      // If this was driven by a zero length read, signal complete when
-      // the handshake is complete. Otherwise set up for continuing read
-      // operations.
-      if (s->vio.ntodo() <= 0) {
-        vc->readSignalDone(VC_EVENT_WRITE_COMPLETE, nh);
-      }
-    } else {
-      write_reschedule(nh, vc);
-    }
-
-    return;
-  }
-
-  // If it is not enabled,add to WaitList.
-  if (!s->enabled || s->vio.op != VIO::WRITE) {
-    write_disable(nh, vc);
-    return;
-  }
-
-  // If there is nothing to do, disable
-  int64_t ntodo = s->vio.ntodo();
-  if (ntodo <= 0) {
-    write_disable(nh, vc);
-    return;
-  }
-
-  MIOBufferAccessor &buf = s->vio.buffer;
-  ink_assert(buf.writer());
-
-  // Calculate the amount to write.
-  int64_t towrite = buf.reader()->read_avail();
-  if (towrite > ntodo) {
-    towrite = ntodo;
-  }
-
-  int signalled = 0;
-
-  // signal write ready to allow user to fill the buffer
-  if (towrite != ntodo && !buf.writer()->high_water()) {
-    if (write_signal_and_update(VC_EVENT_WRITE_READY, vc) != EVENT_CONT) {
-      return;
-    } else if (c != s->vio.cont) { /* The write vio was updated in the handler 
*/
-      write_reschedule(nh, vc);
-      return;
-    }
-
-    ntodo = s->vio.ntodo();
-    if (ntodo <= 0) {
-      write_disable(nh, vc);
-      return;
-    }
-
-    signalled = 1;
-
-    // Recalculate amount to write
-    towrite = buf.reader()->read_avail();
-    if (towrite > ntodo) {
-      towrite = ntodo;
-    }
-  }
-
-  // if there is nothing to do, disable
-  ink_assert(towrite >= 0);
-  if (towrite <= 0) {
-    write_disable(nh, vc);
-    return;
-  }
-
-  int     needs         = 0;
-  int64_t total_written = 0;
-  int64_t r             = vc->load_buffer_and_write(towrite, buf, 
total_written, needs);
-
-  if (total_written > 0) {
-    Metrics::Counter::increment(net_rsb.write_bytes, total_written);
-    Metrics::Counter::increment(net_rsb.write_bytes_count);
-    s->vio.ndone += total_written;
-    net_activity(vc, thread);
-  }
-
-  // A write of 0 makes no sense since we tried to write more than 0.
-  ink_assert(r != 0);
-  // Either we wrote something or got an error.
-  // check for errors
-  if (r < 0) { // if the socket was not ready, add to WaitList
-    if (r == -EAGAIN || r == -ENOTCONN || -r == EINPROGRESS) {
-      Metrics::Counter::increment(net_rsb.calls_to_write_nodata);
-      if ((needs & EVENTIO_WRITE) == EVENTIO_WRITE) {
-        vc->write.triggered = 0;
-        nh->write_ready_list.remove(vc);
-        write_reschedule(nh, vc);
-      }
-
-      if ((needs & EVENTIO_READ) == EVENTIO_READ) {
-        vc->read.triggered = 0;
-        nh->read_ready_list.remove(vc);
-        read_reschedule(nh, vc);
-      }
-
-      return;
-    }
-
-    vc->write.triggered = 0;
-    write_signal_error(nh, vc, static_cast<int>(-r));
-    return;
-  } else {                                        // Wrote data.  Finished 
without error
-    int wbe_event = vc->write_buffer_empty_event; // save so we can clear if 
needed.
-
-    // If the empty write buffer trap is set, clear it.
-    if (!(buf.reader()->is_read_avail_more_than(0))) {
-      vc->write_buffer_empty_event = 0;
-    }
-
-    // If there are no more bytes to write, signal write complete,
-    ink_assert(ntodo >= 0);
-    if (s->vio.ntodo() <= 0) {
-      write_signal_done(VC_EVENT_WRITE_COMPLETE, nh, vc);
-      return;
-    }
-
-    int e = 0;
-    if (!signalled || (s->vio.ntodo() > 0 && !buf.writer()->high_water())) {
-      e = VC_EVENT_WRITE_READY;
-    } else if (wbe_event != vc->write_buffer_empty_event) {
-      // @a signalled means we won't send an event, and the event values 
differing means we
-      // had a write buffer trap and cleared it, so we need to send it now.
-      e = wbe_event;
-    }
-
-    if (e) {
-      if (write_signal_and_update(e, vc) != EVENT_CONT) {
-        return;
-      }
-
-      // change of lock... don't look at shared variables!
-      if (lock.get_mutex() != s->vio.mutex.get()) {
-        write_reschedule(nh, vc);
-        return;
-      }
-    }
-
-    if ((needs & EVENTIO_READ) == EVENTIO_READ) {
-      read_reschedule(nh, vc);
-    }
-
-    if (!(buf.reader()->is_read_avail_more_than(0))) {
-      write_disable(nh, vc);
-      return;
-    }
-
-    if ((needs & EVENTIO_WRITE) == EVENTIO_WRITE) {
-      write_reschedule(nh, vc);
-    }
-
-    return;
-  }
+
+static inline int
+write_signal_error(NetHandler *nh, UnixNetVConnection *vc, int lerrno)
+{
+  vc->lerrno = lerrno;
+  return write_signal_done(VC_EVENT_ERROR, nh, vc);
 }
 
 bool
@@ -739,141 +345,504 @@ UnixNetVConnection::do_io_shutdown(ShutdownHowTo_t 
howto)
 }
 
 //
-// Function used to reenable the VC for reading or
-// writing.
+// Function used to reenable the VC for reading or
+// writing.
+//
+void
+UnixNetVConnection::reenable(VIO *vio)
+{
+  if (STATE_FROM_VIO(vio)->enabled) {
+    return;
+  }
+  set_enabled(vio);
+  if (!thread) {
+    return;
+  }
+  EThread *t = vio->mutex->thread_holding;
+  ink_assert(t == this_ethread());
+  ink_release_assert(!closed);
+  if (nh->mutex->thread_holding == t) {
+    if (vio == &read.vio) {
+      ep.modify(EVENTIO_READ);
+      ep.refresh(EVENTIO_READ);
+      if (read.triggered) {
+        nh->read_ready_list.in_or_enqueue(this);
+      } else {
+        nh->read_ready_list.remove(this);
+      }
+    } else {
+      ep.modify(EVENTIO_WRITE);
+      ep.refresh(EVENTIO_WRITE);
+      if (write.triggered) {
+        nh->write_ready_list.in_or_enqueue(this);
+      } else {
+        nh->write_ready_list.remove(this);
+      }
+    }
+  } else {
+    MUTEX_TRY_LOCK(lock, nh->mutex, t);
+    if (!lock.is_locked()) {
+      if (vio == &read.vio) {
+        int isin = ink_atomic_swap(&read.in_enabled_list, 1);
+        if (!isin) {
+          nh->read_enable_list.push(this);
+        }
+      } else {
+        int isin = ink_atomic_swap(&write.in_enabled_list, 1);
+        if (!isin) {
+          nh->write_enable_list.push(this);
+        }
+      }
+      if (likely(nh->thread)) {
+        nh->thread->tail_cb->signalActivity();
+      } else if (nh->trigger_event) {
+        nh->trigger_event->ethread->tail_cb->signalActivity();
+      }
+    } else {
+      if (vio == &read.vio) {
+        ep.modify(EVENTIO_READ);
+        ep.refresh(EVENTIO_READ);
+        if (read.triggered) {
+          nh->read_ready_list.in_or_enqueue(this);
+        } else {
+          nh->read_ready_list.remove(this);
+        }
+      } else {
+        ep.modify(EVENTIO_WRITE);
+        ep.refresh(EVENTIO_WRITE);
+        if (write.triggered) {
+          nh->write_ready_list.in_or_enqueue(this);
+        } else {
+          nh->write_ready_list.remove(this);
+        }
+      }
+    }
+  }
+}
+
+void
+UnixNetVConnection::reenable_re(VIO *vio)
+{
+  if (!thread) {
+    return;
+  }
+  EThread *t = vio->mutex->thread_holding;
+  ink_assert(t == this_ethread());
+  if (nh->mutex->thread_holding == t) {
+    set_enabled(vio);
+    if (vio == &read.vio) {
+      ep.modify(EVENTIO_READ);
+      ep.refresh(EVENTIO_READ);
+      if (read.triggered) {
+        net_read_io(nh);
+      } else {
+        nh->read_ready_list.remove(this);
+      }
+    } else {
+      ep.modify(EVENTIO_WRITE);
+      ep.refresh(EVENTIO_WRITE);
+      if (write.triggered) {
+        this->net_write_io(nh);
+      } else {
+        nh->write_ready_list.remove(this);
+      }
+    }
+  } else {
+    reenable(vio);
+  }
+}
+
+UnixNetVConnection::UnixNetVConnection()
+{
+  SET_HANDLER(&UnixNetVConnection::startEvent);
+}
+
+// Private methods
+
+void
+UnixNetVConnection::set_enabled(VIO *vio)
+{
+  ink_assert(vio->mutex->thread_holding == this_ethread() && thread);
+  ink_release_assert(!closed);
+  STATE_FROM_VIO(vio)->enabled = 1;
+  if (!next_inactivity_timeout_at && inactivity_timeout_in) {
+    next_inactivity_timeout_at = ink_get_hrtime() + inactivity_timeout_in;
+  }
+}
+
+// Read the data for a UnixNetVConnection.
+// Rescheduling the UnixNetVConnection by moving the VC
+// onto or off of the ready_list.
+void
+UnixNetVConnection::net_read_io(NetHandler *nh)
+{
+  NetState *s = &this->read;
+  int64_t   r = 0;
+
+  MUTEX_TRY_LOCK(lock, s->vio.mutex, thread);
+
+  if (!lock.is_locked()) {
+    read_reschedule(nh, this);
+    return;
+  }
+
+  // It is possible that the closed flag got set from HttpSessionManager in the
+  // global session pool case.  If so, the closed flag should be stable once 
we get the
+  // s->vio.mutex (the global session pool mutex).
+  if (this->closed) {
+    this->nh->free_netevent(this);
+    return;
+  }
+  // if it is not enabled.
+  if (!s->enabled || s->vio.op != VIO::READ || s->vio.is_disabled()) {
+    read_disable(nh, this);
+    return;
+  }
+
+  MIOBufferAccessor &buf = s->vio.buffer;
+  ink_assert(buf.writer());
+
+  // if there is nothing to do, disable connection
+  int64_t ntodo = s->vio.ntodo();
+  if (ntodo <= 0) {
+    read_disable(nh, this);
+    return;
+  }
+  int64_t toread = buf.writer()->write_avail();
+  if (toread > ntodo) {
+    toread = ntodo;
+  }
+
+  // read data
+  int64_t  rattempted = 0, total_read = 0;
+  unsigned niov = 0;
+  IOVec    tiovec[NET_MAX_IOV];
+  if (toread) {
+    IOBufferBlock *b = buf.writer()->first_write_block();
+    do {
+      niov       = 0;
+      rattempted = 0;
+      while (b && niov < NET_MAX_IOV) {
+        int64_t a = b->write_avail();
+        if (a > 0) {
+          tiovec[niov].iov_base = b->_end;
+          int64_t togo          = toread - total_read - rattempted;
+          if (a > togo) {
+            a = togo;
+          }
+          tiovec[niov].iov_len  = a;
+          rattempted           += a;
+          niov++;
+          if (a >= togo) {
+            break;
+          }
+        }
+        b = b->next.get();
+      }
+
+      ink_assert(niov > 0);
+      ink_assert(niov <= countof(tiovec));
+      struct msghdr msg;
+
+      ink_zero(msg);
+      msg.msg_name    = const_cast<sockaddr *>(this->get_remote_addr());
+      msg.msg_namelen = ats_ip_size(this->get_remote_addr());
+      msg.msg_iov     = &tiovec[0];
+      msg.msg_iovlen  = niov;
+      r               = this->con.sock.recvmsg(&msg, 0);
+
+      Metrics::Counter::increment(net_rsb.calls_to_read);
+
+      total_read += rattempted;
+    } while (rattempted && r == rattempted && total_read < toread);
+
+    // if we have already moved some bytes successfully, summarize in r
+    if (total_read != rattempted) {
+      if (r <= 0) {
+        r = total_read - rattempted;
+      } else {
+        r = total_read - rattempted + r;
+      }
+    }
+    // check for errors
+    if (r <= 0) {
+      if (r == -EAGAIN || r == -ENOTCONN) {
+        Metrics::Counter::increment(net_rsb.calls_to_read_nodata);
+        this->read.triggered = 0;
+        nh->read_ready_list.remove(this);
+        return;
+      }
+
+      if (!r || r == -ECONNRESET) {
+        this->read.triggered = 0;
+        nh->read_ready_list.remove(this);
+        read_signal_done(VC_EVENT_EOS, nh, this);
+        return;
+      }
+      this->read.triggered = 0;
+      read_signal_error(nh, this, static_cast<int>(-r));
+      return;
+    }
+    Metrics::Counter::increment(net_rsb.read_bytes, r);
+    Metrics::Counter::increment(net_rsb.read_bytes_count);
+
+    // Add data to buffer and signal continuation.
+    buf.writer()->fill(r);
+#ifdef DEBUG
+    if (buf.writer()->write_avail() <= 0) {
+      Dbg(dbg_ctl_iocore_net, "read_from_net, read buffer full");
+    }
+#endif
+    s->vio.ndone += r;
+    this->netActivity();
+  } else {
+    r = 0;
+  }
+
+  // Signal read ready, check if user is not done
+  if (r) {
+    // If there are no more bytes to read, signal read complete
+    ink_assert(ntodo >= 0);
+    if (s->vio.ntodo() <= 0) {
+      read_signal_done(VC_EVENT_READ_COMPLETE, nh, this);
+      Dbg(dbg_ctl_iocore_net, "read_from_net, read finished - signal done");
+      return;
+    } else {
+      if (read_signal_and_update(VC_EVENT_READ_READY, this) != EVENT_CONT) {
+        return;
+      }
+
+      // change of lock... don't look at shared variables!
+      if (lock.get_mutex() != s->vio.mutex.get()) {
+        read_reschedule(nh, this);
+        return;
+      }
+    }
+  }
+
+  // If here are is no more room, or nothing to do, disable the connection
+  if (s->vio.ntodo() <= 0 || !s->enabled || !buf.writer()->write_avail()) {
+    read_disable(nh, this);
+    return;
+  }
+
+  read_reschedule(nh, this);
+}
+
+//
+// Write the data for a UnixNetVConnection.
+// Rescheduling the UnixNetVConnection when necessary.
 //
 void
-UnixNetVConnection::reenable(VIO *vio)
+UnixNetVConnection::net_write_io(NetHandler *nh)
 {
-  if (STATE_FROM_VIO(vio)->enabled) {
+  Metrics::Counter::increment(net_rsb.calls_to_writetonet);
+  NetState     *s = &this->write;
+  Continuation *c = this->write.vio.cont;
+
+  MUTEX_TRY_LOCK(lock, s->vio.mutex, thread);
+
+  if (!lock.is_locked() || lock.get_mutex() != s->vio.mutex.get()) {
+    write_reschedule(nh, this);
     return;
   }
-  set_enabled(vio);
-  if (!thread) {
+
+  if (this->has_error()) {
+    this->lerrno = this->error;
+    write_signal_and_update(VC_EVENT_ERROR, this);
     return;
   }
-  EThread *t = vio->mutex->thread_holding;
-  ink_assert(t == this_ethread());
-  ink_release_assert(!closed);
-  if (nh->mutex->thread_holding == t) {
-    if (vio == &read.vio) {
-      ep.modify(EVENTIO_READ);
-      ep.refresh(EVENTIO_READ);
-      if (read.triggered) {
-        nh->read_ready_list.in_or_enqueue(this);
-      } else {
-        nh->read_ready_list.remove(this);
-      }
+
+  // This function will always return true unless
+  // this vc is an SSLNetVConnection.
+  if (!this->getSSLHandShakeComplete()) {
+    if (this->trackFirstHandshake()) {
+      // Eat the first write-ready.  Until the TLS handshake is complete,
+      // we should still be under the connect timeout and shouldn't bother
+      // the state machine until the TLS handshake is complete
+      this->write.triggered = 0;
+      nh->write_ready_list.remove(this);
+    }
+
+    int err, ret;
+
+    if (this->get_context() == NET_VCONNECTION_OUT) {
+      ret = this->sslStartHandShake(SSL_EVENT_CLIENT, err);
     } else {
-      ep.modify(EVENTIO_WRITE);
-      ep.refresh(EVENTIO_WRITE);
-      if (write.triggered) {
-        nh->write_ready_list.in_or_enqueue(this);
-      } else {
-        nh->write_ready_list.remove(this);
-      }
+      ret = this->sslStartHandShake(SSL_EVENT_SERVER, err);
     }
-  } else {
-    MUTEX_TRY_LOCK(lock, nh->mutex, t);
-    if (!lock.is_locked()) {
-      if (vio == &read.vio) {
-        int isin = ink_atomic_swap(&read.in_enabled_list, 1);
-        if (!isin) {
-          nh->read_enable_list.push(this);
-        }
-      } else {
-        int isin = ink_atomic_swap(&write.in_enabled_list, 1);
-        if (!isin) {
-          nh->write_enable_list.push(this);
-        }
+
+    if (ret == EVENT_ERROR) {
+      this->write.triggered = 0;
+      write_signal_error(nh, this, err);
+    } else if (ret == SSL_HANDSHAKE_WANT_READ || ret == 
SSL_HANDSHAKE_WANT_ACCEPT) {
+      this->read.triggered = 0;
+      nh->read_ready_list.remove(this);
+      read_reschedule(nh, this);
+    } else if (ret == SSL_HANDSHAKE_WANT_CONNECT || ret == 
SSL_HANDSHAKE_WANT_WRITE) {
+      this->write.triggered = 0;
+      nh->write_ready_list.remove(this);
+      write_reschedule(nh, this);
+    } else if (ret == EVENT_DONE) {
+      this->write.triggered = 1;
+      if (this->write.enabled) {
+        nh->write_ready_list.in_or_enqueue(this);
       }
-      if (likely(nh->thread)) {
-        nh->thread->tail_cb->signalActivity();
-      } else if (nh->trigger_event) {
-        nh->trigger_event->ethread->tail_cb->signalActivity();
+      // If this was driven by a zero length read, signal complete when
+      // the handshake is complete. Otherwise set up for continuing read
+      // operations.
+      if (s->vio.ntodo() <= 0) {
+        this->readSignalDone(VC_EVENT_WRITE_COMPLETE, nh);
       }
     } else {
-      if (vio == &read.vio) {
-        ep.modify(EVENTIO_READ);
-        ep.refresh(EVENTIO_READ);
-        if (read.triggered) {
-          nh->read_ready_list.in_or_enqueue(this);
-        } else {
-          nh->read_ready_list.remove(this);
-        }
-      } else {
-        ep.modify(EVENTIO_WRITE);
-        ep.refresh(EVENTIO_WRITE);
-        if (write.triggered) {
-          nh->write_ready_list.in_or_enqueue(this);
-        } else {
-          nh->write_ready_list.remove(this);
-        }
-      }
+      write_reschedule(nh, this);
     }
+
+    return;
   }
-}
 
-void
-UnixNetVConnection::reenable_re(VIO *vio)
-{
-  if (!thread) {
+  // If it is not enabled,add to WaitList.
+  if (!s->enabled || s->vio.op != VIO::WRITE) {
+    write_disable(nh, this);
     return;
   }
-  EThread *t = vio->mutex->thread_holding;
-  ink_assert(t == this_ethread());
-  if (nh->mutex->thread_holding == t) {
-    set_enabled(vio);
-    if (vio == &read.vio) {
-      ep.modify(EVENTIO_READ);
-      ep.refresh(EVENTIO_READ);
-      if (read.triggered) {
-        net_read_io(nh, t);
-      } else {
-        nh->read_ready_list.remove(this);
-      }
-    } else {
-      ep.modify(EVENTIO_WRITE);
-      ep.refresh(EVENTIO_WRITE);
-      if (write.triggered) {
-        write_to_net(nh, this, t);
-      } else {
-        nh->write_ready_list.remove(this);
-      }
+
+  // If there is nothing to do, disable
+  int64_t ntodo = s->vio.ntodo();
+  if (ntodo <= 0) {
+    write_disable(nh, this);
+    return;
+  }
+
+  MIOBufferAccessor &buf = s->vio.buffer;
+  ink_assert(buf.writer());
+
+  // Calculate the amount to write.
+  int64_t towrite = buf.reader()->read_avail();
+  if (towrite > ntodo) {
+    towrite = ntodo;
+  }
+
+  int signalled = 0;
+
+  // signal write ready to allow user to fill the buffer
+  if (towrite != ntodo && !buf.writer()->high_water()) {
+    if (write_signal_and_update(VC_EVENT_WRITE_READY, this) != EVENT_CONT) {
+      return;
+    } else if (c != s->vio.cont) { /* The write vio was updated in the handler 
*/
+      write_reschedule(nh, this);
+      return;
+    }
+
+    ntodo = s->vio.ntodo();
+    if (ntodo <= 0) {
+      write_disable(nh, this);
+      return;
+    }
+
+    signalled = 1;
+
+    // Recalculate amount to write
+    towrite = buf.reader()->read_avail();
+    if (towrite > ntodo) {
+      towrite = ntodo;
     }
-  } else {
-    reenable(vio);
   }
-}
 
-UnixNetVConnection::UnixNetVConnection()
-{
-  SET_HANDLER(&UnixNetVConnection::startEvent);
-}
+  // if there is nothing to do, disable
+  ink_assert(towrite >= 0);
+  if (towrite <= 0) {
+    write_disable(nh, this);
+    return;
+  }
 
-// Private methods
+  int     needs         = 0;
+  int64_t total_written = 0;
+  int64_t r             = this->load_buffer_and_write(towrite, buf, 
total_written, needs);
 
-void
-UnixNetVConnection::set_enabled(VIO *vio)
-{
-  ink_assert(vio->mutex->thread_holding == this_ethread() && thread);
-  ink_release_assert(!closed);
-  STATE_FROM_VIO(vio)->enabled = 1;
-  if (!next_inactivity_timeout_at && inactivity_timeout_in) {
-    next_inactivity_timeout_at = ink_get_hrtime() + inactivity_timeout_in;
+  if (total_written > 0) {
+    Metrics::Counter::increment(net_rsb.write_bytes, total_written);
+    Metrics::Counter::increment(net_rsb.write_bytes_count);
+    s->vio.ndone += total_written;
+    this->netActivity();
   }
-}
 
-void
-UnixNetVConnection::net_read_io(NetHandler *nh, EThread *lthread)
-{
-  read_from_net(nh, this, lthread);
-}
+  // A write of 0 makes no sense since we tried to write more than 0.
+  ink_assert(r != 0);
+  // Either we wrote something or got an error.
+  // check for errors
+  if (r < 0) { // if the socket was not ready, add to WaitList
+    if (r == -EAGAIN || r == -ENOTCONN || -r == EINPROGRESS) {
+      Metrics::Counter::increment(net_rsb.calls_to_write_nodata);
+      if ((needs & EVENTIO_WRITE) == EVENTIO_WRITE) {
+        this->write.triggered = 0;
+        nh->write_ready_list.remove(this);
+        write_reschedule(nh, this);
+      }
 
-void
-UnixNetVConnection::net_write_io(NetHandler *nh, EThread *lthread)
-{
-  write_to_net(nh, this, lthread);
+      if ((needs & EVENTIO_READ) == EVENTIO_READ) {
+        this->read.triggered = 0;
+        nh->read_ready_list.remove(this);
+        read_reschedule(nh, this);
+      }
+
+      return;
+    }
+
+    this->write.triggered = 0;
+    write_signal_error(nh, this, static_cast<int>(-r));
+    return;
+  } else {                                          // Wrote data.  Finished 
without error
+    int wbe_event = this->write_buffer_empty_event; // save so we can clear if 
needed.
+
+    // If the empty write buffer trap is set, clear it.
+    if (!(buf.reader()->is_read_avail_more_than(0))) {
+      this->write_buffer_empty_event = 0;
+    }
+
+    // If there are no more bytes to write, signal write complete,
+    ink_assert(ntodo >= 0);
+    if (s->vio.ntodo() <= 0) {
+      write_signal_done(VC_EVENT_WRITE_COMPLETE, nh, this);
+      return;
+    }
+
+    int e = 0;
+    if (!signalled || (s->vio.ntodo() > 0 && !buf.writer()->high_water())) {
+      e = VC_EVENT_WRITE_READY;
+    } else if (wbe_event != this->write_buffer_empty_event) {
+      // @a signalled means we won't send an event, and the event values 
differing means we
+      // had a write buffer trap and cleared it, so we need to send it now.
+      e = wbe_event;
+    }
+
+    if (e) {
+      if (write_signal_and_update(e, this) != EVENT_CONT) {
+        return;
+      }
+
+      // change of lock... don't look at shared variables!
+      if (lock.get_mutex() != s->vio.mutex.get()) {
+        write_reschedule(nh, this);
+        return;
+      }
+    }
+
+    if ((needs & EVENTIO_READ) == EVENTIO_READ) {
+      read_reschedule(nh, this);
+    }
+
+    if (!(buf.reader()->is_read_avail_more_than(0))) {
+      write_disable(nh, this);
+      return;
+    }
+
+    if ((needs & EVENTIO_WRITE) == EVENTIO_WRITE) {
+      write_reschedule(nh, this);
+    }
+
+    return;
+  }
 }
 
 // This code was pulled out of write_to_net so
@@ -1005,9 +974,14 @@ UnixNetVConnection::writeReschedule(NetHandler *nh)
 }
 
 void
-UnixNetVConnection::netActivity(EThread *lthread)
+UnixNetVConnection::netActivity()
 {
-  net_activity(this, lthread);
+  Dbg(dbg_ctl_socket, "net_activity updating inactivity %" PRId64 ", 
NetVC=%p", this->inactivity_timeout_in, this);
+  if (this->inactivity_timeout_in) {
+    this->next_inactivity_timeout_at = ink_get_hrtime() + 
this->inactivity_timeout_in;
+  } else {
+    this->next_inactivity_timeout_at = 0;
+  }
 }
 
 int


Reply via email to