This is an automated email from the ASF dual-hosted git repository. richardstartin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 4d36f3dfbf Refactor quickstart data source (#8567) 4d36f3dfbf is described below commit 4d36f3dfbfc68e5b4f5cf24d7313d20a31e74155 Author: Xiaoman Dong <xiao...@startree.ai> AuthorDate: Tue Apr 26 01:12:07 2022 -0700 Refactor quickstart data source (#8567) * save temp work * refactor done * fix * add comments * fix topic issue * fix executor * more documentation * more refactor * remove accidental file * remove extra member var * save temp work * remove rsvp json stream * fix * fix * fix * fix 4 * set topic * fix my own test * generator * address comments --- .../pinot/spi/stream/StreamDataProducer.java | 72 +++++++ .../pinot/spi/stream/StreamDataProducerTest.java | 55 +++++ .../RealtimeComplexTypeHandlingQuickStart.java | 4 +- .../pinot/tools/RealtimeJsonIndexQuickStart.java | 4 +- .../apache/pinot/tools/UpsertJsonQuickStart.java | 5 +- .../pinot/tools/streams/AirlineDataStream.java | 112 ++-------- .../tools/streams/AvroFileSourceGenerator.java | 141 +++++++++++++ .../pinot/tools/streams/MeetupRsvpJsonStream.java | 53 ----- .../pinot/tools/streams/MeetupRsvpStream.java | 121 +++-------- .../pinot/tools/streams/PinotRealtimeSource.java | 190 +++++++++++++++++ .../tools/streams/PinotSourceDataGenerator.java | 44 ++++ .../tools/streams/PinotStreamRateLimiter.java | 21 +- .../pinot/tools/streams/RsvpSourceGenerator.java | 98 +++++++++ .../GithubPullRequestSourceGenerator.java | 220 ++++++++++++++++++++ .../PullRequestMergedEventsStream.java | 227 ++------------------- .../tools/streams/PinotRealtimeSourceTest.java | 65 ++++++ 16 files changed, 956 insertions(+), 476 deletions(-) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java index 6b3c14010d..d181c44225 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java +++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java @@ -18,7 +18,10 @@ */ package org.apache.pinot.spi.stream; +import java.util.Arrays; +import java.util.List; import java.util.Properties; +import javax.annotation.Nullable; /** @@ -32,4 +35,73 @@ public interface StreamDataProducer { void produce(String topic, byte[] key, byte[] payload); void close(); + + /** + * Allows the producer to optimize for a batched write. + * This will help increase throughput in some cases + * @param topic the topic of the output + * @param rows the rows + */ + default void produceBatch(String topic, List<byte[]> rows) { + for (byte[] row: rows) { + produce(topic, row); + } + } + + /** + * Allows the producer to optimize for a batched write. + * This will help increase throughput in some cases + * @param topic the topic of the output + * @param payloadWithKey the payload rows with key + */ + default void produceKeyedBatch(String topic, List<RowWithKey> payloadWithKey) { + for (RowWithKey rowWithKey: payloadWithKey) { + if (rowWithKey.getKey() == null) { + produce(topic, rowWithKey.getPayload()); + } else { + produce(topic, rowWithKey.getKey(), rowWithKey.getPayload()); + } + } + } + + /** + * Helper class so the key and payload can be easily tied together instead of using a pair + * The class is intended for StreamDataProducer only + */ + class RowWithKey { + private final byte[] _key; + private final byte[] _payload; + + public RowWithKey(@Nullable byte[] key, byte[] payload) { + _key = key; + _payload = payload; + } + + public byte[] getKey() { + return _key; + } + + public byte[] getPayload() { + return _payload; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RowWithKey that = (RowWithKey) o; + return Arrays.equals(_key, that._key) && Arrays.equals(_payload, that._payload); + } + + @Override + public int hashCode() { + int result = 
Arrays.hashCode(_key); + result = 31 * result + Arrays.hashCode(_payload); + return result; + } + } } diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataProducerTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataProducerTest.java new file mode 100644 index 0000000000..cd038bd801 --- /dev/null +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataProducerTest.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pinot.spi.stream; + +import java.nio.charset.StandardCharsets; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class StreamDataProducerTest { + + /** + * RowWithKey must compare key and payload by content (not by array identity), with null keys allowed. + */ + @Test + public void testRowWithKeyEquals() { + byte[] b1 = new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}; + byte[] b2 = new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}; + byte[] b3 = new byte[]{1, 2, 3, 4, 5, 6, 7, 8}; + byte[] k1 = "somekey".getBytes(StandardCharsets.UTF_8); + byte[] k3 = "anotherkey".getBytes(StandardCharsets.UTF_8); + StreamDataProducer.RowWithKey nullKey1 = new StreamDataProducer.RowWithKey(null, b1); + StreamDataProducer.RowWithKey nullKey2 = new StreamDataProducer.RowWithKey(null, b2); + StreamDataProducer.RowWithKey nullKey3 = new StreamDataProducer.RowWithKey(null, b3); + Assert.assertEquals(nullKey2, nullKey1); + Assert.assertEquals(nullKey1.hashCode(), nullKey2.hashCode()); + Assert.assertNotEquals(nullKey3, nullKey1); + Assert.assertNotEquals(nullKey3.hashCode(), nullKey1.hashCode()); + + Assert.assertEquals(nullKey1, nullKey1); + + StreamDataProducer.RowWithKey b2WithKey = new StreamDataProducer.RowWithKey(k1, b2); + Assert.assertNotEquals(nullKey2, b2WithKey); + StreamDataProducer.RowWithKey b1WithKey = new StreamDataProducer.RowWithKey(k1, b1); + Assert.assertEquals(b1WithKey, b2WithKey); + Assert.assertEquals(b1WithKey.hashCode(), b2WithKey.hashCode()); + + StreamDataProducer.RowWithKey b2WithDifferentKey = new StreamDataProducer.RowWithKey(k3, b2); + Assert.assertNotEquals(b2WithKey, b2WithDifferentKey); + } +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java index c07004653a..88106d3612 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java +++
b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java @@ -32,7 +32,7 @@ import org.apache.pinot.spi.stream.StreamDataServerStartable; import org.apache.pinot.tools.Quickstart.Color; import org.apache.pinot.tools.admin.PinotAdministrator; import org.apache.pinot.tools.admin.command.QuickstartRunner; -import org.apache.pinot.tools.streams.MeetupRsvpJsonStream; +import org.apache.pinot.tools.streams.MeetupRsvpStream; import org.apache.pinot.tools.utils.KafkaStarterUtils; import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; @@ -88,7 +88,7 @@ public class RealtimeComplexTypeHandlingQuickStart extends QuickStartBase { _kafkaStarter.start(); _kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2)); printStatus(Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****"); - MeetupRsvpJsonStream meetupRSVPProvider = new MeetupRsvpJsonStream(); + MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(); meetupRSVPProvider.run(); printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****"); runner.startAll(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java index 3875543ef0..94aadea9e8 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java @@ -32,7 +32,7 @@ import org.apache.pinot.spi.stream.StreamDataServerStartable; import org.apache.pinot.tools.Quickstart.Color; import org.apache.pinot.tools.admin.PinotAdministrator; import org.apache.pinot.tools.admin.command.QuickstartRunner; -import org.apache.pinot.tools.streams.MeetupRsvpJsonStream; +import org.apache.pinot.tools.streams.MeetupRsvpStream; import org.apache.pinot.tools.utils.KafkaStarterUtils; import static 
org.apache.pinot.tools.Quickstart.prettyPrintResponse; @@ -87,7 +87,7 @@ public class RealtimeJsonIndexQuickStart extends QuickStartBase { _kafkaStarter.start(); _kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2)); printStatus(Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****"); - MeetupRsvpJsonStream meetupRSVPProvider = new MeetupRsvpJsonStream(); + MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(); meetupRSVPProvider.run(); printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****"); runner.startAll(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java index 7a86ee87cc..b257d491a4 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java @@ -32,7 +32,8 @@ import org.apache.pinot.spi.stream.StreamDataServerStartable; import org.apache.pinot.tools.Quickstart.Color; import org.apache.pinot.tools.admin.PinotAdministrator; import org.apache.pinot.tools.admin.command.QuickstartRunner; -import org.apache.pinot.tools.streams.MeetupRsvpJsonStream; +import org.apache.pinot.tools.streams.MeetupRsvpStream; +import org.apache.pinot.tools.streams.RsvpSourceGenerator; import org.apache.pinot.tools.utils.KafkaStarterUtils; import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; @@ -87,7 +88,7 @@ public class UpsertJsonQuickStart extends QuickStartBase { _kafkaStarter.start(); _kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2)); printStatus(Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****"); - MeetupRsvpJsonStream meetupRSVPProvider = new MeetupRsvpJsonStream(true); + MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(RsvpSourceGenerator.KeyColumn.RSVP_ID); 
meetupRSVPProvider.run(); printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****"); runner.startAll(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java index d0824e0054..2dee145ac0 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java @@ -19,43 +19,28 @@ package org.apache.pinot.tools.streams; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import org.apache.avro.file.DataFileStream; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericRecord; -import org.apache.pinot.plugin.inputformat.avro.AvroUtils; import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.stream.StreamDataProducer; import org.apache.pinot.spi.stream.StreamDataProvider; import org.apache.pinot.tools.QuickStartBase; import org.apache.pinot.tools.Quickstart; import org.apache.pinot.tools.utils.KafkaStarterUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This is used in Hybrid Quickstart. 
*/ public class AirlineDataStream { - private static final Logger logger = LoggerFactory.getLogger(AirlineDataStream.class); - + private static final String KAFKA_TOPIC_NAME = "flights-realtime"; Schema _pinotSchema; String _timeColumnName; File _avroFile; - DataFileStream<GenericRecord> _avroDataStream; - Integer _currentTimeValue = 16102; - boolean _keepIndexing = true; - ExecutorService _service; - int _counter = 0; + final Integer _startTime = 16102; private StreamDataProducer _producer; + private PinotRealtimeSource _pinotStream; public AirlineDataStream(Schema pinotSchema, TableConfig tableConfig, File avroFile) throws Exception { @@ -67,9 +52,12 @@ public class AirlineDataStream { _pinotSchema = pinotSchema; _timeColumnName = tableConfig.getValidationConfig().getTimeColumnName(); _avroFile = avroFile; - createStream(); _producer = producer; - _service = Executors.newFixedThreadPool(1); + AvroFileSourceGenerator generator = new AvroFileSourceGenerator(pinotSchema, avroFile, 1, _timeColumnName, + (rowNumber) -> (_startTime + rowNumber / 60)); + _pinotStream = + PinotRealtimeSource.builder().setProducer(_producer).setGenerator(generator).setTopic(KAFKA_TOPIC_NAME) + .setMaxMessagePerSecond(1).build(); QuickStartBase.printStatus(Quickstart.Color.YELLOW, "***** Offine data has max time as 16101, realtime will start consuming from time 16102 and increment time " + "every 60 events (which is approximately 60 seconds) *****"); @@ -85,84 +73,14 @@ public class AirlineDataStream { return StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties); } - public void shutdown() { - _keepIndexing = false; - _avroDataStream = null; - _producer.close(); - _producer = null; - _service.shutdown(); - } - - private void createStream() - throws IOException { - if (_keepIndexing) { - _avroDataStream = new DataFileStream<>(new FileInputStream(_avroFile), new GenericDatumReader<>()); - return; - } - _avroDataStream = null; - } - - private void 
publish(GenericRecord message) - throws IOException { - if (!_keepIndexing) { - _avroDataStream.close(); - _avroDataStream = null; - return; - } - _producer.produce("flights-realtime", message.toString().getBytes("UTF-8")); - } - public void run() { + _pinotStream.run(); + } - _service.submit(new Runnable() { - - @Override - public void run() { - while (true) { - while (_avroDataStream.hasNext()) { - if (!_keepIndexing) { - return; - } - - GenericRecord record = _avroDataStream.next(); - - GenericRecord message = new GenericData.Record(AvroUtils.getAvroSchemaFromPinotSchema(_pinotSchema)); - - for (FieldSpec spec : _pinotSchema.getDimensionFieldSpecs()) { - message.put(spec.getName(), record.get(spec.getName())); - } - - for (FieldSpec spec : _pinotSchema.getMetricFieldSpecs()) { - message.put(spec.getName(), record.get(spec.getName())); - } - - message.put(_timeColumnName, _currentTimeValue); - - try { - publish(message); - _counter++; - if (_counter % 60 == 0) { - _currentTimeValue = _currentTimeValue + 1; - } - Thread.sleep(1000); - } catch (Exception e) { - logger.error(e.getMessage()); - } - } - - try { - _avroDataStream.close(); - } catch (IOException e) { - logger.error(e.getMessage()); - } - - try { - createStream(); - } catch (IOException e) { - logger.error(e.getMessage()); - } - } - } - }); + public void shutdown() + throws Exception { + _pinotStream.close(); + _producer.close(); + _producer = null; } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AvroFileSourceGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AvroFileSourceGenerator.java new file mode 100644 index 0000000000..bfe6e34a68 --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AvroFileSourceGenerator.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tools.streams; + +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.function.Function; +import javax.annotation.Nullable; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.parquet.Strings; +import org.apache.pinot.plugin.inputformat.avro.AvroUtils; +import org.apache.pinot.spi.data.DateTimeFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.stream.StreamDataProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Generates Pinot Real Time Source by an AvroFile. + * It will keep looping the same file and produce data output. We can pass in a lambda function to compute + * time index based on row number. 
+ */ +public class AvroFileSourceGenerator implements PinotSourceDataGenerator { + private static final Logger LOGGER = LoggerFactory.getLogger(AvroFileSourceGenerator.class); + private DataFileStream<GenericRecord> _avroDataStream; + private final Schema _pinotSchema; + // Avro schema derived from the Pinot schema; computed once instead of once per row + private final org.apache.avro.Schema _avroSchema; + private long _rowsProduced; + // If this var is null or empty, we will not override the time column + private final String _timeColumnName; + // Non-null exactly when _timeColumnName is set (enforced by the constructor) + private final Function<Long, Long> _rowNumberToTimeIndex; + private final File _avroFile; + private final int _rowsPerBatch; + + /** + * Reads the avro file, produces the rows, and then keeps looping without overriding the time column. + * Only dimension and metric columns are copied from the avro records. + * @param pinotSchema the Pinot Schema so the avro rows can be produced + * @param avroFile the avro file as source. + */ + public AvroFileSourceGenerator(Schema pinotSchema, File avroFile) { + this(pinotSchema, avroFile, 1, null, null); + } + + /** + * Reads the avro file, produces the rows, and keeps looping; allows customization of the time column by a lambda. + * Dimension and metric columns are copied from the avro records; the time column is set from + * rowNumberToTimeIndex when it is provided. + * @param pinotSchema the Pinot Schema so the avro rows can be produced + * @param avroFile the avro file as source. + * @param rowsPerBatch maximum number of rows returned by one generateRows() call; must be positive + * @param timeColumnName the time column name for customizing/overriding time index. Null for skipping customization. + * @param rowNumberToTimeIndex the lambda to compute time index based on row number. Null for skipping customization. + * Must be provided exactly when timeColumnName is non-empty. + * @throws IllegalArgumentException if rowsPerBatch is not positive, or only one of timeColumnName and + * rowNumberToTimeIndex is provided
+ */ + public AvroFileSourceGenerator(Schema pinotSchema, File avroFile, int rowsPerBatch, + @Nullable String timeColumnName, @Nullable Function<Long, Long> rowNumberToTimeIndex) { + Preconditions.checkArgument(rowsPerBatch > 0, "rowsPerBatch must be positive, got: %s", rowsPerBatch); + // Overriding the time column needs both the column name and the function computing its value + Preconditions.checkArgument(Strings.isNullOrEmpty(timeColumnName) == (rowNumberToTimeIndex == null), + "timeColumnName and rowNumberToTimeIndex must be either both provided or both null"); + _pinotSchema = pinotSchema; + _avroSchema = AvroUtils.getAvroSchemaFromPinotSchema(pinotSchema); + _rowsProduced = 0; + _rowNumberToTimeIndex = rowNumberToTimeIndex; + _timeColumnName = timeColumnName; + if (!Strings.isNullOrEmpty(_timeColumnName)) { + DateTimeFieldSpec timeColumnSpec = pinotSchema.getSpecForTimeColumn(timeColumnName); + Preconditions.checkNotNull(timeColumnSpec, + "Time column " + timeColumnName + " is not found in schema, or is not a valid DateTime column"); + } + _avroFile = avroFile; + _rowsPerBatch = rowsPerBatch; + } + + @Override + public void init(Properties properties) { + } + + /** + * Returns up to rowsPerBatch rows read from the avro file, re-opening the file from the start once it is exhausted. + * Returns an empty list if the file contains no records. + */ + @Override + public List<StreamDataProducer.RowWithKey> generateRows() { + List<StreamDataProducer.RowWithKey> retVal = new ArrayList<>(); + ensureStream(); + int rowsInCurrentBatch = 0; + while (_avroDataStream.hasNext() && rowsInCurrentBatch < _rowsPerBatch) { + GenericRecord record = _avroDataStream.next(); + GenericRecord message = new GenericData.Record(_avroSchema); + for (FieldSpec spec : _pinotSchema.getDimensionFieldSpecs()) { + message.put(spec.getName(), record.get(spec.getName())); + } + + for (FieldSpec spec : _pinotSchema.getMetricFieldSpecs()) { + message.put(spec.getName(), record.get(spec.getName())); + } + // Only override the time column when the caller asked for it; the 2-arg constructor leaves it untouched + if (_rowNumberToTimeIndex != null) { + message.put(_timeColumnName, _rowNumberToTimeIndex.apply(_rowsProduced)); + } + retVal.add(new StreamDataProducer.RowWithKey(null, message.toString().getBytes(StandardCharsets.UTF_8))); + _rowsProduced += 1; + rowsInCurrentBatch += 1; + } + return retVal; + } + + @Override + public void close() + throws Exception { + // The stream is opened lazily, so it does not exist if generateRows() was never called + if (_avroDataStream != null) { + _avroDataStream.close(); + _avroDataStream = null; + } + } + + // Re-opens file stream if the file has reached its end.
+ private void ensureStream() { + try { + if (_avroDataStream != null && !_avroDataStream.hasNext()) { + _avroDataStream.close(); + _avroDataStream = null; + } + if (_avroDataStream == null) { + _avroDataStream = openAvroStream(); + } + } catch (IOException ex) { + LOGGER.error("Failed to open/close {}", _avroFile.getPath(), ex); + throw new RuntimeException("Failed to open/close " + _avroFile.getPath(), ex); + } + } + + // Opens a reader over the avro file; closes the file again if the avro header cannot be read + private DataFileStream<GenericRecord> openAvroStream() + throws IOException { + FileInputStream fileInputStream = new FileInputStream(_avroFile); + try { + return new DataFileStream<>(fileInputStream, new GenericDatumReader<>()); + } catch (IOException | RuntimeException ex) { + fileInputStream.close(); + throw ex; + } + } +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java deleted file mode 100644 index 3ceb3734bc..0000000000 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License.
- */ -package org.apache.pinot.tools.streams; - -import java.util.function.Consumer; - -import static java.nio.charset.StandardCharsets.UTF_8; - - -public class MeetupRsvpJsonStream extends MeetupRsvpStream { - - public MeetupRsvpJsonStream() - throws Exception { - super(); - } - - public MeetupRsvpJsonStream(boolean partitionByKey) - throws Exception { - super(partitionByKey); - } - - @Override - protected Consumer<RSVP> createConsumer() { - return message -> { - if (_partitionByKey) { - try { - _producer.produce(_topicName, message.getRsvpId().getBytes(UTF_8), message.getPayload().toString() - .getBytes(UTF_8)); - } catch (Exception e) { - LOGGER.error("Caught exception while processing the message: {}", message, e); - } - } else { - _producer.produce(_topicName, message.getPayload().toString().getBytes(UTF_8)); - } - }; - } -} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java index d10955a22e..f42ffd7fe9 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java @@ -18,40 +18,21 @@ */ package org.apache.pinot.tools.streams; -import com.fasterxml.jackson.databind.node.ObjectNode; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.time.format.DateTimeFormatterBuilder; import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; -import java.util.function.Consumer; import org.apache.pinot.spi.stream.StreamDataProducer; import org.apache.pinot.spi.stream.StreamDataProvider; -import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.tools.utils.KafkaStarterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static 
java.nio.charset.StandardCharsets.UTF_8; - public class MeetupRsvpStream { protected static final Logger LOGGER = LoggerFactory.getLogger(MeetupRsvpStream.class); - private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder() - .parseCaseInsensitive() - .append(DateTimeFormatter.ISO_LOCAL_DATE) - .appendLiteral(' ') - .append(DateTimeFormatter.ISO_LOCAL_TIME) - .toFormatter(); private static final String DEFAULT_TOPIC_NAME = "meetupRSVPEvents"; protected String _topicName = DEFAULT_TOPIC_NAME; - protected final boolean _partitionByKey; - protected final StreamDataProducer _producer; - private final Source _source; + protected PinotRealtimeSource _pinotRealtimeSource; public MeetupRsvpStream() throws Exception { @@ -60,94 +41,42 @@ public class MeetupRsvpStream { public MeetupRsvpStream(boolean partitionByKey) throws Exception { - _partitionByKey = partitionByKey; + // partitionByKey=true keys messages by EVENT_ID; use the KeyColumn constructor to key by RSVP_ID instead + this(partitionByKey ?
RsvpSourceGenerator.KeyColumn.EVENT_ID : RsvpSourceGenerator.KeyColumn.NONE); + } + public MeetupRsvpStream(RsvpSourceGenerator.KeyColumn keyColumn) + throws Exception { Properties properties = new Properties(); properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER); properties.put("serializer.class", "kafka.serializer.DefaultEncoder"); properties.put("request.required.acks", "1"); - _producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties); - _source = new Source(createConsumer()); + StreamDataProducer producer = + StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties); + _pinotRealtimeSource = + PinotRealtimeSource.builder().setGenerator(new RsvpSourceGenerator(keyColumn)).setProducer(producer) + .setRateLimiter(permits -> { + // Exponentially distributed delay (at least 1 ms, mean roughly one second) between batches + int delay = (int) (Math.log(ThreadLocalRandom.current().nextDouble()) / Math.log(0.999)) + 1; + try { + Thread.sleep(delay); + } catch (InterruptedException ex) { + // Typically PinotRealtimeSource.close() -> shutdownNow(); keep the flag so the producing loop sees it + Thread.currentThread().interrupt(); + } + }) + .setTopic(_topicName) + .build(); } public void run() throws Exception { - _source.start(); + _pinotRealtimeSource.run(); } public void stopPublishing() { - _producer.close(); - _source.close(); - } - - protected Consumer<RSVP> createConsumer() { - return message -> { - try { - if (_partitionByKey) { - _producer.produce(_topicName, message.getEventId().getBytes(UTF_8), - message.getPayload().toString().getBytes(UTF_8)); - } else { - _producer.produce(_topicName, message.getPayload().toString().getBytes(UTF_8)); - } - } catch (Exception e) { - LOGGER.error("Caught exception while processing the message: {}", message, e); - } - }; - } - - private static class Source implements AutoCloseable, Runnable { - - private final Consumer<RSVP> _consumer; - - private final ExecutorService _executorService = Executors.newSingleThreadExecutor(); - private volatile Future<?> _future; - - private
Source(Consumer<RSVP> consumer) { - _consumer = consumer; - } - - @Override - public void close() { - if (_future != null) { - _future.cancel(true); - } - _executorService.shutdownNow(); - } - - public void start() { - _future = _executorService.submit(this); - } - - @Override - public void run() { - while (!Thread.interrupted()) { - try { - RSVP rsvp = createMessage(); - _consumer.accept(rsvp); - int delay = (int) (Math.log(ThreadLocalRandom.current().nextDouble()) / Math.log(0.999)) + 1; - Thread.sleep(delay); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - - private RSVP createMessage() { - String eventId = Math.abs(ThreadLocalRandom.current().nextLong()) + ""; - ObjectNode json = JsonUtils.newObjectNode(); - json.put("venue_name", "venue_name" + ThreadLocalRandom.current().nextInt()); - json.put("event_name", "event_name" + ThreadLocalRandom.current().nextInt()); - json.put("event_id", eventId); - json.put("event_time", DATE_TIME_FORMATTER.format(LocalDateTime.now().plusDays(10))); - json.put("group_city", "group_city" + ThreadLocalRandom.current().nextInt()); - json.put("group_country", "group_country" + ThreadLocalRandom.current().nextInt()); - json.put("group_id", Math.abs(ThreadLocalRandom.current().nextLong())); - json.put("group_name", "group_name" + ThreadLocalRandom.current().nextInt()); - json.put("group_lat", ThreadLocalRandom.current().nextFloat()); - json.put("group_lon", ThreadLocalRandom.current().nextFloat()); - json.put("mtime", DATE_TIME_FORMATTER.format(LocalDateTime.now())); - json.put("rsvp_count", 1); - return new RSVP(eventId, eventId, json); + try { + _pinotRealtimeSource.close(); + } catch (Exception ex) { + LOGGER.error("Failed to close real time source. 
ignored and continue", ex); } } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotRealtimeSource.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotRealtimeSource.java new file mode 100644 index 0000000000..87c527f9e1 --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotRealtimeSource.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tools.streams; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.RateLimiter; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import javax.annotation.Nullable; +import org.apache.pinot.spi.stream.StreamDataProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Represents one Pinot Real Time Source that is capable of + * 1. Keep running forever + * 2. Pull from generator and write into StreamDataProducer + * The Source has a thread that is looping forever. 
+ */ +public class PinotRealtimeSource implements AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(PinotRealtimeSource.class); + public static final String KEY_OF_MAX_MESSAGE_PER_SECOND = "pinot.stream.max.message.per.second"; + public static final String KEY_OF_TOPIC_NAME = "pinot.topic.name"; + public static final long DEFAULT_MAX_MESSAGE_PER_SECOND = Long.MAX_VALUE; + public static final long DEFAULT_EMPTY_SOURCE_SLEEP_MS = 10; + final StreamDataProducer _producer; + final PinotSourceDataGenerator _generator; + final String _topicName; + final ExecutorService _executor; + final Properties _properties; + PinotStreamRateLimiter _rateLimiter; + protected volatile boolean _shutdown; + + /** + * Constructs a source by passing in a Properties file, a generator, and a producer + * @param settings the settings for all components passed in + * @param generator the generator that can create data + * @param producer the producer to write the generator's data into + */ + public PinotRealtimeSource(Properties settings, PinotSourceDataGenerator generator, StreamDataProducer producer) { + this(settings, generator, producer, null, null); + } + + /** + * Constructs a source by passing in properties file, a generator, a producer and an executor service + * @param settings the settings for all components passed in + * @param generator the generator that can create data + * @param producer the producer to write the generator's data into + * @param executor the preferred executor instead of creating a thread pool. Null for default one + * @param rateLimiter the specialized rate limiter for customization. 
Null for default guava one + */ + public PinotRealtimeSource(Properties settings, PinotSourceDataGenerator generator, StreamDataProducer producer, + @Nullable ExecutorService executor, @Nullable PinotStreamRateLimiter rateLimiter) { + _properties = settings; + _producer = producer; + Preconditions.checkNotNull(_producer, "Producer of a stream cannot be null"); + _generator = generator; + Preconditions.checkNotNull(_generator, "Generator of a stream cannot be null"); + _executor = executor == null ? Executors.newSingleThreadExecutor() : executor; + _topicName = settings.getProperty(KEY_OF_TOPIC_NAME); + Preconditions.checkNotNull(_topicName, "Topic name needs to be set via " + KEY_OF_TOPIC_NAME); + _rateLimiter = rateLimiter == null ? new GuavaRateLimiter(extractMaxQps(settings)) : rateLimiter; + } + + public void run() { + _executor.execute(() -> { + while (!_shutdown) { + List<StreamDataProducer.RowWithKey> rows = _generator.generateRows(); + // we expect the generator implementation to return empty rows when there is no data available + // as a stream, we expect data to be available all the time + if (rows.isEmpty()) { + try { + Thread.sleep(DEFAULT_EMPTY_SOURCE_SLEEP_MS); + } catch (InterruptedException ex) { + LOGGER.warn("Interrupted from sleep, will check shutdown flag later", ex); + } + } else { + _rateLimiter.acquire(rows.size()); + if (!_shutdown) { + _producer.produceKeyedBatch(_topicName, rows); + } + } + } + }); + } + + @Override + public void close() throws Exception { + _generator.close(); + _shutdown = true; + _producer.close(); + _executor.shutdownNow(); + } + + /** + * A simpler wrapper for guava-based rate limiter + */ + private static class GuavaRateLimiter implements PinotStreamRateLimiter { + private final RateLimiter _rateLimiter; + public GuavaRateLimiter(long maxQps) { + _rateLimiter = RateLimiter.create(maxQps); + } + @Override + public void acquire(int permits) { + _rateLimiter.acquire(); + } + } + + static long extractMaxQps(Properties 
settings) { + String qpsStr = settings.getProperty(KEY_OF_MAX_MESSAGE_PER_SECOND, String.valueOf(DEFAULT_MAX_MESSAGE_PER_SECOND)); + long maxQps = DEFAULT_MAX_MESSAGE_PER_SECOND; + try { + maxQps = Long.parseLong(qpsStr); + } catch (NumberFormatException ex) { + LOGGER.warn("Cannot parse {} as max qps setting, using default {}", qpsStr, DEFAULT_MAX_MESSAGE_PER_SECOND); + } + return maxQps; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private String _topic; + private long _maxMessagePerSecond; + private PinotSourceDataGenerator _generator; + private StreamDataProducer _producer; + private ExecutorService _executor; + private PinotStreamRateLimiter _rateLimiter; + public Builder setTopic(String topic) { + _topic = topic; + return this; + } + + public Builder setMaxMessagePerSecond(long maxMessagePerSecond) { + _maxMessagePerSecond = maxMessagePerSecond; + return this; + } + + public Builder setGenerator(PinotSourceDataGenerator generator) { + _generator = generator; + return this; + } + + public Builder setProducer(StreamDataProducer producer) { + _producer = producer; + return this; + } + + public Builder setExecutor(ExecutorService executor) { + _executor = executor; + return this; + } + + public Builder setRateLimiter(PinotStreamRateLimiter rateLimiter) { + _rateLimiter = rateLimiter; + return this; + } + + public PinotRealtimeSource build() { + Preconditions.checkNotNull(_topic, "PinotRealTimeSource should specify topic name"); + Properties properties = new Properties(); + if (_maxMessagePerSecond > 0) { + properties.setProperty(KEY_OF_MAX_MESSAGE_PER_SECOND, String.valueOf(_maxMessagePerSecond)); + } + properties.setProperty(KEY_OF_TOPIC_NAME, _topic); + return new PinotRealtimeSource(properties, _generator, _producer, _executor, _rateLimiter); + } + } +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotSourceDataGenerator.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotSourceDataGenerator.java new file mode 100644 index 0000000000..12e15e4cf3 --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotSourceDataGenerator.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tools.streams; + +import java.util.List; +import java.util.Properties; +import org.apache.pinot.spi.stream.StreamDataProducer; + + +/** + * Represents one Pinot Real Time Data Source that can constantly generate data + * For example it can be pulling a batch from Kafka, or polling some data via HTTP GET + * The generator will be driven by PinotRealtimeSource to keep producing into some downstream sink + */ +public interface PinotSourceDataGenerator extends AutoCloseable { + /** + * Initialize the generator via a property file. It will be called at least once + * @param properties the property files + */ + void init(Properties properties); + + /** + * Generate a small batch of rows represented by bytes. 
+ * It is up to the generator to define the binary format + * @return a small list of RowWithKey, each element of the list will be written as one row of data + */ + List<StreamDataProducer.RowWithKey> generateRows(); +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotStreamRateLimiter.java similarity index 70% copy from pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java copy to pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotStreamRateLimiter.java index 6b3c14010d..ef5390acea 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotStreamRateLimiter.java @@ -16,20 +16,17 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.spi.stream; - -import java.util.Properties; +package org.apache.pinot.tools.streams; /** - * StreamDataProducer is the interface for stream data sources. E.g. KafkaDataProducer. 
+ * Represents a very simple rate limiter that is used by Pinot */ -public interface StreamDataProducer { - void init(Properties props); - - void produce(String topic, byte[] payload); - - void produce(String topic, byte[] key, byte[] payload); - - void close(); +@FunctionalInterface +public interface PinotStreamRateLimiter { + /** + * Blocks current thread until X permits are available + * @param permits how many permits we wish to acquire + */ + void acquire(int permits); } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java new file mode 100644 index 0000000000..a09b6bb6fa --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pinot.tools.streams; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableList; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.pinot.spi.stream.StreamDataProducer; +import org.apache.pinot.spi.utils.JsonUtils; + +import static java.nio.charset.StandardCharsets.UTF_8; + + +/** + * A simple random generator that fakes RSVP + */ +public class RsvpSourceGenerator implements PinotSourceDataGenerator { + private final KeyColumn _keyColumn; + public static final DateTimeFormatter DATE_TIME_FORMATTER = + new DateTimeFormatterBuilder().parseCaseInsensitive().append(DateTimeFormatter.ISO_LOCAL_DATE).appendLiteral(' ') + .append(DateTimeFormatter.ISO_LOCAL_TIME).toFormatter(); + + public RsvpSourceGenerator(KeyColumn keyColumn) { + _keyColumn = keyColumn; + } + + public RSVP createMessage() { + String eventId = Math.abs(ThreadLocalRandom.current().nextLong()) + ""; + ObjectNode json = JsonUtils.newObjectNode(); + json.put("venue_name", "venue_name" + ThreadLocalRandom.current().nextInt()); + json.put("event_name", "event_name" + ThreadLocalRandom.current().nextInt()); + json.put("event_id", eventId); + json.put("event_time", DATE_TIME_FORMATTER.format(LocalDateTime.now().plusDays(10))); + json.put("group_city", "group_city" + ThreadLocalRandom.current().nextInt()); + json.put("group_country", "group_country" + ThreadLocalRandom.current().nextInt()); + json.put("group_id", Math.abs(ThreadLocalRandom.current().nextLong())); + json.put("group_name", "group_name" + ThreadLocalRandom.current().nextInt()); + json.put("group_lat", ThreadLocalRandom.current().nextFloat()); + json.put("group_lon", ThreadLocalRandom.current().nextFloat()); + json.put("mtime", DATE_TIME_FORMATTER.format(LocalDateTime.now())); + 
json.put("rsvp_count", 1); + return new RSVP(eventId, eventId, json); + } + + @Override + public void init(Properties properties) { + } + + @Override + public List<StreamDataProducer.RowWithKey> generateRows() { + RSVP msg = createMessage(); + byte[] key; + switch (_keyColumn) { + case EVENT_ID: + key = msg.getEventId().getBytes(UTF_8); + break; + case RSVP_ID: + key = msg.getRsvpId().getBytes(UTF_8); + break; + default: + key = null; + break; + } + return ImmutableList.of(new StreamDataProducer.RowWithKey(key, msg.getPayload().toString().getBytes(UTF_8))); + } + + @Override + public void close() + throws Exception { + } + public enum KeyColumn { + NONE, + EVENT_ID, + RSVP_ID + } +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/GithubPullRequestSourceGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/GithubPullRequestSourceGenerator.java new file mode 100644 index 0000000000..3d1b6708b4 --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/GithubPullRequestSourceGenerator.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.tools.streams.githubevents; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.pinot.plugin.inputformat.avro.AvroUtils; +import org.apache.pinot.spi.stream.StreamDataProducer; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.tools.QuickStartBase; +import org.apache.pinot.tools.Quickstart; +import org.apache.pinot.tools.streams.PinotSourceDataGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The class that pulls events from GitHub by RPC calls, and converts them into byte[] so we can write to Kafka + */ +public class GithubPullRequestSourceGenerator implements PinotSourceDataGenerator { + private static final Logger LOGGER = LoggerFactory.getLogger(GithubPullRequestSourceGenerator.class); + private static final long SLEEP_MILLIS = 10_000; + + private GitHubAPICaller _gitHubAPICaller; + private Schema _avroSchema; + private String _etag = null; + + public GithubPullRequestSourceGenerator(File schemaFile, String personalAccessToken) + throws Exception { + try { + _avroSchema = AvroUtils.getAvroSchemaFromPinotSchema(org.apache.pinot.spi.data.Schema.fromFile(schemaFile)); + } catch (Exception e) { + LOGGER.error("Got exception while reading Pinot schema from file: [" + schemaFile.getName() + "]"); + throw e; + } + _gitHubAPICaller = new GitHubAPICaller(personalAccessToken); + } + + private GenericRecord convertToPullRequestMergedGenericRecord(JsonNode event) + throws IOException { + GenericRecord genericRecord = null; + String type = event.get("type").asText(); + + if ("PullRequestEvent".equals(type)) { + JsonNode payload = event.get("payload"); + if (payload != null) { + 
String action = payload.get("action").asText(); + JsonNode pullRequest = payload.get("pull_request"); + String merged = pullRequest.get("merged").asText(); + if ("closed".equals(action) && "true".equals(merged)) { // valid pull request merge event + + JsonNode commits = null; + String commitsURL = pullRequest.get("commits_url").asText(); + GitHubAPICaller.GitHubAPIResponse commitsResponse = _gitHubAPICaller.callAPI(commitsURL); + + if (commitsResponse._responseString != null) { + commits = JsonUtils.stringToJsonNode(commitsResponse._responseString); + } + + JsonNode reviewComments = null; + String reviewCommentsURL = pullRequest.get("review_comments_url").asText(); + GitHubAPICaller.GitHubAPIResponse reviewCommentsResponse = _gitHubAPICaller.callAPI(reviewCommentsURL); + if (reviewCommentsResponse._responseString != null) { + reviewComments = JsonUtils.stringToJsonNode(reviewCommentsResponse._responseString); + } + + JsonNode comments = null; + String commentsURL = pullRequest.get("comments_url").asText(); + GitHubAPICaller.GitHubAPIResponse commentsResponse = _gitHubAPICaller.callAPI(commentsURL); + if (commentsResponse._responseString != null) { + comments = JsonUtils.stringToJsonNode(commentsResponse._responseString); + } + + // get PullRequestMergeEvent + PullRequestMergedEvent pullRequestMergedEvent = + new PullRequestMergedEvent(event, commits, reviewComments, comments); + // make generic record + genericRecord = convertToGenericRecord(pullRequestMergedEvent); + } + } + } + return genericRecord; + } + + /** + * Convert the PullRequestMergedEvent to a GenericRecord + */ + private GenericRecord convertToGenericRecord(PullRequestMergedEvent pullRequestMergedEvent) { + GenericRecord genericRecord = new GenericData.Record(_avroSchema); + + // Dimensions + genericRecord.put("title", pullRequestMergedEvent.getTitle()); + genericRecord.put("labels", pullRequestMergedEvent.getLabels()); + genericRecord.put("userId", pullRequestMergedEvent.getUserId()); + 
genericRecord.put("userType", pullRequestMergedEvent.getUserType()); + genericRecord.put("authorAssociation", pullRequestMergedEvent.getAuthorAssociation()); + genericRecord.put("mergedBy", pullRequestMergedEvent.getMergedBy()); + genericRecord.put("assignees", pullRequestMergedEvent.getAssignees()); + genericRecord.put("committers", pullRequestMergedEvent.getCommitters()); + genericRecord.put("reviewers", pullRequestMergedEvent.getReviewers()); + genericRecord.put("commenters", pullRequestMergedEvent.getCommenters()); + genericRecord.put("authors", pullRequestMergedEvent.getAuthors()); + genericRecord.put("requestedReviewers", pullRequestMergedEvent.getRequestedReviewers()); + genericRecord.put("requestedTeams", pullRequestMergedEvent.getRequestedTeams()); + genericRecord.put("repo", pullRequestMergedEvent.getRepo()); + genericRecord.put("organization", pullRequestMergedEvent.getOrganization()); + + // Metrics + genericRecord.put("numComments", pullRequestMergedEvent.getNumComments()); + genericRecord.put("numReviewComments", pullRequestMergedEvent.getNumReviewComments()); + genericRecord.put("numCommits", pullRequestMergedEvent.getNumCommits()); + genericRecord.put("numLinesAdded", pullRequestMergedEvent.getNumLinesAdded()); + genericRecord.put("numLinesDeleted", pullRequestMergedEvent.getNumLinesDeleted()); + genericRecord.put("numFilesChanged", pullRequestMergedEvent.getNumFilesChanged()); + genericRecord.put("numReviewers", pullRequestMergedEvent.getNumReviewers()); + genericRecord.put("numCommenters", pullRequestMergedEvent.getNumCommenters()); + genericRecord.put("numCommitters", pullRequestMergedEvent.getNumCommitters()); + genericRecord.put("numAuthors", pullRequestMergedEvent.getNumAuthors()); + genericRecord.put("createdTimeMillis", pullRequestMergedEvent.getCreatedTimeMillis()); + genericRecord.put("elapsedTimeMillis", pullRequestMergedEvent.getElapsedTimeMillis()); + + // Time column + genericRecord.put("mergedTimeMillis", 
pullRequestMergedEvent.getMergedTimeMillis()); + + return genericRecord; + } + + @Override + public void init(Properties properties) { + } + + @Override + public List<StreamDataProducer.RowWithKey> generateRows() { + List<StreamDataProducer.RowWithKey> retVal = new ArrayList<>(); + try { + GitHubAPICaller.GitHubAPIResponse githubAPIResponse = _gitHubAPICaller.callEventsAPI(_etag); + switch (githubAPIResponse._statusCode) { + case 200: // Read new events + _etag = githubAPIResponse._etag; + JsonNode jsonArray = JsonUtils.stringToJsonNode(githubAPIResponse._responseString); + for (JsonNode eventElement : jsonArray) { + try { + GenericRecord genericRecord = convertToPullRequestMergedGenericRecord(eventElement); + if (genericRecord != null) { + QuickStartBase.printStatus(Quickstart.Color.CYAN, genericRecord.toString()); + retVal.add( + new StreamDataProducer.RowWithKey(null, genericRecord.toString().getBytes(StandardCharsets.UTF_8))); + } + } catch (Exception e) { + LOGGER.error("Exception in publishing generic record. Skipping", e); + } + } + break; + case 304: // Not Modified + Quickstart.printStatus(Quickstart.Color.YELLOW, "Not modified. Checking again in 10s."); + Thread.sleep(SLEEP_MILLIS); + break; + case 408: // Timeout + Quickstart.printStatus(Quickstart.Color.YELLOW, "Timeout. Trying again in 10s."); + Thread.sleep(SLEEP_MILLIS); + break; + case 403: // Rate Limit exceeded + Quickstart.printStatus(Quickstart.Color.YELLOW, + "Rate limit exceeded, sleeping until " + githubAPIResponse._resetTimeMs); + long sleepMs = Math.max(60_000L, githubAPIResponse._resetTimeMs - System.currentTimeMillis()); + Thread.sleep(sleepMs); + break; + case 401: // Unauthorized + String msg = "Unauthorized call to GitHub events API. Status message: " + githubAPIResponse._statusMessage + + ". 
Exiting."; + Quickstart.printStatus(Quickstart.Color.YELLOW, msg); + throw new RuntimeException(msg); + default: // Unknown status code + Quickstart.printStatus(Quickstart.Color.YELLOW, + "Unknown status code " + githubAPIResponse._statusCode + " statusMessage " + + githubAPIResponse._statusMessage + ". Retry in 10s"); + Thread.sleep(SLEEP_MILLIS); + break; + } + } catch (Exception e) { + LOGGER.error("Exception in reading events data", e); + try { + Thread.sleep(SLEEP_MILLIS); + } catch (InterruptedException ex) { + LOGGER.error("Caught exception in retry", ex); + } + } + return retVal; + } + + @Override + public void close() + throws Exception { + _gitHubAPICaller.shutdown(); + } +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java index a1703d8f6f..950af0d260 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java @@ -18,24 +18,16 @@ */ package org.apache.pinot.tools.streams.githubevents; -import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Preconditions; import java.io.File; -import java.io.IOException; import java.net.URL; -import java.nio.charset.StandardCharsets; import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; import org.apache.commons.lang3.StringUtils; -import org.apache.pinot.plugin.inputformat.avro.AvroUtils; import org.apache.pinot.spi.stream.StreamDataProducer; import org.apache.pinot.spi.stream.StreamDataProvider; -import org.apache.pinot.spi.utils.JsonUtils; import 
org.apache.pinot.tools.Quickstart; +import org.apache.pinot.tools.streams.PinotRealtimeSource; +import org.apache.pinot.tools.streams.PinotSourceDataGenerator; import org.apache.pinot.tools.utils.KafkaStarterUtils; import org.apache.pinot.tools.utils.KinesisStarterUtils; import org.apache.pinot.tools.utils.StreamSourceType; @@ -53,38 +45,26 @@ import static org.apache.pinot.tools.Quickstart.printStatus; */ public class PullRequestMergedEventsStream { private static final Logger LOGGER = LoggerFactory.getLogger(PullRequestMergedEventsStream.class); - private static final long SLEEP_MILLIS = 10_000; - private final ExecutorService _service; - private boolean _keepStreaming = true; - - private final Schema _avroSchema; - private final String _topicName; - private final GitHubAPICaller _gitHubAPICaller; - - private StreamDataProducer _producer; + private PinotRealtimeSource _pinotStream; public PullRequestMergedEventsStream(File schemaFile, String topicName, String personalAccessToken, StreamDataProducer producer) throws Exception { - _service = Executors.newFixedThreadPool(2); - try { - _avroSchema = AvroUtils.getAvroSchemaFromPinotSchema(org.apache.pinot.spi.data.Schema.fromFile(schemaFile)); - } catch (Exception e) { - LOGGER.error("Got exception while reading Pinot schema from file: [" + schemaFile.getName() + "]"); - throw e; - } - _topicName = topicName; - _gitHubAPICaller = new GitHubAPICaller(personalAccessToken); - _producer = producer; + PinotSourceDataGenerator generator = new GithubPullRequestSourceGenerator(schemaFile, personalAccessToken); + _pinotStream = + PinotRealtimeSource.builder().setProducer(producer).setGenerator(generator).setTopic(topicName).build(); } public PullRequestMergedEventsStream(String schemaFilePath, String topicName, String personalAccessToken, StreamDataProducer producer) throws Exception { - _service = Executors.newFixedThreadPool(2); + this(getSchemaFile(schemaFilePath), topicName, personalAccessToken, producer); + } + + public 
static File getSchemaFile(String schemaFilePath) { + File pinotSchema; try { - File pinotSchema; if (schemaFilePath == null) { ClassLoader classLoader = PullRequestMergedEventsStream.class.getClassLoader(); URL resource = classLoader.getResource("examples/stream/githubEvents/pullRequestMergedEvents_schema.json"); @@ -93,14 +73,11 @@ public class PullRequestMergedEventsStream { } else { pinotSchema = new File(schemaFilePath); } - _avroSchema = AvroUtils.getAvroSchemaFromPinotSchema(org.apache.pinot.spi.data.Schema.fromFile(pinotSchema)); } catch (Exception e) { LOGGER.error("Got exception while reading Pinot schema from file: [" + schemaFilePath + "]"); throw e; } - _topicName = topicName; - _gitHubAPICaller = new GitHubAPICaller(personalAccessToken); - _producer = producer; + return pinotSchema; } public static StreamDataProducer getKafkaStreamDataProducer() @@ -181,187 +158,13 @@ public class PullRequestMergedEventsStream { * Shuts down the stream. */ public void shutdown() - throws IOException, InterruptedException { + throws Exception { printStatus(Quickstart.Color.GREEN, "***** Shutting down pullRequestMergedEvents Stream *****"); - _keepStreaming = false; - Thread.sleep(3000L); - _gitHubAPICaller.shutdown(); - _producer.close(); - _producer = null; - _service.shutdown(); - } - - /** - * Publishes the message to the kafka topic - */ - private void publish(GenericRecord message) - throws IOException { - if (!_keepStreaming) { - return; - } - _producer.produce(_topicName, message.toString().getBytes(StandardCharsets.UTF_8)); + _pinotStream.close(); } public void start() { - printStatus(Quickstart.Color.CYAN, "***** Starting pullRequestMergedEvents Stream *****"); - - _service.submit(() -> { - - String etag = null; - while (true) { - if (!_keepStreaming) { - return; - } - try { - GitHubAPICaller.GitHubAPIResponse githubAPIResponse = _gitHubAPICaller.callEventsAPI(etag); - switch (githubAPIResponse._statusCode) { - case 200: // Read new events - etag = 
githubAPIResponse._etag; - JsonNode jsonArray = JsonUtils.stringToJsonNode(githubAPIResponse._responseString); - for (JsonNode eventElement : jsonArray) { - try { - GenericRecord genericRecord = convertToPullRequestMergedGenericRecord(eventElement); - if (genericRecord != null) { - printStatus(Quickstart.Color.CYAN, genericRecord.toString()); - publish(genericRecord); - } - } catch (Exception e) { - LOGGER.error("Exception in publishing generic record. Skipping", e); - } - } - break; - case 304: // Not Modified - printStatus(Quickstart.Color.YELLOW, "Not modified. Checking again in 10s."); - Thread.sleep(SLEEP_MILLIS); - break; - case 408: // Timeout - printStatus(Quickstart.Color.YELLOW, "Timeout. Trying again in 10s."); - Thread.sleep(SLEEP_MILLIS); - break; - case 403: // Rate Limit exceeded - printStatus(Quickstart.Color.YELLOW, - "Rate limit exceeded, sleeping until " + githubAPIResponse._resetTimeMs); - long sleepMs = Math.max(60_000L, githubAPIResponse._resetTimeMs - System.currentTimeMillis()); - Thread.sleep(sleepMs); - break; - case 401: // Unauthorized - printStatus(Quickstart.Color.YELLOW, - "Unauthorized call to GitHub events API. Status message: " + githubAPIResponse._statusMessage - + ". Exiting."); - return; - default: // Unknown status code - printStatus(Quickstart.Color.YELLOW, - "Unknown status code " + githubAPIResponse._statusCode + " statusMessage " - + githubAPIResponse._statusMessage + ". Retry in 10s"); - Thread.sleep(SLEEP_MILLIS); - break; - } - } catch (Exception e) { - LOGGER.error("Exception in reading events data", e); - try { - Thread.sleep(SLEEP_MILLIS); - } catch (InterruptedException ex) { - LOGGER.error("Caught exception in retry", ex); - } - } - } - }); - } - - /** - * Checks for events of type PullRequestEvent which have action = closed and merged = true. - * Find commits, review comments, comments corresponding to this pull request event. 
- * Construct a PullRequestMergedEvent with the help of the event, commits, review comments and comments. - * Converts PullRequestMergedEvent to GenericRecord - * @param event - */ - private GenericRecord convertToPullRequestMergedGenericRecord(JsonNode event) - throws IOException { - GenericRecord genericRecord = null; - String type = event.get("type").asText(); - - if ("PullRequestEvent".equals(type)) { - JsonNode payload = event.get("payload"); - if (payload != null) { - String action = payload.get("action").asText(); - JsonNode pullRequest = payload.get("pull_request"); - String merged = pullRequest.get("merged").asText(); - if ("closed".equals(action) && "true".equals(merged)) { // valid pull request merge event - - JsonNode commits = null; - String commitsURL = pullRequest.get("commits_url").asText(); - GitHubAPICaller.GitHubAPIResponse commitsResponse = _gitHubAPICaller.callAPI(commitsURL); - - if (commitsResponse._responseString != null) { - commits = JsonUtils.stringToJsonNode(commitsResponse._responseString); - } - - JsonNode reviewComments = null; - String reviewCommentsURL = pullRequest.get("review_comments_url").asText(); - GitHubAPICaller.GitHubAPIResponse reviewCommentsResponse = _gitHubAPICaller.callAPI(reviewCommentsURL); - if (reviewCommentsResponse._responseString != null) { - reviewComments = JsonUtils.stringToJsonNode(reviewCommentsResponse._responseString); - } - - JsonNode comments = null; - String commentsURL = pullRequest.get("comments_url").asText(); - GitHubAPICaller.GitHubAPIResponse commentsResponse = _gitHubAPICaller.callAPI(commentsURL); - if (commentsResponse._responseString != null) { - comments = JsonUtils.stringToJsonNode(commentsResponse._responseString); - } - - // get PullRequestMergeEvent - PullRequestMergedEvent pullRequestMergedEvent = - new PullRequestMergedEvent(event, commits, reviewComments, comments); - // make generic record - genericRecord = convertToGenericRecord(pullRequestMergedEvent); - } - } - } - return 
genericRecord; - } - - /** - * Convert the PullRequestMergedEvent to a GenericRecord - */ - private GenericRecord convertToGenericRecord(PullRequestMergedEvent pullRequestMergedEvent) { - GenericRecord genericRecord = new GenericData.Record(_avroSchema); - - // Dimensions - genericRecord.put("title", pullRequestMergedEvent.getTitle()); - genericRecord.put("labels", pullRequestMergedEvent.getLabels()); - genericRecord.put("userId", pullRequestMergedEvent.getUserId()); - genericRecord.put("userType", pullRequestMergedEvent.getUserType()); - genericRecord.put("authorAssociation", pullRequestMergedEvent.getAuthorAssociation()); - genericRecord.put("mergedBy", pullRequestMergedEvent.getMergedBy()); - genericRecord.put("assignees", pullRequestMergedEvent.getAssignees()); - genericRecord.put("committers", pullRequestMergedEvent.getCommitters()); - genericRecord.put("reviewers", pullRequestMergedEvent.getReviewers()); - genericRecord.put("commenters", pullRequestMergedEvent.getCommenters()); - genericRecord.put("authors", pullRequestMergedEvent.getAuthors()); - genericRecord.put("requestedReviewers", pullRequestMergedEvent.getRequestedReviewers()); - genericRecord.put("requestedTeams", pullRequestMergedEvent.getRequestedTeams()); - genericRecord.put("repo", pullRequestMergedEvent.getRepo()); - genericRecord.put("organization", pullRequestMergedEvent.getOrganization()); - - // Metrics - genericRecord.put("numComments", pullRequestMergedEvent.getNumComments()); - genericRecord.put("numReviewComments", pullRequestMergedEvent.getNumReviewComments()); - genericRecord.put("numCommits", pullRequestMergedEvent.getNumCommits()); - genericRecord.put("numLinesAdded", pullRequestMergedEvent.getNumLinesAdded()); - genericRecord.put("numLinesDeleted", pullRequestMergedEvent.getNumLinesDeleted()); - genericRecord.put("numFilesChanged", pullRequestMergedEvent.getNumFilesChanged()); - genericRecord.put("numReviewers", pullRequestMergedEvent.getNumReviewers()); - 
genericRecord.put("numCommenters", pullRequestMergedEvent.getNumCommenters()); - genericRecord.put("numCommitters", pullRequestMergedEvent.getNumCommitters()); - genericRecord.put("numAuthors", pullRequestMergedEvent.getNumAuthors()); - genericRecord.put("createdTimeMillis", pullRequestMergedEvent.getCreatedTimeMillis()); - genericRecord.put("elapsedTimeMillis", pullRequestMergedEvent.getElapsedTimeMillis()); - - // Time column - genericRecord.put("mergedTimeMillis", pullRequestMergedEvent.getMergedTimeMillis()); - - return genericRecord; + _pinotStream.run(); } } diff --git a/pinot-tools/src/test/java/org/apache/pinot/tools/streams/PinotRealtimeSourceTest.java b/pinot-tools/src/test/java/org/apache/pinot/tools/streams/PinotRealtimeSourceTest.java new file mode 100644 index 0000000000..14f915d62a --- /dev/null +++ b/pinot-tools/src/test/java/org/apache/pinot/tools/streams/PinotRealtimeSourceTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pinot.tools.streams; + +import java.util.concurrent.ExecutorService; +import org.apache.pinot.spi.stream.StreamDataProducer; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class PinotRealtimeSourceTest { + + @Test + public void testBuilder() { + StreamDataProducer producer = Mockito.mock(StreamDataProducer.class); + PinotSourceDataGenerator generator = Mockito.mock(PinotSourceDataGenerator.class); + PinotRealtimeSource realtimeSource = + PinotRealtimeSource.builder().setTopic("mytopic").setProducer(producer).setGenerator(generator).build(); + Assert.assertNotNull(realtimeSource); + + PinotStreamRateLimiter limiter = Mockito.mock(PinotStreamRateLimiter.class); + ExecutorService executorService = Mockito.mock(ExecutorService.class); + realtimeSource = PinotRealtimeSource.builder().setRateLimiter(limiter).setProducer(producer).setGenerator(generator) + .setTopic("mytopic").setExecutor(executorService).setMaxMessagePerSecond(9527).build(); + Assert.assertEquals(realtimeSource._executor, executorService); + Assert.assertEquals(realtimeSource._producer, producer); + Assert.assertEquals(realtimeSource._topicName, "mytopic"); + String qps = realtimeSource._properties.get(PinotRealtimeSource.KEY_OF_MAX_MESSAGE_PER_SECOND).toString(); + Assert.assertNotNull(qps); + Assert.assertEquals(qps, "9527"); + Assert.assertEquals(realtimeSource._rateLimiter, limiter); + } + + @Test(expectedExceptions = NullPointerException.class) + public void testBuilderNoNullProducerThrowExceptions() { + PinotSourceDataGenerator generator = Mockito.mock(PinotSourceDataGenerator.class); + PinotRealtimeSource realtimeSource = + PinotRealtimeSource.builder().setTopic("mytopic").setGenerator(generator).build(); + } + + @Test(expectedExceptions = NullPointerException.class) + public void testBuilderNoNullGeneratorThrowExceptions() { + StreamDataProducer producer = Mockito.mock(StreamDataProducer.class); + 
PinotRealtimeSource realtimeSource = + PinotRealtimeSource.builder().setTopic("mytopic").setProducer(producer).build(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org