tianfy created FLINK-39391:
------------------------------

             Summary: scan.snapshot.fetch.size is not propagated to Debezium properties in Oracle/SqlServer/DB2/Postgres connectors, causing severe snapshot performance degradation
                 Key: FLINK-39391
                 URL: https://issues.apache.org/jira/browse/FLINK-39391
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.6.0
         Environment: Flink CDC 3.6.0 (also verified on master branch)
Apache Flink 1.20.1
Debezium 1.9.8.Final (bundled in flink-connector-debezium)
Oracle 19c RAC with ojdbc8 driver
Java 11 (OpenJDK)
Linux x86_64
            Reporter: tianfy
             Fix For: cdc-3.6.0


h3. Problem


The user-configured `scan.snapshot.fetch.size` option has **no effect** on the 
actual JDBC `fetchSize` used during the incremental snapshot phase for Oracle, 
SqlServer, DB2, and Postgres connectors. The value is correctly parsed and 
stored in `JdbcSourceConfig.fetchSize`, but is never written into the Debezium 
`Properties` object that the snapshot execution path actually reads.

This causes:
- **Oracle / SqlServer / DB2**: Debezium's `query.fetch.size` defaults to `0`, 
which causes the JDBC driver to use its own default (e.g., Oracle JDBC defaults 
to `fetchSize=10`). On high-latency networks, this results in 
**order-of-magnitude performance degradation**.
- **Postgres**: Debezium's `snapshot.fetch.size` defaults to `2000` (from 
`HistorizedRelationalDatabaseConnectorConfig.DEFAULT_SNAPSHOT_FETCH_SIZE`), so 
the impact is less severe but the user-configured value is still silently 
ignored.
h3. Root Cause


In each affected `SourceConfigFactory.create()` method, the `fetchSize` field 
inherited from `JdbcSourceConfigFactory` is passed to the `SourceConfig` 
constructor but is **not** set in the Debezium `Properties` object:

**OracleSourceConfigFactory.java** (line ~115):
```java
// fetchSize is passed to the OracleSourceConfig constructor...
return new OracleSourceConfig(
        ...,
        fetchSize, // stored in JdbcSourceConfig.fetchSize
        ...);

// ...but never set in props:
// MISSING: props.setProperty("query.fetch.size", String.valueOf(fetchSize));
```

The snapshot execution path in `OracleScanFetchTask` (line ~319) calls:
```java
connectorConfig.getQueryFetchSize()
```

This reads `CommonConnectorConfig.QUERY_FETCH_SIZE` from the Debezium 
`Configuration`, which defaults to `0` when not explicitly set.

The same pattern exists in:
- `SqlServerSourceConfigFactory.java` — missing `query.fetch.size`
- `Db2SourceConfigFactory.java` — missing `query.fetch.size`
- `PostgresSourceConfigFactory.java` — missing `snapshot.fetch.size`

**MySQL is NOT affected** because `MySqlSourceConfigFactory` correctly sets 
`props.setProperty("database.fetchSize", String.valueOf(fetchSize))`.
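The Oracle failure mode described above can be simulated without a database. This is a minimal sketch under simplifying assumptions: the factory keeps `fetchSize` in its own field, the Debezium `Properties` never receive `query.fetch.size`, the lookup therefore falls back to `0`, and, following the JDBC `Statement.setFetchSize` contract, a fetch size of `0` is only a hint that the driver ignores in favour of its own default. The class and method names (`FetchSizeBugDemo`, `resolveQueryFetchSize`, `effectiveDriverFetchSize`) are hypothetical and are not Debezium or Flink CDC APIs; the Oracle default of 10 is the value stated in this report.

```java
import java.util.Properties;

public class FetchSizeBugDemo {

    // Hypothetical stand-in for reading CommonConnectorConfig.QUERY_FETCH_SIZE
    // from the Debezium configuration: an absent key falls back to 0.
    static int resolveQueryFetchSize(Properties props) {
        return Integer.parseInt(props.getProperty("query.fetch.size", "0"));
    }

    // JDBC semantics: a fetch size of 0 is ignored and the driver uses its own default.
    static int effectiveDriverFetchSize(int requested, int driverDefault) {
        return requested > 0 ? requested : driverDefault;
    }

    public static void main(String[] args) {
        int userFetchSize = 10_000;          // scan.snapshot.fetch.size, kept in JdbcSourceConfig.fetchSize
        Properties props = new Properties(); // Debezium properties built by the factory; key never set

        int requested = resolveQueryFetchSize(props);
        int effective = effectiveDriverFetchSize(requested, 10); // 10 = Oracle JDBC default per this report

        System.out.println("user-configured fetch size: " + userFetchSize);
        System.out.println("query.fetch.size seen by the snapshot task: " + requested);
        System.out.println("effective driver fetch size: " + effective);
    }
}
```

Setting `query.fetch.size` in `props` is the only change needed for the simulated lookup to return the user's value instead of the driver default.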
h3. Impact


Measured in a real production environment with Oracle 19c RAC:

| Environment | Network RTT | Actual fetchSize | Throughput (parallelism 4) |
|---|---|---|---|
| Host A (low latency) | ~3ms | 10 (bug active) | ~4,000 rows/s |
| Host B (high latency) | ~31ms | 10 (bug active) | **~440 rows/s** |
| Host B + workaround | ~31ms | 10,000 (manual override) | **~3,500 rows/s** |
| Host B plain JDBC benchmark | ~31ms | 10,000 | ~8,200 rows/s |

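The roundtrip model explains the gap: at `fetchSize=10`, every 10 rows require 
one network roundtrip, so with RTT=31ms a single reader thread can fetch at most 
~320 rows/s (10 rows / 0.031 s). Since throughput under this model scales 
inversely with RTT, the expected slowdown from Host A to Host B is the RTT 
ratio, reported as **9.0x** (the rounded RTTs in the table give ~10x), which 
closely matches the observed **9.1x** (4,000 / 440 rows/s).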
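The roundtrip arithmetic can be written out as a small calculation. This is a sketch assuming exactly one network roundtrip per fetch and no other costs, so the per-thread figure is an upper bound rather than a prediction; the class name `RoundtripModel` is hypothetical, the RTT and throughput inputs are the rounded values from the table, and the 1,000,000-row table size is purely illustrative.

```java
public class RoundtripModel {

    // Upper bound on rows/s for one reader thread when every fetch costs one RTT.
    static double maxRowsPerSecond(int fetchSize, double rttMillis) {
        return fetchSize / (rttMillis / 1000.0);
    }

    // Number of roundtrips needed to read `rows` rows with the given fetch size (ceiling division).
    static long roundtrips(long rows, int fetchSize) {
        return (rows + fetchSize - 1) / fetchSize;
    }

    public static void main(String[] args) {
        System.out.printf("fetchSize=10, RTT=31ms: <= %.0f rows/s per thread%n", maxRowsPerSecond(10, 31));
        System.out.printf("fetchSize=10, RTT=3ms:  <= %.0f rows/s per thread%n", maxRowsPerSecond(10, 3));
        System.out.printf("RTT ratio (rounded values) 31/3: %.1fx%n", 31.0 / 3.0);
        System.out.printf("observed throughput ratio 4000/440: %.1fx%n", 4000.0 / 440.0);

        // Illustrative table size: how many roundtrips each fetch size needs.
        System.out.println("roundtrips for 1,000,000 rows at fetchSize=10: " + roundtrips(1_000_000, 10));
        System.out.println("roundtrips for 1,000,000 rows at fetchSize=10,000: " + roundtrips(1_000_000, 10_000));
    }
}
```

The roundtrip count makes the design point concrete: raising the fetch size from 10 to 10,000 cuts the number of network waits by a factor of 1,000, which is why the effect is most visible on high-RTT links.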
h3. How to Reproduce


1. Deploy Flink CDC with an Oracle source where the network RTT to the database 
is >10ms
2. Configure `scan.snapshot.fetch.size: 10000` in the job YAML
3. Submit a snapshot job and observe the throughput
4. Add `debezium.query.fetch.size: 10000` to the same job and rerun it; the 
throughput should improve dramatically
h3. Proposed Fix


In each affected `SourceConfigFactory.create()`, add a `props.setProperty()` 
call **before** the block that merges `dbzProperties` into `props`:

**OracleSourceConfigFactory.java:**
```java
props.setProperty("query.fetch.size", String.valueOf(fetchSize));

// override the user-defined debezium properties
if (dbzProperties != null) {
    props.putAll(dbzProperties);
}
```

**SqlServerSourceConfigFactory.java:**
```java
props.setProperty("query.fetch.size", String.valueOf(fetchSize));

if (dbzProperties != null) {
    props.putAll(dbzProperties);
}
```

**Db2SourceConfigFactory.java:**
```java
props.setProperty("query.fetch.size", String.valueOf(fetchSize));

if (dbzProperties != null) {
    props.putAll(dbzProperties);
}
```

**PostgresSourceConfigFactory.java:**
```java
props.setProperty("snapshot.fetch.size", String.valueOf(fetchSize));

// override the user-defined debezium properties
if (dbzProperties != null) {
    props.putAll(dbzProperties);
}
```

Placing the `setProperty` call before `props.putAll(dbzProperties)` ensures that 
explicit user overrides via `debezium.query.fetch.size` or 
`debezium.snapshot.fetch.size` still take precedence.
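The ordering argument can be checked with plain `java.util.Properties`. The sketch below is a hypothetical helper, not the actual factory code: `FetchSizePrecedence`, `fetchSizeKey` and `buildDebeziumProps` are illustrative names, and the connector-to-key mapping is taken from the proposed fix. It assumes `dbzProperties` already has the `debezium.` prefix stripped. Because the value derived from `fetchSize` is written first, any matching key in `dbzProperties` overwrites it.

```java
import java.util.Properties;

public class FetchSizePrecedence {

    // Hypothetical mapping from connector to the Debezium key its snapshot path reads,
    // following the proposed fix in this issue.
    static String fetchSizeKey(String connector) {
        switch (connector) {
            case "oracle":
            case "sqlserver":
            case "db2":
                return "query.fetch.size";
            case "postgres":
                return "snapshot.fetch.size";
            default:
                throw new IllegalArgumentException("unknown connector: " + connector);
        }
    }

    // Mirrors the proposed ordering: set the default from fetchSize first, then apply user overrides.
    static Properties buildDebeziumProps(String connector, int fetchSize, Properties dbzProperties) {
        Properties props = new Properties();
        props.setProperty(fetchSizeKey(connector), String.valueOf(fetchSize));

        // override the user-defined debezium properties
        if (dbzProperties != null) {
            props.putAll(dbzProperties);
        }
        return props;
    }

    public static void main(String[] args) {
        // No user override: scan.snapshot.fetch.size is propagated.
        System.out.println(buildDebeziumProps("oracle", 10_000, null).getProperty("query.fetch.size"));

        // An explicit debezium.snapshot.fetch.size override wins over scan.snapshot.fetch.size.
        Properties overrides = new Properties();
        overrides.setProperty("snapshot.fetch.size", "1024");
        System.out.println(buildDebeziumProps("postgres", 10_000, overrides).getProperty("snapshot.fetch.size"));
    }
}
```

Reversing the two steps (merging `dbzProperties` first, then calling `setProperty`) would make `scan.snapshot.fetch.size` silently override the user's explicit Debezium setting, which is the behaviour the proposed ordering avoids.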
h3. Workaround


Until the fix is released, users can bypass the broken propagation by adding 
the Debezium property directly:

- **Oracle / SqlServer / DB2**: `debezium.query.fetch.size: 10000`
- **Postgres**: `debezium.snapshot.fetch.size: 1024`
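The workaround relies on `debezium.`-prefixed job options being forwarded to Debezium with the prefix removed. The sketch below illustrates that assumption only; `WorkaroundDemo` and `debeziumOverrides` are hypothetical names, not the Flink CDC implementation. It shows that `debezium.query.fetch.size` ends up as the `query.fetch.size` key the snapshot path reads, while `scan.snapshot.fetch.size` is not forwarded by this path at all.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

public class WorkaroundDemo {

    static final String PREFIX = "debezium.";

    // Hypothetical helper: collects "debezium.*" job options with the prefix removed.
    static Properties debeziumOverrides(Map<String, String> jobOptions) {
        Properties dbz = new Properties();
        for (Map.Entry<String, String> e : jobOptions.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                dbz.setProperty(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return dbz;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("scan.snapshot.fetch.size", "10000");  // not forwarded here; ignored by the affected connectors
        options.put("debezium.query.fetch.size", "10000"); // workaround for Oracle / SqlServer / DB2

        Properties dbz = debeziumOverrides(options);
        System.out.println("query.fetch.size = " + dbz.getProperty("query.fetch.size"));
    }
}
```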
h3. Affected Versions


Verified on `release-3.6.0` branch and `master` branch (as of 2026-04-02). 
Based on code analysis, this bug has existed since the incremental snapshot 
feature was introduced for these connectors.
h3. Related Issue


- FLINK-36044: Reports `scan.snapshot.fetch.size` not taking effect for MySQL 
CDC 2.4.0 — a similar symptom but different code path (MySQL has since been 
fixed with `database.fetchSize`).


