Aleksandr Savonin created FLINK-39398:
-----------------------------------------
Summary: Handle duplicate key errors in MongoWriter bulk write
retry loop
Key: FLINK-39398
URL: https://issues.apache.org/jira/browse/FLINK-39398
Project: Flink
Issue Type: Improvement
Components: Connectors / MongoDB
Reporter: Aleksandr Savonin
MongoWriter.doBulkWrite() catches MongoException broadly without inspecting
MongoBulkWriteException.getWriteErrors(). This causes two problems when bulk
writes encounter duplicate key errors (E11000):
*Problem 1: Infinite restart loop*
When using InsertOneModel with a unique index and AT_LEAST_ONCE delivery
guarantee, an unplanned failure (TaskManager kill, OOM, crash) causes Flink to
replay records from the last checkpoint. Some of these records may already
exist in MongoDB because they were written before the crash but the source
offset was not yet committed. The retry loop retries the identical batch
unchanged, every retry fails with the same E11000 errors, and once maxRetries
is exhausted an IOException is thrown. Flink then restarts from the
checkpoint, replays the same records, and the cycle repeats indefinitely.
*Problem 2: Un-attempted operations dropped with ordered=true*
With ordered=true (the default), MongoDB stops at the first error. Operations
after the error index are never attempted by the server. The current code
retries the entire batch unchanged, so every retry fails at the same
duplicate and the trailing operations are never written, which can lead to
data loss.
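To illustrate the ordered=true semantics, here is a minimal, self-contained simulation (the batch of keys and the set of pre-existing keys are hypothetical; the real server-side behavior is what MongoDB documents for ordered bulk writes):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Minimal simulation of MongoDB's ordered bulk-write semantics: with
// ordered=true the server stops at the first duplicate-key error, so every
// operation after it is never attempted. Retrying the identical batch hits
// the same error at the same index, so the tail is never written.
public class OrderedBulkSimulation {

    // Returns the keys the server never attempted: those after the first
    // key that collides with an existing unique-index entry.
    static List<String> unattempted(List<String> batch, Set<String> existing) {
        for (int i = 0; i < batch.size(); i++) {
            if (existing.contains(batch.get(i))) {
                return new ArrayList<>(batch.subList(i + 1, batch.size()));
            }
        }
        return List.of(); // no error: every operation was attempted
    }
}
```

Unless the un-attempted tail is re-queued, those records are silently lost when the batch is eventually abandoned.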
*Solution*
Add a configurable sink.duplicate-key-handling option with two strategies:
- fail (default): preserves the existing retry behavior, but improves the
error message to suggest skip-duplicates or upsert mode when E11000 is
detected.
- skip-duplicates: Inspects MongoBulkWriteException.getWriteErrors()
per-operation. Skips duplicate key errors. For ordered=true, re-queues
un-attempted operations after the error index. For ordered=false, considers
non-duplicate operations as already succeeded (no re-queuing).
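A sketch of the skip-duplicates strategy, under stated assumptions: WriteError below is a stand-in for the driver's BulkWriteError (operation index plus error code), and a real implementation would obtain these from MongoBulkWriteException.getWriteErrors(). The class and method names are illustrative only:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed skip-duplicates strategy.
public class SkipDuplicatesSketch {
    static final int DUPLICATE_KEY_CODE = 11000; // E11000

    // Stand-in for the driver's BulkWriteError.
    record WriteError(int index, int code) {}

    // ordered=true: the server reports at most one error. A non-duplicate
    // error fails the batch as before; a duplicate is skipped and the
    // un-attempted tail after it is re-queued for retry.
    static <T> List<T> handleOrdered(List<T> batch, WriteError error) {
        if (error.code() != DUPLICATE_KEY_CODE) {
            throw new IllegalStateException("non-duplicate write error: fail as before");
        }
        return new ArrayList<>(batch.subList(error.index() + 1, batch.size()));
    }

    // ordered=false: every operation was attempted, so nothing is re-queued.
    // Duplicates are ignored; any non-duplicate error still fails the batch.
    static void handleUnordered(List<WriteError> errors) {
        for (WriteError e : errors) {
            if (e.code() != DUPLICATE_KEY_CODE) {
                throw new IllegalStateException(
                        "non-duplicate write error at index " + e.index());
            }
        }
    }
}
```

The key design point is that retries operate on a shrinking batch: each pass removes operations that either succeeded or were identified as duplicates, so the retry loop always makes progress instead of replaying the same failing batch.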
--
This message was sent by Atlassian Jira
(v8.20.10#820010)