This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new f213c3b610 Push taskId filter down to storage layer in 
AsyncProfilerTaskLog query (#13787)
f213c3b610 is described below

commit f213c3b61022c9b00c61541ed227433406724291
Author: Hyunjin-Jeong <[email protected]>
AuthorDate: Fri Apr 3 14:56:28 2026 +0900

    Push taskId filter down to storage layer in AsyncProfilerTaskLog query 
(#13787)
---
 docs/en/changes/changes.md                                        | 1 +
 .../core/profiling/asyncprofiler/AsyncProfilerQueryService.java   | 8 +-------
 .../profiling/asyncprofiler/IAsyncProfilerTaskLogQueryDAO.java    | 8 ++++++--
 .../banyandb/stream/BanyanDBAsyncProfilerTaskLogQueryDAO.java     | 3 ++-
 .../elasticsearch/query/AsyncProfilerTaskLogQueryEsDAO.java       | 3 ++-
 .../plugin/jdbc/common/dao/JDBCAsyncProfilerTaskLogQueryDAO.java  | 8 +++++---
 6 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 5d95f03af0..e01dc9c12f 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -7,6 +7,7 @@
   between `opentelemetry-exporter-zipkin-proto-http` (protobuf~=3.12) and 
`opentelemetry-proto` (protobuf>=5.0).
 * Fix missing `taskId` filter and incorrect `IN` clause parameter binding in 
`JDBCJFRDataQueryDAO` and `JDBCPprofDataQueryDAO`.
 * Remove deprecated `GroupBy.field_name` from BanyanDB `MeasureQuery` request 
building (Phase 1 of staged removal across repos).
+* Push `taskId` filter down to the storage layer in 
`IAsyncProfilerTaskLogQueryDAO`, removing in-memory filtering from 
`AsyncProfilerQueryService`.
 
 #### UI
 
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/asyncprofiler/AsyncProfilerQueryService.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/asyncprofiler/AsyncProfilerQueryService.java
index ce86f0fbc6..1057c4d3f3 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/asyncprofiler/AsyncProfilerQueryService.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/asyncprofiler/AsyncProfilerQueryService.java
@@ -102,13 +102,7 @@ public class AsyncProfilerQueryService implements Service {
     }
 
     public List<AsyncProfilerTaskLog> queryAsyncProfilerTaskLogs(String 
taskId) throws IOException {
-        List<AsyncProfilerTaskLog> taskLogList = 
getTaskLogQueryDAO().getTaskLogList();
-        return findMatchedLogs(taskId, taskLogList);
-    }
-
-    private List<AsyncProfilerTaskLog> findMatchedLogs(final String taskID, 
final List<AsyncProfilerTaskLog> allLogs) {
-        return allLogs.stream()
-                .filter(l -> Objects.equals(l.getId(), taskID))
+        return getTaskLogQueryDAO().getTaskLogList(taskId).stream()
                 .map(this::extendTaskLog)
                 .collect(Collectors.toList());
     }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profiling/asyncprofiler/IAsyncProfilerTaskLogQueryDAO.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profiling/asyncprofiler/IAsyncProfilerTaskLogQueryDAO.java
index 8cb595f6df..ae2c095bd3 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profiling/asyncprofiler/IAsyncProfilerTaskLogQueryDAO.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profiling/asyncprofiler/IAsyncProfilerTaskLogQueryDAO.java
@@ -26,7 +26,11 @@ import java.util.List;
 
 public interface IAsyncProfilerTaskLogQueryDAO extends DAO {
     /**
-     * search all task log list in appoint task id
+     * Search task logs by the given task id.
+     *
+     * @param taskId the task id to filter by, must not be null or blank
+     * @return the task logs associated with the given task id
+     * @throws IOException if the query fails
      */
-    List<AsyncProfilerTaskLog> getTaskLogList() throws IOException;
+    List<AsyncProfilerTaskLog> getTaskLogList(String taskId) throws 
IOException;
 }
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAsyncProfilerTaskLogQueryDAO.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAsyncProfilerTaskLogQueryDAO.java
index 68e783cb45..b1ab61dd22 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAsyncProfilerTaskLogQueryDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAsyncProfilerTaskLogQueryDAO.java
@@ -53,11 +53,12 @@ public class BanyanDBAsyncProfilerTaskLogQueryDAO extends 
AbstractBanyanDBDAO im
     }
 
     @Override
-    public List<AsyncProfilerTaskLog> getTaskLogList() throws IOException {
+    public List<AsyncProfilerTaskLog> getTaskLogList(String taskId) throws 
IOException {
         StreamQueryResponse resp = query(false, 
AsyncProfilerTaskLogRecord.INDEX_NAME, TAGS,
                 new QueryBuilder<StreamQuery>() {
                     @Override
                     public void apply(StreamQuery query) {
+                        query.and(eq(AsyncProfilerTaskLogRecord.TASK_ID, 
taskId));
                         
query.setLimit(BanyanDBAsyncProfilerTaskLogQueryDAO.this.queryMaxSize);
                     }
                 });
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AsyncProfilerTaskLogQueryEsDAO.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AsyncProfilerTaskLogQueryEsDAO.java
index d06d66ba6a..8715608345 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AsyncProfilerTaskLogQueryEsDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AsyncProfilerTaskLogQueryEsDAO.java
@@ -47,13 +47,14 @@ public class AsyncProfilerTaskLogQueryEsDAO extends EsDAO 
implements IAsyncProfi
     }
 
     @Override
-    public List<AsyncProfilerTaskLog> getTaskLogList() throws IOException {
+    public List<AsyncProfilerTaskLog> getTaskLogList(String taskId) throws 
IOException {
         final String index = 
IndexController.LogicIndicesRegister.getPhysicalTableName(
                 AsyncProfilerTaskLogRecord.INDEX_NAME);
         final BoolQueryBuilder query = Query.bool();
         if 
(IndexController.LogicIndicesRegister.isMergedTable(AsyncProfilerTaskLogRecord.INDEX_NAME))
 {
             
query.must(Query.term(IndexController.LogicIndicesRegister.RECORD_TABLE_NAME, 
AsyncProfilerTaskLogRecord.INDEX_NAME));
         }
+        query.must(Query.term(AsyncProfilerTaskLogRecord.TASK_ID, taskId));
         final SearchBuilder search =
                 Search.builder().query(query)
                         .sort(AsyncProfilerTaskLogRecord.OPERATION_TIME, 
Sort.Order.DESC)
diff --git 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCAsyncProfilerTaskLogQueryDAO.java
 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCAsyncProfilerTaskLogQueryDAO.java
index e8f76d4c4c..88bf096d9d 100644
--- 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCAsyncProfilerTaskLogQueryDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCAsyncProfilerTaskLogQueryDAO.java
@@ -42,11 +42,11 @@ public class JDBCAsyncProfilerTaskLogQueryDAO implements 
IAsyncProfilerTaskLogQu
 
     @Override
     @SneakyThrows
-    public List<AsyncProfilerTaskLog> getTaskLogList() {
+    public List<AsyncProfilerTaskLog> getTaskLogList(String taskId) {
         List<String> tables = 
tableHelper.getTablesWithinTTL(AsyncProfilerTaskLogRecord.INDEX_NAME);
         final List<AsyncProfilerTaskLog> results = new 
ArrayList<AsyncProfilerTaskLog>();
         for (String table : tables) {
-            SQLAndParameters sqlAndParameters = buildSQL(table);
+            SQLAndParameters sqlAndParameters = buildSQL(table, taskId);
             List<AsyncProfilerTaskLog> logs = jdbcClient.executeQuery(
                     sqlAndParameters.sql(),
                     resultSet -> {
@@ -62,12 +62,14 @@ public class JDBCAsyncProfilerTaskLogQueryDAO implements 
IAsyncProfilerTaskLogQu
         return results;
     }
 
-    private SQLAndParameters buildSQL(String table) {
+    private SQLAndParameters buildSQL(String table, String taskId) {
         StringBuilder sql = new StringBuilder();
         List<Object> parameters = new ArrayList<>(2);
         sql.append("select * from ").append(table)
                 .append(" where 
").append(JDBCTableInstaller.TABLE_COLUMN).append(" = ?");
         parameters.add(AsyncProfilerTaskLogRecord.INDEX_NAME);
+        sql.append(" and 
").append(AsyncProfilerTaskLogRecord.TASK_ID).append(" = ?");
+        parameters.add(taskId);
         sql.append(" order by 
").append(AsyncProfilerTaskLogRecord.OPERATION_TIME).append(" desc");
         return new SQLAndParameters(sql.toString(), parameters);
     }

Reply via email to