Copilot commented on code in PR #58972:
URL: https://github.com/apache/doris/pull/58972#discussion_r2618909758


##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java:
##########
@@ -0,0 +1,373 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.backup.Status;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.fs.FileSystemFactory;
+import org.apache.doris.fs.remote.RemoteFile;
+import org.apache.doris.fs.remote.RemoteFileSystem;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TMetaScanRange;
+import org.apache.doris.thrift.TMetadataType;
+import org.apache.doris.thrift.TParquetMetadataParams;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Table-valued function for reading Parquet metadata.
+ * Currently works in two modes:
+ * - parquet_metadata: row-group/column statistics similar to DuckDB parquet_metadata()
+ * - parquet_schema: logical schema similar to DuckDB parquet_schema()
+ */

Review Comment:
   The class documentation describes the modes as "parquet_metadata" and "parquet_schema", which match the internal implementation, but the function name is "parquet_meta" (line 64). Consider updating the documentation to clarify that the function is called "parquet_meta" and accepts a "mode" parameter whose values are "parquet_metadata" or "parquet_schema".
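   For example, the class Javadoc could read something like the sketch below (the wording is only a suggestion):
   ```java
   /**
    * Table-valued function "parquet_meta" for reading Parquet metadata.
    * The "mode" parameter selects the output:
    * - mode = "parquet_metadata" (default): row-group/column statistics, similar to DuckDB parquet_metadata()
    * - mode = "parquet_schema": logical schema, similar to DuckDB parquet_schema()
    */
   ```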



##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java:
##########
@@ -0,0 +1,373 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.backup.Status;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.fs.FileSystemFactory;
+import org.apache.doris.fs.remote.RemoteFile;
+import org.apache.doris.fs.remote.RemoteFileSystem;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TMetaScanRange;
+import org.apache.doris.thrift.TMetadataType;
+import org.apache.doris.thrift.TParquetMetadataParams;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Table-valued function for reading Parquet metadata.
+ * Currently works in two modes:
+ * - parquet_metadata: row-group/column statistics similar to DuckDB parquet_metadata()
+ * - parquet_schema: logical schema similar to DuckDB parquet_schema()
+ */
+public class ParquetMetadataTableValuedFunction extends 
MetadataTableValuedFunction {
+
+    public static final String NAME = "parquet_meta";
+    private static final String PATH = "path";
+    private static final String MODE = "mode";
+
+    private static final String MODE_METADATA = "parquet_metadata";
+    private static final String MODE_SCHEMA = "parquet_schema";
+    private static final ImmutableSet<String> SUPPORTED_MODES =
+            ImmutableSet.of(MODE_METADATA, MODE_SCHEMA);
+
+    private static final ImmutableList<Column> PARQUET_SCHEMA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("column_name", PrimitiveType.STRING, true),
+            new Column("column_path", PrimitiveType.STRING, true),
+            new Column("physical_type", PrimitiveType.STRING, true),
+            new Column("logical_type", PrimitiveType.STRING, true),
+            new Column("repetition_level", PrimitiveType.INT, true),
+            new Column("definition_level", PrimitiveType.INT, true),
+            new Column("type_length", PrimitiveType.INT, true),
+            new Column("precision", PrimitiveType.INT, true),
+            new Column("scale", PrimitiveType.INT, true),
+            new Column("is_nullable", PrimitiveType.BOOLEAN, true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_METADATA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("row_group_id", PrimitiveType.INT, true),
+            new Column("column_id", PrimitiveType.INT, true),
+            new Column("column_name", PrimitiveType.STRING, true),
+            new Column("column_path", PrimitiveType.STRING, true),
+            new Column("physical_type", PrimitiveType.STRING, true),
+            new Column("logical_type", PrimitiveType.STRING, true),
+            new Column("type_length", PrimitiveType.INT, true),
+            new Column("converted_type", PrimitiveType.STRING, true),
+            new Column("num_values", PrimitiveType.BIGINT, true),
+            new Column("null_count", PrimitiveType.BIGINT, true),
+            new Column("distinct_count", PrimitiveType.BIGINT, true),
+            new Column("encodings", PrimitiveType.STRING, true),
+            new Column("compression", PrimitiveType.STRING, true),
+            new Column("data_page_offset", PrimitiveType.BIGINT, true),
+            new Column("index_page_offset", PrimitiveType.BIGINT, true),
+            new Column("dictionary_page_offset", PrimitiveType.BIGINT, true),
+            new Column("total_compressed_size", PrimitiveType.BIGINT, true),
+            new Column("total_uncompressed_size", PrimitiveType.BIGINT, true),
+            new Column("statistics_min", ScalarType.createStringType(), true),
+            new Column("statistics_max", ScalarType.createStringType(), true)
+    );
+
+    private final List<String> paths;
+    private final String mode;
+    // File system info for remote Parquet access (e.g. S3).
+    private final TFileType fileType;
+    private final Map<String, String> properties;
+
+    public ParquetMetadataTableValuedFunction(Map<String, String> params) 
throws AnalysisException {
+        Map<String, String> normalizedParams = 
params.entrySet().stream().collect(Collectors.toMap(
+                entry -> entry.getKey().toLowerCase(),
+                Map.Entry::getValue,
+                (value1, value2) -> value2
+        ));
+        String rawPath = normalizedParams.get(PATH);
+        if (Strings.isNullOrEmpty(rawPath)) {
+            throw new AnalysisException("Property 'path' is required for 
parquet_metadata");
+        }
+        List<String> parsedPaths = Arrays.stream(rawPath.split(","))
+                .map(String::trim)
+                .filter(token -> !token.isEmpty())
+                .collect(Collectors.toList());
+        if (parsedPaths.isEmpty()) {
+            throw new AnalysisException("Property 'path' must contain at least 
one location");
+        }
+        List<String> normalizedPaths = parsedPaths;
+
+        String rawMode = normalizedParams.getOrDefault(MODE, MODE_METADATA);
+        mode = rawMode.toLowerCase();
+        if (!SUPPORTED_MODES.contains(mode)) {
+            throw new AnalysisException("Unsupported mode '" + rawMode + "' 
for parquet_metadata");
+        }
+
+        String firstPath = parsedPaths.get(0);
+        String scheme = null;
+        try {
+            scheme = new URI(firstPath).getScheme();
+        } catch (URISyntaxException e) {
+            scheme = null;
+        }
+        StorageProperties storageProperties = null;
+        if (Strings.isNullOrEmpty(scheme) || "file".equalsIgnoreCase(scheme)) {
+            this.fileType = TFileType.FILE_LOCAL;
+            this.properties = Collections.emptyMap();
+        } else {
+            Map<String, String> storageParams = new 
HashMap<>(normalizedParams);
+            // StorageProperties detects provider by "uri", we also hint 
support by scheme.
+            storageParams.put("uri", firstPath);
+            forceStorageSupport(storageParams, scheme.toLowerCase());
+            try {
+                storageProperties = 
StorageProperties.createPrimary(storageParams);
+            } catch (RuntimeException e) {
+                throw new AnalysisException(
+                        "Failed to parse storage properties for 
parquet_metadata: " + e.getMessage(), e);
+            }
+            this.fileType = mapToFileType(storageProperties.getType());
+            Map<String, String> backendProps = 
storageProperties.getBackendConfigProperties();
+            try {
+                List<String> tmpPaths = new ArrayList<>(parsedPaths.size());
+                for (String path : parsedPaths) {
+                    
tmpPaths.add(storageProperties.validateAndNormalizeUri(path));
+                }
+                normalizedPaths = tmpPaths;
+            } catch (UserException e) {
+                throw new AnalysisException(
+                        "Failed to normalize parquet_metadata paths: " + 
e.getMessage(), e);
+            }
+            if (this.fileType == TFileType.FILE_HTTP && 
!backendProps.containsKey("uri")) {
+                backendProps = new HashMap<>(backendProps);
+                backendProps.put("uri", normalizedPaths.get(0));
+            }
+            this.properties = backendProps;
+        }
+
+        // Expand any glob patterns (e.g. *.parquet) to concrete file paths.
+        normalizedPaths = expandGlobPaths(normalizedPaths, storageProperties, 
this.fileType);
+
+        this.paths = ImmutableList.copyOf(normalizedPaths);
+    }

Review Comment:
   The new ParquetMetadataTableValuedFunction lacks test coverage. Given that other table-valued functions in the repository have test coverage (e.g., ExternalFileTableValuedFunctionTest.java), unit tests should be added to verify the function's behavior, including parameter validation, path expansion, mode handling, and error cases.
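   A minimal sketch of what such a test could look like, covering only the constructor's validation errors (the test class name, its placement, and the use of JUnit 5 are assumptions; these cases use local, non-wildcard paths so no backend access is needed):
   ```java
   package org.apache.doris.tablefunction;
   
   import org.apache.doris.common.AnalysisException;
   
   import org.junit.jupiter.api.Assertions;
   import org.junit.jupiter.api.Test;
   
   import java.util.HashMap;
   import java.util.Map;
   
   public class ParquetMetadataTableValuedFunctionTest {
   
       @Test
       public void missingPathIsRejected() {
           // 'path' is required, so construction should fail with AnalysisException.
           Assertions.assertThrows(AnalysisException.class,
                   () -> new ParquetMetadataTableValuedFunction(new HashMap<>()));
       }
   
       @Test
       public void unsupportedModeIsRejected() {
           Map<String, String> params = new HashMap<>();
           params.put("path", "/tmp/data.parquet");
           params.put("mode", "not_a_mode");
           // Mode validation happens before any storage handling or glob expansion.
           Assertions.assertThrows(AnalysisException.class,
                   () -> new ParquetMetadataTableValuedFunction(params));
       }
   }
   ```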



##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java:
##########
@@ -0,0 +1,373 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.backup.Status;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.fs.FileSystemFactory;
+import org.apache.doris.fs.remote.RemoteFile;
+import org.apache.doris.fs.remote.RemoteFileSystem;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TMetaScanRange;
+import org.apache.doris.thrift.TMetadataType;
+import org.apache.doris.thrift.TParquetMetadataParams;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Table-valued function for reading Parquet metadata.
+ * Currently works in two modes:
+ * - parquet_metadata: row-group/column statistics similar to DuckDB parquet_metadata()
+ * - parquet_schema: logical schema similar to DuckDB parquet_schema()
+ */
+public class ParquetMetadataTableValuedFunction extends 
MetadataTableValuedFunction {
+
+    public static final String NAME = "parquet_meta";
+    private static final String PATH = "path";
+    private static final String MODE = "mode";
+
+    private static final String MODE_METADATA = "parquet_metadata";
+    private static final String MODE_SCHEMA = "parquet_schema";
+    private static final ImmutableSet<String> SUPPORTED_MODES =
+            ImmutableSet.of(MODE_METADATA, MODE_SCHEMA);
+
+    private static final ImmutableList<Column> PARQUET_SCHEMA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("column_name", PrimitiveType.STRING, true),
+            new Column("column_path", PrimitiveType.STRING, true),
+            new Column("physical_type", PrimitiveType.STRING, true),
+            new Column("logical_type", PrimitiveType.STRING, true),
+            new Column("repetition_level", PrimitiveType.INT, true),
+            new Column("definition_level", PrimitiveType.INT, true),
+            new Column("type_length", PrimitiveType.INT, true),
+            new Column("precision", PrimitiveType.INT, true),
+            new Column("scale", PrimitiveType.INT, true),
+            new Column("is_nullable", PrimitiveType.BOOLEAN, true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_METADATA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("row_group_id", PrimitiveType.INT, true),
+            new Column("column_id", PrimitiveType.INT, true),
+            new Column("column_name", PrimitiveType.STRING, true),
+            new Column("column_path", PrimitiveType.STRING, true),
+            new Column("physical_type", PrimitiveType.STRING, true),
+            new Column("logical_type", PrimitiveType.STRING, true),
+            new Column("type_length", PrimitiveType.INT, true),
+            new Column("converted_type", PrimitiveType.STRING, true),
+            new Column("num_values", PrimitiveType.BIGINT, true),
+            new Column("null_count", PrimitiveType.BIGINT, true),
+            new Column("distinct_count", PrimitiveType.BIGINT, true),
+            new Column("encodings", PrimitiveType.STRING, true),
+            new Column("compression", PrimitiveType.STRING, true),
+            new Column("data_page_offset", PrimitiveType.BIGINT, true),
+            new Column("index_page_offset", PrimitiveType.BIGINT, true),
+            new Column("dictionary_page_offset", PrimitiveType.BIGINT, true),
+            new Column("total_compressed_size", PrimitiveType.BIGINT, true),
+            new Column("total_uncompressed_size", PrimitiveType.BIGINT, true),
+            new Column("statistics_min", ScalarType.createStringType(), true),
+            new Column("statistics_max", ScalarType.createStringType(), true)
+    );
+
+    private final List<String> paths;
+    private final String mode;
+    // File system info for remote Parquet access (e.g. S3).
+    private final TFileType fileType;
+    private final Map<String, String> properties;
+
+    public ParquetMetadataTableValuedFunction(Map<String, String> params) 
throws AnalysisException {
+        Map<String, String> normalizedParams = 
params.entrySet().stream().collect(Collectors.toMap(
+                entry -> entry.getKey().toLowerCase(),
+                Map.Entry::getValue,
+                (value1, value2) -> value2
+        ));
+        String rawPath = normalizedParams.get(PATH);
+        if (Strings.isNullOrEmpty(rawPath)) {
+            throw new AnalysisException("Property 'path' is required for 
parquet_metadata");
+        }
+        List<String> parsedPaths = Arrays.stream(rawPath.split(","))
+                .map(String::trim)
+                .filter(token -> !token.isEmpty())
+                .collect(Collectors.toList());
+        if (parsedPaths.isEmpty()) {
+            throw new AnalysisException("Property 'path' must contain at least 
one location");
+        }
+        List<String> normalizedPaths = parsedPaths;
+
+        String rawMode = normalizedParams.getOrDefault(MODE, MODE_METADATA);
+        mode = rawMode.toLowerCase();
+        if (!SUPPORTED_MODES.contains(mode)) {
+            throw new AnalysisException("Unsupported mode '" + rawMode + "' 
for parquet_metadata");
+        }
+
+        String firstPath = parsedPaths.get(0);
+        String scheme = null;
+        try {
+            scheme = new URI(firstPath).getScheme();
+        } catch (URISyntaxException e) {
+            scheme = null;
+        }
+        StorageProperties storageProperties = null;
+        if (Strings.isNullOrEmpty(scheme) || "file".equalsIgnoreCase(scheme)) {
+            this.fileType = TFileType.FILE_LOCAL;
+            this.properties = Collections.emptyMap();
+        } else {
+            Map<String, String> storageParams = new 
HashMap<>(normalizedParams);
+            // StorageProperties detects provider by "uri", we also hint 
support by scheme.
+            storageParams.put("uri", firstPath);
+            forceStorageSupport(storageParams, scheme.toLowerCase());
+            try {
+                storageProperties = 
StorageProperties.createPrimary(storageParams);
+            } catch (RuntimeException e) {
+                throw new AnalysisException(
+                        "Failed to parse storage properties for 
parquet_metadata: " + e.getMessage(), e);
+            }
+            this.fileType = mapToFileType(storageProperties.getType());
+            Map<String, String> backendProps = 
storageProperties.getBackendConfigProperties();
+            try {
+                List<String> tmpPaths = new ArrayList<>(parsedPaths.size());
+                for (String path : parsedPaths) {
+                    
tmpPaths.add(storageProperties.validateAndNormalizeUri(path));
+                }
+                normalizedPaths = tmpPaths;
+            } catch (UserException e) {
+                throw new AnalysisException(
+                        "Failed to normalize parquet_metadata paths: " + 
e.getMessage(), e);
+            }
+            if (this.fileType == TFileType.FILE_HTTP && 
!backendProps.containsKey("uri")) {
+                backendProps = new HashMap<>(backendProps);
+                backendProps.put("uri", normalizedPaths.get(0));
+            }
+            this.properties = backendProps;
+        }
+
+        // Expand any glob patterns (e.g. *.parquet) to concrete file paths.
+        normalizedPaths = expandGlobPaths(normalizedPaths, storageProperties, 
this.fileType);
+
+        this.paths = ImmutableList.copyOf(normalizedPaths);
+    }
+
+    /**
+     * Expand wildcard paths to matching files.
+     */
+    private static List<String> expandGlobPaths(List<String> inputPaths,
+                                               StorageProperties 
storageProperties,
+                                               TFileType fileType) throws 
AnalysisException {
+        if (inputPaths == null || inputPaths.isEmpty()) {
+            return Collections.emptyList();
+        }
+        List<String> expanded = new ArrayList<>();
+        for (String path : inputPaths) {
+            if (!containsWildcards(path)) {
+                expanded.add(path);
+                continue;
+            }
+            expanded.addAll(expandSingleGlob(path, storageProperties, 
fileType));
+        }
+        if (expanded.isEmpty()) {
+            throw new AnalysisException("No files matched parquet_metadata 
path patterns: " + inputPaths);
+        }
+        return expanded;
+    }
+
+    private static boolean containsWildcards(String path) {
+        if (Strings.isNullOrEmpty(path)) {
+            return false;
+        }
+        return path.contains("*") || path.contains("?") || path.contains("[") 
|| path.contains("{");
+    }
+
+    private static List<String> expandSingleGlob(String pattern,
+                                                 StorageProperties 
storageProperties,
+                                                 TFileType fileType) throws 
AnalysisException {
+        if (fileType == TFileType.FILE_LOCAL) {
+            return globLocal(pattern);
+        }
+        if (storageProperties == null) {
+            throw new AnalysisException("Storage properties is required for 
glob pattern: " + pattern);
+        }
+        if (fileType == TFileType.FILE_S3 || fileType == TFileType.FILE_HDFS) {
+            return globRemote(storageProperties, pattern);
+        }
+        throw new AnalysisException("Glob patterns are not supported for file 
type: " + fileType);
+    }
+
+    private static List<String> globRemote(StorageProperties storageProperties,
+                                          String pattern) throws 
AnalysisException {
+        List<RemoteFile> remoteFiles = new ArrayList<>();
+        try (RemoteFileSystem fileSystem = 
FileSystemFactory.get(storageProperties)) {
+            Status status = fileSystem.globList(pattern, remoteFiles, false);
+            if (!status.ok()) {
+                throw new AnalysisException(status.getErrMsg());
+            }
+        } catch (Exception e) {
+            throw new AnalysisException("Failed to expand glob pattern '" + 
pattern + "': "
+                    + e.getMessage(), e);
+        }
+        return remoteFiles.stream()
+                .filter(RemoteFile::isFile)
+                .map(RemoteFile::getName)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Local glob expansion is executed on an arbitrary alive backend, because 
FE may not
+     * have access to BE local file system.
+     */
+    private static List<String> globLocal(String pattern) throws 
AnalysisException {
+        Backend backend;
+        try {
+            backend = 
Env.getCurrentSystemInfo().getBackendsByCurrentCluster().values().stream()
+                    .filter(Backend::isAlive)
+                    .findFirst()
+                    .orElse(null);
+        } catch (AnalysisException e) {
+            throw e;
+        }

Review Comment:
   The catch block unnecessarily catches and re-throws the same AnalysisException without adding any value. The try-catch block should be removed, since getBackendsByCurrentCluster() already throws AnalysisException, which will propagate naturally.
   ```suggestion
           backend = Env.getCurrentSystemInfo().getBackendsByCurrentCluster().values().stream()
                   .filter(Backend::isAlive)
                   .findFirst()
                   .orElse(null);
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java:
##########
@@ -0,0 +1,373 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.backup.Status;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.fs.FileSystemFactory;
+import org.apache.doris.fs.remote.RemoteFile;
+import org.apache.doris.fs.remote.RemoteFileSystem;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TMetaScanRange;
+import org.apache.doris.thrift.TMetadataType;
+import org.apache.doris.thrift.TParquetMetadataParams;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Table-valued function for reading Parquet metadata.
+ * Currently works in two modes:
+ * - parquet_metadata: row-group/column statistics similar to DuckDB parquet_metadata()
+ * - parquet_schema: logical schema similar to DuckDB parquet_schema()
+ */
+public class ParquetMetadataTableValuedFunction extends 
MetadataTableValuedFunction {
+
+    public static final String NAME = "parquet_meta";
+    private static final String PATH = "path";
+    private static final String MODE = "mode";
+
+    private static final String MODE_METADATA = "parquet_metadata";
+    private static final String MODE_SCHEMA = "parquet_schema";
+    private static final ImmutableSet<String> SUPPORTED_MODES =
+            ImmutableSet.of(MODE_METADATA, MODE_SCHEMA);
+
+    private static final ImmutableList<Column> PARQUET_SCHEMA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("column_name", PrimitiveType.STRING, true),
+            new Column("column_path", PrimitiveType.STRING, true),
+            new Column("physical_type", PrimitiveType.STRING, true),
+            new Column("logical_type", PrimitiveType.STRING, true),
+            new Column("repetition_level", PrimitiveType.INT, true),
+            new Column("definition_level", PrimitiveType.INT, true),
+            new Column("type_length", PrimitiveType.INT, true),
+            new Column("precision", PrimitiveType.INT, true),
+            new Column("scale", PrimitiveType.INT, true),
+            new Column("is_nullable", PrimitiveType.BOOLEAN, true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_METADATA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("row_group_id", PrimitiveType.INT, true),
+            new Column("column_id", PrimitiveType.INT, true),
+            new Column("column_name", PrimitiveType.STRING, true),
+            new Column("column_path", PrimitiveType.STRING, true),
+            new Column("physical_type", PrimitiveType.STRING, true),
+            new Column("logical_type", PrimitiveType.STRING, true),
+            new Column("type_length", PrimitiveType.INT, true),
+            new Column("converted_type", PrimitiveType.STRING, true),
+            new Column("num_values", PrimitiveType.BIGINT, true),
+            new Column("null_count", PrimitiveType.BIGINT, true),
+            new Column("distinct_count", PrimitiveType.BIGINT, true),
+            new Column("encodings", PrimitiveType.STRING, true),
+            new Column("compression", PrimitiveType.STRING, true),
+            new Column("data_page_offset", PrimitiveType.BIGINT, true),
+            new Column("index_page_offset", PrimitiveType.BIGINT, true),
+            new Column("dictionary_page_offset", PrimitiveType.BIGINT, true),
+            new Column("total_compressed_size", PrimitiveType.BIGINT, true),
+            new Column("total_uncompressed_size", PrimitiveType.BIGINT, true),
+            new Column("statistics_min", ScalarType.createStringType(), true),
+            new Column("statistics_max", ScalarType.createStringType(), true)
+    );
+
+    private final List<String> paths;
+    private final String mode;
+    // File system info for remote Parquet access (e.g. S3).
+    private final TFileType fileType;
+    private final Map<String, String> properties;
+
+    public ParquetMetadataTableValuedFunction(Map<String, String> params) 
throws AnalysisException {
+        Map<String, String> normalizedParams = 
params.entrySet().stream().collect(Collectors.toMap(
+                entry -> entry.getKey().toLowerCase(),
+                Map.Entry::getValue,
+                (value1, value2) -> value2
+        ));
+        String rawPath = normalizedParams.get(PATH);
+        if (Strings.isNullOrEmpty(rawPath)) {
+            throw new AnalysisException("Property 'path' is required for 
parquet_metadata");
+        }
+        List<String> parsedPaths = Arrays.stream(rawPath.split(","))
+                .map(String::trim)
+                .filter(token -> !token.isEmpty())
+                .collect(Collectors.toList());
+        if (parsedPaths.isEmpty()) {
+            throw new AnalysisException("Property 'path' must contain at least 
one location");
+        }
+        List<String> normalizedPaths = parsedPaths;
+
+        String rawMode = normalizedParams.getOrDefault(MODE, MODE_METADATA);
+        mode = rawMode.toLowerCase();
+        if (!SUPPORTED_MODES.contains(mode)) {
+            throw new AnalysisException("Unsupported mode '" + rawMode + "' 
for parquet_metadata");
+        }
+
+        String firstPath = parsedPaths.get(0);
+        String scheme = null;
+        try {
+            scheme = new URI(firstPath).getScheme();
+        } catch (URISyntaxException e) {
+            scheme = null;
+        }
+        StorageProperties storageProperties = null;
+        if (Strings.isNullOrEmpty(scheme) || "file".equalsIgnoreCase(scheme)) {
+            this.fileType = TFileType.FILE_LOCAL;
+            this.properties = Collections.emptyMap();
+        } else {
+            Map<String, String> storageParams = new 
HashMap<>(normalizedParams);
+            // StorageProperties detects provider by "uri", we also hint 
support by scheme.
+            storageParams.put("uri", firstPath);
+            forceStorageSupport(storageParams, scheme.toLowerCase());
+            try {
+                storageProperties = 
StorageProperties.createPrimary(storageParams);
+            } catch (RuntimeException e) {
+                throw new AnalysisException(
+                        "Failed to parse storage properties for 
parquet_metadata: " + e.getMessage(), e);
+            }
+            this.fileType = mapToFileType(storageProperties.getType());
+            Map<String, String> backendProps = 
storageProperties.getBackendConfigProperties();
+            try {
+                List<String> tmpPaths = new ArrayList<>(parsedPaths.size());
+                for (String path : parsedPaths) {
+                    
tmpPaths.add(storageProperties.validateAndNormalizeUri(path));
+                }
+                normalizedPaths = tmpPaths;
+            } catch (UserException e) {
+                throw new AnalysisException(
+                        "Failed to normalize parquet_metadata paths: " + 
e.getMessage(), e);
+            }
+            if (this.fileType == TFileType.FILE_HTTP && 
!backendProps.containsKey("uri")) {
+                backendProps = new HashMap<>(backendProps);
+                backendProps.put("uri", normalizedPaths.get(0));
+            }
+            this.properties = backendProps;
+        }
+
+        // Expand any glob patterns (e.g. *.parquet) to concrete file paths.
+        normalizedPaths = expandGlobPaths(normalizedPaths, storageProperties, 
this.fileType);
+
+        this.paths = ImmutableList.copyOf(normalizedPaths);
+    }
+
+    /**
+     * Expand wildcard paths to matching files.
+     */
+    private static List<String> expandGlobPaths(List<String> inputPaths,
+                                               StorageProperties 
storageProperties,
+                                               TFileType fileType) throws 
AnalysisException {
+        if (inputPaths == null || inputPaths.isEmpty()) {
+            return Collections.emptyList();
+        }
+        List<String> expanded = new ArrayList<>();
+        for (String path : inputPaths) {
+            if (!containsWildcards(path)) {
+                expanded.add(path);
+                continue;
+            }
+            expanded.addAll(expandSingleGlob(path, storageProperties, 
fileType));
+        }
+        if (expanded.isEmpty()) {
+            throw new AnalysisException("No files matched parquet_metadata 
path patterns: " + inputPaths);
+        }
+        return expanded;
+    }
+
+    private static boolean containsWildcards(String path) {
+        if (Strings.isNullOrEmpty(path)) {
+            return false;
+        }
+        return path.contains("*") || path.contains("?") || path.contains("[") 
|| path.contains("{");
+    }
+
+    private static List<String> expandSingleGlob(String pattern,
+                                                 StorageProperties 
storageProperties,
+                                                 TFileType fileType) throws 
AnalysisException {
+        if (fileType == TFileType.FILE_LOCAL) {
+            return globLocal(pattern);
+        }
+        if (storageProperties == null) {
+            throw new AnalysisException("Storage properties is required for 
glob pattern: " + pattern);
+        }
+        if (fileType == TFileType.FILE_S3 || fileType == TFileType.FILE_HDFS) {
+            return globRemote(storageProperties, pattern);
+        }
+        throw new AnalysisException("Glob patterns are not supported for file 
type: " + fileType);
+    }
+
+    private static List<String> globRemote(StorageProperties storageProperties,
+                                          String pattern) throws 
AnalysisException {
+        List<RemoteFile> remoteFiles = new ArrayList<>();
+        try (RemoteFileSystem fileSystem = 
FileSystemFactory.get(storageProperties)) {
+            Status status = fileSystem.globList(pattern, remoteFiles, false);
+            if (!status.ok()) {
+                throw new AnalysisException(status.getErrMsg());
+            }
+        } catch (Exception e) {
+            throw new AnalysisException("Failed to expand glob pattern '" + 
pattern + "': "
+                    + e.getMessage(), e);
+        }
+        return remoteFiles.stream()
+                .filter(RemoteFile::isFile)
+                .map(RemoteFile::getName)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Local glob expansion is executed on an arbitrary alive backend, because 
FE may not
+     * have access to BE local file system.
+     */
+    private static List<String> globLocal(String pattern) throws 
AnalysisException {
+        Backend backend;
+        try {
+            backend = 
Env.getCurrentSystemInfo().getBackendsByCurrentCluster().values().stream()
+                    .filter(Backend::isAlive)
+                    .findFirst()
+                    .orElse(null);
+        } catch (AnalysisException e) {
+            throw e;
+        }
+        if (backend == null) {
+            throw new AnalysisException("No alive backends to expand local 
glob pattern");
+        }
+        BackendServiceProxy proxy = BackendServiceProxy.getInstance();
+        InternalService.PGlobRequest.Builder requestBuilder = 
InternalService.PGlobRequest.newBuilder();
+        requestBuilder.setPattern(pattern);
+        long timeoutS = ConnectContext.get() == null ? 60
+                : Math.min(ConnectContext.get().getQueryTimeoutS(), 60);
+        try {
+            Future<InternalService.PGlobResponse> response =
+                    proxy.glob(backend.getBrpcAddress(), 
requestBuilder.build());
+            InternalService.PGlobResponse globResponse =
+                    response.get(timeoutS, TimeUnit.SECONDS);
+            if (globResponse.getStatus().getStatusCode() != 0) {
+                throw new AnalysisException("Expand local glob pattern failed: 
"
+                        + globResponse.getStatus().getErrorMsgsList());
+            }
+            List<String> result = new ArrayList<>();
+            for (InternalService.PGlobResponse.PFileInfo fileInfo : 
globResponse.getFilesList()) {
+                result.add(fileInfo.getFile().trim());
+            }
+            return result;
+        } catch (Exception e) {
+            throw new AnalysisException("Failed to expand local glob pattern 
'" + pattern + "': "
+                    + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Add fs.* support hints so StorageProperties can be created from 
URI-only inputs.
+     */
+    private static void forceStorageSupport(Map<String, String> params, String 
scheme) {
+        if ("s3".equals(scheme) || "s3a".equals(scheme) || 
"s3n".equals(scheme)) {
+            params.put(StorageProperties.FS_S3_SUPPORT, "true");
+        } else if ("oss".equals(scheme)) {
+            params.put(StorageProperties.FS_OSS_SUPPORT, "true");
+        } else if ("obs".equals(scheme)) {
+            params.put(StorageProperties.FS_OBS_SUPPORT, "true");
+        } else if ("cos".equals(scheme)) {
+            params.put(StorageProperties.FS_COS_SUPPORT, "true");
+        } else if ("gs".equals(scheme) || "gcs".equals(scheme)) {
+            params.put(StorageProperties.FS_GCS_SUPPORT, "true");
+        } else if ("minio".equals(scheme)) {
+            params.put(StorageProperties.FS_MINIO_SUPPORT, "true");
+        } else if ("azure".equals(scheme)) {
+            params.put(StorageProperties.FS_AZURE_SUPPORT, "true");
+        } else if ("http".equals(scheme) || "https".equals(scheme) || 
"hf".equals(scheme)) {
+            params.put(StorageProperties.FS_HTTP_SUPPORT, "true");
+        }
+    }
+
+    /**
+     * Map FE storage type to BE file type.
+     */
+    private static TFileType mapToFileType(StorageProperties.Type type) throws 
AnalysisException {
+        switch (type) {
+            case HDFS:
+            case OSS_HDFS:
+                return TFileType.FILE_HDFS;
+            case HTTP:
+                return TFileType.FILE_HTTP;
+            case LOCAL:
+                return TFileType.FILE_LOCAL;
+            case S3:
+            case OSS:
+            case OBS:
+            case COS:
+            case GCS:
+            case MINIO:
+            case AZURE:
+                return TFileType.FILE_S3;
+            default:
+                throw new AnalysisException("Unsupported storage type for 
parquet_metadata: " + type);

Review Comment:
   Error messages reference "parquet_metadata" instead of the actual function name "parquet_meta". This inconsistency appears in multiple error messages (lines 125, 132, 139, 162, 174, 207, 226, 338). All error messages should use the actual function name "parquet_meta" for clarity.
   ```suggestion
                   throw new AnalysisException("Unsupported storage type for parquet_meta: " + type);
   ```
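   One way to keep every message consistent is to build it from the existing NAME constant ("parquet_meta"); a sketch, where the helper method is hypothetical and not part of the PR:
   ```java
       // Hypothetical helper inside ParquetMetadataTableValuedFunction: derives error
       // messages from NAME so they always match the registered function name.
       private static AnalysisException unsupported(String what, Object value) {
           return new AnalysisException("Unsupported " + what + " for " + NAME + ": " + value);
       }
   
       // Usage in mapToFileType's default branch:
       //     throw unsupported("storage type", type);
   ```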



##########
sbe:
##########
@@ -0,0 +1,7 @@
+#!/bin/bash
+set -euo pipefail
+
+unset http_proxy
+unset https_proxy
+cd /mnt/disk1/chenjunwei/doris_build/doris/output/be
+./bin/start_be.sh --daemon

Review Comment:
   This file appears to be a personal development script with hardcoded paths (e.g., `/mnt/disk1/chenjunwei/doris_build/doris/output/be`) and should not be included in the PR. It should be removed from the repository.
   ```suggestion
   
   ```



##########
be/src/vec/exec/format/table/parquet_metadata_reader.h:
##########
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/PlanNodes_types.h>
+
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "common/factory_creator.h"
+#include "common/status.h"
+#include "runtime/descriptors.h"
+#include "vec/exec/format/generic_reader.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+namespace io {
+class FileReader;
+} // namespace io
+} // namespace doris
+
+namespace doris::vectorized {
+class Block;
+
+// Lightweight reader that surfaces Parquet footer metadata as a table-valued scan.
+// It reads only file footers (no data pages) and emits either schema rows or
+// row-group/column statistics based on `mode`.
+class ParquetMetadataReader : public GenericReader {
+    ENABLE_FACTORY_CREATOR(ParquetMetadataReader);
+
+public:
+    class ModeHandler;
+
+    ParquetMetadataReader(std::vector<SlotDescriptor*> slots, RuntimeState* 
state,
+                          RuntimeProfile* profile, TMetaScanRange scan_range);
+    ~ParquetMetadataReader() override;
+
+    Status init_reader();
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+    Status close() override;
+
+private:
+
+

Review Comment:
   There are extra blank lines between the private access specifier and the first private method declaration. Remove one of the blank lines for consistency with the codebase style.
   ```suggestion
   
   ```



##########
be/src/vec/exec/format/table/parquet_metadata_reader.cpp:
##########
@@ -0,0 +1,908 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/table/parquet_metadata_reader.h"
+
+#include <fmt/format.h>
+
+#include <array>
+#include <algorithm>
+#include <cctype>
+#include <memory>
+#include <unordered_map>
+#include <utility>
+
+#include "common/logging.h"
+#include "io/file_factory.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/local_file_system.h"
+#include "io/io_common.h"
+#include "runtime/runtime_state.h"
+#include "util/string_util.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_vector.h"
+#include "vec/common/assert_cast.h"
+#include "vec/common/unaligned.h"
+#include "vec/core/block.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/exec/format/parquet/parquet_column_convert.h"
+#include "vec/exec/format/parquet/parquet_thrift_util.h"
+#include "vec/exec/format/parquet/schema_desc.h"
+#include "vec/exec/format/parquet/vparquet_file_metadata.h"
+
+namespace doris::vectorized {
+namespace {
+
+constexpr const char* MODE_SCHEMA = "parquet_schema";
+constexpr const char* MODE_METADATA = "parquet_metadata";
+
+enum SchemaColumnIndex : size_t {
+    SCHEMA_FILE_NAME = 0,
+    SCHEMA_COLUMN_NAME,
+    SCHEMA_COLUMN_PATH,
+    SCHEMA_PHYSICAL_TYPE,
+    SCHEMA_LOGICAL_TYPE,
+    SCHEMA_REPETITION_LEVEL,
+    SCHEMA_DEFINITION_LEVEL,
+    SCHEMA_TYPE_LENGTH,
+    SCHEMA_PRECISION,
+    SCHEMA_SCALE,
+    SCHEMA_IS_NULLABLE,
+    SCHEMA_COLUMN_COUNT
+};
+
+enum MetadataColumnIndex : size_t {
+    META_FILE_NAME = 0,
+    META_ROW_GROUP_ID,
+    META_COLUMN_ID,
+    META_COLUMN_NAME,
+    META_COLUMN_PATH,
+    META_PHYSICAL_TYPE,
+    META_LOGICAL_TYPE,
+    META_TYPE_LENGTH,
+    META_CONVERTED_TYPE,
+    META_NUM_VALUES,
+    META_NULL_COUNT,
+    META_DISTINCT_COUNT,
+    META_ENCODINGS,
+    META_COMPRESSION,
+    META_DATA_PAGE_OFFSET,
+    META_INDEX_PAGE_OFFSET,
+    META_DICTIONARY_PAGE_OFFSET,
+    META_TOTAL_COMPRESSED_SIZE,
+    META_TOTAL_UNCOMPRESSED_SIZE,
+    META_STATISTICS_MIN,
+    META_STATISTICS_MAX,
+    META_COLUMN_COUNT
+};
+
+constexpr std::array<const char*, SCHEMA_COLUMN_COUNT> kSchemaColumnNames = {
+        "file_name",        "column_name",       "column_path",       
"physical_type",
+        "logical_type",     "repetition_level",  "definition_level",  
"type_length",
+        "precision",        "scale",             "is_nullable"};
+
+static_assert(kSchemaColumnNames.size() == SCHEMA_COLUMN_COUNT);
+
+constexpr std::array<const char*, META_COLUMN_COUNT> kMetadataColumnNames = {
+        "file_name", "row_group_id", "column_id", "column_name", 
"column_path", "physical_type",
+        "logical_type", "type_length", "converted_type", "num_values", 
"null_count",
+        "distinct_count", "encodings", "compression", "data_page_offset", 
"index_page_offset",
+        "dictionary_page_offset", "total_compressed_size", 
"total_uncompressed_size",
+        "statistics_min", "statistics_max"};
+
+static_assert(kMetadataColumnNames.size() == META_COLUMN_COUNT);
+
+std::string join_path(const std::vector<std::string>& items) {
+    return join(items, ".");
+}
+
+template <typename ColumnType, typename T>
+void insert_numeric_impl(MutableColumnPtr& column, T value) {
+    if (auto* nullable = check_and_get_column<ColumnNullable>(*column)) {
+        nullable->get_null_map_data().push_back(0);
+        auto& nested = nullable->get_nested_column();
+        assert_cast<ColumnType&>(nested).insert_value(value);
+    } else {
+        assert_cast<ColumnType&>(*column).insert_value(value);
+    }
+}
+
+void insert_int32(MutableColumnPtr& column, Int32 value) {
+    insert_numeric_impl<ColumnInt32>(column, value);
+}
+
+void insert_int64(MutableColumnPtr& column, Int64 value) {
+    insert_numeric_impl<ColumnInt64>(column, value);
+}
+
+void insert_bool(MutableColumnPtr& column, bool value) {
+    insert_numeric_impl<ColumnUInt8>(column, static_cast<UInt8>(value));
+}
+
+void insert_string(MutableColumnPtr& column, const std::string& value) {
+    if (auto* nullable = check_and_get_column<ColumnNullable>(*column)) {
+        nullable->get_null_map_data().push_back(0);
+        auto& nested = nullable->get_nested_column();
+        assert_cast<ColumnString&>(nested).insert_data(value.c_str(), 
value.size());
+    } else {
+        assert_cast<ColumnString&>(*column).insert_data(value.c_str(), 
value.size());
+    }
+}
+
+void insert_null(MutableColumnPtr& column) {
+    if (auto* nullable = check_and_get_column<ColumnNullable>(*column)) {
+        nullable->get_null_map_data().push_back(1);
+        nullable->get_nested_column().insert_default();
+    } else {
+        column->insert_default();
+    }
+}
+
+std::string physical_type_to_string(tparquet::Type::type type) {
+    switch (type) {
+    case tparquet::Type::BOOLEAN:
+        return "BOOLEAN";
+    case tparquet::Type::INT32:
+        return "INT32";
+    case tparquet::Type::INT64:
+        return "INT64";
+    case tparquet::Type::INT96:
+        return "INT96";
+    case tparquet::Type::FLOAT:
+        return "FLOAT";
+    case tparquet::Type::DOUBLE:
+        return "DOUBLE";
+    case tparquet::Type::BYTE_ARRAY:
+        return "BYTE_ARRAY";
+    case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
+        return "FIXED_LEN_BYTE_ARRAY";
+    default:
+        return "UNKNOWN";
+    }
+}
+
+std::string compression_to_string(tparquet::CompressionCodec::type codec) {
+    switch (codec) {
+    case tparquet::CompressionCodec::UNCOMPRESSED:
+        return "UNCOMPRESSED";
+    case tparquet::CompressionCodec::SNAPPY:
+        return "SNAPPY";
+    case tparquet::CompressionCodec::GZIP:
+        return "GZIP";
+    case tparquet::CompressionCodec::LZO:
+        return "LZO";
+    case tparquet::CompressionCodec::BROTLI:
+        return "BROTLI";
+    case tparquet::CompressionCodec::LZ4:
+        return "LZ4";
+    case tparquet::CompressionCodec::ZSTD:
+        return "ZSTD";
+    case tparquet::CompressionCodec::LZ4_RAW:
+        return "LZ4_RAW";
+    default:
+        return "UNKNOWN";
+    }
+}
+
+std::string converted_type_to_string(tparquet::ConvertedType::type type) {
+    switch (type) {
+    case tparquet::ConvertedType::UTF8:
+        return "UTF8";
+    case tparquet::ConvertedType::MAP:
+        return "MAP";
+    case tparquet::ConvertedType::MAP_KEY_VALUE:
+        return "MAP_KEY_VALUE";
+    case tparquet::ConvertedType::LIST:
+        return "LIST";
+    case tparquet::ConvertedType::ENUM:
+        return "ENUM";
+    case tparquet::ConvertedType::DECIMAL:
+        return "DECIMAL";
+    case tparquet::ConvertedType::DATE:
+        return "DATE";
+    case tparquet::ConvertedType::TIME_MILLIS:
+        return "TIME_MILLIS";
+    case tparquet::ConvertedType::TIME_MICROS:
+        return "TIME_MICROS";
+    case tparquet::ConvertedType::TIMESTAMP_MILLIS:
+        return "TIMESTAMP_MILLIS";
+    case tparquet::ConvertedType::TIMESTAMP_MICROS:
+        return "TIMESTAMP_MICROS";
+    case tparquet::ConvertedType::UINT_8:
+        return "UINT_8";
+    case tparquet::ConvertedType::UINT_16:
+        return "UINT_16";
+    case tparquet::ConvertedType::UINT_32:
+        return "UINT_32";
+    case tparquet::ConvertedType::UINT_64:
+        return "UINT_64";
+    case tparquet::ConvertedType::INT_8:
+        return "INT_8";
+    case tparquet::ConvertedType::INT_16:
+        return "INT_16";
+    case tparquet::ConvertedType::INT_32:
+        return "INT_32";
+    case tparquet::ConvertedType::INT_64:
+        return "INT_64";
+    case tparquet::ConvertedType::JSON:
+        return "JSON";
+    case tparquet::ConvertedType::BSON:
+        return "BSON";
+    case tparquet::ConvertedType::INTERVAL:
+        return "INTERVAL";
+    default:
+        return "UNKNOWN";
+    }
+}
+
+std::string logical_type_to_string(const tparquet::SchemaElement& element) {
+    if (element.__isset.logicalType) {
+        const auto& logical = element.logicalType;
+        if (logical.__isset.STRING) {
+            return "STRING";
+        } else if (logical.__isset.MAP) {
+            return "MAP";
+        } else if (logical.__isset.LIST) {
+            return "LIST";
+        } else if (logical.__isset.ENUM) {
+            return "ENUM";
+        } else if (logical.__isset.DECIMAL) {
+            return "DECIMAL";
+        } else if (logical.__isset.DATE) {
+            return "DATE";
+        } else if (logical.__isset.TIME) {
+            return "TIME";
+        } else if (logical.__isset.TIMESTAMP) {
+            return "TIMESTAMP";
+        } else if (logical.__isset.INTEGER) {
+            return "INTEGER";
+        } else if (logical.__isset.UNKNOWN) {
+            return "UNKNOWN";
+        } else if (logical.__isset.JSON) {
+            return "JSON";
+        } else if (logical.__isset.BSON) {
+            return "BSON";
+        } else if (logical.__isset.UUID) {
+            return "UUID";
+        } else if (logical.__isset.FLOAT16) {
+            return "FLOAT16";
+        } else if (logical.__isset.VARIANT) {
+            return "VARIANT";
+        } else if (logical.__isset.GEOMETRY) {
+            return "GEOMETRY";
+        } else if (logical.__isset.GEOGRAPHY) {
+            return "GEOGRAPHY";
+        }
+    }
+    if (element.__isset.converted_type) {
+        return converted_type_to_string(element.converted_type);
+    }
+    return "";
+}
+
+std::string encodings_to_string(const std::vector<tparquet::Encoding::type>& 
encodings) {
+    std::vector<std::string> parts;
+    parts.reserve(encodings.size());
+    for (auto encoding : encodings) {
+        switch (encoding) {
+        case tparquet::Encoding::PLAIN:
+            parts.emplace_back("PLAIN");
+            break;
+        case tparquet::Encoding::PLAIN_DICTIONARY:
+            parts.emplace_back("PLAIN_DICTIONARY");
+            break;
+        case tparquet::Encoding::RLE:
+            parts.emplace_back("RLE");
+            break;
+        case tparquet::Encoding::BIT_PACKED:
+            parts.emplace_back("BIT_PACKED");
+            break;
+        case tparquet::Encoding::DELTA_BINARY_PACKED:
+            parts.emplace_back("DELTA_BINARY_PACKED");
+            break;
+        case tparquet::Encoding::DELTA_LENGTH_BYTE_ARRAY:
+            parts.emplace_back("DELTA_LENGTH_BYTE_ARRAY");
+            break;
+        case tparquet::Encoding::DELTA_BYTE_ARRAY:
+            parts.emplace_back("DELTA_BYTE_ARRAY");
+            break;
+        case tparquet::Encoding::RLE_DICTIONARY:
+            parts.emplace_back("RLE_DICTIONARY");
+            break;
+        default:
+            parts.emplace_back("UNKNOWN");
+            break;
+        }
+    }
+    return fmt::format("{}", fmt::join(parts, ","));
+}
+
+bool try_get_statistics_encoded_value(const tparquet::Statistics& statistics, bool is_min,
+                                     std::string* encoded_value) {
+    if (is_min) {
+        if (statistics.__isset.min_value) {
+            *encoded_value = statistics.min_value;
+            return true;
+        }
+        if (statistics.__isset.min) {
+            *encoded_value = statistics.min;
+            return true;
+        }
+    } else {
+        if (statistics.__isset.max_value) {
+            *encoded_value = statistics.max_value;
+            return true;
+        }
+        if (statistics.__isset.max) {
+            *encoded_value = statistics.max;
+            return true;
+        }
+    }
+    encoded_value->clear();
+    return false;
+}
+
+std::string bytes_to_hex_string(const std::string& bytes) {
+    static constexpr char kHexDigits[] = "0123456789ABCDEF";
+    std::string hex;
+    hex.resize(bytes.size() * 2);
+    for (size_t i = 0; i < bytes.size(); ++i) {
+        auto byte = static_cast<uint8_t>(bytes[i]);
+        hex[i * 2] = kHexDigits[byte >> 4];
+        hex[i * 2 + 1] = kHexDigits[byte & 0x0F];
+    }
+    return fmt::format("0x{}", hex);
+}
+
+std::string decode_statistics_value(const FieldSchema* schema_field, tparquet::Type::type physical_type,
+                                    const std::string& encoded_value,
+                                    const cctz::time_zone& ctz) {
+    if (encoded_value.empty()) {
+        return "";
+    }
+    if (schema_field == nullptr) {
+        return bytes_to_hex_string(encoded_value);
+    }
+
+    auto logical_data_type = remove_nullable(schema_field->data_type);
+    auto converter = parquet::PhysicalToLogicalConverter::get_converter(
+            schema_field, logical_data_type, logical_data_type, &ctz);
+    if (!converter || !converter->support()) {
+        return bytes_to_hex_string(encoded_value);
+    }
+
+    ColumnPtr physical_column;
+    switch (physical_type) {
+    case tparquet::Type::type::BOOLEAN: {
+        if (encoded_value.size() != sizeof(UInt8)) {
+            return bytes_to_hex_string(encoded_value);
+        }
+        auto physical_col = ColumnUInt8::create();
+        physical_col->insert_value(doris::unaligned_load<UInt8>(encoded_value.data()));
+        physical_column = std::move(physical_col);
+        break;
+    }
+    case tparquet::Type::type::INT32: {
+        if (encoded_value.size() != sizeof(Int32)) {
+            return bytes_to_hex_string(encoded_value);
+        }
+        auto physical_col = ColumnInt32::create();
+        physical_col->insert_value(doris::unaligned_load<Int32>(encoded_value.data()));
+        physical_column = std::move(physical_col);
+        break;
+    }
+    case tparquet::Type::type::INT64: {
+        if (encoded_value.size() != sizeof(Int64)) {
+            return bytes_to_hex_string(encoded_value);
+        }
+        auto physical_col = ColumnInt64::create();
+        physical_col->insert_value(doris::unaligned_load<Int64>(encoded_value.data()));
+        physical_column = std::move(physical_col);
+        break;
+    }
+    case tparquet::Type::type::FLOAT: {
+        if (encoded_value.size() != sizeof(Float32)) {
+            return bytes_to_hex_string(encoded_value);
+        }
+        auto physical_col = ColumnFloat32::create();
+        physical_col->insert_value(doris::unaligned_load<Float32>(encoded_value.data()));
+        physical_column = std::move(physical_col);
+        break;
+    }
+    case tparquet::Type::type::DOUBLE: {
+        if (encoded_value.size() != sizeof(Float64)) {
+            return bytes_to_hex_string(encoded_value);
+        }
+        auto physical_col = ColumnFloat64::create();
+        physical_col->insert_value(doris::unaligned_load<Float64>(encoded_value.data()));
+        physical_column = std::move(physical_col);
+        break;
+    }
+    case tparquet::Type::type::BYTE_ARRAY: {
+        auto physical_col = ColumnString::create();
+        physical_col->insert_data(encoded_value.data(), encoded_value.size());
+        physical_column = std::move(physical_col);
+        break;
+    }
+    case tparquet::Type::type::FIXED_LEN_BYTE_ARRAY: {
+        int32_t type_length = schema_field->parquet_schema.__isset.type_length
+                                      ? schema_field->parquet_schema.type_length
+                                      : 0;
+        if (type_length <= 0) {
+            type_length = static_cast<int32_t>(encoded_value.size());
+        }
+        if (static_cast<size_t>(type_length) != encoded_value.size()) {
+            return bytes_to_hex_string(encoded_value);
+        }
+        auto physical_col = ColumnUInt8::create();
+        physical_col->resize(type_length);
+        memcpy(physical_col->get_data().data(), encoded_value.data(), encoded_value.size());
+        physical_column = std::move(physical_col);
+        break;
+    }
+    case tparquet::Type::type::INT96: {
+        constexpr size_t kInt96Size = 12;
+        if (encoded_value.size() != kInt96Size) {
+            return bytes_to_hex_string(encoded_value);
+        }
+        auto physical_col = ColumnInt8::create();
+        physical_col->resize(kInt96Size);
+        memcpy(physical_col->get_data().data(), encoded_value.data(), encoded_value.size());
+        physical_column = std::move(physical_col);
+        break;
+    }
+    default:
+        return bytes_to_hex_string(encoded_value);
+    }
+
+    ColumnPtr logical_column;
+    if (converter->is_consistent()) {
+        logical_column = physical_column;
+    } else {
+        logical_column = logical_data_type->create_column();
+        if (Status st = converter->physical_convert(physical_column, logical_column); !st.ok()) {
+            return bytes_to_hex_string(encoded_value);
+        }
+    }
+
+    if (logical_column->size() != 1) {
+        return bytes_to_hex_string(encoded_value);
+    }
+    return logical_data_type->to_string(*logical_column, 0);
+}
+
+void build_path_map(const FieldSchema& field, const std::string& prefix,
+                    std::unordered_map<std::string, const FieldSchema*>* map) {
+    std::string current = prefix.empty() ? field.name : fmt::format("{}.{}", prefix, field.name);
+    if (field.children.empty()) {
+        (*map)[current] = &field;
+    } else {
+        for (const auto& child : field.children) {
+            build_path_map(child, current, map);
+        }
+    }
+}
+
+} // namespace
+
+class ParquetMetadataReader::ModeHandler {
+public:
+    explicit ModeHandler(RuntimeState* state) : _state(state) {}
+    virtual ~ModeHandler() = default;
+
+    virtual void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) = 0;
+    virtual Status append_rows(const std::string& path, FileMetaData* metadata,
+                               std::vector<MutableColumnPtr>& columns) = 0;
+
+protected:
+    RuntimeState* _state = nullptr;
+
+    static std::unordered_map<std::string, int> _build_name_to_pos_map(
+            const std::vector<SlotDescriptor*>& slots) {
+        std::unordered_map<std::string, int> name_to_pos;
+        name_to_pos.reserve(slots.size());
+        for (size_t i = 0; i < slots.size(); ++i) {
+            name_to_pos.emplace(to_lower(slots[i]->col_name()), static_cast<int>(i));
+        }
+        return name_to_pos;
+    }
+
+    template <size_t N>
+    static void _init_slot_pos_map(const std::unordered_map<std::string, int>& name_to_pos,
+                                   const std::array<const char*, N>& column_names,
+                                   std::array<int, N>* slot_pos) {
+        slot_pos->fill(-1);
+        for (size_t i = 0; i < column_names.size(); ++i) {
+            auto it = name_to_pos.find(column_names[i]);
+            if (it != name_to_pos.end()) {
+                (*slot_pos)[i] = it->second;
+            }
+        }
+    }
+};
+
+class ParquetSchemaModeHandler final : public ParquetMetadataReader::ModeHandler {
+public:
+    explicit ParquetSchemaModeHandler(RuntimeState* state) : ModeHandler(state) {}
+
+    void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override {
+        std::unordered_map<std::string, int> name_to_pos = _build_name_to_pos_map(slots);
+        _init_slot_pos_map(name_to_pos, kSchemaColumnNames, &_slot_pos);
+    }
+
+    Status append_rows(const std::string& path, FileMetaData* metadata,
+                       std::vector<MutableColumnPtr>& columns) override {
+        const auto& fields = metadata->schema().get_fields_schema();
+        for (const auto& field : fields) {
+            RETURN_IF_ERROR(_append_schema_field(path, field, field.name, columns));
+        }
+        return Status::OK();
+    }
+
+private:
+    std::array<int, SCHEMA_COLUMN_COUNT> _slot_pos {};
+
+    Status _append_schema_field(const std::string& path, const FieldSchema& field,
+                                const std::string& current_path,
+                                std::vector<MutableColumnPtr>& columns) {
+        if (!field.children.empty()) {
+            for (const auto& child : field.children) {
+                std::string child_path = current_path.empty()
+                                                 ? child.name
+                                                 : fmt::format("{}.{}", current_path, child.name);
+                RETURN_IF_ERROR(_append_schema_field(path, child, child_path, columns));
+            }
+            return Status::OK();
+        }
+
+        auto insert_if_requested = [&](SchemaColumnIndex idx, auto&& inserter, auto&&... args) {
+            int pos = _slot_pos[idx];
+            if (pos >= 0) {
+                inserter(columns[pos], std::forward<decltype(args)>(args)...);
+            }
+        };
+
+        insert_if_requested(SCHEMA_FILE_NAME, insert_string, path);
+        insert_if_requested(SCHEMA_COLUMN_NAME, insert_string, field.name);
+        insert_if_requested(SCHEMA_COLUMN_PATH, insert_string, current_path);
+        insert_if_requested(SCHEMA_PHYSICAL_TYPE, insert_string,
+                            physical_type_to_string(field.physical_type));
+        insert_if_requested(SCHEMA_LOGICAL_TYPE, insert_string,
+                            logical_type_to_string(field.parquet_schema));
+        insert_if_requested(SCHEMA_REPETITION_LEVEL, insert_int32, field.repetition_level);
+        insert_if_requested(SCHEMA_DEFINITION_LEVEL, insert_int32, field.definition_level);
+
+        int32_t type_length = field.parquet_schema.__isset.type_length ? field.parquet_schema.type_length
+                                                                       : 0;
+        insert_if_requested(SCHEMA_TYPE_LENGTH, insert_int32, type_length);
+        int32_t precision =
+                field.parquet_schema.__isset.precision ? field.parquet_schema.precision : 0;
+        insert_if_requested(SCHEMA_PRECISION, insert_int32, precision);
+        int32_t scale = field.parquet_schema.__isset.scale ? field.parquet_schema.scale : 0;
+        insert_if_requested(SCHEMA_SCALE, insert_int32, scale);
+        bool is_nullable_field =
+                !field.parquet_schema.__isset.repetition_type ||
+                field.parquet_schema.repetition_type != tparquet::FieldRepetitionType::REQUIRED;
+        insert_if_requested(SCHEMA_IS_NULLABLE, insert_bool, is_nullable_field);
+        return Status::OK();
+    }
+};
+
+class ParquetMetadataModeHandler final : public ParquetMetadataReader::ModeHandler {
+public:
+    explicit ParquetMetadataModeHandler(RuntimeState* state) : ModeHandler(state) {}
+
+    void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override {
+        std::unordered_map<std::string, int> name_to_pos = _build_name_to_pos_map(slots);
+        _init_slot_pos_map(name_to_pos, kMetadataColumnNames, &_slot_pos);
+    }
+
+    Status append_rows(const std::string& path, FileMetaData* metadata,
+                       std::vector<MutableColumnPtr>& columns) override {
+        const tparquet::FileMetaData& thrift_meta = metadata->to_thrift();
+        if (thrift_meta.row_groups.empty()) {
+            return Status::OK();
+        }
+
+        std::unordered_map<std::string, const FieldSchema*> path_map;
+        const auto& fields = metadata->schema().get_fields_schema();
+        for (const auto& field : fields) {
+            build_path_map(field, "", &path_map);
+        }
+
+        for (size_t rg_index = 0; rg_index < thrift_meta.row_groups.size(); ++rg_index) {
+            const auto& row_group = thrift_meta.row_groups[rg_index];
+            for (size_t col_idx = 0; col_idx < row_group.columns.size(); ++col_idx) {
+                const auto& column_chunk = row_group.columns[col_idx];
+                if (!column_chunk.__isset.meta_data) {
+                    continue;
+                }
+                const auto& column_meta = column_chunk.meta_data;
+                std::string column_path = join_path(column_meta.path_in_schema);
+                const FieldSchema* schema_field = nullptr;
+                auto it = path_map.find(column_path);
+                if (it != path_map.end()) {
+                    schema_field = it->second;
+                }
+
+                auto insert_if_requested =
+                        [&](MetadataColumnIndex idx, auto&& inserter, auto&&... args) {
+                            int pos = _slot_pos[idx];
+                            if (pos >= 0) {
+                                inserter(columns[pos], std::forward<decltype(args)>(args)...);
+                            }
+                        };
+
+                insert_if_requested(META_FILE_NAME, insert_string,
+                                    column_chunk.__isset.file_path ? column_chunk.file_path
+                                                                   : path);
+                insert_if_requested(META_ROW_GROUP_ID, insert_int32,
+                                    static_cast<Int32>(rg_index));
+                insert_if_requested(META_COLUMN_ID, insert_int32, static_cast<Int32>(col_idx));
+                std::string column_name = column_meta.path_in_schema.empty()
+                                                  ? ""
+                                                  : column_meta.path_in_schema.back();
+                insert_if_requested(META_COLUMN_NAME, insert_string, column_name);
+                insert_if_requested(META_COLUMN_PATH, insert_string, column_path);
+                insert_if_requested(META_PHYSICAL_TYPE, insert_string,
+                                    physical_type_to_string(column_meta.type));
+
+                if (schema_field != nullptr) {
+                    insert_if_requested(META_LOGICAL_TYPE, insert_string,
+                                        logical_type_to_string(schema_field->parquet_schema));
+                    int32_t type_length = schema_field->parquet_schema.__isset.type_length
+                                                  ? schema_field->parquet_schema.type_length
+                                                  : 0;
+                    insert_if_requested(META_TYPE_LENGTH, insert_int32, type_length);
+                    if (schema_field->parquet_schema.__isset.converted_type) {
+                        insert_if_requested(META_CONVERTED_TYPE, insert_string,
+                                            converted_type_to_string(
+                                                    schema_field->parquet_schema.converted_type));
+                    } else {
+                        insert_if_requested(META_CONVERTED_TYPE, insert_string, "");
+                    }
+                } else {
+                    insert_if_requested(META_LOGICAL_TYPE, insert_string, "");
+                    insert_if_requested(META_TYPE_LENGTH, insert_int32, 0);
+                    insert_if_requested(META_CONVERTED_TYPE, insert_string, "");
+                }
+
+                insert_if_requested(META_NUM_VALUES, insert_int64, column_meta.num_values);
+                if (column_meta.__isset.statistics &&
+                    column_meta.statistics.__isset.null_count) {
+                    insert_if_requested(META_NULL_COUNT, insert_int64,
+                                        column_meta.statistics.null_count);
+                } else {
+                    insert_if_requested(META_NULL_COUNT, insert_null);
+                }
+                if (column_meta.__isset.statistics &&
+                    column_meta.statistics.__isset.distinct_count) {
+                    insert_if_requested(META_DISTINCT_COUNT, insert_int64,
+                                        column_meta.statistics.distinct_count);
+                } else {
+                    insert_if_requested(META_DISTINCT_COUNT, insert_null);
+                }
+
+                insert_if_requested(META_ENCODINGS, insert_string,
+                                    encodings_to_string(column_meta.encodings));
+                insert_if_requested(META_COMPRESSION, insert_string,
+                                    compression_to_string(column_meta.codec));
+                insert_if_requested(META_DATA_PAGE_OFFSET, insert_int64,
+                                    column_meta.data_page_offset);
+                if (column_meta.__isset.index_page_offset) {
+                    insert_if_requested(META_INDEX_PAGE_OFFSET, insert_int64,
+                                        column_meta.index_page_offset);
+                } else {
+                    insert_if_requested(META_INDEX_PAGE_OFFSET, insert_null);
+                }
+                if (column_meta.__isset.dictionary_page_offset) {
+                    insert_if_requested(META_DICTIONARY_PAGE_OFFSET, insert_int64,
+                                        column_meta.dictionary_page_offset);
+                } else {
+                    insert_if_requested(META_DICTIONARY_PAGE_OFFSET, insert_null);
+                }
+                insert_if_requested(META_TOTAL_COMPRESSED_SIZE, insert_int64,
+                                    column_meta.total_compressed_size);
+                insert_if_requested(META_TOTAL_UNCOMPRESSED_SIZE, insert_int64,
+                                    column_meta.total_uncompressed_size);
+
+                if (column_meta.__isset.statistics) {
+                    static const cctz::time_zone kUtc0 = cctz::utc_time_zone();
+                    const cctz::time_zone& ctz = _state != nullptr ? _state->timezone_obj() : kUtc0;
+
+                    std::string min_value;
+                    std::string max_value;
+                    bool has_min = try_get_statistics_encoded_value(
+                            column_meta.statistics, true, &min_value);
+                    bool has_max = try_get_statistics_encoded_value(
+                            column_meta.statistics, false, &max_value);
+
+                    if (has_min) {
+                        std::string decoded =
+                                decode_statistics_value(schema_field, column_meta.type, min_value, ctz);
+                        insert_if_requested(META_STATISTICS_MIN, insert_string, decoded);
+                    } else {
+                        insert_if_requested(META_STATISTICS_MIN, insert_null);
+                    }
+                    if (has_max) {
+                        std::string decoded =
+                                decode_statistics_value(schema_field, column_meta.type, max_value, ctz);
+                        insert_if_requested(META_STATISTICS_MAX, insert_string, decoded);
+                    } else {
+                        insert_if_requested(META_STATISTICS_MAX, insert_null);
+                    }
+                } else {
+                    insert_if_requested(META_STATISTICS_MIN, insert_null);
+                    insert_if_requested(META_STATISTICS_MAX, insert_null);
+                }
+            }
+        }
+        return Status::OK();
+    }
+
+private:
+    std::array<int, META_COLUMN_COUNT> _slot_pos {};
+};
+
+ParquetMetadataReader::ParquetMetadataReader(std::vector<SlotDescriptor*> slots,
+                                             RuntimeState* state, RuntimeProfile* profile,
+                                             TMetaScanRange scan_range)
+        : _state(state), _slots(std::move(slots)), _scan_range(std::move(scan_range)) {
+    (void)profile;
+}
+
+ParquetMetadataReader::~ParquetMetadataReader() = default;
+
+Status ParquetMetadataReader::init_reader() {
+    RETURN_IF_ERROR(_init_from_scan_range(_scan_range));
+    if (_mode_type == Mode::SCHEMA) {
+        _mode_handler = std::make_unique<ParquetSchemaModeHandler>(_state);
+    } else {
+        _mode_handler = std::make_unique<ParquetMetadataModeHandler>(_state);
+    }
+    _mode_handler->init_slot_pos_map(_slots);
+    return Status::OK();
+}
+
+Status ParquetMetadataReader::_init_from_scan_range(const TMetaScanRange& scan_range) {
+    if (!scan_range.__isset.parquet_params) {
+        return Status::InvalidArgument(
+                "Missing parquet parameters for parquet_metadata table 
function");
+    }
+    const TParquetMetadataParams& params = scan_range.parquet_params;
+    if (!params.__isset.paths || params.paths.empty()) {
+        return Status::InvalidArgument("Property 'path' must be set for 
parquet_metadata");

Review Comment:
   Error messages in the backend reference "parquet_metadata" instead of the 
actual function name "parquet_meta". This is inconsistent with the function 
name defined on the frontend (NAME = "parquet_meta"). Update these error 
messages to use "parquet_meta" for consistency.
   ```suggestion
                   "Missing parquet parameters for parquet_meta table 
function");
       }
       const TParquetMetadataParams& params = scan_range.parquet_params;
       if (!params.__isset.paths || params.paths.empty()) {
           return Status::InvalidArgument("Property 'path' must be set for 
parquet_meta");
   ```
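   
   As a side note, a minimal sketch of one way to keep the user-facing name from drifting between FE and BE messages: define it once and build every error text from that constant. This is not part of the PR; the constant name, the namespace, and the standalone `main()` below are illustrative assumptions, and `fmt::format` stands in for however the real error strings are assembled.
   
   ```cpp
   // Sketch only: centralize the TVF name so all messages reference one constant.
   #include <fmt/format.h>
   
   #include <iostream>
   #include <string>
   
   namespace doris::illustrative {
   
   // Single source of truth for the name users call the function by.
   constexpr const char* kParquetMetaFuncName = "parquet_meta";
   
   // Error texts are built from the constant instead of hard-coded strings,
   // so renaming the function only requires touching one line.
   std::string missing_params_message() {
       return fmt::format("Missing parquet parameters for {} table function",
                          kParquetMetaFuncName);
   }
   
   std::string missing_path_message() {
       return fmt::format("Property 'path' must be set for {}", kParquetMetaFuncName);
   }
   
   } // namespace doris::illustrative
   
   int main() {
       std::cout << doris::illustrative::missing_params_message() << '\n'
                 << doris::illustrative::missing_path_message() << '\n';
       return 0;
   }
   ```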



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


