harangozop opened a new pull request, #9: URL: https://github.com/apache/pulsar-connectors/pull/9
### Motivation

Fixes https://github.com/apache/pulsar/issues/25030

The JDBC sink's internal queue (`incomingList`) is an unbounded `LinkedList`. When the database connection drops:

1. `executeBatch()` hangs until TCP socket timeout (can be minutes)
2. `isFlushing` stays `true`, blocking all queue draining
3. `write()` keeps accepting records without any limit
4. `incomingList` grows until `OutOfMemoryError: Java heap space`

This is a production issue observed with both PostgreSQL and MariaDB JDBC sinks under Pulsar FunctionMesh.

### Modifications

**`JdbcAbstractSink.java`:**

- **Bounded queue**: `write()` rejects records when queue exceeds `maxQueueSize`, applying Pulsar-level back-pressure via `record.fail()` (negative ack → consumer redelivers later)
- **State check in `write()`**: records are failed immediately when state != OPEN (after `fatal()` or `close()`)
- **Connection validation & reconnection**: `ensureConnection()` validates the JDBC connection via `Connection.isValid()` before each flush and reconnects automatically, allowing recovery from transient DB outages without pod restart
- **Scheduled flush cancellation**: `fatal()` and `close()` cancel the periodic flush `ScheduledFuture` to stop repeated failures on a broken connection

**`JdbcSinkConfig.java`:**

- Added `maxQueueSize` config (default: 0 = auto, which resolves to `batchSize * 10`). Users can override for their workload.

### Verifying this change

- Added `testWriteRejectsRecordsAfterFatal`: verifies `write()` fails records when state is FAILED
- Added `testBoundedQueueBackPressure`: verifies records are failed when internal queue is full
- All existing SQLite sink tests pass (including batch variant)

### Does this pull request potentially affect one of the following parts of the Pulsar project?

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [x] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [x] Anything that affects deployment (configuration default changes are backward compatible; `maxQueueSize=0` preserves legacy unbounded behavior)

### Documentation

- [x] `doc-not-needed`: Bug fix with backward-compatible config addition
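
### Illustrative sketch

For readers who want a concrete picture of the bounded-queue and state checks listed under Modifications, here is a minimal sketch. It is not the code in this PR: `BoundedSinkSketch`, `SketchRecord`, `queued()`, and `effectiveMaxQueueSize()` are hypothetical names, the `>=` comparison is an assumption about where the limit bites, and the resolution of `maxQueueSize = 0` follows the "auto, `batchSize * 10`" wording from the `JdbcSinkConfig.java` note.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical, simplified stand-in for the sink behavior described in this PR. */
class BoundedSinkSketch {
    enum State { OPEN, FAILED, CLOSED }

    /** Minimal stand-in for a Pulsar record; only ack() and fail() are modeled. */
    interface SketchRecord {
        void ack();
        void fail();
    }

    private final Deque<SketchRecord> incomingList = new ArrayDeque<>();
    private final int effectiveMaxQueueSize;
    private State state = State.OPEN;

    BoundedSinkSketch(int batchSize, int maxQueueSize) {
        // Assumption taken from the config note: 0 means "auto" = batchSize * 10.
        this.effectiveMaxQueueSize = maxQueueSize > 0 ? maxQueueSize : batchSize * 10;
    }

    /** Queues the record, or fails it so the consumer can redeliver it later. */
    synchronized boolean write(SketchRecord record) {
        // State check: after fatal() or close(), fail records immediately.
        if (state != State.OPEN) {
            record.fail();
            return false;
        }
        // Bounded queue: fail the record instead of growing without limit.
        if (incomingList.size() >= effectiveMaxQueueSize) {
            record.fail();
            return false;
        }
        incomingList.add(record);
        return true;
    }

    synchronized void fatal() {
        state = State.FAILED;
    }

    synchronized int queued() {
        return incomingList.size();
    }

    int effectiveMaxQueueSize() {
        return effectiveMaxQueueSize;
    }
}
```

With `batchSize = 5` and `maxQueueSize = 0`, this sketch accepts up to 50 queued records and fails any further `write()` until the queue drains. Flushing, `ensureConnection()`, and cancellation of the scheduled flush are deliberately left out.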
