Copilot commented on code in PR #59461:
URL: https://github.com/apache/doris/pull/59461#discussion_r2652516431


##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_dup.groovy:
##########
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_streaming_postgres_job_dup", 
"p0,external,mysql,external_docker,external_docker_mysql") {

Review Comment:
   The suite decorator specifies "mysql,external_docker,external_docker_mysql" 
but this test is for PostgreSQL. It should use 
"pg,external_docker,external_docker_pg" instead.



##########
docker/thirdparties/docker-compose/postgresql/init/01-create-schema.sql:
##########
@@ -17,3 +17,4 @@
 
 create schema doris_test;
 create schema catalog_pg_test;
+create schema cdc_test

Review Comment:
   Missing semicolon at the end of the CREATE SCHEMA statement. SQL statements 
should be properly terminated.
   ```suggestion
   create schema cdc_test;
   ```



##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_exclude.groovy:
##########
@@ -25,7 +25,7 @@ suite("test_streaming_mysql_job_exclude", "p0,external,mysql,external_docker,ext
     def currentDb = (sql "select database()")[0][0]
     def table1 = "user_info_exclude1"
     def table2 = "user_info_exclude2"
-    def mysqlDb = "test_cdc_db"
+    def mysqlDb = "test_cdc_exclude_db"

Review Comment:
   The variable is named "mysqlDb" but this is a PostgreSQL test. It should be 
renamed to "pgDB" or similar for consistency and clarity, matching the naming 
convention used in other test files.



##########
fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java:
##########
@@ -264,14 +265,15 @@ public static List<CreateTableCommand> generateCreateTableCmds(String targetDb,
                 continue;
             }
 
-            if (excludeTables != null && excludeTables.contains(table)) {
+            // if set include_tables, exclude_tables is ignored
+            if (!excludeTablesList.isEmpty() && excludeTables.contains(table)) {

Review Comment:
   The condition should check whether includeTablesList is empty before consulting 
excludeTablesList. As the new comment says, exclude_tables is supposed to be ignored 
when include_tables is set, but this condition still excludes such tables. The 
intended logic is: if include_tables is empty AND exclude_tables contains the table, 
then skip it.
   ```suggestion
               if (includeTablesList.isEmpty() && !excludeTablesList.isEmpty()
                       && excludeTablesList.contains(table)) {
   ```
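
   A minimal sketch of the intended precedence, assuming the job properties are 
already parsed into includeTablesList and excludeTablesList; the helper name is 
hypothetical and not part of the actual StreamingJobUtils API:
   ```java
   import java.util.List;

   // Hypothetical helper illustrating the intended precedence between
   // include_tables and exclude_tables; names are for illustration only.
   final class TableFilterSketch {
       static boolean shouldSkip(String table,
                                 List<String> includeTablesList,
                                 List<String> excludeTablesList) {
           if (!includeTablesList.isEmpty()) {
               // include_tables takes precedence: exclude_tables is ignored entirely.
               return !includeTablesList.contains(table);
           }
           // exclude_tables is only consulted when include_tables is not set;
           // contains() on an empty list is false, so no extra emptiness check is needed.
           return excludeTablesList.contains(table);
       }
   }
   ```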



##########
.github/workflows/build-extension.yml:
##########
@@ -92,6 +95,41 @@ jobs:
       - name: Build broker
         run: |
           cd fs_brokers/apache_hdfs_broker/ && /bin/bash build.sh
+  build-cdc-client:
+    name: Build Cdc Client
+    needs: changes
+    if: ${{ needs.changes.outputs.cdc_client_changes == 'true' }}
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout ${{ github.ref }}
+        uses: actions/checkout@v3
+
+      - name: Setup java
+        uses: actions/setup-java@v2
+        with:
+          distribution: adopt
+          java-version: '17'
+
+      - name: Setup thrift
+        run: |
+          pushd thirdparty
+          branch="${{ github.base_ref }}"
+          if [[ -z "${branch}" ]] || [[ "${branch}" == 'master' || "${branch}" == 'branch-4.0' || "${branch}" == 'branch-3.0' || "${branch}" == 'branch-2.1' ]]; then
+            curl -L https://github.com/apache/doris-thirdparty/releases/download/automation/doris-thirdparty-prebuilt-linux-x86_64.tar.xz \
+              -o doris-thirdparty-prebuilt-linux-x86_64.tar.xz
+          else
+            curl -L "https://github.com/apache/doris-thirdparty/releases/download/automation-${branch/branch-/}/doris-thirdparty-prebuilt-linux-x86_64.tar.xz" \
+              -o doris-thirdparty-prebuilt-linux-x86_64.tar.xz
+          fi
+          tar -xvf doris-thirdparty-prebuilt-linux-x86_64.tar.xz
+          popd
+          export PATH="${DEFAULT_DIR}/ldb-toolchain/bin/:$(pwd)/thirdparty/installed/bin/:${PATH}"
+          
+          thrift --version
+
+      - name: Build cdc client
+        run: |
+          cd fs_brokers/cdc_client/ && /bin/bash build.sh                 

Review Comment:
   Trailing whitespace should be removed for consistency with code formatting 
standards.
   ```suggestion
             cd fs_brokers/cdc_client/ && /bin/bash build.sh
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -0,0 +1,387 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader.postgres;
+
+import org.apache.doris.cdcclient.exception.CdcClientException;
+import org.apache.doris.cdcclient.source.factory.DataSource;
+import org.apache.doris.cdcclient.source.reader.JdbcIncrementalSourceReader;
+import org.apache.doris.cdcclient.utils.ConfigUtil;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.CompareOffsetRequest;
+import org.apache.doris.job.cdc.request.JobBaseConfig;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
+import 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
+import 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
+import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import 
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext;
+import 
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask;
+import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
+import 
org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
+import 
org.apache.flink.cdc.connectors.postgres.source.utils.CustomPostgresSchema;
+import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils;
+import 
org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
+import org.apache.flink.table.types.DataType;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import io.debezium.connector.postgresql.SourceInfo;
+import io.debezium.connector.postgresql.connection.PostgresConnection;
+import 
io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
+import io.debezium.connector.postgresql.spi.SlotState;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.time.Conversions;
+import lombok.Data;
+import org.postgresql.Driver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Data
+public class PostgresSourceReader extends JdbcIncrementalSourceReader {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PostgresSourceReader.class);
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    public PostgresSourceReader() {
+        super();
+    }
+
+    @Override
+    public void initialize(long jobId, DataSource dataSource, Map<String, 
String> config) {
+        PostgresSourceConfig sourceConfig = generatePostgresConfig(config, 
jobId);
+        PostgresDialect dialect = new PostgresDialect(sourceConfig);
+        createSlotForGlobalStreamSplit(dialect);
+        super.initialize(jobId, dataSource, config);
+    }
+
+    /**
+     * copy from org.apache.flink.cdc.connectors.postgres.source
+     * .enumerator.PostgresSourceEnumerator.createSlotForGlobalStreamSplit
+     *
+     * <p>Create slot for the unique global stream split.
+     *
+     * <p>Currently all startup modes need read the stream split. We need open 
the slot before
+     * reading the globalStreamSplit to catch all data changes.
+     */
+    private void createSlotForGlobalStreamSplit(PostgresDialect 
postgresDialect) {
+        try (PostgresConnection connection = 
postgresDialect.openJdbcConnection()) {
+            SlotState slotInfo =
+                    connection.getReplicationSlotState(
+                            postgresDialect.getSlotName(), 
postgresDialect.getPluginName());
+            // skip creating the replication slot when the slot exists.
+            if (slotInfo != null) {
+                return;
+            }
+            PostgresReplicationConnection replicationConnection =
+                    
postgresDialect.openPostgresReplicationConnection(connection);
+            replicationConnection.createReplicationSlot();
+            replicationConnection.close(false);
+
+        } catch (Throwable t) {
+            throw new CdcClientException(
+                    String.format(
+                            "Fail to get or create slot for global stream 
split, the slot name is %s. Due to: ",
+                            postgresDialect.getSlotName()),
+                    t);
+        }
+    }
+
+    @Override
+    protected PostgresSourceConfig getSourceConfig(JobBaseConfig config) {
+        return generatePostgresConfig(config);
+    }
+
+    /** Generate PostgreSQL source config from JobBaseConfig */
+    private PostgresSourceConfig generatePostgresConfig(JobBaseConfig config) {
+        return generatePostgresConfig(config.getConfig(), config.getJobId());
+    }
+
+    /** Generate PostgreSQL source config from Map config */
+    private PostgresSourceConfig generatePostgresConfig(Map<String, String> 
cdcConfig, Long jobId) {
+        PostgresSourceConfigFactory configFactory = new 
PostgresSourceConfigFactory();
+
+        // Parse JDBC URL to extract connection info
+        String jdbcUrl = cdcConfig.get(DataSourceConfigKeys.JDBC_URL);
+        Preconditions.checkNotNull(jdbcUrl, "jdbc_url is required");
+
+        // PostgreSQL JDBC URL format: jdbc:postgresql://host:port/database
+        Properties props = Driver.parseURL(jdbcUrl, null);
+        Preconditions.checkNotNull(props, "Invalid JDBC URL: " + jdbcUrl);
+
+        String hostname = props.getProperty("PGHOST");
+        String port = props.getProperty("PGPORT");
+        String database = props.getProperty("PGDBNAME");
+        Preconditions.checkNotNull(hostname, "host is required");
+        Preconditions.checkNotNull(port, "port is required");
+        Preconditions.checkNotNull(database, "database is required");
+
+        configFactory.hostname(hostname);
+        configFactory.port(Integer.parseInt(port));
+        configFactory.username(cdcConfig.get(DataSourceConfigKeys.USER));
+        configFactory.password(cdcConfig.get(DataSourceConfigKeys.PASSWORD));
+        configFactory.database(database);
+
+        String schema = cdcConfig.get(DataSourceConfigKeys.SCHEMA);
+        Preconditions.checkNotNull(schema, "schema is required");
+        configFactory.schemaList(new String[] {schema});
+        configFactory.includeSchemaChanges(false);
+
+        // Set table list
+        String includingTables = 
cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES);
+        if (StringUtils.isNotEmpty(includingTables)) {
+            String[] includingTbls =
+                    Arrays.stream(includingTables.split(","))
+                            .map(t -> schema + "." + t.trim())
+                            .toArray(String[]::new);
+            configFactory.tableList(includingTbls);
+        }
+
+        // Set startup options
+        String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET);
+        if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)) 
{
+            configFactory.startupOptions(StartupOptions.initial());
+        } else if 
(DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(startupMode)) {
+            configFactory.startupOptions(StartupOptions.earliest());
+        } else if 
(DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(startupMode)) {
+            configFactory.startupOptions(StartupOptions.latest());
+        } else if (ConfigUtil.isJson(startupMode)) {
+            throw new RuntimeException("Unsupported json offset " + 
startupMode);
+        } else if (ConfigUtil.is13Timestamp(startupMode)) {
+            // start from timestamp
+            Long ts = Long.parseLong(startupMode);
+            configFactory.startupOptions(StartupOptions.timestamp(ts));
+        } else {
+            throw new RuntimeException("Unknown offset " + startupMode);
+        }
+
+        // Set split size if provided
+        if (cdcConfig.containsKey(DataSourceConfigKeys.SPLIT_SIZE)) {
+            configFactory.splitSize(
+                    
Integer.parseInt(cdcConfig.get(DataSourceConfigKeys.SPLIT_SIZE)));
+        }
+
+        Properties dbzProps = new Properties();
+        dbzProps.put("interval.handling.mode", "string");
+        configFactory.debeziumProperties(dbzProps);
+
+        configFactory.serverTimeZone(
+                
ConfigUtil.getPostgresServerTimeZoneFromProps(props).toString());
+        configFactory.slotName(getSlotName(jobId));
+        configFactory.decodingPluginName("pgoutput");
+        // 
configFactory.heartbeatInterval(Duration.ofMillis(Constants.POLL_SPLIT_RECORDS_TIMEOUTS));
+        return configFactory.create(0);
+    }
+
+    private String getSlotName(Long jobId) {
+        return "doris_cdc_" + jobId;
+    }
+
+    @Override
+    protected IncrementalSourceScanFetcher 
getSnapshotSplitReader(JobBaseConfig config) {
+        PostgresSourceConfig sourceConfig = getSourceConfig(config);
+        IncrementalSourceScanFetcher snapshotReader = this.getSnapshotReader();
+        if (snapshotReader == null) {
+            PostgresDialect dialect = new PostgresDialect(sourceConfig);
+            PostgresSourceFetchTaskContext taskContext =
+                    new PostgresSourceFetchTaskContext(sourceConfig, dialect);
+            snapshotReader = new IncrementalSourceScanFetcher(taskContext, 0);
+            this.setSnapshotReader(snapshotReader);
+        }
+        return snapshotReader;
+    }
+
+    @Override
+    protected IncrementalSourceStreamFetcher 
getBinlogSplitReader(JobBaseConfig config) {
+        PostgresSourceConfig sourceConfig = getSourceConfig(config);
+        IncrementalSourceStreamFetcher binlogReader = this.getBinlogReader();
+        if (binlogReader == null) {
+            PostgresDialect dialect = new PostgresDialect(sourceConfig);
+            PostgresSourceFetchTaskContext taskContext =
+                    new PostgresSourceFetchTaskContext(sourceConfig, dialect);
+            binlogReader = new IncrementalSourceStreamFetcher(taskContext, 0);
+            this.setBinlogReader(binlogReader);
+        }
+        return binlogReader;
+    }
+
+    @Override
+    protected OffsetFactory getOffsetFactory() {
+        return new PostgresOffsetFactory();
+    }
+
+    @Override
+    protected Offset createOffset(Map<String, ?> offset) {
+        return PostgresOffset.of(offset);
+    }
+
+    @Override
+    protected Offset createInitialOffset() {
+        return PostgresOffset.INITIAL_OFFSET;
+    }
+
+    @Override
+    protected Offset createNoStoppingOffset() {
+        return PostgresOffset.NO_STOPPING_OFFSET;
+    }
+
+    @Override
+    protected JdbcDataSourceDialect getDialect(JdbcSourceConfig sourceConfig) {
+        return new PostgresDialect((PostgresSourceConfig) sourceConfig);
+    }
+
+    @Override
+    protected DataType fromDbzColumn(Column splitColumn) {
+        return PostgresTypeUtils.fromDbzColumn(splitColumn);
+    }
+
+    /**
+     * Why not call dialect.displayCurrentOffset(sourceConfig) ? The 
underlying system calls
+     * `txid_current()` to advance the WAL log. Here, it's just a query; 
retrieving the LSN is
+     * sufficient because `PostgresOffset.compare` only compares the LSN.
+     */
+    @Override
+    public Map<String, String> getEndOffset(JobBaseConfig jobConfig) {
+        PostgresSourceConfig sourceConfig = getSourceConfig(jobConfig);
+        try {
+            PostgresDialect dialect = new PostgresDialect(sourceConfig);
+            try (JdbcConnection jdbcConnection = 
dialect.openJdbcConnection(sourceConfig)) {
+                PostgresConnection pgConnection = (PostgresConnection) 
jdbcConnection;
+                Long lsn = pgConnection.currentXLogLocation();
+                Map<String, String> offsetMap = new HashMap<>();
+                offsetMap.put(SourceInfo.LSN_KEY, lsn.toString());
+                offsetMap.put(
+                        SourceInfo.TIMESTAMP_USEC_KEY,
+                        
String.valueOf(Conversions.toEpochMicros(Instant.MIN)));
+                return offsetMap;
+            }
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public int compareOffset(CompareOffsetRequest compareOffsetRequest) {
+        Map<String, String> offsetFirst = 
compareOffsetRequest.getOffsetFirst();
+        Map<String, String> offsetSecond = 
compareOffsetRequest.getOffsetSecond();
+
+        PostgresOffset postgresOffset1 = PostgresOffset.of(offsetFirst);
+        PostgresOffset postgresOffset2 = PostgresOffset.of(offsetSecond);
+        return postgresOffset1.compareTo(postgresOffset2);
+    }
+
+    @Override
+    protected Map<TableId, TableChanges.TableChange> 
discoverTableSchemas(JobBaseConfig config) {
+        PostgresSourceConfig sourceConfig = getSourceConfig(config);
+        try {
+            PostgresDialect dialect = new PostgresDialect(sourceConfig);
+            try (JdbcConnection jdbcConnection = 
dialect.openJdbcConnection(sourceConfig)) {
+                List<TableId> tableIds =
+                        TableDiscoveryUtils.listTables(
+                                sourceConfig.getDatabaseList().get(0),
+                                jdbcConnection,
+                                sourceConfig.getTableFilters(),
+                                sourceConfig.includePartitionedTables());
+                CustomPostgresSchema customPostgresSchema =
+                        new CustomPostgresSchema((PostgresConnection) 
jdbcConnection, sourceConfig);
+                return customPostgresSchema.getTableSchema(tableIds);
+            }
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    protected FetchTask<SourceSplitBase> createFetchTaskFromSplit(
+            JobBaseConfig jobConfig, SourceSplitBase split) {
+        PostgresSourceConfig sourceConfig = getSourceConfig(jobConfig);
+        PostgresDialect dialect = new PostgresDialect(sourceConfig);
+        FetchTask<SourceSplitBase> fetchTask = dialect.createFetchTask(split);
+        return fetchTask;
+    }
+
+    /**
+     * This commit commits values up to the startOffset of the current split; 
even if

Review Comment:
   The comment has a typo: "commites" should be "commits". "This commit commits" 
also reads better as "This method commits".
   ```suggestion
     * This method commits values up to the startOffset of the current split; even if
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -0,0 +1,730 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader;
+
+import org.apache.doris.cdcclient.common.Constants;
+import org.apache.doris.cdcclient.source.deserialize.DebeziumJsonDeserializer;
+import org.apache.doris.cdcclient.source.deserialize.SourceRecordDeserializer;
+import org.apache.doris.cdcclient.source.factory.DataSource;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.FetchTableSplitsRequest;
+import org.apache.doris.job.cdc.request.JobBaseConfig;
+import org.apache.doris.job.cdc.request.JobBaseRecordRequest;
+import org.apache.doris.job.cdc.split.AbstractSourceSplit;
+import org.apache.doris.job.cdc.split.BinlogSplit;
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import 
org.apache.flink.cdc.connectors.base.source.assigner.HybridSplitAssigner;
+import 
org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import 
org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
+import 
org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplitState;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplitState;
+import 
org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
+import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
+import org.apache.flink.cdc.connectors.base.source.reader.external.Fetcher;
+import 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
+import 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
+import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+
+import static 
org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit.STREAM_SPLIT_ID;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import lombok.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Data
+public abstract class JdbcIncrementalSourceReader implements SourceReader {
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcIncrementalSourceReader.class);
+    private static ObjectMapper objectMapper = new ObjectMapper();
+    private SourceRecordDeserializer<SourceRecord, List<String>> serializer;
+    private IncrementalSourceScanFetcher snapshotReader;
+    private IncrementalSourceStreamFetcher binlogReader;
+    private Fetcher<SourceRecords, SourceSplitBase> currentReader;
+    private Map<TableId, TableChanges.TableChange> tableSchemas;
+    private SplitRecords currentSplitRecords;
+    private SourceSplitBase currentSplit;
+    protected FetchTask<SourceSplitBase> currentFetchTask;
+
+    public JdbcIncrementalSourceReader() {
+        this.serializer = new DebeziumJsonDeserializer();
+    }
+
+    @Override
+    public void initialize(long jobId, DataSource dataSource, Map<String, 
String> config) {
+        this.serializer.init(config);
+    }
+
+    @Override
+    public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest 
ftsReq) {
+        LOG.info("Get table {} splits for job {}", ftsReq.getSnapshotTable(), 
ftsReq.getJobId());
+        JdbcSourceConfig sourceConfig = getSourceConfig(ftsReq);
+        
List<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
+                remainingSnapshotSplits = new ArrayList<>();
+        StreamSplit remainingStreamSplit = null;
+
+        // Check startup mode - for PostgreSQL, we use similar logic as MySQL
+        String startupMode = 
ftsReq.getConfig().get(DataSourceConfigKeys.OFFSET);
+        if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)) 
{
+            remainingSnapshotSplits =
+                    startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(), 
ftsReq.getConfig());
+        } else {
+            // For non-initial mode, create a stream split
+            Offset startingOffset = createInitialOffset();
+            remainingStreamSplit =
+                    new StreamSplit(
+                            STREAM_SPLIT_ID,
+                            startingOffset,
+                            createNoStoppingOffset(),
+                            new ArrayList<>(),
+                            new HashMap<>(),
+                            0);
+        }
+
+        List<AbstractSourceSplit> splits = new ArrayList<>();
+        if (!remainingSnapshotSplits.isEmpty()) {
+            for 
(org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit
+                    snapshotSplit : remainingSnapshotSplits) {
+                String splitId = snapshotSplit.splitId();
+                String tableId = snapshotSplit.getTableId().identifier();
+                Object[] splitStart = snapshotSplit.getSplitStart();
+                Object[] splitEnd = snapshotSplit.getSplitEnd();
+                List<String> splitKey = 
snapshotSplit.getSplitKeyType().getFieldNames();
+                SnapshotSplit split =
+                        new SnapshotSplit(splitId, tableId, splitKey, 
splitStart, splitEnd, null);
+                splits.add(split);
+            }
+        } else {
+            Offset startingOffset = remainingStreamSplit.getStartingOffset();
+            BinlogSplit streamSplit = new BinlogSplit();
+            streamSplit.setSplitId(remainingStreamSplit.splitId());
+            streamSplit.setStartingOffset(startingOffset.getOffset());
+            splits.add(streamSplit);
+        }
+        return splits;
+    }
+
+    @Override
+    public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) 
throws Exception {
+        Map<String, Object> offsetMeta = baseReq.getMeta();
+        if (offsetMeta == null || offsetMeta.isEmpty()) {
+            throw new RuntimeException("miss meta offset");
+        }
+        LOG.info("Job {} read split records with offset: {}", 
baseReq.getJobId(), offsetMeta);
+
+        //  If there is an active split being consumed, reuse it directly;
+        //  Otherwise, create a new snapshot/stream split based on offset and 
start the reader.
+        SourceSplitBase split = null;
+        SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
+        if (currentSplitRecords == null) {
+            Fetcher<SourceRecords, SourceSplitBase> currentReader = 
this.getCurrentReader();
+            if (currentReader == null || baseReq.isReload()) {
+                LOG.info(
+                        "No current reader or reload {}, create new split 
reader",
+                        baseReq.isReload());
+                // build split
+                Tuple2<SourceSplitBase, Boolean> splitFlag = 
createSourceSplit(offsetMeta, baseReq);
+                split = splitFlag.f0;
+                closeBinlogReader();
+                currentSplitRecords = pollSplitRecordsWithSplit(split, 
baseReq);
+                this.setCurrentSplitRecords(currentSplitRecords);
+                this.setCurrentSplit(split);
+            } else if (currentReader instanceof 
IncrementalSourceStreamFetcher) {
+                LOG.info("Continue poll records with current binlog reader");
+                // only for binlog reader
+                currentSplitRecords = 
pollSplitRecordsWithCurrentReader(currentReader);
+                split = this.getCurrentSplit();
+            } else {
+                throw new RuntimeException("Should not happen");
+            }
+        } else {
+            LOG.info(
+                    "Continue read records with current split records, 
splitId: {}",
+                    currentSplitRecords.getSplitId());
+        }
+
+        // build response with iterator
+        SplitReadResult result = new SplitReadResult();
+        SourceSplitState currentSplitState = null;
+        SourceSplitBase currentSplit = this.getCurrentSplit();
+        if (currentSplit.isSnapshotSplit()) {
+            currentSplitState = new 
SnapshotSplitState(currentSplit.asSnapshotSplit());
+        } else {
+            currentSplitState = new 
StreamSplitState(currentSplit.asStreamSplit());
+        }
+
+        Iterator<SourceRecord> filteredIterator =
+                new FilteredRecordIterator(currentSplitRecords, 
currentSplitState);
+
+        result.setRecordIterator(filteredIterator);
+        result.setSplitState(currentSplitState);
+        result.setSplit(split);
+        return result;
+    }
+
+    protected abstract DataType fromDbzColumn(Column splitColumn);
+
+    protected abstract Fetcher<SourceRecords, SourceSplitBase> 
getSnapshotSplitReader(
+            JobBaseConfig jobConfig);
+
+    protected abstract Fetcher<SourceRecords, SourceSplitBase> 
getBinlogSplitReader(
+            JobBaseConfig jobConfig);
+
+    protected abstract OffsetFactory getOffsetFactory();
+
+    protected abstract Offset createOffset(Map<String, ?> offset);
+
+    protected abstract Offset createInitialOffset();
+
+    protected abstract Offset createNoStoppingOffset();
+
+    protected abstract JdbcDataSourceDialect getDialect(JdbcSourceConfig 
sourceConfig);
+
+    protected Tuple2<SourceSplitBase, Boolean> createSourceSplit(
+            Map<String, Object> offsetMeta, JobBaseConfig jobConfig) {
+        Tuple2<SourceSplitBase, Boolean> splitRes = null;
+        String splitId = String.valueOf(offsetMeta.get(SPLIT_ID));
+        if (!BinlogSplit.BINLOG_SPLIT_ID.equals(splitId)) {
+            
org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit split =
+                    createSnapshotSplit(offsetMeta, jobConfig);
+            splitRes = Tuple2.of(split, false);
+        } else {
+            splitRes = createStreamSplit(offsetMeta, jobConfig);
+        }
+        return splitRes;
+    }
+
+    private 
org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit
+            createSnapshotSplit(Map<String, Object> offset, JobBaseConfig 
jobConfig) {
+        SnapshotSplit snapshotSplit = objectMapper.convertValue(offset, 
SnapshotSplit.class);
+        TableId tableId = TableId.parse(snapshotSplit.getTableId(), false);
+        Object[] splitStart = snapshotSplit.getSplitStart();
+        Object[] splitEnd = snapshotSplit.getSplitEnd();
+        List<String> splitKeys = snapshotSplit.getSplitKey();
+        Map<TableId, TableChanges.TableChange> tableSchemas = 
getTableSchemas(jobConfig);
+        TableChanges.TableChange tableChange = tableSchemas.get(tableId);
+        Preconditions.checkNotNull(
+                tableChange, "Can not find table " + tableId + " in job " + 
jobConfig.getJobId());
+        // only support one split key
+        String splitKey = splitKeys.get(0);
+        io.debezium.relational.Column splitColumn = 
tableChange.getTable().columnWithName(splitKey);
+        RowType splitType = getSplitType(splitColumn);
+        org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit 
split =
+                new 
org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit(
+                        tableId,
+                        snapshotSplit.getSplitId(),
+                        splitType,
+                        splitStart,
+                        splitEnd,
+                        null,
+                        tableSchemas);
+        return split;
+    }
+
+    private RowType getSplitType(Column splitColumn) {
+        return (RowType)
+                DataTypes.ROW(
+                                new DataTypes.Field[] {
+                                    DataTypes.FIELD(
+                                            splitColumn.name(), 
this.fromDbzColumn(splitColumn))
+                                })
+                        .getLogicalType();
+    }
+
+    private Tuple2<SourceSplitBase, Boolean> createStreamSplit(
+            Map<String, Object> meta, JobBaseConfig config) {
+        BinlogSplit streamSplit = objectMapper.convertValue(meta, 
BinlogSplit.class);
+        List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new 
ArrayList<>();
+        Offset minOffsetFinishSplits = null;
+        Offset maxOffsetFinishSplits = null;
+        if (CollectionUtils.isNotEmpty(streamSplit.getFinishedSplits())) {
+            List<SnapshotSplit> splitWithHW = streamSplit.getFinishedSplits();
+            List<SnapshotSplit> assignedSplitLists =
+                    splitWithHW.stream()
+                            
.sorted(Comparator.comparing(AbstractSourceSplit::getSplitId))
+                            .toList();
+
+            for (SnapshotSplit split : assignedSplitLists) {
+                // find the min offset
+                Map<String, String> offsetMap = split.getHighWatermark();
+                Offset sourceOffset = createOffset(offsetMap);
+                if (minOffsetFinishSplits == null || 
sourceOffset.isBefore(minOffsetFinishSplits)) {
+                    minOffsetFinishSplits = sourceOffset;
+                }
+                if (maxOffsetFinishSplits == null || 
sourceOffset.isAfter(maxOffsetFinishSplits)) {
+                    maxOffsetFinishSplits = sourceOffset;
+                }
+                finishedSnapshotSplitInfos.add(
+                        new FinishedSnapshotSplitInfo(
+                                TableId.parse(split.getTableId()),
+                                split.getSplitId(),
+                                split.getSplitStart(),
+                                split.getSplitEnd(),
+                                sourceOffset,
+                                getOffsetFactory()));
+            }
+        }
+
+        Offset startOffset;
+        Offset lastOffset =
+                createOffset(
+                        streamSplit.getStartingOffset() == null
+                                ? new HashMap<>()
+                                : streamSplit.getStartingOffset());
+        if (minOffsetFinishSplits != null && lastOffset.getOffset().isEmpty()) 
{
+            startOffset = minOffsetFinishSplits;
+        } else if (!lastOffset.getOffset().isEmpty()) {
+            lastOffset.getOffset().remove(SPLIT_ID);
+            startOffset = lastOffset;
+        } else {
+            // The input offset from params is empty
+            JdbcSourceConfig sourceConfig = getSourceConfig(config);
+            startOffset = getStartOffsetFromConfig(sourceConfig);
+        }
+
+        boolean pureStreamPhase = false;
+        if (maxOffsetFinishSplits == null) {
+            pureStreamPhase = true;
+        } else if (startOffset.isAtOrAfter(maxOffsetFinishSplits)) {
+            // All the offsets of the current split are smaller than the 
offset of the stream,
+            // indicating that the stream phase has been fully entered.
+            pureStreamPhase = true;
+            LOG.info(
+                    "The stream phase has been fully entered, the current 
split is: {}",
+                    startOffset);
+        }
+
+        StreamSplit split =
+                new StreamSplit(
+                        STREAM_SPLIT_ID,
+                        startOffset,
+                        createNoStoppingOffset(),
+                        finishedSnapshotSplitInfos,
+                        new HashMap<>(),
+                        0);
+        // filterTableSchema
+        StreamSplit streamSplitFinal =
+                StreamSplit.fillTableSchemas(split.asStreamSplit(), 
getTableSchemas(config));
+        return Tuple2.of(streamSplitFinal, pureStreamPhase);
+    }
+
+    private Offset getStartOffsetFromConfig(JdbcSourceConfig sourceConfig) {
+        StartupOptions startupOptions = sourceConfig.getStartupOptions();
+        Offset startingOffset;
+        switch (startupOptions.startupMode) {
+            case LATEST_OFFSET:
+                startingOffset = 
getDialect(sourceConfig).displayCurrentOffset(sourceConfig);
+                break;
+            case EARLIEST_OFFSET:
+                startingOffset = createInitialOffset();
+                break;
+            case TIMESTAMP:
+            case SPECIFIC_OFFSETS:
+            case COMMITTED_OFFSETS:
+            default:
+                throw new IllegalStateException(
+                        "Unsupported startup mode " + 
startupOptions.startupMode);
+        }
+        return startingOffset;
+    }
+
+    private 
List<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
+            startSplitChunks(
+                    JdbcSourceConfig sourceConfig,
+                    String snapshotTable,
+                    Map<String, String> config) {
+        List<TableId> remainingTables = new ArrayList<>();
+        if (snapshotTable != null) {
+            String schema = config.get(DataSourceConfigKeys.SCHEMA);
+            remainingTables.add(new TableId(null, schema, snapshotTable));
+        }
+        
List<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit> 
remainingSplits =
+                new ArrayList<>();
+        HybridSplitAssigner<JdbcSourceConfig> splitAssigner =
+                new HybridSplitAssigner<>(
+                        sourceConfig,
+                        1,
+                        remainingTables,
+                        true,
+                        getDialect(sourceConfig),
+                        getOffsetFactory(),
+                        new MockSplitEnumeratorContext(1));
+        splitAssigner.open();
+        try {
+            while (true) {
+                Optional<SourceSplitBase> split = splitAssigner.getNext();
+                if (split.isPresent()) {
+                    
org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit
+                            snapshotSplit = split.get().asSnapshotSplit();
+                    remainingSplits.add(snapshotSplit);
+                } else {
+                    break;
+                }
+            }
+        } finally {
+            closeChunkSplitterOnly(splitAssigner);
+        }
+        return remainingSplits;
+    }
+
+    /**
+     * Close only the chunk splitter to avoid closing shared connection pools 
Similar to MySQL
+     * implementation Note: HybridSplitAssigner wraps SnapshotSplitAssigner, 
so we need to get the
+     * inner assigner first
+     */
+    private static void closeChunkSplitterOnly(HybridSplitAssigner<?> 
splitAssigner) {
+        try {
+            // First, get the inner SnapshotSplitAssigner from 
HybridSplitAssigner
+            java.lang.reflect.Field snapshotAssignerField =
+                    
HybridSplitAssigner.class.getDeclaredField("snapshotSplitAssigner");
+            snapshotAssignerField.setAccessible(true);
+            SnapshotSplitAssigner<?> snapshotSplitAssigner =
+                    (SnapshotSplitAssigner<?>) 
snapshotAssignerField.get(splitAssigner);
+
+            if (snapshotSplitAssigner == null) {
+                LOG.warn("snapshotSplitAssigner is null in 
HybridSplitAssigner");
+                return;
+            }
+
+            // Call closeExecutorService() via reflection
+            java.lang.reflect.Method closeExecutorMethod =
+                    
SnapshotSplitAssigner.class.getDeclaredMethod("closeExecutorService");
+            closeExecutorMethod.setAccessible(true);
+            closeExecutorMethod.invoke(snapshotSplitAssigner);
+
+            // Call chunkSplitter.close() via reflection
+            java.lang.reflect.Field chunkSplitterField =
+                    
SnapshotSplitAssigner.class.getDeclaredField("chunkSplitter");
+            chunkSplitterField.setAccessible(true);
+            Object chunkSplitter = 
chunkSplitterField.get(snapshotSplitAssigner);
+
+            if (chunkSplitter != null) {
+                java.lang.reflect.Method closeMethod = 
chunkSplitter.getClass().getMethod("close");
+                closeMethod.invoke(chunkSplitter);
+                LOG.info("Closed Source chunkSplitter JDBC connection");
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to close chunkSplitter via reflection", e);

Review Comment:
   Using reflection to access private fields and methods can cause issues with 
the Java module system and future JVM versions. Consider adding public methods to 
the Flink CDC classes or using a more maintainable approach. If the Flink CDC 
library doesn't provide a proper way to close only the chunk splitter, consider 
filing an issue with the upstream project.
   ```suggestion
        * Close resources associated with the given {@link HybridSplitAssigner}.
        *
     * <p>This implementation avoids using reflection to access private fields or methods in
     * Flink CDC classes, and instead relies on the public API of the assigner.
        */
       private static void closeChunkSplitterOnly(HybridSplitAssigner<?> splitAssigner) {
           if (splitAssigner == null) {
               return;
           }
           try {
               // Use the public lifecycle method of the assigner to release resources instead of
               // accessing private fields/methods via reflection.
               splitAssigner.close();
               LOG.info("Closed HybridSplitAssigner resources.");
           } catch (Exception e) {
               LOG.warn("Failed to close HybridSplitAssigner resources", e);
   ```



##########
docker/thirdparties/docker-compose/postgresql/postgresql-14.yaml.tpl:
##########
@@ -25,6 +25,14 @@ services:
       POSTGRES_PASSWORD: 123456
     ports:
       - ${DOCKER_PG_14_EXTERNAL_PORT}:5432
+    command:
+      - "postgres"
+      - "-c"
+      - "wal_level=logical"
+      - "-c"
+      - "max_wal_senders=30"
+      - "-c"
+      - "max_replication_slots=30" 

Review Comment:
   Trailing whitespace should be removed for consistency with code formatting 
standards.
   ```suggestion
         - "max_replication_slots=30"
   ```



##########
.github/workflows/build-extension.yml:
##########
@@ -53,9 +54,11 @@ jobs:
         with:
           filters: |
             broker_changes:
-              - 'fs_brokers/**'
+              - 'fs_brokers/apache_hdfs_broker/**'
             docs_changes:
               - 'docs/**'
+            cdc_client_changes:
+              - 'fs_brokers/cdc_client/**'  

Review Comment:
   Trailing whitespace should be removed for consistency with code formatting 
standards.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

