Copilot commented on code in PR #59461:
URL: https://github.com/apache/doris/pull/59461#discussion_r2654876828


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java:
##########
@@ -17,183 +17,105 @@
 
 package org.apache.doris.cdcclient.utils;
 
-import org.apache.doris.job.cdc.DataSourceConfigKeys;
-import org.apache.doris.job.cdc.request.JobBaseConfig;
-
-import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
-import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
-import 
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
-import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
-import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetUtils;
-import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
 
-import java.sql.SQLException;
 import java.time.ZoneId;
-import java.util.Arrays;
 import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.mysql.cj.conf.ConnectionUrl;
-import io.debezium.connector.mysql.MySqlConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ConfigUtil {
-    private static final ObjectMapper mapper = new ObjectMapper();
+    private static ObjectMapper objectMapper = new ObjectMapper();
+    private static final Logger LOG = 
LoggerFactory.getLogger(ConfigUtil.class);
 
     public static String getServerId(long jobId) {
         return String.valueOf(Math.abs(String.valueOf(jobId).hashCode()));
     }
 
-    public static MySqlSourceConfig generateMySqlConfig(JobBaseConfig config) {
-        return generateMySqlConfig(config.getConfig(), 
getServerId(config.getJobId()));
-    }
-
-    public static MySqlSourceConfig generateMySqlConfig(Map<String, String> 
config) {
-        return generateMySqlConfig(config, "0");
+    public static ZoneId getServerTimeZoneFromJdbcUrl(String jdbcUrl) {
+        if (jdbcUrl == null) {
+            return ZoneId.systemDefault();
+        }
+        if (jdbcUrl.startsWith("jdbc:mysql://") || 
jdbcUrl.startsWith("jdbc:mariadb://")) {
+            return getServerTimeZone(jdbcUrl);
+        } else if (jdbcUrl.startsWith("jdbc:postgresql://")) {
+            return getPostgresServerTimeZone(jdbcUrl);
+        }
+        return ZoneId.systemDefault();
     }
 
-    public static ZoneId getServerTimeZone(String jdbcUrl) {
+    private static ZoneId getServerTimeZone(String jdbcUrl) {
         Preconditions.checkNotNull(jdbcUrl, "jdbcUrl is null");
         ConnectionUrl cu = ConnectionUrl.getConnectionUrlInstance(jdbcUrl, 
null);
         return getTimeZoneFromProps(cu.getOriginalProperties());
     }
 
-    private static MySqlSourceConfig generateMySqlConfig(
-            Map<String, String> cdcConfig, String serverId) {
-        MySqlSourceConfigFactory configFactory = new 
MySqlSourceConfigFactory();
-        ConnectionUrl cu =
-                ConnectionUrl.getConnectionUrlInstance(
-                        cdcConfig.get(DataSourceConfigKeys.JDBC_URL), null);
-        configFactory.hostname(cu.getMainHost().getHost());
-        configFactory.port(cu.getMainHost().getPort());
-        configFactory.username(cdcConfig.get(DataSourceConfigKeys.USER));
-        configFactory.password(cdcConfig.get(DataSourceConfigKeys.PASSWORD));
-        String databaseName = cdcConfig.get(DataSourceConfigKeys.DATABASE);
-        configFactory.databaseList(databaseName);
-        configFactory.serverId(serverId);
-        
configFactory.serverTimeZone(getTimeZoneFromProps(cu.getOriginalProperties()).toString());
-
-        configFactory.includeSchemaChanges(false);
-
-        String includingTables = 
cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES);
-        String[] includingTbls =
-                Arrays.stream(includingTables.split(","))
-                        .map(t -> databaseName + "." + t.trim())
-                        .toArray(String[]::new);
-        configFactory.tableList(includingTbls);
-
-        String excludingTables = 
cdcConfig.get(DataSourceConfigKeys.EXCLUDE_TABLES);
-        if (StringUtils.isNotEmpty(excludingTables)) {
-            String excludingTbls =
-                    Arrays.stream(excludingTables.split(","))
-                            .map(t -> databaseName + "." + t.trim())
-                            .collect(Collectors.joining(","));
-            configFactory.excludeTableList(excludingTbls);
-        }
-
-        // setting startMode
-        String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET);
-        if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)) 
{
-            // do not need set offset when initial
-            // configFactory.startupOptions(StartupOptions.initial());
-        } else if 
(DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(startupMode)) {
-            configFactory.startupOptions(StartupOptions.earliest());
-            BinlogOffset binlogOffset =
-                    initializeEffectiveOffset(
-                            configFactory, 
StartupOptions.earliest().binlogOffset);
-            
configFactory.startupOptions(StartupOptions.specificOffset(binlogOffset));
-        } else if 
(DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(startupMode)) {
-            configFactory.startupOptions(StartupOptions.latest());
-            BinlogOffset binlogOffset =
-                    initializeEffectiveOffset(configFactory, 
StartupOptions.latest().binlogOffset);
-            
configFactory.startupOptions(StartupOptions.specificOffset(binlogOffset));
-        } else if (isJson(startupMode)) {
-            // start from specific offset
-            Map<String, String> offsetMap = toStringMap(startupMode);
-            if (MapUtils.isEmpty(offsetMap)) {
-                throw new RuntimeException("Incorrect offset " + startupMode);
-            }
-            if (offsetMap.containsKey(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY)
-                    && 
offsetMap.containsKey(BinlogOffset.BINLOG_POSITION_OFFSET_KEY)) {
-                BinlogOffset binlogOffset = new BinlogOffset(offsetMap);
-                
configFactory.startupOptions(StartupOptions.specificOffset(binlogOffset));
-            } else {
-                throw new RuntimeException("Incorrect offset " + startupMode);
+    public static ZoneId getTimeZoneFromProps(Map<String, String> 
originalProperties) {
+        if (originalProperties != null && 
originalProperties.containsKey("serverTimezone")) {
+            String timeZone = originalProperties.get("serverTimezone");
+            if (StringUtils.isNotEmpty(timeZone)) {
+                return ZoneId.of(timeZone);
             }
-        } else if (is13Timestamp(startupMode)) {
-            // start from timestamp
-            Long ts = Long.parseLong(startupMode);
-            BinlogOffset binlogOffset =
-                    initializeEffectiveOffset(
-                            configFactory, 
StartupOptions.timestamp(ts).binlogOffset);
-            
configFactory.startupOptions(StartupOptions.specificOffset(binlogOffset));
-        } else {
-            throw new RuntimeException("Unknown offset " + startupMode);
         }
+        return ZoneId.systemDefault();
+    }
 
-        Properties jdbcProperteis = new Properties();
-        jdbcProperteis.putAll(cu.getOriginalProperties());
-        configFactory.jdbcProperties(jdbcProperteis);
-
-        // configFactory.heartbeatInterval(Duration.ofMillis(1));
-        if (cdcConfig.containsKey(DataSourceConfigKeys.SPLIT_SIZE)) {
-            configFactory.splitSize(
-                    
Integer.parseInt(cdcConfig.get(DataSourceConfigKeys.SPLIT_SIZE)));
+    public static ZoneId getPostgresServerTimeZone(String jdbcUrl) {
+        Preconditions.checkNotNull(jdbcUrl, "jdbcUrl is null");
+        try {
+            java.util.Properties props = 
org.postgresql.Driver.parseURL(jdbcUrl, null);
+            if (props != null && props.containsKey("timezone")) {
+                String timeZone = props.getProperty("timezone");
+                if (StringUtils.isNotEmpty(timeZone)) {
+                    return ZoneId.of(timeZone);
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to parse Postgres JDBC URL for timezone: {}", 
jdbcUrl);
         }
-
-        return configFactory.createConfig(0);
+        return ZoneId.systemDefault();
     }
 
-    private static ZoneId getTimeZoneFromProps(Map<String, String> 
originalProperties) {
-        if (originalProperties != null && 
originalProperties.containsKey("serverTimezone")) {
-            String timeZone = originalProperties.get("serverTimezone");
+    public static ZoneId 
getPostgresServerTimeZoneFromProps(java.util.Properties props) {
+        if (props != null && props.containsKey("timezone")) {
+            String timeZone = props.getProperty("timezone");
             if (StringUtils.isNotEmpty(timeZone)) {
                 return ZoneId.of(timeZone);
             }
         }
         return ZoneId.systemDefault();
     }

Review Comment:
   The method should be marked as static since it doesn't use any instance 
variables or methods. Marking it as static would make the intent clearer and 
allow it to be called without an instance.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -728,16 +826,20 @@ public boolean hasNext() {
                         
splitState.asSnapshotSplitState().setHighWatermark(watermark);
                     }
                 } else if (RecordUtils.isHeartbeatEvent(element)) {
-                    LOG.debug("Receive heartbeat event: {}", element);
+                    LOG.info("Receive heartbeat event: {}", element);
                     if (splitState.isBinlogSplitState()) {
                         BinlogOffset position = 
RecordUtils.getBinlogPosition(element);
                         
splitState.asBinlogSplitState().setStartingOffset(position);
                     }
                 } else if (RecordUtils.isDataChangeRecord(element)) {
+                    if (splitState.isBinlogSplitState()) {
+                        BinlogOffset position = 
RecordUtils.getBinlogPosition(element);
+                        
splitState.asBinlogSplitState().setStartingOffset(position);
+                    }
                     nextRecord = element;
                     return true;
                 } else {
-                    LOG.debug("Ignore event: {}", element);
+                    LOG.info("Ignore event: {}", element);

Review Comment:
   The log level for these heartbeat and ignore event messages should remain at 
DEBUG level rather than being changed to INFO. These messages can be very 
frequent and will clutter the logs at INFO level, potentially impacting 
performance and log readability.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader.postgres;
+
+import org.apache.doris.cdcclient.exception.CdcClientException;
+import org.apache.doris.cdcclient.source.factory.DataSource;
+import org.apache.doris.cdcclient.source.reader.JdbcIncrementalSourceReader;
+import org.apache.doris.cdcclient.utils.ConfigUtil;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.CompareOffsetRequest;
+import org.apache.doris.job.cdc.request.JobBaseConfig;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
+import 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
+import 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
+import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import 
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext;
+import 
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask;
+import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
+import 
org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
+import 
org.apache.flink.cdc.connectors.postgres.source.utils.CustomPostgresSchema;
+import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils;
+import 
org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
+import org.apache.flink.table.types.DataType;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import io.debezium.connector.postgresql.SourceInfo;
+import io.debezium.connector.postgresql.connection.PostgresConnection;
+import 
io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
+import io.debezium.connector.postgresql.spi.SlotState;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.time.Conversions;
+import lombok.Data;
+import org.postgresql.Driver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Data
+public class PostgresSourceReader extends JdbcIncrementalSourceReader {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PostgresSourceReader.class);
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    public PostgresSourceReader() {
+        super();
+    }
+
+    @Override
+    public void initialize(long jobId, DataSource dataSource, Map<String, 
String> config) {
+        PostgresSourceConfig sourceConfig = generatePostgresConfig(config, 
jobId);
+        PostgresDialect dialect = new PostgresDialect(sourceConfig);
+        LOG.info("Creating slot for job {}, user {}", jobId, 
sourceConfig.getUsername());
+        createSlotForGlobalStreamSplit(dialect);
+        super.initialize(jobId, dataSource, config);
+    }
+
+    /**
+     * copy from org.apache.flink.cdc.connectors.postgres.source
+     * .enumerator.PostgresSourceEnumerator.createSlotForGlobalStreamSplit
+     *
+     * <p>Create slot for the unique global stream split.
+     *
+     * <p>Currently all startup modes need read the stream split. We need open 
the slot before
+     * reading the globalStreamSplit to catch all data changes.
+     */
+    private void createSlotForGlobalStreamSplit(PostgresDialect 
postgresDialect) {
+        try (PostgresConnection connection = 
postgresDialect.openJdbcConnection()) {
+            SlotState slotInfo =
+                    connection.getReplicationSlotState(
+                            postgresDialect.getSlotName(), 
postgresDialect.getPluginName());
+            // skip creating the replication slot when the slot exists.
+            if (slotInfo != null) {
+                return;
+            }
+            PostgresReplicationConnection replicationConnection =
+                    
postgresDialect.openPostgresReplicationConnection(connection);
+            replicationConnection.createReplicationSlot();
+            replicationConnection.close(false);
+
+        } catch (Throwable t) {
+            throw new CdcClientException(
+                    String.format(
+                            "Fail to get or create slot for global stream 
split, the slot name is %s. Due to: ",
+                            postgresDialect.getSlotName()),
+                    t);
+        }
+    }
+
+    @Override
+    protected PostgresSourceConfig getSourceConfig(JobBaseConfig config) {
+        return generatePostgresConfig(config);
+    }
+
+    /** Generate PostgreSQL source config from JobBaseConfig */
+    private PostgresSourceConfig generatePostgresConfig(JobBaseConfig config) {
+        return generatePostgresConfig(config.getConfig(), config.getJobId());
+    }
+
+    /** Generate PostgreSQL source config from Map config */
+    private PostgresSourceConfig generatePostgresConfig(Map<String, String> 
cdcConfig, Long jobId) {
+        PostgresSourceConfigFactory configFactory = new 
PostgresSourceConfigFactory();
+
+        // Parse JDBC URL to extract connection info
+        String jdbcUrl = cdcConfig.get(DataSourceConfigKeys.JDBC_URL);
+        Preconditions.checkNotNull(jdbcUrl, "jdbc_url is required");
+
+        // PostgreSQL JDBC URL format: jdbc:postgresql://host:port/database
+        Properties props = Driver.parseURL(jdbcUrl, null);
+        Preconditions.checkNotNull(props, "Invalid JDBC URL: " + jdbcUrl);

Review Comment:
   The check on line 145 validates that props is not null, but this happens 
AFTER props has already been used on line 144. This Preconditions check is 
ineffective because it comes too late. The null check should be performed 
immediately after line 144 before accessing any properties.



##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy:
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job_all_type", 
"p0,external,pg,external_docker,external_docker_pg") {
+    def jobName = "test_streaming_postgres_job_all_type_name"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "streaming_all_types_nullable_with_pk_pg"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // create test
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            // sql """CREATE SCHEMA IF NOT EXISTS ${pgSchema}"""
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+            sql """
+            create table ${pgDB}.${pgSchema}.${table1} (
+                id                  bigserial PRIMARY KEY,
+                smallint_col        smallint,
+                integer_col         integer,
+                bigint_col          bigint,
+                real_col            real,
+                double_col          double precision,
+                numeric_col         numeric(20,6),
+                char_col            char(10),
+                varchar_col         varchar(255),
+                text_col            text,
+                boolean_col         boolean,
+                date_col            date,
+                time_col            time,
+                timetz_col          time with time zone,
+                timestamp_col       timestamp,
+                timestamptz_col     timestamp with time zone,
+                interval_col        interval,
+                bytea_col           bytea,
+                uuid_col            uuid,
+                json_col            json,
+                jsonb_col           jsonb,
+                inet_col            inet,
+                cidr_col            cidr,
+                macaddr_col         macaddr,
+                bit_col             bit(8),
+                bit_varying_col     bit varying(16),
+                int_array_col       integer[],
+                text_array_col      text[],
+                point_col           point
+            );
+            """
+            // mock snapshot data
+            sql """
+            INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES 
(1,1,100,1000,1.23,4.56,12345.678901,'char','varchar','text 
value',true,'2024-01-01','12:00:00','12:00:00+08','2024-01-01 
12:00:00','2024-01-01 12:00:00+08','1 day',decode('DEADBEEF', 
'hex'),'11111111-2222-3333-4444-555555555555'::uuid,'{"a":1}','{"b":2}','192.168.1.1','192.168.0.0/24','08:00:2b:01:02:03',B'10101010',B'1010',ARRAY[1,2,3],ARRAY['a','b','c'],'(1,2)');
+            """
+        }
+
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}?timezone=UTC",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${pgUser}",
+                    "password" = "${pgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = "${table1}", 
+                    "offset" = "initial"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        // check job running
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        def jobSuccendCount = sql """ select SucceedTaskCount 
from jobs("type"="insert") where Name = '${jobName}' and 
ExecuteType='STREAMING' """

Review Comment:
   Typo in variable name: 'jobSuccendCount' should be 'jobSucceedCount' 
(an 'n' appears in place of the second 'e'). This typo appears consistently in the test file.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader.postgres;
+
+import org.apache.doris.cdcclient.exception.CdcClientException;
+import org.apache.doris.cdcclient.source.factory.DataSource;
+import org.apache.doris.cdcclient.source.reader.JdbcIncrementalSourceReader;
+import org.apache.doris.cdcclient.utils.ConfigUtil;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.CompareOffsetRequest;
+import org.apache.doris.job.cdc.request.JobBaseConfig;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
+import 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
+import 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
+import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import 
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext;
+import 
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask;
+import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
+import 
org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
+import 
org.apache.flink.cdc.connectors.postgres.source.utils.CustomPostgresSchema;
+import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils;
+import 
org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
+import org.apache.flink.table.types.DataType;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import io.debezium.connector.postgresql.SourceInfo;
+import io.debezium.connector.postgresql.connection.PostgresConnection;
+import 
io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
+import io.debezium.connector.postgresql.spi.SlotState;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.time.Conversions;
+import lombok.Data;
+import org.postgresql.Driver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Data
+public class PostgresSourceReader extends JdbcIncrementalSourceReader {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PostgresSourceReader.class);
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    public PostgresSourceReader() {
+        super();
+    }
+
+    @Override
+    public void initialize(long jobId, DataSource dataSource, Map<String, 
String> config) {
+        PostgresSourceConfig sourceConfig = generatePostgresConfig(config, 
jobId);
+        PostgresDialect dialect = new PostgresDialect(sourceConfig);
+        LOG.info("Creating slot for job {}, user {}", jobId, 
sourceConfig.getUsername());
+        createSlotForGlobalStreamSplit(dialect);
+        super.initialize(jobId, dataSource, config);
+    }
+
+    /**
+     * copy from org.apache.flink.cdc.connectors.postgres.source
+     * .enumerator.PostgresSourceEnumerator.createSlotForGlobalStreamSplit
+     *
+     * <p>Create slot for the unique global stream split.
+     *
+     * <p>Currently all startup modes need read the stream split. We need open 
the slot before
+     * reading the globalStreamSplit to catch all data changes.
+     */
+    private void createSlotForGlobalStreamSplit(PostgresDialect 
postgresDialect) {
+        try (PostgresConnection connection = 
postgresDialect.openJdbcConnection()) {
+            SlotState slotInfo =
+                    connection.getReplicationSlotState(
+                            postgresDialect.getSlotName(), 
postgresDialect.getPluginName());
+            // skip creating the replication slot when the slot exists.
+            if (slotInfo != null) {
+                return;
+            }
+            PostgresReplicationConnection replicationConnection =
+                    
postgresDialect.openPostgresReplicationConnection(connection);
+            replicationConnection.createReplicationSlot();
+            replicationConnection.close(false);
+
+        } catch (Throwable t) {
+            throw new CdcClientException(
+                    String.format(
+                            "Fail to get or create slot for global stream 
split, the slot name is %s. Due to: ",
+                            postgresDialect.getSlotName()),
+                    t);
+        }
+    }
+
+    @Override
+    protected PostgresSourceConfig getSourceConfig(JobBaseConfig config) {
+        return generatePostgresConfig(config);
+    }
+
+    /** Generate PostgreSQL source config from JobBaseConfig */
+    private PostgresSourceConfig generatePostgresConfig(JobBaseConfig config) {
+        return generatePostgresConfig(config.getConfig(), config.getJobId());
+    }
+
+    /** Generate PostgreSQL source config from Map config */
+    private PostgresSourceConfig generatePostgresConfig(Map<String, String> 
cdcConfig, Long jobId) {
+        PostgresSourceConfigFactory configFactory = new 
PostgresSourceConfigFactory();
+
+        // Parse JDBC URL to extract connection info
+        String jdbcUrl = cdcConfig.get(DataSourceConfigKeys.JDBC_URL);
+        Preconditions.checkNotNull(jdbcUrl, "jdbc_url is required");
+
+        // PostgreSQL JDBC URL format: jdbc:postgresql://host:port/database
+        Properties props = Driver.parseURL(jdbcUrl, null);
+        Preconditions.checkNotNull(props, "Invalid JDBC URL: " + jdbcUrl);
+
+        String hostname = props.getProperty("PGHOST");
+        String port = props.getProperty("PGPORT");
+        String database = props.getProperty("PGDBNAME");
+        Preconditions.checkNotNull(hostname, "host is required");
+        Preconditions.checkNotNull(port, "port is required");
+        Preconditions.checkNotNull(database, "database is required");

Review Comment:
   Missing null check for 'props' before accessing its properties. If 
Driver.parseURL returns null (invalid URL), a NullPointerException will be 
thrown at line 147 when trying to call props.getProperty.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java:
##########
@@ -17,183 +17,105 @@
 
 package org.apache.doris.cdcclient.utils;
 
-import org.apache.doris.job.cdc.DataSourceConfigKeys;
-import org.apache.doris.job.cdc.request.JobBaseConfig;
-
-import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
-import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
-import 
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
-import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
-import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetUtils;
-import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
 
-import java.sql.SQLException;
 import java.time.ZoneId;
-import java.util.Arrays;
 import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.mysql.cj.conf.ConnectionUrl;
-import io.debezium.connector.mysql.MySqlConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ConfigUtil {
-    private static final ObjectMapper mapper = new ObjectMapper();
+    private static ObjectMapper objectMapper = new ObjectMapper();
+    private static final Logger LOG = 
LoggerFactory.getLogger(ConfigUtil.class);

Review Comment:
   Inconsistent field naming: the class uses 'objectMapper' (camelCase) at line 
35 but 'LOG' (uppercase) at line 36. Consider using a consistent naming 
convention for all static fields, typically UPPER_SNAKE_CASE for constants, 
e.g. 'OBJECT_MAPPER'.



##########
regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.out:
##########
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select --
+Doris  18
+

Review Comment:
   The snapshot data comment mentions only A1 and B1 records (lines 52-53), but 
the expected output shows A2, B2 records. The insert statements insert data 
into both table1 and table2, but the comments are misleading about which 
table's data is being referenced as "snapshot data".



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader.postgres;
+
+import org.apache.doris.cdcclient.exception.CdcClientException;
+import org.apache.doris.cdcclient.source.factory.DataSource;
+import org.apache.doris.cdcclient.source.reader.JdbcIncrementalSourceReader;
+import org.apache.doris.cdcclient.utils.ConfigUtil;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.CompareOffsetRequest;
+import org.apache.doris.job.cdc.request.JobBaseConfig;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
+import 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
+import 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
+import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import 
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext;
+import 
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask;
+import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
+import 
org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
+import 
org.apache.flink.cdc.connectors.postgres.source.utils.CustomPostgresSchema;
+import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils;
+import 
org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
+import org.apache.flink.table.types.DataType;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import io.debezium.connector.postgresql.SourceInfo;
+import io.debezium.connector.postgresql.connection.PostgresConnection;
+import 
io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
+import io.debezium.connector.postgresql.spi.SlotState;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.time.Conversions;
+import lombok.Data;
+import org.postgresql.Driver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Data
+public class PostgresSourceReader extends JdbcIncrementalSourceReader {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PostgresSourceReader.class);
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    public PostgresSourceReader() {
+        super();
+    }
+
+    @Override
+    public void initialize(long jobId, DataSource dataSource, Map<String, 
String> config) {
+        PostgresSourceConfig sourceConfig = generatePostgresConfig(config, 
jobId);
+        PostgresDialect dialect = new PostgresDialect(sourceConfig);
+        LOG.info("Creating slot for job {}, user {}", jobId, 
sourceConfig.getUsername());
+        createSlotForGlobalStreamSplit(dialect);
+        super.initialize(jobId, dataSource, config);
+    }
+
+    /**
+     * copy from org.apache.flink.cdc.connectors.postgres.source
+     * .enumerator.PostgresSourceEnumerator.createSlotForGlobalStreamSplit
+     *
+     * <p>Create slot for the unique global stream split.
+     *
+     * <p>Currently all startup modes need read the stream split. We need open 
the slot before
+     * reading the globalStreamSplit to catch all data changes.
+     */
+    private void createSlotForGlobalStreamSplit(PostgresDialect 
postgresDialect) {
+        try (PostgresConnection connection = 
postgresDialect.openJdbcConnection()) {
+            SlotState slotInfo =
+                    connection.getReplicationSlotState(
+                            postgresDialect.getSlotName(), 
postgresDialect.getPluginName());
+            // skip creating the replication slot when the slot exists.
+            if (slotInfo != null) {
+                return;
+            }
+            PostgresReplicationConnection replicationConnection =
+                    
postgresDialect.openPostgresReplicationConnection(connection);
+            replicationConnection.createReplicationSlot();
+            replicationConnection.close(false);
+
+        } catch (Throwable t) {
+            throw new CdcClientException(
+                    String.format(
+                            "Fail to get or create slot for global stream 
split, the slot name is %s. Due to: ",
+                            postgresDialect.getSlotName()),
+                    t);
+        }
+    }
+
+    @Override
+    protected PostgresSourceConfig getSourceConfig(JobBaseConfig config) {
+        return generatePostgresConfig(config);
+    }
+
+    /** Generate PostgreSQL source config from JobBaseConfig */
+    private PostgresSourceConfig generatePostgresConfig(JobBaseConfig config) {
+        return generatePostgresConfig(config.getConfig(), config.getJobId());
+    }
+
+    /** Generate PostgreSQL source config from Map config */
+    private PostgresSourceConfig generatePostgresConfig(Map<String, String> 
cdcConfig, Long jobId) {
+        PostgresSourceConfigFactory configFactory = new 
PostgresSourceConfigFactory();
+
+        // Parse JDBC URL to extract connection info
+        String jdbcUrl = cdcConfig.get(DataSourceConfigKeys.JDBC_URL);
+        Preconditions.checkNotNull(jdbcUrl, "jdbc_url is required");
+
+        // PostgreSQL JDBC URL format: jdbc:postgresql://host:port/database
+        Properties props = Driver.parseURL(jdbcUrl, null);
+        Preconditions.checkNotNull(props, "Invalid JDBC URL: " + jdbcUrl);
+
+        String hostname = props.getProperty("PGHOST");
+        String port = props.getProperty("PGPORT");
+        String database = props.getProperty("PGDBNAME");
+        Preconditions.checkNotNull(hostname, "host is required");
+        Preconditions.checkNotNull(port, "port is required");
+        Preconditions.checkNotNull(database, "database is required");

Review Comment:
   The error messages specify which property is required (host, port, database) 
but those messages will never be shown because a NullPointerException will be 
thrown first when trying to access the null props object at lines 147-149.



##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job", 
"p0,external,pg,external_docker,external_docker_pg") {
+    def jobName = "test_streaming_postgres_job_name"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "user_info_pg_normal1"
+    def table2 = "user_info_pg_normal2"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+    sql """drop table if exists ${currentDb}.${table2} force"""
+
+    // Pre-create table2
+    sql """
+        CREATE TABLE IF NOT EXISTS ${currentDb}.${table2} (
+            `name` varchar(200) NULL,
+            `age` int NULL
+        ) ENGINE=OLAP
+        UNIQUE KEY(`name`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // create test
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            // sql """CREATE SCHEMA IF NOT EXISTS ${pgSchema}"""
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table2}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+                  "name" varchar(200),
+                  "age" int2,
+                  PRIMARY KEY ("name")
+                )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age) 
VALUES ('A1', 1);"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age) 
VALUES ('B1', 2);"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${table2} (
+                  "name" varchar(200),
+                  "age" int2,
+                  PRIMARY KEY ("name")
+                )"""
+            // mock snapshot data
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} (name, age) 
VALUES ('A2', 1);"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} (name, age) 
VALUES ('B2', 2);"""
+        }
+
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${pgUser}",
+                    "password" = "${pgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = "${table1},${table2}", 
+                    "offset" = "initial"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+        def showAllTables = sql """ show tables from ${currentDb}"""
+        log.info("showAllTables: " + showAllTables)
+        // check table created
+        def showTables = sql """ show tables from ${currentDb} like 
'${table1}'; """
+        assert showTables.size() == 1
+        def showTables2 = sql """ show tables from ${currentDb} like 
'${table2}'; """
+        assert showTables2.size() == 1
+
+        // check table schema correct
+        def showTbl1 = sql """show create table ${currentDb}.${table1}"""
+        def createTalInfo = showTbl1[0][1];
+        assert createTalInfo.contains("`name` varchar(65533)");
+        assert createTalInfo.contains("`age` smallint");
+        assert createTalInfo.contains("UNIQUE KEY(`name`)");
+        assert createTalInfo.contains("DISTRIBUTED BY HASH(`name`) BUCKETS 
AUTO");
+
+        // check job running
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        def jobSuccendCount = sql """ select SucceedTaskCount 
from jobs("type"="insert") where Name = '${jobName}' and 
ExecuteType='STREAMING' """

Review Comment:
   Typo in variable name: 'jobSuccendCount' should be 'jobSucceedCount' 
(the 'n' should be an 'e').



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader.postgres;
+
+import org.apache.doris.cdcclient.exception.CdcClientException;
+import org.apache.doris.cdcclient.source.factory.DataSource;
+import org.apache.doris.cdcclient.source.reader.JdbcIncrementalSourceReader;
+import org.apache.doris.cdcclient.utils.ConfigUtil;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.CompareOffsetRequest;
+import org.apache.doris.job.cdc.request.JobBaseConfig;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
+import 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
+import 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
+import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import 
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext;
+import 
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask;
+import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
+import 
org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
+import 
org.apache.flink.cdc.connectors.postgres.source.utils.CustomPostgresSchema;
+import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils;
+import 
org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
+import org.apache.flink.table.types.DataType;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import io.debezium.connector.postgresql.SourceInfo;
+import io.debezium.connector.postgresql.connection.PostgresConnection;
+import 
io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
+import io.debezium.connector.postgresql.spi.SlotState;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.time.Conversions;
+import lombok.Data;
+import org.postgresql.Driver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Data
+public class PostgresSourceReader extends JdbcIncrementalSourceReader {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PostgresSourceReader.class);
+    private static final ObjectMapper objectMapper = new ObjectMapper();

Review Comment:
   Inconsistent field modifiers: 'objectMapper' is declared as private 
static but 'LOG' is private static final. Since objectMapper is never 
reassigned, it should also be declared as 'final' for immutability and 
consistency.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to