This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 91c165c7ebb [improve] PIP-467: Convert Jetty request logs to
structured slog events (#25543)
91c165c7ebb is described below
commit 91c165c7ebb95413768ee7dc8590826aa1276e14
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Apr 16 15:42:05 2026 -0700
[improve] PIP-467: Convert Jetty request logs to structured slog events
(#25543)
---
.../pulsar/broker/web/JettyRequestLogFactory.java | 187 +++++++++------------
.../broker/web/WebServiceOriginalClientIPTest.java | 17 +-
.../proxy/server/ProxyOriginalClientIPTest.java | 16 +-
3 files changed, 99 insertions(+), 121 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JettyRequestLogFactory.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JettyRequestLogFactory.java
index 1519147a292..1625e29f819 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JettyRequestLogFactory.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JettyRequestLogFactory.java
@@ -20,59 +20,38 @@ package org.apache.pulsar.broker.web;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.util.TimeZone;
+import java.security.Principal;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.CustomLog;
+import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.security.AuthenticationState;
import org.eclipse.jetty.server.ConnectionMetaData;
import org.eclipse.jetty.server.Connector;
-import org.eclipse.jetty.server.CustomRequestLog;
import org.eclipse.jetty.server.ProxyConnectionFactory;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.RequestLog;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.Slf4jRequestLogWriter;
import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.HostPort;
+import org.eclipse.jetty.util.NanoTime;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
/**
* Class to standardize initialization of a Jetty request logger for all
pulsar components.
+ *
+ * <p>Emits one structured log entry per HTTP request with individual fields
as slog attributes
+ * (method, uri, status, clientAddr, bytesOut, durationMs, etc.) instead of
the previous
+ * pre-formatted NCSA combined log line.
*/
+@CustomLog
public class JettyRequestLogFactory {
/**
- * The time format to use for request logging. This custom format is
necessary because the
- * default option uses GMT for the time zone. Pulsar's request logging has
historically
- * used the JVM's default time zone, so this format uses that time zone.
It is also necessary
- * because the {@link CustomRequestLog#DEFAULT_DATE_FORMAT} is
"dd/MMM/yyyy:HH:mm:ss ZZZ" instead
- * of "dd/MMM/yyyy:HH:mm:ss Z" (the old date format). The key difference
is that ZZZ will render
- * the strict offset for the timezone that is unaware of daylight savings
time while the Z will
- * render the offset based on daylight savings time.
- *
- * As the javadoc for {@link CustomRequestLog} describes, the time code
can take two arguments to
- * configure the format and the time zone. They must be in the form:
"%{format|timeZone}t".
- */
- private static final String TIME_FORMAT = String.format(" %%{%s|%s}t ",
- "dd/MMM/yyyy:HH:mm:ss Z",
- TimeZone.getDefault().getID());
-
- /**
- * This format is essentially the {@link
CustomRequestLog#EXTENDED_NCSA_FORMAT} with three modifications:
- * 1. The time zone will be the JVM's default time zone instead of
always being GMT.
- * 2. The time zone offset will be daylight savings time aware.
- * 3. The final value will be the request time (latency) in milliseconds.
- *
- * See javadoc for {@link CustomRequestLog} for more information.
- */
- private static final String LOG_FORMAT =
- "%{client}a - %u" + TIME_FORMAT + "\"%r\" %s %O \"%{Referer}i\"
\"%{User-Agent}i\" %{ms}T";
-
- /**
- * Build a new Jetty request logger using the format defined in this class.
+ * Build a new Jetty request logger.
* @return a request logger
*/
public static RequestLog createRequestLogger() {
@@ -80,36 +59,39 @@ public class JettyRequestLogFactory {
}
/**
- * Build a new Jetty request logger using the format defined in this class.
+ * Build a new Jetty request logger.
* @param showDetailedAddresses whether to show detailed addresses and
ports in logs
* @return a request logger
*/
public static RequestLog createRequestLogger(boolean
showDetailedAddresses, Server server) {
if (!showDetailedAddresses) {
- return new CustomRequestLog(new Slf4jRequestLogWriter(),
LOG_FORMAT);
+ return new StructuredRequestLog();
} else {
return new OriginalClientIPRequestLog(server);
}
}
/**
- * Logs the original and real remote (client) and local (server) IP
addresses
- * when detailed addresses are enabled.
- * Tracks the real addresses of remote and local using a registered
Connection.Listener
- * when detailed addresses are enabled.
- * This is necessary when Proxy Protocol is used to pass the original
client IP.
+ * Structured request logger that emits one slog line per request with
each field as an attribute.
+ */
+ private static class StructuredRequestLog implements RequestLog {
+ @Override
+ public void log(Request request, Response response) {
+ logRequest(request, response, null);
+ }
+ }
+
+ /**
+ * Emits the same structured attributes as {@link StructuredRequestLog}
and additionally records
+ * the real client and server addresses when Proxy Protocol wraps the
original endpoint.
*/
- @CustomLog
private static class OriginalClientIPRequestLog extends ContainerLifeCycle
implements RequestLog {
- private final ThreadLocal<StringBuilder> requestLogStringBuilder =
ThreadLocal.withInitial(StringBuilder::new);
- private final CustomRequestLog delegate;
- private final Slf4jRequestLogWriter delegateLogWriter;
+ private final Connection.Listener
proxyProtocolOriginalEndpointListener =
+ new ProxyProtocolOriginalEndpointListener();
+ private final ConcurrentHashMap<AddressKey, AddressEntry>
proxyProtocolRealAddressMapping =
+ new ConcurrentHashMap<>();
OriginalClientIPRequestLog(Server server) {
- delegate = new CustomRequestLog(this::write, LOG_FORMAT);
- addBean(delegate);
- delegateLogWriter = new Slf4jRequestLogWriter();
- addBean(delegateLogWriter);
if (server != null) {
for (Connector connector : server.getConnectors()) {
// adding the listener is only necessary for connectors
that use ProxyConnectionFactory
@@ -120,63 +102,9 @@ public class JettyRequestLogFactory {
}
}
- void write(String requestEntry) {
- StringBuilder sb = requestLogStringBuilder.get();
- sb.setLength(0);
- sb.append(requestEntry);
- }
-
@Override
public void log(Request request, Response response) {
- delegate.log(request, response);
- StringBuilder sb = requestLogStringBuilder.get();
- sb.append(" [R:");
- String remoteAddr = Request.getRemoteAddr(request);
- sb.append(remoteAddr);
- sb.append(':');
- int remotePort = Request.getRemotePort(request);
- sb.append(remotePort);
-
- InetSocketAddress realRemoteAddress =
-
lookupRealAddress(unwrap(request.getConnectionMetaData()).getRemoteSocketAddress());
- if (realRemoteAddress != null) {
- String realRemoteHost =
HostPort.normalizeHost(realRemoteAddress.getHostString());
- int realRemotePort = realRemoteAddress.getPort();
- if (!realRemoteHost.equals(remoteAddr) || realRemotePort !=
remotePort) {
- sb.append(" via ");
- sb.append(realRemoteHost);
- sb.append(':');
- sb.append(realRemotePort);
- }
- }
- sb.append("]->[L:");
- InetSocketAddress realLocalAddress =
lookupRealAddress(unwrap(request.getConnectionMetaData())
- .getLocalSocketAddress());
- String localAddr = Request.getLocalAddr(request);
- int localPort = Request.getLocalPort(request);
- if (realLocalAddress != null) {
- String realLocalHost =
HostPort.normalizeHost(realLocalAddress.getHostString());
- int realLocalPort = realLocalAddress.getPort();
- sb.append(realLocalHost);
- sb.append(':');
- sb.append(realLocalPort);
- if (!realLocalHost.equals(localAddr) || realLocalPort !=
localPort) {
- sb.append(" dst ");
- sb.append(localAddr);
- sb.append(':');
- sb.append(localPort);
- }
- } else {
- sb.append(localAddr);
- sb.append(':');
- sb.append(localPort);
- }
- sb.append(']');
- try {
- delegateLogWriter.write(sb.toString());
- } catch (Exception e) {
- log.warn().exception(e).log("Failed to write request log");
- }
+ logRequest(request, response, this);
}
private ConnectionMetaData unwrap(ConnectionMetaData
connectionMetaData) {
@@ -203,19 +131,11 @@ public class JettyRequestLogFactory {
}
}
- private final Connection.Listener
proxyProtocolOriginalEndpointListener =
- new ProxyProtocolOriginalEndpointListener();
-
- private final ConcurrentHashMap<AddressKey, AddressEntry>
proxyProtocolRealAddressMapping =
- new ConcurrentHashMap<>();
-
// Use a record as key since InetSocketAddress hash code changes if
the address gets resolved
record AddressKey(String hostString, int port) {
-
}
record AddressEntry(InetSocketAddress realAddress, AtomicInteger
referenceCount) {
-
}
// Tracks the real addresses of remote and local when detailed
addresses are enabled.
@@ -274,4 +194,53 @@ public class JettyRequestLogFactory {
}
}
}
+
+ private static void logRequest(Request request, Response response,
OriginalClientIPRequestLog addressTracker) {
+ log.info(e -> {
+ e.attr("method", request.getMethod())
+ .attr("uri", request.getHttpURI().asString())
+ .attr("proto",
request.getConnectionMetaData().getProtocol())
+ .attr("status", response.getStatus())
+ .attr("bytesOut",
Response.getContentBytesWritten(response))
+ .attr("clientAddr", Request.getRemoteAddr(request))
+ .attr("clientPort", Request.getRemotePort(request))
+ .attr("user", authUser(request))
+ .attr("referer",
request.getHeaders().get(HttpHeader.REFERER))
+ .attr("userAgent",
request.getHeaders().get(HttpHeader.USER_AGENT))
+ .attr("durationMs",
NanoTime.millisSince(request.getBeginNanoTime()));
+
+ if (addressTracker != null) {
+ String clientAddr = Request.getRemoteAddr(request);
+ int clientPort = Request.getRemotePort(request);
+ String localAddr = Request.getLocalAddr(request);
+ int localPort = Request.getLocalPort(request);
+ e.attr("localAddr", localAddr).attr("localPort", localPort);
+
+ ConnectionMetaData md =
addressTracker.unwrap(request.getConnectionMetaData());
+ InetSocketAddress realRemote =
addressTracker.lookupRealAddress(md.getRemoteSocketAddress());
+ if (realRemote != null) {
+ String host =
HostPort.normalizeHost(realRemote.getHostString());
+ int port = realRemote.getPort();
+ if (!host.equals(clientAddr) || port != clientPort) {
+ e.attr("clientAddrReal", host).attr("clientPortReal",
port);
+ }
+ }
+ InetSocketAddress realLocal =
addressTracker.lookupRealAddress(md.getLocalSocketAddress());
+ if (realLocal != null) {
+ String host =
HostPort.normalizeHost(realLocal.getHostString());
+ int port = realLocal.getPort();
+ if (!host.equals(localAddr) || port != localPort) {
+ e.attr("localAddrReal", host).attr("localPortReal",
port);
+ }
+ }
+ }
+
+ e.log("HTTP request");
+ });
+ }
+
+ private static String authUser(Request request) {
+ Principal principal = AuthenticationState.getUserPrincipal(request);
+ return principal != null ? principal.getName() : null;
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceOriginalClientIPTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceOriginalClientIPTest.java
index b8d60d84361..adedda82160 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceOriginalClientIPTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceOriginalClientIPTest.java
@@ -91,7 +91,10 @@ public class WebServiceOriginalClientIPTest extends
MockedPulsarServiceBaseTest
// Validate that the client IP passed in X-Forwarded-For is logged
assertTrue(consoleCaptor.getStandardOutput().stream()
- .anyMatch(line -> line.contains("RequestLog") &&
line.contains("[R:11.22.33.44:12345 via ")));
+ .anyMatch(line -> line.contains("HTTP request")
+ && line.contains("clientAddr=11.22.33.44")
+ && line.contains("clientPort=12345")
+ && line.contains("clientAddrReal=")));
});
}
@@ -110,7 +113,10 @@ public class WebServiceOriginalClientIPTest extends
MockedPulsarServiceBaseTest
// Validate that the client IP passed in Forwarded is logged
assertTrue(consoleCaptor.getStandardOutput().stream()
- .anyMatch(line -> line.contains("RequestLog") &&
line.contains("[R:11.22.33.44:12345 via ")));
+ .anyMatch(line -> line.contains("HTTP request")
+ && line.contains("clientAddr=11.22.33.44")
+ && line.contains("clientPort=12345")
+ && line.contains("clientAddrReal=")));
});
}
@@ -135,8 +141,11 @@ public class WebServiceOriginalClientIPTest extends
MockedPulsarServiceBaseTest
// Validate that the client IP and destination IP passed in HA
Proxy protocol is logged
assertTrue(consoleCaptor.getStandardOutput().stream()
- .anyMatch(line -> line.contains("RequestLog") &&
line.contains("[R:99.22.33.44:1234 via ")
- && line.contains(" dst 5.4.3.1:4321]")));
+ .anyMatch(line -> line.contains("HTTP request")
+ && line.contains("clientAddr=99.22.33.44")
+ && line.contains("clientPort=1234")
+ && line.contains("localAddr=5.4.3.1")
+ && line.contains("localPort=4321")));
});
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyOriginalClientIPTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyOriginalClientIPTest.java
index 5ae2bf26380..d5a396883b3 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyOriginalClientIPTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyOriginalClientIPTest.java
@@ -109,11 +109,11 @@ public class ProxyOriginalClientIPTest extends
MockedPulsarServiceBaseTest {
// Validate that the client IP passed in X-Forwarded-For is logged
assertTrue(consoleCaptor.getStandardOutput().stream()
- .anyMatch(line -> line.contains("pulsar-external-web-") &&
line.contains("RequestLog")
- && line.contains("R:11.22.33.44")), "Expected to
find client IP in proxy logs");
+ .anyMatch(line -> line.contains("pulsar-external-web-") &&
line.contains("HTTP request")
+ && line.contains("clientAddr=11.22.33.44")),
"Expected to find client IP in proxy logs");
assertTrue(consoleCaptor.getStandardOutput().stream()
- .anyMatch(line -> line.contains("pulsar-web-") &&
line.contains("RequestLog")
- && line.contains("R:11.22.33.44")), "Expected to
find client IP in broker logs");
+ .anyMatch(line -> line.contains("pulsar-web-") &&
line.contains("HTTP request")
+ && line.contains("clientAddr=11.22.33.44")),
"Expected to find client IP in broker logs");
});
}
@@ -132,11 +132,11 @@ public class ProxyOriginalClientIPTest extends
MockedPulsarServiceBaseTest {
// Validate that the client IP passed in HA proxy protocol is
logged
assertTrue(consoleCaptor.getStandardOutput().stream()
- .anyMatch(line -> line.contains("pulsar-external-web-") &&
line.contains("RequestLog")
- && line.contains("R:99.22.33.44")), "Expected to
find client IP in proxy logs");
+ .anyMatch(line -> line.contains("pulsar-external-web-") &&
line.contains("HTTP request")
+ && line.contains("clientAddr=99.22.33.44")),
"Expected to find client IP in proxy logs");
assertTrue(consoleCaptor.getStandardOutput().stream()
- .anyMatch(line -> line.contains("pulsar-web-") &&
line.contains("RequestLog")
- && line.contains("R:99.22.33.44")), "Expected to
find client IP in broker logs");
+ .anyMatch(line -> line.contains("pulsar-web-") &&
line.contains("HTTP request")
+ && line.contains("clientAddr=99.22.33.44")),
"Expected to find client IP in broker logs");
});
}