[
https://issues.apache.org/jira/browse/KAFKA-13286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17422292#comment-17422292
]
Sagar Rao commented on KAFKA-13286:
-----------------------------------
hey [~guozhang], wanted to know if there is anything here that can be started
on? I can give it a shot if something needs to be looked into..
> Revisit Streams State Store and Serde Implementation
> ----------------------------------------------------
>
> Key: KAFKA-13286
> URL: https://issues.apache.org/jira/browse/KAFKA-13286
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Priority: Major
>
> Kafka Streams state stores are built in hierarchical layers as metered ->
> cached -> logged -> [convert] -> raw stores (RocksDB, in-memory), and they
> leverage the built-in Serde libraries for serialization / deserialization.
> There are several inefficiencies in the current design:
> * The API only supports serdes using byte arrays. This means we generate a lot
> of garbage and spend unnecessary time copying bytes, especially when working
> with windowed state stores that rely on composite keys. In many places in the
> code we extract parts of the composite key to deserialize either the
> timestamp or the message key from the state store key (e.g. the methods in
> WindowStoreUtils).
> * The serde operations can happen on multiple layers of the state store
> hierarchy, which means we make extra byte-array copies as we move along
> doing serdes. For example, we do serdes in the metered layer, but then again
> in the cached layer with the cache functions, and also in the logged stores to
> generate the key/value bytes to send to Kafka.
> To improve on this, we can consider supporting serdes into/from ByteBuffers,
> which would allow us to reuse the underlying byte arrays and just pass
> around slices of the underlying buffers to avoid the unnecessary copying.
> 1) More specifically, the serialize interface could be refactored to:
> {code}
> ByteBuffer serialize(String topic, T data, ByteBuffer buffer);
> {code}
> Where the serialized bytes would be appended to the ByteBuffer. When a series
> of serialize functions is called along the state store hierarchy, we
> then just need to make sure that what should be appended first to the
> ByteBuffer is serialized first. E.g. if the serialized byte format of
> a WindowSchema is <timestamp, boolean, key>,
> then we would need to call the serializers as in:
> {code}
> serialize(key, serialize(leftRightBoolean, serialize(timestamp, buffer)));
> {code}
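> A minimal sketch of this append-style chaining, under the assumption that each
> serializer writes into (and returns) the shared buffer. The class and method
> names here are hypothetical, not the proposed Kafka API; the point is only that
> the innermost call runs first, so the <timestamp, boolean, key> layout falls
> out of the nesting order:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical append-style serializers that write into a shared ByteBuffer
// instead of allocating a fresh byte array at each state-store layer.
public class AppendSerdeSketch {

    // Appends an 8-byte timestamp; returns the same buffer for chaining.
    static ByteBuffer serializeTimestamp(long timestamp, ByteBuffer buffer) {
        return buffer.putLong(timestamp);
    }

    // Appends the left/right boolean as a single byte.
    static ByteBuffer serializeBoolean(boolean leftRight, ByteBuffer buffer) {
        return buffer.put(leftRight ? (byte) 1 : (byte) 0);
    }

    // Appends the raw key bytes last, matching the <timestamp, boolean, key> layout.
    static ByteBuffer serializeKey(String key, ByteBuffer buffer) {
        return buffer.put(key.getBytes(StandardCharsets.UTF_8));
    }

    // The innermost serialize call executes first, so its bytes land first.
    static ByteBuffer serializeWindowKey(String key, boolean leftRight,
                                         long timestamp, ByteBuffer buffer) {
        return serializeKey(key,
                serializeBoolean(leftRight,
                        serializeTimestamp(timestamp, buffer)));
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        serializeWindowKey("user-1", true, 1234L, buf);
        buf.flip();
        System.out.println("timestamp=" + buf.getLong() + " flag=" + buf.get());
    }
}
```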
> 2) In addition, we can consider having a pool of ByteBuffers representing a
> set of byte arrays that can be re-used. This can be captured as an
> intelligent {{ByteBufferSupplier}}, which provides:
> {code}
> ByteBuffer ByteBufferSupplier#allocate(long size)
> {code}
> Its implementation can choose to either create new byte arrays, or re-use
> existing ones in the pool; the gotcha though is that we usually may not know
> the serialized byte length of raw keys (think: in practice the keys would be
> in JSON/Avro etc.), and hence would not know what {{size}} to pass in for
> serialization, so we may need to be conservative, or use trial and error, etc.
> Of course callers then would be responsible for returning the used ByteBuffer
> back to the Supplier via
> {code}
> ByteBufferSupplier#deallocate(ByteBuffer buffer)
> {code}
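> As one possible shape for this, here is a sketch of a free-list-backed
> supplier. The class name, the default-size policy, and the single-threaded
> (per-stream-thread) assumption are all illustrative guesses, not the proposed
> implementation:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical ByteBufferSupplier: a simple free-list pool, intended to be
// owned by a single stream thread so no synchronization is needed.
public class PooledByteBufferSupplier {
    private final Deque<ByteBuffer> pool = new ArrayDeque<>();
    private final int defaultSize;

    public PooledByteBufferSupplier(int defaultSize) {
        this.defaultSize = defaultSize;
    }

    // Re-use a pooled buffer when one is large enough; otherwise allocate
    // fresh, conservatively rounding up to the default size.
    public ByteBuffer allocate(int size) {
        ByteBuffer buffer = pool.pollFirst();
        if (buffer == null || buffer.capacity() < size) {
            buffer = ByteBuffer.allocate(Math.max(size, defaultSize));
        }
        buffer.clear();
        return buffer;
    }

    // Callers return buffers here once the serialized bytes are consumed.
    public void deallocate(ByteBuffer buffer) {
        pool.offerFirst(buffer);
    }
}
```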
> Some quick notes here regarding concurrency and sharing of the byte-buffer
> pools:
> * For pull-query handling threads, if we do not do any deserialization then
> we would not need to access the ByteBufferSuppliers, hence there's no
> concurrent access.
> * For multiple stream threads, my intention is to have each thread get
> its own isolated byte-buffer pool to avoid any concurrency.
> 3) With RocksDB's direct byte buffers (KAFKA-9168) we can optionally also use
> the allocated ByteBuffers via RocksDB's direct API, so that
> puts/gets would not incur extra copies across JNI and hence be more
> efficient. The Supplier would then need to be careful to deallocate these
> direct byte buffers, since they would not be promptly GC'ed by the JVM.
> There's a catch with direct ByteBuffers, though, that we'd need to be careful
> about: although direct byte buffers are also managed by the JVM (and hence
> GC'ed) like heap byte buffers, they are not managed in the same way as the
> latter [1][2], and their collection seems to be more conservative. It has been
> suggested that sometimes users need to manually deallocate them if GC does not
> reclaim them promptly. I think the most effective approach is to try our best
> to re-use direct byte buffers allocated from native memory, which means we
> probably want to allocate a conservatively large size (if we do not know the
> serialized length), so that they can be reused.
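> The reuse strategy above can be sketched as follows. This is only an
> illustration of "allocate one conservatively large direct buffer per thread
> and keep reusing it": the 64 KiB size is an arbitrary guess, and the
> `serializedView` helper is hypothetical, not a RocksDB or Kafka API. Such a
> long-lived direct buffer is the kind of thing RocksDB's direct-ByteBuffer
> put/get could consume without an extra JNI copy:

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: a single long-lived direct buffer per thread, reused
// across calls so we never depend on GC to reclaim native memory promptly.
public class DirectBufferReuse {
    // Illustrative, conservatively large size for unknown serialized lengths.
    private static final int CONSERVATIVE_SIZE = 64 * 1024;

    // One direct buffer per stream thread; allocated once, reused forever.
    private static final ThreadLocal<ByteBuffer> DIRECT_BUFFER =
            ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(CONSERVATIVE_SIZE));

    // Copies the serialized payload into the thread's reusable direct buffer
    // and returns it flipped, ready for a native read (e.g. a RocksDB put).
    public static ByteBuffer serializedView(byte[] payload) {
        ByteBuffer buffer = DIRECT_BUFFER.get();
        buffer.clear();
        buffer.put(payload);
        buffer.flip();
        return buffer; // the caller must consume it before the next call
    }
}
```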
> [1]
> https://www.fusion-reactor.com/blog/evangelism/understanding-java-buffer-pool-memory-space/#:~:text=A%20direct%20buffer%20is%20a,allocateDirect()%20factory%20method
> [2]
> https://stackoverflow.com/questions/3496508/deallocating-direct-buffer-native-memory-in-java-for-jogl/26777380#26777380
--
This message was sent by Atlassian Jira
(v8.3.4#803005)