Copilot commented on code in PR #59461:
URL: https://github.com/apache/doris/pull/59461#discussion_r2654876828
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java:
##########
@@ -17,183 +17,105 @@
package org.apache.doris.cdcclient.utils;
-import org.apache.doris.job.cdc.DataSourceConfigKeys;
-import org.apache.doris.job.cdc.request.JobBaseConfig;
-
-import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
-import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
-import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
-import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
-import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetUtils;
-import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
-import java.sql.SQLException;
import java.time.ZoneId;
-import java.util.Arrays;
import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.mysql.cj.conf.ConnectionUrl;
-import io.debezium.connector.mysql.MySqlConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ConfigUtil {
- private static final ObjectMapper mapper = new ObjectMapper();
+ private static ObjectMapper objectMapper = new ObjectMapper();
+ private static final Logger LOG = LoggerFactory.getLogger(ConfigUtil.class);
public static String getServerId(long jobId) {
return String.valueOf(Math.abs(String.valueOf(jobId).hashCode()));
}
- public static MySqlSourceConfig generateMySqlConfig(JobBaseConfig config) {
- return generateMySqlConfig(config.getConfig(), getServerId(config.getJobId()));
- }
-
- public static MySqlSourceConfig generateMySqlConfig(Map<String, String> config) {
- return generateMySqlConfig(config, "0");
+ public static ZoneId getServerTimeZoneFromJdbcUrl(String jdbcUrl) {
+ if (jdbcUrl == null) {
+ return ZoneId.systemDefault();
+ }
+ if (jdbcUrl.startsWith("jdbc:mysql://") ||
jdbcUrl.startsWith("jdbc:mariadb://")) {
+ return getServerTimeZone(jdbcUrl);
+ } else if (jdbcUrl.startsWith("jdbc:postgresql://")) {
+ return getPostgresServerTimeZone(jdbcUrl);
+ }
+ return ZoneId.systemDefault();
}
- public static ZoneId getServerTimeZone(String jdbcUrl) {
+ private static ZoneId getServerTimeZone(String jdbcUrl) {
Preconditions.checkNotNull(jdbcUrl, "jdbcUrl is null");
ConnectionUrl cu = ConnectionUrl.getConnectionUrlInstance(jdbcUrl, null);
return getTimeZoneFromProps(cu.getOriginalProperties());
}
- private static MySqlSourceConfig generateMySqlConfig(
- Map<String, String> cdcConfig, String serverId) {
- MySqlSourceConfigFactory configFactory = new MySqlSourceConfigFactory();
- ConnectionUrl cu =
- ConnectionUrl.getConnectionUrlInstance(
- cdcConfig.get(DataSourceConfigKeys.JDBC_URL), null);
- configFactory.hostname(cu.getMainHost().getHost());
- configFactory.port(cu.getMainHost().getPort());
- configFactory.username(cdcConfig.get(DataSourceConfigKeys.USER));
- configFactory.password(cdcConfig.get(DataSourceConfigKeys.PASSWORD));
- String databaseName = cdcConfig.get(DataSourceConfigKeys.DATABASE);
- configFactory.databaseList(databaseName);
- configFactory.serverId(serverId);
- configFactory.serverTimeZone(getTimeZoneFromProps(cu.getOriginalProperties()).toString());
-
- configFactory.includeSchemaChanges(false);
-
- String includingTables = cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES);
- String[] includingTbls =
- Arrays.stream(includingTables.split(","))
- .map(t -> databaseName + "." + t.trim())
- .toArray(String[]::new);
- configFactory.tableList(includingTbls);
-
- String excludingTables = cdcConfig.get(DataSourceConfigKeys.EXCLUDE_TABLES);
- if (StringUtils.isNotEmpty(excludingTables)) {
- String excludingTbls =
- Arrays.stream(excludingTables.split(","))
- .map(t -> databaseName + "." + t.trim())
- .collect(Collectors.joining(","));
- configFactory.excludeTableList(excludingTbls);
- }
-
- // setting startMode
- String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET);
- if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)) {
- // do not need set offset when initial
- // configFactory.startupOptions(StartupOptions.initial());
- } else if (DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(startupMode)) {
- configFactory.startupOptions(StartupOptions.earliest());
- BinlogOffset binlogOffset =
- initializeEffectiveOffset(
- configFactory, StartupOptions.earliest().binlogOffset);
- configFactory.startupOptions(StartupOptions.specificOffset(binlogOffset));
- } else if (DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(startupMode)) {
- configFactory.startupOptions(StartupOptions.latest());
- BinlogOffset binlogOffset =
- initializeEffectiveOffset(configFactory, StartupOptions.latest().binlogOffset);
- configFactory.startupOptions(StartupOptions.specificOffset(binlogOffset));
- } else if (isJson(startupMode)) {
- // start from specific offset
- Map<String, String> offsetMap = toStringMap(startupMode);
- if (MapUtils.isEmpty(offsetMap)) {
- throw new RuntimeException("Incorrect offset " + startupMode);
- }
- if (offsetMap.containsKey(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY)
- && offsetMap.containsKey(BinlogOffset.BINLOG_POSITION_OFFSET_KEY)) {
- BinlogOffset binlogOffset = new BinlogOffset(offsetMap);
- configFactory.startupOptions(StartupOptions.specificOffset(binlogOffset));
- } else {
- throw new RuntimeException("Incorrect offset " + startupMode);
+ public static ZoneId getTimeZoneFromProps(Map<String, String> originalProperties) {
+ if (originalProperties != null && originalProperties.containsKey("serverTimezone")) {
+ String timeZone = originalProperties.get("serverTimezone");
+ if (StringUtils.isNotEmpty(timeZone)) {
+ return ZoneId.of(timeZone);
}
- } else if (is13Timestamp(startupMode)) {
- // start from timestamp
- Long ts = Long.parseLong(startupMode);
- BinlogOffset binlogOffset =
- initializeEffectiveOffset(
- configFactory, StartupOptions.timestamp(ts).binlogOffset);
- configFactory.startupOptions(StartupOptions.specificOffset(binlogOffset));
- } else {
- throw new RuntimeException("Unknown offset " + startupMode);
}
+ return ZoneId.systemDefault();
+ }
- Properties jdbcProperteis = new Properties();
- jdbcProperteis.putAll(cu.getOriginalProperties());
- configFactory.jdbcProperties(jdbcProperteis);
-
- // configFactory.heartbeatInterval(Duration.ofMillis(1));
- if (cdcConfig.containsKey(DataSourceConfigKeys.SPLIT_SIZE)) {
- configFactory.splitSize(
- Integer.parseInt(cdcConfig.get(DataSourceConfigKeys.SPLIT_SIZE)));
+ public static ZoneId getPostgresServerTimeZone(String jdbcUrl) {
+ Preconditions.checkNotNull(jdbcUrl, "jdbcUrl is null");
+ try {
+ java.util.Properties props = org.postgresql.Driver.parseURL(jdbcUrl, null);
+ if (props != null && props.containsKey("timezone")) {
+ String timeZone = props.getProperty("timezone");
+ if (StringUtils.isNotEmpty(timeZone)) {
+ return ZoneId.of(timeZone);
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to parse Postgres JDBC URL for timezone: {}",
jdbcUrl);
}
-
- return configFactory.createConfig(0);
+ return ZoneId.systemDefault();
}
- private static ZoneId getTimeZoneFromProps(Map<String, String> originalProperties) {
- if (originalProperties != null && originalProperties.containsKey("serverTimezone")) {
- String timeZone = originalProperties.get("serverTimezone");
+ public static ZoneId getPostgresServerTimeZoneFromProps(java.util.Properties props) {
+ if (props != null && props.containsKey("timezone")) {
+ String timeZone = props.getProperty("timezone");
if (StringUtils.isNotEmpty(timeZone)) {
return ZoneId.of(timeZone);
}
}
return ZoneId.systemDefault();
}
Review Comment:
The method should be marked as static since it doesn't use any instance
variables or methods. Marking it as static would make the intent clearer and
allow it to be called without an instance.
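For reference, a minimal sketch of the static form this comment describes (the method body mirrors getPostgresServerTimeZoneFromProps from the hunk above; the enclosing class name is hypothetical):

    import java.time.ZoneId;
    import java.util.Properties;

    import org.apache.commons.lang3.StringUtils;

    // Illustrative only: a static utility keeps no instance state and can be
    // called as TimeZoneUtilExample.getPostgresServerTimeZoneFromProps(props).
    public final class TimeZoneUtilExample {
        private TimeZoneUtilExample() {}

        public static ZoneId getPostgresServerTimeZoneFromProps(Properties props) {
            if (props != null && props.containsKey("timezone")) {
                String timeZone = props.getProperty("timezone");
                if (StringUtils.isNotEmpty(timeZone)) {
                    return ZoneId.of(timeZone);
                }
            }
            return ZoneId.systemDefault();
        }
    }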
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -728,16 +826,20 @@ public boolean hasNext() {
splitState.asSnapshotSplitState().setHighWatermark(watermark);
}
} else if (RecordUtils.isHeartbeatEvent(element)) {
- LOG.debug("Receive heartbeat event: {}", element);
+ LOG.info("Receive heartbeat event: {}", element);
if (splitState.isBinlogSplitState()) {
BinlogOffset position = RecordUtils.getBinlogPosition(element);
splitState.asBinlogSplitState().setStartingOffset(position);
}
} else if (RecordUtils.isDataChangeRecord(element)) {
+ if (splitState.isBinlogSplitState()) {
+ BinlogOffset position = RecordUtils.getBinlogPosition(element);
+ splitState.asBinlogSplitState().setStartingOffset(position);
+ }
nextRecord = element;
return true;
} else {
- LOG.debug("Ignore event: {}", element);
+ LOG.info("Ignore event: {}", element);
Review Comment:
The log level for these heartbeat and ignore event messages should remain at
DEBUG level rather than being changed to INFO. These messages can be very
frequent and will clutter the logs at INFO level, potentially impacting
performance and log readability.
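A minimal sketch of keeping these statements at DEBUG (the wrapper class and method are hypothetical; only the log calls mirror the hunk above):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative only: per-record events stay at DEBUG so they can be enabled
    // on demand without flooding INFO-level logs on busy binlog streams.
    public class EventLoggingExample {
        private static final Logger LOG = LoggerFactory.getLogger(EventLoggingExample.class);

        void onEvent(Object element, boolean isHeartbeat) {
            if (isHeartbeat) {
                LOG.debug("Receive heartbeat event: {}", element);
            } else {
                LOG.debug("Ignore event: {}", element);
            }
        }
    }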
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader.postgres;
+
+import org.apache.doris.cdcclient.exception.CdcClientException;
+import org.apache.doris.cdcclient.source.factory.DataSource;
+import org.apache.doris.cdcclient.source.reader.JdbcIncrementalSourceReader;
+import org.apache.doris.cdcclient.utils.ConfigUtil;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.CompareOffsetRequest;
+import org.apache.doris.job.cdc.request.JobBaseConfig;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
+import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
+import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext;
+import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask;
+import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
+import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
+import org.apache.flink.cdc.connectors.postgres.source.utils.CustomPostgresSchema;
+import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils;
+import org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
+import org.apache.flink.table.types.DataType;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import io.debezium.connector.postgresql.SourceInfo;
+import io.debezium.connector.postgresql.connection.PostgresConnection;
+import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
+import io.debezium.connector.postgresql.spi.SlotState;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.time.Conversions;
+import lombok.Data;
+import org.postgresql.Driver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Data
+public class PostgresSourceReader extends JdbcIncrementalSourceReader {
+ private static final Logger LOG = LoggerFactory.getLogger(PostgresSourceReader.class);
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ public PostgresSourceReader() {
+ super();
+ }
+
+ @Override
+ public void initialize(long jobId, DataSource dataSource, Map<String, String> config) {
+ PostgresSourceConfig sourceConfig = generatePostgresConfig(config, jobId);
+ PostgresDialect dialect = new PostgresDialect(sourceConfig);
+ LOG.info("Creating slot for job {}, user {}", jobId,
sourceConfig.getUsername());
+ createSlotForGlobalStreamSplit(dialect);
+ super.initialize(jobId, dataSource, config);
+ }
+
+ /**
+ * copy from org.apache.flink.cdc.connectors.postgres.source
+ * .enumerator.PostgresSourceEnumerator.createSlotForGlobalStreamSplit
+ *
+ * <p>Create slot for the unique global stream split.
+ *
+ * <p>Currently all startup modes need read the stream split. We need open the slot before
+ * reading the globalStreamSplit to catch all data changes.
+ */
+ private void createSlotForGlobalStreamSplit(PostgresDialect postgresDialect) {
+ try (PostgresConnection connection = postgresDialect.openJdbcConnection()) {
+ SlotState slotInfo =
+ connection.getReplicationSlotState(
+ postgresDialect.getSlotName(), postgresDialect.getPluginName());
+ // skip creating the replication slot when the slot exists.
+ if (slotInfo != null) {
+ return;
+ }
+ PostgresReplicationConnection replicationConnection =
+ postgresDialect.openPostgresReplicationConnection(connection);
+ replicationConnection.createReplicationSlot();
+ replicationConnection.close(false);
+
+ } catch (Throwable t) {
+ throw new CdcClientException(
+ String.format(
+ "Fail to get or create slot for global stream
split, the slot name is %s. Due to: ",
+ postgresDialect.getSlotName()),
+ t);
+ }
+ }
+
+ @Override
+ protected PostgresSourceConfig getSourceConfig(JobBaseConfig config) {
+ return generatePostgresConfig(config);
+ }
+
+ /** Generate PostgreSQL source config from JobBaseConfig */
+ private PostgresSourceConfig generatePostgresConfig(JobBaseConfig config) {
+ return generatePostgresConfig(config.getConfig(), config.getJobId());
+ }
+
+ /** Generate PostgreSQL source config from Map config */
+ private PostgresSourceConfig generatePostgresConfig(Map<String, String> cdcConfig, Long jobId) {
+ PostgresSourceConfigFactory configFactory = new PostgresSourceConfigFactory();
+
+ // Parse JDBC URL to extract connection info
+ String jdbcUrl = cdcConfig.get(DataSourceConfigKeys.JDBC_URL);
+ Preconditions.checkNotNull(jdbcUrl, "jdbc_url is required");
+
+ // PostgreSQL JDBC URL format: jdbc:postgresql://host:port/database
+ Properties props = Driver.parseURL(jdbcUrl, null);
+ Preconditions.checkNotNull(props, "Invalid JDBC URL: " + jdbcUrl);
Review Comment:
The check on line 145 validates that props is not null, but this happens
AFTER props has already been used on line 144. This Preconditions check is
ineffective because it comes too late. The null check should be performed
immediately after line 144 before accessing any properties.
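A hedged sketch of the ordering being asked for, assuming Driver.parseURL returns null for a malformed URL (property keys are taken from the hunk above; the wrapper class and method name are hypothetical):

    import java.util.Properties;

    import com.google.common.base.Preconditions;
    import org.postgresql.Driver;

    // Illustrative only: validate the parse result before reading any property,
    // so the caller sees "Invalid JDBC URL" rather than a NullPointerException.
    public final class PostgresUrlParseExample {
        private PostgresUrlParseExample() {}

        static Properties parseOrFail(String jdbcUrl) {
            Preconditions.checkNotNull(jdbcUrl, "jdbc_url is required");
            Properties props = Driver.parseURL(jdbcUrl, null);
            Preconditions.checkNotNull(props, "Invalid JDBC URL: " + jdbcUrl);
            Preconditions.checkNotNull(props.getProperty("PGHOST"), "host is required");
            Preconditions.checkNotNull(props.getProperty("PGPORT"), "port is required");
            Preconditions.checkNotNull(props.getProperty("PGDBNAME"), "database is required");
            return props;
        }
    }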
##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy:
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job_all_type",
"p0,external,pg,external_docker,external_docker_pg") {
+ def jobName = "test_streaming_postgres_job_all_type_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "streaming_all_types_nullable_with_pk_pg"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // create test
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ // sql """CREATE SCHEMA IF NOT EXISTS ${pgSchema}"""
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ sql """
+ create table ${pgDB}.${pgSchema}.${table1} (
+ id bigserial PRIMARY KEY,
+ smallint_col smallint,
+ integer_col integer,
+ bigint_col bigint,
+ real_col real,
+ double_col double precision,
+ numeric_col numeric(20,6),
+ char_col char(10),
+ varchar_col varchar(255),
+ text_col text,
+ boolean_col boolean,
+ date_col date,
+ time_col time,
+ timetz_col time with time zone,
+ timestamp_col timestamp,
+ timestamptz_col timestamp with time zone,
+ interval_col interval,
+ bytea_col bytea,
+ uuid_col uuid,
+ json_col json,
+ jsonb_col jsonb,
+ inet_col inet,
+ cidr_col cidr,
+ macaddr_col macaddr,
+ bit_col bit(8),
+ bit_varying_col bit varying(16),
+ int_array_col integer[],
+ text_array_col text[],
+ point_col point
+ );
+ """
+ // mock snapshot data
+ sql """
+ INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1,1,100,1000,1.23,4.56,12345.678901,'char','varchar','text value',true,'2024-01-01','12:00:00','12:00:00+08','2024-01-01 12:00:00','2024-01-01 12:00:00+08','1 day',decode('DEADBEEF', 'hex'),'11111111-2222-3333-4444-555555555555'::uuid,'{"a":1}','{"b":2}','192.168.1.1','192.168.0.0/24','08:00:2b:01:02:03',B'10101010',B'1010',ARRAY[1,2,3],ARRAY['a','b','c'],'(1,2)');
+ """
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}?timezone=UTC",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "offset" = "initial"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // check job running
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
Review Comment:
Typo in variable name: 'jobSuccendCount' should be 'jobSucceedCount'
(missing 'e'). This typo appears consistently in the test file.
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader.postgres;
+
+import org.apache.doris.cdcclient.exception.CdcClientException;
+import org.apache.doris.cdcclient.source.factory.DataSource;
+import org.apache.doris.cdcclient.source.reader.JdbcIncrementalSourceReader;
+import org.apache.doris.cdcclient.utils.ConfigUtil;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.CompareOffsetRequest;
+import org.apache.doris.job.cdc.request.JobBaseConfig;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
+import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
+import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext;
+import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask;
+import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
+import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
+import org.apache.flink.cdc.connectors.postgres.source.utils.CustomPostgresSchema;
+import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils;
+import org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
+import org.apache.flink.table.types.DataType;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import io.debezium.connector.postgresql.SourceInfo;
+import io.debezium.connector.postgresql.connection.PostgresConnection;
+import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
+import io.debezium.connector.postgresql.spi.SlotState;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.time.Conversions;
+import lombok.Data;
+import org.postgresql.Driver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Data
+public class PostgresSourceReader extends JdbcIncrementalSourceReader {
+ private static final Logger LOG = LoggerFactory.getLogger(PostgresSourceReader.class);
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ public PostgresSourceReader() {
+ super();
+ }
+
+ @Override
+ public void initialize(long jobId, DataSource dataSource, Map<String, String> config) {
+ PostgresSourceConfig sourceConfig = generatePostgresConfig(config, jobId);
+ PostgresDialect dialect = new PostgresDialect(sourceConfig);
+ LOG.info("Creating slot for job {}, user {}", jobId,
sourceConfig.getUsername());
+ createSlotForGlobalStreamSplit(dialect);
+ super.initialize(jobId, dataSource, config);
+ }
+
+ /**
+ * copy from org.apache.flink.cdc.connectors.postgres.source
+ * .enumerator.PostgresSourceEnumerator.createSlotForGlobalStreamSplit
+ *
+ * <p>Create slot for the unique global stream split.
+ *
+ * <p>Currently all startup modes need read the stream split. We need open the slot before
+ * reading the globalStreamSplit to catch all data changes.
+ */
+ private void createSlotForGlobalStreamSplit(PostgresDialect postgresDialect) {
+ try (PostgresConnection connection = postgresDialect.openJdbcConnection()) {
+ SlotState slotInfo =
+ connection.getReplicationSlotState(
+ postgresDialect.getSlotName(), postgresDialect.getPluginName());
+ // skip creating the replication slot when the slot exists.
+ if (slotInfo != null) {
+ return;
+ }
+ PostgresReplicationConnection replicationConnection =
+ postgresDialect.openPostgresReplicationConnection(connection);
+ replicationConnection.createReplicationSlot();
+ replicationConnection.close(false);
+
+ } catch (Throwable t) {
+ throw new CdcClientException(
+ String.format(
+ "Fail to get or create slot for global stream
split, the slot name is %s. Due to: ",
+ postgresDialect.getSlotName()),
+ t);
+ }
+ }
+
+ @Override
+ protected PostgresSourceConfig getSourceConfig(JobBaseConfig config) {
+ return generatePostgresConfig(config);
+ }
+
+ /** Generate PostgreSQL source config from JobBaseConfig */
+ private PostgresSourceConfig generatePostgresConfig(JobBaseConfig config) {
+ return generatePostgresConfig(config.getConfig(), config.getJobId());
+ }
+
+ /** Generate PostgreSQL source config from Map config */
+ private PostgresSourceConfig generatePostgresConfig(Map<String, String> cdcConfig, Long jobId) {
+ PostgresSourceConfigFactory configFactory = new PostgresSourceConfigFactory();
+
+ // Parse JDBC URL to extract connection info
+ String jdbcUrl = cdcConfig.get(DataSourceConfigKeys.JDBC_URL);
+ Preconditions.checkNotNull(jdbcUrl, "jdbc_url is required");
+
+ // PostgreSQL JDBC URL format: jdbc:postgresql://host:port/database
+ Properties props = Driver.parseURL(jdbcUrl, null);
+ Preconditions.checkNotNull(props, "Invalid JDBC URL: " + jdbcUrl);
+
+ String hostname = props.getProperty("PGHOST");
+ String port = props.getProperty("PGPORT");
+ String database = props.getProperty("PGDBNAME");
+ Preconditions.checkNotNull(hostname, "host is required");
+ Preconditions.checkNotNull(port, "port is required");
+ Preconditions.checkNotNull(database, "database is required");
Review Comment:
Missing null check for 'props' before accessing its properties. If
Driver.parseURL returns null (invalid URL), a NullPointerException will be
thrown at line 147 when trying to call props.getProperty.
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java:
##########
@@ -17,183 +17,105 @@
package org.apache.doris.cdcclient.utils;
-import org.apache.doris.job.cdc.DataSourceConfigKeys;
-import org.apache.doris.job.cdc.request.JobBaseConfig;
-
-import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
-import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
-import
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
-import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
-import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetUtils;
-import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
-import java.sql.SQLException;
import java.time.ZoneId;
-import java.util.Arrays;
import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.mysql.cj.conf.ConnectionUrl;
-import io.debezium.connector.mysql.MySqlConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ConfigUtil {
- private static final ObjectMapper mapper = new ObjectMapper();
+ private static ObjectMapper objectMapper = new ObjectMapper();
+ private static final Logger LOG =
LoggerFactory.getLogger(ConfigUtil.class);
Review Comment:
Inconsistent field naming: the class uses 'objectMapper' (camelCase) at line 35 but 'LOG' (uppercase) at line 36. Consider using a consistent naming convention for all static fields, typically uppercase for constants such as 'OBJECT_MAPPER'.
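If consistency is the goal, a small sketch of the constant-style naming (illustrative only; the class name is hypothetical):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative only: both static fields follow the uppercase constant convention.
    public final class NamingExample {
        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
        private static final Logger LOG = LoggerFactory.getLogger(NamingExample.class);

        private NamingExample() {}
    }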
##########
regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.out:
##########
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select --
+Doris 18
+
Review Comment:
The snapshot data comment mentions only A1 and B1 records (lines 52-53), but
the expected output shows A2, B2 records. The insert statements insert data
into both table1 and table2, but the comments are misleading about which
table's data is being referenced as "snapshot data".
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader.postgres;
+
+import org.apache.doris.cdcclient.exception.CdcClientException;
+import org.apache.doris.cdcclient.source.factory.DataSource;
+import org.apache.doris.cdcclient.source.reader.JdbcIncrementalSourceReader;
+import org.apache.doris.cdcclient.utils.ConfigUtil;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.CompareOffsetRequest;
+import org.apache.doris.job.cdc.request.JobBaseConfig;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
+import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
+import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext;
+import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask;
+import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
+import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
+import org.apache.flink.cdc.connectors.postgres.source.utils.CustomPostgresSchema;
+import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils;
+import org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
+import org.apache.flink.table.types.DataType;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import io.debezium.connector.postgresql.SourceInfo;
+import io.debezium.connector.postgresql.connection.PostgresConnection;
+import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
+import io.debezium.connector.postgresql.spi.SlotState;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.time.Conversions;
+import lombok.Data;
+import org.postgresql.Driver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Data
+public class PostgresSourceReader extends JdbcIncrementalSourceReader {
+ private static final Logger LOG = LoggerFactory.getLogger(PostgresSourceReader.class);
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ public PostgresSourceReader() {
+ super();
+ }
+
+ @Override
+ public void initialize(long jobId, DataSource dataSource, Map<String, String> config) {
+ PostgresSourceConfig sourceConfig = generatePostgresConfig(config, jobId);
+ PostgresDialect dialect = new PostgresDialect(sourceConfig);
+ LOG.info("Creating slot for job {}, user {}", jobId,
sourceConfig.getUsername());
+ createSlotForGlobalStreamSplit(dialect);
+ super.initialize(jobId, dataSource, config);
+ }
+
+ /**
+ * copy from org.apache.flink.cdc.connectors.postgres.source
+ * .enumerator.PostgresSourceEnumerator.createSlotForGlobalStreamSplit
+ *
+ * <p>Create slot for the unique global stream split.
+ *
+ * <p>Currently all startup modes need read the stream split. We need open the slot before
+ * reading the globalStreamSplit to catch all data changes.
+ */
+ private void createSlotForGlobalStreamSplit(PostgresDialect postgresDialect) {
+ try (PostgresConnection connection = postgresDialect.openJdbcConnection()) {
+ SlotState slotInfo =
+ connection.getReplicationSlotState(
+ postgresDialect.getSlotName(), postgresDialect.getPluginName());
+ // skip creating the replication slot when the slot exists.
+ if (slotInfo != null) {
+ return;
+ }
+ PostgresReplicationConnection replicationConnection =
+ postgresDialect.openPostgresReplicationConnection(connection);
+ replicationConnection.createReplicationSlot();
+ replicationConnection.close(false);
+
+ } catch (Throwable t) {
+ throw new CdcClientException(
+ String.format(
+ "Fail to get or create slot for global stream
split, the slot name is %s. Due to: ",
+ postgresDialect.getSlotName()),
+ t);
+ }
+ }
+
+ @Override
+ protected PostgresSourceConfig getSourceConfig(JobBaseConfig config) {
+ return generatePostgresConfig(config);
+ }
+
+ /** Generate PostgreSQL source config from JobBaseConfig */
+ private PostgresSourceConfig generatePostgresConfig(JobBaseConfig config) {
+ return generatePostgresConfig(config.getConfig(), config.getJobId());
+ }
+
+ /** Generate PostgreSQL source config from Map config */
+ private PostgresSourceConfig generatePostgresConfig(Map<String, String> cdcConfig, Long jobId) {
+ PostgresSourceConfigFactory configFactory = new PostgresSourceConfigFactory();
+
+ // Parse JDBC URL to extract connection info
+ String jdbcUrl = cdcConfig.get(DataSourceConfigKeys.JDBC_URL);
+ Preconditions.checkNotNull(jdbcUrl, "jdbc_url is required");
+
+ // PostgreSQL JDBC URL format: jdbc:postgresql://host:port/database
+ Properties props = Driver.parseURL(jdbcUrl, null);
+ Preconditions.checkNotNull(props, "Invalid JDBC URL: " + jdbcUrl);
+
+ String hostname = props.getProperty("PGHOST");
+ String port = props.getProperty("PGPORT");
+ String database = props.getProperty("PGDBNAME");
+ Preconditions.checkNotNull(hostname, "host is required");
+ Preconditions.checkNotNull(port, "port is required");
+ Preconditions.checkNotNull(database, "database is required");
Review Comment:
The error messages specify which property is required (host, port, database)
but those messages will never be shown because a NullPointerException will be
thrown first when trying to access the null props object at lines 147-149.
##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job",
"p0,external,pg,external_docker,external_docker_pg") {
+ def jobName = "test_streaming_postgres_job_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "user_info_pg_normal1"
+ def table2 = "user_info_pg_normal2"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+ sql """drop table if exists ${currentDb}.${table2} force"""
+
+ // Pre-create table2
+ sql """
+ CREATE TABLE IF NOT EXISTS ${currentDb}.${table2} (
+ `name` varchar(200) NULL,
+ `age` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`name`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // create test
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ // sql """CREATE SCHEMA IF NOT EXISTS ${pgSchema}"""
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table2}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+ "name" varchar(200),
+ "age" int2,
+ PRIMARY KEY ("name")
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age)
VALUES ('A1', 1);"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age)
VALUES ('B1', 2);"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${table2} (
+ "name" varchar(200),
+ "age" int2,
+ PRIMARY KEY ("name")
+ )"""
+ // mock snapshot data
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} (name, age)
VALUES ('A2', 1);"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} (name, age)
VALUES ('B2', 2);"""
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1},${table2}",
+ "offset" = "initial"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+ def showAllTables = sql """ show tables from ${currentDb}"""
+ log.info("showAllTables: " + showAllTables)
+ // check table created
+ def showTables = sql """ show tables from ${currentDb} like '${table1}'; """
+ assert showTables.size() == 1
+ def showTables2 = sql """ show tables from ${currentDb} like '${table2}'; """
+ assert showTables2.size() == 1
+
+ // check table schema correct
+ def showTbl1 = sql """show create table ${currentDb}.${table1}"""
+ def createTalInfo = showTbl1[0][1];
+ assert createTalInfo.contains("`name` varchar(65533)");
+ assert createTalInfo.contains("`age` smallint");
+ assert createTalInfo.contains("UNIQUE KEY(`name`)");
+ assert createTalInfo.contains("DISTRIBUTED BY HASH(`name`) BUCKETS
AUTO");
+
+ // check job running
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
Review Comment:
Typo in variable name: 'jobSuccendCount' should be 'jobSucceedCount'
(missing 'e').
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader.postgres;
+
+import org.apache.doris.cdcclient.exception.CdcClientException;
+import org.apache.doris.cdcclient.source.factory.DataSource;
+import org.apache.doris.cdcclient.source.reader.JdbcIncrementalSourceReader;
+import org.apache.doris.cdcclient.utils.ConfigUtil;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.CompareOffsetRequest;
+import org.apache.doris.job.cdc.request.JobBaseConfig;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
+import
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
+import
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
+import
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext;
+import
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask;
+import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
+import
org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
+import
org.apache.flink.cdc.connectors.postgres.source.utils.CustomPostgresSchema;
+import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils;
+import
org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
+import org.apache.flink.table.types.DataType;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import io.debezium.connector.postgresql.SourceInfo;
+import io.debezium.connector.postgresql.connection.PostgresConnection;
+import
io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
+import io.debezium.connector.postgresql.spi.SlotState;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.time.Conversions;
+import lombok.Data;
+import org.postgresql.Driver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Data
+public class PostgresSourceReader extends JdbcIncrementalSourceReader {
+ private static final Logger LOG =
LoggerFactory.getLogger(PostgresSourceReader.class);
+ private static final ObjectMapper objectMapper = new ObjectMapper();
Review Comment:
Inconsistent field modifiers: 'objectMapper' is declared as private static but 'LOG' is private static final. Since objectMapper is never reassigned, it should also be declared 'final' for immutability and consistency.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]