[ 
https://issues.apache.org/jira/browse/KAFKA-13286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17422292#comment-17422292
 ] 

Sagar Rao commented on KAFKA-13286:
-----------------------------------

Hey [~guozhang], is there anything here that can be started on? I can give it 
a shot if something needs to be looked into.

> Revisit Streams State Store and Serde Implementation
> ----------------------------------------------------
>
>                 Key: KAFKA-13286
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13286
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Priority: Major
>
> Kafka Streams state stores are built in hierarchical layers as metered -> 
> cached -> logged -> [convert] -> raw stores (RocksDB, in-memory), and they 
> rely on the built-in Serde libraries for serialization and deserialization. 
> There are several inefficiencies in the current design:
> * The API only supports serde using byte arrays. This means we generate a lot 
> of garbage and spend unnecessary time copying bytes, especially when working 
> with windowed state stores that rely on composite keys. In many places in the 
> code we have to extract parts of the composite key to deserialize either the 
> timestamp or the message key from the state store key (e.g. the methods in 
> WindowStoreUtils).
> * The serde operation could happen on multiple layers of the state store 
> hierarchy, which means we need extra byte array copies as we move along 
> doing serdes. For example, we do serde in the metered layer, then again 
> in the cached layer with cache functions, and also in logged stores when 
> generating the key/value bytes to send to Kafka.
> To improve on this, we can consider supporting serde into/from ByteBuffers, 
> which would allow us to reuse the underlying byte arrays and just pass 
> around slices of the underlying buffers to avoid the unnecessary copying. 
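
A minimal sketch of the "pass around slices" idea, using only `java.nio.ByteBuffer`. The class name `CompositeKeyViews` and the assumed layout `<8-byte timestamp, key bytes>` are illustrative assumptions, not the actual layout used in WindowStoreUtils:

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of Kafka Streams.
// Assumed layout for illustration only: <timestamp (8 bytes), key bytes>.
final class CompositeKeyViews {
    static final int TIMESTAMP_BYTES = Long.BYTES;

    // Reads the timestamp with an absolute get, so the caller's position is untouched.
    static long timestamp(ByteBuffer compositeKey) {
        return compositeKey.getLong(compositeKey.position());
    }

    // Returns a view over the key bytes that shares storage with the
    // composite key; no bytes are copied.
    static ByteBuffer keyView(ByteBuffer compositeKey) {
        ByteBuffer view = compositeKey.duplicate();
        view.position(compositeKey.position() + TIMESTAMP_BYTES);
        return view.slice();
    }
}
```

Because the view shares the backing storage, it is only valid for as long as the underlying buffer is not overwritten or returned to a pool.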
> 1) More specifically, e.g. the serialize interface could be refactored to:
> {code}
> ByteBuffer serialize(String topic, T data, ByteBuffer buffer);
> {code}
> where the serialized bytes would be appended to the ByteBuffer. When a series 
> of serialize functions is called along the state store hierarchy, we 
> then just need to make sure that whatever should be appended first to the 
> ByteBuffer is serialized first. E.g. if the serialized byte format of 
> a WindowSchema is <timestamp, boolean, key>, 
> then we would need to call serialize as in:
> {code}
> serialize(key, serialize(leftRightBoolean, serialize(timestamp, buffer))); 
> {code}
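
To make the append ordering concrete, here is a rough sketch of the nested call for a `<timestamp, boolean, key>` layout. All names (`AppendingSerdeSketch`, `serializeWindowKey`, and the per-field helpers) are hypothetical, the topic argument is omitted for brevity, and a String key stands in for an arbitrary key type:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper; none of these methods exist in Kafka's Serde API.
final class AppendingSerdeSketch {

    // Each method appends at the buffer's current position and returns the same buffer.
    static ByteBuffer serializeTimestamp(long timestamp, ByteBuffer buffer) {
        return buffer.putLong(timestamp);
    }

    static ByteBuffer serializeBoolean(boolean value, ByteBuffer buffer) {
        return buffer.put((byte) (value ? 1 : 0));
    }

    // Simplification: getBytes() still creates a temporary array here; a real
    // serializer would encode straight into the buffer.
    static ByteBuffer serializeKey(String key, ByteBuffer buffer) {
        return buffer.put(key.getBytes(StandardCharsets.UTF_8));
    }

    // The innermost call runs first, so the bytes land as <timestamp, boolean, key>.
    static ByteBuffer serializeWindowKey(long timestamp, boolean leftRight,
                                         String key, ByteBuffer buffer) {
        return serializeKey(key,
            serializeBoolean(leftRight,
                serializeTimestamp(timestamp, buffer)));
    }
}
```

Calling `flip()` on the returned buffer would then expose the three fields in that order for reading.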
> 2) In addition, we can consider having a pool of ByteBuffers representing a 
> set of byte arrays that can be re-used. This can be captured as an 
> intelligent {{ByteBufferSupplier}}, which provides:
> {code}
> ByteBuffer ByteBufferSupplier#allocate(long size)
> {code}
> Its implementation can choose to either create new byte arrays or re-use 
> existing ones in the pool. The gotcha, though, is that we usually do not know 
> the serialized byte length of raw keys (in practice the keys would be 
> in JSON, Avro, etc.), and hence would not know what {{size}} to pass in for 
> serialization. We may need to be conservative, or resort to trial and error.
> Of course callers then would be responsible for returning the used ByteBuffer 
> back to the Supplier via
> {code}
> ByteBufferSupplier#deallocate(ByteBuffer buffer)
> {code}
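
One possible shape for such a supplier, as a sketch only. The class `PooledByteBufferSupplier` and its `pooledCount()` accessor are made up for illustration, and `size` is an `int` here (rather than the `long` above) because `ByteBuffer.allocate` takes an `int`:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Iterator;

// Hypothetical pooled supplier. Not thread-safe: assumed to be owned by one thread.
final class PooledByteBufferSupplier {
    private final ArrayDeque<ByteBuffer> pool = new ArrayDeque<>();

    // Returns a cleared buffer with capacity >= size, reusing a pooled one if possible.
    ByteBuffer allocate(int size) {
        Iterator<ByteBuffer> it = pool.iterator();
        while (it.hasNext()) {
            ByteBuffer candidate = it.next();
            if (candidate.capacity() >= size) {
                it.remove();
                candidate.clear(); // reset position/limit; old contents get overwritten
                return candidate;
            }
        }
        return ByteBuffer.allocate(size);
    }

    // Callers hand the buffer back once they are done with it.
    void deallocate(ByteBuffer buffer) {
        pool.addLast(buffer);
    }

    int pooledCount() {
        return pool.size();
    }
}
```

A caller that does not know the serialized length up front could request a conservatively large size, which also makes later reuse more likely.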
> Some quick notes here regarding concurrency and sharing of the byte-buffer 
> pools:
> * For pull query handling threads, if we do not do any deserialization then 
> we would not need to access the ByteBufferSuppliers, hence there's no 
> concurrent access.
> * For multiple stream threads, my intention is to have each thread get 
> its own isolated byte-buffer pool to avoid any concurrent access.
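
As an illustration of per-thread isolation, the sketch below keeps one pool per thread in a `ThreadLocal`. This is just one way to get isolation (giving each stream thread its own supplier instance would work as well), and `PerThreadBufferPools` is a hypothetical name:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Hypothetical per-thread pools; each thread only ever touches its own deque,
// so no locking is needed.
final class PerThreadBufferPools {
    private static final ThreadLocal<ArrayDeque<ByteBuffer>> POOL =
        ThreadLocal.withInitial(ArrayDeque::new);

    static ByteBuffer allocate(int size) {
        ByteBuffer pooled = POOL.get().pollFirst();
        if (pooled != null && pooled.capacity() >= size) {
            pooled.clear();
            return pooled;
        }
        // Nothing suitable pooled (a too-small buffer is simply dropped): allocate fresh.
        return ByteBuffer.allocate(size);
    }

    // Must be called from the thread that will reuse the buffer.
    static void deallocate(ByteBuffer buffer) {
        POOL.get().addFirst(buffer);
    }

    static int pooledCount() {
        return POOL.get().size();
    }
}
```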
> 3) With RocksDB's direct byte-buffer support (KAFKA-9168) we can optionally 
> also pass the allocated ByteBuffers to RocksDB's direct API, so that using 
> them for puts/gets would not go through JNI and hence be more efficient. The 
> Supplier would then need to be careful to deallocate these direct 
> byte-buffers since they would not be GC'ed by the JVM.
> There's a catch with direct ByteBuffers that we'd need to be careful 
> about, though: although direct byte buffers are also managed by the JVM (and 
> hence GC) like heap byte buffers, they are not managed in the same way as the 
> latter [1][2], and collection seems to be more conservative. It was suggested 
> that users sometimes need to deallocate them manually if GC does not run 
> promptly. I think the most effective approach is to try our best to re-use 
> allocated direct byte-buffers from native memory, which means we probably 
> want to allocate conservatively large sizes (if we do not know the 
> serialized length) so that they can be reused.
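
A small sketch of the "allocate large once, then reuse" approach for direct buffers. `ReusableDirectBuffer` and `acquire` are hypothetical names, and the doubling growth policy is an assumption chosen only to keep reallocation rare:

```java
import java.nio.ByteBuffer;

// Hypothetical wrapper that holds on to one direct buffer and reuses it.
// Not thread-safe: assumed to be owned by a single thread.
final class ReusableDirectBuffer {
    private ByteBuffer buffer;

    ReusableDirectBuffer(int initialCapacity) {
        // Start conservatively large, since serialized lengths are unknown up front.
        buffer = ByteBuffer.allocateDirect(initialCapacity);
    }

    // Returns the cached buffer, cleared for writing. Native memory is only
    // allocated again when the cached buffer is too small.
    ByteBuffer acquire(int requiredCapacity) {
        if (buffer.capacity() < requiredCapacity) {
            int grown = Math.max(requiredCapacity, buffer.capacity() * 2);
            buffer = ByteBuffer.allocateDirect(grown);
        }
        buffer.clear();
        return buffer;
    }
}
```

The old buffer's native memory is released only when the JVM gets around to collecting it, which is the reason to keep growth events rare.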
> [1] 
> https://www.fusion-reactor.com/blog/evangelism/understanding-java-buffer-pool-memory-space/#:~:text=A%20direct%20buffer%20is%20a,allocateDirect()%20factory%20method
> [2] 
> https://stackoverflow.com/questions/3496508/deallocating-direct-buffer-native-memory-in-java-for-jogl/26777380#26777380



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
