minwoo-jung opened a new issue #7100:
URL: https://github.com/apache/incubator-pinot/issues/7100


   
   
   Hello~
   I have been using Pinot and it has been working well for me. Thank you for making a great product. :)
   I have a question because something did not work the way I expected.
   
   **How can consumption keep going even if I recreate the Kafka topic or modify the topic properties?**
   
   I am ingesting data into this table via stream ingestion.
   For some reason I occasionally need to delete the Kafka topic and recreate it. After the topic is recreated, the consuming segment appears to stop, and no more data is stored.
   I have tested the same scenario several times, and every time the consuming segment stops and no data is saved.
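   For reference, this is roughly how I delete and recreate the topic. It is only a minimal sketch using the Kafka AdminClient; the broker address, partition count, and replication factor are placeholders, not my exact setup.
   ```
   import java.util.Collections;
   import java.util.Properties;
   import org.apache.kafka.clients.admin.AdminClient;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.NewTopic;

   public class RecreateTopic {
     public static void main(String[] args) throws Exception {
       Properties props = new Properties();
       // Placeholder broker address; the real broker list is redacted in the table config below.
       props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

       try (AdminClient admin = AdminClient.create(props)) {
         // Delete the existing topic and wait for the request to complete.
         admin.deleteTopics(Collections.singleton("system-metric-long")).all().get();

         // In practice I wait a moment here, since topic deletion finishes asynchronously on the brokers.
         Thread.sleep(5000);

         // Recreate the topic; partition count and replication factor are placeholders.
         admin.createTopics(Collections.singleton(new NewTopic("system-metric-long", 3, (short) 1))).all().get();
       }
     }
   }
   ```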
   
   
   So I tried several ways to work around this problem. Among the various attempts, the following sequence (the controller calls I use are sketched below this list) sometimes gets the consuming segment running again:
   1. disable the table
   2. delete and recreate the Kafka topic
   3. enable the table
   However, it does not always recover.
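   These are the controller calls I use for steps 1 and 3, sketched with the JDK HTTP client. The controller address is a placeholder, and the table-state endpoint path is the one I see in my controller's Swagger UI; it may differ between Pinot versions, so please treat it as an assumption rather than the definitive API.
   ```
   import java.net.URI;
   import java.net.http.HttpClient;
   import java.net.http.HttpRequest;
   import java.net.http.HttpResponse;

   public class ToggleTableState {
     // Placeholder controller address for my environment.
     private static final String CONTROLLER = "http://localhost:9000";

     static void setState(String tableName, String state) throws Exception {
       // Assumed table-state endpoint; check the controller Swagger UI for your version.
       URI uri = URI.create(CONTROLLER + "/tables/" + tableName + "?state=" + state + "&type=realtime");
       HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
       HttpResponse<String> response =
           HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
       System.out.println(state + " -> " + response.statusCode() + " " + response.body());
     }

     public static void main(String[] args) throws Exception {
       setState("systemMetricLong", "disable");
       // ... delete and recreate the Kafka topic here (see the sketch above) ...
       setState("systemMetricLong", "enable");
     }
   }
   ```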
   
   I also tried reloading the segments after the consuming segment stopped, but that did not help either.
   I tried various other things as well, but consumption stayed stopped.
   
   
   My guess is that when the Kafka topic is recreated, the partition offsets change, and that is what causes this.
   In my opinion, if the offset were reset properly inside the Pinot consumer, the consuming segment should keep accumulating data normally even after the topic is recreated. A small sketch of what I think happens to the offsets is below.
   **How can consumption keep going even if I recreate the Kafka topic or modify its properties?**
   I do not know the internal logic well, but I looked at the org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory, KafkaPartitionLevelConsumer, and KafkaStreamLevelConsumer classes and could not find the problem.
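   To illustrate my guess, here is a plain Kafka consumer sketch (not Pinot's internal code). A consumer that keeps an offset checkpointed against the old topic and seeks to it on the recreated topic ends up beyond the new log end, and what happens next depends entirely on how the offset reset is handled. The broker address and the offset value are placeholders.
   ```
   import java.time.Duration;
   import java.util.Collections;
   import java.util.Properties;
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
   import org.apache.kafka.common.TopicPartition;

   public class OffsetAfterTopicRecreate {
     public static void main(String[] args) {
       Properties props = new Properties();
       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
       props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
           "org.apache.kafka.common.serialization.StringDeserializer");
       props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
           "org.apache.kafka.common.serialization.StringDeserializer");
       // With "none" the out-of-range condition surfaces as an exception instead of a silent reset.
       props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

       TopicPartition tp = new TopicPartition("system-metric-long", 0);
       try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
         consumer.assign(Collections.singleton(tp));
         // Pretend 123456 was the offset reached before the topic was deleted;
         // the recreated topic starts again from offset 0, so this position is out of range.
         consumer.seek(tp, 123456L);
         consumer.poll(Duration.ofSeconds(5));
       } catch (OffsetOutOfRangeException e) {
         System.out.println("Checkpointed offset is past the recreated topic's end: " + e.getMessage());
       }
     }
   }
   ```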
   
   
   If the consuming segment stops, recreating the table would be one way out, but then the previously stored data is lost.
   So I am looking for a way to keep the consuming segment operating normally and not lose data, without recreating the table.
   
   For reference, the Docker image version currently in use is as follows.
   ```
   {
     "pinot-protobuf": 
"0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
     "pinot-kafka-2.0": 
"0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
     "pinot-avro": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
     "pinot-distribution": 
"0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
     "pinot-csv": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
     "pinot-s3": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
     "pinot-yammer": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
     "pinot-segment-uploader-default": 
"0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
     "pinot-batch-ingestion-standalone": 
"0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
     "pinot-confluent-avro": 
"0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
     "pinot-thrift": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
     "pinot-orc": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
     "pinot-batch-ingestion-spark": 
"0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
     "pinot-azure": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
     "pinot-gcs": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
     "pinot-batch-ingestion-hadoop": 
"0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
     "pinot-hdfs": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
     "pinot-adls": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
     "pinot-kinesis": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
     "pinot-json": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
     "pinot-minion-builtin-tasks": 
"0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
     "pinot-parquet": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
     "pinot-segment-writer-file-based": 
"0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb"
   }
   ```
   
   table config
   ```
   {
     "REALTIME": {
       "tableName": "systemMetricLong_REALTIME",
       "tableType": "REALTIME",
       "segmentsConfig": {
         "timeType": "MILLISECONDS",
         "schemaName": "systemMetricLong",
         "retentionTimeUnit": "DAYS",
         "retentionTimeValue": "2",
         "timeColumnName": "timestampInEpoch",
         "replicasPerPartition": "1"
       },
       "tenants": {
         "broker": "DefaultTenant",
         "server": "DefaultTenant"
       },
       "tableIndexConfig": {
         "loadMode": "MMAP",
         "sortedColumn": [
           "applicationName"
         ],
         "autoGeneratedInvertedIndex": false,
         "createInvertedIndexDuringSegmentGeneration": false,
         "streamConfigs": {
           "streamType": "kafka",
           "stream.kafka.consumer.type": "lowlevel",
           "stream.kafka.topic.name": "system-metric-long",
           "stream.kafka.decoder.class.name": 
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
           "stream.kafka.consumer.factory.class.name": 
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
           "stream.kafka.broker.list": XXXXXXX
           "realtime.segment.flush.threshold.raws": "0",
           "realtime.segment.flush.threshold.time": "24h",
           "realtime.segment.flush.threshold.segment.size": "50M",
           "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
         },
         "invertedIndexColumns": [
           "tags"
         ],
         "rangeIndexColumns": [
           "timestampInEpoch"
         ],
         "aggregateMetrics": false,
         "nullHandlingEnabled": true,
         "enableDefaultStarTree": false,
         "enableDynamicStarTreeCreation": false
       },
       "metadata": {
         "customConfigs": {}
       },
       "isDimTable": false
     }
   }
   ```

