J-HowHuang opened a new issue, #15378:
URL: https://github.com/apache/pinot/issues/15378

   # How to reproduce
   
   ```bash
   aws kinesis create-stream --stream-name transcript-stream --shard-count 2
   aws kinesis list-shards --stream-name transcript-stream
   ```
   Output:
   ```json
   {
       "Shards": [
           {
               "ShardId": "shardId-000000000000",
               "HashKeyRange": {
                   "StartingHashKey": "0",
                   "EndingHashKey": "170141183460469231731687303715884105727"
               },
               "SequenceNumberRange": {
                   "StartingSequenceNumber": 
"49661727849342039258072939847765243280241975881076047874"
               }
           },
           {
               "ShardId": "shardId-000000000001",
               "HashKeyRange": {
                   "StartingHashKey": "170141183460469231731687303715884105728",
                   "EndingHashKey": "340282366920938463463374607431768211455"
               },
               "SequenceNumberRange": {
                   "StartingSequenceNumber": 
"49661727849364340003271470470906778998514624242582028306"
               }
           }
       ]
   }
   ```
   Create a table with schema (using `pinot-admin.sh AddTable` here):
   
   ```json
   {
     "REALTIME": {
       "tableName": "transcript_REALTIME",
       "tableType": "REALTIME",
       "segmentsConfig": {
         "schemaName": "transcript",
         "replicasPerPartition": "1",
         "timeType": "MILLISECONDS",
         "minimizeDataMovement": false,
         "timeColumnName": "timestamp"
       },
       "tenants": {
         "broker": "DefaultTenant",
         "server": "DefaultTenant"
       },
       "tableIndexConfig": {
         "streamConfigs": {
           "streamType": "kinesis",
           "stream.kinesis.topic.name": "transcript-stream",
           "region": "us-east-1",
           "shardIteratorType": "AFTER_SEQUENCE_NUMBER",
           "stream.kinesis.consumer.type": "lowlevel",
           "stream.kinesis.fetch.timeout.millis": "120000",
           "stream.kinesis.decoder.class.name": 
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
           "stream.kinesis.consumer.factory.class.name": 
"org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
           "realtime.segment.flush.threshold.rows": "10",
           "realtime.segment.flush.threshold.time": "6h"
         },
         "aggregateMetrics": false,
         "enableDefaultStarTree": false,
         "nullHandlingEnabled": false,
         "rangeIndexVersion": 2,
         "optimizeDictionary": false,
         "loadMode": "MMAP",
         "optimizeDictionaryType": false,
         "skipSegmentPreprocess": false,
         "optimizeDictionaryForMetrics": false,
         "noDictionarySizeRatioThreshold": 0.85,
         "enableDynamicStarTreeCreation": false,
         "autoGeneratedInvertedIndex": false,
         "columnMajorSegmentBuilderEnabled": true,
         "createInvertedIndexDuringSegmentGeneration": false
       },
       "metadata": {
         "customConfigs": {}
       },
       "isDimTable": false
     }
   }
   ```
   Run 
   ```bash
   curl -X 'GET' \
     'http://localhost:9000/tables/transcript_REALTIME/consumingSegmentsInfo' \
     -H 'accept: application/json'
   ```
   Response:
   ```json
   {
     "serversFailingToRespond": 2,
     "serversUnparsableRespond": 0,
     "_segmentToConsumingInfoMap": {
       "transcript__0__0__20250326T1827Z": [],
       "transcript__1__0__20250326T1827Z": []
     }
   }
   ```
   Server log:
   ```
   2025/03/26 11:27:55.710 WARN 
[RealtimeSegmentDataManager_transcript__1__0__20250326T1827Z] 
[grizzly-http-server-11] Cannot fetch stream offset with criteria 
OffsetCriteria{_offsetType=LARGEST, _offsetString='largest'} for clientId 
transcript_REALTIME-transcript-stream-1 and partitionGroupId 1 with maxWaitTime 
5000
   java.lang.RuntimeException: Failed to find shard for partitionId: 1
        at 
org.apache.pinot.plugin.stream.kinesis.KinesisStreamMetadataProvider.lambda$fetchStreamPartitionOffset$1(KinesisStreamMetadataProvider.java:105)
 ~[classes/:?]
        at java.base/java.util.Optional.orElseThrow(Optional.java:403) ~[?:?]
        at 
org.apache.pinot.plugin.stream.kinesis.KinesisStreamMetadataProvider.fetchStreamPartitionOffset(KinesisStreamMetadataProvider.java:105)
 ~[classes/:?]
        at 
org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.fetchStreamOffset(RealtimeSegmentDataManager.java:1790)
 ~[classes/:?]
        at 
org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.fetchLatestStreamOffset(RealtimeSegmentDataManager.java:1765)
 ~[classes/:?]
        at 
org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.fetchLatestStreamOffset(RealtimeSegmentDataManager.java:1770)
 ~[classes/:?]
        at 
org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.getConsumerPartitionState(RealtimeSegmentDataManager.java:1010)
 ~[classes/:?]
        at 
org.apache.pinot.server.api.resources.TablesResource.getConsumingSegmentsInfo(TablesResource.java:1058)
 ~[classes/:?]
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:?]
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
 ~[?:?]
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:?]
        at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
        at 
org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
 ~[jersey-server-2.45.jar:?]
        at 
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:146)
 [jersey-server-2.45.jar:?]
        at 
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:189)
 [jersey-server-2.45.jar:?]
        at 
org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$TypeOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:219)
 [jersey-server-2.45.jar:?]
        at 
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:93)
 [jersey-server-2.45.jar:?]
        at 
org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:478)
 [jersey-server-2.45.jar:?]
        at 
org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:400)
 [jersey-server-2.45.jar:?]
        at 
org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81)
 [jersey-server-2.45.jar:?]
        at 
org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:256) 
[jersey-server-2.45.jar:?]
        at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) 
[jersey-common-2.45.jar:?]
        at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) 
[jersey-common-2.45.jar:?]
        at org.glassfish.jersey.internal.Errors.process(Errors.java:292) 
[jersey-common-2.45.jar:?]
        at org.glassfish.jersey.internal.Errors.process(Errors.java:274) 
[jersey-common-2.45.jar:?]
        at org.glassfish.jersey.internal.Errors.process(Errors.java:244) 
[jersey-common-2.45.jar:?]
        at 
org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
 [jersey-common-2.45.jar:?]
        at 
org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:235) 
[jersey-server-2.45.jar:?]
        at 
org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684)
 [jersey-server-2.45.jar:?]
        at 
org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpContainer.service(GrizzlyHttpContainer.java:356)
 [jersey-container-grizzly2-http-2.45.jar:?]
        at 
org.glassfish.grizzly.http.server.HttpHandler$1.run(HttpHandler.java:200) 
[grizzly-http-server-2.4.4.jar:2.4.4]
        at 
org.glassfish.grizzly.threadpool.AbstractThreadPool$Worker.doWork(AbstractThreadPool.java:569)
 [grizzly-framework-2.4.4.jar:2.4.4]
        at 
org.glassfish.grizzly.threadpool.AbstractThreadPool$Worker.run(AbstractThreadPool.java:549)
 [grizzly-framework-2.4.4.jar:2.4.4]
        at java.base/java.lang.Thread.run(Thread.java:842) [?:?]
   ```
   
   # Suggestion
   The issue seems to stem from a mismatch between the default Kinesis shard 
