harangozop opened a new issue, #16:
URL: https://github.com/apache/pulsar-connectors/issues/16

   ## Motivation
   
   This is a follow-up to #9 which introduced bounded queue back-pressure via 
`record.fail()` when the queue is full. While that prevents OOM, it causes a 
**nack/redeliver storm** under sustained load: the consumer continuously 
delivers messages, the sink immediately fails them, they get redelivered, and 
the cycle repeats — wasting ~99% of CPU on rejecting/redelivering messages 
instead of actual DB writes.
   
   In production testing, effective throughput dropped from ~1200 msg/s to ~4 
msg/s during back-pressure due to this nack storm.
   
   ## Changes
   
   ### 1. Replace `LinkedList` + `synchronized` with `LinkedBlockingDeque`
   
   Replace the manual `synchronized(incomingList)` blocks with JDK's 
`LinkedBlockingDeque`, which handles all synchronization internally. This 
follows the established pattern already used by other connectors in this repo 
(Aerospike sink uses `LinkedBlockingDeque`, HDFS/Kinesis/DynamoDB sinks use 
`LinkedBlockingQueue`).
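
As a rough illustration of this change, the sketch below wraps a `LinkedBlockingDeque` so that no caller needs its own `synchronized` block. The class name `IncomingQueue` and its methods are hypothetical stand-ins, not the connector's actual code, and treating a non-positive capacity as unbounded is an assumption made only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical stand-in for the sink's internal queue; not the connector's real class.
class IncomingQueue<T> {
    private final LinkedBlockingDeque<T> incomingList;

    IncomingQueue(int capacity) {
        // Assumption for this sketch: capacity <= 0 means "no bound".
        this.incomingList = capacity > 0
                ? new LinkedBlockingDeque<>(capacity)
                : new LinkedBlockingDeque<>();
    }

    // Non-blocking insert; returns false when a bounded queue is full.
    boolean enqueue(T record) {
        return incomingList.offer(record);
    }

    // Moves up to batchSize records into a new list; the deque does its own locking.
    List<T> takeBatch(int batchSize) {
        List<T> batch = new ArrayList<>(batchSize);
        incomingList.drainTo(batch, batchSize);
        return batch;
    }

    int size() {
        return incomingList.size();
    }
}
```

Both `enqueue` and `takeBatch` can be called from different threads without external locking, which is what the manual `synchronized(incomingList)` blocks previously had to guarantee.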
   
   ### 2. Blocking back-pressure in `write()`
   
   Instead of `record.fail()` (which triggers immediate nack/redeliver), use 
`incomingList.offer(record, 1, TimeUnit.SECONDS)` which **blocks the Pulsar IO 
thread** until space is available. Blocking the IO thread is the correct 
back-pressure signal — it stops the consumer from fetching more messages for 
this sink's subscription.
   
   - If space becomes available within 1 second (flush drains the queue), the 
record is accepted — zero nacks.
   - If the timeout expires (flush thread stuck or very slow), the record is 
failed — at most 1 nack/second vs. thousands/second with the current approach.
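
The two cases above could look roughly like the following sketch. `SinkRecord` is a hypothetical one-method stand-in for Pulsar's record type, and the timeout is passed as a parameter here (the proposal uses 1 second) so the behaviour is easy to exercise; this is not the connector's actual `write()` implementation.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

class BlockingWriteSketch {
    // Hypothetical stand-in for the record type; only fail() matters for this sketch.
    interface SinkRecord {
        void fail();
    }

    private final LinkedBlockingDeque<SinkRecord> incomingList;

    BlockingWriteSketch(int capacity) {
        this.incomingList = new LinkedBlockingDeque<>(capacity);
    }

    // Returns true if the record was queued, false if it was failed.
    boolean write(SinkRecord record, long timeout, TimeUnit unit) {
        try {
            // Blocks the calling thread until space is available or the timeout expires.
            if (incomingList.offer(record, timeout, unit)) {
                return true;
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and fall through to fail the record.
            Thread.currentThread().interrupt();
        }
        record.fail();
        return false;
    }
}
```

With a full queue and no consumer draining it, each call waits for the whole timeout before failing one record, which is where the "at most 1 nack/second" bound per blocked caller comes from.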
   
   ### 3. Replace recursive `flush()` with iterative loop
   
   The current `flush()` calls itself recursively when `needAnotherRound` is 
true. Under sustained load with large queues, each extra round adds a stack 
frame, so the call stack can grow deep enough to risk a `StackOverflowError`. 
Replace the recursion with a `while` loop.
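
A minimal sketch of the loop shape, assuming that `needAnotherRound` simply means "records are still queued after this batch" (the real condition in the connector may differ). The `writeBatch` callback is a hypothetical placeholder for the actual DB write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Consumer;

class IterativeFlushSketch {
    // Drains the queue batch by batch and returns the number of rounds performed.
    static <T> int flush(LinkedBlockingDeque<T> queue, int batchSize,
                         Consumer<List<T>> writeBatch) {
        int rounds = 0;
        boolean needAnotherRound = true;
        while (needAnotherRound) {
            List<T> batch = new ArrayList<>(batchSize);
            queue.drainTo(batch, batchSize);
            if (batch.isEmpty()) {
                break; // nothing left to write
            }
            writeBatch.accept(batch); // placeholder for the real DB write
            rounds++;
            // Assumed condition: loop again while records remain queued.
            needAnotherRound = !queue.isEmpty();
        }
        return rounds;
    }
}
```

The stack depth stays constant no matter how many rounds are needed, unlike the recursive version where each round adds a frame.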
   
   ### 4. Move `isFlushing.set(false)` to `finally` block
   
   Prevents the `isFlushing` flag from getting stuck if an exception occurs 
before the flag is cleared, which would prevent all future flushes.
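
The pattern can be sketched as follows, assuming `isFlushing` is an `AtomicBoolean` (suggested by the `set(false)` call, but not confirmed here). The class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class FlushGuardSketch {
    private final AtomicBoolean isFlushing = new AtomicBoolean(false);

    // Runs doFlush unless another flush is already in progress.
    // Returns true if doFlush ran to completion, false if it was skipped.
    boolean tryFlush(Runnable doFlush) {
        if (!isFlushing.compareAndSet(false, true)) {
            return false; // another flush is running
        }
        try {
            doFlush.run();
            return true;
        } finally {
            // Always cleared, even when doFlush throws.
            isFlushing.set(false);
        }
    }

    boolean isFlushing() {
        return isFlushing.get();
    }
}
```

If `doFlush` throws, the exception still propagates to the caller, but the flag is reset first, so the next flush attempt is not blocked.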
   
   ### 5. Drain remaining records in `close()`
   
   On shutdown, drain any records still in the queue and fail them so that they 
are nacked and redelivered rather than silently dropped, giving a clean 
shutdown without message loss.
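
One way the drain step might look, as a sketch only. `SinkRecord` is again a hypothetical stand-in for the record type, and `failRemaining` is an illustrative helper name, not an existing method of the sink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

class CloseDrainSketch {
    // Hypothetical stand-in for the record type.
    interface SinkRecord {
        void fail();
    }

    // Empties the queue, fails every remaining record, and returns how many were failed.
    static int failRemaining(LinkedBlockingDeque<SinkRecord> incomingList) {
        List<SinkRecord> remaining = new ArrayList<>();
        incomingList.drainTo(remaining); // removes all queued records
        remaining.forEach(SinkRecord::fail);
        return remaining.size();
    }
}
```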
   
   ## Production results
   
   | Metric | Before (record.fail) | After (LinkedBlockingDeque) |
   |---|---|---|
   | Effective throughput under back-pressure | ~4 msg/s | ~1200 msg/s |
   | CPU waste from nack/redeliver storm | ~99% | ~0% |
   | Nacks per second when queue full | thousands | ≤1 |
   
   ## Compatibility
   
   - `maxQueueSize` config is fully preserved: `-1` (unbounded), `0` (auto), 
positive (bounded)
   - `LinkedBlockingDeque` with no capacity argument is effectively unbounded 
(capacity `Integer.MAX_VALUE`), matching the legacy `LinkedList`
   - No new configuration parameters required
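
For illustration, the mapping from `maxQueueSize` to a queue could be sketched as below. How the "auto" size is computed is not specified in this issue, so the sketch takes it as a caller-supplied `autoCapacity` parameter; that parameter and the `QueueFactorySketch` class are hypothetical. Treating every negative value like `-1` is also an assumption.

```java
import java.util.concurrent.LinkedBlockingDeque;

class QueueFactorySketch {
    // maxQueueSize: -1 = unbounded, 0 = auto, positive = bounded.
    // autoCapacity: hypothetical value chosen by the caller for the "auto" case.
    static <T> LinkedBlockingDeque<T> create(int maxQueueSize, int autoCapacity) {
        if (maxQueueSize < 0) {
            // No capacity argument: capacity defaults to Integer.MAX_VALUE.
            return new LinkedBlockingDeque<>();
        }
        if (maxQueueSize == 0) {
            return new LinkedBlockingDeque<>(autoCapacity);
        }
        return new LinkedBlockingDeque<>(maxQueueSize);
    }
}
```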

