This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git

The following commit(s) were added to refs/heads/master by this push:
     new d4d910cec1 FileWriter bug fixes (#12208)
d4d910cec1 is described below

commit d4d910cec11569c5a0b6f986b361e0b3c1903962
Author: Shounak kulkarni <shounakmk...@gmail.com>
AuthorDate: Tue Jan 2 23:03:24 2024 +0530

    FileWriter bug fixes (#12208)

    * avoid writing header line for all file formats
    * ensure number of total docs matches the spec
    * introduce preprocess step for every file being written
    * extract headers once and reuse them
---
 .../recommender/data/writer/CsvWriter.java | 14 +++++++++++++
 .../recommender/data/writer/FileWriter.java | 23 ++++++++++++++--------
 2 files changed, 29 insertions(+), 8 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java
index 88547cc757..d3587336da 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java
@@ -25,6 +25,14 @@ import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
 
 
 public class CsvWriter extends FileWriter {
+  private String _headers;
+
+  @Override
+  public void init(WriterSpec spec) {
+    super.init(spec);
+    _headers = StringUtils.join(_spec.getGenerator().nextRow().keySet(), ",");
+  }
+
   @Override
   protected String generateRow(DataGenerator generator) {
     Map<String, Object> row = generator.nextRow();
@@ -38,6 +46,12 @@ public class CsvWriter extends FileWriter {
     return StringUtils.join(values, ",");
   }
 
+  @Override
+  protected void preprocess(java.io.FileWriter writer)
+      throws Exception {
+    writer.append(_headers).append('\n');
+  }
+
   private Object serializeIfMultiValue(Object obj) {
     if (obj instanceof List) {
       return StringUtils.join((List) obj, ";");
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java
index b74c2ebbde..f02f245ced 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java
@@ -20,7 +20,6 @@ package org.apache.pinot.controller.recommender.data.writer;
 
 import java.io.File;
 import java.util.Objects;
-import org.apache.commons.lang.StringUtils;
 import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,7 +28,7 @@ import org.slf4j.LoggerFactory;
 public abstract class FileWriter implements Writer {
   private static final Logger LOGGER = LoggerFactory.getLogger(FileWriter.class);
 
-  private FileWriterSpec _spec;
+  protected FileWriterSpec _spec;
   @Override
   public void init(WriterSpec spec) {
     _spec = (FileWriterSpec) spec;
@@ -38,21 +37,29 @@ public abstract class FileWriter implements Writer {
   @Override
   public void write()
       throws Exception {
-    final int numPerFiles = (int) (_spec.getTotalDocs() / _spec.getNumFiles());
-    final String headers = StringUtils.join(_spec.getGenerator().nextRow().keySet(), ",");
+    long totalDocs = _spec.getTotalDocs();
+    final long docsPerFile = (long) Math.ceil((double) totalDocs / _spec.getNumFiles());
     final String extension = getExtension() == null ? "" : String.format(".%s", getExtension());
-    for (int i = 0; i < _spec.getNumFiles(); i++) {
+    long ingestedDocs = 0;
+    int fileIndex = 0;
+    while (ingestedDocs < totalDocs) {
       try (java.io.FileWriter writer =
-          new java.io.FileWriter(new File(_spec.getBaseDir(), String.format("output_%d%s", i, extension)))) {
-        writer.append(headers).append('\n');
-        for (int j = 0; j < numPerFiles; j++) {
+          new java.io.FileWriter(new File(_spec.getBaseDir(), String.format("output_%d%s", fileIndex, extension)))) {
+        preprocess(writer);
+        for (int j = 0; j < docsPerFile && ingestedDocs < totalDocs; j++) {
           String appendString = generateRow(_spec.getGenerator());
           writer.append(appendString).append('\n');
+          ingestedDocs++;
         }
       }
+      fileIndex++;
     }
   }
 
+  protected void preprocess(java.io.FileWriter writer)
+      throws Exception {
+  }
+
   protected String getExtension() {
     return null;
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org