This is an automated email from the ASF dual-hosted git repository.
guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 8380e6e0 Fix span lifecycle with smart pointers to prevent use-after-free in async RPC callbacks (#3140)
8380e6e0 is described below
commit 8380e6e0f6d44ce7c602b1d5d2709293d14ca84e
Author: lh2debug <[email protected]>
AuthorDate: Tue Mar 31 11:36:02 2026 +0800
Fix span lifecycle with smart pointers to prevent use-after-free in async RPC callbacks (#3140)
* Fix span lifecycle with smart pointers to prevent use-after-free in async RPC callbacks (#3068)
* Refactor bthread span lifecycle management and optimize span API with smart pointer reuse (#3068)
---------
Co-authored-by: lhh <lhh>
---
docs/cn/rpcz.md | 12 +-
src/brpc/builtin/rpcz_service.cpp | 90 ++++--
src/brpc/channel.cpp | 22 +-
src/brpc/controller.cpp | 86 ++++--
src/brpc/controller.h | 3 +-
src/brpc/details/controller_private_accessor.h | 12 +-
src/brpc/global.cpp | 8 +-
src/brpc/policy/baidu_rpc_protocol.cpp | 19 +-
src/brpc/policy/couchbase_protocol.cpp | 3 +-
src/brpc/policy/esp_protocol.cpp | 6 +-
src/brpc/policy/http_rpc_protocol.cpp | 10 +-
src/brpc/policy/hulu_pbrpc_protocol.cpp | 9 +-
src/brpc/policy/memcache_binary_protocol.cpp | 3 +-
src/brpc/policy/nova_pbrpc_protocol.cpp | 3 +-
src/brpc/policy/nshead_mcpack_protocol.cpp | 3 +-
src/brpc/policy/nshead_protocol.cpp | 13 +-
src/brpc/policy/public_pbrpc_protocol.cpp | 5 +-
src/brpc/policy/redis_protocol.cpp | 3 +-
src/brpc/policy/rtmp_protocol.cpp | 3 +-
src/brpc/policy/sofa_pbrpc_protocol.cpp | 7 +-
src/brpc/policy/thrift_protocol.cpp | 10 +-
src/brpc/policy/ubrpc2pb_protocol.cpp | 3 +-
src/brpc/span.cpp | 361 ++++++++++++++++++-------
src/brpc/span.h | 161 ++++++++---
src/brpc/traceprintf.h | 17 ++
src/bthread/bthread.cpp | 24 +-
src/bthread/bthread.h | 28 ++
src/bthread/key.cpp | 1 +
src/bthread/task_group.cpp | 42 ++-
src/bthread/task_meta.h | 23 +-
src/bthread/unstable.h | 3 -
src/bvar/collector.cpp | 5 +
test/brpc_channel_unittest.cpp | 11 +-
test/bthread_unittest.cpp | 12 +-
34 files changed, 729 insertions(+), 292 deletions(-)
diff --git a/docs/cn/rpcz.md b/docs/cn/rpcz.md
index 637997d1..12ba2ef3 100644
--- a/docs/cn/rpcz.md
+++ b/docs/cn/rpcz.md
@@ -66,4 +66,14 @@ bthread_attr_t attr = { BTHREAD_STACKTYPE_NORMAL,
BTHREAD_INHERIT_SPAN, NULL };
bthread_start_urgent(&tid, &attr, thread_proc, arg);
```
-注意:使用这种方式创建子bthread来发送rpc,请确保rpc在server返回response之前完成,否则可能导致使用被释放的Span对象而出core。
+### Span生命周期管理
+
+brpc使用智能指针(`std::shared_ptr`/`std::weak_ptr`)管理Span对象的生命周期,并通过自旋锁保护并发访问,解决了以下问题:
+
+1.
**Use-after-free防护**:父Span通过`shared_ptr`持有子Span的强引用,TLS中使用`weak_ptr`存储,确保Span对象在被访问时仍然有效。即使server在子bthread完成前返回response,也不会导致访问已释放的Span对象。
+
+2.
**线程安全**:使用自旋锁保护`_client_list`和`_info`的并发修改,支持多个bthread同时创建子span或添加annotation。
+
+3. **自动生命周期管理**:当父Span销毁时,会自动清理所有子Span(通过`_client_list.clear()`),无需手动管理。
+
+使用`BTHREAD_INHERIT_SPAN`创建子bthread时,不再需要担心Span对象的生命周期问题,可以安全地在异步场景中使用。
diff --git a/src/brpc/builtin/rpcz_service.cpp
b/src/brpc/builtin/rpcz_service.cpp
index d9121eb5..e5111ac3 100644
--- a/src/brpc/builtin/rpcz_service.cpp
+++ b/src/brpc/builtin/rpcz_service.cpp
@@ -185,16 +185,43 @@ static void PrintElapse(std::ostream& os, int64_t
cur_time,
static void PrintAnnotations(
std::ostream& os, int64_t cur_time, int64_t* last_time,
- SpanInfoExtractor** extractors, int num_extr) {
+ SpanInfoExtractor** extractors, int num_extr, const RpczSpan* span) {
int64_t anno_time;
std::string a;
+ const char* span_type_str = "Span";
+ if (span) {
+ switch (span->type()) {
+ case SPAN_TYPE_SERVER:
+ span_type_str = "ServerSpan";
+ break;
+ case SPAN_TYPE_CLIENT:
+ span_type_str = "ClientSpan";
+ break;
+ case SPAN_TYPE_BTHREAD:
+ span_type_str = "BthreadSpan";
+ break;
+ }
+ }
+
// TODO: Going through all extractors is not strictly correct because
// later extractors may have earlier annotations.
for (int i = 0; i < num_extr; ++i) {
while (extractors[i]->PopAnnotation(cur_time, &anno_time, &a)) {
PrintRealTime(os, anno_time);
PrintElapse(os, anno_time, last_time);
- os << ' ' << WebEscape(a);
+ os << ' ';
+ if (span) {
+ const char* short_type = "SPAN";
+ if (span->type() == SPAN_TYPE_SERVER) {
+ short_type = "Server";
+ } else if (span->type() == SPAN_TYPE_CLIENT) {
+ short_type = "Client";
+ } else if (span->type() == SPAN_TYPE_BTHREAD) {
+ short_type = "Bthread";
+ }
+ os << '[' << short_type << " SPAN#" << Hex(span->span_id()) <<
"] ";
+ }
+ os << WebEscape(a);
if (a.empty() || butil::back_char(a) != '\n') {
os << '\n';
}
@@ -204,12 +231,12 @@ static void PrintAnnotations(
static bool PrintAnnotationsAndRealTimeSpan(
std::ostream& os, int64_t cur_time, int64_t* last_time,
- SpanInfoExtractor** extr, int num_extr) {
+ SpanInfoExtractor** extr, int num_extr, const RpczSpan* span) {
if (cur_time == 0) {
// the field was not set.
return false;
}
- PrintAnnotations(os, cur_time, last_time, extr, num_extr);
+ PrintAnnotations(os, cur_time, last_time, extr, num_extr, span);
PrintRealTime(os, cur_time);
PrintElapse(os, cur_time, last_time);
return true;
@@ -239,9 +266,10 @@ static void PrintClientSpan(
extr[num_extr++] = server_extr;
}
extr[num_extr++] = &client_extr;
- // start_send_us is always set for client spans.
- CHECK(PrintAnnotationsAndRealTimeSpan(os, span.start_send_real_us(),
- last_time, extr, num_extr));
+ if (!PrintAnnotationsAndRealTimeSpan(os, span.start_send_real_us(),
+ last_time, extr, num_extr, &span)) {
+ os << " start_send_real_us:not-set";
+ }
const Protocol* protocol = FindProtocol(span.protocol());
const char* protocol_name = (protocol ? protocol->name : "Unknown");
const butil::EndPoint remote_side(butil::int2ip(span.remote_ip()),
span.remote_port());
@@ -271,12 +299,12 @@ static void PrintClientSpan(
os << std::endl;
if (PrintAnnotationsAndRealTimeSpan(os, span.sent_real_us(),
- last_time, extr, num_extr)) {
- os << " Requested(" << span.request_size() << ") [1]" << std::endl;
+ last_time, extr, num_extr, &span)) {
+ os << " [Client SPAN#" << Hex(span.span_id()) << "] Requested(" <<
span.request_size() << ") [1]" << std::endl;
}
if (PrintAnnotationsAndRealTimeSpan(os, span.received_real_us(),
- last_time, extr, num_extr)) {
- os << " Received response(" << span.response_size() << ")";
+ last_time, extr, num_extr, &span)) {
+ os << " [Client SPAN#" << Hex(span.span_id()) << "] Received
response(" << span.response_size() << ")";
if (span.base_cid() != 0 && span.ending_cid() != 0) {
int64_t ver = span.ending_cid() - span.base_cid();
if (ver >= 1) {
@@ -289,18 +317,18 @@ static void PrintClientSpan(
}
if (PrintAnnotationsAndRealTimeSpan(os, span.start_parse_real_us(),
- last_time, extr, num_extr)) {
- os << " Processing the response in a new bthread" << std::endl;
+ last_time, extr, num_extr, &span)) {
+ os << " [Client SPAN#" << Hex(span.span_id()) << "] Processing the
response in a new bthread" << std::endl;
}
if (PrintAnnotationsAndRealTimeSpan(
os, span.start_callback_real_us(),
- last_time, extr, num_extr)) {
- os << (span.async() ? " Enter user's done" : " Back to user's
callsite") << std::endl;
+ last_time, extr, num_extr, &span)) {
+ os << " [Client SPAN#" << Hex(span.span_id()) << "] " << (span.async()
? " Enter user's done" : " Back to user's callsite") << std::endl;
}
PrintAnnotations(os, std::numeric_limits<int64_t>::max(),
- last_time, extr, num_extr);
+ last_time, extr, num_extr, &span);
}
static void PrintClientSpan(std::ostream& os,const RpczSpan& span,
@@ -318,7 +346,15 @@ static void PrintBthreadSpan(std::ostream& os, const
RpczSpan& span, int64_t* la
extr[num_extr++] = server_extr;
}
extr[num_extr++] = &client_extr;
- PrintAnnotations(os, std::numeric_limits<int64_t>::max(), last_time, extr,
num_extr);
+
+ // Print span id for bthread span context identification
+ os << " [Bthread SPAN#" << Hex(span.span_id());
+ if (span.parent_span_id() != 0) {
+ os << " parent#" << Hex(span.parent_span_id());
+ }
+ os << "] ";
+
+ PrintAnnotations(os, std::numeric_limits<int64_t>::max(), last_time, extr,
num_extr, &span);
}
static void PrintServerSpan(std::ostream& os, const RpczSpan& span,
@@ -348,16 +384,16 @@ static void PrintServerSpan(std::ostream& os, const
RpczSpan& span,
os << std::endl;
if (PrintAnnotationsAndRealTimeSpan(
os, span.start_parse_real_us(),
- &last_time, extr, ARRAY_SIZE(extr))) {
- os << " Processing the request in a new bthread" << std::endl;
+ &last_time, extr, ARRAY_SIZE(extr), &span)) {
+ os << " [Server SPAN#" << Hex(span.span_id()) << "] Processing the
request in a new bthread" << std::endl;
}
bool entered_user_method = false;
if (PrintAnnotationsAndRealTimeSpan(
os, span.start_callback_real_us(),
- &last_time, extr, ARRAY_SIZE(extr))) {
+ &last_time, extr, ARRAY_SIZE(extr), &span)) {
entered_user_method = true;
- os << " Enter " << WebEscape(span.full_method_name()) << std::endl;
+ os << " [Server SPAN#" << Hex(span.span_id()) << "] Enter " <<
WebEscape(span.full_method_name()) << std::endl;
}
const int nclient = span.client_spans_size();
@@ -372,22 +408,22 @@ static void PrintServerSpan(std::ostream& os, const
RpczSpan& span,
if (PrintAnnotationsAndRealTimeSpan(
os, span.start_send_real_us(),
- &last_time, extr, ARRAY_SIZE(extr))) {
+ &last_time, extr, ARRAY_SIZE(extr), &span)) {
if (entered_user_method) {
- os << " Leave " << WebEscape(span.full_method_name()) << std::endl;
+ os << " [Server SPAN#" << Hex(span.span_id()) << "] Leave " <<
WebEscape(span.full_method_name()) << std::endl;
} else {
- os << " Responding" << std::endl;
+ os << " [Server SPAN#" << Hex(span.span_id()) << "] Responding" <<
std::endl;
}
}
if (PrintAnnotationsAndRealTimeSpan(
os, span.sent_real_us(),
- &last_time, extr, ARRAY_SIZE(extr))) {
- os << " Responded(" << span.response_size() << ')' << std::endl;
+ &last_time, extr, ARRAY_SIZE(extr), &span)) {
+ os << " [Server SPAN#" << Hex(span.span_id()) << "] Responded(" <<
span.response_size() << ')' << std::endl;
}
PrintAnnotations(os, std::numeric_limits<int64_t>::max(),
- &last_time, extr, ARRAY_SIZE(extr));
+ &last_time, extr, ARRAY_SIZE(extr), &span);
}
class RpczSpanFilter : public SpanFilter {
diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp
index dde4ca0f..a8caeaf9 100644
--- a/src/brpc/channel.cpp
+++ b/src/brpc/channel.cpp
@@ -38,6 +38,7 @@
#include "brpc/rdma/rdma_helper.h"
#include "brpc/policy/esp_authenticator.h"
#include "brpc/transport_factory.h"
+#include "brpc/details/controller_private_accessor.h"
namespace brpc {
@@ -502,7 +503,7 @@ void Channel::CallMethod(const
google::protobuf::MethodDescriptor* method,
}
cntl->set_used_by_rpc();
- if (cntl->_sender == NULL && IsTraceable(Span::tls_parent())) {
+ if (cntl->_sender == NULL && IsTraceable(Span::tls_parent().get())) {
const int64_t start_send_us = butil::cpuwide_time_us();
std::string method_name;
if (_get_method_name) {
@@ -513,13 +514,16 @@ void Channel::CallMethod(const
google::protobuf::MethodDescriptor* method,
const static std::string NULL_METHOD_STR = "null-method";
method_name = NULL_METHOD_STR;
}
- Span* span = Span::CreateClientSpan(
+ std::shared_ptr<Span> span = Span::CreateClientSpan(
method_name, start_send_real_us - start_send_us);
- span->set_log_id(cntl->log_id());
- span->set_base_cid(correlation_id);
- span->set_protocol(_options.protocol);
- span->set_start_send_us(start_send_us);
- cntl->_span = span;
+ if (span) {
+ ControllerPrivateAccessor accessor(cntl);
+ span->set_log_id(cntl->log_id());
+ span->set_base_cid(correlation_id);
+ span->set_protocol(_options.protocol);
+ span->set_start_send_us(start_send_us);
+ accessor.set_span(span);
+ }
}
// Override some options if they haven't been set by Controller
if (cntl->timeout_ms() == UNSET_MAGIC_NUM) {
@@ -620,9 +624,7 @@ void Channel::CallMethod(const
google::protobuf::MethodDescriptor* method,
// be woken up by callback when RPC finishes (succeeds or still
// fails after retry)
Join(correlation_id);
- if (cntl->_span) {
- cntl->SubmitSpan();
- }
+ cntl->SubmitSpan();
cntl->OnRPCEnd(butil::gettimeofday_us());
}
}
diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
index 133d1f04..15c8c918 100644
--- a/src/brpc/controller.cpp
+++ b/src/brpc/controller.cpp
@@ -183,8 +183,8 @@ static void CreateIgnoreAllRead() { s_ignore_all_read = new
IgnoreAllRead; }
// you don't have to set the fields to initial state after deletion since
// they'll be set uniformly after this method is called.
void Controller::ResetNonPods() {
- if (_span) {
- Span::Submit(_span, butil::cpuwide_time_us());
+ if (auto span = _span.lock()) {
+ Span::Submit(span, butil::cpuwide_time_us());
}
_error_text.clear();
_remote_side = butil::EndPoint();
@@ -240,7 +240,7 @@ void Controller::ResetNonPods() {
void Controller::ResetPods() {
// NOTE: Make the sequence of assignments same with the order that they're
// defined in header. Better for cpu cache and faster for lookup.
- _span = NULL;
+ _span.reset();
_flags = 0;
#ifndef BAIDU_INTERNAL
set_pb_bytes_to_base64(true);
@@ -458,9 +458,9 @@ void Controller::SetFailed(const std::string& reason) {
AppendServerIdentiy();
}
_error_text.append(reason);
- if (_span) {
- _span->set_error_code(_error_code);
- _span->Annotate(reason);
+ if (auto span = _span.lock()) {
+ span->set_error_code(_error_code);
+ span->Annotate(reason);
}
UpdateResponseHeader(this);
}
@@ -487,9 +487,9 @@ void Controller::SetFailed(int error_code, const char*
reason_fmt, ...) {
va_start(ap, reason_fmt);
butil::string_vappendf(&_error_text, reason_fmt, ap);
va_end(ap);
- if (_span) {
- _span->set_error_code(_error_code);
- _span->AnnotateCStr(_error_text.c_str() + old_size, 0);
+ if (auto span = _span.lock()) {
+ span->set_error_code(_error_code);
+ span->AnnotateCStr(_error_text.c_str() + old_size, 0);
}
UpdateResponseHeader(this);
}
@@ -515,9 +515,9 @@ void Controller::CloseConnection(const char* reason_fmt,
...) {
va_start(ap, reason_fmt);
butil::string_vappendf(&_error_text, reason_fmt, ap);
va_end(ap);
- if (_span) {
- _span->set_error_code(_error_code);
- _span->AnnotateCStr(_error_text.c_str() + old_size, 0);
+ if (auto span = _span.lock()) {
+ span->set_error_code(_error_code);
+ span->AnnotateCStr(_error_text.c_str() + old_size, 0);
}
UpdateResponseHeader(this);
}
@@ -952,9 +952,9 @@ void Controller::EndRPC(const CompletionInfo& info) {
}
// RPC finished, now it's safe to release `LoadBalancerWithNaming'
_lb.reset();
- if (_span) {
- _span->set_ending_cid(info.id);
- _span->set_async(_done);
+ if (auto span = _span.lock()) {
+ span->set_ending_cid(info.id);
+ span->set_async(_done);
// Submit the span if we're in async RPC. For sync RPC, the span
// is submitted after Join() to get a more accurate resuming timestamp.
if (_done) {
@@ -1028,12 +1028,16 @@ void Controller::DoneInBackupThread() {
void Controller::SubmitSpan() {
const int64_t now = butil::cpuwide_time_us();
- _span->set_start_callback_us(now);
- if (_span->local_parent()) {
- _span->local_parent()->AsParent();
+ if (auto span = _span.lock()) {
+ span->set_start_callback_us(now);
+ if (auto parent_span = span->local_parent().lock()) {
+ if (parent_span->is_active()) {
+ parent_span->AsParent();
+ }
+ }
+ Span::Submit(span, now);
+ _span.reset();
}
- Span::Submit(_span, now);
- _span = NULL;
}
void Controller::HandleSendFailed() {
@@ -1131,8 +1135,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
CHECK_EQ(_remote_side, tmp_sock->remote_side());
}
- Span* span = _span;
- if (span) {
+ if (auto span = _span.lock()) {
if (_current_call.nretry == 0) {
span->set_remote_side(_remote_side);
} else {
@@ -1244,7 +1247,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
int rc;
size_t packet_size = 0;
if (user_packet_guard) {
- if (span) {
+ if (auto span = _span.lock()) {
packet_size = user_packet_guard->EstimatedByteSize();
}
rc = _current_call.sending_sock->Write(user_packet_guard, &wopt);
@@ -1252,7 +1255,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
packet_size = packet.size();
rc = _current_call.sending_sock->Write(&packet, &wopt);
}
- if (span) {
+ if (auto span = _span.lock()) {
if (_current_call.nretry == 0) {
span->set_sent_us(butil::cpuwide_time_us());
span->set_request_size(packet_size);
@@ -1396,8 +1399,19 @@ const Controller* Controller::sub(int index) const {
return NULL;
}
-uint64_t Controller::trace_id() const { return _span ? _span->trace_id() : 0; }
-uint64_t Controller::span_id() const { return _span ? _span->span_id() : 0; }
+uint64_t Controller::trace_id() const {
+ if (auto span = _span.lock()) {
+ return span->trace_id();
+ }
+ return 0;
+}
+
+uint64_t Controller::span_id() const {
+ if (auto span = _span.lock()) {
+ return span->span_id();
+ }
+ return 0;
+}
void* Controller::session_local_data() {
if (_session_local_data) {
@@ -1724,4 +1738,24 @@ void Controller::DoPrintLogPrefix(std::ostream& os)
const {
}
}
+
+ControllerPrivateAccessor& ControllerPrivateAccessor::set_span(
+ const std::shared_ptr<Span>& span) {
+ _cntl->_span = span;
+ return *this;
+}
+
+ControllerPrivateAccessor& ControllerPrivateAccessor::set_span(Span* span) {
+ if (span) {
+ _cntl->_span = span->shared_from_this();
+ } else {
+ _cntl->_span.reset();
+ }
+ return *this;
+}
+
+std::shared_ptr<Span> ControllerPrivateAccessor::span() const {
+ return _cntl->_span.lock();
+}
+
} // namespace brpc
diff --git a/src/brpc/controller.h b/src/brpc/controller.h
index 69d859ea..45f71b72 100644
--- a/src/brpc/controller.h
+++ b/src/brpc/controller.h
@@ -25,6 +25,7 @@
#include <functional> // std::function
#include <gflags/gflags.h> // Users often need gflags
#include <string>
+#include <memory>
#include "butil/intrusive_ptr.hpp" // butil::intrusive_ptr
#include "bthread/errno.h" // Redefine errno
#include "butil/endpoint.h" // butil::EndPoint
@@ -803,7 +804,7 @@ private:
private:
// NOTE: align and group fields to make Controller as compact as possible.
- Span* _span;
+ std::weak_ptr<Span> _span;
uint32_t _flags; // all boolean fields inside Controller
int32_t _error_code;
std::string _error_text;
diff --git a/src/brpc/details/controller_private_accessor.h
b/src/brpc/details/controller_private_accessor.h
index 1a9d7062..0ad1aba6 100644
--- a/src/brpc/details/controller_private_accessor.h
+++ b/src/brpc/details/controller_private_accessor.h
@@ -30,9 +30,10 @@ class Message;
}
}
-
namespace brpc {
+class Span;
+
class AuthContext;
// A wrapper to access some private methods/fields of `Controller'
@@ -90,17 +91,16 @@ public:
return *this;
}
- ControllerPrivateAccessor &set_span(Span* span) {
- _cntl->_span = span;
- return *this;
- }
+ // Overloaded set_span methods to support both shared_ptr and raw pointer
+ ControllerPrivateAccessor &set_span(const std::shared_ptr<Span>& span);
+ ControllerPrivateAccessor &set_span(Span* span);
ControllerPrivateAccessor &set_request_protocol(ProtocolType protocol) {
_cntl->_request_protocol = protocol;
return *this;
}
- Span* span() const { return _cntl->_span; }
+ std::shared_ptr<Span> span() const;
uint32_t pipelined_count() const { return _cntl->_pipelined_count; }
void set_pipelined_count(uint32_t count) { _cntl->_pipelined_count =
count; }
diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp
index c561d927..1f67aee2 100644
--- a/src/brpc/global.cpp
+++ b/src/brpc/global.cpp
@@ -54,6 +54,7 @@
// Span
#include "brpc/span.h"
#include "bthread/unstable.h"
+#include "bthread/bthread.h"
// Compress handlers
#include "brpc/compress.h"
@@ -343,8 +344,11 @@ static void GlobalInitializeOrDieImpl() {
SetLogHandler(&BaiduStreamingLogHandler);
#endif
- // Set bthread create span function
- bthread_set_create_span_func(CreateBthreadSpan);
+ if (bthread_set_span_funcs(CreateBthreadSpanAsVoid,
+ DestroyRpczParentSpan,
+ EndBthreadSpan) != 0) {
+ LOG(FATAL) << "Failed to register span callbacks to bthread";
+ }
// Setting the variable here does not work, the profiler probably check
// the variable before main() for only once.
diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp
b/src/brpc/policy/baidu_rpc_protocol.cpp
index 0dba0162..2c5a7e72 100644
--- a/src/brpc/policy/baidu_rpc_protocol.cpp
+++ b/src/brpc/policy/baidu_rpc_protocol.cpp
@@ -272,9 +272,9 @@ struct BaiduProxyPBMessages : public RpcPBMessages {
// Used by UT, can't be static.
void SendRpcResponse(int64_t correlation_id, Controller* cntl,
RpcPBMessages* messages, const Server* server,
- MethodStatus* method_status, int64_t received_us) {
+ MethodStatus* method_status, int64_t received_us,
+ std::shared_ptr<Span> span) {
ControllerPrivateAccessor accessor(cntl);
- Span* span = accessor.span();
if (span) {
span->set_start_send_us(butil::cpuwide_time_us());
}
@@ -645,7 +645,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
bthread_assign_data((void*)&server->thread_local_options());
}
- Span* span = NULL;
+ std::shared_ptr<Span> span;
if (IsTraceable(request_meta.has_trace_id())) {
span = Span::CreateServerSpan(
request_meta.trace_id(), request_meta.span_id(),
@@ -827,9 +827,9 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
// `socket' will be held until response has been sent
google::protobuf::Closure* done = ::brpc::NewCallback<
int64_t, Controller*, RpcPBMessages*,
- const Server*, MethodStatus*, int64_t>(
+ const Server*, MethodStatus*, int64_t, std::shared_ptr<Span>>(
&SendRpcResponse, meta.correlation_id(),cntl.get(),
- messages, server, method_status, msg->received_us());
+ messages, server, method_status, msg->received_us(), span);
// optional, just release resource ASAP
msg.reset();
@@ -858,10 +858,11 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
// `cntl', `req' and `res' will be deleted inside `SendRpcResponse'
// `socket' will be held until response has been sent
+
SendRpcResponse(meta.correlation_id(),
cntl.release(), messages,
server, method_status,
- msg->received_us());
+ msg->received_us(), span);
}
bool VerifyRpcRequest(const InputMessageBase* msg_base) {
@@ -948,8 +949,7 @@ void ProcessRpcResponse(InputMessageBase* msg_base) {
}
cntl->set_rpc_received_us(msg->received_us());
- Span* span = accessor.span();
- if (span) {
+ if (auto span = accessor.span()) {
span->set_base_real_us(msg->base_real_us());
span->set_received_us(msg->received_us());
span->set_response_size(msg->meta.size() + msg->payload.size() + 12);
@@ -1119,8 +1119,7 @@ void PackRpcRequest(butil::IOBuf* req_buf,
}
meta.set_content_type(cntl->request_content_type());
- Span* span = accessor.span();
- if (span) {
+ if (auto span = accessor.span()) {
request_meta->set_trace_id(span->trace_id());
request_meta->set_span_id(span->span_id());
request_meta->set_parent_span_id(span->parent_span_id());
diff --git a/src/brpc/policy/couchbase_protocol.cpp
b/src/brpc/policy/couchbase_protocol.cpp
index a014581e..0ece53db 100644
--- a/src/brpc/policy/couchbase_protocol.cpp
+++ b/src/brpc/policy/couchbase_protocol.cpp
@@ -160,8 +160,7 @@ void ProcessCouchbaseResponse(InputMessageBase* msg_base) {
}
ControllerPrivateAccessor accessor(cntl);
- Span* span = accessor.span();
- if (span) {
+ if (auto span = accessor.span()) {
span->set_base_real_us(msg->base_real_us());
span->set_received_us(msg->received_us());
span->set_response_size(msg->meta.length());
diff --git a/src/brpc/policy/esp_protocol.cpp b/src/brpc/policy/esp_protocol.cpp
index 5925796b..ee8464b8 100644
--- a/src/brpc/policy/esp_protocol.cpp
+++ b/src/brpc/policy/esp_protocol.cpp
@@ -101,8 +101,7 @@ void PackEspRequest(butil::IOBuf* packet_buf,
}
accessor.get_sending_socket()->set_correlation_id(correlation_id);
- Span* span = accessor.span();
- if (span) {
+ if (auto span = accessor.span()) {
span->set_request_size(request.length());
}
@@ -131,8 +130,7 @@ void ProcessEspResponse(InputMessageBase* msg_base) {
}
ControllerPrivateAccessor accessor(cntl);
- Span* span = accessor.span();
- if (span) {
+ if (auto span = accessor.span()) {
span->set_base_real_us(msg->base_real_us());
span->set_received_us(msg->received_us());
span->set_response_size(msg->payload.length());
diff --git a/src/brpc/policy/http_rpc_protocol.cpp
b/src/brpc/policy/http_rpc_protocol.cpp
index d0150a63..b03a961b 100644
--- a/src/brpc/policy/http_rpc_protocol.cpp
+++ b/src/brpc/policy/http_rpc_protocol.cpp
@@ -373,8 +373,7 @@ void ProcessHttpResponse(InputMessageBase* msg) {
ControllerPrivateAccessor accessor(cntl);
- Span* span = accessor.span();
- if (span) {
+ if (auto span = accessor.span()) {
span->set_base_real_us(msg->base_real_us());
span->set_received_us(msg->received_us());
// TODO: changing when imsg_guard->read_body_progressively() is true
@@ -721,8 +720,7 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
hreq.uri().set_path(path);
}
- Span* span = accessor.span();
- if (span) {
+ if (auto span = accessor.span()) {
hreq.SetHeader("x-bd-trace-id", butil::string_printf(
"%llu", (unsigned long long)span->trace_id()));
hreq.SetHeader("x-bd-span-id", butil::string_printf(
@@ -838,7 +836,7 @@ HttpResponseSender::~HttpResponseSender() {
return;
}
ControllerPrivateAccessor accessor(cntl);
- Span* span = accessor.span();
+ auto span = accessor.span();
if (span) {
span->set_start_send_us(butil::cpuwide_time_us());
}
@@ -1493,7 +1491,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
bthread_assign_data((void*)&server->thread_local_options());
}
- Span* span = NULL;
+ std::shared_ptr<Span> span;
const std::string& path = req_header.uri().path();
const std::string* trace_id_str = req_header.GetHeader("x-bd-trace-id");
if (IsTraceable(trace_id_str)) {
diff --git a/src/brpc/policy/hulu_pbrpc_protocol.cpp
b/src/brpc/policy/hulu_pbrpc_protocol.cpp
index bd0c4960..f6980485 100644
--- a/src/brpc/policy/hulu_pbrpc_protocol.cpp
+++ b/src/brpc/policy/hulu_pbrpc_protocol.cpp
@@ -230,7 +230,7 @@ static void SendHuluResponse(int64_t correlation_id,
MethodStatus* method_status,
int64_t received_us) {
ControllerPrivateAccessor accessor(cntl);
- Span* span = accessor.span();
+ auto span = accessor.span();
if (span) {
span->set_start_send_us(butil::cpuwide_time_us());
}
@@ -414,7 +414,7 @@ void ProcessHuluRequest(InputMessageBase* msg_base) {
bthread_assign_data((void*)&server->thread_local_options());
}
- Span* span = NULL;
+ std::shared_ptr<Span> span;
if (IsTraceable(meta.has_trace_id())) {
span = Span::CreateServerSpan(
meta.trace_id(), meta.span_id(), meta.parent_span_id(),
@@ -612,8 +612,7 @@ void ProcessHuluResponse(InputMessageBase* msg_base) {
}
ControllerPrivateAccessor accessor(cntl);
- Span* span = accessor.span();
- if (span) {
+ if (auto span = accessor.span()) {
span->set_base_real_us(msg->base_real_us());
span->set_received_us(msg->received_us());
span->set_response_size(msg->meta.size() + msg->payload.size() + 12);
@@ -715,7 +714,7 @@ void PackHuluRequest(butil::IOBuf* req_buf,
} // else don't set user_mesage_size when there's no attachment, otherwise
// existing hulu-pbrpc server may complain about empty attachment.
- Span* span = ControllerPrivateAccessor(cntl).span();
+ auto span = ControllerPrivateAccessor(cntl).span();
if (span) {
meta.set_trace_id(span->trace_id());
meta.set_span_id(span->span_id());
diff --git a/src/brpc/policy/memcache_binary_protocol.cpp
b/src/brpc/policy/memcache_binary_protocol.cpp
index d4c39dfd..46432c4f 100644
--- a/src/brpc/policy/memcache_binary_protocol.cpp
+++ b/src/brpc/policy/memcache_binary_protocol.cpp
@@ -164,8 +164,7 @@ void ProcessMemcacheResponse(InputMessageBase* msg_base) {
}
ControllerPrivateAccessor accessor(cntl);
- Span* span = accessor.span();
- if (span) {
+ if (auto span = accessor.span()) {
span->set_base_real_us(msg->base_real_us());
span->set_received_us(msg->received_us());
span->set_response_size(msg->meta.length());
diff --git a/src/brpc/policy/nova_pbrpc_protocol.cpp
b/src/brpc/policy/nova_pbrpc_protocol.cpp
index 249e35c7..a1d88f25 100644
--- a/src/brpc/policy/nova_pbrpc_protocol.cpp
+++ b/src/brpc/policy/nova_pbrpc_protocol.cpp
@@ -121,8 +121,7 @@ void ProcessNovaResponse(InputMessageBase* msg_base) {
}
ControllerPrivateAccessor accessor(cntl);
- Span* span = accessor.span();
- if (span) {
+ if (auto span = accessor.span()) {
span->set_base_real_us(msg->base_real_us());
span->set_received_us(msg->received_us());
span->set_response_size(msg->meta.size() + msg->payload.size());
diff --git a/src/brpc/policy/nshead_mcpack_protocol.cpp
b/src/brpc/policy/nshead_mcpack_protocol.cpp
index 052fd0f3..8ba49f93 100644
--- a/src/brpc/policy/nshead_mcpack_protocol.cpp
+++ b/src/brpc/policy/nshead_mcpack_protocol.cpp
@@ -112,8 +112,7 @@ void ProcessNsheadMcpackResponse(InputMessageBase*
msg_base) {
}
ControllerPrivateAccessor accessor(cntl);
- Span* span = accessor.span();
- if (span) {
+ if (auto span = accessor.span()) {
span->set_base_real_us(msg->base_real_us());
span->set_received_us(msg->received_us());
span->set_response_size(msg->meta.size() + msg->payload.size());
diff --git a/src/brpc/policy/nshead_protocol.cpp
b/src/brpc/policy/nshead_protocol.cpp
index a26dc968..82f696e3 100644
--- a/src/brpc/policy/nshead_protocol.cpp
+++ b/src/brpc/policy/nshead_protocol.cpp
@@ -69,7 +69,7 @@ void NsheadClosure::Run() {
std::unique_ptr<NsheadClosure, DeleteNsheadClosure> recycle_ctx(this);
ControllerPrivateAccessor accessor(&_controller);
- Span* span = accessor.span();
+ auto span = accessor.span();
if (span) {
span->set_start_send_us(butil::cpuwide_time_us());
}
@@ -144,8 +144,7 @@ void NsheadClosure::Run() {
void NsheadClosure::SetMethodName(const std::string& full_method_name) {
ControllerPrivateAccessor accessor(&_controller);
- Span* span = accessor.span();
- if (span) {
+ if (auto span = accessor.span()) {
span->ResetServerSpanName(full_method_name);
}
}
@@ -298,7 +297,7 @@ void ProcessNsheadRequest(InputMessageBase* msg_base) {
bthread_assign_data((void*)&server->thread_local_options());
}
- Span* span = NULL;
+ std::shared_ptr<Span> span;
if (IsTraceable(false)) {
span = Span::CreateServerSpan(0, 0, 0, msg->base_real_us());
accessor.set_span(span);
@@ -369,8 +368,7 @@ void ProcessNsheadResponse(InputMessageBase* msg_base) {
}
ControllerPrivateAccessor accessor(cntl);
- Span* span = accessor.span();
- if (span) {
+ if (auto span = accessor.span()) {
span->set_base_real_us(msg->base_real_us());
span->set_received_us(msg->received_us());
span->set_response_size(msg->payload.length());
@@ -439,8 +437,7 @@ void PackNsheadRequest(
// pack the field.
accessor.get_sending_socket()->set_correlation_id(correlation_id);
- Span* span = accessor.span();
- if (span) {
+ if (auto span = accessor.span()) {
span->set_request_size(request.length());
// TODO: Nowhere to set tracing ids.
// request_meta->set_trace_id(span->trace_id());
diff --git a/src/brpc/policy/public_pbrpc_protocol.cpp
b/src/brpc/policy/public_pbrpc_protocol.cpp
index 38a749dc..a4298a15 100644
--- a/src/brpc/policy/public_pbrpc_protocol.cpp
+++ b/src/brpc/policy/public_pbrpc_protocol.cpp
@@ -174,8 +174,7 @@ void ProcessPublicPbrpcResponse(InputMessageBase* msg_base)
{
}
ControllerPrivateAccessor accessor(cntl);
- Span* span = accessor.span();
- if (span) {
+ if (auto span = accessor.span()) {
span->set_base_real_us(msg->base_real_us());
span->set_received_us(msg->received_us());
span->set_response_size(msg->meta.size() + msg->payload.size());
@@ -269,7 +268,7 @@ void PackPublicPbrpcRequest(butil::IOBuf* buf,
nshead.body_len = GetProtobufByteSize(pbreq);
buf->append(&nshead, sizeof(nshead));
- Span* span = ControllerPrivateAccessor(controller).span();
+ auto span = ControllerPrivateAccessor(controller).span();
if (span) {
// TODO: Nowhere to set tracing ids.
// request_meta->set_trace_id(span->trace_id());
diff --git a/src/brpc/policy/redis_protocol.cpp
b/src/brpc/policy/redis_protocol.cpp
index 9e8e148e..7dc5b5b8 100644
--- a/src/brpc/policy/redis_protocol.cpp
+++ b/src/brpc/policy/redis_protocol.cpp
@@ -237,8 +237,7 @@ void ProcessRedisResponse(InputMessageBase* msg_base) {
}
ControllerPrivateAccessor accessor(cntl);
- Span* span = accessor.span();
- if (span) {
+ if (auto span = accessor.span()) {
span->set_base_real_us(msg->base_real_us());
span->set_received_us(msg->received_us());
span->set_response_size(msg->response.ByteSize());
diff --git a/src/brpc/policy/rtmp_protocol.cpp
b/src/brpc/policy/rtmp_protocol.cpp
index 8b251eb2..d7064686 100644
--- a/src/brpc/policy/rtmp_protocol.cpp
+++ b/src/brpc/policy/rtmp_protocol.cpp
@@ -3540,8 +3540,7 @@ void OnServerStreamCreated::Run(bool error,
break;
}
} while (0);
- Span* span = accessor.span();
- if (span) {
+ if (auto span = accessor.span()) {
span->set_base_real_us(base_realtime);
span->set_received_us(received_us);
span->set_response_size(istream->popped_bytes());
diff --git a/src/brpc/policy/sofa_pbrpc_protocol.cpp
b/src/brpc/policy/sofa_pbrpc_protocol.cpp
index 2fb33ed5..01b21851 100644
--- a/src/brpc/policy/sofa_pbrpc_protocol.cpp
+++ b/src/brpc/policy/sofa_pbrpc_protocol.cpp
@@ -215,7 +215,7 @@ static void SendSofaResponse(int64_t correlation_id,
MethodStatus* method_status,
int64_t received_us) {
ControllerPrivateAccessor accessor(cntl);
- Span* span = accessor.span();
+ auto span = accessor.span();
if (span) {
span->set_start_send_us(butil::cpuwide_time_us());
}
@@ -374,7 +374,7 @@ void ProcessSofaRequest(InputMessageBase* msg_base) {
bthread_assign_data((void*)&server->thread_local_options());
}
- Span* span = NULL;
+ std::shared_ptr<Span> span;
if (IsTraceable(false)) {
span = Span::CreateServerSpan(
0/*meta.trace_id()*/, 0/*meta.span_id()*/,
@@ -517,8 +517,7 @@ void ProcessSofaResponse(InputMessageBase* msg_base) {
}
ControllerPrivateAccessor accessor(cntl);
- Span* span = accessor.span();
- if (span) {
+ if (auto span = accessor.span()) {
span->set_base_real_us(msg->base_real_us());
span->set_received_us(msg->received_us());
span->set_response_size(msg->meta.size() + msg->payload.size() + 24);
diff --git a/src/brpc/policy/thrift_protocol.cpp
b/src/brpc/policy/thrift_protocol.cpp
index 1e25066d..2b5739ea 100755
--- a/src/brpc/policy/thrift_protocol.cpp
+++ b/src/brpc/policy/thrift_protocol.cpp
@@ -243,7 +243,7 @@ void ThriftClosure::DoRun() {
const Server* server = _controller.server();
ControllerPrivateAccessor accessor(&_controller);
- Span* span = accessor.span();
+ auto span = accessor.span();
if (span) {
span->set_start_send_us(butil::cpuwide_time_us());
}
@@ -515,7 +515,7 @@ void ProcessThriftRequest(InputMessageBase* msg_base) {
bthread_assign_data((void*)&server->thread_local_options());
}
- Span* span = NULL;
+ std::shared_ptr<Span> span;
if (IsTraceable(false)) {
span = Span::CreateServerSpan(0, 0, 0, msg->base_real_us());
accessor.set_span(span);
@@ -584,8 +584,7 @@ void ProcessThriftResponse(InputMessageBase* msg_base) {
}
ControllerPrivateAccessor accessor(cntl);
- Span* span = accessor.span();
- if (span) {
+ if (auto span = accessor.span()) {
span->set_base_real_us(msg->base_real_us());
span->set_received_us(msg->received_us());
span->set_response_size(msg->payload.length());
@@ -752,8 +751,7 @@ void PackThriftRequest(
// pack the field.
accessor.get_sending_socket()->set_correlation_id(correlation_id);
- Span* span = accessor.span();
- if (span) {
+ if (auto span = accessor.span()) {
span->set_request_size(request.length());
// TODO: Nowhere to set tracing ids.
// request_meta->set_trace_id(span->trace_id());
diff --git a/src/brpc/policy/ubrpc2pb_protocol.cpp
b/src/brpc/policy/ubrpc2pb_protocol.cpp
index fe2c4619..2f5194c8 100644
--- a/src/brpc/policy/ubrpc2pb_protocol.cpp
+++ b/src/brpc/policy/ubrpc2pb_protocol.cpp
@@ -455,8 +455,7 @@ void ProcessUbrpcResponse(InputMessageBase* msg_base) {
}
ControllerPrivateAccessor accessor(cntl);
- Span* span = accessor.span();
- if (span) {
+ if (auto span = accessor.span()) {
span->set_base_real_us(msg->base_real_us());
span->set_received_us(msg->received_us());
span->set_response_size(msg->meta.size() + msg->payload.size());
diff --git a/src/brpc/span.cpp b/src/brpc/span.cpp
index 8e9af46c..3a53f33a 100644
--- a/src/brpc/span.cpp
+++ b/src/brpc/span.cpp
@@ -37,9 +37,92 @@
#define BRPC_SPAN_INFO_SEP "\1"
-
namespace brpc {
+// Callback for creating a new bthread span when creating a new bthread.
+// This is called by bthread layer when BTHREAD_INHERIT_SPAN flag is set.
+// Returns a heap-allocated weak_ptr<Span>* as void* (to be released with
+// DestroyRpczParentSpan), or NULL when the calling thread has no active TLS
+// parent span or span creation fails.
+void* CreateBthreadSpanAsVoid() {
+    // Without an active TLS parent, Span::CreateBthreadSpan() builds a root
+    // span that nothing owns: it dies as soon as the local shared_ptr below
+    // goes out of scope, and the returned weak_ptr would already be expired.
+    // Bail out early so neither the span nor the weak_ptr is allocated.
+    if (!Span::tls_parent()) {
+        return NULL;
+    }
+    const int64_t received_us = butil::cpuwide_time_us();
+    const int64_t base_realtime = butil::gettimeofday_us() - received_us;
+    std::shared_ptr<Span> span =
+        Span::CreateBthreadSpan("Bthread", base_realtime);
+    if (!span) {
+        return NULL;
+    }
+    // The parent's _client_list holds the strong reference; the new bthread
+    // only observes the span through this weak_ptr.
+    return new (std::nothrow) std::weak_ptr<Span>(span);
+}
+
+void DestroyRpczParentSpan(void* ptr) {
+ if (ptr) {
+ delete static_cast<std::weak_ptr<Span>*>(ptr);
+ }
+}
+
+void EndBthreadSpan() {
+ std::shared_ptr<Span> span = GetTlsParentSpan();
+ if (span) {
+ span->set_ending_tid(bthread_self());
+ }
+
+ ClearTlsParentSpan();
+}
+
+void SetTlsParentSpan(std::shared_ptr<Span> span) {
+ using namespace bthread;
+ LocalStorage ls = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls);
+ if (ls.rpcz_parent_span) {
+ *static_cast<std::weak_ptr<Span>*>(ls.rpcz_parent_span) = span;
+ } else {
+ ls.rpcz_parent_span = new std::weak_ptr<Span>(span);
+ BAIDU_SET_VOLATILE_THREAD_LOCAL(tls_bls, ls);
+ }
+}
+
+std::shared_ptr<Span> GetTlsParentSpan() {
+ using namespace bthread;
+ LocalStorage ls = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls);
+ if (!ls.rpcz_parent_span) {
+ return nullptr;
+ }
+
+ auto* weak_ptr = static_cast<std::weak_ptr<Span>*>(ls.rpcz_parent_span);
+ return weak_ptr->lock();
+}
+
+void ClearTlsParentSpan() {
+ using namespace bthread;
+ LocalStorage ls = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls);
+ if (ls.rpcz_parent_span) {
+ static_cast<std::weak_ptr<Span>*>(ls.rpcz_parent_span)->reset();
+ }
+}
+
+bool HasTlsParentSpan() {
+ using namespace bthread;
+ LocalStorage ls = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls);
+ if (!ls.rpcz_parent_span) {
+ return false;
+ }
+
+ auto* weak_ptr = static_cast<std::weak_ptr<Span>*>(ls.rpcz_parent_span);
+ return !weak_ptr->expired();
+}
+
+
+// Custom deleter used by every shared_ptr<Span>: instead of deleting, it
+// hands the object back to butil's object pool. The Create*Span() factories
+// re-initialize fields explicitly on reuse, which suggests the pooled object
+// is not destructed here (NOTE(review): confirm against butil object_pool),
+// so members holding references are reset by hand.
+void SpanDeleter::operator()(Span* r) const {
+    if (r == NULL) {
+        return;
+    }
+
+    // Drop this span's strong references to its children. Each child is
+    // destroyed once no one else (e.g. a Controller) still holds it.
+    r->_client_list.clear();
+    // Drop the weak reference to the parent so the pooled object does not
+    // keep the parent's shared_ptr control block allocated. CreateServerSpan()
+    // no longer resets _local_parent, so a recycled span would otherwise
+    // carry a stale parent reference.
+    r->_local_parent.reset();
+    r->_info.clear();
+    butil::return_object(r);
+}
+
const int64_t SPAN_DELETE_INTERVAL_US = 10000000L/*10s*/;
DEFINE_string(rpcz_database_dir, "./rpc_data/rpcz",
@@ -104,15 +187,28 @@ inline uint64_t GenerateTraceId() {
return (g->current_random & 0xFFFFFFFFFFFF0000ULL) | g->seq++;
}
-Span* Span::CreateClientSpan(const std::string& full_method_name,
- int64_t base_real_us) {
- Span* span = butil::get_object<Span>(Forbidden());
- if (__builtin_expect(span == NULL, 0)) {
- return NULL;
+Span::Span(Forbidden) {
+ CHECK_EQ(0, pthread_spin_init(&_info_spinlock, 0))
+ << "Failed to initialize _info_spinlock";
+ CHECK_EQ(0, pthread_spin_init(&_client_list_spinlock, 0))
+ << "Failed to initialize _client_list_spinlock";
+}
+
+Span::~Span() {
+ pthread_spin_destroy(&_client_list_spinlock);
+ pthread_spin_destroy(&_info_spinlock);
+}
+
+std::shared_ptr<Span> Span::CreateClientSpan(const std::string&
full_method_name,
+ int64_t base_real_us) {
+ Span* span_raw = butil::get_object<Span>(Forbidden());
+ if (__builtin_expect(span_raw == NULL, 0)) {
+ return nullptr;
}
+ std::shared_ptr<Span> span(span_raw, SpanDeleter());
span->_log_id = 0;
span->_base_cid = INVALID_BTHREAD_ID;
- span->_ending_cid = INVALID_BTHREAD_ID;
+ span->_ending_cid = INVALID_BTHREAD_ID; // Client Span uses ending_cid
span->_type = SPAN_TYPE_CLIENT;
span->_async = false;
span->_protocol = PROTOCOL_UNKNOWN;
@@ -125,40 +221,36 @@ Span* Span::CreateClientSpan(const std::string&
full_method_name,
span->_start_callback_real_us = 0;
span->_start_send_real_us = 0;
span->_sent_real_us = 0;
- span->_next_client = NULL;
- span->_client_list = NULL;
- span->_tls_next = NULL;
span->_full_method_name = full_method_name;
span->_info.clear();
- Span* parent = static_cast<Span*>(bthread::tls_bls.rpcz_parent_span);
+ std::shared_ptr<Span> parent = Span::tls_parent();
if (parent) {
span->_trace_id = parent->trace_id();
span->_parent_span_id = parent->span_id();
span->_local_parent = parent;
- span->_next_client = parent->_client_list;
- parent->_client_list = span;
+ {
+ BAIDU_SCOPED_LOCK(parent->_client_list_spinlock);
+ parent->_client_list.push_back(span);
+ }
} else {
span->_trace_id = GenerateTraceId();
span->_parent_span_id = 0;
- span->_local_parent = NULL;
}
span->_span_id = GenerateSpanId();
return span;
}
-Span* Span::CreateBthreadSpan(const std::string& full_method_name,
- int64_t base_real_us) {
- Span* parent = static_cast<Span*>(bthread::tls_bls.rpcz_parent_span);
- if (parent == NULL) {
- return NULL;
- }
- Span* span = butil::get_object<Span>(Forbidden());
- if (__builtin_expect(span == NULL, 0)) {
- return NULL;
+std::shared_ptr<Span> Span::CreateBthreadSpan(const std::string&
full_method_name,
+ int64_t base_real_us) {
+ std::shared_ptr<Span> parent = Span::tls_parent();
+ Span* span_raw = butil::get_object<Span>(Forbidden());
+ if (__builtin_expect(span_raw == NULL, 0)) {
+ return nullptr;
}
+ std::shared_ptr<Span> span(span_raw, SpanDeleter());
span->_log_id = 0;
span->_base_cid = INVALID_BTHREAD_ID;
- span->_ending_cid = INVALID_BTHREAD_ID;
+ span->_ending_tid = INVALID_BTHREAD; // Bthread Span uses ending_tid
span->_type = SPAN_TYPE_BTHREAD;
span->_async = false;
span->_protocol = PROTOCOL_UNKNOWN;
@@ -171,17 +263,21 @@ Span* Span::CreateBthreadSpan(const std::string&
full_method_name,
span->_start_callback_real_us = 0;
span->_start_send_real_us = 0;
span->_sent_real_us = 0;
- span->_next_client = NULL;
- span->_client_list = NULL;
- span->_tls_next = NULL;
span->_full_method_name = full_method_name;
span->_info.clear();
- span->_trace_id = parent->trace_id();
- span->_parent_span_id = parent->span_id();
- span->_local_parent = parent;
- span->_next_client = parent->_client_list;
- parent->_client_list = span;
+ if (parent) {
+ span->_trace_id = parent->trace_id();
+ span->_parent_span_id = parent->span_id();
+ span->_local_parent = parent;
+ {
+ BAIDU_SCOPED_LOCK(parent->_client_list_spinlock);
+ parent->_client_list.push_back(span);
+ }
+ } else {
+ span->_trace_id = GenerateTraceId();
+ span->_parent_span_id = 0;
+ }
span->_span_id = GenerateSpanId();
return span;
@@ -193,20 +289,21 @@ inline const std::string& unknown_span_name() {
return s_unknown_method_name;
}
-Span* Span::CreateServerSpan(
+std::shared_ptr<Span> Span::CreateServerSpan(
const std::string& full_method_name,
uint64_t trace_id, uint64_t span_id, uint64_t parent_span_id,
int64_t base_real_us) {
- Span* span = butil::get_object<Span>(Forbidden());
- if (__builtin_expect(span == NULL, 0)) {
- return NULL;
+ Span* span_raw = butil::get_object<Span>(Forbidden());
+ if (__builtin_expect(span_raw == NULL, 0)) {
+ return nullptr;
}
+ std::shared_ptr<Span> span(span_raw, SpanDeleter());
span->_trace_id = (trace_id ? trace_id : GenerateTraceId());
span->_span_id = (span_id ? span_id : GenerateSpanId());
span->_parent_span_id = parent_span_id;
span->_log_id = 0;
span->_base_cid = INVALID_BTHREAD_ID;
- span->_ending_cid = INVALID_BTHREAD_ID;
+ span->_ending_cid = INVALID_BTHREAD_ID; // Server Span uses ending_cid
span->_type = SPAN_TYPE_SERVER;
span->_async = false;
span->_protocol = PROTOCOL_UNKNOWN;
@@ -219,17 +316,13 @@ Span* Span::CreateServerSpan(
span->_start_callback_real_us = 0;
span->_start_send_real_us = 0;
span->_sent_real_us = 0;
- span->_next_client = NULL;
- span->_client_list = NULL;
- span->_tls_next = NULL;
span->_full_method_name = (!full_method_name.empty() ?
full_method_name : unknown_span_name());
span->_info.clear();
- span->_local_parent = NULL;
return span;
}
-Span* Span::CreateServerSpan(
+std::shared_ptr<Span> Span::CreateServerSpan(
uint64_t trace_id, uint64_t span_id, uint64_t parent_span_id,
int64_t base_real_us) {
return CreateServerSpan(unknown_span_name(), trace_id, span_id,
@@ -241,26 +334,22 @@ void Span::ResetServerSpanName(const std::string&
full_method_name) {
full_method_name : unknown_span_name());
}
-void Span::destroy() {
+void Span::submit(int64_t cpuwide_us) {
+ // Note: this method is not called for client-side spans.
EndAsParent();
- traversal(this, [](Span* r) {
- r->_info.clear();
- butil::return_object(r);
- });
-}
-
-void Span::traversal(Span* r, const std::function<void(Span*)>& f) const {
- if (r == NULL) {
- return;
- }
- for (auto p = r->_client_list; p != NULL; p = p->_next_client) {
- traversal(p, f);
+ SpanContainer* container = new(std::nothrow)
SpanContainer(shared_from_this());
+ // If memory allocation fails, the server span will not be submitted for
persistence.
+ // The server span will be destroyed later when its shared_ptr refcount
drops to zero
+ // Child spans (held in _client_list) will also be destroyed when
+ // their refcounts reach zero.
+ if (container) {
+ container->submit(cpuwide_us);
}
- f(r);
}
void Span::Annotate(const char* fmt, ...) {
const int64_t anno_time = butil::cpuwide_time_us() + _base_real_us;
+ BAIDU_SCOPED_LOCK(_info_spinlock);
butil::string_appendf(&_info, BRPC_SPAN_INFO_SEP "%lld ",
(long long)anno_time);
va_list ap;
@@ -271,6 +360,7 @@ void Span::Annotate(const char* fmt, ...) {
void Span::Annotate(const char* fmt, va_list args) {
const int64_t anno_time = butil::cpuwide_time_us() + _base_real_us;
+ BAIDU_SCOPED_LOCK(_info_spinlock);
butil::string_appendf(&_info, BRPC_SPAN_INFO_SEP "%lld ",
(long long)anno_time);
butil::string_vappendf(&_info, fmt, args);
@@ -278,6 +368,7 @@ void Span::Annotate(const char* fmt, va_list args) {
void Span::Annotate(const std::string& info) {
const int64_t anno_time = butil::cpuwide_time_us() + _base_real_us;
+ BAIDU_SCOPED_LOCK(_info_spinlock);
butil::string_appendf(&_info, BRPC_SPAN_INFO_SEP "%lld ",
(long long)anno_time);
_info.append(info);
@@ -285,6 +376,7 @@ void Span::Annotate(const std::string& info) {
void Span::AnnotateCStr(const char* info, size_t length) {
const int64_t anno_time = butil::cpuwide_time_us() + _base_real_us;
+ BAIDU_SCOPED_LOCK(_info_spinlock);
butil::string_appendf(&_info, BRPC_SPAN_INFO_SEP "%lld ",
(long long)anno_time);
if (length <= 0) {
@@ -295,9 +387,14 @@ void Span::AnnotateCStr(const char* info, size_t length) {
}
size_t Span::CountClientSpans() const {
-    size_t n = 0;
-    traversal(const_cast<Span*>(this), [&](Span*) { ++n; });
-    return n - 1;
+    // Snapshot the direct children under the lock, then recurse without
+    // holding it so a deep span tree never spins on nested spinlocks.
+    std::vector<std::shared_ptr<const Span>> children;
+    {
+        BAIDU_SCOPED_LOCK(_client_list_spinlock);
+        children.assign(_client_list.begin(), _client_list.end());
+    }
+    // Count descendants only (this span itself is excluded), matching the
+    // result of the previous traversal-based implementation (n - 1).
+    size_t n = 0;
+    for (const auto& child : children) {
+        n += 1 + child->CountClientSpans();
+    }
+    return n;
}
int64_t Span::GetStartRealTimeUs() const {
@@ -345,15 +442,26 @@ bool SpanInfoExtractor::PopAnnotation(
}
bool CanAnnotateSpan() {
- return bthread::tls_bls.rpcz_parent_span;
+ return HasTlsParentSpan();
}
void AnnotateSpan(const char* fmt, ...) {
- Span* span = static_cast<Span*>(bthread::tls_bls.rpcz_parent_span);
- va_list ap;
- va_start(ap, fmt);
- span->Annotate(fmt, ap);
- va_end(ap);
+ std::shared_ptr<Span> span = GetTlsParentSpan();
+ if (span) { // TRACEPRINTF checks CanAnnotateSpan, but this is safer.
+ va_list ap;
+ va_start(ap, fmt);
+ span->Annotate(fmt, ap);
+ va_end(ap);
+ }
+}
+
+void AnnotateSpanEx(std::shared_ptr<Span> span, const char* fmt, ...) {
+ if (span) {
+ va_list ap;
+ va_start(ap, fmt);
+ span->Annotate(fmt, ap);
+ va_end(ap);
+ }
}
class SpanDB : public SharedObject {
@@ -365,7 +473,7 @@ public:
SpanDB() : id_db(NULL), time_db(NULL) { }
static SpanDB* Open();
- leveldb::Status Index(const Span* span, std::string* value_buf);
+ leveldb::Status Index(std::shared_ptr<const Span> span, std::string*
value_buf);
leveldb::Status RemoveSpansBefore(int64_t tm);
private:
@@ -405,10 +513,14 @@ static bvar::DisplaySamplingRatio
s_display_sampling_ratio(
"rpcz_sampling_ratio", &g_span_sl);
struct SpanEarlier {
- bool operator()(bvar::Collected* c1, bvar::Collected* c2) const {
- const Span* span1 = static_cast<const Span*>(c1);
- const Span* span2 = static_cast<const Span*>(c2);
- return span1->GetStartRealTimeUs() < span2->GetStartRealTimeUs();
+ bool operator()(const bvar::Collected* c1, const bvar::Collected* c2)
const {
+ const SpanContainer* container1 = static_cast<const
SpanContainer*>(c1);
+ const SpanContainer* container2 = static_cast<const
SpanContainer*>(c2);
+
+ const int64_t time1 = container1->span()->GetStartRealTimeUs();
+ const int64_t time2 = container2->span()->GetStartRealTimeUs();
+
+ return time1 < time2;
}
};
class SpanPreprocessor : public bvar::CollectorPreprocessor {
@@ -471,8 +583,13 @@ inline int GetSpanDB(butil::intrusive_ptr<SpanDB>* db) {
return -1;
}
-void Span::Submit(Span* span, int64_t cpuwide_time_us) {
- if (span->local_parent() == NULL) {
+void Span::Submit(std::shared_ptr<Span> span, int64_t cpuwide_time_us) {
+ // Only submit spans without a local parent (i.e., server spans).
+ // Server spans hold shared_ptr references to their child spans (via
_client_list),
+ // ensuring child spans remain alive until the server span is submitted
and dumped.
+ // Client spans are not submitted here because their lifetime is managed
by their
+ // parent server span.
+ if (span->local_parent().expired()) {
span->submit(cpuwide_time_us);
}
}
@@ -497,6 +614,7 @@ static void Span2Proto(const Span* span, RpczSpan* out) {
out->set_start_send_real_us(span->start_send_real_us());
out->set_sent_real_us(span->sent_real_us());
out->set_full_method_name(span->full_method_name());
+ // info() returns by value for thread safety (see span.h for details).
out->set_info(span->info());
out->set_error_code(span->error_code());
}
@@ -571,7 +689,7 @@ SpanDB* SpanDB::Open() {
return db;
}
-leveldb::Status SpanDB::Index(const Span* span, std::string* value_buf) {
+leveldb::Status SpanDB::Index(std::shared_ptr<const Span> span, std::string*
value_buf) {
leveldb::WriteOptions options;
options.sync = false;
@@ -637,20 +755,46 @@ leveldb::Status SpanDB::Index(const Span* span,
std::string* value_buf) {
ToBigEndian(span->span_id(), key_data + 2);
leveldb::Slice key((char*)key_data, sizeof(key_data));
RpczSpan value_proto;
-    Span2Proto(span, &value_proto);
-    // client spans should be reversed.
-    size_t client_span_count = span->CountClientSpans();
-    for (size_t i = 0; i < client_span_count; ++i) {
-        value_proto.add_client_spans();
-    }
-    size_t i = 0;
-    span->traversal(const_cast<Span*>(span), [&](Span* p) {
-        if (span == p) {
-            return;
-        }
-        Span2Proto(p, value_proto.mutable_client_spans(client_span_count - i - 1));
-        ++i;
-    });
+    Span2Proto(span.get(), &value_proto);
+
+    // Serialize descendant spans in pre-order: a parent before its children,
+    // siblings in creation order. This is the layout the previous
+    // reverse-filled implementation produced. Children are now appended to
+    // _client_list with push_back(), so the list is already in creation order
+    // and a direct pre-order walk needs no reversal.
+    //
+    // Children are snapshotted under the spinlock and visited without holding
+    // it. Spans that have not ended yet are skipped (their descendants are
+    // still visited): active spans may still be modified by other threads,
+    // which could lead to inconsistent data when serializing to database.
+    std::function<void(const Span*)> append_client_spans =
+        [&](const Span* parent) {
+        std::vector<std::shared_ptr<const Span>> children;
+        {
+            BAIDU_SCOPED_LOCK(parent->_client_list_spinlock);
+            children.reserve(parent->_client_list.size());
+            for (const auto& child : parent->_client_list) {
+                if (child) {
+                    children.push_back(child);
+                }
+            }
+        }
+        for (const auto& child : children) {
+            if (!child->is_active()) {
+                Span2Proto(child.get(), value_proto.add_client_spans());
+            }
+            append_client_spans(child.get());
+        }
+    };
+    append_client_spans(span.get());
if (!value_proto.SerializeToString(value_buf)) {
return leveldb::Status::InvalidArgument(
leveldb::Slice("Fail to serialize RpczSpan"));
@@ -691,7 +835,7 @@ leveldb::Status SpanDB::RemoveSpansBefore(int64_t tm) {
break;
}
} else {
- LOG(ERROR) << "Fail to parse from value";
+ LOG(ERROR) << "Fail to parse value";
}
rc = time_db->Delete(options, it->key());
if (!rc.ok()) {
@@ -704,7 +848,7 @@ leveldb::Status SpanDB::RemoveSpansBefore(int64_t tm) {
}
// Write span into leveldb.
-void Span::dump_and_destroy(size_t /*round*/) {
+void Span::dump_to_db() {
StartIndexingIfNeeded();
std::string value_buf;
@@ -712,21 +856,18 @@ void Span::dump_and_destroy(size_t /*round*/) {
butil::intrusive_ptr<SpanDB> db;
if (GetSpanDB(&db) != 0) {
if (g_span_ending) {
- destroy();
return;
}
SpanDB* db2 = SpanDB::Open();
if (db2 == NULL) {
LOG(WARNING) << "Fail to open SpanDB";
- destroy();
return;
}
ResetSpanDB(db2);
db.reset(db2);
}
- leveldb::Status st = db->Index(this, &value_buf);
- destroy();
+ leveldb::Status st = db->Index(shared_from_this(), &value_buf);
if (!st.ok()) {
LOG(WARNING) << st.ToString();
if (st.IsNotFound() || st.IsIOError() || st.IsCorruption()) {
@@ -751,6 +892,42 @@ void Span::dump_and_destroy(size_t /*round*/) {
}
}
+// ========== SpanContainer ============
+
+// Destroy the span container without persisting to database.
+// This is called in abnormal scenarios:
+// 1. When the pending sample queue is full (to prevent memory explosion)
+// 2. When grab_thread hasn't run for too long (system overload)
+// In these cases, we discard the span quickly without expensive I/O.
+void SpanContainer::destroy() {
+ delete this;
+}
+
+// The round parameter is required by bvar::Collected interface but unused
here.
+// Other implementations (e.g., SampledRequest in rpc_dump.cpp) use it to
detect
+// new batches and trigger per-round operations like reloading gflags or
switching
+// output files. SpanContainer doesn't need batch-level operations since it
writes
+// directly to leveldb without buffering or configuration reloading.
+void SpanContainer::dump_and_destroy(size_t round) {
+ if (_span) {
+ _span->dump_to_db();
+ }
+ destroy();
+}
+
+void SpanContainer::submit(int64_t cpuwide_us) {
+ bvar::Collected::submit(cpuwide_us);
+}
+
+bvar::CollectorSpeedLimit* SpanContainer::speed_limit() {
+ if (_span) {
+ return _span->speed_limit();
+ }
+ return NULL;
+}
+
+// =====================================
+
int FindSpan(uint64_t trace_id, uint64_t span_id, RpczSpan* response) {
butil::intrusive_ptr<SpanDB> db;
if (GetSpanDB(&db) != 0) {
diff --git a/src/brpc/span.h b/src/brpc/span.h
index 75d8e7fc..70fdbf4d 100644
--- a/src/brpc/span.h
+++ b/src/brpc/span.h
@@ -23,8 +23,11 @@
#include <stdint.h>
#include <string>
+#include <list>
#include <deque>
#include <ostream>
+#include <memory>
+#include <pthread.h>
#include "butil/macros.h"
#include "butil/endpoint.h"
#include "butil/string_splitter.h"
@@ -37,28 +40,48 @@ namespace bthread {
extern __thread bthread::LocalStorage tls_bls;
}
-
namespace brpc {
+class Span;
+class SpanContainer;
+
+// Accessors for bthread::tls_bls.rpcz_parent_span. The slot stores a
+// heap-allocated std::weak_ptr<Span>, so it never keeps a span alive itself.
+void SetTlsParentSpan(std::shared_ptr<Span> span);
+// Returns the TLS parent span, or nullptr if none is set or it has expired.
+std::shared_ptr<Span> GetTlsParentSpan();
+// Resets the stored weak_ptr; the slot itself is kept for reuse.
+void ClearTlsParentSpan();
+// Returns true if a TLS parent span is set and has not expired.
+bool HasTlsParentSpan();
+
+// Span callbacks used by the bthread layer (BTHREAD_INHERIT_SPAN).
+void* CreateBthreadSpanAsVoid();
+void DestroyRpczParentSpan(void* ptr);
+void EndBthreadSpan();
+
DECLARE_bool(enable_rpcz);
+
+// Deleter for Span.
+struct SpanDeleter {
+    void operator()(Span* r) const;
+};
+
// Collect information required by /rpcz and tracing system whose idea is
// described in
http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/36356.pdf
-class Span : public bvar::Collected {
+class Span : public std::enable_shared_from_this<Span> {
friend class SpanDB;
- struct Forbidden {};
+friend struct SpanDeleter;
+friend class SpanContainer;
public:
+ struct Forbidden {};
// Call CreateServerSpan/CreateClientSpan instead.
- Span(Forbidden) {}
- ~Span() {}
+ Span(Forbidden);
+ ~Span();
// Create a span to track a request inside server.
- static Span* CreateServerSpan(
+ static std::shared_ptr<Span> CreateServerSpan(
const std::string& full_method_name,
uint64_t trace_id, uint64_t span_id, uint64_t parent_span_id,
int64_t base_real_us);
// Create a span without name to track a request inside server.
- static Span* CreateServerSpan(
+ static std::shared_ptr<Span> CreateServerSpan(
uint64_t trace_id, uint64_t span_id, uint64_t parent_span_id,
int64_t base_real_us);
@@ -66,18 +89,24 @@ public:
void ResetServerSpanName(const std::string& name);
// Create a span to track a request inside channel.
- static Span* CreateClientSpan(const std::string& full_method_name,
- int64_t base_real_us);
+ static std::shared_ptr<Span> CreateClientSpan(const std::string&
full_method_name,
+ int64_t base_real_us);
// Create a span to track start bthread
- static Span* CreateBthreadSpan(const std::string& full_method_name,
- int64_t base_real_us);
-
- static void Submit(Span* span, int64_t cpuwide_time_us);
-
- // Set tls parent.
+ static std::shared_ptr<Span> CreateBthreadSpan(const std::string&
full_method_name,
+ int64_t base_real_us);
+
+ static void Submit(std::shared_ptr<Span> span, int64_t cpuwide_time_us);
+
+ // Set this span as the TLS parent for subsequent child span creation.
+ // Typical flow:
+ // 1. Server span calls AsParent() before user callback to enable tracing
+ // 2. Client spans created in user code automatically link to this parent
+ // 3. When client RPC completes, it restores its own parent via AsParent()
+ // to maintain the trace chain (see Controller::SubmitSpan)
+ // 4. Server span calls EndAsParent() when submitting to clear TLS parent
void AsParent() {
- bthread::tls_bls.rpcz_parent_span = this;
+ SetTlsParentSpan(shared_from_this());
}
// Add log with time.
@@ -96,6 +125,7 @@ public:
void set_log_id(uint64_t cid) { _log_id = cid; }
void set_base_cid(bthread_id_t id) { _base_cid = id; }
void set_ending_cid(bthread_id_t id) { _ending_cid = id; }
+ void set_ending_tid(bthread_t tid) { _ending_tid = tid; }
void set_remote_side(const butil::EndPoint& pt) { _remote_side = pt; }
void set_protocol(ProtocolType p) { _protocol = p; }
void set_error_code(int error_code) { _error_code = error_code; }
@@ -115,9 +145,20 @@ public:
void set_sent_us(int64_t tm)
{ _sent_real_us = tm + _base_real_us; }
- Span* local_parent() const { return _local_parent; }
- static Span* tls_parent() {
- return static_cast<Span*>(bthread::tls_bls.rpcz_parent_span);
+ bool is_active() const {
+ if (_type == SPAN_TYPE_BTHREAD) {
+ return _ending_tid == INVALID_BTHREAD;
+ }
+ return _ending_cid == INVALID_BTHREAD_ID;
+ }
+
+ std::weak_ptr<Span> local_parent() const { return _local_parent; }
+ static std::shared_ptr<Span> tls_parent() {
+ auto parent = GetTlsParentSpan();
+ if (parent && parent->is_active()) {
+ return parent;
+ }
+ return nullptr;
}
uint64_t trace_id() const { return _trace_id; }
@@ -126,6 +167,7 @@ public:
uint64_t log_id() const { return _log_id; }
bthread_id_t base_cid() const { return _base_cid; }
bthread_id_t ending_cid() const { return _ending_cid; }
+ bthread_t ending_tid() const { return _ending_tid; }
const butil::EndPoint& remote_side() const { return _remote_side; }
SpanType type() const { return _type; }
ProtocolType protocol() const { return _protocol; }
@@ -139,20 +181,38 @@ public:
int64_t sent_real_us() const { return _sent_real_us; }
bool async() const { return _async; }
const std::string& full_method_name() const { return _full_method_name; }
- const std::string& info() const { return _info; }
+
+ // Returns a copy instead of a reference for thread safety.
+ //
+ // Current usage: Only called by Span2Proto() which immediately passes the
result
+ // to protobuf's set_info(). In this specific scenario, returning a
reference would
+ // also be safe because set_info() copies the string before the reference
could be
+ // invalidated by concurrent Annotate() calls.
+ //
+ // However, returning by value is more robust: it prevents potential data
races if
+ // future code holds the reference longer, and has no performance penalty
due to
+ // C++11 move semantics (the temporary is moved, not copied, into
protobuf).
+ std::string info() const {
+ BAIDU_SCOPED_LOCK(_info_spinlock);
+ return _info;
+ }
private:
DISALLOW_COPY_AND_ASSIGN(Span);
- void dump_and_destroy(size_t round_index);
- void destroy();
- void traversal(Span*, const std::function<void(Span*)>&) const;
+ void dump_to_db();
+ void submit(int64_t cpuwide_us);
bvar::CollectorSpeedLimit* speed_limit();
bvar::CollectorPreprocessor* preprocessor();
+ // Clear this span from TLS parent if it's currently set as the parent.
+ // Called when server span is being submitted to prevent subsequent spans
+ // from incorrectly linking to an ended span. Only clears if the current
+ // TLS parent is this span (avoids clearing if another span has taken
over).
void EndAsParent() {
- if (this == static_cast<Span*>(bthread::tls_bls.rpcz_parent_span)) {
- bthread::tls_bls.rpcz_parent_span = NULL;
+ std::shared_ptr<Span> current_parent = GetTlsParentSpan();
+ if (current_parent.get() == this) {
+ ClearTlsParentSpan();
}
}
@@ -162,6 +222,7 @@ private:
uint64_t _log_id;
bthread_id_t _base_cid;
bthread_id_t _ending_cid;
+ bthread_t _ending_tid; // Used for bthread span to store the ending
bthread tid
butil::EndPoint _remote_side;
SpanType _type;
bool _async;
@@ -181,11 +242,38 @@ private:
// time2_us \s annotation2 <SEP>
// ...
std::string _info;
+ // Protects _info from concurrent modifications.
+ // Multiple threads may call Annotate() simultaneously (e.g., retry logic,
+ // network layer, user code via TRACEPRINTF), causing data corruption in
+ // string concatenation without synchronization.
+ mutable pthread_spinlock_t _info_spinlock;
+
+ std::weak_ptr<Span> _local_parent;
+ std::list<std::shared_ptr<Span>> _client_list;
+ // Protects _client_list from concurrent modifications.
+ // In some scenarios, multiple bthreads may simultaneously create child
spans
+ // (e.g.,raft leader parallel RPCs to followers) and push_back to parent's
_client_list.
+ // Also protects against concurrent iteration (e.g., CountClientSpans,
SpanDB::Index)
+ // while the list is being modified.
+ mutable pthread_spinlock_t _client_list_spinlock;
+};
+
+// Wraps a span so it can be handed to the bvar collector. The container owns
+// a strong reference, keeping the span (and, through _client_list, its child
+// spans) alive until dump_and_destroy()/destroy() deletes the container.
+class SpanContainer : public bvar::Collected {
+public:
+    explicit SpanContainer(const std::shared_ptr<Span>& span) : _span(span) {}
+    ~SpanContainer() {}
+
+    // Implementations of bvar::Collected
+    void dump_and_destroy(size_t round_index) override;
+    void destroy() override;
+    bvar::CollectorSpeedLimit* speed_limit() override;
-    Span* _local_parent;
-    Span* _next_client;
-    Span* _client_list;
-    Span* _tls_next;
+    // Span used to be the bvar::Collected itself and supplied the collector
+    // preprocessor (the span preprocessor that orders samples with
+    // SpanEarlier). Now that SpanContainer is what gets submitted, it must
+    // forward the preprocessor too; otherwise bvar::Collected's default is
+    // used and span preprocessing is silently skipped.
+    bvar::CollectorPreprocessor* preprocessor() override {
+        return _span ? _span->preprocessor() : NULL;
+    }
+
+    void submit(int64_t cpuwide_us);
+
+    const std::shared_ptr<Span>& span() const { return _span; }
+
+private:
+    std::shared_ptr<Span> _span;
};
// Extract name and annotations from Span::info()
@@ -198,11 +286,14 @@ private:
butil::StringSplitter _sp;
};
-// These two functions can be used for composing TRACEPRINT as well as hiding
-// span implementations.
+// These functions can be used for composing TRACEPRINTF/TRACEPRINTF_SPAN as
+// well as hiding span implementations.
+
+// Returns true if the current thread's TLS parent span is set and has not
+// expired, i.e. AnnotateSpan() has a span to write to.
bool CanAnnotateSpan();
+
+// Add an annotation to the current (TLS parent) span.
+// If current bthread is not tracing, this function does nothing.
void AnnotateSpan(const char* fmt, ...);
+
+// Add an annotation to the given span.
+// If the span is NULL, this function does nothing.
+void AnnotateSpanEx(std::shared_ptr<Span> span, const char* fmt, ...);
+
+
class SpanFilter {
public:
@@ -240,12 +331,6 @@ inline bool IsTraceable(bool is_upstream_traced) {
(FLAGS_enable_rpcz && bvar::is_collectable(&g_span_sl));
}
-inline void* CreateBthreadSpan() {
- const int64_t received_us = butil::cpuwide_time_us();
- const int64_t base_realtime = butil::gettimeofday_us() - received_us;
- return Span::CreateBthreadSpan("Bthread", base_realtime);
-}
-
} // namespace brpc
diff --git a/src/brpc/traceprintf.h b/src/brpc/traceprintf.h
index 513daf2b..47a8dcf3 100644
--- a/src/brpc/traceprintf.h
+++ b/src/brpc/traceprintf.h
@@ -19,6 +19,7 @@
#ifndef BRPC_TRACEPRINTF_H
#define BRPC_TRACEPRINTF_H
+#include <memory>
#include "butil/macros.h"
// To brpc developers: This is a header included by user, don't depend
@@ -27,9 +28,15 @@
namespace brpc {
+// Forward declaration
+class Span;
+
bool CanAnnotateSpan();
void AnnotateSpan(const char* fmt, ...);
+// Declarations for AnnotateSpanEx used by TRACEPRINTF_SPAN macro
+void AnnotateSpanEx(std::shared_ptr<Span> span, const char* fmt, ...);
+
} // namespace brpc
@@ -43,4 +50,14 @@ void AnnotateSpan(const char* fmt, ...);
} \
} while (0)
+
+// Use this macro to print log to a specific span.
+// If span_ptr is NULL, the remaining arguments are NOT evaluated. Note that
+// span_ptr itself is evaluated twice, so avoid expressions with side effects.
+#define TRACEPRINTF_SPAN(span_ptr, fmt, args...) \
+ do { \
+ if ((span_ptr)) { \
+ ::brpc::AnnotateSpanEx((span_ptr), "[" __FILE__ ":"
BAIDU_SYMBOLSTR(__LINE__) "] " fmt, ##args); \
+ } \
+ } while (0)
+
#endif // BRPC_TRACEPRINTF_H
diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp
index ac49f269..27ded27a 100644
--- a/src/bthread/bthread.cpp
+++ b/src/bthread/bthread.cpp
@@ -90,7 +90,6 @@ extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group);
extern void (*g_worker_startfn)();
extern void (*g_tagged_worker_startfn)(bthread_tag_t);
-extern void* (*g_create_span_func)();
inline TaskControl* get_task_control() {
return g_task_control;
@@ -597,14 +596,6 @@ int bthread_set_tagged_worker_startfn(void
(*start_fn)(bthread_tag_t)) {
return 0;
}
-int bthread_set_create_span_func(void* (*func)()) {
- if (func == NULL) {
- return EINVAL;
- }
- bthread::g_create_span_func = func;
- return 0;
-}
-
void bthread_stop_world() {
bthread::TaskControl* c = bthread::get_task_control();
if (c != NULL) {
@@ -668,6 +659,21 @@ uint64_t bthread_cpu_clock_ns(void) {
return 0;
}
+int bthread_set_span_funcs(bthread_create_span_fn create_fn,
+ bthread_destroy_span_fn destroy_fn,
+ bthread_end_span_fn end_fn) {
+ if ((create_fn && destroy_fn && end_fn) ||
+ (!create_fn && !destroy_fn && !end_fn)) {
+ bthread::g_create_bthread_span = create_fn;
+ bthread::g_rpcz_parent_span_dtor = destroy_fn;
+ bthread::g_end_bthread_span = end_fn;
+ return 0;
+ }
+
+ errno = EINVAL;
+ return -1;
+}
+
} // extern "C"
void bthread_attr_set_name(bthread_attr_t* attr, const char* name) {
diff --git a/src/bthread/bthread.h b/src/bthread/bthread.h
index 603cf04d..402fe70c 100644
--- a/src/bthread/bthread.h
+++ b/src/bthread/bthread.h
@@ -429,6 +429,34 @@ extern int bthread_once(bthread_once_t* once_control, void
(*init_routine)());
*/
extern uint64_t bthread_cpu_clock_ns(void);
+// Span callback function types for tracing bthread lifecycle.
+// These callbacks are typically set by upper-layer frameworks (e.g., brpc)
+// to integrate distributed tracing with bthread execution.
+typedef void* (*bthread_create_span_fn)(void);
+typedef void (*bthread_destroy_span_fn)(void*);
+typedef void (*bthread_end_span_fn)(void);
+
+// Set span-related callbacks for bthread tracing.
+// This should be called during framework initialization (e.g., in
+// GlobalInitializeOrDie).
+//
+// Parameters:
+// create_fn - Called when creating a bthread with the BTHREAD_INHERIT_SPAN
+// flag. Should return a heap-allocated span context (e.g., a
+// std::weak_ptr<Span>*), or NULL if span creation is disabled or fails.
+// destroy_fn - Called to destroy the span context when the bthread exits.
+// Receives the pointer returned by create_fn.
+// end_fn - Called when a bthread created with BTHREAD_INHERIT_SPAN finishes
+// running, to finalize its span (e.g., set the end time).
+//
+// All three callbacks must be provided together, or all NULL to disable span
+// tracking.
+// The callbacks are stored in plain (non-atomic) global pointers, so this
+// function should be called once during initialization, before any bthread
+// with BTHREAD_INHERIT_SPAN is created.
+//
+// Returns:
+// 0 on success
+// -1 if parameters are invalid (sets errno to EINVAL)
+extern int bthread_set_span_funcs(bthread_create_span_fn create_fn,
+ bthread_destroy_span_fn destroy_fn,
+ bthread_end_span_fn end_fn);
+
__END_DECLS
#endif // BTHREAD_BTHREAD_H
diff --git a/src/bthread/key.cpp b/src/bthread/key.cpp
index 00215d7f..74945833 100644
--- a/src/bthread/key.cpp
+++ b/src/bthread/key.cpp
@@ -22,6 +22,7 @@
#include <pthread.h>
#include <gflags/gflags.h>
+#include "bthread/bthread.h" // bthread_create_span_fn and related types
#include "bthread/errno.h" // EAGAIN
#include "bthread/task_group.h" // TaskGroup
#include "butil/atomicops.h"
diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp
index 877a5d40..579bb231 100644
--- a/src/bthread/task_group.cpp
+++ b/src/bthread/task_group.cpp
@@ -48,6 +48,12 @@
namespace bthread {
+// Global span function pointers for bthread lifecycle tracing.
+// These are set by brpc layer via bthread_set_span_funcs().
+void* (*g_create_bthread_span)() = NULL;
+void (*g_rpcz_parent_span_dtor)(void*) = NULL;
+void (*g_end_bthread_span)() = NULL;
+
static const bthread_attr_t BTHREAD_ATTR_TASKGROUP = {
BTHREAD_STACKTYPE_UNKNOWN, 0, NULL, BTHREAD_TAG_INVALID, {0} };
@@ -78,15 +84,6 @@ BAIDU_VOLATILE_THREAD_LOCAL(void*, tls_unique_user_ptr,
NULL);
const TaskStatistics EMPTY_STAT = { 0, 0, 0 };
-void* (*g_create_span_func)() = NULL;
-
-void* run_create_span_func() {
- if (g_create_span_func) {
- return g_create_span_func();
- }
- return BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls).rpcz_parent_span;
-}
-
AtomicInteger128::Value AtomicInteger128::load() const {
#if __x86_64__ || __ARM_NEON
// Supress compiler warning.
@@ -393,6 +390,12 @@ void TaskGroup::task_runner(intptr_t skip_remained) {
thread_return = e.value();
}
+ if (m->attr.flags & BTHREAD_INHERIT_SPAN) {
+ if (g_end_bthread_span) {
+ g_end_bthread_span();
+ }
+ }
+
// TODO: Save thread_return
(void)thread_return;
@@ -417,6 +420,15 @@ void TaskGroup::task_runner(intptr_t skip_remained) {
m->local_storage.keytable = NULL; // optional
}
+ // Clean up span if it exists. This must be done after keytable cleanup
+ // because span cleanup may use bthread local storage.
+ tls_bls_ptr = BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(tls_bls);
+ if (tls_bls_ptr->rpcz_parent_span && g_rpcz_parent_span_dtor) {
+ g_rpcz_parent_span_dtor(tls_bls_ptr->rpcz_parent_span);
+ tls_bls_ptr->rpcz_parent_span = NULL;
+ m->local_storage.rpcz_parent_span = NULL;
+ }
+
// During running the function in TaskMeta and deleting the KeyTable in
// return_KeyTable, the group is probably changed.
g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
@@ -495,7 +507,11 @@ int TaskGroup::start_foreground(TaskGroup** pg,
m->attr = using_attr;
m->local_storage = LOCAL_STORAGE_INIT;
if (using_attr.flags & BTHREAD_INHERIT_SPAN) {
- m->local_storage.rpcz_parent_span = run_create_span_func();
+ if (g_create_bthread_span) {
+ m->local_storage.rpcz_parent_span = g_create_bthread_span();
+ } else {
+ m->local_storage.rpcz_parent_span =
BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls).rpcz_parent_span;
+ }
}
m->cpuwide_start_ns = start_ns;
m->stat = EMPTY_STAT;
@@ -560,7 +576,11 @@ int TaskGroup::start_background(bthread_t* __restrict th,
m->attr = using_attr;
m->local_storage = LOCAL_STORAGE_INIT;
if (using_attr.flags & BTHREAD_INHERIT_SPAN) {
- m->local_storage.rpcz_parent_span = run_create_span_func();
+ if (g_create_bthread_span) {
+ m->local_storage.rpcz_parent_span = g_create_bthread_span();
+ } else {
+ m->local_storage.rpcz_parent_span =
BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls).rpcz_parent_span;
+ }
}
m->cpuwide_start_ns = start_ns;
m->stat = EMPTY_STAT;
diff --git a/src/bthread/task_meta.h b/src/bthread/task_meta.h
index 1b77c0b6..a2490b45 100644
--- a/src/bthread/task_meta.h
+++ b/src/bthread/task_meta.h
@@ -28,6 +28,7 @@
#include "bthread/types.h" // bthread_attr_t
#include "bthread/stack.h" // ContextualStack
#include "bthread/timer_thread.h"
+#include "butil/thread_local.h"
namespace bthread {
@@ -43,13 +44,15 @@ struct ButexWaiter;
struct LocalStorage {
KeyTable* keytable;
void* assigned_data;
- void* rpcz_parent_span;
+ void* rpcz_parent_span; // Opaque span context; under brpc it points to a heap-allocated std::weak_ptr<brpc::Span> (managed by brpc)
};
#define BTHREAD_LOCAL_STORAGE_INITIALIZER { NULL, NULL, NULL }
const static LocalStorage LOCAL_STORAGE_INIT =
BTHREAD_LOCAL_STORAGE_INITIALIZER;
+EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(LocalStorage, tls_bls);
+
enum TaskStatus {
TASK_STATUS_UNKNOWN,
TASK_STATUS_CREATED,
@@ -149,6 +152,24 @@ public:
}
};
+// Global callback for creating a span context when a new bthread is created.
+// It is set by the brpc layer. When a bthread is created with
+// BTHREAD_INHERIT_SPAN, this callback is invoked to create a new span for the
+// bthread. The returned void* points to a heap-allocated std::weak_ptr<Span>
+// managed by the brpc layer and released via g_rpcz_parent_span_dtor.
+// Returns NULL if span creation is disabled or fails.
+extern void* (*g_create_bthread_span)();
+
+// Global destructor callback for rpcz_parent_span.
+// This is set by brpc layer to clean up the heap-allocated weak_ptr.
+// bthread layer doesn't know the concrete type, it just calls this function
+// with the void* pointer when cleaning up LocalStorage.
+extern void (*g_rpcz_parent_span_dtor)(void*);
+
+// Global callback invoked when a bthread ends (used by higher layers to
+// observe and react to bthread end events, e.g., to finish spans). This
+// pointer is set by the upper layer during initialization.
+extern void (*g_end_bthread_span)();
+
} // namespace bthread
#endif // BTHREAD_TASK_META_H
diff --git a/src/bthread/unstable.h b/src/bthread/unstable.h
index 4580202f..186d9ce6 100644
--- a/src/bthread/unstable.h
+++ b/src/bthread/unstable.h
@@ -92,9 +92,6 @@ extern int bthread_set_worker_startfn(void (*start_fn)());
// Add a startup function with tag
extern int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t));
-// Add a create span function
-extern int bthread_set_create_span_func(void* (*func)());
-
// Stop all bthread and worker pthreads.
// You should avoid calling this function which may cause bthread after main()
// suspend indefinitely.
diff --git a/src/bvar/collector.cpp b/src/bvar/collector.cpp
index 34713a4a..a01f45fd 100644
--- a/src/bvar/collector.cpp
+++ b/src/bvar/collector.cpp
@@ -410,6 +410,11 @@ void Collector::dump_thread() {
}
}
+// Submit a sample for asynchronous dumping. The Collector holds only the
+// Collected* pointer (e.g., SpanContainer*). Regardless of which branch is
+// taken below, the sample will eventually be destroyed via either
+// dump_and_destroy() or destroy(), both of which call 'delete this' to release
+// the container and decrement the reference count of any managed resources
+// (e.g., shared_ptr<Span>).
void Collected::submit(int64_t cpuwide_us) {
Collector* d = butil::get_leaky_singleton<Collector>();
// Destroy the sample in-place if the grab_thread did not run for twice
diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp
index ad667044..de33b443 100644
--- a/test/brpc_channel_unittest.cpp
+++ b/test/brpc_channel_unittest.cpp
@@ -47,13 +47,15 @@ namespace brpc {
DECLARE_int32(idle_timeout_second);
DECLARE_int32(max_connection_pool_size);
class Server;
+class Span;
class MethodStatus;
namespace policy {
void SendRpcResponse(int64_t correlation_id,
Controller* cntl,
RpcPBMessages* messages,
const Server* server_raw,
- MethodStatus *, int64_t);
+ MethodStatus *, int64_t,
+ std::shared_ptr<Span> span);
} // policy
} // brpc
@@ -301,9 +303,10 @@ protected:
int64_t, brpc::Controller*,
brpc::RpcPBMessages*,
const brpc::Server*,
- brpc::MethodStatus*, int64_t>(&brpc::policy::SendRpcResponse,
- meta.correlation_id(), cntl,
- messages, &ts->_dummy, NULL, -1);
+ brpc::MethodStatus*, int64_t, std::shared_ptr<brpc::Span>>(
+ &brpc::policy::SendRpcResponse,
+ meta.correlation_id(), cntl,
+ messages, &ts->_dummy, NULL, -1, nullptr);
ts->_svc.CallMethod(method, cntl, req, res, done);
}
diff --git a/test/bthread_unittest.cpp b/test/bthread_unittest.cpp
index dcb8d873..bd31a3c4 100644
--- a/test/bthread_unittest.cpp
+++ b/test/bthread_unittest.cpp
@@ -17,6 +17,7 @@
#include <execinfo.h>
#include <gtest/gtest.h>
+#include <memory>
#include "butil/time.h"
#include "butil/macros.h"
#include "butil/logging.h"
@@ -566,6 +567,13 @@ void* create_span_func() {
return (void*)targets[idx];
}
+void destroy_span_func(void* span) {
+ LOG(INFO) << "Destroy span " << (uint64_t)span;
+}
+
+void end_span_func() {
+}
+
TEST_F(BthreadTest, test_span) {
uint64_t p1 = 0;
uint64_t p2 = 0;
@@ -587,7 +595,7 @@ TEST_F(BthreadTest, test_span) {
LOG(INFO) << "Test bthread create span";
- bthread_set_create_span_func(create_span_func);
+ ASSERT_EQ(0, bthread_set_span_funcs(create_span_func, destroy_span_func,
end_span_func));
bthread_t multi_th1;
bthread_t multi_th2;
@@ -602,6 +610,8 @@ TEST_F(BthreadTest, test_span) {
ASSERT_NE(multi_p1, multi_p2);
ASSERT_NE(std::find(targets, targets + 4, multi_p1), targets + 4);
ASSERT_NE(std::find(targets, targets + 4, multi_p2), targets + 4);
+
+ ASSERT_EQ(0, bthread_set_span_funcs(NULL, NULL, NULL));
}
void* dummy_thread(void*) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]