aleksdikanski edited a comment on issue #7270: URL: https://github.com/apache/pinot/issues/7270#issuecomment-998128028
Sure @KKcorps, I basically followed the guides on the Pulsar and Pinot websites:

- set up a Pulsar standalone cluster using https://pulsar.apache.org/docs/en/standalone-docker/#start-pulsar-in-docker

  **Please note** that for this demo setup I ran the Pulsar container in the same Docker network as the Pinot cluster and gave it a name, so the other containers can reach it under the host name `pulsar` (i.e. add `--network <pinot-network> --name pulsar` to the `docker run` command)
- set up a Pinot cluster using the manual setup https://docs.pinot.apache.org/basics/getting-started/running-pinot-in-docker#manual-cluster
- add a Pulsar topic (it didn't matter whether it was partitioned or not):

```
$ docker run -it \
    --rm \
    --network <pinot-network> \
    apachepulsar/pulsar:2.8.1 \
    bin/pulsar-admin --admin-url http://pulsar:8080 topics create persistent://public/default/pinot
```

- add a table and schema to Pinot using the following table and schema declarations:

table.json
```
{
  "tableName": "airlineStats",
  "tableType": "REALTIME",
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "segmentsConfig": {
    "schemaName": "airlineStats",
    "timeColumnName": "DaysSinceEpoch",
    "replication": "1",
    "replicasPerPartition": "1",
    "timeType": "DAYS",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "365",
    "segmentPushType": "APPEND",
    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "pulsar",
      "stream.pulsar.bootstrap.servers": "pulsar://pulsar:6650",
      "stream.pulsar.consumer.prop.auto.offset.reset": "smallest",
      "stream.pulsar.consumer.type": "lowlevel",
      "stream.pulsar.topic.name": "pinot",
      "stream.pulsar.fetch.timeout.millis": "10000",
      "stream.pulsar.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
      "stream.pulsar.consumer.factory.class.name": "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
      "realtime.segment.flush.threshold.size": "10000",
      "realtime.segment.flush.threshold.time": "1h"
    }
  },
  "metadata": {}
}
```

schema.json
```
{
  "metricFieldSpecs": [],
  "dimensionFieldSpecs": [
    { "dataType": "INT", "name": "ActualElapsedTime" },
    { "dataType": "INT", "name": "AirTime" },
    { "dataType": "INT", "name": "AirlineID" },
    { "dataType": "INT", "name": "ArrDel15" },
    { "dataType": "INT", "name": "ArrDelay" },
    { "dataType": "INT", "name": "ArrDelayMinutes" },
    { "dataType": "INT", "name": "ArrTime" },
    { "dataType": "STRING", "name": "ArrTimeBlk" },
    { "dataType": "INT", "name": "ArrivalDelayGroups" },
    { "dataType": "INT", "name": "CRSArrTime" },
    { "dataType": "INT", "name": "CRSDepTime" },
    { "dataType": "INT", "name": "CRSElapsedTime" },
    { "dataType": "STRING", "name": "CancellationCode" },
    { "dataType": "INT", "name": "Cancelled" },
    { "dataType": "STRING", "name": "Carrier" },
    { "dataType": "INT", "name": "CarrierDelay" },
    { "dataType": "INT", "name": "DayOfWeek" },
    { "dataType": "INT", "name": "DayofMonth" },
    { "dataType": "INT", "name": "DepDel15" },
    { "dataType": "INT", "name": "DepDelay" },
    { "dataType": "INT", "name": "DepDelayMinutes" },
    { "dataType": "INT", "name": "DepTime" },
    { "dataType": "STRING", "name": "DepTimeBlk" },
    { "dataType": "INT", "name": "DepartureDelayGroups" },
    { "dataType": "STRING", "name": "Dest" },
    { "dataType": "INT", "name": "DestAirportID" },
    { "dataType": "INT", "name": "DestAirportSeqID" },
    { "dataType": "INT", "name": "DestCityMarketID" },
    { "dataType": "STRING", "name": "DestCityName" },
    { "dataType": "STRING", "name": "DestState" },
    { "dataType": "INT", "name": "DestStateFips" },
    { "dataType": "STRING", "name": "DestStateName" },
    { "dataType": "INT", "name": "DestWac" },
    { "dataType": "INT", "name": "Distance" },
    { "dataType": "INT", "name": "DistanceGroup" },
    { "dataType": "INT", "name": "DivActualElapsedTime" },
    { "dataType": "INT", "name": "DivAirportIDs", "singleValueField": false },
    { "dataType": "INT", "name": "DivAirportLandings" },
    { "dataType": "INT", "name": "DivAirportSeqIDs", "singleValueField": false },
    { "dataType": "STRING", "name": "DivAirports", "singleValueField": false },
    { "dataType": "INT", "name": "DivArrDelay" },
    { "dataType": "INT", "name": "DivDistance" },
    { "dataType": "INT", "name": "DivLongestGTimes", "singleValueField": false },
    { "dataType": "INT", "name": "DivReachedDest" },
    { "dataType": "STRING", "name": "DivTailNums", "singleValueField": false },
    { "dataType": "INT", "name": "DivTotalGTimes", "singleValueField": false },
    { "dataType": "INT", "name": "DivWheelsOffs", "singleValueField": false },
    { "dataType": "INT", "name": "DivWheelsOns", "singleValueField": false },
    { "dataType": "INT", "name": "Diverted" },
    { "dataType": "INT", "name": "FirstDepTime" },
    { "dataType": "STRING", "name": "FlightDate" },
    { "dataType": "INT", "name": "FlightNum" },
    { "dataType": "INT", "name": "Flights" },
    { "dataType": "INT", "name": "LateAircraftDelay" },
    { "dataType": "INT", "name": "LongestAddGTime" },
    { "dataType": "INT", "name": "Month" },
    { "dataType": "INT", "name": "NASDelay" },
    { "dataType": "STRING", "name": "Origin" },
    { "dataType": "INT", "name": "OriginAirportID" },
    { "dataType": "INT", "name": "OriginAirportSeqID" },
    { "dataType": "INT", "name": "OriginCityMarketID" },
    { "dataType": "STRING", "name": "OriginCityName" },
    { "dataType": "STRING", "name": "OriginState" },
    { "dataType": "INT", "name": "OriginStateFips" },
    { "dataType": "STRING", "name": "OriginStateName" },
    { "dataType": "INT", "name": "OriginWac" },
    { "dataType": "INT", "name": "Quarter" },
    { "dataType": "STRING", "name": "RandomAirports", "singleValueField": false },
    { "dataType": "INT", "name": "SecurityDelay" },
    { "dataType": "STRING", "name": "TailNum" },
    { "dataType": "INT", "name": "TaxiIn" },
    { "dataType": "INT", "name": "TaxiOut" },
    { "dataType": "INT", "name": "Year" },
    { "dataType": "INT", "name": "WheelsOn" },
    { "dataType": "INT", "name": "WheelsOff" },
    { "dataType": "INT", "name": "WeatherDelay" },
    { "dataType": "STRING", "name": "UniqueCarrier" },
    { "dataType": "INT", "name": "TotalAddGTime" }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "DaysSinceEpoch",
      "dataType": "INT",
      "format": "1:DAYS:EPOCH",
      "granularity": "1:DAYS"
    }
  ],
  "schemaName": "airlineStats"
}
```

- send a JSON message from the airlineStats example using `pulsar-client`:

airlinestats00.json
```
{"Quarter":1,"FlightNum":1,"Origin":"JFK","LateAircraftDelay":null,"DivActualElapsedTime":null,"DivWheelsOns":null,
"DivWheelsOffs":null,"ArrDel15":0,"AirTime":359,"DivTotalGTimes":null,
"DepTimeBlk":"0900-0959","DestCityMarketID":32575,"DaysSinceEpoch":16071,"DivAirportSeqIDs":null,
"DepTime":914,"Month":1,"DestStateName":"California","CRSElapsedTime":385,"Carrier":"AA",
"DestAirportID":12892,"Distance":2475,"ArrTimeBlk":"1200-1259","SecurityDelay":null,"DivArrDelay":null,
"LongestAddGTime":null,"OriginWac":22,"WheelsOff":934,"UniqueCarrier":"AA","DestAirportSeqID":1289203,
"DivReachedDest":null,"Diverted":0,"ActualElapsedTime":384,"AirlineID":19805,"OriginStateName":"New York",
"FlightDate":"2014-01-01","DepartureDelayGroups":0,"DivAirportLandings":0,"OriginCityName":"New York, NY",
"OriginStateFips":36,"OriginState":"NY","DistanceGroup":10,"WeatherDelay":null,"DestWac":91,"WheelsOn":1233,
"OriginAirportID":12478,"OriginCityMarketID":31703,"NASDelay":null,"DestState":"CA","ArrTime":1238,
"ArrivalDelayGroups":0,"Flights":1,"DayofMonth":1,"RandomAirports":["SEA","PSC","PHX","MSY","ATL","TYS",
"DEN","CHS","PDX","LAX","EWR","SFO","PIT","RDU","RAP","LSE","SAN","SBN","IAH","OAK","BRO","JFK","SAT","ORD",
"ACY","DFW","BWI","TPA","BFL","BOS","SNA","ISN"],"TotalAddGTime":null,"CRSDepTime":900,"DayOfWeek":3,
"Dest":"LAX","CancellationCode":null,"FirstDepTime":null,"DivTailNums":null,"DepDelayMinutes":14,"DepDelay":14,
"TaxiIn":5,"OriginAirportSeqID":1247802,"DestStateFips":6,"ArrDelay":13,"Cancelled":0,"DivAirportIDs":null,
"TaxiOut":20,"DepDel15":0,"CarrierDelay":null,"DivLongestGTimes":null,"DivAirports":null,"DivDistance":null,
"Year":2014,"CRSArrTime":1225,"ArrDelayMinutes":13,"TailNum":"N338AA","DestCityName":"Los Angeles, CA"}
```

Send this message using the producer client:

```
$ docker run -it \
    --rm \
    --network <pinot-network> \
    -v <path/to/dir/containing_airlinestats00.json>:/pulsar/airline \
    apachepulsar/pulsar:2.8.1 \
    bin/pulsar-client --url pulsar://pulsar:6650 produce -f airline/airlinestats00.json -k 0 -n 1 pinot
```

Funnily enough, I tested this again today and got yet another error, different from the two mentioned above:

```
2021/12/20 16:26:21.807 ERROR [SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel [HelixTaskExecutor-message_handle_thread] Caught exception in state transition from OFFLINE -> ONLINE for resource: airlineStats_REALTIME, partition: airlineStats__0__0__20211220T1626Z
java.lang.RuntimeException: org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either than the input has been truncated or that an embedded message misreported its own length.
	at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:43) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pulsar.client.internal.DefaultImplementation.newMessageIdFromByteArray(DefaultImplementation.java:103) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pulsar.client.api.MessageId.fromByteArray(MessageId.java:58) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pinot.plugin.stream.pulsar.MessageIdStreamOffset.<init>(MessageIdStreamOffset.java:47) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pinot.plugin.stream.pulsar.MessageIdStreamOffsetFactory.create(MessageIdStreamOffsetFactory.java:39) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.<init>(LLRealtimeSegmentDataManager.java:1209) ~[pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
	at org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.addSegment(RealtimeTableDataManager.java:344) ~[pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
	at org.apache.pinot.server.starter.helix.HelixInstanceDataManager.addRealtimeSegment(HelixInstanceDataManager.java:162) ~[pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
	at org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline(SegmentOnlineOfflineStateModelFactory.java:164) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
	at org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline(SegmentOnlineOfflineStateModelFactory.java:86) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
	at org.apache.helix.messaging.handling.HelixStateTransitionHandler.invoke(HelixStateTransitionHandler.java:404) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
	at org.apache.helix.messaging.handling.HelixStateTransitionHandler.handleMessage(HelixStateTransitionHandler.java:331) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
	at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:97) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
	at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:49) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]
```

It is also related to the `MessageId` parsing and was also fixed by my implementation.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
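The schema in the reproduction declares `DaysSinceEpoch` with format `1:DAYS:EPOCH`, meaning whole days counted from 1970-01-01. The sample message carries both `DaysSinceEpoch` (16071) and `FlightDate` ("2014-01-01"). The sketch below uses `java.time` to check that these two fields agree. `EpochDayCheck` and its methods are hypothetical names for illustration only and are not part of Pinot.

```java
import java.time.LocalDate;

// Hypothetical helper (not Pinot code): checks a "1:DAYS:EPOCH" value against an ISO date.
final class EpochDayCheck {

    // "1:DAYS:EPOCH" = number of whole days since 1970-01-01
    static LocalDate toDate(long daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch);
    }

    // True if the epoch-day value and the ISO-8601 date string denote the same day
    static boolean consistent(long daysSinceEpoch, String isoDate) {
        return toDate(daysSinceEpoch).equals(LocalDate.parse(isoDate));
    }

    public static void main(String[] args) {
        // Values taken from airlinestats00.json
        long daysSinceEpoch = 16071;
        String flightDate = "2014-01-01";
        System.out.println(toDate(daysSinceEpoch));                 // 2014-01-01
        System.out.println(consistent(daysSinceEpoch, flightDate)); // true
    }
}
```

For this record the check passes, so the time column holds a plausible value for that flight. That suggests the time column is not behind the offset error in the log.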
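The comment attributes the last error to `MessageId` parsing. In the stack trace, `MessageIdStreamOffset.<init>` passes bytes to `MessageId.fromByteArray`, and protobuf then rejects them. A common way this happens is storing binary ID bytes in a `String` offset via UTF-8 and converting them back. Every byte sequence that is not valid UTF-8 becomes U+FFFD, so the round trip no longer reproduces the original bytes.

The stdlib-only sketch below illustrates that effect. It *assumes*, without confirmation from the comment, that the offset string comes from the raw serialized bytes. A few caveats:

- `OffsetRoundTrip` and its methods are made-up names, not Pinot or Pulsar code.
- The sample bytes are hand-built protobuf varint fields. Their field numbers are assumed to mirror Pulsar's `MessageIdData` (ledgerId = 1, entryId = 2).
- Base64 appears only as one lossless alternative. It is not a description of the actual fix.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

// Hypothetical illustration (not Pinot/Pulsar code) of keeping binary ID bytes in a String offset.
final class OffsetRoundTrip {

    // Naive: decode arbitrary bytes as UTF-8 and re-encode; malformed sequences become U+FFFD
    static byte[] utf8RoundTrip(byte[] raw) {
        String offset = new String(raw, StandardCharsets.UTF_8);
        return offset.getBytes(StandardCharsets.UTF_8);
    }

    // Lossless: Base64 maps any byte sequence to ASCII text and back exactly
    static byte[] base64RoundTrip(byte[] raw) {
        String offset = Base64.getEncoder().encodeToString(raw);
        return Base64.getDecoder().decode(offset);
    }

    public static void main(String[] args) {
        // Protobuf wire bytes for {field 1 = 200, field 2 = 255}:
        // 0x08 = tag(field 1, varint), 0xC8 0x01 = varint 200,
        // 0x10 = tag(field 2, varint), 0xFF 0x01 = varint 255
        byte[] raw = {0x08, (byte) 0xC8, 0x01, 0x10, (byte) 0xFF, 0x01};

        byte[] viaUtf8 = utf8RoundTrip(raw);
        byte[] viaBase64 = base64RoundTrip(raw);

        System.out.println("UTF-8 round trip intact:  " + Arrays.equals(raw, viaUtf8));   // false
        System.out.println("Base64 round trip intact: " + Arrays.equals(raw, viaBase64)); // true
        System.out.println("lengths: " + raw.length + " -> " + viaUtf8.length);          // 6 -> 10
    }
}
```

What happens next depends on which bytes get replaced. A protobuf parser may reject the result outright, as in the log above, or decode a different ID. Either way the original stream position is lost.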