aleksdikanski commented on issue #7270:
URL: https://github.com/apache/pinot/issues/7270#issuecomment-998128028


   Sure @KKcorps, I basically followed the guides on the Pulsar and Pinot websites:
   - set up a Pulsar standalone cluster using https://pulsar.apache.org/docs/en/standalone-docker/#start-pulsar-in-docker
     **Please note** that for this demo setup I ran the Pulsar container in the same network as the Pinot cluster and gave it a name, so that the Pinot containers can reach it at the hostname `pulsar` (basically, add `--network <pinot-network> --name pulsar` to the `docker run` command)
   - set up a Pinot cluster using the manual setup: https://docs.pinot.apache.org/basics/getting-started/running-pinot-in-docker#manual-cluster
   - add a Pulsar topic (it didn't matter whether it was partitioned or not):
       ```
       $ docker run -it \
          --rm \
          --network <pinot-network> \
          apachepulsar/pulsar:2.8.1 \
          bin/pulsar-admin --admin-url http://pulsar:8080 \
          topics create persistent://public/default/pinot
       ```
   - add a table and schema to Pinot using the following table config and schema:

       table.json
       ```
       {
         "tableName": "airlineStats",
         "tableType": "REALTIME",
         "tenants": {
           "broker": "DefaultTenant",
           "server": "DefaultTenant"
         },
         "segmentsConfig": {
           "schemaName": "airlineStats",
           "timeColumnName": "DaysSinceEpoch",
           "replication": "1",
           "replicasPerPartition": "1",
           "timeType": "DAYS",
           "retentionTimeUnit": "DAYS",
           "retentionTimeValue": "365",
           "segmentPushType": "APPEND",
           "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy"
         },
         "tableIndexConfig": {
           "loadMode": "MMAP",
           "streamConfigs": {
             "streamType": "pulsar",
             "stream.pulsar.bootstrap.servers": "pulsar://pulsar:6650",
             "stream.pulsar.consumer.prop.auto.offset.reset": "smallest",
             "stream.pulsar.consumer.type": "lowlevel",
             "stream.pulsar.topic.name": "pinot",
             "stream.pulsar.fetch.timeout.millis": "10000",
             "stream.pulsar.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
             "stream.pulsar.consumer.factory.class.name": "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
             "realtime.segment.flush.threshold.size": "10000",
             "realtime.segment.flush.threshold.time": "1h"
           }
         },
         "metadata": {}
       }
       ```
       schema.json
       ```
       {
           "metricFieldSpecs": [
           ],
           "dimensionFieldSpecs": [
             {
               "dataType": "INT",
               "name": "ActualElapsedTime"
             },
             {
               "dataType": "INT",
               "name": "AirTime"
             },
             {
               "dataType": "INT",
               "name": "AirlineID"
             },
             {
               "dataType": "INT",
               "name": "ArrDel15"
             },
             {
               "dataType": "INT",
               "name": "ArrDelay"
             },
             {
               "dataType": "INT",
               "name": "ArrDelayMinutes"
             },
             {
               "dataType": "INT",
               "name": "ArrTime"
             },
             {
               "dataType": "STRING",
               "name": "ArrTimeBlk"
             },
             {
               "dataType": "INT",
               "name": "ArrivalDelayGroups"
             },
             {
               "dataType": "INT",
               "name": "CRSArrTime"
             },
             {
               "dataType": "INT",
               "name": "CRSDepTime"
             },
             {
               "dataType": "INT",
               "name": "CRSElapsedTime"
             },
             {
               "dataType": "STRING",
               "name": "CancellationCode"
             },
             {
               "dataType": "INT",
               "name": "Cancelled"
             },
             {
               "dataType": "STRING",
               "name": "Carrier"
             },
             {
               "dataType": "INT",
               "name": "CarrierDelay"
             },
             {
               "dataType": "INT",
               "name": "DayOfWeek"
             },
             {
               "dataType": "INT",
               "name": "DayofMonth"
             },
             {
               "dataType": "INT",
               "name": "DepDel15"
             },
             {
               "dataType": "INT",
               "name": "DepDelay"
             },
             {
               "dataType": "INT",
               "name": "DepDelayMinutes"
             },
             {
               "dataType": "INT",
               "name": "DepTime"
             },
             {
               "dataType": "STRING",
               "name": "DepTimeBlk"
             },
             {
               "dataType": "INT",
               "name": "DepartureDelayGroups"
             },
             {
               "dataType": "STRING",
               "name": "Dest"
             },
             {
               "dataType": "INT",
               "name": "DestAirportID"
             },
             {
               "dataType": "INT",
               "name": "DestAirportSeqID"
             },
             {
               "dataType": "INT",
               "name": "DestCityMarketID"
             },
             {
               "dataType": "STRING",
               "name": "DestCityName"
             },
             {
               "dataType": "STRING",
               "name": "DestState"
             },
             {
               "dataType": "INT",
               "name": "DestStateFips"
             },
             {
               "dataType": "STRING",
               "name": "DestStateName"
             },
             {
               "dataType": "INT",
               "name": "DestWac"
             },
             {
               "dataType": "INT",
               "name": "Distance"
             },
             {
               "dataType": "INT",
               "name": "DistanceGroup"
             },
             {
               "dataType": "INT",
               "name": "DivActualElapsedTime"
             },
             {
               "dataType": "INT",
               "name": "DivAirportIDs",
               "singleValueField": false
             },
             {
               "dataType": "INT",
               "name": "DivAirportLandings"
             },
             {
               "dataType": "INT",
               "name": "DivAirportSeqIDs",
               "singleValueField": false
             },
             {
               "dataType": "STRING",
               "name": "DivAirports",
               "singleValueField": false
             },
             {
               "dataType": "INT",
               "name": "DivArrDelay"
             },
             {
               "dataType": "INT",
               "name": "DivDistance"
             },
             {
               "dataType": "INT",
               "name": "DivLongestGTimes",
               "singleValueField": false
             },
             {
               "dataType": "INT",
               "name": "DivReachedDest"
             },
             {
               "dataType": "STRING",
               "name": "DivTailNums",
               "singleValueField": false
             },
             {
               "dataType": "INT",
               "name": "DivTotalGTimes",
               "singleValueField": false
             },
             {
               "dataType": "INT",
               "name": "DivWheelsOffs",
               "singleValueField": false
             },
             {
               "dataType": "INT",
               "name": "DivWheelsOns",
               "singleValueField": false
             },
             {
               "dataType": "INT",
               "name": "Diverted"
             },
             {
               "dataType": "INT",
               "name": "FirstDepTime"
             },
             {
               "dataType": "STRING",
               "name": "FlightDate"
             },
             {
               "dataType": "INT",
               "name": "FlightNum"
             },
             {
               "dataType": "INT",
               "name": "Flights"
             },
             {
               "dataType": "INT",
               "name": "LateAircraftDelay"
             },
             {
               "dataType": "INT",
               "name": "LongestAddGTime"
             },
             {
               "dataType": "INT",
               "name": "Month"
             },
             {
               "dataType": "INT",
               "name": "NASDelay"
             },
             {
               "dataType": "STRING",
               "name": "Origin"
             },
             {
               "dataType": "INT",
               "name": "OriginAirportID"
             },
             {
               "dataType": "INT",
               "name": "OriginAirportSeqID"
             },
             {
               "dataType": "INT",
               "name": "OriginCityMarketID"
             },
             {
               "dataType": "STRING",
               "name": "OriginCityName"
             },
             {
               "dataType": "STRING",
               "name": "OriginState"
             },
             {
               "dataType": "INT",
               "name": "OriginStateFips"
             },
             {
               "dataType": "STRING",
               "name": "OriginStateName"
             },
             {
               "dataType": "INT",
               "name": "OriginWac"
             },
             {
               "dataType": "INT",
               "name": "Quarter"
             },
             {
               "dataType": "STRING",
               "name": "RandomAirports",
               "singleValueField": false
             },
             {
               "dataType": "INT",
               "name": "SecurityDelay"
             },
             {
               "dataType": "STRING",
               "name": "TailNum"
             },
             {
               "dataType": "INT",
               "name": "TaxiIn"
             },
             {
               "dataType": "INT",
               "name": "TaxiOut"
             },
             {
               "dataType": "INT",
               "name": "Year"
             },
             {
               "dataType": "INT",
               "name": "WheelsOn"
             },
             {
               "dataType": "INT",
               "name": "WheelsOff"
             },
             {
               "dataType": "INT",
               "name": "WeatherDelay"
             },
             {
               "dataType": "STRING",
               "name": "UniqueCarrier"
             },
             {
               "dataType": "INT",
               "name": "TotalAddGTime"
             }
           ],
           "dateTimeFieldSpecs": [
             {
               "name": "DaysSinceEpoch",
               "dataType": "INT",
               "format": "1:DAYS:EPOCH",
               "granularity": "1:DAYS"
             }
           ],
           "schemaName": "airlineStats"
       }
       ```
   - send a JSON message from the airline stats example using the pulsar-client:

       airlinestats00.json
       ```
       {"Quarter":1,"FlightNum":1,"Origin":"JFK","LateAircraftDelay":null,"DivActualElapsedTime":null,"DivWheelsOns":null,
        "DivWheelsOffs":null,"ArrDel15":0,"AirTime":359,"DivTotalGTimes":null,
        "DepTimeBlk":"0900-0959","DestCityMarketID":32575,"DaysSinceEpoch":16071,"DivAirportSeqIDs":null,
        "DepTime":914,"Month":1,"DestStateName":"California","CRSElapsedTime":385,"Carrier":"AA",
        "DestAirportID":12892,"Distance":2475,"ArrTimeBlk":"1200-1259","SecurityDelay":null,"DivArrDelay":null,
        "LongestAddGTime":null,"OriginWac":22,"WheelsOff":934,"UniqueCarrier":"AA","DestAirportSeqID":1289203,
        "DivReachedDest":null,"Diverted":0,"ActualElapsedTime":384,"AirlineID":19805,"OriginStateName":"New York",
        "FlightDate":"2014-01-01","DepartureDelayGroups":0,"DivAirportLandings":0,"OriginCityName":"New York, NY",
        "OriginStateFips":36,"OriginState":"NY","DistanceGroup":10,"WeatherDelay":null,"DestWac":91,"WheelsOn":1233,
        "OriginAirportID":12478,"OriginCityMarketID":31703,"NASDelay":null,"DestState":"CA","ArrTime":1238,
        "ArrivalDelayGroups":0,"Flights":1,"DayofMonth":1,"RandomAirports":["SEA","PSC","PHX","MSY","ATL","TYS",
        "DEN","CHS","PDX","LAX","EWR","SFO","PIT","RDU","RAP","LSE","SAN","SBN","IAH","OAK","BRO","JFK","SAT","ORD",
        "ACY","DFW","BWI","TPA","BFL","BOS","SNA","ISN"],"TotalAddGTime":null,"CRSDepTime":900,"DayOfWeek":3,
        "Dest":"LAX","CancellationCode":null,"FirstDepTime":null,"DivTailNums":null,"DepDelayMinutes":14,"DepDelay":14,
        "TaxiIn":5,"OriginAirportSeqID":1247802,"DestStateFips":6,"ArrDelay":13,"Cancelled":0,"DivAirportIDs":null,
        "TaxiOut":20,"DepDel15":0,"CarrierDelay":null,"DivLongestGTimes":null,"DivAirports":null,"DivDistance":null,
        "Year":2014,"CRSArrTime":1225,"ArrDelayMinutes":13,"TailNum":"N338AA","DestCityName":"Los Angeles, CA"}
       ```
       Send this message using the producer client:
       ```
       $ docker run -it \
          --rm \
          --network <pinot-network> \
          -v <path/to/dir/containing_airlinestats00.json>:/pulsar/airline \
          apachepulsar/pulsar:2.8.1 \
          bin/pulsar-client --url pulsar://pulsar:6650 \
          produce -f airline/airlinestats00.json -k 0 -n 1 pinot
       ```
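
The table uses `DaysSinceEpoch` as its time column, declared in the schema with format `1:DAYS:EPOCH` (whole days since 1970-01-01). As a quick sanity check on the sample record, not something the setup above requires, the JDK's `java.time.LocalDate` can confirm that `"DaysSinceEpoch":16071` in airlinestats00.json matches its `"FlightDate":"2014-01-01"`. The class name `DaysSinceEpochCheck` is made up for this sketch.

```java
import java.time.LocalDate;

/**
 * Sketch: checks that a record's DaysSinceEpoch value (format "1:DAYS:EPOCH")
 * agrees with its ISO-8601 FlightDate string. Hypothetical helper, not Pinot code.
 */
final class DaysSinceEpochCheck {

  // Number of whole days between 1970-01-01 and the given calendar date.
  static long daysSinceEpoch(String isoDate) {
    return LocalDate.parse(isoDate).toEpochDay();
  }

  public static void main(String[] args) {
    // Values copied from airlinestats00.json.
    String flightDate = "2014-01-01";
    long daysSinceEpochField = 16071L;

    long computed = daysSinceEpoch(flightDate);
    System.out.println(computed + " matches field: " + (computed == daysSinceEpochField));
    // prints "16071 matches field: true"
  }
}
```
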
   
   Funnily enough, I tested this today and actually got another error, different from the two mentioned above:
   ```
   2021/12/20 16:26:21.807 ERROR [SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel] [HelixTaskExecutor-message_handle_thread] Caught exception in state transition from OFFLINE -> ONLINE for resource: airlineStats_REALTIME, partition: airlineStats__0__0__20211220T1626Z
   java.lang.RuntimeException: org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either than the input has been truncated or that an embedded message misreported its own length.
       at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:43) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
       at org.apache.pulsar.client.internal.DefaultImplementation.newMessageIdFromByteArray(DefaultImplementation.java:103) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
       at org.apache.pulsar.client.api.MessageId.fromByteArray(MessageId.java:58) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
       at org.apache.pinot.plugin.stream.pulsar.MessageIdStreamOffset.<init>(MessageIdStreamOffset.java:47) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
       at org.apache.pinot.plugin.stream.pulsar.MessageIdStreamOffsetFactory.create(MessageIdStreamOffsetFactory.java:39) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
       at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.<init>(LLRealtimeSegmentDataManager.java:1209) ~[pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
       at org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.addSegment(RealtimeTableDataManager.java:344) ~[pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
       at org.apache.pinot.server.starter.helix.HelixInstanceDataManager.addRealtimeSegment(HelixInstanceDataManager.java:162) ~[pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
       at org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline(SegmentOnlineOfflineStateModelFactory.java:164) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
       at org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline(SegmentOnlineOfflineStateModelFactory.java:86) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
       at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
       at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
       at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
       at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
       at org.apache.helix.messaging.handling.HelixStateTransitionHandler.invoke(HelixStateTransitionHandler.java:404) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
       at org.apache.helix.messaging.handling.HelixStateTransitionHandler.handleMessage(HelixStateTransitionHandler.java:331) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
       at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:97) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
       at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:49) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
       at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
       at java.lang.Thread.run(Thread.java:829) [?:?]
   ```
   It is also related to the MessageId parsing and was likewise fixed by my implementation.
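
The comment does not show the fix itself, so what follows is only an assumption about the failure mode. The trace shows `MessageIdStreamOffsetFactory.create` constructing a `MessageIdStreamOffset`, which calls `MessageId.fromByteArray`, and protobuf then reports truncated input. That is what you would expect if the bytes handed to `fromByteArray` are not exactly the bytes `MessageId.toByteArray()` produced, for example because the offset was kept as a `String` with a lossy conversion. The JDK-only sketch below uses a hypothetical byte array as a stand-in for a serialized `MessageId` (no Pulsar classes involved; `OffsetEncodingDemo` is a made-up name) and shows that decoding arbitrary bytes as UTF-8 and re-encoding them does not round-trip, while Base64 does. It is not Pinot's actual implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

/**
 * Sketch (not Pinot code): storing a binary offset, such as the bytes of a
 * serialized Pulsar MessageId, as text requires a lossless encoding.
 */
final class OffsetEncodingDemo {

  // Lossless: Base64 maps any byte sequence to ASCII text and back.
  static String encodeBase64(byte[] raw) {
    return Base64.getEncoder().encodeToString(raw);
  }

  static byte[] decodeBase64(String text) {
    return Base64.getDecoder().decode(text);
  }

  // Lossy: new String(..., UTF_8) replaces malformed byte sequences with
  // U+FFFD, so re-encoding does not give back the original bytes.
  static byte[] naiveRoundTrip(byte[] raw) {
    String text = new String(raw, StandardCharsets.UTF_8);
    return text.getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    // Hypothetical bytes standing in for a serialized MessageId.
    byte[] raw = {0x08, (byte) 0x96, 0x01, 0x10, (byte) 0xFF, 0x00};

    System.out.println("Base64 round trip intact: "
        + Arrays.equals(raw, decodeBase64(encodeBase64(raw))));
    System.out.println("UTF-8 round trip intact: "
        + Arrays.equals(raw, naiveRoundTrip(raw)));
    // prints:
    // Base64 round trip intact: true
    // UTF-8 round trip intact: false
  }
}
```

Hex encoding would work just as well as Base64; what matters is that the string written for an offset is decoded by the exact inverse of the encoder that produced it.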


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


