This is an automated email from the ASF dual-hosted git repository.
bnolsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new 2c1ea0607b Revert "fix race condition with session thread migration
(#10897)" (#11101)
2c1ea0607b is described below
commit 2c1ea0607b7eb0ddf74b9bd88e5058909f460069
Author: Brian Olsen <[email protected]>
AuthorDate: Tue Feb 27 17:21:54 2024 -0700
Revert "fix race condition with session thread migration (#10897)" (#11101)
This reverts commit 94f366de200d9dab31e3755a188a5e6290b1ade2.
---
src/iocore/net/EventIO.cc | 12 +--
src/iocore/net/UnixNetVConnection.cc | 6 ++
src/proxy/http/HttpSessionManager.cc | 154 +++++++++++++++++------------------
3 files changed, 82 insertions(+), 90 deletions(-)
diff --git a/src/iocore/net/EventIO.cc b/src/iocore/net/EventIO.cc
index 29c9aefcd0..0ec7ff2078 100644
--- a/src/iocore/net/EventIO.cc
+++ b/src/iocore/net/EventIO.cc
@@ -64,11 +64,7 @@ EventIO::modify(int e)
return 0;
}
- // Session migration may result in this condition.
- if (nullptr == event_loop) {
- return 1;
- }
-
+ ink_assert(event_loop);
#if TS_USE_EPOLL && !defined(USE_EDGE_TRIGGER)
struct epoll_event ev;
memset(&ev, 0, sizeof(ev));
@@ -121,11 +117,7 @@ EventIO::refresh(int e)
return 0;
}
- // Session migration may result in this condition.
- if (nullptr == event_loop) {
- return 1;
- }
-
+ ink_assert(event_loop);
#if TS_USE_KQUEUE && defined(USE_EDGE_TRIGGER)
e = e & events;
struct kevent ev[2];
diff --git a/src/iocore/net/UnixNetVConnection.cc
b/src/iocore/net/UnixNetVConnection.cc
index 8cec52fd4a..cc5bc933ee 100644
--- a/src/iocore/net/UnixNetVConnection.cc
+++ b/src/iocore/net/UnixNetVConnection.cc
@@ -1373,6 +1373,12 @@ UnixNetVConnection::migrateToCurrentThread(Continuation
*cont, EThread *t)
void *arg = this->_prepareForMigration();
+ // Do_io_close will signal the VC to be freed on the original thread
+ // Since we moved the con context, the fd will not be closed
+ // Go ahead and remove the fd from the original thread's epoll structure, so
it is not
+ // processed on two threads simultaneously
+ this->ep.stop();
+
// Create new VC:
UnixNetVConnection *newvc = static_cast<UnixNetVConnection
*>(this->_getNetProcessor()->allocate_vc(t));
ink_assert(newvc != nullptr);
diff --git a/src/proxy/http/HttpSessionManager.cc
b/src/proxy/http/HttpSessionManager.cc
index 8c821775b5..9bb015053e 100644
--- a/src/proxy/http/HttpSessionManager.cc
+++ b/src/proxy/http/HttpSessionManager.cc
@@ -146,58 +146,57 @@ ServerSessionPool::acquireSession(sockaddr const *addr,
CryptoHash const &hostna
HSMresult_t zret = HSM_NOT_FOUND;
to_return = nullptr;
- // first section, match against fqdn/port
if ((TS_SERVER_SESSION_SHARING_MATCH_MASK_HOSTONLY & match_style) &&
!(TS_SERVER_SESSION_SHARING_MATCH_MASK_IP & match_style)) {
Debug("http_ss", "Search for host name only not IP. Pool size %zu",
m_fqdn_pool.count());
// This is broken out because only in this case do we check the host hash
first. The range must be checked
// to verify an upstream that matches port and SNI name is selected. Walk
backwards to select oldest.
- in_port_t const port = ats_ip_port_cast(addr);
- auto iter = m_fqdn_pool.find(hostname_hash);
- while (iter != m_fqdn_pool.end() && iter->hostname_hash == hostname_hash) {
- Debug("http_ss", "Compare port 0x%x against 0x%x", port,
ats_ip_port_cast(iter->get_remote_addr()));
- if (port == ats_ip_port_cast(iter->get_remote_addr()) &&
- (!(match_style & TS_SERVER_SESSION_SHARING_MATCH_MASK_SNI) ||
validate_sni(sm, iter->get_netvc())) &&
- (!(match_style & TS_SERVER_SESSION_SHARING_MATCH_MASK_HOSTSNISYNC)
|| validate_host_sni(sm, iter->get_netvc())) &&
- (!(match_style & TS_SERVER_SESSION_SHARING_MATCH_MASK_CERT) ||
validate_cert(sm, iter->get_netvc()))) {
- to_return = iter;
+ in_port_t port = ats_ip_port_cast(addr);
+ auto first = m_fqdn_pool.find(hostname_hash);
+ while (first != m_fqdn_pool.end() && first->hostname_hash ==
hostname_hash) {
+ Debug("http_ss", "Compare port 0x%x against 0x%x", port,
ats_ip_port_cast(first->get_remote_addr()));
+ if (port == ats_ip_port_cast(first->get_remote_addr()) &&
+ (!(match_style & TS_SERVER_SESSION_SHARING_MATCH_MASK_SNI) ||
validate_sni(sm, first->get_netvc())) &&
+ (!(match_style & TS_SERVER_SESSION_SHARING_MATCH_MASK_HOSTSNISYNC)
|| validate_host_sni(sm, first->get_netvc())) &&
+ (!(match_style & TS_SERVER_SESSION_SHARING_MATCH_MASK_CERT) ||
validate_cert(sm, first->get_netvc()))) {
+ zret = HSM_DONE;
break;
}
- ++iter;
+ ++first;
}
-
- if (iter != m_fqdn_pool.end()) {
+ if (zret == HSM_DONE) {
+ to_return = first;
+ if (!to_return->is_multiplexing()) {
+ this->removeSession(to_return);
+ }
+ } else if (first != m_fqdn_pool.end()) {
Debug("http_ss", "Failed find entry due to name mismatch %s",
sm->t_state.current.server->name);
}
-
- // second section, match against ip addr (includes port)
} else if (TS_SERVER_SESSION_SHARING_MATCH_MASK_IP & match_style) { //
matching is not disabled.
- auto iter = m_ip_pool.find(addr);
+ auto first = m_ip_pool.find(addr);
// The range is all that is needed in the match IP case, otherwise need to
scan for matching fqdn
// And matches the other constraints as well
// Note the port is matched as part of the address key so it doesn't need
to be checked again.
if (match_style & (~TS_SERVER_SESSION_SHARING_MATCH_MASK_IP)) {
- while (iter != m_ip_pool.end() &&
ats_ip_addr_port_eq(iter->get_remote_addr(), addr)) {
- if ((!(match_style & TS_SERVER_SESSION_SHARING_MATCH_MASK_HOSTONLY) ||
iter->hostname_hash == hostname_hash) &&
- (!(match_style & TS_SERVER_SESSION_SHARING_MATCH_MASK_SNI) ||
validate_sni(sm, iter->get_netvc())) &&
- (!(match_style & TS_SERVER_SESSION_SHARING_MATCH_MASK_HOSTSNISYNC)
|| validate_host_sni(sm, iter->get_netvc())) &&
- (!(match_style & TS_SERVER_SESSION_SHARING_MATCH_MASK_CERT) ||
validate_cert(sm, iter->get_netvc()))) {
- to_return = iter;
+ while (first != m_ip_pool.end() &&
ats_ip_addr_port_eq(first->get_remote_addr(), addr)) {
+ if ((!(match_style & TS_SERVER_SESSION_SHARING_MATCH_MASK_HOSTONLY) ||
first->hostname_hash == hostname_hash) &&
+ (!(match_style & TS_SERVER_SESSION_SHARING_MATCH_MASK_SNI) ||
validate_sni(sm, first->get_netvc())) &&
+ (!(match_style & TS_SERVER_SESSION_SHARING_MATCH_MASK_HOSTSNISYNC)
|| validate_host_sni(sm, first->get_netvc())) &&
+ (!(match_style & TS_SERVER_SESSION_SHARING_MATCH_MASK_CERT) ||
validate_cert(sm, first->get_netvc()))) {
+ zret = HSM_DONE;
break;
}
- ++iter;
+ ++first;
}
- } else if (iter != m_ip_pool.end()) {
- to_return = iter;
+ } else if (first != m_ip_pool.end()) {
+ zret = HSM_DONE;
}
- }
-
- if (nullptr != to_return) {
- zret = HSM_DONE;
- if (!to_return->is_multiplexing()) {
- this->removeSession(to_return);
+ if (zret == HSM_DONE) {
+ to_return = first;
+ if (!to_return->is_multiplexing()) {
+ this->removeSession(to_return);
+ }
}
}
-
return zret;
}
@@ -418,17 +417,16 @@ HSMresult_t
HttpSessionManager::_acquire_session(sockaddr const *ip, CryptoHash const
&hostname_hash, HttpSM *sm,
TSServerSessionSharingMatchMask
match_style, TSServerSessionSharingPoolType pool_type)
{
- PoolableSession *to_return = nullptr;
- HSMresult_t retval = HSM_NOT_FOUND;
- UnixNetVConnection *server_vc = nullptr;
- EThread *const ethread = this_ethread();
- bool acquired = false;
+ PoolableSession *to_return = nullptr;
+ HSMresult_t retval = HSM_NOT_FOUND;
+ bool acquired = false;
// Extend the mutex window until the acquired Server session is attached
// to the SM. Releasing the mutex before that results in race conditions
// due to a potential parallel network read on the VC with no mutex guarding
{
// Now check to see if we have a connection in our shared connection pool
+ EThread *ethread = this_ethread();
Ptr<ProxyMutex> pool_mutex =
(TS_SERVER_SESSION_SHARING_POOL_THREAD == pool_type) ?
ethread->server_session_pool->mutex : m_g_pool->mutex;
@@ -445,42 +443,54 @@ HttpSessionManager::_acquire_session(sockaddr const *ip,
CryptoHash const &hostn
retval = m_g_pool->acquireSession(ip, hostname_hash, match_style,
sm, to_return);
acquired = (HSM_DONE == retval);
Debug("http_ss", "[acquire session] global pool search %s", to_return
? "successful" : "failed");
-
- // If thread must be migrated, clear out the VC's
- // data and event handling on the original thread.
- if (nullptr != to_return) {
- server_vc = dynamic_cast<UnixNetVConnection
*>(to_return->get_netvc());
- if (nullptr != server_vc) {
- if (ethread != server_vc->get_thread()) {
- SCOPED_MUTEX_LOCK(vclock, server_vc->mutex, ethread);
- server_vc->ep.stop();
- server_vc->do_io_read(m_g_pool, 0, nullptr);
+ // At this point to_return has been removed from the pool. Do we need
to move it
+ // to the same thread?
+ if (to_return) {
+ UnixNetVConnection *server_vc = dynamic_cast<UnixNetVConnection
*>(to_return->get_netvc());
+ if (server_vc) {
+ // Disable i/o on this vc now, but, hold onto the g_pool cont
+ // and the mutex to stop any stray events from getting in
+ server_vc->do_io_read(m_g_pool, 0, nullptr);
+ server_vc->do_io_write(m_g_pool, 0, nullptr);
+ UnixNetVConnection *new_vc = server_vc->migrateToCurrentThread(sm,
ethread);
+ // The VC moved, free up the original one
+ if (new_vc != server_vc) {
+ ink_assert(new_vc == nullptr || new_vc->nh != nullptr);
+ if (!new_vc) {
+ // Close out to_return, we weren't able to get a connection
+
Metrics::Counter::increment(http_rsb.origin_shutdown_migration_failure);
+ to_return->do_io_close();
+ to_return = nullptr;
+ retval = HSM_NOT_FOUND;
+ } else {
+ // Keep things from timing out on us
+
new_vc->set_inactivity_timeout(new_vc->get_inactivity_timeout());
+ to_return->set_netvc(new_vc);
+ }
+ } else {
+ // Keep things from timing out on us
server_vc->set_inactivity_timeout(server_vc->get_inactivity_timeout());
}
}
}
}
- } else { // Didn't get the lock. to_return is still nullptr
+ } else { // Didn't get the lock. to_return is still NULL
retval = HSM_RETRY;
}
- }
- // now the vc is out of the pool with chance of thread migration
- if (TS_SERVER_SESSION_SHARING_POOL_THREAD != pool_type && nullptr !=
to_return && nullptr != server_vc) {
- UnixNetVConnection *const new_vc = server_vc->migrateToCurrentThread(sm,
ethread);
- // The VC moved, free up the original one
- if (new_vc != server_vc) {
- ink_assert(nullptr == new_vc || nullptr != new_vc->nh);
- if (nullptr == new_vc) {
- // Close out to_return, we were't able to get a connection
-
Metrics::Counter::increment(http_rsb.origin_shutdown_migration_failure);
- to_return->do_io_close(); // already done ??
- to_return = nullptr;
- retval = HSM_NOT_FOUND;
+ if (to_return) {
+ if (sm->create_server_txn(to_return)) {
+ Debug("http_ss", "[%" PRId64 "] [acquire session] return session from
shared pool", to_return->connection_id());
+ to_return->state = PoolableSession::SSN_IN_USE;
+ retval = HSM_DONE;
} else {
- // Keep the new session from timing out on us
- new_vc->set_inactivity_timeout(new_vc->get_inactivity_timeout());
- to_return->set_netvc(new_vc);
+ Debug("http_ss", "[%" PRId64 "] [acquire session] failed to get
transaction on session from shared pool",
+ to_return->connection_id());
+ // Don't close the H2 origin. Otherwise you get use-after free with
the activity timeout cop
+ if (!to_return->is_multiplexing()) {
+ to_return->do_io_close();
+ }
+ retval = HSM_RETRY;
}
}
}
@@ -489,22 +499,6 @@ HttpSessionManager::_acquire_session(sockaddr const *ip,
CryptoHash const &hostn
Metrics::Gauge::decrement(http_rsb.pooled_server_connections);
}
- if (nullptr != to_return) {
- if (sm->create_server_txn(to_return)) {
- Debug("http_ss", "[%" PRId64 "] [acquire session] return session from
shared pool", to_return->connection_id());
- to_return->state = PoolableSession::SSN_IN_USE;
- retval = HSM_DONE;
- } else {
- Debug("http_ss", "[%" PRId64 "] [acquire session] failed to get
transaction on session from shared pool",
- to_return->connection_id());
- // Don't close the H2 origin. Otherwise you get use-after free with the
activity timeout cop
- if (!to_return->is_multiplexing()) {
- to_return->do_io_close();
- }
- retval = HSM_RETRY;
- }
- }
-
return retval;
}