[
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825529#comment-17825529
]
Jinyong Choi commented on KAFKA-15302:
--------------------------------------
[https://github.com/apache/kafka/pull/15495]
We have updated the document for users who use it similarly.
> Stale value returned when using store.all() with key deletion in punctuation
> function.
> --------------------------------------------------------------------------------------
>
> Key: KAFKA-15302
> URL: https://issues.apache.org/jira/browse/KAFKA-15302
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.5.1
> Reporter: Jinyong Choi
> Priority: Major
>
> When using the store.all() function within the Punctuation function of
> this.context.schedule, the previous value is returned. In other words, even
> though the value has been stored from 1 to 2, it doesn't return 2; instead,
> it returns 1.
> In the provided test code, you can see the output 'BROKEN !!!', and while
> this doesn't occur 100% of the time, by adding logs, it's evident that during
> the while loop after all() is called, the cache is flushed. As a result, the
> named cache holds a null value, causing the return of a value from RocksDB.
> This is observed as the value after the .get() call is different from the
> expected value. This is possibly due to the consistent read functionality of
> RocksDB, although the exact cause is not certain.
> Of course, if you perform {{store.flush()}} before {{all()}} there won't be
> any errors.
>
> * test code (forked from balajirrao and modified for this)
> [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
>
> {code:java}
> private void forwardAll(final long timestamp) {
> //
> System.err.println("forwardAll Start"); KeyValueIterator<String,
> Integer> kvList = this.kvStore.all();
> while (kvList.hasNext()) {
> KeyValue<String, Integer> entry = kvList.next();
> final Record<String, Integer> msg = new Record<>(entry.key,
> entry.value, context.currentSystemTimeMs());
> final Integer storeValue = this.kvStore.get(entry.key); if
> (entry.value != storeValue) {
> System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key:
> " + entry.key + " Expected in stored(Cache or Store) value: " + storeValue +
> " but KeyValueIterator value: " + entry.value);
> throw new RuntimeException("Broken!");
> } this.context.forward(msg);
> }
> kvList.close();
> }
> {code}
> * log file (add log in stream source)
>
> {code:java}
> # console log
> sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
> [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
> ...
> [info] running Coordinator 1
> appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
> [0] starting instance +1
> forwardAll Start
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but
> KeyValueIterator value: 1
> # log file
> ...
> 01:05:00.382
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
> INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on
> flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
> 01:05:00.388
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
> INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush
> dirtyKeys.size():7873 entries:7873
> 01:05:00.434
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
> INFO o.a.k.s.p.i.ProcessorStateManager -- stream-thread
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
> stream-task [0_0] Flushed cache or buffer Counts
> ...
> 01:05:00.587
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
> INFO o.a.k.s.s.i.CachingKeyValueStore -- KeyValueIterator<Bytes, byte[]>
> all()
> 01:05:00.588
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
> INFO o.a.k.s.state.internals.RocksDBStore -- RocksDB KeyValueIterator all
> 01:05:00.590
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
> INFO o.a.k.s.state.internals.ThreadCache -- stream-thread
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
> MemoryLRUCacheBytesIterator cache all()
> 01:05:00.591
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
> INFO o.a.k.s.state.internals.NamedCache -- NamedCache allKeys()
> size():325771
> 01:05:00.637
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
> INFO o.a.k.s.state.internals.NamedCache -- NamedCache keySetIterator()
> TreeSet size():325771
> ...
> 01:05:07.052
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
> INFO o.a.k.s.state.internals.NamedCache -- 0_0-Counts evict() isDirty()
> eldest.size():103
> 01:05:07.052
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
> INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on
> flush: #hits=5636398, #misses=6233857, #overwrites=639857, #flushes=402
> 01:05:07.053
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
> INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush
> dirtyKeys.size():3459 entries:3460 <= NamedCache.flush()
> ...
> ThreadCache set nextEntry is null key:636398 <=
> MemoryLRUCacheBytesIterator.internalNext()
> ...
> 01:06:31.382
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
> WARN o.a.k.s.s.i.AbstractMergedSortedCacheStoreIterator --
> -AbstractMergedSortedCacheStoreIterator- -> store nextCacheKey: [null],
> nextStoreKey: [636398] nextStoreValue: [1]
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but
> KeyValueIterator value: 1 {code}
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)