This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 42715fcc4af Deterministic data-only CRC (1/2) - add data-only CRC
computation for new segments and persist to ZK with new data CRC key (#17264)
42715fcc4af is described below
commit 42715fcc4af2ce61bf79982a16e39b55045ba09b
Author: Anurag Rai <[email protected]>
AuthorDate: Tue Dec 16 03:17:56 2025 +0530
Deterministic data-only CRC (1/2) - add data-only CRC computation for new
segments and persist to ZK with new data CRC key (#17264)
---
.../common/metadata/segment/SegmentZKMetadata.java | 8 +
.../metadata/segment/SegmentZKMetadataUtils.java | 1 +
.../resources/TableLLCSegmentUploadResponse.java | 11 +-
.../common/metadata/SegmentZKMetadataTest.java | 3 +
.../realtime/PinotLLCRealtimeSegmentManager.java | 14 +-
.../realtime/provisioning/MemoryEstimator.java | 1 +
.../controller/api/upload/ZKOperatorTest.java | 3 +
.../PinotLLCRealtimeSegmentManagerTest.java | 5 +-
.../controller/utils/SegmentMetadataMockUtils.java | 3 +
.../org/apache/pinot/core/util/CrcUtilsTest.java | 207 ++++++++++++++++++++-
.../data/test_crc_complex_sample_data.csv | 20 ++
.../resources/data/test_crc_complex_schema.json | 27 +++
.../resources/data/test_crc_complex_table.json | 83 +++++++++
.../segment/creator/impl/BaseSegmentCreator.java | 40 +++-
.../apache/pinot/segment/local/utils/CrcUtils.java | 71 ++++---
.../apache/pinot/segment/spi/SegmentMetadata.java | 2 +
.../spi/index/metadata/SegmentMetadataImpl.java | 27 ++-
.../pinot/server/api/resources/TablesResource.java | 9 +-
.../pinot/server/api/TablesResourceTest.java | 4 +-
.../apache/pinot/spi/utils/CommonConstants.java | 1 +
20 files changed, 493 insertions(+), 47 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
index c26542fe238..8938cd30809 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
@@ -174,6 +174,14 @@ public class SegmentZKMetadata implements ZKMetadata {
setNonNegativeValue(Segment.CRC, crc);
}
+ public long getDataCrc() { // data-only CRC stored under Segment.DATA_CRC; -1 when the field is not set
+ return _znRecord.getLongField(Segment.DATA_CRC, -1);
+ }
+
+ public void setDataCrc(long dataCrc) { // written like setCrc; NOTE(review): confirm how setNonNegativeValue treats -1
+ setNonNegativeValue(Segment.DATA_CRC, dataCrc);
+ }
+
public String getTier() {
return _simpleFields.get(Segment.TIER);
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadataUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadataUtils.java
index 58de48a843c..0c72fcd7600 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadataUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadataUtils.java
@@ -160,6 +160,7 @@ public class SegmentZKMetadataUtils {
segmentZKMetadata.setIndexVersion(segmentVersion != null ?
segmentVersion.toString() : null);
segmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
segmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
+ segmentZKMetadata.setDataCrc(Long.parseLong(segmentMetadata.getDataCrc()));
segmentZKMetadata.setDownloadUrl(downloadUrl);
segmentZKMetadata.setCrypterName(crypterName);
segmentZKMetadata.setSizeInBytes(segmentSizeInBytes);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableLLCSegmentUploadResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableLLCSegmentUploadResponse.java
index 2bbbb52f464..af18cf2bb08 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableLLCSegmentUploadResponse.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableLLCSegmentUploadResponse.java
@@ -18,18 +18,21 @@
*/
package org.apache.pinot.common.restlet.resources;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
-
+@JsonIgnoreProperties(ignoreUnknown = true)
public class TableLLCSegmentUploadResponse {
private final String _segmentName;
private final long _crc;
+ private final long _dataCrc;
private final String _downloadUrl;
public TableLLCSegmentUploadResponse(@JsonProperty("segmentName") String
segmentName, @JsonProperty("crc") long crc,
- @JsonProperty("downloadUrl") String downloadUrl) {
+ @JsonProperty ("dataCrc") long dataCrc, @JsonProperty("downloadUrl")
String downloadUrl) {
_segmentName = segmentName;
_crc = crc;
+ _dataCrc = dataCrc;
_downloadUrl = downloadUrl;
}
@@ -41,6 +44,10 @@ public class TableLLCSegmentUploadResponse {
return _crc;
}
+ public long getDataCrc() { // data-only CRC carried in the "dataCrc" JSON property of the upload response
+ return _dataCrc;
+ }
+
public String getDownloadUrl() {
return _downloadUrl;
}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metadata/SegmentZKMetadataTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metadata/SegmentZKMetadataTest.java
index 482543b4234..5b9764c88da 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/metadata/SegmentZKMetadataTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/metadata/SegmentZKMetadataTest.java
@@ -139,6 +139,7 @@ public class SegmentZKMetadataTest {
record.setSimpleField(CommonConstants.Segment.TIME_UNIT,
TimeUnit.HOURS.toString());
record.setLongField(CommonConstants.Segment.TOTAL_DOCS, 10000);
record.setLongField(CommonConstants.Segment.CRC, 1234);
+ record.setLongField(CommonConstants.Segment.DATA_CRC, 4567);
record.setLongField(CommonConstants.Segment.CREATION_TIME, 3000);
record.setIntField(CommonConstants.Segment.Realtime.FLUSH_THRESHOLD_SIZE,
1234);
return record;
@@ -153,6 +154,7 @@ public class SegmentZKMetadataTest {
realtimeSegmentMetadata.setStatus(Status.DONE);
realtimeSegmentMetadata.setTotalDocs(10000);
realtimeSegmentMetadata.setCrc(1234);
+ realtimeSegmentMetadata.setDataCrc(4567);
realtimeSegmentMetadata.setCreationTime(3000);
realtimeSegmentMetadata.setSizeThresholdToFlushSegment(1234);
return realtimeSegmentMetadata;
@@ -208,6 +210,7 @@ public class SegmentZKMetadataTest {
offlineSegmentMetadata.setTimeUnit(TimeUnit.HOURS);
offlineSegmentMetadata.setTotalDocs(50000);
offlineSegmentMetadata.setCrc(54321);
+ offlineSegmentMetadata.setDataCrc(-1);
offlineSegmentMetadata.setCreationTime(1000);
offlineSegmentMetadata.setDownloadUrl("http://localhost:8000/testTable_O_3000_4000");
offlineSegmentMetadata.setPushTime(4000);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9c3a710bfe2..57e2f027c90 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -2124,6 +2124,7 @@ public class PinotLLCRealtimeSegmentManager {
LOGGER.info("Updating CRC in ZK metadata for segment: {} from: {} to:
{}", segmentName, currentMetadata.getCrc(),
uploadedMetadata.getCrc());
currentMetadata.setCrc(uploadedMetadata.getCrc());
+ currentMetadata.setDataCrc(uploadedMetadata.getDataCrc());
}
moveSegmentAndSetDownloadUrl(rawTableName, segmentName,
uploadedMetadata.getDownloadUrl(), pinotFS,
currentMetadata);
@@ -2142,9 +2143,20 @@ public class PinotLLCRealtimeSegmentManager {
throws Exception {
long currentCrc = currentMetadata.getCrc();
long newCrc = response.getCrc();
+ long currentDataCrc = currentMetadata.getDataCrc();
+ long newDataCrc = response.getDataCrc();
if (currentCrc != newCrc) {
- LOGGER.info("Updating CRC in ZK metadata for segment: {} from: {} to:
{}", segmentName, currentCrc, newCrc);
+ LOGGER.info("Updating CRC and data CRC in ZK metadata for segment: {} "
+ + "[oldCRC: {} newCRC: {}][oldDataCRC: {} newDataCRC: {}]",
+ segmentName, currentCrc, newCrc, currentMetadata.getDataCrc(),
newDataCrc);
currentMetadata.setCrc(newCrc);
+ // since data CRC is a subset of CRC, we update data CRC too when CRC
changes
+ currentMetadata.setDataCrc(newDataCrc);
+ }
+ if (currentDataCrc != newDataCrc) {
+ LOGGER.info("Updating data CRC in ZK metadata for segment: {} from: {}
to: {}", segmentName, currentDataCrc,
+ newDataCrc);
+ currentMetadata.setDataCrc(newDataCrc);
}
moveSegmentAndSetDownloadUrl(rawTableName, segmentName,
response.getDownloadUrl(), pinotFS, currentMetadata);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
index 4c5240b255c..155f919302f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
@@ -408,6 +408,7 @@ public class MemoryEstimator {
segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
segmentZKMetadata.setTotalDocs(totalDocs);
segmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
+ segmentZKMetadata.setDataCrc(Long.parseLong(segmentMetadata.getDataCrc()));
return segmentZKMetadata;
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
index e76e6abe643..339d6b253e6 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
@@ -159,6 +159,7 @@ public class ZKOperatorTest {
SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
when(segmentMetadata.getName()).thenReturn(segmentName);
when(segmentMetadata.getCrc()).thenReturn("12345");
+ when(segmentMetadata.getDataCrc()).thenReturn("432");
when(segmentMetadata.getIndexCreationTime()).thenReturn(123L);
HttpHeaders httpHeaders = mock(HttpHeaders.class);
@@ -200,6 +201,7 @@ public class ZKOperatorTest {
SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
when(segmentMetadata.getName()).thenReturn(SEGMENT_NAME);
when(segmentMetadata.getCrc()).thenReturn("12345");
+ when(segmentMetadata.getDataCrc()).thenReturn("432");
when(segmentMetadata.getIndexCreationTime()).thenReturn(123L);
HttpHeaders httpHeaders = mock(HttpHeaders.class);
@@ -331,6 +333,7 @@ public class ZKOperatorTest {
SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
when(segmentMetadata.getName()).thenReturn(SEGMENT_NAME);
when(segmentMetadata.getCrc()).thenReturn("12345");
+ when(segmentMetadata.getDataCrc()).thenReturn("432");
zkOperator.completeSegmentOperations(REALTIME_TABLE_CONFIG,
segmentMetadata, FileUploadType.SEGMENT, null, null,
"downloadUrl", "downloadUrl", null, 10, true, true,
mock(HttpHeaders.class));
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index d2c26e78f2b..5de5716e5cd 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -133,6 +133,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
static final Interval INTERVAL = new Interval(START_TIME_MS, END_TIME_MS);
// NOTE: CRC is always non-negative
static final String CRC = Long.toString(RANDOM.nextLong() & 0xFFFFFFFFL);
+ static final String DATA_CRC = Long.toString(RANDOM.nextLong() &
0xFFFFFFFFL);
static final SegmentVersion SEGMENT_VERSION = RANDOM.nextBoolean() ?
SegmentVersion.v1 : SegmentVersion.v3;
static final int NUM_DOCS = RANDOM.nextInt(Integer.MAX_VALUE) + 1;
static final long LATEST_OFFSET = PARTITION_OFFSET.getOffset() * 2 +
NUM_DOCS;
@@ -148,6 +149,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
when(segmentMetadata.getTimeInterval()).thenReturn(INTERVAL);
when(segmentMetadata.getCrc()).thenReturn(CRC);
+ when(segmentMetadata.getDataCrc()).thenReturn(DATA_CRC);
when(segmentMetadata.getVersion()).thenReturn(SEGMENT_VERSION);
when(segmentMetadata.getTotalDocs()).thenReturn(NUM_DOCS);
return segmentMetadata;
@@ -257,6 +259,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
assertEquals(committedSegmentZKMetadata.getEndOffset(), nextOffset);
assertEquals(committedSegmentZKMetadata.getCreationTime(),
CURRENT_TIME_MS);
assertEquals(committedSegmentZKMetadata.getCrc(), Long.parseLong(CRC));
+ assertEquals(committedSegmentZKMetadata.getDataCrc(),
Long.parseLong(DATA_CRC));
assertEquals(committedSegmentZKMetadata.getIndexVersion(),
SEGMENT_VERSION.name());
assertEquals(committedSegmentZKMetadata.getTotalDocs(), NUM_DOCS);
assertEquals(committedSegmentZKMetadata.getSizeInBytes(),
SEGMENT_SIZE_IN_BYTES);
@@ -1459,7 +1462,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
new SegmentZKMetadata(new
ZNRecord(segmentsZKMetadata.get(0).toZNRecord()));
when(segmentManager._mockedFileUploadDownloadClient.uploadLLCToSegmentStore(serverUploadRequestUrl0)).thenReturn(
- new
TableLLCSegmentUploadResponse(segmentsZKMetadata.get(0).getSegmentName(),
12345678L,
+ new
TableLLCSegmentUploadResponse(segmentsZKMetadata.get(0).getSegmentName(),
12345678L, 43210L,
tempSegmentFileLocation.getPath()));
// Change 2nd segment status to be DONE, but with default peer download
url.
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
index a604e7e70e1..a434327e291 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
@@ -45,6 +45,7 @@ public class SegmentMetadataMockUtils {
Mockito.when(segmentMetadata.getName()).thenReturn(segmentName);
Mockito.when(segmentMetadata.getTotalDocs()).thenReturn(numTotalDocs);
Mockito.when(segmentMetadata.getCrc()).thenReturn(crc);
+
Mockito.when(segmentMetadata.getDataCrc()).thenReturn(String.valueOf(Long.parseLong(crc)
+ 100));
Mockito.when(segmentMetadata.getStartTime()).thenReturn(startTime);
Mockito.when(segmentMetadata.getEndTime()).thenReturn(endTime);
Mockito.when(segmentMetadata.getTimeInterval()).thenReturn(
@@ -97,6 +98,7 @@ public class SegmentMetadataMockUtils {
when(segmentMetadata.getTableName()).thenReturn(rawTableName);
when(segmentMetadata.getName()).thenReturn(segmentName);
when(segmentMetadata.getCrc()).thenReturn("0");
+ when(segmentMetadata.getDataCrc()).thenReturn("1");
TreeMap<String, ColumnMetadata> columnMetadataMap = new TreeMap<>();
columnMetadataMap.put(columnName, columnMetadata);
@@ -110,6 +112,7 @@ public class SegmentMetadataMockUtils {
Mockito.when(segmentMetadata.getName()).thenReturn(segmentName);
Mockito.when(segmentMetadata.getTotalDocs()).thenReturn(10);
Mockito.when(segmentMetadata.getCrc()).thenReturn(Long.toString(System.nanoTime()));
+
Mockito.when(segmentMetadata.getDataCrc()).thenReturn(Long.toString(System.nanoTime()));
Mockito.when(segmentMetadata.getStartTime()).thenReturn(endTime - 10);
Mockito.when(segmentMetadata.getEndTime()).thenReturn(endTime);
Mockito.when(segmentMetadata.getTimeInterval()).thenReturn(
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/util/CrcUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/util/CrcUtilsTest.java
index ec318456833..ecb8d7df18b 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/util/CrcUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/util/CrcUtilsTest.java
@@ -18,15 +18,23 @@
*/
package org.apache.pinot.core.util;
+import java.io.DataInputStream;
+import java.io.EOFException;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URL;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import
org.apache.pinot.segment.local.segment.index.converter.SegmentV1V2ToV3FormatConverter;
import org.apache.pinot.segment.local.utils.CrcUtils;
+import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
@@ -36,8 +44,11 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -49,6 +60,9 @@ public class CrcUtilsTest {
private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
"CrcUtilsTest");
private static final String AVRO_DATA = "data/test_data-mv.avro";
private static final String RAW_TABLE_NAME = "testTable";
+ private static final String COMPLEX_SCHEMA_NAME =
"data/test_crc_complex_schema.json";
+ private static final String COMPLEX_TABLE_CONFIG_NAME =
"data/test_crc_complex_table.json";
+ private static final String COMPLEX_DATA_NAME =
"data/test_crc_complex_sample_data.csv";
//@formatter:off
private static final Schema SCHEMA = new
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
@@ -92,10 +106,10 @@ public class CrcUtilsTest {
driver.build();
File indexDir = driver.getOutputDirectory();
- assertEquals(CrcUtils.forAllFilesInFolder(indexDir).computeCrc(),
2102337593L);
+ assertEquals(CrcUtils.computeCrc(indexDir), 2102337593L);
new SegmentV1V2ToV3FormatConverter().convert(indexDir);
- assertEquals(CrcUtils.forAllFilesInFolder(indexDir).computeCrc(),
3362640853L);
+ assertEquals(CrcUtils.computeCrc(indexDir), 3362640853L);
}
@Test
@@ -122,14 +136,14 @@ public class CrcUtilsTest {
driver.build();
File indexDir = driver.getOutputDirectory();
- assertEquals(CrcUtils.forAllFilesInFolder(indexDir).computeCrc(),
289171778L);
+ assertEquals(CrcUtils.computeCrc(indexDir), 289171778L);
new SegmentV1V2ToV3FormatConverter().convert(indexDir);
- assertEquals(CrcUtils.forAllFilesInFolder(indexDir).computeCrc(),
3409394291L);
+ assertEquals(CrcUtils.computeCrc(indexDir), 3409394291L);
}
@Test
- public void testCrcWithLuceneFstIndex()
+ public void testCrcAllFilesWithLuceneFstIndex()
throws Exception {
URL resource = getClass().getClassLoader().getResource(AVRO_DATA);
assertNotNull(resource);
@@ -151,14 +165,14 @@ public class CrcUtilsTest {
driver.build();
File indexDir = driver.getOutputDirectory();
- assertEquals(CrcUtils.forAllFilesInFolder(indexDir).computeCrc(),
2627227852L);
+ assertEquals(CrcUtils.computeCrc(indexDir), 2627227852L);
new SegmentV1V2ToV3FormatConverter().convert(indexDir);
- assertEquals(CrcUtils.forAllFilesInFolder(indexDir).computeCrc(),
1229791705L);
+ assertEquals(CrcUtils.computeCrc(indexDir), 1229791705L);
}
@Test
- public void testCrcWithLuceneTextIndex()
+ public void testCrcAllFilesWithLuceneTextIndex()
throws Exception {
URL resource = getClass().getClassLoader().getResource(AVRO_DATA);
assertNotNull(resource);
@@ -183,9 +197,182 @@ public class CrcUtilsTest {
// When using text index in RealTime table, different crc values can cause
servers to have to download segments
// from deep store to make segment replicas in sync.
File indexDir = driver.getOutputDirectory();
- System.out.println(CrcUtils.forAllFilesInFolder(indexDir).computeCrc());
+ System.out.println(CrcUtils.computeCrc(indexDir));
new SegmentV1V2ToV3FormatConverter().convert(indexDir);
- System.out.println(CrcUtils.forAllFilesInFolder(indexDir).computeCrc());
+ System.out.println(CrcUtils.computeCrc(indexDir));
+ }
+
+  @Test
+  public void testDataCrcConsistencyWithExternalIndexesForAllSegmentVers()
+      throws Exception {
+    int numRuns = 5;
+
+    URL schemaResource = getClass().getClassLoader().getResource(COMPLEX_SCHEMA_NAME);
+    URL tableConfigResource = getClass().getClassLoader().getResource(COMPLEX_TABLE_CONFIG_NAME);
+    URL dataResource = getClass().getClassLoader().getResource(COMPLEX_DATA_NAME);
+
+    assertNotNull(schemaResource, "Schema file not found: " + COMPLEX_SCHEMA_NAME);
+    assertNotNull(tableConfigResource, "Table config file not found: " + COMPLEX_TABLE_CONFIG_NAME);
+    assertNotNull(dataResource, "Data file not found: " + COMPLEX_DATA_NAME);
+    // Resolve every resource via TestUtils; URL.getFile() keeps %-escapes (e.g. %20) and breaks on such paths
+    Schema schema = Schema.fromFile(new File(TestUtils.getFileFromResourceUrl(schemaResource)));
+    TableConfig tableConfig = createTableConfig(new File(TestUtils.getFileFromResourceUrl(tableConfigResource)));
+    File dataFile = new File(TestUtils.getFileFromResourceUrl(dataResource));
+
+    SegmentVersion[] versionsToTest = new SegmentVersion[]{
+        SegmentVersion.v1,
+        SegmentVersion.v2,
+        SegmentVersion.v3
+    };
+
+    for (SegmentVersion version : versionsToTest) {
+      // Build the same input numRuns times; the data-only CRC must be identical across all runs
+      List<Long> crcs = new ArrayList<>(numRuns);
+
+      for (int i = 0; i < numRuns; i++) {
+        FileUtils.deleteDirectory(INDEX_DIR);
+        INDEX_DIR.mkdirs();
+
+        SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
+        config.setInputFilePath(dataFile.getAbsolutePath());
+        config.setSegmentVersion(version);
+        config.setOutDir(INDEX_DIR.getAbsolutePath());
+        config.setFormat(FileFormat.CSV);
+        config.setSegmentName("testCrcSegment");
+
+        CSVRecordReaderConfig csvReaderConfig = new CSVRecordReaderConfig();
+        csvReaderConfig.setMultiValueDelimiter('|');
+        csvReaderConfig.setSkipHeader(false);
+
+        config.setReaderConfig(csvReaderConfig);
+        SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+        driver.init(config);
+        driver.build();
+
+        File indexDir = driver.getOutputDirectory();
+
+        Long currentDataCrc = readDataOnlyCrcFromMetaIfPresent(indexDir);
+
+        Assert.assertNotNull(currentDataCrc, "Data CRC was not written to creation.meta for segment version " + version);
+
+        crcs.add(currentDataCrc);
+        System.out.println(String.format(" [%s] Run %d Data CRC got from meta file: %d",
+            version, i + 1,
+            currentDataCrc));
+      }
+
+      Assert.assertFalse(crcs.isEmpty());
+      long firstRunCrc = crcs.get(0);
+      for (int i = 1; i < crcs.size(); i++) {
+        Assert.assertEquals(crcs.get(i).longValue(), firstRunCrc,
+            String.format("CRC Determinism test Failed for %s! Run 1 vs Run %d", version, i + 1));
+      }
+    }
+  }
+
+  @Test
+  public void testDataCrcNotGeneratedIfForwardIndexDisabled() throws Exception {
+    URL schemaResource = getClass().getClassLoader().getResource(COMPLEX_SCHEMA_NAME);
+    URL tableConfigResource = getClass().getClassLoader().getResource(COMPLEX_TABLE_CONFIG_NAME);
+    URL dataResource = getClass().getClassLoader().getResource(COMPLEX_DATA_NAME);
+
+    assertNotNull(schemaResource, "Schema file not found: " + COMPLEX_SCHEMA_NAME);
+    assertNotNull(tableConfigResource, "Table config file not found: " + COMPLEX_TABLE_CONFIG_NAME);
+    assertNotNull(dataResource, "Data file not found: " + COMPLEX_DATA_NAME);
+    // Resolve every resource via TestUtils; URL.getFile() keeps %-escapes (e.g. %20) and breaks on such paths
+    Schema schema = Schema.fromFile(new File(TestUtils.getFileFromResourceUrl(schemaResource)));
+    TableConfig tableConfig = createTableConfig(new File(TestUtils.getFileFromResourceUrl(tableConfigResource)));
+    File dataFile = new File(TestUtils.getFileFromResourceUrl(dataResource));
+
+    // Add a field config that disables the forward index of the multi-value "associatedInts" column
+    FieldConfig disableFwdIndexFieldConfig = new FieldConfig("associatedInts",
+        FieldConfig.EncodingType.DICTIONARY,
+        List.of(FieldConfig.IndexType.INVERTED),
+        null,
+        Map.of(FieldConfig.FORWARD_INDEX_DISABLED, "true"));
+
+    List<FieldConfig> fieldConfigList = new ArrayList<>();
+    if (tableConfig.getFieldConfigList() != null) {
+      fieldConfigList.addAll(tableConfig.getFieldConfigList());
+    }
+    fieldConfigList.add(disableFwdIndexFieldConfig);
+    tableConfig.setFieldConfigList(fieldConfigList);
+
+    SegmentVersion[] versionsToTest = new SegmentVersion[]{
+        SegmentVersion.v1,
+        SegmentVersion.v3
+    };
+
+    for (SegmentVersion version : versionsToTest) {
+      FileUtils.deleteDirectory(INDEX_DIR);
+      INDEX_DIR.mkdirs();
+
+      SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
+      config.setInputFilePath(dataFile.getAbsolutePath());
+      config.setSegmentVersion(version);
+      config.setOutDir(INDEX_DIR.getAbsolutePath());
+      config.setFormat(FileFormat.CSV);
+      config.setSegmentName("testCrcSegmentWithForwardIndexDisabled");
+
+      CSVRecordReaderConfig csvReaderConfig = new CSVRecordReaderConfig();
+      csvReaderConfig.setMultiValueDelimiter('|');
+      csvReaderConfig.setSkipHeader(false);
+
+      config.setReaderConfig(csvReaderConfig);
+      SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+      driver.init(config);
+      driver.build();
+
+      File indexDir = driver.getOutputDirectory();
+
+      // With a forward index disabled, no data-only CRC may be written to creation.meta
+      Long dataCrc = readDataOnlyCrcFromMetaIfPresent(indexDir);
+
+      Assert.assertNull(dataCrc,
+          String.format("Data CRC should not be written for segment version %s when forward index is disabled",
+              version));
+    }
+  }
+
+  /**
+   * Reads the data-only CRC from creation.meta in indexDir, or in the first immediate subdirectory holding one.
+   * Layout read: CRC (long), creation time (long), optional data-only CRC (long); returns null if the third is absent.
+   */
+  private static Long readDataOnlyCrcFromMetaIfPresent(File indexDir) throws IOException {
+    File metaFile = new File(indexDir, V1Constants.SEGMENT_CREATION_META);
+
+    if (!metaFile.exists()) {
+      File[] files = indexDir.listFiles();
+      if (files != null) {
+        for (File f : files) {
+          if (f.isDirectory()) {
+            File nestedMeta = new File(f, V1Constants.SEGMENT_CREATION_META);
+            if (nestedMeta.exists()) {
+              metaFile = nestedMeta;
+              break;
+            }
+          }
+        }
+      }
+    }
+    Assert.assertTrue(metaFile.exists(), "creation.meta file could not be found in: " + indexDir.getAbsolutePath());
+
+    try (DataInputStream input = new DataInputStream(new FileInputStream(metaFile))) {
+      input.readLong(); // 1st: standard CRC (skipped)
+      input.readLong(); // 2nd: creation time (skipped)
+      try {
+        return input.readLong(); // 3rd: data-only CRC, present only when it was computed
+      } catch (EOFException ignored) {
+        return null; // file ends after the creation time: no data CRC was written
+      }
+    }
+  }
+
+  private static TableConfig createTableConfig(File tableConfigFile)
+      throws IOException {
+    try (InputStream inputStream = new FileInputStream(tableConfigFile)) { // closed even if parsing fails
+      return JsonUtils.inputStreamToObject(inputStream, TableConfig.class);
+    }
   }
}
diff --git a/pinot-core/src/test/resources/data/test_crc_complex_sample_data.csv b/pinot-core/src/test/resources/data/test_crc_complex_sample_data.csv
new file mode 100644
index 00000000000..f20f4b5f81a
--- /dev/null
+++ b/pinot-core/src/test/resources/data/test_crc_complex_sample_data.csv
@@ -0,0 +1,20 @@
+campaignId,cityName,isTestUser,rawUUID,fullReviewText,coordinates,userTags,associatedInts,associatedFloats,customerDetailsJson,clickCount,totalRevenue,eventTime
+101,New York,true,a8d30e1c-b7f9-4a0d-9b1f-7e2a9c4d8e7d,"Great item, very fast
delivery!",04806e625a6e87f7,VIP|GOLD|US,1|5|9,10.5|20.1,"{""user_id"":123,
""status"": ""Active"", ""orders"": [1]}",5,125.50,1678886400000
+102,Shanghai,false,b7e21d2b-c8g0-5b1e-a0c2-8f3b0d5e9f8e,"Broken on arrival.
Needs refund.",04806e625a6e87f8,US|CA,2|3,5.0,"{""user_id"":456, ""status"":
""Inactive"", ""address"": {""zip"": 10001}}",12,9.99,1678886400000
+101,London,true,c6f12e3c-d9h1-6c2f-b1d3-9g4c1e6f0g9f,"",04806e625a6e87f7,,100,1.1|1.2|1.3,"{""user_id"":789,
""status"": ""Active""}",0,10000.00,1678886400000
+103,Paris,false,,"Excellent quality. 5
stars.",04806e625a6e87f9,FR,99,2.5,,1,55.00,1678886400000
+104,Tokyo,true,e4h34f5d-f1j3-8e4h-d3f5-b9i62g8h2i1j,"Item was slow to
ship.",04806e625a6e87fa,JP,1|2,1.0|2.0,{},8,75.25,1678886400100
+105,"",false,f5i45g6e-g2k4-9f5i-e4g6-c0j73h9i3j2k,"Highly recommended, search
for this product!",04806e625a6e87fb,,,,"{""status"": ""Active"", ""discount"":
true}",50,200.00,1678886400000
+101,New York,true,g6j56h7f-h3l5-a0g6-f5h7-d1k84i0j4k3l,"The best purchase this
year.",04806e625a6e87f7,US,1|1,15.5,"{""status"": ""Active"", ""discount"":
true}",3,125.50,1678886400000
+106,Shanghai,false,h7k67i8g-i4m6-b1h7-g6i8-e2l95j1k5l4m,"Average.",04806e625a6e87f8,CN,4|5,5.0,"{""status"":
""Inactive""}",15,150.00,1678886400200
+107,London,true,i8l78j9h-j5n7-c2i8-h7j9-f3m06k2l6m5n,"Worst experience, Not
good.",04806e625a6e87f7,UK|IRL,10,10.0|20.0,,2,19.99,1678886400000
+101,Paris,true,j9m89k0i-k6o8-d3j9-i8k0-g4n17l3m7n6o,"Happy
customer.",04806e625a6e87f9,,1|2|3,0.5,"{""user_id"":999}",7,77.77,1678886400000
+109,New York,true,a8d30e1c-b7f9-4a0d-9b1f-7e2a9c4d8e7d,"Great item, very fast
delivery!",04806e625a6e87f7,VIP|GOLD|US,1|5|9,10.5|20.1,"{""user_id"":123,
""status"": ""Active"", ""orders"": [1]}",5,125.50,1678886400000
+110,Shanghai,false,b7e21d2b-c8g0-5b1e-a0c2-8f3b0d5e9f8e,"Broken on arrival.
Needs refund.",04806e625a6e87f8,US|CA,2|3,5.0,"{""user_id"":456, ""status"":
""Inactive"", ""address"": {""zip"": 10001}}",12,9.99,1678886400000
+111,London,true,c6f12e3c-d9h1-6c2f-b1d3-9g4c1e6f0g9f,"",04806e625a6e87f7,,100,1.1|1.2|1.3,"{""user_id"":789,
""status"": ""Active""}",0,10000.00,1678886400000
+112,Paris,false,,"Excellent quality. 5
stars.",04806e625a6e87f9,FR,99,2.5,,1,55.00,1678886400000
+104,New Delhi,true,e4h34f5d-f1j3-8e4h-d3f5-b9i62g8h2i1j,"Item was slow to
ship.",04806e625a6e87fa,JP,1|2,1.0|2.0,{},8,75.2,1678886400100
+105,"",false,f5i45g6e-g2k4-9f5i-e4g6-c0j73h9i3j2k,"Highly recommended, search
for this product!",04806e625a6e87fb,,,,"{""status"": ""Active"", ""discount"":
true}",50,200.00,1678886400000
+101,New York,true,g6j56h7f-h3l5-a0g6-f5h7-d1k84i0j4k3l,"The best purchase this
year.Thanks for everything.",04806e625a6e87f7,US,1|1,15.5,"{""status"":
""Active"", ""discount"": true}",3,125.50,1678886400000
+106,Mumbai,false,h7k67i8g-i4m6-b1h7-g6i8-e2l95j1k5l4m,"Nah.",04806e625a6e87f8,CN,4|5,5.0,"{""status"":
""Inactive""}",15,150.00,1678886400200
+107,London,true,i8l78j9h-j5n7-c2i8-h7j9-f3m06k2l6m5n,"Worst experience,
terrible support.",04806e625a6e87f7,UK|IRL,10,10.0|20.0,,2,19.99,1678886400000
\ No newline at end of file
diff --git a/pinot-core/src/test/resources/data/test_crc_complex_schema.json b/pinot-core/src/test/resources/data/test_crc_complex_schema.json
new file mode 100644
index 00000000000..923ab65514d
--- /dev/null
+++ b/pinot-core/src/test/resources/data/test_crc_complex_schema.json
@@ -0,0 +1,27 @@
+{
+ "schemaName": "testCrcComplexSchema",
+ "dimensionFieldSpecs": [
+ { "name": "campaignId", "dataType": "INT", "defaultNullValue": 0 },
+ { "name": "cityName", "dataType": "STRING", "defaultNullValue": "UNKNOWN"
},
+ { "name": "isTestUser", "dataType": "BOOLEAN", "defaultNullValue": "false"
},
+ { "name": "rawUUID", "dataType": "STRING", "defaultNullValue": "null" },
+ { "name": "fullReviewText", "dataType": "STRING", "defaultNullValue": "" },
+ { "name": "coordinates", "dataType": "BYTES" },
+ { "name": "userTags", "dataType": "STRING", "singleValueField": false,
"defaultNullValue": "null" },
+ { "name": "associatedInts", "dataType": "INT", "singleValueField": false,
"defaultNullValue": 0 },
+ { "name": "associatedFloats", "dataType": "FLOAT", "singleValueField":
false, "defaultNullValue": 0.0 },
+ { "name": "customerDetailsJson", "dataType": "JSON", "defaultNullValue":
"{}" }
+ ],
+ "metricFieldSpecs": [
+ { "name": "clickCount", "dataType": "INT", "defaultNullValue": 0 },
+ { "name": "totalRevenue", "dataType": "DOUBLE", "defaultNullValue": 0.0 }
+ ],
+ "dateTimeFieldSpecs": [
+ {
+ "name": "eventTime",
+ "dataType": "LONG",
+ "format": "1:MILLISECONDS:EPOCH",
+ "granularity": "1:MILLISECONDS"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/pinot-core/src/test/resources/data/test_crc_complex_table.json
b/pinot-core/src/test/resources/data/test_crc_complex_table.json
new file mode 100644
index 00000000000..1e9f96ca24e
--- /dev/null
+++ b/pinot-core/src/test/resources/data/test_crc_complex_table.json
@@ -0,0 +1,83 @@
+{
+ "tableName": "testCrcComplexSchema",
+ "tableType": "OFFLINE",
+ "tenants": {
+ },
+ "tableIndexConfig": {
+ "loadMode": "MMAP",
+ "invertedIndexColumns": [
+ "cityName",
+ "userTags",
+ "associatedInts",
+ "campaignId"
+ ],
+ "sortedColumn": ["campaignId"],
+ "noDictionaryColumns": [
+ "rawUUID",
+ "totalRevenue",
+ "associatedFloats",
+ "fullReviewText",
+ "customerDetailsJson"
+ ],
+ "rangeIndexColumns": [
+ "totalRevenue",
+ "eventTime"
+ ],
+ "bloomFilterColumns": [
+ "rawUUID"
+ ],
+ "textIndexColumns": [
+ "fullReviewText"
+ ],
+ "h3IndexConfigs": [
+ { "column": "coordinates", "resolution": 8 }
+ ],
+ "starTreeIndexConfigs": [{
+ "dimensionsSplitOrder": ["campaignId", "cityName", "eventTime"],
+ "skipMaterializationForDimensions": ["cityName"],
+ "maxLeafRecords": 10000,
+ "aggregationConfigs": [
+ {
+ "columnName": "totalRevenue",
+ "aggregationFunction": "SUM"
+ },
+ {
+ "columnName": "clickCount",
+ "aggregationFunction": "SUM"
+ }
+ ]
+ }]
+ },
+ "metadata": {},
+ "fieldConfigList": [
+ {
+ "name": "eventTime",
+ "properties": { "isTimeColumn": "true" }
+ },
+ {
+ "name": "fullReviewText",
+ "encodingType": "RAW",
+ "indexType": "TEXT"
+ },
+ {
+ "name": "customerDetailsJson",
+ "indexes": {
+ "json": {
+ "maxLevels": 2,
+ "excludeArray": false,
+ "disableCrossArrayUnnest": true,
+ "includePaths": null,
+ "excludePaths": null,
+ "excludeFields": null,
+ "indexPaths": null
+ }
+ }
+ }
+ ],
+ "routing": {},
+ "segmentsConfig": {
+ "timeColumn": "eventTime",
+ "replication": 1,
+ "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy"
+ }
+}
\ No newline at end of file
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/BaseSegmentCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/BaseSegmentCreator.java
index 969f2f15a08..20c0b739332 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/BaseSegmentCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/BaseSegmentCreator.java
@@ -752,6 +752,13 @@ public abstract class BaseSegmentCreator implements
SegmentCreator {
}
LOGGER.info("Finished segment seal for: {}", _segmentName);
+ // Compute data crc before segment format conversion applies since v3 puts
all index files into a single one
+ // Compute data crc only if no column has fwd index disabled
+ boolean hasForwardIndexDisabledCols =
hasAnyColumnWithForwardIndexDisabled();
+ long dataCrc = Long.MIN_VALUE;
+ if (!hasForwardIndexDisabledCols) {
+ dataCrc = CrcUtils.computeDataCrc(_indexDir);
+ }
// Format conversion
convertFormatIfNecessary(_indexDir);
@@ -765,7 +772,7 @@ public abstract class BaseSegmentCreator implements
SegmentCreator {
updatePostSegmentCreationIndexes(_indexDir);
// Persist creation metadata
- persistCreationMeta(_indexDir);
+ persistCreationMeta(_indexDir, dataCrc);
LOGGER.info("Successfully created segment: {} in {}", _segmentName,
_indexDir);
}
@@ -949,15 +956,37 @@ public abstract class BaseSegmentCreator implements
SegmentCreator {
}
}
+ private boolean hasAnyColumnWithForwardIndexDisabled() {
+ Map<String, FieldIndexConfigs> indexConfigsMap =
_config.getIndexConfigsByColName();
+ for (Map.Entry<String, FieldIndexConfigs> entry :
indexConfigsMap.entrySet()) {
+ String columnName = entry.getKey();
+ FieldIndexConfigs fieldIndexConfigs = entry.getValue();
+ // Ignore virtual columns
+ if (_schema.getFieldSpecFor(columnName).isVirtualColumn()) {
+ continue;
+ }
+
+ boolean isFwdIndexEnabled = indexConfigsMap
+ .get(columnName)
+ .getConfig(StandardIndexes.forward())
+ .isEnabled();
+
+ if (!isFwdIndexEnabled) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* Compute CRC and creation time, and persist to segment metadata file.
*
* @param indexDir Segment index directory
* @throws IOException If writing metadata fails
*/
- private void persistCreationMeta(File indexDir)
+ private void persistCreationMeta(File indexDir, long dataCrc)
throws IOException {
- long crc = CrcUtils.forAllFilesInFolder(indexDir).computeCrc();
+ long crc = CrcUtils.computeCrc(indexDir);
long creationTime;
String creationTimeInConfig = _config.getCreationTime();
if (creationTimeInConfig != null) {
@@ -975,6 +1004,11 @@ public abstract class BaseSegmentCreator implements
SegmentCreator {
try (DataOutputStream output = new DataOutputStream(new
FileOutputStream(creationMetaFile))) {
output.writeLong(crc);
output.writeLong(creationTime);
+ // might be negative if the data CRC could not be computed for the
segment. eg. in case a column has fwd index
+ // disabled
+ if (dataCrc >= 0) {
+ output.writeLong(dataCrc);
+ }
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CrcUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CrcUtils.java
index 84dfb8283c0..d91aee666ba 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CrcUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CrcUtils.java
@@ -24,6 +24,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.zip.Adler32;
@@ -35,50 +36,78 @@ import org.slf4j.LoggerFactory;
@SuppressWarnings("Duplicates")
public class CrcUtils {
+ private CrcUtils() {
+ }
+
private static final Logger LOGGER = LoggerFactory.getLogger(CrcUtils.class);
private static final int BUFFER_SIZE = 65536;
private static final String CRC_FILE_EXTENSTION = ".crc";
+ private static final List<String> DATA_FILE_EXTENSIONS =
Arrays.asList(".fwd", ".dict");
- private final List<File> _files;
- private CrcUtils(List<File> files) {
- _files = files;
+ public static long computeCrc(File indexDir) throws IOException {
+ List<File> allNormalFiles = new ArrayList<>();
+ collectFiles(indexDir, allNormalFiles, false);
+ Collections.sort(allNormalFiles);
+ return crcForFiles(allNormalFiles);
}
- public static CrcUtils forAllFilesInFolder(File dir) {
- List<File> normalFiles = new ArrayList<>();
- getAllNormalFiles(dir, normalFiles);
- Collections.sort(normalFiles);
- return new CrcUtils(normalFiles);
+ public static long computeDataCrc(File indexDir) throws IOException {
+ List<File> dataFiles = new ArrayList<>();
+ collectFiles(indexDir, dataFiles, true);
+ Collections.sort(dataFiles);
+ return crcForFiles(dataFiles);
}
/**
- * Helper method to get all normal (non-directory) files under a directory
recursively.
+ * Helper method to get files in the directory to later compute CRC for them.
* <p>NOTE: do not include the segment creation meta file.
+ * @param dir the directory to collect files from
+ * @param files the list to add collected files to
+ * @param dataFilesOnly if true, only collect data files (.fwd, .dict); if
false, collect all normal files
*/
- private static void getAllNormalFiles(File dir, List<File> normalFiles) {
- File[] files = dir.listFiles();
- Preconditions.checkNotNull(files);
- for (File file : files) {
+ private static void collectFiles(File dir, List<File> files, boolean
dataFilesOnly) {
+ File[] dirFiles = dir.listFiles();
+ Preconditions.checkNotNull(dirFiles);
+ for (File file : dirFiles) {
if (file.isFile()) {
+ String fileName = file.getName();
// Certain file systems, e.g. HDFS will create .crc files when perform
data copy.
// We should ignore both SEGMENT_CREATION_META and generated '.crc'
files.
- if (!file.getName().equals(V1Constants.SEGMENT_CREATION_META) &&
!file.getName()
- .endsWith(CRC_FILE_EXTENSTION)) {
- normalFiles.add(file);
+ if (!fileName.equals(V1Constants.SEGMENT_CREATION_META) &&
!fileName.endsWith(CRC_FILE_EXTENSTION)) {
+ if (dataFilesOnly) {
+ // Only add data files
+ if (isDataFile(fileName)) {
+ files.add(file);
+ }
+ } else {
+ // Add all normal files
+ files.add(file);
+ }
}
} else {
- getAllNormalFiles(file, normalFiles);
+ collectFiles(file, files, dataFilesOnly);
+ }
+ }
+ }
+
+ /**
+ * Determines if a file is considered a "Data File" (one of ".fwd", ".dict"
file types).
+ */
+ private static boolean isDataFile(String fileName) {
+ for (String ext : DATA_FILE_EXTENSIONS) {
+ if (fileName.endsWith(ext)) {
+ return true;
}
}
+ return false;
}
- public long computeCrc()
- throws IOException {
+ private static long crcForFiles(List<File> filesToComputeCrc) throws
IOException {
byte[] buffer = new byte[BUFFER_SIZE];
Checksum checksum = new Adler32();
- for (File file : _files) {
+ for (File file : filesToComputeCrc) {
try (InputStream input = new FileInputStream(file)) {
int len;
while ((len = input.read(buffer)) > 0) {
@@ -90,7 +119,7 @@ public class CrcUtils {
}
}
long crc = checksum.getValue();
- LOGGER.info("Computed crc = {}, based on files {}", crc, _files);
+ LOGGER.info("Computed crc = {}, based on files {}", crc,
filesToComputeCrc);
return crc;
}
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java
index 1199de0bda3..e76a540c975 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java
@@ -64,6 +64,8 @@ public interface SegmentMetadata {
String getCrc();
+ String getDataCrc();
+
SegmentVersion getVersion();
Schema getSchema();
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
index e2d8784ed57..5a79535fead 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
@@ -78,6 +78,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
private String _segmentName;
private final Schema _schema;
private long _crc = Long.MIN_VALUE;
+ private long _dataCrc = Long.MIN_VALUE;
private long _creationTime = Long.MIN_VALUE;
private long _zkCreationTime = Long.MIN_VALUE; // ZooKeeper creation time
for upsert consistency
private String _timeColumn;
@@ -189,10 +190,15 @@ public class SegmentMetadataImpl implements
SegmentMetadata {
private void loadCreationMeta(File crcFile)
throws IOException {
if (crcFile.exists()) {
- final DataInputStream ds = new DataInputStream(new
FileInputStream(crcFile));
- _crc = ds.readLong();
- _creationTime = ds.readLong();
- ds.close();
+ try (DataInputStream ds = new DataInputStream(new
FileInputStream(crcFile))) {
+ _crc = ds.readLong();
+ _creationTime = ds.readLong();
+ try {
+ _dataCrc = ds.readLong();
+ } catch (IOException e) {
+ LOGGER.debug("Could not find data crc, falling back to default
LONG_MIN value");
+ }
+ }
}
}
@@ -201,6 +207,11 @@ public class SegmentMetadataImpl implements
SegmentMetadata {
try (DataInputStream ds = new DataInputStream(crcFileInputStream)) {
_crc = ds.readLong();
_creationTime = ds.readLong();
+ try {
+ _dataCrc = ds.readLong();
+ } catch (IOException e) {
+ LOGGER.debug("Could not find data crc, falling back to default
LONG_MIN value");
+ }
}
}
@@ -363,6 +374,11 @@ public class SegmentMetadataImpl implements
SegmentMetadata {
return String.valueOf(_crc);
}
+ @Override
+ public String getDataCrc() {
+ return String.valueOf(_dataCrc);
+ }
+
@Override
public SegmentVersion getVersion() {
return _segmentVersion;
@@ -467,6 +483,9 @@ public class SegmentMetadataImpl implements SegmentMetadata
{
segmentMetadata.put("segmentName", _segmentName);
segmentMetadata.put("schemaName", _schema != null ?
_schema.getSchemaName() : null);
segmentMetadata.put("crc", _crc);
+ if (_dataCrc != Long.MIN_VALUE) {
+ segmentMetadata.put("dataCrc", _dataCrc);
+ }
segmentMetadata.put("creationTimeMillis", _creationTime);
TimeZone timeZone = TimeZone.getTimeZone("UTC");
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss:SSS'
UTC'");
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 4ad3a3810ac..4ff0e169379 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -902,7 +902,7 @@ public class TablesResource {
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Upload a low level consumer segment to segment store
and return the segment download url,"
+ "crc and other segment metadata",
- notes = "Upload a low level consumer segment to segment store and return
the segment download url, crc "
+ notes = "Upload a low level consumer segment to segment store and return
the segment download url, crc, data crc "
+ "and other segment metadata")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success"),
@@ -949,8 +949,11 @@ public class TablesResource {
try {
segmentTarFile = createSegmentTarFile(tableDataManager, segmentName);
String downloadUrl = uploadSegment(segmentTarFile,
realtimeTableNameWithType, segmentName, timeoutMs);
- return new TableLLCSegmentUploadResponse(segmentName,
-
Long.parseLong(segmentDataManager.getSegment().getSegmentMetadata().getCrc()),
downloadUrl);
+ return new TableLLCSegmentUploadResponse(
+ segmentName,
+
Long.parseLong(segmentDataManager.getSegment().getSegmentMetadata().getCrc()),
+
Long.parseLong(segmentDataManager.getSegment().getSegmentMetadata().getDataCrc()),
+ downloadUrl);
} finally {
FileUtils.deleteQuietly(segmentTarFile);
tableDataManager.releaseSegment(segmentDataManager);
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
index eb3fa5298e8..3b62cc2a3d4 100644
---
a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
+++
b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
@@ -395,7 +395,7 @@ public class TablesResourceTest extends BaseResourceTest {
Assert.assertEquals(validDocIdsMetadata.get("totalInvalidDocs").asInt(),
199992);
Assert.assertEquals(validDocIdsMetadata.get("segmentCrc").asText(),
"187068486");
Assert.assertEquals(validDocIdsMetadata.get("validDocIdsType").asText(),
"SNAPSHOT");
-
Assert.assertEquals(validDocIdsMetadata.get("segmentSizeInBytes").asLong(),
4514723);
+
Assert.assertEquals(validDocIdsMetadata.get("segmentSizeInBytes").asLong(),
4514731);
Assert.assertTrue(validDocIdsMetadata.has("segmentCreationTimeMillis"));
Assert.assertTrue(validDocIdsMetadata.get("segmentCreationTimeMillis").asLong()
> 0);
@@ -434,7 +434,7 @@ public class TablesResourceTest extends BaseResourceTest {
Assert.assertEquals(validDocIdsMetadata.get("totalInvalidDocs").asInt(),
199992);
Assert.assertEquals(validDocIdsMetadata.get("segmentCrc").asText(),
"187068486");
Assert.assertEquals(validDocIdsMetadata.get("validDocIdsType").asText(),
"SNAPSHOT_WITH_DELETE");
-
Assert.assertEquals(validDocIdsMetadata.get("segmentSizeInBytes").asLong(),
4514723);
+
Assert.assertEquals(validDocIdsMetadata.get("segmentSizeInBytes").asLong(),
4514731);
Assert.assertTrue(validDocIdsMetadata.has("segmentCreationTimeMillis"));
Assert.assertTrue(validDocIdsMetadata.get("segmentCreationTimeMillis").asLong()
> 0);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index d12ae72234c..9a7f8804a20 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1781,6 +1781,7 @@ public class CommonConstants {
public static final String INDEX_VERSION = "segment.index.version";
public static final String TOTAL_DOCS = "segment.total.docs";
public static final String CRC = "segment.crc";
+ public static final String DATA_CRC = "segment.data.crc";
public static final String TIER = "segment.tier";
public static final String CREATION_TIME = "segment.creation.time";
public static final String PUSH_TIME = "segment.push.time";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]