dongxiaoman commented on code in PR #8567:
URL: https://github.com/apache/pinot/pull/8567#discussion_r855611494


##########
pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java:
##########
@@ -60,94 +41,40 @@ public MeetupRsvpStream()
 
   public MeetupRsvpStream(boolean partitionByKey)
       throws Exception {
-    _partitionByKey = partitionByKey;
+    // calling this constructor means that we wish to use EVENT_ID as key. RsvpId is used by MeetupRsvpJsonStream
+    this(partitionByKey ? RsvpSourceGenerator.KeyColumn.EVENT_ID : RsvpSourceGenerator.KeyColumn.NONE);
+  }
 
+  public MeetupRsvpStream(RsvpSourceGenerator.KeyColumn keyColumn)
+      throws Exception {
     Properties properties = new Properties();
     properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
     properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
     properties.put("request.required.acks", "1");
-    _producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
-    _source = new Source(createConsumer());
+    StreamDataProducer producer =
+        StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
+    _pinotRealtimeSource =
+        PinotRealtimeSource.builder().setGenerator(new RsvpSourceGenerator(keyColumn)).setProducer(producer)
+            .setRateLimiter(permits -> {
+              int delay = (int) (Math.log(ThreadLocalRandom.current().nextDouble()) / Math.log(0.999)) + 1;
+              try {
+                Thread.sleep(delay);
+              } catch (InterruptedException ex) {
+                LOGGER.warn("Interrupted from sleep but will continue", ex);
+              }
+            }).build();
   }
 
   public void run()
       throws Exception {
-    _source.start();
+    _pinotRealtimeSource.run();
   }
 
   public void stopPublishing() {
-    _producer.close();
-    _source.close();
-  }
-
-  protected Consumer<RSVP> createConsumer() {
-    return message -> {
-      try {
-        if (_partitionByKey) {
-          _producer.produce(_topicName, message.getEventId().getBytes(UTF_8),

Review Comment:
   For reviewers: this line is where this class differs from `MeetupRsvpJsonStream`. Here the message key comes from `getEventId()`.
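
To make the key difference concrete, here is a minimal sketch of how a key-column enum could select the message key. It is not the PR's `RsvpSourceGenerator`: the `Rsvp` class and the `keyFor` helper are hypothetical, and the `RSVP_ID` constant is assumed (only `EVENT_ID` and `NONE` appear in the diff).

```java
import java.nio.charset.StandardCharsets;

public class KeyColumnSketch {
  // Hypothetical stand-in for RsvpSourceGenerator.KeyColumn.
  // EVENT_ID and NONE appear in the diff; RSVP_ID is an assumed name.
  public enum KeyColumn { NONE, EVENT_ID, RSVP_ID }

  // Hypothetical minimal message type holding the two candidate key fields.
  public static final class Rsvp {
    public final String eventId;
    public final String rsvpId;

    public Rsvp(String eventId, String rsvpId) {
      this.eventId = eventId;
      this.rsvpId = rsvpId;
    }
  }

  // Returns the key bytes for the chosen column, or null when no key is wanted.
  public static byte[] keyFor(Rsvp rsvp, KeyColumn keyColumn) {
    switch (keyColumn) {
      case EVENT_ID:
        return rsvp.eventId.getBytes(StandardCharsets.UTF_8);
      case RSVP_ID:
        return rsvp.rsvpId.getBytes(StandardCharsets.UTF_8);
      default:
        return null; // NONE: the message is produced without a key
    }
  }
}
```

With this shape, the two stream classes would differ only in the `KeyColumn` value they pass in, rather than in a duplicated consumer lambda.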

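A side note on the `setRateLimiter` lambda in the diff: `(int) (log(U) / log(0.999)) + 1` with uniform `U` is inverse-transform sampling of a geometric distribution with success probability 0.001, so the sleep averages roughly 1000 ms. The sketch below isolates that computation. The class and method names are hypothetical, and the clamp is my addition rather than part of the PR: if `nextDouble()` ever returned exactly 0.0, the cast would give `Integer.MAX_VALUE` and the `+ 1` would overflow to a negative delay.

```java
import java.util.concurrent.ThreadLocalRandom;

public class GeometricDelaySketch {
  // Maps a uniform sample u in [0, 1) to a delay in milliseconds.
  // For base q in (0, 1), the result is geometric on {1, 2, ...} with mean 1 / (1 - q).
  public static int delayMillis(double u, double base) {
    // Clamp u away from 0 so Math.log never returns -Infinity (assumed guard, not in the PR).
    double safeU = Math.max(u, Double.MIN_VALUE);
    return (int) (Math.log(safeU) / Math.log(base)) + 1;
  }

  // Draws a random delay using the same base as the diff (0.999).
  public static int nextDelayMillis() {
    return delayMillis(ThreadLocalRandom.current().nextDouble(), 0.999);
  }
}
```

For example, a median draw of `u = 0.5` gives a delay of 693 ms, while draws close to 1.0 give delays of only a few milliseconds.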


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

