github-actions[bot] commented on code in PR #62006:
URL: https://github.com/apache/doris/pull/62006#discussion_r3025176484


##########
fe/fe-core/src/test/java/org/apache/doris/datasource/systable/IcebergSysTableResolverTest.java:
##########
@@ -67,6 +69,19 @@ public void testPositionDeletesKeepsUnsupportedError() 
throws Exception {
                 SysTableResolver.resolveForPlan(sourceTable, "test_ctl", 
"test_db", "tbl$position_deletes"));
     }
 
+    @Test
+    public void testMetadataPlanningSysTableUsesSpecialExternalTable() throws 
Exception {
+        IcebergExternalTable sourceTable = newIcebergTable();
+
+        IcebergMetadataPlanningExternalTable planningTable = 
(IcebergMetadataPlanningExternalTable)
+                ((NativeSysTable) 
IcebergSysTable.INTERNAL_METADATA_PLANNING_TABLE).createSysExternalTable(sourceTable);
+
+        Assertions.assertEquals("tbl$metadata_planning", 
planningTable.getName());

Review Comment:
   `IcebergMetadataPlanningExternalTable#getFullSchema()` returns 
`FULL_SCHEMA`, not `RESULT_SCHEMA`, so this table has 11 columns here: the 10 
projected metadata columns plus the hidden control column `predicate`. This 
assertion is off by one and fails as soon as the FE unit test runs.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/DistributedPlanningSplitProducer.java:
##########
@@ -0,0 +1,446 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.source;
+
+import org.apache.doris.common.UserException;
+import 
org.apache.doris.datasource.iceberg.IcebergMetadataPlanningExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergUtils;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.exceptions.NotSupportedException;
+import org.apache.doris.nereids.glue.LogicalPlanAdapter;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.qe.VariableMgr;
+import org.apache.doris.statistics.ResultRow;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.gson.reflect.TypeToken;
+import org.apache.iceberg.BaseFileScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ManifestContent;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.util.SerializationUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.lang.reflect.Type;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * FE-side distributed planning producer.
+ *
+ * <p>The producer queries the logical system table {@code 
tbl$metadata_planning},
+ * reconstructs file scan tasks from the result rows, and then reuses the 
normal
+ * {@code FileScanTask -> IcebergSplit} path in {@link IcebergScanNode}.
+ */
+public class DistributedPlanningSplitProducer implements PlanningSplitProducer 
{
+    private static final Logger LOG = 
LogManager.getLogger(DistributedPlanningSplitProducer.class);
+    // Gson type token for List<Long>; presumably used to deserialize long-list
+    // result columns — usage is outside this hunk, confirm against the parser code.
+    private static final Type LONG_LIST_TYPE = new TypeToken<List<Long>>() { 
}.getType();
+    // Pre-built index map for O(1) column lookup; avoids O(n) linear search 
per field per row.
+    private static final Map<String, Integer> COL_INDEX;
+
+    static {
+        // Map each declared output column name (lower-cased) to its position
+        // in the planning-query result row.
+        // NOTE(review): toLowerCase() without Locale.ROOT is locale-sensitive
+        // (Turkish dotless-i); confirm the lookup side lower-cases identically.
+        COL_INDEX = new HashMap<>();
+        List<String> cols = 
IcebergMetadataPlanningExternalTable.OUTPUT_COLUMN_NAMES;
+        for (int i = 0; i < cols.size(); i++) {
+            COL_INDEX.put(cols.get(i).toLowerCase(), i);
+        }
+    }
+
+    // Planning context supplied by the scan node; never null (checked in ctor).
+    private final PlanningSplitProducer.Context context;
+    // Set once by stop(); polled cooperatively by the async planning task.
+    private final AtomicBoolean stopped = new AtomicBoolean(false);
+    // Holds the in-flight planning future so stop() can cancel it.
+    private final AtomicReference<CompletableFuture<Void>> runningTask = new 
AtomicReference<>();
+
+    /**
+     * Creates a producer bound to the given planning context.
+     *
+     * @param context planning context; must not be null
+     */
+    public DistributedPlanningSplitProducer(PlanningSplitProducer.Context 
context) {
+        this.context = Preconditions.checkNotNull(context, "planning context 
is null");
+    }
+
+    /**
+     * Starts distributed planning: synchronously builds the internal planning
+     * SQL from a fresh table scan, then asynchronously (on the context's
+     * schedule executor) runs the query, rebuilds FileScanTasks from the rows,
+     * splits them, and pushes the resulting splits into the sink. Errors are
+     * surfaced synchronously while building the SQL, and via sink.fail(...)
+     * once the async phase has begun.
+     */
+    @Override
+    public void start(int numBackends, SplitSink sink) throws UserException {
+        stopped.set(false);
+        String planningQuerySql;
+        try {
+            // Scan creation may touch credentials, so run it under the
+            // execution authenticator.
+            TableScan scan = context.getExecutionAuthenticator().execute(new 
Callable<TableScan>() {
+                @Override
+                public TableScan call() throws Exception {
+                    return context.createTableScan();
+                }
+            });
+            planningQuerySql = buildMetadataPlanningQuerySql(scan);
+        } catch (Exception e) {
+            // Unwrap known not-supported errors first, then pass UserException
+            // through unchanged; anything else is wrapped.
+            Optional<NotSupportedException> opt = 
context.checkNotSupportedException(e);
+            if (opt.isPresent()) {
+                throw new UserException(opt.get().getMessage(), opt.get());
+            } else if (e instanceof UserException) {
+                throw (UserException) e;
+            }
+            throw new UserException(e.getMessage(), e);
+        }
+        // Effectively-final copy for capture by the lambda below.
+        final String sql = planningQuerySql;
+        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+            try {
+                context.getExecutionAuthenticator().execute(() -> {
+                    // NOTE(review): this runs on getScheduleExecutor(), where no
+                    // thread-local ConnectContext is bound; the internal-query
+                    // path (executePlanningQuery/createPlanningQueryExecutor)
+                    // must install a cloned context for its full scope, e.g.
+                    // via AutoCloseConnectContext — confirm before merging.
+                    List<ResultRow> queryRows = executePlanningQuery(sql);
+                    List<FileScanTask> planningTasks = 
Lists.newArrayListWithCapacity(queryRows.size());
+                    for (ResultRow queryRow : queryRows) {
+                        // Bail out promptly if the caller stopped us mid-build.
+                        if (stopped.get()) {
+                            LOG.debug("Iceberg distributed metadata planning 
is stopped");
+                            return null;
+                        }
+                        planningTasks.add(createPlanningTask(queryRow));
+                    }
+                    // try-with-resources also closes the iterator obtained below.
+                    try (CloseableIterable<FileScanTask> splitTasks = 
context.splitPlanningTasks(planningTasks)) {
+                        CloseableIterator<FileScanTask> iterator = 
splitTasks.iterator();
+                        // Stop feeding when cancelled or the sink has enough.
+                        while (!stopped.get() && sink.needMore() && 
iterator.hasNext()) {
+                            
sink.addBatch(Lists.newArrayList(context.createPlanningSplit(iterator.next())));
+                        }
+                    }
+                    return null;
+                });
+                // Only a non-cancelled run may mark the sink as complete.
+                if (!stopped.get()) {
+                    sink.finish();
+                    LOG.info("Iceberg distributed metadata planning executed 
query [{}] "
+                            + "and converted rows to split tasks", sql);
+                }
+            } catch (Exception e) {
+                // Failures after stop() are expected (cancellation); ignore.
+                if (stopped.get()) {
+                    LOG.debug("Iceberg distributed metadata planning is 
stopped");
+                    return;
+                }
+                // Same unwrapping order as the synchronous phase above, but
+                // delivered through the sink instead of thrown.
+                Optional<NotSupportedException> opt = 
context.checkNotSupportedException(e);
+                if (opt.isPresent()) {
+                    sink.fail(new UserException(opt.get().getMessage(), 
opt.get()));
+                } else if (e instanceof UserException) {
+                    sink.fail((UserException) e);
+                } else {
+                    LOG.warn("Unexpected error during iceberg distributed 
metadata planning", e);
+                    sink.fail(new UserException(e.getMessage(), e));
+                }
+            }
+        }, context.getScheduleExecutor());
+        runningTask.set(future);
+    }
+
+    @Override
+    public void stop() {
+        if (!stopped.compareAndSet(false, true)) {
+            return;
+        }
+        CompletableFuture<Void> task = runningTask.get();
+        if (task != null) {
+            task.cancel(true);
+        }
+    }
+
+    /**
+     * Builds the internal SQL that queries the {@code tbl$metadata_planning}
+     * system table for the snapshot pinned by the given scan, shipping the
+     * serialized Iceberg filter through the hidden control predicate column.
+     */
+    private String buildMetadataPlanningQuerySql(TableScan scan) {
+        // Qualifier is [catalog, database, table]; the sys table name is
+        // "<table>$metadata_planning", quoted as a single identifier.
+        List<String> parts = context.getSourceTableQualifier();
+        String sysTable = parts.get(2) + "$"
+                + org.apache.doris.datasource.systable.IcebergSysTable.METADATA_PLANNING;
+        String from = quoteIdentifier(parts.get(0)) + "." + quoteIdentifier(parts.get(1))
+                + "." + quoteIdentifier(sysTable);
+        // Project exactly the declared output columns, in declared order.
+        String projection = String.join(", ",
+                IcebergMetadataPlanningExternalTable.OUTPUT_COLUMN_NAMES);
+        // Distributed planning requires a concrete snapshot to pin the version.
+        long snapshotId = Preconditions.checkNotNull(scan.snapshot(),
+                "Iceberg distributed planning snapshot is null").snapshotId();
+        // Base64-serialized Iceberg filter, matched against the control column.
+        String predicate = SerializationUtil.serializeToBase64(scan.filter());
+        StringBuilder sql = new StringBuilder();
+        sql.append("SELECT ").append(projection)
+                .append(" FROM ").append(from)
+                .append(" FOR VERSION AS OF ").append(snapshotId)
+                .append(" WHERE ")
+                .append(quoteIdentifier(IcebergMetadataPlanningExternalTable.CONTROL_PREDICATE_COLUMN))
+                .append(" = ").append(quoteStringLiteral(predicate));
+        return sql.toString();
+    }
+
+    private StmtExecutor createPlanningQueryExecutor(String planningQuerySql) {
+        ConnectContext previousContext = ConnectContext.get();
+        ConnectContext parentContext = 
Preconditions.checkNotNull(ConnectContext.get(), "ConnectContext is null");

Review Comment:
   This runs on `context.getScheduleExecutor()`, so there is usually no 
thread-local `ConnectContext` bound to the worker thread. As written, 
distributed mode reaches `ConnectContext.get()` here and fails the 
`checkNotNull` before the internal query can even start. Doris' other 
internal-query helpers keep the cloned context installed for the full 
`StmtExecutor.executeInternalQuery()` scope (for example via 
`AutoCloseConnectContext`); this path needs the same treatment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to