mcvsubbu commented on a change in pull request #6419: URL: https://github.com/apache/incubator-pinot/pull/6419#discussion_r575606362
########## File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/StreamOp.java ########## @@ -71,18 +125,170 @@ public void setInputDataFileName(String inputDataFileName) { _inputDataFileName = inputDataFileName; } + public String getTableConfigFileName() { + return _tableConfigFileName; + } + + public void setTableConfigFileName(String tableConfigFileName) { + _tableConfigFileName = tableConfigFileName; + } + + public String getRecordReaderConfigFileName() { + return _recordReaderConfigFileName; + } + + public void setRecordReaderConfigFileName(String recordReaderConfigFileName) { + _recordReaderConfigFileName = recordReaderConfigFileName; + } + @Override boolean runOp() { - System.out.println("Produce rows into stream " + _streamConfigFileName + " and verify rows in tables " - + _tableConfigFileNames); + switch(_op) { + case CREATE: + return createKafkaTopic(); + case PRODUCE: + return produceData(); + } return true; } - public List<String> getTableConfigFileNames() { - return _tableConfigFileNames; + private boolean createKafkaTopic() { + try { + Properties streamConfigMap = JsonUtils.fileToObject(new File(_streamConfigFileName), Properties.class); + String topicName = streamConfigMap.getProperty(TOPIC_NAME); + int partitions = Integer.parseInt(streamConfigMap.getProperty(NUM_PARTITIONS)); + + final Map<String, Object> config = new HashMap<>(); + config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ClusterDescriptor.DEFAULT_HOST + ":" + ClusterDescriptor.KAFKA_PORT); + config.put(AdminClientConfig.CLIENT_ID_CONFIG, "Kafka2AdminClient-" + UUID.randomUUID().toString()); + config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 15000); + AdminClient adminClient = KafkaAdminClient.create(config); + NewTopic topic = new NewTopic(topicName, partitions, (short) 1); Review comment: Can you define this 1 as `private static final short KAFKA_REPLICATION_FACTOR` ? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org