vvivekiyer commented on PR #9544:
URL: https://github.com/apache/pinot/pull/9544#issuecomment-1272076485

   > If Linkedin is using `MessageBatch<IndexedRecord>`, does that mean Linkedin is also using its own implementation of kafka consumer and not the OSS plugin?

   Yes. LinkedIn has a custom Kafka consumer implementation.
   
   > I am curious why Linkedin does this? isn't it expensive to deserialize every record until when the deserialized payload is actually needed? The consumer's contract should not involve deserializing the payload. Can you please explain why this is useful?

   LinkedIn's Kafka consumer directly fetches the deserialized payload. AFAIK, LinkedIn's Kafka deployment has a schema registry where the payload's schema is registered, so the consumer provides (optimized) deserialization and does not allow clients to plug in their own. @sajjad to add more details, if any.
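   To make this concrete, here is a minimal sketch of what such a consumer's fetch path might look like. It is purely illustrative: the `DeserializedFetcher` name and the method signature are my own assumptions, not LinkedIn's actual API.

```java
import org.apache.avro.generic.IndexedRecord;
import org.apache.pinot.spi.stream.MessageBatch;

// Hypothetical sketch of a schema-registry-aware consumer: fetch already
// returns deserialized Avro records, so the batch is parameterized on
// IndexedRecord rather than byte[]. Names and signature are illustrative.
interface DeserializedFetcher {
  // The client library deserializes using the registered schema;
  // callers never see a raw byte[] payload.
  MessageBatch<IndexedRecord> fetchMessages(long startOffset, long endOffset, int timeoutMillis);
}
```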
   
   > I also don't understand the point about "unnecessary type conversion" here. Can you please elaborate?

   I meant the additional serialization and deserialization. I've edited the description.
   
   > Using generics forces the segment manager implementation to deal with raw usage of parameterized classes (due to type erasures) and make the code hard to read and maintain.

   As per my understanding of the code, SegmentManager only deals with GenericRow once deserialization is done. The `MessageBatch<T>` and `StreamMessageDecoder<T>` interfaces take care of abstracting the messages and decoding them, so each implementation can deal with whatever format it wishes. Can you please clarify what you mean by this making the code harder to read and maintain?
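   For reference, this is roughly the contract I'm describing (a simplified sketch; the real interfaces carry more methods):

```java
import org.apache.pinot.spi.data.readers.GenericRow;

// Simplified sketch of the two generic interfaces as I read them.
interface MessageBatch<T> {
  int getMessageCount();
  T getMessageAtIndex(int index);  // T is whatever the consumer emits
}

interface StreamMessageDecoder<T> {
  // The consumer and decoder agree on T; downstream code such as the
  // segment manager only ever sees the resulting GenericRow.
  GenericRow decode(T payload, GenericRow destination);
}
```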
   
   > Besides, StreamMessage is meant to abstract the entire incoming payload. Using only the type of the record's "value" in this generic class seems prohibitive.

   I agree with this part. We can discuss and arrive at the best way to do this. But IMO, forcing `StreamMessage` to have a key and value of type `byte[]` seems counter-intuitive given our code flow. I've tried to explain the issue below.
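   For context, this is roughly the shape of the wrapper I'm referring to. The fields are simplified from my reading of the PR; metadata and other members are elided:

```java
// Simplified sketch of the post-#9224 wrapper: both key and value are
// fixed to byte[], regardless of the consumer's native message type.
public class StreamMessage {
  private final byte[] _key;
  private final byte[] _value;

  public StreamMessage(byte[] key, byte[] value) {
    _key = key;
    _value = value;
  }

  public byte[] getKey() {
    return _key;
  }

  public byte[] getValue() {
    return _value;
  }
}
```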
   
   This is my understanding of our OSS code prior to #9224 (see the sketch after the note below):
   1. Get a batch of messages:
      `MessageBatch<T> messageBatch = PartitionLevelConsumer.fetchMessage();`
   2. For each message in the `MessageBatch<T>`, decode it:
      `GenericRow row = StreamMessageDecoder<T>.decode(message)`
   
   Note that MessageBatch is a generic interface because users of Pinot are free to use their own custom Kafka (or other) client implementations, which could return messages in any format.
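   Putting those two steps together, the pre-#9224 loop looks roughly like this (offsets, metadata, and error handling elided; `consume` is my own illustrative wrapper and the SPI calls are simplified):

```java
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.StreamMessageDecoder;

public class PreChangeFlow {
  // The batch and decoder share the same T, so a message travels from the
  // consumer to the decoder in the consumer's native type: byte[] for the
  // OSS Kafka plugin, IndexedRecord for a consumer like LinkedIn's.
  static <T> void consume(MessageBatch<T> batch, StreamMessageDecoder<T> decoder) {
    for (int i = 0; i < batch.getMessageCount(); i++) {
      T message = batch.getMessageAtIndex(i);
      GenericRow row = decoder.decode(message, new GenericRow());
      // ... index row into the consuming segment
    }
  }
}
```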
   
   After #9224, the code looks as follows (see the sketch below):
   1. Get a batch of messages:
      `MessageBatch<T> messageBatch = PartitionLevelConsumer.fetchMessage();`
   2. Get each message and store it in a StreamMessage wrapper. Note that StreamMessage stores values only as byte[]:
      `byte[] StreamMessage.value = messageBatch.getMessageValue(index)`
   3. Decode the StreamMessage:
      `GenericRow row = StreamMessageDecoder<T>.decode(StreamMessage.value())`
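   In code, the new flow looks roughly like this. The stand-in types keep the sketch self-contained, and the accessor names follow the pseudo-code above rather than the exact API:

```java
import org.apache.pinot.spi.data.readers.GenericRow;

public class PostChangeFlow {
  // Local stand-ins for the SPI types; names are illustrative.
  interface ByteBatch {
    int getMessageCount();
    byte[] getMessageValue(int index);  // value is always byte[] after #9224
  }

  interface ByteDecoder {
    GenericRow decode(byte[] payload, GenericRow destination);
  }

  // A consumer holding already-deserialized records must serialize them in
  // step 2 just to fit this shape, and the decoder deserializes them again
  // in step 3.
  static void consume(ByteBatch batch, ByteDecoder decoder) {
    for (int i = 0; i < batch.getMessageCount(); i++) {
      byte[] value = batch.getMessageValue(i);                   // step 2
      GenericRow row = decoder.decode(value, new GenericRow());  // step 3
      // ... index row into the consuming segment
    }
  }
}
```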
   
   Looking at the above, we've introduced a new step (2), where we force messages of a generic type (`MessageBatch<T>`) to be serialized to `byte[]`, only to go back to working on generic types again in `StreamMessageDecoder<T>`. As you mentioned, the new code assumes that messages read from the stream consumer will always be in serialized format, but the existing `MessageBatch<T>` and `StreamMessageDecoder<T>` interfaces don't honor that assumption.

