rahulgulati89 opened a new issue, #13383: URL: https://github.com/apache/pinot/issues/13383
I am trying to ingest Kafka messages serialized with Protobuf format into Pinot tables using realtime connector but i am getting errors while querying table. Here are the steps followed. Proto file -> ``` syntax = "proto3"; message snack { string name = 1; string timestamp = 2; } ``` Descriptor Generation -> `protoc --include_imports --descriptor_set_out=output.desc schema.proto` After generating this proto descriptor file, I have copied the file to `pinot-controller`, `pinot-server` and `pinot-broker` container under `tmp`. Pinot Connector configuration -> ``` { "tableName": "transcriptprotobufdescnewprotofile", "tableType": "REALTIME", "tenants": {}, "segmentsConfig": { "replicasPerPartition": "1", "schemaName": "transcriptprotobufdescnewprotofile", "timeColumnName": "timestamp", "timeType": "MILLISECONDS" }, "tableIndexConfig": { "loadMode": "MMAP", "streamConfigs": { "streamType": "kafka", "stream.kafka.consumer.type": "lowlevel", "stream.kafka.topic.name": "transcriptprotobufdescnewprotofile", "streamType": "kafka", "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.protobuf.KafkaConfluentSchemaRegistryProtoBufMessageDecoder", "stream.kafka.decoder.prop.descriptorFile": "file:///tmp/output.desc", "stream.kafka.decoder.prop.protoClassName": "Snack", "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", "stream.kafka.broker.list": "kafka:9092", "realtime.segment.flush.threshold.rows": "0", "realtime.segment.flush.threshold.time": "24h", "realtime.segment.flush.threshold.segment.size": "50M", "stream.kafka.consumer.prop.auto.offset.reset": "smallest" } }, "metadata": { "customConfigs": {} } } ``` Pinot Schema -> ``` { "schemaName": "transcriptprotobufdescnewprotofile", "dimensionFieldSpecs": [ { "name": "name", "dataType": "STRING" } ], "dateTimeFieldSpecs": [{ "name": "timestamp", "dataType": "STRING", "format" : "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" }] } ``` Messages are being produced to Kafka topic with following command `kafka-protobuf-console-producer --bootstrap-server kafka:9092 --topic transcriptprotobufdescnewprotofile --property schema.registry.url=http://localhost:8088 --property value.schem'syntax = "proto3"; message snack { string name = 1; string timestamp = 2;}'` Actual Messages -> ``` {"name":"test","timestamp":"1234"} {"name":"test","timestamp":"123334"} ``` Now In the query control dashboard, I am seeing the following error for this newly created table ``` Query failed with exceptions. Please toggle the switch to view details. Error Code: 305 null: 1 segments unavailable: [transcriptprotobufdescnewprotofile__0__0__20240613T0645Z] ``` Upon checking the `pinot-controller` container logs, i see the following errors. ` 2024-06-13 12:15:19 2024/06/13 06:45:19.408 ERROR [MessageGenerationPhase] [HelixController-pipeline-default-PinotCluster-(5b9175c5_DEFAULT)] Event 5b9175c5_DEFAULT : Unable to find a next state for resource: transcriptprotobufdescnewprotofile_REALTIME partition: transcriptprotobufdescnewprotofile__0__0__20240613T0645Z from stateModelDefinitionclass org.apache.helix.model.StateModelDefinition from:ERROR to:CONSUMING` The Pinot Swagger API shows the error/unhealthy status of this new table. ``` errorMessage": "Caught exception while adding CONSUMING segment", "stackTrace": "org.apache.pinot.spi.utils.retry.AttemptsExceededException: Operation failed after 5 attempts\n\tat org.apache.pinot.spi.utils.retry.BaseRetryPolicy.attempt(BaseRetryPolicy.java:65)\n\tat org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.<init>(RealtimeSegmentDataManager.java:1546)\n\tat org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.doAddConsumingSegment(RealtimeTableDataManager.java:494)\n\tat org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.addConsumingSegment(RealtimeTableDataManager.java:439)\n\tat org.apache.pinot.server.starter.helix.HelixInstanceDataManager.addConsumingSegment(HelixInstanceDataManager.java:282)\n\tat org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline(SegmentOnlineOfflineStateModelFactory.java:81)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\ tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n\tat org.apache.helix.messaging.handling.HelixStateTransitionHandler.invoke(HelixStateTransitionHandler.java:350)\n\tat org.apache.helix.messaging.handling.HelixStateTransitionHandler.handleMessage(HelixStateTransitionHandler.java:278)\n\tat org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:97)\n\tat org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:49)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\n" } } } } ], "serverDebugInfos": [], "brokerDebugInfos": [], "tableSize": { "reportedSize": "-1 bytes", "estimatedSize": "-1 bytes" }, "ingestionStatus": { "ingestionState": "UNHEALTHY", "errorMessage": "Did not get any response from servers for segment: transcriptprotobufdescnewprotofile__0__0__20240613T0645Z" } ``` What am i missing? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org