mcvsubbu commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r551492153



##########
File path: 
pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -36,15 +57,23 @@
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class SegmentOp extends BaseOp {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentOp.class);
+  private static final FileFormat DEFAULT_FILE_FORMAT = FileFormat.CSV;
+  private static final String STATE_ONLINE = "ONLINE";

Review comment:
       Please pick this constant up from CommonConstants instead of redefining it here.

##########
File path: 
pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +103,220 @@ public void setTableConfigFileName(String 
tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) 
{
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + 
_inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, upload the files to controller 
and verify segment upload.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), 
"pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+      return verifySegmentInState(STATE_ONLINE);
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file 
{}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports 
generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to 
TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new 
File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), 
Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, 
_recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    File indexDir = new File(outputDir, _segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", 
_segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, _segmentName + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new 
URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new 
FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, 
segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Verify given table and segment name in the controller are in the state 
matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return true if segment is in the state provided in the parameter, else 
false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentInState(String state)
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getSegmentCountInState(state) <= 0) {
+      if ((System.currentTimeMillis() - startTime) > 
DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Upload segment verification failed, count is zero after 
max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Upload segment verification count is zero, will retry after 
{} ms.", DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully verified segment {} and its current status is 
{}.", _segmentName, state);
+    return true;
+  }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new 
File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      
ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+      return verifySegmentDeleted();
+    } catch (Exception e) {
+      LOGGER.error("Request to delete the segment {} for the table {} 
failed.", _segmentName, _tableName, e);
+      return false;
+    }
+  }
+
+  /**
+   * Verify given table name and segment name deleted from the controller.
+   * @return true if no segment found, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentDeleted()
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getCountForSegmentName() > 0) {
+      if ((System.currentTimeMillis() - startTime) > 
DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Delete segment verification failed, count is greater 
than zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Delete segment verification count greater than zero, will 
retry after {} ms.",
+          DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully delete the segment {} for the table {}.", 
_segmentName, _tableName);
+    return true;
+  }
+
+  /**
+   * Retrieve external view for the given table name.
+   * @return TableViews.TableView of OFFLINE and REALTIME segments.
+   */
+  private TableViews.TableView getExternalViewForTable()
+      throws IOException {
+    return JsonUtils.stringToObject(ControllerTest.sendGetRequest(
+        
ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL).forTableExternalView(_tableName)),
+        TableViews.TableView.class);
+  }
+
+  /**
+   * Retrieve the number of segments for both OFFLINE and REALTIME which are 
in state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return count for OFFLINE and REALTIME segments.
+   */
+  private long getSegmentCountInState(String state)

Review comment:
       The code here will work, but it seems to imply that a segment can exist in both the realtime and offline tables with the same name.
   
   You can choose to consider just the offline side for now; let us add the realtime side later.

##########
File path: 
pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +103,220 @@ public void setTableConfigFileName(String 
tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) 
{
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + 
_inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, upload the files to controller 
and verify segment upload.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), 
"pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+      return verifySegmentInState(STATE_ONLINE);
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file 
{}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports 
generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to 
TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new 
File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), 
Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, 
_recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    File indexDir = new File(outputDir, _segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", 
_segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, _segmentName + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new 
URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new 
FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, 
segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Verify given table and segment name in the controller are in the state 
matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return true if segment is in the state provided in the parameter, else 
false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentInState(String state)
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getSegmentCountInState(state) <= 0) {
+      if ((System.currentTimeMillis() - startTime) > 
DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Upload segment verification failed, count is zero after 
max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Upload segment verification count is zero, will retry after 
{} ms.", DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully verified segment {} and its current status is 
{}.", _segmentName, state);
+    return true;
+  }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new 
File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      
ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+      return verifySegmentDeleted();
+    } catch (Exception e) {
+      LOGGER.error("Request to delete the segment {} for the table {} 
failed.", _segmentName, _tableName, e);
+      return false;
+    }
+  }
+
+  /**
+   * Verify given table name and segment name deleted from the controller.
+   * @return true if no segment found, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentDeleted()
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getCountForSegmentName() > 0) {
+      if ((System.currentTimeMillis() - startTime) > 
DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Delete segment verification failed, count is greater 
than zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Delete segment verification count greater than zero, will 
retry after {} ms.",
+          DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully delete the segment {} for the table {}.", 
_segmentName, _tableName);
+    return true;
+  }
+
+  /**
+   * Retrieve external view for the given table name.
+   * @return TableViews.TableView of OFFLINE and REALTIME segments.
+   */
+  private TableViews.TableView getExternalViewForTable()
+      throws IOException {
+    return JsonUtils.stringToObject(ControllerTest.sendGetRequest(
+        
ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL).forTableExternalView(_tableName)),
+        TableViews.TableView.class);
+  }
+
+  /**
+   * Retrieve the number of segments for both OFFLINE and REALTIME which are 
in state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return count for OFFLINE and REALTIME segments.
+   */
+  private long getSegmentCountInState(String state)
+      throws IOException {
+    long offlineSegmentCount =
+        getExternalViewForTable().offline != null ? 
getExternalViewForTable().offline.entrySet().stream()
+            .filter(k -> k.getKey().equalsIgnoreCase(_segmentName)).filter(v 
-> v.getValue().values().contains(state))

Review comment:
       Segment names are case sensitive — please do not ignore case.
   Also, if there are two replicas and one of them is ONLINE while the other is not, how will this work?
   Instead of counting the number of matches, can we get the state of all replicas of the segment and return true ONLY if all of them are in the intended state?

##########
File path: 
pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -36,15 +57,23 @@
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class SegmentOp extends BaseOp {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentOp.class);
+  private static final FileFormat DEFAULT_FILE_FORMAT = FileFormat.CSV;
+  private static final String STATE_ONLINE = "ONLINE";
+  private static final int DEFAULT_MAX_SLEEP_TIME_MS = 30000;
+  private static final int DEFAULT_SLEEP_INTERVAL_MS = 200;
+
   public enum Op {
-    UPLOAD,
-    DELETE
+    UPLOAD, DELETE

Review comment:
       nit: I prefer one enum value per line for readability.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to