harangozop opened a new pull request, #17: URL: https://github.com/apache/pulsar-connectors/pull/17
## Motivation

Follow-up to #9. The `record.fail()` back-pressure introduced in #9 causes a **nack/redeliver storm** under sustained load: the consumer delivers messages, the sink immediately fails them, and they are redelivered, repeating endlessly. This wastes ~99% of CPU and drops effective throughput from ~1200 msg/s to ~4 msg/s.

## Modifications

Replace `LinkedList` + `synchronized` with `LinkedBlockingDeque`:

- **`write()`**: `offer(record, 1, TimeUnit.SECONDS)` blocks the Pulsar IO thread while the queue is full. This blocking *is* the back-pressure, because it stops consumption. On timeout there is at most 1 nack per second, versus thousands per second before.
- **`flush()`**: `drainTo(swapList, batchSize)` is a non-blocking, atomic drain with no `synchronized` blocks. Draining frees space, which automatically wakes any blocked `offer()` calls.
- **Recursive → iterative**: `flush()` uses a `while` loop instead of calling itself recursively.
- **`isFlushing` reset in `finally`**: prevents the flag from staying stuck after an exception.
- **`close()` drains the queue**: fails the remaining records for a clean shutdown.

This follows the pattern already used by the Aerospike (`LinkedBlockingDeque`), HDFS, Kinesis, and DynamoDB (`LinkedBlockingQueue`) connectors in this repo.

## Verifying this change

- Existing tests pass (queue semantics are preserved).
- `testBoundedQueueBackPressure` may need a timeout adjustment: the 6th write now blocks for 1 s instead of failing instantly.

## Production results

| Metric | Before (`record.fail`) | After (`LinkedBlockingDeque`) |
|---|---|---|
| Throughput under back-pressure | ~4 msg/s | ~1200 msg/s |
| CPU wasted on the nack storm | ~99% | ~0% |
| Nacks/sec when the queue is full | thousands | ≤1 |

Fixes #16
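For reviewers, here is a minimal, self-contained sketch of the queue discipline described under Modifications. It is not the connector's actual code. `BatchingSinkSketch`, the `SinkRecord` interface (a hypothetical stand-in for Pulsar's `Record` with only `ack()`/`fail()`), `writeBatch()`, the constructor parameters, the `AtomicBoolean` type of `isFlushing`, and the "drain until empty" loop condition are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class BatchingSinkSketch {

    // Hypothetical stand-in for Pulsar's Record<T>: only ack()/fail() matter here.
    public interface SinkRecord {
        void ack();
        void fail();
    }

    private final LinkedBlockingDeque<SinkRecord> queue;
    private final int batchSize;
    // Assumption: an AtomicBoolean so that only one flush runs at a time.
    private final AtomicBoolean isFlushing = new AtomicBoolean(false);

    public BatchingSinkSketch(int capacity, int batchSize) {
        this.queue = new LinkedBlockingDeque<>(capacity); // bounded queue
        this.batchSize = batchSize;
    }

    // Runs on the IO thread. Blocking here (up to 1 s) is the back-pressure:
    // the thread stops pulling new messages while the queue is full.
    public void write(SinkRecord record) {
        try {
            if (queue.offer(record, 1, TimeUnit.SECONDS)) {
                return; // enqueued; ack/fail happens later in flush()
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
        }
        record.fail(); // timed out (or interrupted): hand the record back
    }

    // Iterative flush: keep draining batches until the queue is empty.
    public void flush() {
        if (!isFlushing.compareAndSet(false, true)) {
            return; // another thread is already flushing
        }
        try {
            List<SinkRecord> swapList = new ArrayList<>(batchSize);
            // drainTo holds the queue lock only while moving elements, and
            // freeing space wakes any write() blocked in offer().
            while (queue.drainTo(swapList, batchSize) > 0) {
                try {
                    writeBatch(swapList);
                } catch (RuntimeException e) {
                    swapList.forEach(SinkRecord::fail); // don't orphan drained records
                    throw e;
                } finally {
                    swapList.clear();
                }
            }
        } finally {
            isFlushing.set(false); // never left stuck after an exception
        }
    }

    // Hypothetical downstream write; this sketch simply acks every record.
    protected void writeBatch(List<SinkRecord> batch) {
        batch.forEach(SinkRecord::ack);
    }

    // On shutdown, fail whatever is still queued.
    public void close() {
        List<SinkRecord> remaining = new ArrayList<>();
        queue.drainTo(remaining);
        remaining.forEach(SinkRecord::fail);
    }
}
```

With a capacity of 2, a third `write()` blocks for about one second and only then fails the record, instead of failing it immediately; that is the behavior change `testBoundedQueueBackPressure` has to accommodate. Draining into a local swap list also means the downstream write runs without holding the queue lock, so writers are never blocked behind slow I/O, only behind a full queue.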
