C0urante commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-2056973237
Thanks Sagar, great catch! I suspected this would be a gnarly one to tackle
but it's turning out to be even harder than I thought.
I think there's still an issue with the current state of the PR. It looks
like we aren't blocking on the future returned by `setPrimaryThenSecondary`,
which means that a call to `get` on the future returned from
`ConnectorOffsetBackingStore::set` may spuriously return before the write to
the primary store has completed. I believe tests miss this because the
producer writes we mock out all take place synchronously; maybe we can use
`MockProducer` more idiomatically to simulate records being ack'd after calls
to `MockProducer::send` have returned?
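For instance, `MockProducer` can be constructed with `autoComplete` set to `false`, in which case each `send` stays pending until the test calls `completeNext()` or `errorNext(...)`. As a JDK-only sketch of that pattern (the class, method names modeled on `MockProducer`, and record format here are illustrative stand-ins, not Connect code):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Illustrative stand-in for MockProducer's autoComplete=false mode:
// send() returns immediately with a pending future, and the test decides
// later when (and whether) each "record" is acked.
public class DeferredAckProducer {
    private final Deque<CompletableFuture<Void>> pending = new ArrayDeque<>();

    // Like Producer::send: returns before the write is acknowledged.
    public CompletableFuture<Void> send(String record) {
        CompletableFuture<Void> ack = new CompletableFuture<>();
        pending.add(ack);
        return ack;
    }

    // Like MockProducer::completeNext: ack the oldest in-flight send.
    public boolean completeNext() {
        CompletableFuture<Void> ack = pending.poll();
        if (ack == null) return false;
        ack.complete(null);
        return true;
    }

    // Like MockProducer::errorNext: fail the oldest in-flight send.
    public boolean errorNext(RuntimeException e) {
        CompletableFuture<Void> ack = pending.poll();
        if (ack == null) return false;
        ack.completeExceptionally(e);
        return true;
    }

    public static void main(String[] args) {
        DeferredAckProducer producer = new DeferredAckProducer();
        CompletableFuture<Void> ack = producer.send("offsets-key=offsets-value");
        // send() has returned, but the record is not acked yet -- exactly the
        // window in which a premature return from get() would be wrong.
        System.out.println(ack.isDone());   // false
        producer.completeNext();
        System.out.println(ack.isDone());   // true
    }
}
```

A test written against this kind of producer can assert on the in-between state (send returned, ack not yet delivered) instead of only on the final result.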
I've sketched a new kind of `Future` implementation that seems to do the
trick, though I haven't tested it rigorously:
```java
private class ChainedOffsetWriteFuture implements Future<Void> {

    private final OffsetBackingStore primaryStore;
    private final OffsetBackingStore secondaryStore;
    private final Map<ByteBuffer, ByteBuffer> completeOffsets;
    private final Map<ByteBuffer, ByteBuffer> regularOffsets;
    private final Callback<Void> callback;
    private final AtomicReference<Throwable> writeError;
    private final CountDownLatch completed;

    public ChainedOffsetWriteFuture(
            OffsetBackingStore primaryStore,
            OffsetBackingStore secondaryStore,
            Map<ByteBuffer, ByteBuffer> completeOffsets,
            Map<ByteBuffer, ByteBuffer> regularOffsets,
            Map<ByteBuffer, ByteBuffer> tombstoneOffsets,
            Callback<Void> callback
    ) {
        this.primaryStore = primaryStore;
        this.secondaryStore = secondaryStore;
        this.completeOffsets = completeOffsets;
        this.regularOffsets = regularOffsets;
        this.callback = callback;
        this.writeError = new AtomicReference<>();
        this.completed = new CountDownLatch(1);
        secondaryStore.set(tombstoneOffsets, this::onFirstWrite);
    }

    private void onFirstWrite(Throwable error, Void ignored) {
        if (error != null) {
            log.trace("Skipping offsets write to primary store because secondary tombstone write has failed", error);
            try (LoggingContext context = loggingContext()) {
                callback.onCompletion(error, ignored);
                writeError.compareAndSet(null, error);
                completed.countDown();
            }
            return;
        }
        setPrimaryThenSecondary(primaryStore, secondaryStore, completeOffsets, regularOffsets, this::onSecondWrite);
    }

    private void onSecondWrite(Throwable error, Void ignored) {
        callback.onCompletion(error, ignored);
        writeError.compareAndSet(null, error);
        completed.countDown();
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return completed.getCount() == 0;
    }

    @Override
    public Void get() throws InterruptedException, ExecutionException {
        completed.await();
        if (writeError.get() != null) {
            throw new ExecutionException(writeError.get());
        }
        return null;
    }

    @Override
    public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (!completed.await(timeout, unit)) {
            throw new TimeoutException("Failed to complete offset write in time");
        }
        if (writeError.get() != null) {
            throw new ExecutionException(writeError.get());
        }
        return null;
    }
}
```
(I've only stubbed out `cancel` and `isCancelled` for now since I'm not sure
real cancellation support is really necessary, but LMK if I've missed a case
where this would make a difference.)
The new class can be used at the end of `ConnectorOffsetBackingStore::set`
like this:
```java
if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
    return new ChainedOffsetWriteFuture(
            primaryStore,
            secondaryStore,
            values,
            regularOffsets,
            tombstoneOffsets,
            callback
    );
} else {
    return setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback);
}
```
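To see concretely why the returned future must block on the second write, here's a distilled, JDK-only version of the same latch-and-chain idea (the `Store` interface and all names here are illustrative stand-ins, not the actual Connect types):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

// Distilled version of the chained write: the result only completes after BOTH
// asynchronous writes have been acked, and a failure of the first write skips
// the second one entirely.
public class ChainedWriteDemo {

    // Stand-in for an async offset store: set() returns immediately and
    // invokes the callback with (error, ignored) once the write is acked.
    interface Store {
        void set(String offsets, BiConsumer<Throwable, Void> callback);
    }

    static CompletableFuture<Void> chainedWrite(Store first, Store second) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        first.set("tombstones", (error, ignored) -> {
            if (error != null) {
                result.completeExceptionally(error);  // fail fast, skip the second write
                return;
            }
            second.set("offsets", (error2, ignored2) -> {
                if (error2 != null) result.completeExceptionally(error2);
                else result.complete(null);           // only now is the whole write done
            });
        });
        return result;
    }

    public static void main(String[] args) {
        // Manually-acked stores so we can observe the in-between states.
        AtomicReference<BiConsumer<Throwable, Void>> firstCb = new AtomicReference<>();
        AtomicReference<BiConsumer<Throwable, Void>> secondCb = new AtomicReference<>();
        CompletableFuture<Void> f = chainedWrite(
                (offsets, cb) -> firstCb.set(cb),
                (offsets, cb) -> secondCb.set(cb));

        System.out.println(f.isDone());     // false: nothing acked yet
        firstCb.get().accept(null, null);   // ack the first (tombstone) write
        System.out.println(f.isDone());     // false: the chained write is still pending
        secondCb.get().accept(null, null);  // ack the second write
        System.out.println(f.isDone());     // true
    }
}
```

The middle state is the bug being fixed: after the first write is acked but before the second completes, a caller blocked in `get` must keep waiting rather than return early.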
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]