rahulgulati89 opened a new issue, #13383:
URL: https://github.com/apache/pinot/issues/13383

   I am trying to ingest Kafka messages serialized in Protobuf format into a Pinot table using the realtime connector, but I am getting errors while querying the table. Here are the steps I followed.
   
   Proto file ->
   ```
   syntax = "proto3";
   
   message snack {
     string name = 1;
     string timestamp = 2;
   }
   ```
   
   Descriptor Generation ->
   `protoc --include_imports --descriptor_set_out=output.desc schema.proto`
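
   As a quick sanity check, the descriptor set can be dumped without a schema to confirm the `snack` message and its fields made it in (optional step, not required by Pinot):
   ```
   # Decode the descriptor set without a schema; the message name "snack"
   # and the fields "name"/"timestamp" should appear in the raw output.
   protoc --decode_raw < output.desc
   ```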
   
   After generating the descriptor file, I copied it to the `pinot-controller`, `pinot-server`, and `pinot-broker` containers under `/tmp`.
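
   For reference, a minimal sketch of that copy step (container names assumed from my docker setup):
   ```
   # Copy the generated descriptor into each Pinot container under /tmp
   for c in pinot-controller pinot-server pinot-broker; do
     docker cp output.desc "$c":/tmp/output.desc
   done
   ```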
   
   Pinot Connector configuration ->
   ```
   
   {
     "tableName": "transcriptprotobufdescnewprotofile",
     "tableType": "REALTIME",
     "tenants": {},
     "segmentsConfig": {
       "replicasPerPartition": "1",
       "schemaName": "transcriptprotobufdescnewprotofile",
       "timeColumnName": "timestamp",
       "timeType": "MILLISECONDS"
     },
     "tableIndexConfig": {
       "loadMode": "MMAP",
       "streamConfigs": {
         "streamType": "kafka",
         "stream.kafka.consumer.type": "lowlevel",
         "stream.kafka.topic.name": "transcriptprotobufdescnewprotofile",
         "streamType": "kafka",
         "stream.kafka.decoder.class.name": 
"org.apache.pinot.plugin.inputformat.protobuf.KafkaConfluentSchemaRegistryProtoBufMessageDecoder",
         "stream.kafka.decoder.prop.descriptorFile": "file:///tmp/output.desc",
         "stream.kafka.decoder.prop.protoClassName": "Snack",
         "stream.kafka.consumer.factory.class.name": 
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
         "stream.kafka.broker.list": "kafka:9092",
         "realtime.segment.flush.threshold.rows": "0",
         "realtime.segment.flush.threshold.time": "24h",
         "realtime.segment.flush.threshold.segment.size": "50M",
         "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
       }
     },
     "metadata": {
       "customConfigs": {}
     }
   }
   
   ```
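
   For completeness, this table config can be applied through the controller REST API, e.g. (assuming the controller is reachable on localhost:9000 and the JSON above is saved locally as table.json):
   ```
   # Create the realtime table from the config above
   curl -X POST -H "Content-Type: application/json" \
     -d @table.json http://localhost:9000/tables
   ```
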
   Pinot Schema ->
   
   ```
   {
     "schemaName": "transcriptprotobufdescnewprotofile",
     "dimensionFieldSpecs": [
       {
         "name": "name",
         "dataType": "STRING"
       }
     ],
     "dateTimeFieldSpecs": [{
       "name": "timestamp",
       "dataType": "STRING",
       "format" : "1:MILLISECONDS:EPOCH",
       "granularity": "1:MILLISECONDS"
     }]
   }
   ```
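
   The schema can be registered the same way; it has to exist before the table config above references it (again assuming the controller on localhost:9000 and the JSON above saved as schema.json):
   ```
   # Register the Pinot schema before creating the table
   curl -X POST -H "Content-Type: application/json" \
     -d @schema.json http://localhost:9000/schemas
   ```
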
   Messages are being produced to the Kafka topic with the following command:
   
   `kafka-protobuf-console-producer --bootstrap-server kafka:9092 --topic transcriptprotobufdescnewprotofile --property schema.registry.url=http://localhost:8088 --property value.schema='syntax = "proto3"; message snack { string name = 1; string timestamp = 2;}'`
   
   Actual Messages ->
   ```
   {"name":"test","timestamp":"1234"}
   {"name":"test","timestamp":"123334"}
   ```
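
   To confirm the messages land on the topic and decode against the registered schema, they can be read back (same broker and registry endpoints as in the producer command):
   ```
   kafka-protobuf-console-consumer --bootstrap-server kafka:9092 \
     --topic transcriptprotobufdescnewprotofile --from-beginning \
     --property schema.registry.url=http://localhost:8088
   ```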
   
   Now, in the Pinot query console, I am seeing the following error for this newly created table:
   ```
   
   Query failed with exceptions. Please toggle the switch to view details.
   
   Error Code: 305
   
   null:
    1 segments unavailable: [transcriptprotobufdescnewprotofile__0__0__20240613T0645Z]
   ```
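
   The segment's state on the servers can also be compared with the ideal state through the controller API (controller assumed reachable on localhost:9000):
   ```
   # Compare what Helix wants (ideal state) with what the servers report (external view)
   curl http://localhost:9000/tables/transcriptprotobufdescnewprotofile/idealstate
   curl http://localhost:9000/tables/transcriptprotobufdescnewprotofile/externalview
   ```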
   
   Upon checking the `pinot-controller` container logs, I see the following error:
   ```
   2024-06-13 12:15:19 2024/06/13 06:45:19.408 ERROR [MessageGenerationPhase] [HelixController-pipeline-default-PinotCluster-(5b9175c5_DEFAULT)] Event 5b9175c5_DEFAULT : Unable to find a next state for resource: transcriptprotobufdescnewprotofile_REALTIME partition: transcriptprotobufdescnewprotofile__0__0__20240613T0645Z from stateModelDefinitionclass org.apache.helix.model.StateModelDefinition from:ERROR to:CONSUMING
   ```
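
   Since the OFFLINE -> CONSUMING transition actually fails on the server side, the `pinot-server` container log is the other place to check (container name assumed):
   ```
   # Look for the segment's consuming-transition errors on the server side
   docker logs pinot-server 2>&1 | grep -i -A 20 "transcriptprotobufdescnewprotofile__0__0"
   ```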
   
   The Pinot Swagger API shows the following error and unhealthy ingestion status for this new table:
   ```
   
   errorMessage": "Caught exception while adding CONSUMING segment",
                 "stackTrace": 
"org.apache.pinot.spi.utils.retry.AttemptsExceededException: Operation failed 
after 5 attempts\n\tat 
org.apache.pinot.spi.utils.retry.BaseRetryPolicy.attempt(BaseRetryPolicy.java:65)\n\tat
 
org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.<init>(RealtimeSegmentDataManager.java:1546)\n\tat
 
org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.doAddConsumingSegment(RealtimeTableDataManager.java:494)\n\tat
 
org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.addConsumingSegment(RealtimeTableDataManager.java:439)\n\tat
 
org.apache.pinot.server.starter.helix.HelixInstanceDataManager.addConsumingSegment(HelixInstanceDataManager.java:282)\n\tat
 
org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline(SegmentOnlineOfflineStateModelFactory.java:81)\n\tat
 java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)\n\
 tat 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat
 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat
 java.base/java.lang.reflect.Method.invoke(Method.java:566)\n\tat 
org.apache.helix.messaging.handling.HelixStateTransitionHandler.invoke(HelixStateTransitionHandler.java:350)\n\tat
 
org.apache.helix.messaging.handling.HelixStateTransitionHandler.handleMessage(HelixStateTransitionHandler.java:278)\n\tat
 org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:97)\n\tat 
org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:49)\n\tat 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat
 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat
 java.base/java.lang.Thread.run(Thread.java:829)\n"
               }
             }
           }
         }
       ],
       "serverDebugInfos": [],
       "brokerDebugInfos": [],
       "tableSize": {
         "reportedSize": "-1 bytes",
         "estimatedSize": "-1 bytes"
       },
       "ingestionStatus": {
         "ingestionState": "UNHEALTHY",
         "errorMessage": "Did not get any response from servers for segment: 
transcriptprotobufdescnewprotofile__0__0__20240613T0645Z"
       }
   ```
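
   The same status is also retrievable directly from the controller's table debug endpoint (endpoint path and verbosity parameter assumed here):
   ```
   curl "http://localhost:9000/debug/tables/transcriptprotobufdescnewprotofile?verbosity=1"
   ```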
   
   What am I missing?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
