merlimat opened a new pull request, #25587:
URL: https://github.com/apache/pulsar/pull/25587
## Summary
Phase 2 of the V5 end-to-end test suite: per-consumer-type basic
behaviors plus the async API surface. Builds on the V5 smoke-test infra
landed in #25586. Single-segment scalable topics throughout — multi-
segment / split / merge / multi-broker scenarios live in dedicated
follow-up suites.
### New test classes
- **\`V5QueueConsumerBasicTest\`** (5 tests) — produce-and-ack-many,
negativeAck redelivery, idle receive timeout, keyed-message metadata
roundtrip, accessors.
- **\`V5StreamConsumerBasicTest\`** (4 tests) — ordered cumulative-ack
receive, \`receiveMulti\` batch shape, idle receive timeout,
accessors.
- **\`V5CheckpointConsumerBasicTest\`** (7 tests) — \`Checkpoint.earliest\`
and \`Checkpoint.latest\` sentinels, checkpoint + close + reopen-with-
saved-position, \`toByteArray\` / \`fromByteArray\` roundtrip,
in-place \`seek(checkpoint)\` rewind, idle timeout, topic accessor.
- **\`V5AsyncApisTest\`** (6 tests) — async producer \`send\` + \`flush\`,
async receive on Queue / Stream / Checkpoint consumers, async ack /
cumulative ack / seek / checkpoint, async close.
22 new tests in total, adding ~6 s of test wall-clock time on top of the
shared cluster startup, which is amortized across each class.
### Real fixes surfaced by writing the tests
Both fixes can be squashed into the test commits, which depend on them.
- **fix: propagate user consumer config to per-segment consumers**
(\`025da64b4f\`) — \`ScalableQueueConsumer\` and \`ScalableStreamConsumer\`
were creating a fresh \`ConsumerConfigurationData\` per segment that
only carried over \`subscriptionName\` and \`consumerName\`. Every
other builder knob — \`receiverQueueSize\`, \`ackTimeout\`,
\`acknowledgmentGroupTime\`, \`maxAcknowledgmentGroupSize\`,
\`negativeAckRedeliveryBackoff\`, \`ackTimeoutRedeliveryBackoff\`,
\`deadLetterPolicy\`, \`readCompacted\`,
\`replicateSubscriptionState\`, encryption — was silently dropped.
Surfaced by the queue test's nack-backoff scenario. Fix: clone the
user-facing config and override only what must be per-segment
(\`topicNames\`, subscription type, consumer-name suffix). The same
structural issue still exists in \`ScalableTopicProducer\` and
\`ScalableCheckpointConsumer\` — left for follow-ups since they need
a small builder refactor to retain the user-facing config.
- **fix: advance CheckpointConsumer position in \`receive()\`, not on
wire read** (\`0a2f834a5e\`) — \`checkpoint()\` was capturing the
read loop's wire position, but the read loop pre-fetches messages into
an in-memory queue ahead of the application. A checkpoint taken right
after \`receive()\` returned message N could therefore already point
past N+1 (or further) if the loop had run ahead, and
\`seek(savedCheckpoint)\` would then skip messages the application had
not yet processed. Surfaced by
\`testSeekRewindsToEarlierCheckpoint\`. Fix: advance the per-segment
positions inside \`receive()\` / \`receive(timeout)\` /
\`receiveMulti(...)\` — i.e., the moment a message crosses from the
wire buffer into application code.
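
For reviewers who want the shape of the config-propagation fix at a glance, here is a minimal, self-contained sketch of "clone the user config, then override only the per-segment fields". Everything in it is hypothetical: `SketchConsumerConfig`, `freshPerSegment`, and `clonedPerSegment` are stand-ins invented for this illustration, not Pulsar's `ConsumerConfigurationData` or the actual `ScalableQueueConsumer` code, and the handful of fields is an arbitrary subset.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for a consumer configuration (NOT Pulsar's ConsumerConfigurationData).
class SketchConsumerConfig implements Cloneable {
    String subscriptionName;
    String consumerName;
    int receiverQueueSize = 1000;   // assumed default, for illustration only
    long ackTimeoutMillis = 0;
    boolean readCompacted = false;
    Set<String> topicNames = new LinkedHashSet<>();

    @Override
    public SketchConsumerConfig clone() {
        try {
            SketchConsumerConfig copy = (SketchConsumerConfig) super.clone();
            // Copy the mutable set so per-segment edits never leak back into the user config.
            copy.topicNames = new LinkedHashSet<>(topicNames);
            return copy;
        } catch (CloneNotSupportedException e) {
            throw new AssertionError(e);
        }
    }
}

class SegmentConfigSketch {
    // Buggy shape: start from a blank config and hand-copy two fields.
    static SketchConsumerConfig freshPerSegment(SketchConsumerConfig user, String segmentTopic, int segmentId) {
        SketchConsumerConfig c = new SketchConsumerConfig();
        c.subscriptionName = user.subscriptionName;
        c.consumerName = user.consumerName + "-" + segmentId;
        c.topicNames.add(segmentTopic);
        return c; // every other knob silently falls back to its default
    }

    // Fixed shape: clone everything, then override only what must differ per segment.
    static SketchConsumerConfig clonedPerSegment(SketchConsumerConfig user, String segmentTopic, int segmentId) {
        SketchConsumerConfig c = user.clone();
        c.topicNames.clear();
        c.topicNames.add(segmentTopic);
        c.consumerName = user.consumerName + "-" + segmentId;
        return c;
    }

    public static void main(String[] args) {
        SketchConsumerConfig user = new SketchConsumerConfig();
        user.subscriptionName = "sub";
        user.consumerName = "worker";
        user.receiverQueueSize = 5;
        user.readCompacted = true;

        SketchConsumerConfig buggy = freshPerSegment(user, "segment-0", 0);
        SketchConsumerConfig fixed = clonedPerSegment(user, "segment-0", 0);
        System.out.println("buggy receiverQueueSize = " + buggy.receiverQueueSize); // 1000
        System.out.println("fixed receiverQueueSize = " + fixed.receiverQueueSize); // 5
    }
}
```

The appeal of the clone-based shape is that newly added config fields are carried over by default, so a forgotten field cannot be dropped without anyone noticing.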
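
The checkpoint-position bug is easier to see in a toy model. The sketch below is a deterministic, single-threaded simulation, not the real `ScalableCheckpointConsumer`: the class name, the single `long` position (the real consumer tracks per-segment positions), and the explicit `prefetch` call standing in for the background read loop are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical toy model of a consumer whose read loop pre-fetches into a buffer.
class PrefetchingReaderSketch {
    private final Queue<Long> buffer = new ArrayDeque<>(); // pre-fetched message ids
    private long nextWireId = 0;     // next id the read loop will fetch from the wire
    private long nextDeliverId = 0;  // next id the application has not yet received

    // Simulates the read loop running ahead of the application.
    void prefetch(int count) {
        for (int i = 0; i < count; i++) {
            buffer.add(nextWireId++);
        }
    }

    // Hands one buffered message to the application and advances the delivered position.
    long receive() {
        Long id = buffer.poll();
        if (id == null) {
            throw new IllegalStateException("buffer empty; call prefetch first");
        }
        nextDeliverId = id + 1;
        return id;
    }

    // Buggy shape: the checkpoint reflects how far the wire read has progressed.
    long checkpointFromWire() {
        return nextWireId;
    }

    // Fixed shape: the checkpoint reflects what the application has actually received.
    long checkpointFromReceive() {
        return nextDeliverId;
    }

    // Rewinds to a checkpoint: drops the buffer and resumes reading from that id.
    void seek(long checkpoint) {
        buffer.clear();
        nextWireId = checkpoint;
        nextDeliverId = checkpoint;
    }

    public static void main(String[] args) {
        PrefetchingReaderSketch reader = new PrefetchingReaderSketch();
        reader.prefetch(5);                           // ids 0..4 are buffered
        reader.receive();                             // application has seen only id 0
        long buggy = reader.checkpointFromWire();     // 5
        long fixed = reader.checkpointFromReceive();  // 1

        reader.seek(fixed);
        reader.prefetch(1);
        System.out.println("after seek(fixed): " + reader.receive()); // 1

        reader.seek(buggy);
        reader.prefetch(1);
        System.out.println("after seek(buggy): " + reader.receive()); // 5, ids 1..4 skipped
    }
}
```

In this model the wire-based checkpoint is wrong whenever the buffer is non-empty, which is the condition the description attributes to `testSeekRewindsToEarlierCheckpoint`.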
### Things deliberately left for later phases
- Multi-segment scalable behavior (split/merge while consuming, hash-
range routing under load, cumulative ack with position vector across
segments).
- Multi-consumer streaming-consumer rebalance / session expiry / graceful
reconnect — exercises the \`SubscriptionCoordinator\` end-to-end.
- Multi-broker scenarios (controller leader election, redirect,
cross-broker segment ownership, controller failover). Needs a
multi-broker base class; will land in its own PR.
- Producer feature matrix (batching / compression / chunking / access
modes / dedup / event time / delivery timing).
- Schema matrix (json / avro / int* / float* / bool / autoProduceBytes).
## Test plan
- [x] \`./gradlew :pulsar-broker:test --tests
"org.apache.pulsar.client.api.v5.*"\` — 23/23 pass
(smoke + queue + stream + checkpoint + async).
- [x] No regressions in \`pulsar-client-v5\` unit tests (48 tests).
- [x] No regressions in broker scalable tests
(\`ScalableTopicControllerTest\`, \`ScalableTopicServiceTest\`,
etc.).