github-actions[bot] commented on code in PR #61382:
URL: https://github.com/apache/doris/pull/61382#discussion_r2985601233


##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/TableStreamManager.java:
##########
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.stream;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.thrift.TCell;
+import org.apache.doris.thrift.TRow;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class TableStreamManager implements Writable {
+    private static final Logger LOG = 
LogManager.getLogger(TableStreamManager.class);
+    @SerializedName(value = "dbStreamMap")
+    private Map<Long, Set<Long>> dbStreamMap;
+    protected MonitoredReentrantReadWriteLock rwLock;
+
+    public TableStreamManager() {
+        this.rwLock = new MonitoredReentrantReadWriteLock(true);
+        this.dbStreamMap = new HashMap<>();
+    }
+
+    public void addTableStream(BaseTableStream stream) {
+        rwLock.writeLock().lock();
+        try {
+            dbStreamMap.computeIfAbsent(stream.getDatabase().getId(), k -> new 
HashSet<>()).add(stream.getId());
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    public void removeTableStream(BaseTableStream stream) {
+        rwLock.writeLock().lock();
+        try {
+            Optional.ofNullable(dbStreamMap.get(stream.getDatabase().getId()))
+                    .ifPresent(set -> set.remove(stream.getId()));
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    public static TableStreamManager read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, TableStreamManager.class);
+    }
+
+    public Set<Long> getTableStreamIds(DatabaseIf db) {
+        Set<Long> result = new HashSet<>();
+        rwLock.readLock().lock();
+        try {
+            result.addAll(dbStreamMap.getOrDefault(db.getId(), new 
HashSet<>()));
+        } finally {
+            rwLock.readLock().unlock();
+        }
+        return result;
+    }
+
+    public void fillTableStreamValuesMetadataResult(List<TRow> dataBatch) {
+        Map<Long, Set<Long>> copiedMap = new HashMap<>();
+        rwLock.readLock().lock();
+        try {
+            for (Map.Entry<Long, Set<Long>> e : dbStreamMap.entrySet()) {
+                copiedMap.put(e.getKey(), new HashSet<>(e.getValue()));
+            }
+        } finally {
+            rwLock.readLock().unlock();
+        }
+        for (Map.Entry<Long, Set<Long>> entry : copiedMap.entrySet()) {
+            Optional<Database> db = 
Env.getCurrentInternalCatalog().getDb(entry.getKey());
+            if (db.isPresent()) {
+                for (Long tableId : entry.getValue()) {
+                    Optional<Table> table = db.get().getTable(tableId);
+                    if (!table.isPresent()) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.warn("invalid stream id: {}, db: {}", tableId, 
db.get().getFullName());

Review Comment:
   Bug: `LOG.warn` inside `LOG.isDebugEnabled()` guard. This means the warning 
will only be emitted when DEBUG logging is enabled, which is almost never in 
production. This should either be:
   - `if (LOG.isDebugEnabled()) { LOG.debug(...) }` — if you want debug-only 
logging, or
   - Just `LOG.warn(...)` without the guard — if this is truly a warning-worthy 
situation (invalid stream ID).
   
   Given this indicates stale data in `dbStreamMap`, a plain `LOG.warn(...)` 
without the `isDebugEnabled` guard seems more appropriate.



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/TableStreamManager.java:
##########
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.stream;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.thrift.TCell;
+import org.apache.doris.thrift.TRow;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class TableStreamManager implements Writable {
+    private static final Logger LOG = 
LogManager.getLogger(TableStreamManager.class);
+    @SerializedName(value = "dbStreamMap")
+    private Map<Long, Set<Long>> dbStreamMap;
+    protected MonitoredReentrantReadWriteLock rwLock;
+
+    public TableStreamManager() {
+        this.rwLock = new MonitoredReentrantReadWriteLock(true);
+        this.dbStreamMap = new HashMap<>();
+    }
+
+    public void addTableStream(BaseTableStream stream) {
+        rwLock.writeLock().lock();
+        try {
+            dbStreamMap.computeIfAbsent(stream.getDatabase().getId(), k -> new 
HashSet<>()).add(stream.getId());
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    public void removeTableStream(BaseTableStream stream) {
+        rwLock.writeLock().lock();
+        try {
+            Optional.ofNullable(dbStreamMap.get(stream.getDatabase().getId()))
+                    .ifPresent(set -> set.remove(stream.getId()));
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    public static TableStreamManager read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, TableStreamManager.class);
+    }
+
+    public Set<Long> getTableStreamIds(DatabaseIf db) {
+        Set<Long> result = new HashSet<>();
+        rwLock.readLock().lock();
+        try {
+            result.addAll(dbStreamMap.getOrDefault(db.getId(), new 
HashSet<>()));
+        } finally {
+            rwLock.readLock().unlock();
+        }
+        return result;
+    }
+
+    public void fillTableStreamValuesMetadataResult(List<TRow> dataBatch) {
+        Map<Long, Set<Long>> copiedMap = new HashMap<>();
+        rwLock.readLock().lock();
+        try {
+            for (Map.Entry<Long, Set<Long>> e : dbStreamMap.entrySet()) {
+                copiedMap.put(e.getKey(), new HashSet<>(e.getValue()));
+            }
+        } finally {
+            rwLock.readLock().unlock();
+        }
+        for (Map.Entry<Long, Set<Long>> entry : copiedMap.entrySet()) {
+            Optional<Database> db = 
Env.getCurrentInternalCatalog().getDb(entry.getKey());
+            if (db.isPresent()) {
+                for (Long tableId : entry.getValue()) {
+                    Optional<Table> table = db.get().getTable(tableId);
+                    if (!table.isPresent()) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.warn("invalid stream id: {}, db: {}", tableId, 
db.get().getFullName());
+                        }
+                        continue;
+                    }
+                    Preconditions.checkArgument(table.get() instanceof 
BaseTableStream);
+                    BaseTableStream stream = (BaseTableStream) table.get();

Review Comment:
   Using `Preconditions.checkArgument` here will throw an unchecked 
`IllegalArgumentException` if a non-stream table ID somehow ends up in 
`dbStreamMap`. Since this is in a metadata query path (information_schema), 
failing the entire query over one stale entry is disproportionate. Consider 
either:
   - Using a defensive `if (!(table.get() instanceof BaseTableStream)) { 
LOG.warn(...); continue; }`, or
   - Using a `DORIS_CHECK`-style hard assertion if failing fast on an 
invariant violation is truly desired.
   
   The same applies to the other `Preconditions.checkArgument` calls in this 
class (the `baseTableQualifiers.size() == 3` check, and the one in 
`fillStreamConsumptionValuesMetadataResult`).



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/TableStreamManager.java:
##########
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.stream;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.thrift.TCell;
+import org.apache.doris.thrift.TRow;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class TableStreamManager implements Writable {
+    private static final Logger LOG = 
LogManager.getLogger(TableStreamManager.class);
+    @SerializedName(value = "dbStreamMap")
+    private Map<Long, Set<Long>> dbStreamMap;
+    protected MonitoredReentrantReadWriteLock rwLock;
+
+    public TableStreamManager() {
+        this.rwLock = new MonitoredReentrantReadWriteLock(true);
+        this.dbStreamMap = new HashMap<>();
+    }
+
+    public void addTableStream(BaseTableStream stream) {
+        rwLock.writeLock().lock();
+        try {
+            dbStreamMap.computeIfAbsent(stream.getDatabase().getId(), k -> new 
HashSet<>()).add(stream.getId());
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    public void removeTableStream(BaseTableStream stream) {
+        rwLock.writeLock().lock();
+        try {
+            Optional.ofNullable(dbStreamMap.get(stream.getDatabase().getId()))
+                    .ifPresent(set -> set.remove(stream.getId()));
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    public static TableStreamManager read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, TableStreamManager.class);
+    }
+
+    public Set<Long> getTableStreamIds(DatabaseIf db) {
+        Set<Long> result = new HashSet<>();
+        rwLock.readLock().lock();
+        try {
+            result.addAll(dbStreamMap.getOrDefault(db.getId(), new 
HashSet<>()));
+        } finally {
+            rwLock.readLock().unlock();
+        }
+        return result;
+    }
+
+    public void fillTableStreamValuesMetadataResult(List<TRow> dataBatch) {
+        Map<Long, Set<Long>> copiedMap = new HashMap<>();
+        rwLock.readLock().lock();
+        try {
+            for (Map.Entry<Long, Set<Long>> e : dbStreamMap.entrySet()) {
+                copiedMap.put(e.getKey(), new HashSet<>(e.getValue()));
+            }
+        } finally {
+            rwLock.readLock().unlock();
+        }
+        for (Map.Entry<Long, Set<Long>> entry : copiedMap.entrySet()) {
+            Optional<Database> db = 
Env.getCurrentInternalCatalog().getDb(entry.getKey());
+            if (db.isPresent()) {
+                for (Long tableId : entry.getValue()) {
+                    Optional<Table> table = db.get().getTable(tableId);
+                    if (!table.isPresent()) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.warn("invalid stream id: {}, db: {}", tableId, 
db.get().getFullName());
+                        }
+                        continue;
+                    }
+                    Preconditions.checkArgument(table.get() instanceof 
BaseTableStream);
+                    BaseTableStream stream = (BaseTableStream) table.get();
+                    if (stream.readLockIfExist()) {
+                        try {
+                            TRow trow = new TRow();
+                            // DB_NAME
+                            trow.addToColumnValue(new 
TCell().setStringVal(stream.getDatabase().getFullName()));
+                            // STREAM_NAME
+                            trow.addToColumnValue(new 
TCell().setStringVal(stream.getName()));
+                            // STREAM_ID
+                            trow.addToColumnValue(new 
TCell().setLongVal(stream.getId()));
+                            // STREAM_TYPE
+                            trow.addToColumnValue(new 
TCell().setStringVal(stream.getTableStreamType()));
+                            // CONSUME_TYPE
+                            trow.addToColumnValue(new 
TCell().setStringVal(stream.getConsumeType()));
+                            // STREAM_COMMENT
+                            trow.addToColumnValue(new 
TCell().setStringVal(stream.getComment()));
+                            TableIf baseTable = stream.getBaseTableNullable();
+                            if (baseTable == null) {
+                                // BASE_TABLE_NAME
+                                trow.addToColumnValue(new 
TCell().setStringVal("N/A"));
+                                // BASE_TABLE_DB
+                                trow.addToColumnValue(new 
TCell().setStringVal("N/A"));
+                                // BASE_TABLE_CTL
+                                trow.addToColumnValue(new 
TCell().setStringVal("N/A"));
+                                // BASE_TABLE_TYPE
+                                trow.addToColumnValue(new 
TCell().setStringVal("N/A"));
+                            } else {
+                                List<String> baseTableQualifiers = 
baseTable.getFullQualifiers();
+                                
Preconditions.checkArgument(baseTableQualifiers.size() == 3);
+                                // BASE_TABLE_NAME
+                                trow.addToColumnValue(new 
TCell().setStringVal(baseTableQualifiers.get(2)));
+                                // BASE_TABLE_DB
+                                trow.addToColumnValue(new 
TCell().setStringVal(baseTableQualifiers.get(1)));
+                                // BASE_TABLE_CTL
+                                trow.addToColumnValue(new 
TCell().setStringVal(baseTableQualifiers.get(0)));
+                                // BASE_TABLE_TYPE
+                                trow.addToColumnValue(new 
TCell().setStringVal(baseTable.getType().name()));
+                            }
+                            // ENABLED
+                            trow.addToColumnValue(new 
TCell().setBoolVal(!stream.isDisabled()));
+                            // IS_STALE
+                            trow.addToColumnValue(new 
TCell().setBoolVal(stream.isStale()));
+                            // STALE_REASON
+                            trow.addToColumnValue(new 
TCell().setStringVal(stream.getStaleReason()));
+                            dataBatch.add(trow);
+                        } finally {
+                            stream.readUnlock();
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    public void fillStreamConsumptionValuesMetadataResult(List<TRow> 
dataBatch) {
+        Map<Long, Set<Long>> copiedMap = new HashMap<>();
+        rwLock.readLock().lock();
+        try {
+            for (Map.Entry<Long, Set<Long>> e : dbStreamMap.entrySet()) {
+                copiedMap.put(e.getKey(), new HashSet<>(e.getValue()));
+            }
+        } finally {
+            rwLock.readLock().unlock();
+        }
+        for (Map.Entry<Long, Set<Long>> entry : copiedMap.entrySet()) {
+            Optional<Database> db = 
Env.getCurrentInternalCatalog().getDb(entry.getKey());
+            if (db.isPresent()) {
+                for (Long tableId : entry.getValue()) {
+                    Optional<Table> table = db.get().getTable(tableId);
+                    if (!table.isPresent()) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.warn("invalid stream id: {}, db: {}", tableId, 
db.get().getFullName());

Review Comment:
   Same bug as above: `LOG.warn` inside `LOG.isDebugEnabled()` guard. This 
pattern appears in both `fillTableStreamValuesMetadataResult` and 
`fillStreamConsumptionValuesMetadataResult`.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java:
##########
@@ -994,6 +1007,9 @@ private void dropTableInternal(Database db, Table table, 
boolean isView, boolean
         if (table instanceof OlapTable) {
             Env.getCurrentEnv().getMtmvService().dropTable(table);
         }
+        if (table instanceof BaseTableStream) {
+            
Env.getCurrentEnv().getTableStreamManager().removeTableStream((BaseTableStream) 
table);
+        }

Review Comment:
   Issue: Double removal of the stream from `TableStreamManager`. 
`removeTableStream` is called here in `dropTableInternal` (line 1011), but it 
was already called inside `unprotectDropTable` (line 1045), which is invoked 
at line 990 above. The second call is a no-op, but it indicates unclear 
ownership of this responsibility. Consider removing one of the two calls to 
avoid confusion — the one in `unprotectDropTable` is the canonical location, 
since it covers both the normal and replay paths.



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/BaseTableStream.java:
##########
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.stream;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.util.PropertyAnalyzer;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.thrift.TRow;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public abstract class BaseTableStream extends Table {
+    public enum StreamConsumeType {
+        DEFAULT,
+        APPEND_ONLY,
+        MIN_DELTA,
+        UNKNOWN;
+        public static StreamConsumeType getType(String typeName) {
+            if (typeName == null) {
+                return UNKNOWN;
+            }
+            typeName = typeName.toLowerCase();
+            switch (typeName) {
+                case "default":
+                    return DEFAULT;
+                case "append_only":
+                    return APPEND_ONLY;
+                case "min_delta":
+                    return MIN_DELTA;
+                default:
+                    return UNKNOWN;
+            }
+        }
+    }
+
+    private static ImmutableList<TableType> supportedTableTypeList = 
ImmutableList.of(TableType.OLAP);
+
+    @SerializedName("sct")
+    protected StreamConsumeType streamConsumeType = StreamConsumeType.DEFAULT;
+
+    @SerializedName("sir")
+    protected boolean showInitialRows;
+
+    @SerializedName("sti")
+    protected StreamTableInfo streamTableInfo;
+
+    @SerializedName("d")
+    private boolean disabled;
+
+    @SerializedName("s")
+    private boolean stale;
+
+    @SerializedName("sr")
+    private String staleReason = "N/A";
+
+    protected volatile TableIf baseTable;
+

Review Comment:
   Minor: The `volatile` on `baseTable` provides visibility across threads, 
but `getBaseTableNullable()` has a check-then-act race: two threads could 
both see `baseTable == null` and both call 
`streamTableInfo.getTableNullable()`. Since `getTableNullable()` is 
idempotent and reference assignment is atomic, this is benign — but it is 
worth a comment noting the race is intentional (the benign "racy 
single-check" idiom, as used by `String.hashCode()`, rather than true 
double-checked locking).



##########
be/src/information_schema/schema_table_streams_scanner.cpp:
##########
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "information_schema/schema_table_streams_scanner.h"
+
+#include <cstddef>
+
+#include "core/block/block.h"
+#include "core/data_type/data_type_factory.hpp"
+#include "core/data_type/define_primitive_type.h"
+#include "core/string_ref.h"
+#include "gen_cpp/FrontendService_types.h"
+#include "information_schema/schema_helper.h"
+#include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
+#include "util/client_cache.h"
+#include "util/thrift_rpc_helper.h"
+
+namespace doris {
+
+std::vector<SchemaScanner::ColumnDesc> 
SchemaTableStreamsScanner::_s_table_streams_columns = {
+        {"DB_NAME", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"STREAM_NAME", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"STREAM_ID", TYPE_BIGINT, sizeof(int64_t), true},
+        {"STREAM_TYPE", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"CONSUME_TYPE", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"STREAM_COMMENT", TYPE_STRING, sizeof(StringRef), true},
+        {"BASE_TABLE_NAME", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"BASE_TABLE_DB", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"BASE_TABLE_CTL", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"BASE_TABLE_TYPE", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"ENABLED", TYPE_BOOLEAN, sizeof(int8_t), true},
+        {"IS_STALE", TYPE_BOOLEAN, sizeof(int8_t), true},
+        {"STALE_REASON", TYPE_STRING, sizeof(StringRef), true},
+};
+
+SchemaTableStreamsScanner::SchemaTableStreamsScanner()
+        : SchemaScanner(_s_table_streams_columns, 
TSchemaTableType::SCH_TABLE_STREAMS) {}
+
+SchemaTableStreamsScanner::~SchemaTableStreamsScanner() = default;
+
+Status SchemaTableStreamsScanner::start(RuntimeState* state) {
+    _block_rows_limit = state->batch_size();
+    _rpc_timeout_ms = state->execution_timeout() * 1000;
+    return Status::OK();
+}
+
+Status SchemaTableStreamsScanner::_get_table_streams_block_from_fe() {
+    TNetworkAddress master_addr = 
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
+
+    TSchemaTableRequestParams schema_table_request_params;
+    TFetchSchemaTableDataRequest request;
+    request.__set_schema_table_name(TSchemaTableName::TABLE_STREAMS);
+    request.__set_schema_table_params(schema_table_request_params);
+
+    TFetchSchemaTableDataResult result;
+
+    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
+            master_addr.hostname, master_addr.port,
+            [&request, &result](FrontendServiceConnection& client) {
+                client->fetchSchemaTableData(result, request);
+            },
+            _rpc_timeout_ms));
+
+    Status status(Status::create(result.status));
+    if (!status.ok()) {
+        LOG(WARNING) << "fetch transactions from FE failed, errmsg=" << status;
+        return status;

Review Comment:
   Nit: Copy-paste error in log message. This says `"fetch transactions from FE 
failed"` but this is fetching **streams**, not transactions. Should be `"fetch 
table streams from FE failed"`.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowCreateStreamCommand.java:
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.stream.BaseTableStream;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.info.TableNameInfo;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.ShowResultSet;
+import org.apache.doris.qe.ShowResultSetMetaData;
+import org.apache.doris.qe.StmtExecutor;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Represents the command for SHOW CREATE STREAM.
+ */
+public class ShowCreateStreamCommand extends ShowCommand {
+    private static final ShowResultSetMetaData STREAM_META_DATA =
+            ShowResultSetMetaData.builder()
+                    .addColumn(new Column("Stream", 
ScalarType.createVarchar(20)))
+                    .addColumn(new Column("Create Stream", 
ScalarType.createVarchar(30)))
+                    .build();
+
+    private final TableNameInfo tblNameInfo;
+
+    public ShowCreateStreamCommand(TableNameInfo tableNameInfo) {
+        super(PlanType.SHOW_CREATE_STREAM_COMMAND);
+        this.tblNameInfo = tableNameInfo;
+    }
+
+    private void validate(ConnectContext ctx) throws AnalysisException {
+        tblNameInfo.analyze(ctx);
+
+        TableIf tableIf = Env.getCurrentEnv().getCatalogMgr()
+                .getCatalogOrAnalysisException(tblNameInfo.getCtl())
+                
.getDbOrAnalysisException(tblNameInfo.getDb()).getTableOrAnalysisException(tblNameInfo.getTbl());
+
+        if (!(tableIf instanceof BaseTableStream)) {
+            ErrorReport.reportAnalysisException(tblNameInfo.toFullyQualified()
+                    + " is not a stream, type:" + tableIf.getType());

Review Comment:
   Issue: `ErrorReport.reportAnalysisException()` is called with a raw string 
without an `ErrorCode`. The first argument should be an `ErrorCode` enum value, 
not a free-form string. Looking at the pattern used for SHOW CREATE TABLE in 
`ShowCreateTableCommand`, it uses `ErrorCode.ERR_WRONG_OBJECT`. This should 
follow the same pattern:
   ```java
   ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_OBJECT, 
tblNameInfo.getDb(), tblNameInfo.getTbl(), "STREAM", ...);
   ```



##########
be/src/information_schema/schema_table_stream_consumption_scanner.cpp:
##########
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "information_schema/schema_table_stream_consumption_scanner.h"
+
+#include <cstddef>
+
+#include "core/block/block.h"
+#include "core/data_type/data_type_factory.hpp"
+#include "core/data_type/define_primitive_type.h"
+#include "core/string_ref.h"
+#include "gen_cpp/FrontendService_types.h"
+#include "information_schema/schema_helper.h"
+#include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
+#include "util/client_cache.h"
+#include "util/thrift_rpc_helper.h"
+
+namespace doris {
+
+std::vector<SchemaScanner::ColumnDesc>
+        
SchemaTableStreamConsumptionScanner::_s_table_stream_consumption_columns = {
+                {"DB_NAME", TYPE_VARCHAR, sizeof(StringRef), true},
+                {"STREAM_NAME", TYPE_VARCHAR, sizeof(StringRef), true},
+                {"STREAM_ID", TYPE_BIGINT, sizeof(int64_t), true},
+                {"UNIT", TYPE_VARCHAR, sizeof(StringRef), true},
+                {"CONSUMPTION_STATUS", TYPE_VARCHAR, sizeof(StringRef), true},
+                {"LAG", TYPE_VARCHAR, sizeof(StringRef), true},
+                {"LAST_CONSUMPTION_TIME", TYPE_BIGINT, sizeof(int64_t), true},
+};
+
+// Registers this scanner with the SchemaScanner base using the
+// stream-consumption column layout, tagged as SCH_TABLE_STREAM_CONSUMPTION.
+SchemaTableStreamConsumptionScanner::SchemaTableStreamConsumptionScanner()
+        : SchemaScanner(_s_table_stream_consumption_columns,
+                        TSchemaTableType::SCH_TABLE_STREAM_CONSUMPTION) {}
+
+SchemaTableStreamConsumptionScanner::~SchemaTableStreamConsumptionScanner() = 
default;
+
+// Captures per-query limits before scanning begins: the state's batch size
+// caps rows per emitted block, and the RPC timeout is derived from the
+// query's execution timeout (seconds converted to milliseconds).
+Status SchemaTableStreamConsumptionScanner::start(RuntimeState* state) {
+    _block_rows_limit = state->batch_size();
+    _rpc_timeout_ms = state->execution_timeout() * 1000;
+    return Status::OK();
+}
+
+Status 
SchemaTableStreamConsumptionScanner::_get_table_stream_consumption_block_from_fe()
 {
+    TNetworkAddress master_addr = 
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
+
+    TSchemaTableRequestParams schema_table_request_params;
+    TFetchSchemaTableDataRequest request;
+    
request.__set_schema_table_name(TSchemaTableName::TABLE_STREAM_CONSUMPTION);
+    request.__set_schema_table_params(schema_table_request_params);
+
+    TFetchSchemaTableDataResult result;
+
+    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
+            master_addr.hostname, master_addr.port,
+            [&request, &result](FrontendServiceConnection& client) {
+                client->fetchSchemaTableData(result, request);
+            },
+            _rpc_timeout_ms));
+
+    Status status(Status::create(result.status));
+    if (!status.ok()) {
+        LOG(WARNING) << "fetch transactions from FE failed, errmsg=" << status;
+        return status;

Review Comment:
   Nit: Same copy-paste error — log message says `"fetch transactions from FE 
failed"` but should say `"fetch stream consumption from FE failed"`.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java:
##########
@@ -3902,4 +3923,72 @@ public Map<String, Long> getUsedDataQuota() {
     public void onClose() {
         Env.getCurrentEnv().getRefreshManager().removeFromRefreshMap(getId());
     }
+
+    public void createTableStream(CreateStreamCommand command) throws 
DdlException {
+        if (!Config.enable_table_stream) {
+            throw new DdlException("Table Stream is experimental."
+                    + " Please set enable_table_stream=true to enable it.");
+        }
+        CreateStreamInfo createStreamInfo = command.getCreateStreamInfo();
+        String dbName = createStreamInfo.getStreamName().getDb();
+        String streamName = createStreamInfo.getStreamName().getTbl();
+        // check if db exists
+        Database db = getDbOrDdlException(dbName);
+        // check if table exists in db
+        boolean replace = false;
+        if (db.getTable(streamName).isPresent()) {
+            if (createStreamInfo.isIfNotExists()) {
+                LOG.info("create stream[{}] which already exists", streamName);
+                return;
+            } else if (createStreamInfo.isOrReplace()) {
+                replace = true;
+                LOG.info("stream[{}] already exists, need to replace it", 
streamName);
+            } else {
+                
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, streamName);
+            }
+        }
+        if (replace) {
+            throw new DdlException("do not support replace currently");
+        } else {
+            // get base table
+            CatalogIf baseCatalog;
+            if 
(Strings.isNullOrEmpty(createStreamInfo.getBaseTableName().getCtl())) {
+                baseCatalog = this;
+            } else {
+                baseCatalog = Env.getCurrentEnv().getCatalogMgr()
+                        
.getCatalogOrDdlException(createStreamInfo.getBaseTableName().getCtl());
+            }
+            BaseTableStream newStream;
+            TableIf baseTable = 
baseCatalog.getDbOrDdlException(createStreamInfo.getBaseTableName().getDb())
+                    
.getTableOrDdlException(createStreamInfo.getBaseTableName().getTbl());
+            // lock base table for stream init
+            baseTable.readLock();
+            try {
+                Map<String, String> properties = 
createStreamInfo.getProperties();
+                // build new stream
+                newStream = new TableStreamBuildFactory()
+                        .withName(streamName)
+                        .withBaseTable(baseTable)
+                        .build();
+                newStream.setComment(createStreamInfo.getComment());
+                try {
+                    newStream.setProperties(properties);
+                } catch (AnalysisException e) {
+                    throw new DdlException(e.getMessage(), e);
+                }
+                if (properties != null && !properties.isEmpty()) {
+                    // before here, all properties should be checked
+                    throw new DdlException("Unknown properties: " + 
properties);
+                }
+                newStream.setId((Env.getCurrentEnv().getNextId()));
+            } finally {

Review Comment:
   Issue: `newStream.setId()` is called after `baseTable.readUnlock()`, outside 
the base table read lock but before `db.createTableWithLock()`. While 
`getNextId()` is atomic, there's a TOCTOU issue: between checking the stream 
name doesn't exist (line 3939) and actually creating it (line 3987), another 
thread could create a table/stream with the same name. The 
`db.createTableWithLock` internally handles this with `ifNotExists`, but 
without `ifNotExists`, the error message from `createTableWithLock` might be 
different from the expected `ERR_TABLE_EXISTS_ERROR`. Consider wrapping the 
existence check and creation under the DB write lock, similar to how 
`createTable` works for OlapTable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to