Jackie-Jiang commented on code in PR #14447: URL: https://github.com/apache/pinot/pull/14447#discussion_r1842720764
##########
pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java:
##########
@@ -361,16 +360,32 @@ public static void buildSegmentFromAvro(File avroFile, TableConfig tableConfig,
     TarCompressionUtils.createCompressedTarFile(indexDir, segmentTarFile);
   }
+  public static StreamDataProducer getKafkaProducer(String kafkaBroker)
+      throws Exception {
+    Properties properties = new Properties();
+    properties.put("metadata.broker.list", kafkaBroker);
+    properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
+    properties.put("request.required.acks", "1");
+    properties.put("partitioner.class", "kafka.producer.ByteArrayPartitioner");
+    return StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
+  }
+
   /**
-   * Push the records from the given Avro files into a Kafka stream.
-   *
-   * @param csvFile CSV File name
-   * @param kafkaTopic Kafka topic
-   * @param partitionColumnIndex Optional Index of the partition column
-   * @throws Exception
+   * Push the records from the given CSV file into a Kafka stream.
    */
-  public static void pushCsvIntoKafka(File csvFile, String kafkaTopic,
-      @Nullable Integer partitionColumnIndex, boolean injectTombstones, StreamDataProducer producer)
+  public static void pushCsvIntoKafka(File csvFile, String kafkaBroker, String kafkaTopic,
+      @Nullable Integer partitionColumnIndex, boolean injectTombstones)
+      throws Exception {
+    try (StreamDataProducer producer = getKafkaProducer(kafkaBroker)) {
+      pushCsvIntoKafka(csvFile, kafkaTopic, partitionColumnIndex, injectTombstones, producer);

Review Comment:
This part ensures the producer is closed to avoid resource leak

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org