Jackie-Jiang commented on code in PR #14447:
URL: https://github.com/apache/pinot/pull/14447#discussion_r1842720764


##########
pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java:
##########
@@ -361,16 +360,32 @@ public static void buildSegmentFromAvro(File avroFile, TableConfig tableConfig,
     TarCompressionUtils.createCompressedTarFile(indexDir, segmentTarFile);
   }
 
+  public static StreamDataProducer getKafkaProducer(String kafkaBroker)
+      throws Exception {
+    Properties properties = new Properties();
+    properties.put("metadata.broker.list", kafkaBroker);
+    properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
+    properties.put("request.required.acks", "1");
+    properties.put("partitioner.class", "kafka.producer.ByteArrayPartitioner");
+    return StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
+  }
+
   /**
-   * Push the records from the given Avro files into a Kafka stream.
-   *
-   * @param csvFile CSV File name
-   * @param kafkaTopic Kafka topic
-   * @param partitionColumnIndex Optional Index of the partition column
-   * @throws Exception
+   * Push the records from the given CSV file into a Kafka stream.
    */
-  public static void pushCsvIntoKafka(File csvFile, String kafkaTopic,
-      @Nullable Integer partitionColumnIndex, boolean injectTombstones, StreamDataProducer producer)
+  public static void pushCsvIntoKafka(File csvFile, String kafkaBroker, String kafkaTopic,
+      @Nullable Integer partitionColumnIndex, boolean injectTombstones)
+      throws Exception {
+    try (StreamDataProducer producer = getKafkaProducer(kafkaBroker)) {
+      pushCsvIntoKafka(csvFile, kafkaTopic, partitionColumnIndex, injectTombstones, producer);

Review Comment:
   The try-with-resources block ensures the producer is closed, which avoids a resource leak.
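
To illustrate the point, here is a minimal, self-contained sketch of the same pattern. It does not use Pinot's `StreamDataProducer`; `FakeProducer`, `push`, and `pushWithOwnProducer` are hypothetical names made up for this example, and the only assumption carried over from the diff is that the producer can be used in a try-with-resources statement. The sketch shows that the producer is closed both when the push succeeds and when it throws.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

final class ProducerCloseSketch {

  /** Hypothetical stand-in for a stream producer; NOT Pinot's StreamDataProducer. */
  public static class FakeProducer implements AutoCloseable {
    private final List<String> sent = new ArrayList<>();
    private boolean closed = false;

    public void produce(String topic, String value) {
      if (value == null) {
        throw new IllegalArgumentException("null record");
      }
      sent.add(topic + ":" + value);
    }

    public int sentCount() {
      return sent.size();
    }

    public boolean isClosed() {
      return closed;
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  /** Variant where the caller supplies the producer and owns its lifecycle. */
  public static void push(List<String> records, String topic, FakeProducer producer) {
    for (String record : records) {
      producer.produce(topic, record);
    }
  }

  /**
   * Variant that creates its own producer. try-with-resources calls close()
   * when the block exits, whether normally or because push() threw.
   */
  public static void pushWithOwnProducer(List<String> records, String topic,
      Supplier<FakeProducer> factory) {
    try (FakeProducer producer = factory.get()) {
      push(records, topic, producer);
    }
  }

  public static void main(String[] args) {
    FakeProducer[] held = new FakeProducer[1];

    // Normal path: both records are sent, then the producer is closed.
    pushWithOwnProducer(Arrays.asList("a", "b"), "topic", () -> held[0] = new FakeProducer());
    System.out.println("closed after success: " + held[0].isClosed());

    // Failure path: the null record makes produce() throw, yet close() still runs.
    try {
      pushWithOwnProducer(Arrays.asList("a", null), "topic", () -> held[0] = new FakeProducer());
    } catch (IllegalArgumentException e) {
      System.out.println("closed after failure: " + held[0].isClosed());
    }
  }
}
```

Keeping the overload that accepts an existing producer lets callers that already manage a producer reuse it, while the convenience variant confines creation and cleanup to one place.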



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

