vvivekiyer commented on PR #9544: URL: https://github.com/apache/pinot/pull/9544#issuecomment-1272076485
> If Linkedin is using `MessageBatch<IndexedRecord>`, does that mean Linkedin is also using its own implementation of kafka consumer and not the OSS plugin?

Yes, LinkedIn has a custom Kafka consumer implementation.

> I am curious why Linkedin does this? Isn't it expensive to deserialize every record until the deserialized payload is actually needed? The consumer's contract should not involve deserializing the payload. Can you please explain why this is useful?

LinkedIn's Kafka consumer directly fetches the deserialized payload. AFAIK, LinkedIn Kafka has a schema registry where the payload's schema is registered, so it provides (optimized) deserialization and does not allow clients to plug in their own. @sajjad to add more details, if any.

> I also don't understand the point about "unnecessary type conversion" here. Can you please elaborate?

I meant the additional serialization and deserialization. Edited the description.

> Using generics forces the segment manager implementation to deal with raw usage of parameterized classes (due to type erasures) and make the code hard to read and maintain.

As per my understanding of the code, the segment manager only deals with `GenericRow` once deserialization is done. Depending on the implementation, the `MessageBatch<T>` and `StreamMessageDecoder` interfaces take care of abstracting and decoding the messages, so each implementation can deal with whatever format it wishes. Can you please clarify what you mean by the code becoming harder to read and maintain?

> Besides, StreamMessage is meant to abstract the entire incoming payload. Using only the type of the record's "value" in this generic class seems prohibitive.

I agree with this part. We can discuss and arrive at the best way to do this. But IMO, forcing `StreamMessage` to have a key and value of type `byte[]` seems counter-intuitive given our code flow. I've tried to explain the issue below.

This is my understanding of our OSS code prior to #9224:
1. Get a batch of messages: `MessageBatch<T> messageBatch = PartitionLevelConsumer.fetchMessage();`
2. For each message in `MessageBatch<T>`, decode the message: `GenericRow row = StreamMessageDecoder<T>.decode(message)`

Note that `MessageBatch` is a generic interface because users of Pinot are free to use their own custom Kafka (or other) client implementations, which could return messages in any format.

After #9224, the code looks as follows:
1. Get a batch of messages: `MessageBatch<T> messageBatch = PartitionLevelConsumer.fetchMessage();`
2. Get each message and store it in a `StreamMessage` wrapper. Note that `StreamMessage` stores values only as `byte[]`: `byte[] StreamMessage.value = messageBatch.getMessageValue(index)`
3. Decode the `StreamMessage`: `GenericRow row = StreamMessageDecoder<T>.decode(StreamMessage.value())`

Looking at the above, we've introduced a new step (2) that forces messages of a generic type (`MessageBatch<T>`) to be serialized to `byte[]`, only to go back to working on generic types again in `StreamMessageDecoder`. As you mentioned, the new code assumes that messages read from the stream consumer will always be in serialized format, but the existing `MessageBatch<T>` and `StreamMessageDecoder<T>` interfaces don't honor that assumption.
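To make the contrast concrete, here is a minimal sketch of the two flows above. The interfaces are condensed stand-ins, not Pinot's actual signatures; method names like `getMessageBytesAtIndex` are assumed for illustration.

```java
// Simplified sketch of the two code flows; types are stand-ins for the real
// Pinot interfaces, reduced to the essentials for this discussion.
public class StreamFlowSketch {

  interface MessageBatch<T> {
    int getMessageCount();
    T getMessageAtIndex(int index);           // generic payload (pre-#9224 path)
    byte[] getMessageBytesAtIndex(int index); // hypothetical byte[] view (post-#9224 path)
  }

  interface StreamMessageDecoder<T> {
    GenericRow decode(T payload, GenericRow destination);
  }

  static class GenericRow { /* decoded row fields elided */ }

  // Post-#9224 wrapper: value is always byte[].
  static class StreamMessage {
    private final byte[] _value;
    StreamMessage(byte[] value) { _value = value; }
    byte[] value() { return _value; }
  }

  // Flow prior to #9224: the generic payload T goes straight from the batch
  // to the decoder. A consumer returning already-deserialized records
  // (e.g. MessageBatch<IndexedRecord>) never needs to touch byte[].
  static <T> void consumeOld(MessageBatch<T> batch, StreamMessageDecoder<T> decoder) {
    for (int i = 0; i < batch.getMessageCount(); i++) {
      GenericRow row = decoder.decode(batch.getMessageAtIndex(i), new GenericRow());
      // index row ...
    }
  }

  // Flow after #9224: step (2) wraps every message in a StreamMessage whose
  // value is byte[], so an already-deserialized payload must be re-serialized
  // here only to be decoded back again in step (3).
  static void consumeNew(MessageBatch<?> batch, StreamMessageDecoder<byte[]> decoder) {
    for (int i = 0; i < batch.getMessageCount(); i++) {
      StreamMessage message = new StreamMessage(batch.getMessageBytesAtIndex(i)); // (2)
      GenericRow row = decoder.decode(message.value(), new GenericRow());         // (3)
      // index row ...
    }
  }
}
```

In the old flow the payload type `T` is threaded end to end; in the new flow the `StreamMessage` wrapper pins the intermediate representation to `byte[]`, which is the extra serialize/deserialize round trip described above.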