siddharthteotia commented on a change in pull request #6546:
URL: https://github.com/apache/incubator-pinot/pull/6546#discussion_r572446070



##########
File path: 
pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
##########
@@ -387,4 +418,182 @@ private long 
calculateMemoryForCompletedSegmentsPerPartition(long completedSegme
  /**
   * Returns the matrix of the number of segments queried per host computed by this estimator.
   * Rows/columns follow the same layout as the other estimator result tables in this class.
   */
  public String[][] getNumSegmentsQueriedPerHost() {
    return _numSegmentsQueriedPerHost;
  }
+
+  private static File generateCompletedSegment(File dataCharacteristicsFile, 
File schemaFile, TableConfig tableConfig) {
+    SegmentGenerator segmentGenerator = new 
SegmentGenerator(dataCharacteristicsFile, schemaFile, tableConfig, true);
+    return segmentGenerator.generate();
+  }
+
  /**
   * This class is used in Memory Estimator to generate a segment based on the given
   * characteristics of data.
   */
  public static class SegmentGenerator {
    private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerator.class);
    // Timestamp pattern used to build unique, human-readable output directory names per run.
    private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss");

    // JSON file describing per-column cardinality, average length and values-per-entry counts.
    private File _dataCharacteristicsFile;
    // Pinot schema JSON file for the table whose segment is generated.
    private File _schemaFile;
    private TableConfig _tableConfig;
    // Whether to delete the intermediate CSV data file after the segment is built.
    private final boolean _deleteCsv;

    /**
     * @param dataCharacteristicsFile JSON file of data characteristics
     * @param schemaFile Pinot schema JSON file
     * @param tableConfig table config used for segment generation
     * @param deleteCsv whether to remove the intermediate CSV once the segment is created
     */
    public SegmentGenerator(File dataCharacteristicsFile, File schemaFile, TableConfig tableConfig, boolean deleteCsv) {
      _dataCharacteristicsFile = dataCharacteristicsFile;
      _schemaFile = schemaFile;
      _tableConfig = tableConfig;
      _deleteCsv = deleteCsv;
    }
+
+    public File generate() {
+      Date now = new Date();
+
+      // extract schema
+      Schema schema;
+      try {
+        schema = JsonUtils.fileToObject(_schemaFile, Schema.class);
+      } catch (Exception e) {
+        throw new RuntimeException(String.format("Cannot read schema file '%s' 
to schema object.", _schemaFile));
+      }
+
+      // generate data & creat segment
+      File csvDataFile = generateData(schema, now);
+      File segment = createSegment(csvDataFile, schema, now);
+
+      if (_deleteCsv) {
+        csvDataFile.delete();
+      }
+      return segment;
+    }
+
+    private File generateData(Schema schema, Date now) {
+
+      // extract data characteristics
+      DataCharacteristics dataCharacteristics;
+      try {
+         dataCharacteristics = 
JsonUtils.fileToObject(_dataCharacteristicsFile, DataCharacteristics.class);
+      } catch (Exception e) {
+        throw new RuntimeException("Cannot deserialize data characteristic 
file " + _dataCharacteristicsFile);
+      }
+
+      // create maps of "column name" to ...
+      Map<String, Integer> lengths = new HashMap<>();
+      Map<String, Double> mvCounts = new HashMap<>();
+      Map<String, Integer> cardinalities = new HashMap<>();
+      Map<String, FieldSpec.DataType> dataTypes = new HashMap<>();
+      Map<String, FieldSpec.FieldType> fieldTypes = new HashMap<>();
+      Map<String, TimeUnit> timeUnits = new HashMap<>();
+      List<String> colNames = new ArrayList<>();
+      dataCharacteristics.columnCharacteristics.forEach(column -> {
+        colNames.add(column.name);
+        lengths.put(column.name, column.averageLength);
+        mvCounts.put(column.name, column.numberOfValuesPerEntry);
+        cardinalities.put(column.name, column.cardinality);
+      });
+      schema.getAllFieldSpecs().forEach(fieldSpec -> {
+        String name = fieldSpec.getName();
+        dataTypes.put(name, fieldSpec.getDataType());
+        fieldTypes.put(name, fieldSpec.getFieldType());
+      });
+      timeUnits.put(schema.getTimeFieldSpec().getName(),
+          
schema.getTimeFieldSpec().getIncomingGranularitySpec().getTimeType());
+
+      // generate data
+      String outputDir = getOutputDir(now, "-csv");
+      DataGeneratorSpec spec =
+          new DataGeneratorSpec(colNames, cardinalities, new HashMap<>(), new 
HashMap<>(), mvCounts, lengths, dataTypes,
+              fieldTypes, timeUnits, FileFormat.CSV, outputDir, true);
+      DataGenerator dataGenerator = new DataGenerator();
+      try {
+        dataGenerator.init(spec);
+        dataGenerator.generateCsv(dataCharacteristics.numberOfRows, 1);
+        File outputFile = Paths.get(outputDir, "output_0.csv").toFile();
+        LOGGER.info("Successfully generated data file: {}", outputFile);
+        return outputFile;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    private File createSegment(File csvDataFile, Schema schema, Date now) {
+
+      // create segment
+      LOGGER.info("Started creating segment from file: {}", csvDataFile);
+      String outDir = getOutputDir(now, "-segment");
+      SegmentGeneratorConfig segmentGeneratorConfig = 
getSegmentGeneratorConfig(csvDataFile, schema, outDir);
+      SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+      try {
+        driver.init(segmentGeneratorConfig);
+        driver.build();
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while generating segment 
from file: " + csvDataFile, e);
+      }
+      String segmentName = driver.getSegmentName();
+      File indexDir = new File(outDir, segmentName);
+      LOGGER.info("Successfully created segment: {} at directory: {}", 
segmentName, indexDir);
+
+      // verify segment
+      LOGGER.info("Verifying the segment by loading it");
+      ImmutableSegment segment;
+      try {
+        segment = ImmutableSegmentLoader.load(indexDir, ReadMode.mmap);
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while verifying the 
created segment", e);
+      }
+      LOGGER.info("Successfully loaded segment: {} of size: {} bytes", 
segmentName,
+          segment.getSegmentSizeBytes());
+      segment.destroy();
+
+      return indexDir;
+    }
+
+    private SegmentGeneratorConfig getSegmentGeneratorConfig(File csvDataFile, 
Schema schema, String outDir) {
+      SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(_tableConfig, schema);
+      segmentGeneratorConfig.setInputFilePath(csvDataFile.getPath());
+      segmentGeneratorConfig.setFormat(FileFormat.CSV);
+      segmentGeneratorConfig.setOutDir(outDir);
+      segmentGeneratorConfig.setReaderConfig(new CSVRecordReaderConfig()); // 
FIXME

Review comment:
       Why FIXME?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to