This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d4d910cec1 FileWriter bug fixes (#12208)
d4d910cec1 is described below

commit d4d910cec11569c5a0b6f986b361e0b3c1903962
Author: Shounak kulkarni <shounakmk...@gmail.com>
AuthorDate: Tue Jan 2 23:03:24 2024 +0530

    FileWriter bug fixes (#12208)
    
    * avoid writing header line for all file formats
    
    * ensure number of total docs matches the spec
    
    * introduce preprocess step for every file being written
    
    * extract headers once and reuse them
---
 .../recommender/data/writer/CsvWriter.java         | 14 +++++++++++++
 .../recommender/data/writer/FileWriter.java        | 23 ++++++++++++++--------
 2 files changed, 29 insertions(+), 8 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java
index 88547cc757..d3587336da 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java
@@ -25,6 +25,14 @@ import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
 
 
 public class CsvWriter extends FileWriter {
+  private String _headers;
+
+  @Override
+  public void init(WriterSpec spec) {
+    super.init(spec);
+    _headers = StringUtils.join(_spec.getGenerator().nextRow().keySet(), ",");
+  }
+
   @Override
   protected String generateRow(DataGenerator generator) {
     Map<String, Object> row = generator.nextRow();
@@ -38,6 +46,12 @@ public class CsvWriter extends FileWriter {
     return StringUtils.join(values, ",");
   }
 
+  @Override
+  protected void preprocess(java.io.FileWriter writer)
+      throws Exception {
+    writer.append(_headers).append('\n');
+  }
+
   private Object serializeIfMultiValue(Object obj) {
     if (obj instanceof List) {
       return StringUtils.join((List) obj, ";");
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java
index b74c2ebbde..f02f245ced 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java
@@ -20,7 +20,6 @@ package org.apache.pinot.controller.recommender.data.writer;
 
 import java.io.File;
 import java.util.Objects;
-import org.apache.commons.lang.StringUtils;
 import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,7 +28,7 @@ import org.slf4j.LoggerFactory;
 public abstract class FileWriter implements Writer {
   private static final Logger LOGGER = LoggerFactory.getLogger(FileWriter.class);
 
-  private FileWriterSpec _spec;
+  protected FileWriterSpec _spec;
   @Override
   public void init(WriterSpec spec) {
     _spec = (FileWriterSpec) spec;
@@ -38,21 +37,29 @@ public abstract class FileWriter implements Writer {
   @Override
   public void write()
       throws Exception {
-    final int numPerFiles = (int) (_spec.getTotalDocs() / _spec.getNumFiles());
-    final String headers = StringUtils.join(_spec.getGenerator().nextRow().keySet(), ",");
+    long totalDocs = _spec.getTotalDocs();
+    final long docsPerFile = (long) Math.ceil((double) totalDocs / _spec.getNumFiles());
     final String extension = getExtension() == null ? "" : String.format(".%s", getExtension());
-    for (int i = 0; i < _spec.getNumFiles(); i++) {
+    long ingestedDocs = 0;
+    int fileIndex = 0;
+    while (ingestedDocs < totalDocs) {
       try (java.io.FileWriter writer =
-          new java.io.FileWriter(new File(_spec.getBaseDir(), String.format("output_%d%s", i, extension)))) {
-        writer.append(headers).append('\n');
-        for (int j = 0; j < numPerFiles; j++) {
+          new java.io.FileWriter(new File(_spec.getBaseDir(), String.format("output_%d%s", fileIndex, extension)))) {
+        preprocess(writer);
+        for (int j = 0; j < docsPerFile && ingestedDocs < totalDocs; j++) {
           String appendString = generateRow(_spec.getGenerator());
           writer.append(appendString).append('\n');
+          ingestedDocs++;
         }
       }
+      fileIndex++;
     }
   }
 
+  protected void preprocess(java.io.FileWriter writer)
+      throws Exception {
+  }
+
   protected String getExtension() {
     return null;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to