naming (e.g. `shardId-000000000000` in this example) and the Pinot partition ID.
   
   The API works after changing this line:
   
https://github.com/apache/pinot/blob/07785bf8cff073791492f2cb1667e2d12709703b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java#L54
 
   
   to 
   ```java
   public static final String SHARD_ID_PREFIX = "shardId-00000000000";
   ```
   
   API response after the change (the change was only made to one server):
   ```json
   {
     "serversFailingToRespond": 1,
     "serversUnparsableRespond": 0,
     "_segmentToConsumingInfoMap": {
       "transcript__0__0__20250326T1827Z": [],
       "transcript__1__0__20250326T1827Z": [
         {
           "serverName": "Server_100.114.242.49_8098",
           "consumerState": "CONSUMING",
           "lastConsumedTimestamp": -1,
           "partitionToOffsetMap": {
             "1": 
"{\"shardId-000000000001\":\"49661785334262847198041109885226199023478929735110623250\"}"
           },
           "partitionOffsetInfo": {
             "currentOffsetsMap": {
               "1": 
"{\"shardId-000000000001\":\"49661785334262847198041109885226199023478929735110623250\"}"
             },
             "latestUpstreamOffsetMap": {
               "1": "{\"shardId-000000000001\":null}"
             },
             "recordsLagMap": {
               "1": "NOT_CALCULATED"
             },
             "availabilityLagMsMap": {
               "1": "UNKNOWN"
             }
           }
         }
       ]
     }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to