Copilot commented on code in PR #60116: URL: https://github.com/apache/doris/pull/60116#discussion_r2980526691
########## fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java: ########## @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.StorageBackend.StorageType; +import org.apache.doris.catalog.Column; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.jdbc.client.JdbcClient; +import org.apache.doris.job.cdc.DataSourceConfigKeys; +import org.apache.doris.job.cdc.request.FetchRecordRequest; +import org.apache.doris.job.common.DataSourceType; +import org.apache.doris.job.util.StreamingJobUtils; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileType; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +public class CdcStreamTableValuedFunction extends ExternalFileTableValuedFunction { + private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final String URI = 
"http://127.0.0.1:CDC_CLIENT_PORT/api/fetchRecordStream"; + + public CdcStreamTableValuedFunction(Map<String, String> properties) throws AnalysisException { + validate(properties); + processProps(properties); + } + + private void processProps(Map<String, String> properties) throws AnalysisException { + Map<String, String> copyProps = new HashMap<>(properties); + copyProps.put("format", "json"); + super.parseCommonProperties(copyProps); + this.processedParams.put("enable_cdc_client", "true"); + this.processedParams.put("uri", URI); + this.processedParams.put("http.enable.range.request", "false"); + this.processedParams.put("http.enable.chunk.response", "true"); + this.processedParams.put("http.method", "POST"); + + String payload = generateParams(properties); + this.processedParams.put("http.payload", payload); + this.backendConnectProperties.putAll(processedParams); + generateFileStatus(); + } + + private String generateParams(Map<String, String> properties) throws AnalysisException { + FetchRecordRequest recordRequest = new FetchRecordRequest(); + recordRequest.setJobId(UUID.randomUUID().toString().replace("-", "")); + recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE)); + recordRequest.setConfig(properties); + try { + return objectMapper.writeValueAsString(recordRequest); + } catch (IOException e) { + LOG.info("Failed to serialize fetch record request," + e.getMessage()); + throw new AnalysisException(e.getMessage()); Review Comment: `generateParams` logs at INFO and drops the exception cause, then throws a new `AnalysisException` with only `e.getMessage()`. This makes diagnosing serialization failures harder (no stack trace / context). Consider logging with the throwable and rethrowing an `AnalysisException` that includes a clear message plus the original cause. 
```suggestion LOG.warn("Failed to serialize fetch record request", e); throw new AnalysisException("Failed to serialize fetch record request: " + e.getMessage(), e); ``` ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/CdcStream.java: ########## @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.CdcStreamTableValuedFunction; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + +import java.util.Map; + +/** + * CdcStream TVF. 
+ */ +public class CdcStream extends TableValuedFunction { + + public CdcStream(Properties tvfProperties) { + super("cdc_stream", tvfProperties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map<String, String> arguments = getTVFProperties().getMap(); + return new CdcStreamTableValuedFunction(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build CdcStreamTableValuedFunction by " Review Comment: Spelling/grammar: prefer “Cannot” over “Can not” in user-facing exception text. ```suggestion throw new AnalysisException("Cannot build CdcStreamTableValuedFunction by " ``` ########## be/src/io/fs/http_file_reader.cpp: ########## @@ -224,9 +276,29 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r VLOG(2) << "Issuing HTTP GET request: offset=" << offset << " req_len=" << req_len << " with_range=" << _range_supported; - // Prepare and initialize the HTTP client for GET request + // Prepare and initialize the HTTP client for request RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/false)); - _client->set_method(HttpMethod::GET); + + // Determine HTTP method from configuration (default: GET) + HttpMethod method = HttpMethod::GET; + auto method_iter = _extend_kv.find("http.method"); + if (method_iter != _extend_kv.end()) { + method = to_http_method(method_iter->second.c_str()); + if (method == HttpMethod::UNKNOWN) { + LOG(WARNING) << "Invalid http.method value: " << method_iter->second + << ", falling back to GET"; + method = HttpMethod::GET; + } + } + _client->set_method(method); + + // Set payload if configured (supports POST, PUT, DELETE, etc.) 
+ auto payload_iter = _extend_kv.find("http.payload"); + if (payload_iter != _extend_kv.end() && !payload_iter->second.empty()) { + _client->set_payload(payload_iter->second); + _client->set_content_type("application/json"); + VLOG(2) << "HTTP request with payload, size=" << payload_iter->second.size(); + } Review Comment: `HttpFileReader` now honors user-supplied `http.method` and `http.payload`, which allows arbitrary non-GET requests (with bodies) from BE to any HTTP endpoint. This expands the attack surface beyond read-only SSRF (e.g. POST/DELETE side effects). Consider restricting these options to the CDC path only (e.g. require `enable_cdc_client=true` and/or only allow loopback URLs), and/or whitelist allowed methods (likely just POST) and reject others explicitly. ########## fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java: ########## @@ -461,7 +461,7 @@ private static String getRemoteDbName(DataSourceType sourceType, Map<String, Str Preconditions.checkArgument(StringUtils.isNotEmpty(remoteDb), "schema is required"); break; default: - throw new JobException("Unsupported source type " + sourceType); + throw new RuntimeException("Unsupported source type " + sourceType); } Review Comment: `getRemoteDbName` used to throw `JobException` (checked) but now throws `RuntimeException` (unchecked), and it’s called from job code paths like `generateCreateTableCmds(...)` that declare `throws JobException`. This can bypass existing error handling/reporting for streaming jobs. Consider keeping `JobException` here (or wrapping the unsupported-source-type and missing schema/database errors in `JobException`) and let the TVF convert that to an `AnalysisException` at its boundary.
########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java: ########## @@ -107,6 +109,21 @@ public static Properties getDefaultDebeziumProps() { return properties; } + public static String[] getTableList(String schema, Map<String, String> cdcConfig) { + String includingTables = cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES); + String table = cdcConfig.get(DataSourceConfigKeys.TABLE); + if (StringUtils.isNotEmpty(includingTables)) { + return Arrays.stream(includingTables.split(",")) + .map(t -> schema + "." + t.trim()) + .toArray(String[]::new); + } else if (StringUtils.isNotEmpty(table)) { + Preconditions.checkArgument(!table.contains(","), "table only supports one table"); + return new String[] {schema + "." + table.trim()}; Review Comment: `getTableList()` trims tokens but doesn’t filter out empty entries (e.g. `include_tables="t1, ,t2"` will produce `schema.""`). Consider filtering blank table names after `trim()` (and possibly validating against `.` to prevent already-qualified names) to fail fast with a clearer error. ```suggestion String[] tableNames = Arrays.stream(includingTables.split(",")) .map(String::trim) .filter(t -> !t.isEmpty()) .toArray(String[]::new); Preconditions.checkArgument(tableNames.length > 0, "include_tables must contain at least one non-blank table name"); for (String t : tableNames) { Preconditions.checkArgument(!t.contains("."), "Table name should not be schema-qualified in include_tables: %s", t); } return Arrays.stream(tableNames) .map(t -> schema + "." + t) .toArray(String[]::new); } else if (StringUtils.isNotEmpty(table)) { Preconditions.checkArgument(!table.contains(","), "table only supports one table"); String trimmedTable = table.trim(); Preconditions.checkArgument(!trimmedTable.isEmpty(), "table must not be blank"); Preconditions.checkArgument(!trimmedTable.contains("."), "Table name should not be schema-qualified in table: %s", trimmedTable); return new String[] {schema + "." 
+ trimmedTable}; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
