harangozop opened a new pull request, #9:
URL: https://github.com/apache/pulsar-connectors/pull/9

   ### Motivation
   
   Fixes https://github.com/apache/pulsar/issues/25030
   
   The JDBC sink's internal queue (`incomingList`) is an unbounded 
`LinkedList`. When the database connection drops:
   
   1. `executeBatch()` hangs until the TCP socket times out (which can take minutes)
   2. `isFlushing` stays `true`, blocking all queue draining
   3. `write()` keeps accepting records without any limit
   4. `incomingList` grows until `OutOfMemoryError: Java heap space`
   
   This is a production issue observed with both PostgreSQL and MariaDB JDBC 
sinks under Pulsar FunctionMesh.
   
   ### Modifications
   
   **`JdbcAbstractSink.java`:**
   - **Bounded queue**: `write()` rejects records once the queue reaches `maxQueueSize`, applying Pulsar-level back-pressure via `record.fail()` (negative ack → consumer redelivers later)
   - **State check in `write()`**: records are failed immediately when state != 
OPEN (after `fatal()` or `close()`)
   - **Connection validation & reconnection**: `ensureConnection()` validates 
the JDBC connection via `Connection.isValid()` before each flush and reconnects 
automatically, allowing recovery from transient DB outages without pod restart
   - **Scheduled flush cancellation**: `fatal()` and `close()` cancel the 
periodic flush `ScheduledFuture` to stop repeated failures on a broken 
connection
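The following is a minimal Java sketch of the write path described above (bounded queue, state check, and cancellation of the periodic flush). It is not the actual `JdbcAbstractSink` code: `SinkRecord` is a hypothetical stand-in for Pulsar's `Record` (only `ack()`/`fail()` matter here), and `BoundedSinkQueue`, `startPeriodicFlush`, and `isFlushCancelled` are illustrative names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for Pulsar's Record: only the ack/fail callbacks matter here.
interface SinkRecord {
    void ack();
    void fail();
}

// Illustrative model of the sink's queue handling, not the actual JdbcAbstractSink code.
class BoundedSinkQueue {
    enum State { OPEN, FAILED, CLOSED }

    private final Deque<SinkRecord> incoming = new ArrayDeque<>();
    private final int maxQueueSize;
    private State state = State.OPEN;          // guarded by "this"
    private ScheduledFuture<?> flushFuture;    // guarded by "this"

    BoundedSinkQueue(int maxQueueSize) {
        this.maxQueueSize = maxQueueSize;
    }

    // Enqueue the record, or fail (negatively ack) it so Pulsar redelivers it later.
    void write(SinkRecord record) {
        boolean accepted;
        synchronized (this) {
            // Reject when the sink is not OPEN or the queue is already full.
            accepted = state == State.OPEN && incoming.size() < maxQueueSize;
            if (accepted) {
                incoming.add(record);
            }
        }
        if (!accepted) {
            // Called outside the lock so the callback never runs while holding it.
            record.fail();
        }
    }

    // Schedule the periodic flush and keep its handle so it can be cancelled later.
    synchronized void startPeriodicFlush(ScheduledExecutorService executor, Runnable flush, long periodMs) {
        flushFuture = executor.scheduleWithFixedDelay(flush, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    // Unrecoverable error: stop accepting records and stop the periodic flush.
    void fatal() {
        transitionTo(State.FAILED);
    }

    // Normal shutdown: same effect on write() and on the scheduled flush.
    void close() {
        transitionTo(State.CLOSED);
    }

    private synchronized void transitionTo(State next) {
        state = next;
        if (flushFuture != null) {
            flushFuture.cancel(false); // false: let an in-progress flush finish
        }
    }

    synchronized int size() {
        return incoming.size();
    }

    synchronized boolean isFlushCancelled() {
        return flushFuture != null && flushFuture.isCancelled();
    }
}
```

Failing a rejected record (rather than blocking the caller) hands the retry to Pulsar's redelivery, so memory use stays bounded by `maxQueueSize` while the database is unavailable.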
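The connection-validation step can be sketched with the standard `java.sql.Connection.isValid(int)` call. `ConnectionFactory` and `ReconnectingConnectionHolder` are hypothetical names used for illustration; a real sink would open connections with its configured JDBC URL and credentials.

```java
import java.sql.Connection;
import java.sql.SQLException;

// Hypothetical factory, e.g. () -> DriverManager.getConnection(url, props) in a real sink.
@FunctionalInterface
interface ConnectionFactory {
    Connection open() throws SQLException;
}

// Illustrative holder; the name and shape are assumptions, not the PR's code.
class ReconnectingConnectionHolder {
    private final ConnectionFactory factory;
    private final int validationTimeoutSeconds;
    private Connection connection;

    ReconnectingConnectionHolder(ConnectionFactory factory, int validationTimeoutSeconds) {
        if (validationTimeoutSeconds <= 0) {
            // isValid(0) applies no timeout, so validation itself could block on a dead socket.
            throw new IllegalArgumentException("validation timeout must be positive");
        }
        this.factory = factory;
        this.validationTimeoutSeconds = validationTimeoutSeconds;
    }

    // Called before each flush: reuse the connection if it validates, otherwise reopen it.
    synchronized Connection ensureConnection() throws SQLException {
        if (connection != null && connection.isValid(validationTimeoutSeconds)) {
            return connection;
        }
        closeQuietly(connection);
        connection = null;
        connection = factory.open(); // may throw; the caller can retry on the next flush
        return connection;
    }

    private static void closeQuietly(Connection c) {
        if (c == null) {
            return;
        }
        try {
            c.close();
        } catch (SQLException ignored) {
            // The connection is already broken; nothing useful to do here.
        }
    }
}
```

Prepared statements belong to the connection that created them, so an implementation along these lines would also need to re-prepare its statements after reconnecting.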
   
   **`JdbcSinkConfig.java`:**
   - Added `maxQueueSize` config (default: 0 = auto, which resolves to 
`batchSize * 10`). Users can override for their workload.
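A small sketch of how the `maxQueueSize` default could be resolved, using a hypothetical helper `effectiveMaxQueueSize`. Rejecting negative values and guarding the multiplication against integer overflow are assumptions beyond what this PR states.

```java
// Hypothetical helper; only the "0 = auto = batchSize * 10" rule comes from the PR.
final class QueueSizing {
    private QueueSizing() {}

    static int effectiveMaxQueueSize(int maxQueueSize, int batchSize) {
        if (maxQueueSize < 0) {
            throw new IllegalArgumentException("maxQueueSize must be >= 0");
        }
        if (maxQueueSize > 0) {
            return maxQueueSize;                  // explicit user override
        }
        return Math.multiplyExact(batchSize, 10); // 0 = auto; throws on int overflow
    }
}
```

For example, with `batchSize = 200` and `maxQueueSize` left at 0, this rule yields a bound of 2000 queued records.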
   
   ### Verifying this change
   
   - Added `testWriteRejectsRecordsAfterFatal` — verifies `write()` fails 
records when state is FAILED
   - Added `testBoundedQueueBackPressure` — verifies records are failed when 
internal queue is full
   - All existing SQLite sink tests pass (including batch variant)
   
   ### Does this pull request potentially affect one of the following parts of the Pulsar project?
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [x] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [x] Anything that affects deployment (adds the new, optional `maxQueueSize` setting; its default of 0 resolves to `batchSize * 10`)
   
   ### Documentation
   
   - [x] `doc-not-needed` — Bug fix with backward-compatible config addition

