leekeiabstraction commented on code in PR #407:
URL: https://github.com/apache/fluss-rust/pull/407#discussion_r2937085452
##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -386,12 +470,34 @@ where
ConnectionState::Poison(e) => return Err(RpcError::Poisoned(Arc::clone(e)).into()),
}
- self.send_message(buf).await?;
+ // count only the API message body, excluding the protocol header.
+ let request_body_bytes = buf.len().saturating_sub(REQUEST_HEADER_LENGTH) as u64;
+ let mut request_metrics = RequestMetricsLifecycle::begin(R::API_KEY, request_body_bytes);
+
+ if let Err(e) = self.send_message(buf).await {
Review Comment:
Would map_err or inspect_err be more succinct/idiomatic?
E.g.
```rust
some_call().inspect_err(|e| {
    log::error!("something went wrong: {e}");
})?;
```
Similarly on other parts within this fn.
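
For reference, a minimal sketch of how the suggestion could look with a side effect other than logging. Everything here is a hypothetical stand-in and not code from the PR: `SEND_FAILURES` plays the role of a metrics counter, and this synchronous `send_message` replaces the async one. `Result::inspect_err` needs Rust 1.76 or newer.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical stand-in for a metrics counter; not part of the PR.
static SEND_FAILURES: AtomicU64 = AtomicU64::new(0);

// Hypothetical stand-in for `send_message`; fails on an empty buffer.
fn send_message(buf: &[u8]) -> Result<usize, String> {
    if buf.is_empty() {
        Err("empty buffer".to_string())
    } else {
        Ok(buf.len())
    }
}

// Records the failure as a side effect, then propagates the
// unchanged error to the caller with `?`.
fn request(buf: &[u8]) -> Result<usize, String> {
    let sent = send_message(buf).inspect_err(|_e| {
        SEND_FAILURES.fetch_add(1, Ordering::Relaxed);
    })?;
    Ok(sent)
}
```

With an async call the shape would presumably be the same, with `.await` placed before `.inspect_err(...)`. `inspect_err` leaves the error untouched, whereas `map_err` is the better fit when the error itself needs converting.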
##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -217,6 +218,89 @@ struct ActiveRequest {
channel: Sender<Result<Response, RpcError>>,
}
+/// Tracks per-request connection metrics and ensures in-flight gauge cleanup on drop.
+struct RequestMetricsLifecycle {
+ label: Option<&'static str>,
+ start: Instant,
+ completed: bool,
+}
+
+impl RequestMetricsLifecycle {
+ fn begin(api_key: crate::rpc::ApiKey, request_bytes: u64) -> Self {
+ let label = crate::metrics::api_key_label(api_key);
+ if let Some(label) = label {
+ // Match Java semantics: count request attempts before write/send.
+ metrics::counter!(
+ crate::metrics::CLIENT_REQUESTS_TOTAL,
Review Comment:
Nit: import these items with `use` so the code is less verbose here and in
other parts of the PR, e.g. CLIENT_REQUESTS_TOTAL instead of
crate::metrics::CLIENT_REQUESTS_TOTAL.
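
A rough sketch of the import style being suggested. The module `client_metrics` and the constant's value are hypothetical stand-ins for `crate::metrics`, used only so the snippet compiles on its own.

```rust
// Hypothetical stand-in for the crate's metrics module; the value is made up.
mod client_metrics {
    pub const CLIENT_REQUESTS_TOTAL: &str = "client_requests_total";
}

// Import once, near the top of the file.
use client_metrics::CLIENT_REQUESTS_TOTAL;

fn requests_metric_name() -> &'static str {
    // The short name replaces the fully qualified path at each use site.
    CLIENT_REQUESTS_TOTAL
}
```

In the PR itself the equivalent would presumably be a `use crate::metrics::CLIENT_REQUESTS_TOTAL;` line at the top of server_connection.rs.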