J-HowHuang opened a new issue, #15378: URL: https://github.com/apache/pinot/issues/15378
# How to reproduce

```bash
aws kinesis create-stream --stream-name transcript-stream --shard-count 2
aws kinesis list-shards --stream-name transcript-stream
```

output:

```json
{
  "Shards": [
    {
      "ShardId": "shardId-000000000000",
      "HashKeyRange": {
        "StartingHashKey": "0",
        "EndingHashKey": "170141183460469231731687303715884105727"
      },
      "SequenceNumberRange": {
        "StartingSequenceNumber": "49661727849342039258072939847765243280241975881076047874"
      }
    },
    {
      "ShardId": "shardId-000000000001",
      "HashKeyRange": {
        "StartingHashKey": "170141183460469231731687303715884105728",
        "EndingHashKey": "340282366920938463463374607431768211455"
      },
      "SequenceNumberRange": {
        "StartingSequenceNumber": "49661727849364340003271470470906778998514624242582028306"
      }
    }
  ]
}
```

Create a table with schema (using `pinot-admin.sh AddTable` here):

```json
{
  "REALTIME": {
    "tableName": "transcript_REALTIME",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "schemaName": "transcript",
      "replicasPerPartition": "1",
      "timeType": "MILLISECONDS",
      "minimizeDataMovement": false,
      "timeColumnName": "timestamp"
    },
    "tenants": {
      "broker": "DefaultTenant",
      "server": "DefaultTenant"
    },
    "tableIndexConfig": {
      "streamConfigs": {
        "streamType": "kinesis",
        "stream.kinesis.topic.name": "transcript-stream",
        "region": "us-east-1",
        "shardIteratorType": "AFTER_SEQUENCE_NUMBER",
        "stream.kinesis.consumer.type": "lowlevel",
        "stream.kinesis.fetch.timeout.millis": "120000",
        "stream.kinesis.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
        "stream.kinesis.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
        "realtime.segment.flush.threshold.rows": "10",
        "realtime.segment.flush.threshold.time": "6h"
      },
      "aggregateMetrics": false,
      "enableDefaultStarTree": false,
      "nullHandlingEnabled": false,
      "rangeIndexVersion": 2,
      "optimizeDictionary": false,
      "loadMode": "MMAP",
      "optimizeDictionaryType": false,
      "skipSegmentPreprocess": false,
      "optimizeDictionaryForMetrics": false,
      "noDictionarySizeRatioThreshold": 0.85,
      "enableDynamicStarTreeCreation": false,
      "autoGeneratedInvertedIndex": false,
      "columnMajorSegmentBuilderEnabled": true,
      "createInvertedIndexDuringSegmentGeneration": false
    },
    "metadata": {
      "customConfigs": {}
    },
    "isDimTable": false
  }
}
```

Run

```bash
curl -X 'GET' \
  'http://localhost:9000/tables/transcript_REALTIME/consumingSegmentsInfo' \
  -H 'accept: application/json'
```

Response:

```json
{
  "serversFailingToRespond": 2,
  "serversUnparsableRespond": 0,
  "_segmentToConsumingInfoMap": {
    "transcript__0__0__20250326T1827Z": [],
    "transcript__1__0__20250326T1827Z": []
  }
}
```

Server log:

```
2025/03/26 11:27:55.710 WARN [RealtimeSegmentDataManager_transcript__1__0__20250326T1827Z] [grizzly-http-server-11] Cannot fetch stream offset with criteria OffsetCriteria{_offsetType=LARGEST, _offsetString='largest'} for clientId transcript_REALTIME-transcript-stream-1 and partitionGroupId 1 with maxWaitTime 5000
java.lang.RuntimeException: Failed to find shard for partitionId: 1
	at org.apache.pinot.plugin.stream.kinesis.KinesisStreamMetadataProvider.lambda$fetchStreamPartitionOffset$1(KinesisStreamMetadataProvider.java:105) ~[classes/:?]
	at java.base/java.util.Optional.orElseThrow(Optional.java:403) ~[?:?]
	at org.apache.pinot.plugin.stream.kinesis.KinesisStreamMetadataProvider.fetchStreamPartitionOffset(KinesisStreamMetadataProvider.java:105) ~[classes/:?]
	at org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.fetchStreamOffset(RealtimeSegmentDataManager.java:1790) ~[classes/:?]
	at org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.fetchLatestStreamOffset(RealtimeSegmentDataManager.java:1765) ~[classes/:?]
	at org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.fetchLatestStreamOffset(RealtimeSegmentDataManager.java:1770) ~[classes/:?]
	at org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.getConsumerPartitionState(RealtimeSegmentDataManager.java:1010) ~[classes/:?]
	at org.apache.pinot.server.api.resources.TablesResource.getConsumingSegmentsInfo(TablesResource.java:1058) ~[classes/:?]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
	at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52) ~[jersey-server-2.45.jar:?]
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:146) [jersey-server-2.45.jar:?]
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:189) [jersey-server-2.45.jar:?]
	at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$TypeOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:219) [jersey-server-2.45.jar:?]
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:93) [jersey-server-2.45.jar:?]
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:478) [jersey-server-2.45.jar:?]
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:400) [jersey-server-2.45.jar:?]
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81) [jersey-server-2.45.jar:?]
	at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:256) [jersey-server-2.45.jar:?]
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) [jersey-common-2.45.jar:?]
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) [jersey-common-2.45.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:292) [jersey-common-2.45.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:274) [jersey-common-2.45.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:244) [jersey-common-2.45.jar:?]
	at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265) [jersey-common-2.45.jar:?]
	at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:235) [jersey-server-2.45.jar:?]
	at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684) [jersey-server-2.45.jar:?]
	at org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpContainer.service(GrizzlyHttpContainer.java:356) [jersey-container-grizzly2-http-2.45.jar:?]
	at org.glassfish.grizzly.http.server.HttpHandler$1.run(HttpHandler.java:200) [grizzly-http-server-2.4.4.jar:2.4.4]
	at org.glassfish.grizzly.threadpool.AbstractThreadPool$Worker.doWork(AbstractThreadPool.java:569) [grizzly-framework-2.4.4.jar:2.4.4]
	at org.glassfish.grizzly.threadpool.AbstractThreadPool$Worker.run(AbstractThreadPool.java:549) [grizzly-framework-2.4.4.jar:2.4.4]
	at java.base/java.lang.Thread.run(Thread.java:842) [?:?]
```

# Suggestion

The issue seems to stem from a mismatch between the default Kinesis shard naming (e.g. `shardId-000000000000` in this example) and the Pinot partition ID: the server looks up a shard for partition ID `1` and fails to find one, even though `shardId-000000000001` exists.
The API works after changing this line: https://github.com/apache/pinot/blob/07785bf8cff073791492f2cb1667e2d12709703b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java#L54

to

```java
public static final String SHARD_ID_PREFIX = "shardId-00000000000";
```

API response after the change (the change was made to only one server, so the other server still fails to respond):

```json
{
  "serversFailingToRespond": 1,
  "serversUnparsableRespond": 0,
  "_segmentToConsumingInfoMap": {
    "transcript__0__0__20250326T1827Z": [],
    "transcript__1__0__20250326T1827Z": [
      {
        "serverName": "Server_100.114.242.49_8098",
        "consumerState": "CONSUMING",
        "lastConsumedTimestamp": -1,
        "partitionToOffsetMap": {
          "1": "{\"shardId-000000000001\":\"49661785334262847198041109885226199023478929735110623250\"}"
        },
        "partitionOffsetInfo": {
          "currentOffsetsMap": {
            "1": "{\"shardId-000000000001\":\"49661785334262847198041109885226199023478929735110623250\"}"
          },
          "latestUpstreamOffsetMap": {
            "1": "{\"shardId-000000000001\":null}"
          },
          "recordsLagMap": {
            "1": "NOT_CALCULATED"
          },
          "availabilityLagMsMap": {
            "1": "UNKNOWN"
          }
        }
      }
    ]
  }
}
```
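
One caveat with the hard-coded prefix: it only lines up with partition IDs 0 through 9, because appending a two-digit ID yields one digit too many. Below is a minimal sketch of a zero-padded mapping, assuming shard IDs always follow the `shardId-` plus 12-digit pattern seen in the `list-shards` output above. The class `ShardIdSketch` and its methods are hypothetical illustrations, not existing Pinot code, and how Pinot actually builds the shard ID at the failing call site is inferred from the stack trace rather than confirmed.

```java
import java.util.List;
import java.util.Locale;
import java.util.Optional;

// Hypothetical sketch, not part of Pinot: maps between integer partition IDs
// and shard IDs of the form "shardId-" followed by 12 zero-padded digits.
class ShardIdSketch {
    static final String PREFIX = "shardId-";

    // Partition 1 -> "shardId-000000000001"; pads to 12 digits for any ID.
    static String toShardId(int partitionId) {
        return String.format(Locale.ROOT, "%s%012d", PREFIX, partitionId);
    }

    // "shardId-000000000001" -> 1; leading zeros are ignored by parseInt.
    static int toPartitionId(String shardId) {
        return Integer.parseInt(shardId.substring(PREFIX.length()));
    }

    // Looks up the shard for a partition the way the failing call appears to,
    // but compares against the padded form instead of a bare concatenation.
    static Optional<String> findShard(List<String> shardIds, int partitionId) {
        String wanted = toShardId(partitionId);
        return shardIds.stream().filter(wanted::equals).findFirst();
    }

    public static void main(String[] args) {
        List<String> shards = List.of("shardId-000000000000", "shardId-000000000001");
        System.out.println(findShard(shards, 1).isPresent());

        // The hard-coded prefix from the suggestion breaks for two-digit IDs:
        String concatenated = "shardId-00000000000" + 10;
        System.out.println(concatenated.equals(toShardId(10)));
    }
}
```

Padding at lookup time keeps the partition ID an integer on the Pinot side while still matching the names Kinesis assigns; whether that is the right fix depends on how the rest of the plugin derives partition IDs from shards.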