harangozop opened a new issue, #16: URL: https://github.com/apache/pulsar-connectors/issues/16
## Motivation

This is a follow-up to #9, which introduced bounded queue back-pressure via `record.fail()` when the queue is full. While that prevents OOM, it causes a **nack/redeliver storm** under sustained load: the consumer continuously delivers messages, the sink immediately fails them, they get redelivered, and the cycle repeats, wasting ~99% of CPU on rejecting/redelivering messages instead of actual DB writes.

In production testing, effective throughput dropped from ~1200 msg/s to ~4 msg/s during back-pressure due to this nack storm.

## Changes

### 1. Replace `LinkedList` + `synchronized` with `LinkedBlockingDeque`

Replace the manual `synchronized(incomingList)` blocks with JDK's `LinkedBlockingDeque`, which handles all synchronization internally. This follows the established pattern already used by other connectors in this repo (Aerospike sink uses `LinkedBlockingDeque`, HDFS/Kinesis/DynamoDB sinks use `LinkedBlockingQueue`).

### 2. Blocking back-pressure in `write()`

Instead of `record.fail()` (which triggers immediate nack/redeliver), use `incomingList.offer(record, 1, TimeUnit.SECONDS)`, which **blocks the Pulsar IO thread** until space is available. Blocking the IO thread is the correct back-pressure signal: it stops the consumer from fetching more messages for this sink's subscription.

- If space becomes available within 1 second (flush drains the queue), the record is accepted: zero nacks.
- If the timeout expires (flush thread stuck or very slow), the record is failed: at most 1 nack/second vs. thousands/second with the current approach.

### 3. Replace recursive `flush()` with iterative loop

The current `flush()` calls itself recursively when `needAnotherRound` is true. Under sustained load with large queues, this can build up deep stack frames. Replace with a `while` loop.

### 4. Move `isFlushing.set(false)` to `finally` block

Prevents the `isFlushing` flag from getting stuck if an exception occurs before the flag is cleared, which would prevent all future flushes.

### 5. Drain remaining records in `close()`

On shutdown, drain any records still in the queue and fail them, ensuring clean shutdown without message loss.

## Production results

| Metric | Before (`record.fail`) | After (`LinkedBlockingDeque`) |
|---|---|---|
| Effective throughput under back-pressure | ~4 msg/s | ~1200 msg/s |
| CPU waste from nack/redeliver storm | ~99% | ~0% |
| Nacks per second when queue full | thousands | ≤1 |

## Compatibility

- `maxQueueSize` config is fully preserved: `-1` (unbounded), `0` (auto), positive (bounded)
- `LinkedBlockingDeque` with no capacity argument = unbounded (same as legacy `LinkedList`)
- No new configuration parameters required
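
For illustration, the five changes could fit together roughly as in the sketch below. It is a minimal, self-contained example and not the connector's actual code: the class name `BoundedSinkSketch`, the `String` records, the `written` and `failed` lists (stand-ins for DB writes and `record.fail()`), the `batchSize` parameter, and the loop condition for `needAnotherRound` are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the sink; records are plain Strings here.
class BoundedSinkSketch {
    private final LinkedBlockingDeque<String> incomingList;
    private final AtomicBoolean isFlushing = new AtomicBoolean(false);
    private final int batchSize;

    final List<String> written = new ArrayList<>(); // stands in for DB writes
    final List<String> failed = new ArrayList<>();  // stands in for record.fail()

    BoundedSinkSketch(int maxQueueSize, int batchSize) {
        // No capacity argument means an effectively unbounded deque.
        this.incomingList = maxQueueSize > 0
                ? new LinkedBlockingDeque<String>(maxQueueSize)
                : new LinkedBlockingDeque<String>();
        this.batchSize = batchSize;
    }

    // Change 2: block the caller until space is available or the timeout expires.
    boolean write(String record, long timeoutMillis) {
        try {
            if (incomingList.offer(record, timeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        failed.add(record); // only reached on timeout or interrupt
        return false;
    }

    // Changes 3 and 4: iterative loop, flag cleared in finally.
    void flush() {
        if (!isFlushing.compareAndSet(false, true)) {
            return; // another flush is already running
        }
        try {
            boolean needAnotherRound = true;
            while (needAnotherRound) {
                List<String> batch = new ArrayList<>();
                incomingList.drainTo(batch, batchSize);
                written.addAll(batch); // a real sink would execute the batch here
                // Assumed condition: keep going while records remain queued.
                needAnotherRound = !incomingList.isEmpty();
            }
        } finally {
            isFlushing.set(false); // cleared even if the batch write throws
        }
    }

    // Change 5: fail whatever is still queued at shutdown.
    void close() {
        List<String> remaining = new ArrayList<>();
        incomingList.drainTo(remaining);
        failed.addAll(remaining);
    }
}
```

With a capacity of 2, a third `write()` waits for the timeout and is then failed once, rather than being rejected immediately on every redelivery. After `flush()` drains the queue, later writes are accepted again without any nack.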
