minwoo-jung opened a new issue #7100: URL: https://github.com/apache/incubator-pinot/issues/7100
Hello, I have been using Pinot successfully. Thank you for making a great product. :) I have a question because something did not work the way I expected.

**How can consumption continue after I recreate the Kafka topic or modify the topic properties?**

I am storing data through stream ingestion. For some reason I need to delete the Kafka topic and recreate it. After I do that, the consuming segment seems to stop; that is, no more data is stored. I have tested the same scenario several times, and each time the consuming segment stops and no data is saved.

I tried several methods to solve this problem. Among those attempts, the consuming segment sometimes recovers when I do the following:

1. Disable the table.
2. Delete and recreate the Kafka topic.
3. Enable the table.

However, this does not always work. I also tried reloading the segment after the consuming segment stopped, but that did not work either. I tried various other methods as well, but consumption stayed stopped.

My guess is that recreating the Kafka topic changes the data offsets, and that this is what causes the problem. In my opinion, if the offset were reset properly within the Pinot consumer, the consuming segment should keep accumulating data even after the Kafka topic is recreated.

**How can consumption continue after I recreate the Kafka topic or modify its properties?**

I do not know the internal logic well. I looked at the code of the org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory, KafkaPartitionLevelConsumer, and KafkaStreamLevelConsumer classes, but could not find any problem.

Recreating the table may be one way to recover when the consuming segment stops, but that loses the previously stored data. I am looking for a way to keep the consuming segment operating normally, without recreating the table and without losing data.

The Docker image versions currently in use and the table config are listed below.
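To make the offset guess above concrete, here is a toy Python model of it. It is only a sketch of the hypothesis, not Pinot's or Kafka's actual code: `ToyPartition`, `consume`, and the `reset_if_out_of_range` flag are hypothetical names invented for this illustration. It assumes the consumer resumes from a saved checkpoint offset and that a recreated topic starts again at offset 0.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class ToyPartition:
    """In-memory stand-in for one topic partition (hypothetical, not a Kafka API)."""
    messages: List[str] = field(default_factory=list)

    @property
    def end_offset(self) -> int:
        # Offset that the next produced message would receive.
        return len(self.messages)

    def fetch(self, offset: int) -> List[str]:
        # An offset at or past the end of the log yields nothing.
        return self.messages[offset:]


def consume(partition: ToyPartition, checkpoint: int,
            reset_if_out_of_range: bool) -> Tuple[List[str], int]:
    """Do one poll from `checkpoint`; return (records, new checkpoint)."""
    if reset_if_out_of_range and checkpoint > partition.end_offset:
        # The saved offset no longer exists in this log: restart at the beginning.
        checkpoint = 0
    records = partition.fetch(checkpoint)
    return records, checkpoint + len(records)


# The original topic held 1000 messages, so the saved checkpoint ends at 1000.
old_topic = ToyPartition([f"m{i}" for i in range(1000)])
_, checkpoint = consume(old_topic, 0, reset_if_out_of_range=False)

# The topic is deleted and recreated; its offsets start again from 0.
recreated = ToyPartition(["a", "b", "c"])

# Without a reset, the consumer keeps asking for offset 1000 and gets nothing.
stalled, stalled_checkpoint = consume(recreated, checkpoint, reset_if_out_of_range=False)

# With a reset, the consumer notices the stale checkpoint and reads the new data.
recovered, recovered_checkpoint = consume(recreated, checkpoint, reset_if_out_of_range=True)
```

In this model the stalled consumer would only see data again once the new topic grows past the old checkpoint, which would match the symptom of a consuming segment that appears stopped. Whether Pinot behaves this way internally is exactly what I am asking.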
```
{
  "pinot-protobuf": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-kafka-2.0": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-avro": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-distribution": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-csv": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-s3": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-yammer": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-segment-uploader-default": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-batch-ingestion-standalone": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-confluent-avro": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-thrift": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-orc": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-batch-ingestion-spark": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-azure": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-gcs": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-batch-ingestion-hadoop": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-hdfs": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-adls": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-kinesis": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-json": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-minion-builtin-tasks": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-parquet": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-segment-writer-file-based": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb"
}
```

Table config:

```
{
  "REALTIME": {
    "tableName": "systemMetricLong_REALTIME",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "timeType": "MILLISECONDS",
      "schemaName": "systemMetricLong",
      "retentionTimeUnit": "DAYS",
      "retentionTimeValue": "2",
      "timeColumnName": "timestampInEpoch",
      "replicasPerPartition": "1"
    },
    "tenants": {
      "broker": "DefaultTenant",
      "server": "DefaultTenant"
    },
    "tableIndexConfig": {
      "loadMode": "MMAP",
      "sortedColumn": [
        "applicationName"
      ],
      "autoGeneratedInvertedIndex": false,
      "createInvertedIndexDuringSegmentGeneration": false,
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.topic.name": "system-metric-long",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.broker.list": XXXXXXX
        "realtime.segment.flush.threshold.raws": "0",
        "realtime.segment.flush.threshold.time": "24h",
        "realtime.segment.flush.threshold.segment.size": "50M",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
      },
      "invertedIndexColumns": [
        "tags"
      ],
      "rangeIndexColumns": [
        "timestampInEpoch"
      ],
      "aggregateMetrics": false,
      "nullHandlingEnabled": true,
      "enableDefaultStarTree": false,
      "enableDynamicStarTreeCreation": false
    },
    "metadata": {
      "customConfigs": {}
    },
    "isDimTable": false
  }
}
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
For additional commands, e-mail: commits-h...@pinot.apache.org