pramsri opened a new issue, #13880:
URL: https://github.com/apache/pinot/issues/13880

   I am working on a CDC project that streams each update from a MySQL database into Pinot. MySQL, Pulsar and Pinot all run in Docker: MySQL uses the Debezium MySQL example 0.8 image, and there are Docker Compose files for Pulsar 3.3.0 (in standalone mode) and Pinot 1.1.0 (with containers for Zookeeper, the controller, the broker and the server). A debezium-mysql connector is configured and mounted in Pulsar, and messages arrive on a topic every time a table in the MySQL database is updated. I have also enabled the Pinot-Pulsar plugin in the Pinot compose file.
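
   For reference, a minimal sketch of a connectivity check for this kind of multi-container setup, assuming the Pulsar broker is published on localhost:6650 and the Pinot controller on localhost:9000 (the host names and ports are assumptions based on the compose description; the controller exposes a /health endpoint):

   # Sketch: verify that the Pulsar broker port and the Pinot controller health
   # endpoint are reachable from the machine (or container) running the check.
   import socket
   import urllib.request

   def port_open(host: str, port: int, timeout: float = 3.0) -> bool:
       try:
           with socket.create_connection((host, port), timeout=timeout):
               return True
       except OSError:
           return False

   # Pulsar broker port referenced later in the stream configs.
   print("pulsar reachable:", port_open("localhost", 6650))

   # Pinot controller health endpoint.
   with urllib.request.urlopen("http://localhost:9000/health", timeout=3) as resp:
       print("controller health:", resp.read().decode())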
   
   However, when I create a real-time table in Pinot using the configs from the 1.1.0 documentation, along with details such as the topic name, no segment is created and no data is ingested. Since the JSON schema of the original topic is complex, I set up ingestion for a second topic with a simpler schema for testing: I created a Pulsar topic and produced messages containing only primitive fields (an int, a float, a string and a long for the timestamp). A temporary consumer confirmed that the topic was storing the messages correctly with the defined JSON schema. Even with this simpler schema the issue persisted. I used JsonToPinotSchema to make sure there were no inconsistencies, and AddSchema and AddTable on the Pinot CLI. The table was visible on the controller port, but no data was ingested and no segment was created. The logs showed the error:

   [pinotHelixResourceManager] org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder cannot be cast to class org.apache.pinot.spi.stream.StreamConsumerFactory
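
   For reference, a minimal sketch of the kind of test producer and temporary consumer described above, assuming the Python pulsar-client library and the topic name used in the table config below; the field values are illustrative placeholders, not the actual test data, and the Pulsar JSON schema registration is omitted here:

   # Sketch: produce one JSON message with only primitive fields and read it back
   # with a temporary consumer to verify what the topic actually stores.
   import json
   import time

   import pulsar

   client = pulsar.Client("pulsar://localhost:6650")
   topic = "persistent://public/default/topic1"

   # Message with an int, a string, a float and a long epoch-millis timestamp.
   producer = client.create_producer(topic)
   message = {"id": 1, "name": "example", "price": 9.99, "timestamp": int(time.time() * 1000)}
   producer.send(json.dumps(message).encode("utf-8"))

   # Temporary consumer used only to check the stored message format.
   consumer = client.subscribe(topic, subscription_name="debug-check")
   msg = consumer.receive(timeout_millis=5000)
   print(msg.data().decode("utf-8"))
   consumer.acknowledge(msg)

   client.close()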
   
   This is a sample message sent by the producer: 
   
![image](https://github.com/user-attachments/assets/c5ed51e7-43d5-4b82-8faf-3421614409ab)
   
   Here is the schema:
   
   {
     "schemaName": "topic1",
     "enableColumnBasedNullHandling": false,
     "dimensionFieldSpecs": [
       {
         "name": "id",
         "dataType": "INT",
         "notNull": false
       },
       {
         "name": "name",
         "dataType": "STRING",
         "notNull": false
       },
       {
         "name": "price",
         "dataType": "FLOAT",
         "notNull": false
       }
     ],
     "dateTimeFieldSpecs": [
       {
         "name": "timestamp",
         "dataType": "LONG",
         "notNull": false,
         "format": "1:MILLISECONDS:EPOCH",
         "granularity": "1:MILLISECONDS"
       }
     ]
   }
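
   For reference, a sketch of pushing this schema through the controller REST API, equivalent to the AddSchema CLI step (the controller address and the local file name are assumptions):

   # Sketch: upload the schema above via the controller's POST /schemas endpoint.
   import urllib.request

   with open("topic1-schema.json", "rb") as f:
       schema_json = f.read()

   req = urllib.request.Request(
       "http://localhost:9000/schemas",
       data=schema_json,
       headers={"Content-Type": "application/json"},
       method="POST",
   )
   with urllib.request.urlopen(req) as resp:
       print(resp.status, resp.read().decode())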
   
   And this is the table config:
   
   {
     "REALTIME": {
       "tableName": "topic1_REALTIME",
       "tableType": "REALTIME",
       "segmentsConfig": {
         "schemaName": "topic1",
         "replication": "1",
         "replicasPerPartition": "1",
         "timeColumnName": "timestamp",
         "minimizeDataMovement": false
       },
       "tenants": {
         "broker": "DefaultTenant",
         "server": "DefaultTenant",
         "tagOverrideConfig": {}
       },
       "tableIndexConfig": {
         "invertedIndexColumns": [],
         "noDictionaryColumns": [],
         "columnMajorSegmentBuilderEnabled": false,
         "optimizeDictionary": false,
         "optimizeDictionaryForMetrics": false,
         "noDictionarySizeRatioThreshold": 0.85,
         "rangeIndexColumns": [],
         "rangeIndexVersion": 2,
         "autoGeneratedInvertedIndex": false,
         "createInvertedIndexDuringSegmentGeneration": false,
         "sortedColumn": [],
         "bloomFilterColumns": [],
         "loadMode": "MMAP",
         "streamConfigs": {
           "streamType": "pulsar",
           "stream.pulsar.topic.name": "persistent://public/default/topic1",
           "stream.pulsar.bootstrap.servers": "pulsar://localhost:6650",
           "stream.pulsar.consumer.type": "lowlevel",
           "stream.pulsar.consumer.prop.auto.offset.reset": "smallest",
           "stream.pulsar.consumer.factory.class.name": 
"org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
           "stream.pulsar.decoder.class.name": 
"org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
           "realtime.segment.flush.threshold.rows": "1000000",
           "realtime.segment.flush.threshold.time": "6h",
           "stream.pulsar.fetch.timeout.millis": "30000"
         },
         "onHeapDictionaryColumns": [],
         "varLengthDictionaryColumns": [],
         "enableDefaultStarTree": false,
         "enableDynamicStarTreeCreation": false,
         "aggregateMetrics": false,
         "nullHandlingEnabled": false
       },
       "metadata": {},
       "quota": {},
       "routing": {},
       "query": {},
       "ingestionConfig": {
         "segmentTimeValueCheck": true,
         "continueOnError": false,
         "rowTimeValueCheck": false
       },
       "isDimTable": false
     }
   }
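
   Likewise, a sketch of creating the table through the controller REST API and then listing its segments, equivalent to the AddTable CLI step (the controller address and file name are assumptions; note that POST /tables expects the inner table config object rather than the {"REALTIME": ...} wrapper that GET /tables/{name} returns):

   # Sketch: create the realtime table via POST /tables and list its segments.
   import json
   import urllib.request

   with open("topic1-table.json") as f:
       wrapped = json.load(f)
   table_config = wrapped.get("REALTIME", wrapped)  # unwrap the GET-style format if present

   req = urllib.request.Request(
       "http://localhost:9000/tables",
       data=json.dumps(table_config).encode("utf-8"),
       headers={"Content-Type": "application/json"},
       method="POST",
   )
   with urllib.request.urlopen(req) as resp:
       print("create table:", resp.status, resp.read().decode())

   # A consuming segment for the realtime table should show up here once ingestion starts.
   with urllib.request.urlopen("http://localhost:9000/segments/topic1") as resp:
       print("segments:", resp.read().decode())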
   
   When I created the schema and table directly through the controller UI, both were created without errors in the logs, but still no ingestion takes place. Another error appears in the logs even when no table has been created: [DelayedAutoRebalancer] [HelixController-pipeline-default-PinotCluster-(b6629b4c_DEFAULT)] No instances or active instances available for resource leadControllerResource, allInstances: [Controller_172.18.0.3_9000], liveInstances: [], activeInstances: []
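
   Since that message reports liveInstances: [], here is a sketch of listing what the controller sees as registered instances, using the standard /instances endpoints (the controller address is an assumption):

   # Sketch: list registered instances and their details via the controller API,
   # to compare against the allInstances/liveInstances reported by Helix.
   import json
   import urllib.request

   with urllib.request.urlopen("http://localhost:9000/instances") as resp:
       instances = json.load(resp)
   print(json.dumps(instances, indent=2))

   # Per-instance details (host, port, tags, enabled state).
   for name in instances.get("instances", []):
       with urllib.request.urlopen(f"http://localhost:9000/instances/{name}") as resp:
           print(json.dumps(json.load(resp), indent=2))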
   
   I can't work out why messages are not being ingested into Pinot, or whether the problem lies in the schema config or in the stream configs of the table.

