This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b58650658c Allow updating start/end offset for pushed segments (#8653)
b58650658c is described below
commit b58650658c0aac258e1c7932073188ddc5590d89
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed May 11 11:14:58 2022 -0700
Allow updating start/end offset for pushed segments (#8653)
- Support updating start/end offset in the segment ZK metadata for pushed
segments if provided in the segment metadata
- Refactor the updating segment ZK metadata logic to have consistent
behavior for adding new segment and refreshing existing segment
---
.../broker/broker/HelixBrokerStarterTest.java | 2 +-
.../common/metadata/segment/SegmentZKMetadata.java | 3 +-
.../pinot/controller/api/upload/ZKOperator.java | 162 +++++++++++----------
.../helix/core/PinotHelixResourceManager.java | 61 ++------
.../helix/core/util/ZKMetadataUtils.java | 105 +++++++++----
.../helix/core/retention/RetentionManagerTest.java | 38 ++---
.../validation/ValidationManagerTest.java | 3 +-
.../creator/impl/SegmentColumnarIndexCreator.java | 9 +-
.../apache/pinot/segment/spi/SegmentMetadata.java | 4 +
.../org/apache/pinot/segment/spi/V1Constants.java | 5 +
.../spi/index/metadata/SegmentMetadataImpl.java | 29 +++-
11 files changed, 228 insertions(+), 193 deletions(-)
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index a2047ba4a6..ca5cc76de7 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -227,7 +227,7 @@ public class HelixBrokerStarterTest extends ControllerTest {
_helixResourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME,
segmentToRefresh);
_helixResourceManager.refreshSegment(OFFLINE_TABLE_NAME,
SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME,
segmentToRefresh, newEndTime),
- segmentZKMetadata, EXPECTED_VERSION, "downloadUrl", null, -1);
+ segmentZKMetadata, EXPECTED_VERSION, "downloadUrl");
TestUtils.waitForCondition(aVoid ->
routingManager.getTimeBoundaryInfo(OFFLINE_TABLE_NAME).getTimeValue()
.equals(Integer.toString(newEndTime - 1)), 30_000L, "Failed to update
the time boundary for refreshed segment");
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
index 0b73af486d..169171bb2e 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
@@ -21,6 +21,7 @@ package org.apache.pinot.common.metadata.segment;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.collections.MapUtils;
import org.apache.helix.ZNRecord;
import org.apache.pinot.common.metadata.ZKMetadata;
import org.apache.pinot.spi.utils.CommonConstants.Segment;
@@ -231,7 +232,7 @@ public class SegmentZKMetadata implements ZKMetadata {
public void setCustomMap(Map<String, String> customMap) {
Map<String, Map<String, String>> mapFields = _znRecord.getMapFields();
- if (customMap != null) {
+ if (MapUtils.isNotEmpty(customMap)) {
mapFields.put(Segment.CUSTOM_MAP, customMap);
} else {
mapFields.remove(Segment.CUSTOM_MAP);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 28f17536ab..2e7ba5c76e 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -31,10 +31,10 @@ import
org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.controller.ControllerConf;
import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
* The ZKOperator is a util class that is used during segment upload to set
relevant metadata fields in zk. It will
* currently
* also perform the data move. In the future when we introduce versioning, we
will decouple these two steps.
+ * TODO: Merge it into PinotHelixResourceManager
*/
public class ZKOperator {
private static final Logger LOGGER =
LoggerFactory.getLogger(ZKOperator.class);
@@ -69,6 +70,7 @@ public class ZKOperator {
boolean refreshOnly =
Boolean.parseBoolean(headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY));
if (segmentMetadataZNRecord == null) {
+ // Add a new segment
if (refreshOnly) {
throw new ControllerApplicationException(LOGGER,
"Cannot refresh non-existing segment, aborted uploading segment: "
+ segmentName + " of table: "
@@ -78,41 +80,30 @@ public class ZKOperator {
processNewSegment(segmentMetadata, finalSegmentLocationURI,
currentSegmentLocation, zkDownloadURI, headers,
crypter, tableNameWithType, segmentName, moveSegmentToFinalLocation,
enableParallelPushProtection,
segmentSizeInBytes);
- return;
- }
-
- // We reach here if a segment with the same name already exists.
-
- if (!allowRefresh) {
- // We cannot perform this check up-front in UploadSegment API call. If a
segment doesn't exist during the check
- // done up-front but ends up getting created before the check here, we
could incorrectly refresh an existing
- // segment.
- throw new ControllerApplicationException(LOGGER,
- "Segment: " + segmentName + " already exists in table: " +
tableNameWithType + ". Refresh not permitted.",
- Response.Status.CONFLICT);
- }
-
- // TODO Allow segment refreshing for realtime tables.
- if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
- throw new ControllerApplicationException(LOGGER,
- "Refresh existing segment " + segmentName + " for realtime table " +
tableNameWithType
- + " is not yet supported ", Response.Status.NOT_IMPLEMENTED);
+ } else {
+ // Refresh an existing segment
+ if (!allowRefresh) {
+ // We cannot perform this check up-front in UploadSegment API call. If
a segment doesn't exist during the check
+ // done up-front but ends up getting created before the check here, we
could incorrectly refresh an existing
+ // segment.
+ throw new ControllerApplicationException(LOGGER,
+ "Segment: " + segmentName + " already exists in table: " +
tableNameWithType + ". Refresh not permitted.",
+ Response.Status.CONFLICT);
+ }
+ LOGGER.info("Segment {} from table {} already exists, refreshing if
necessary", segmentName, tableNameWithType);
+ processExistingSegment(segmentMetadata, finalSegmentLocationURI,
currentSegmentLocation,
+ enableParallelPushProtection, headers, zkDownloadURI, crypter,
tableNameWithType, segmentName,
+ segmentMetadataZNRecord, moveSegmentToFinalLocation,
segmentSizeInBytes);
}
-
- LOGGER.info("Segment {} from table {} already exists, refreshing if
necessary", segmentName, tableNameWithType);
- processExistingSegment(segmentMetadata, finalSegmentLocationURI,
currentSegmentLocation,
- enableParallelPushProtection, headers, zkDownloadURI, crypter,
tableNameWithType, segmentName,
- segmentMetadataZNRecord, moveSegmentToFinalLocation,
segmentSizeInBytes);
}
private void processExistingSegment(SegmentMetadata segmentMetadata, URI
finalSegmentLocationURI,
- File currentSegmentLocation, boolean enableParallelPushProtection,
HttpHeaders headers, String zkDownloadURI,
+ File currentSegmentLocation, boolean enableParallelPushProtection,
HttpHeaders headers, String downloadUrl,
String crypter, String tableNameWithType, String segmentName, ZNRecord
znRecord,
boolean moveSegmentToFinalLocation, long segmentSizeInBytes)
throws Exception {
-
- SegmentZKMetadata existingSegmentZKMetadata = new
SegmentZKMetadata(znRecord);
- long existingCrc = existingSegmentZKMetadata.getCrc();
+ SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(znRecord);
+ long existingCrc = segmentZKMetadata.getCrc();
int expectedVersion = znRecord.getVersion();
// Check if CRC match when IF-MATCH header is set
@@ -121,7 +112,7 @@ public class ZKOperator {
// Check segment upload start time when parallel push protection enabled
if (enableParallelPushProtection) {
// When segment upload start time is larger than 0, that means another
upload is in progress
- long segmentUploadStartTime =
existingSegmentZKMetadata.getSegmentUploadStartTime();
+ long segmentUploadStartTime =
segmentZKMetadata.getSegmentUploadStartTime();
if (segmentUploadStartTime > 0) {
if (System.currentTimeMillis() - segmentUploadStartTime >
_controllerConf.getSegmentUploadTimeoutInMillis()) {
// Last segment upload does not finish properly, replace the segment
@@ -137,9 +128,8 @@ public class ZKOperator {
}
// Lock the segment by setting the upload start time in ZK
-
existingSegmentZKMetadata.setSegmentUploadStartTime(System.currentTimeMillis());
- if (!_pinotHelixResourceManager
- .updateZkMetadata(tableNameWithType, existingSegmentZKMetadata,
expectedVersion)) {
+ segmentZKMetadata.setSegmentUploadStartTime(System.currentTimeMillis());
+ if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType,
segmentZKMetadata, expectedVersion)) {
throw new ControllerApplicationException(LOGGER,
"Failed to lock the segment: " + segmentName + " of table: " +
tableNameWithType + ", retry later",
Response.Status.CONFLICT);
@@ -152,40 +142,37 @@ public class ZKOperator {
// Reset segment upload start time to unlock the segment later
// NOTE: reset this value even if parallel push protection is not enabled
so that segment can recover in case
// previous segment upload did not finish properly and the parallel push
protection is turned off
- existingSegmentZKMetadata.setSegmentUploadStartTime(-1);
+ segmentZKMetadata.setSegmentUploadStartTime(-1);
try {
- // Modify the custom map in segment ZK metadata
- String segmentZKMetadataCustomMapModifierStr =
+ // Construct the segment ZK metadata custom map modifier
+ String customMapModifierStr =
headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER);
- SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier;
- if (segmentZKMetadataCustomMapModifierStr != null) {
- segmentZKMetadataCustomMapModifier =
- new
SegmentZKMetadataCustomMapModifier(segmentZKMetadataCustomMapModifierStr);
- } else {
- // By default, use REPLACE modify mode
- segmentZKMetadataCustomMapModifier =
- new
SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.REPLACE,
null);
- }
- existingSegmentZKMetadata
-
.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(existingSegmentZKMetadata.getCustomMap()));
+ SegmentZKMetadataCustomMapModifier customMapModifier =
+ customMapModifierStr != null ? new
SegmentZKMetadataCustomMapModifier(customMapModifierStr) : null;
// Update ZK metadata and refresh the segment if necessary
- long newCrc = Long.valueOf(segmentMetadata.getCrc());
+ long newCrc = Long.parseLong(segmentMetadata.getCrc());
if (newCrc == existingCrc) {
LOGGER.info(
"New segment crc '{}' is the same as existing segment crc for
segment '{}'. Updating ZK metadata without "
+ "refreshing the segment.", newCrc, segmentName);
- // NOTE: even though we don't need to refresh the segment, we should
still update creation time and refresh time
- // (creation time is not included in the crc)
-
existingSegmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
- existingSegmentZKMetadata.setRefreshTime(System.currentTimeMillis());
- // NOTE: in rare cases the segment can be deleted before the metadata
is updated and the expected version won't
- // match, we should fail the request for such cases
- if (!_pinotHelixResourceManager
- .updateZkMetadata(tableNameWithType, existingSegmentZKMetadata,
expectedVersion)) {
+ // NOTE: Even though we don't need to refresh the segment, we should
still update the following fields:
+ // - Creation time (not included in the crc)
+ // - Refresh time
+ // - Custom map
+
segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
+ segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
+ if (customMapModifier != null) {
+
segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap()));
+ } else {
+ // If no modifier is provided, use the custom map from the segment
metadata
+ segmentZKMetadata.setCustomMap(segmentMetadata.getCustomMap());
+ }
+ if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType,
segmentZKMetadata, expectedVersion)) {
throw new RuntimeException(
- "Failed to update ZK metadata for segment: " + segmentName + "
of table: " + tableNameWithType);
+ String.format("Failed to update ZK metadata for segment: %s,
table: %s, expected version: %d",
+ segmentName, tableNameWithType, expectedVersion));
}
} else {
// New segment is different with the existing one, update ZK metadata
and refresh the segment
@@ -198,17 +185,38 @@ public class ZKOperator {
currentSegmentLocation.getAbsolutePath(),
finalSegmentLocationURI.getPath());
} else {
LOGGER.info("Skipping segment move, keeping segment {} from table {}
at {}", segmentName, tableNameWithType,
- zkDownloadURI);
+ downloadUrl);
+ }
+
+ // NOTE: Must first set the segment ZK metadata before trying to
refresh because servers and brokers rely on
+ // segment ZK metadata to refresh the segment (server will compare the
segment ZK metadata with the local
+ // metadata to decide whether to download the new segment; broker will
update the segment partition info & time
+ // boundary based on the segment ZK metadata)
+ if (customMapModifier == null) {
+ // If no modifier is provided, use the custom map from the segment
metadata
+ segmentZKMetadata.setCustomMap(null);
+ ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType,
segmentZKMetadata, segmentMetadata, downloadUrl,
+ crypter, segmentSizeInBytes);
+ } else {
+ // If modifier is provided, first set the custom map from the
segment metadata, then apply the modifier
+ ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType,
segmentZKMetadata, segmentMetadata, downloadUrl,
+ crypter, segmentSizeInBytes);
+
segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap()));
+ }
+ if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType,
segmentZKMetadata, expectedVersion)) {
+ throw new RuntimeException(
+ String.format("Failed to update ZK metadata for segment: %s,
table: %s, expected version: %d",
+ segmentName, tableNameWithType, expectedVersion));
}
+ LOGGER.info("Updated segment: {} of table: {} to property store",
segmentName, tableNameWithType);
- _pinotHelixResourceManager
- .refreshSegment(tableNameWithType, segmentMetadata,
existingSegmentZKMetadata, expectedVersion,
- zkDownloadURI, crypter, segmentSizeInBytes);
+ // Send a message to servers and brokers hosting the table to refresh
the segment
+
_pinotHelixResourceManager.sendSegmentRefreshMessage(tableNameWithType,
segmentName, true, true);
}
} catch (Exception e) {
- if (!_pinotHelixResourceManager
- .updateZkMetadata(tableNameWithType, existingSegmentZKMetadata,
expectedVersion)) {
- LOGGER.error("Failed to update ZK metadata for segment: {} of table:
{}", segmentName, tableNameWithType);
+ if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType,
segmentZKMetadata, expectedVersion)) {
+ LOGGER.error("Failed to update ZK metadata for segment: {}, table: {},
expected version: {}", segmentName,
+ tableNameWithType, expectedVersion);
}
throw e;
}
@@ -239,8 +247,8 @@ public class ZKOperator {
long segmentSizeInBytes)
throws Exception {
SegmentZKMetadata newSegmentZKMetadata =
-
_pinotHelixResourceManager.constructZkMetadataForNewSegment(tableNameWithType,
segmentMetadata, zkDownloadURI,
- crypter, segmentSizeInBytes);
+ ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType,
segmentMetadata, zkDownloadURI, crypter,
+ segmentSizeInBytes);
// Lock if enableParallelPushProtection is true.
if (enableParallelPushProtection) {
@@ -248,13 +256,13 @@ public class ZKOperator {
}
// Update zk metadata customer map
- String segmentZKMetadataCustomMapModifierStr = headers != null ? headers
-
.getHeaderString(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER)
: null;
+ String segmentZKMetadataCustomMapModifierStr = headers != null ?
headers.getHeaderString(
+
FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER)
: null;
if (segmentZKMetadataCustomMapModifierStr != null) {
SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
new
SegmentZKMetadataCustomMapModifier(segmentZKMetadataCustomMapModifierStr);
- newSegmentZKMetadata
-
.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(newSegmentZKMetadata.getCustomMap()));
+ newSegmentZKMetadata.setCustomMap(
+
segmentZKMetadataCustomMapModifier.modifyMap(newSegmentZKMetadata.getCustomMap()));
}
if (!_pinotHelixResourceManager.createSegmentZkMetadata(tableNameWithType,
newSegmentZKMetadata)) {
throw new RuntimeException(
@@ -265,13 +273,12 @@ public class ZKOperator {
if (moveSegmentToFinalLocation) {
try {
moveSegmentToPermanentDirectory(currentSegmentLocation,
finalSegmentLocationURI);
- LOGGER
- .info("Moved segment {} from temp location {} to {}", segmentName,
currentSegmentLocation.getAbsolutePath(),
- finalSegmentLocationURI.getPath());
+ LOGGER.info("Moved segment {} from temp location {} to {}",
segmentName,
+ currentSegmentLocation.getAbsolutePath(),
finalSegmentLocationURI.getPath());
} catch (Exception e) {
// Cleanup the Zk entry and the segment from the permanent directory
if it exists.
- LOGGER
- .error("Could not move segment {} from table {} to permanent
directory", segmentName, tableNameWithType, e);
+ LOGGER.error("Could not move segment {} from table {} to permanent
directory", segmentName, tableNameWithType,
+ e);
_pinotHelixResourceManager.deleteSegment(tableNameWithType,
segmentName);
LOGGER.info("Deleted zk entry and segment {} for table {}.",
segmentName, tableNameWithType);
throw e;
@@ -285,9 +292,8 @@ public class ZKOperator {
_pinotHelixResourceManager.assignTableSegment(tableNameWithType,
segmentMetadata.getName());
} catch (Exception e) {
// assignTableSegment removes the zk entry. Call deleteSegment to remove
the segment from permanent location.
- LOGGER
- .error("Caught exception while calling assignTableSegment for adding
segment: {} to table: {}", segmentName,
- tableNameWithType, e);
+ LOGGER.error("Caught exception while calling assignTableSegment for
adding segment: {} to table: {}", segmentName,
+ tableNameWithType, e);
_pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);
LOGGER.info("Deleted zk entry and segment {} for table {}.",
segmentName, tableNameWithType);
throw e;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 860ed3d34d..0334721571 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1988,16 +1988,12 @@ public class PinotHelixResourceManager {
return instanceSet;
}
+ @VisibleForTesting
public void addNewSegment(String tableNameWithType, SegmentMetadata
segmentMetadata, String downloadUrl) {
- addNewSegment(tableNameWithType, segmentMetadata, downloadUrl, null);
- }
-
- public void addNewSegment(String tableNameWithType, SegmentMetadata
segmentMetadata, String downloadUrl,
- @Nullable String crypter) {
// NOTE: must first set the segment ZK metadata before assigning segment
to instances because segment assignment
// might need them to determine the partition of the segment, and server
will need them to download the segment
SegmentZKMetadata segmentZkmetadata =
- constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata,
downloadUrl, crypter, -1);
+ ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType,
segmentMetadata, downloadUrl, null, -1);
ZNRecord znRecord = segmentZkmetadata.toZNRecord();
String segmentName = segmentMetadata.getName();
@@ -2010,40 +2006,6 @@ public class PinotHelixResourceManager {
assignTableSegment(tableNameWithType, segmentName);
}
- /**
- * Construct segmentZkMetadata for new segment of offline or realtime table.
- *
- * @param tableNameWithType Table name with type
- * @param segmentMetadata Segment metadata
- * @param downloadUrl Download URL
- * @param crypter Crypter
- * @param segmentSizeInBytes Size of segment in bytes.
- * @return SegmentZkMetadata of the input segment
- */
- public SegmentZKMetadata constructZkMetadataForNewSegment(String
tableNameWithType, SegmentMetadata segmentMetadata,
- String downloadUrl, @Nullable String crypter, long segmentSizeInBytes) {
- // Construct segment zk metadata with common fields for offline and
realtime.
- String segmentName = segmentMetadata.getName();
- SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentName);
- ZKMetadataUtils.updateSegmentMetadata(segmentZKMetadata, segmentMetadata);
- segmentZKMetadata.setDownloadUrl(downloadUrl);
- segmentZKMetadata.setCrypterName(crypter);
- segmentZKMetadata.setSizeInBytes(segmentSizeInBytes);
-
- if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
- Preconditions.checkState(isUpsertTable(tableNameWithType),
- "Upload segment " + segmentName + " for non upsert enabled realtime
table " + tableNameWithType
- + " is not supported");
- // Set fields specific to realtime segments.
-
segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.UPLOADED);
- } else {
- // Set fields specific to offline segments.
- segmentZKMetadata.setPushTime(System.currentTimeMillis());
- }
-
- return segmentZKMetadata;
- }
-
public void assignTableSegment(String tableNameWithType, String segmentName)
{
String segmentZKMetadataPath =
ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType,
segmentName);
@@ -2169,22 +2131,19 @@ public class PinotHelixResourceManager {
}
}
+ @VisibleForTesting
public void refreshSegment(String tableNameWithType, SegmentMetadata
segmentMetadata,
- SegmentZKMetadata segmentZKMetadata, int expectedVersion, String
downloadUrl, @Nullable String crypter,
- long segmentSizeInBytes) {
+ SegmentZKMetadata segmentZKMetadata, int expectedVersion, String
downloadUrl) {
String segmentName = segmentMetadata.getName();
// NOTE: Must first set the segment ZK metadata before trying to refresh
because servers and brokers rely on segment
// ZK metadata to refresh the segment (server will compare the segment ZK
metadata with the local metadata to decide
- // whether to download the new segment; broker will update the the segment
partition info & time boundary based on
- // the segment ZK metadata)
- ZKMetadataUtils.updateSegmentMetadata(segmentZKMetadata, segmentMetadata);
- segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
- segmentZKMetadata.setDownloadUrl(downloadUrl);
- segmentZKMetadata.setCrypterName(crypter);
- segmentZKMetadata.setSizeInBytes(segmentSizeInBytes);
- if (!ZKMetadataProvider
- .setSegmentZKMetadata(_propertyStore, tableNameWithType,
segmentZKMetadata, expectedVersion)) {
+ // whether to download the new segment; broker will update the segment
partition info & time boundary based on the
+ // segment ZK metadata)
+ ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType,
segmentZKMetadata, segmentMetadata, downloadUrl, null,
+ -1);
+ if (!ZKMetadataProvider.setSegmentZKMetadata(_propertyStore,
tableNameWithType, segmentZKMetadata,
+ expectedVersion)) {
throw new RuntimeException(
"Failed to update ZK metadata for segment: " + segmentName + " of
table: " + tableNameWithType);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
index bf7b305410..2628d9f6bd 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
@@ -20,58 +20,105 @@ package org.apache.pinot.controller.helix.core.util;
import java.util.HashMap;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
-import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.creator.SegmentVersion;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
public class ZKMetadataUtils {
private ZKMetadataUtils() {
}
- public static void updateSegmentMetadata(SegmentZKMetadata
segmentZKMetadata, SegmentMetadata segmentMetadata) {
- SegmentVersion segmentVersion = segmentMetadata.getVersion();
- if (segmentVersion != null) {
- segmentZKMetadata.setIndexVersion(segmentVersion.name());
- }
+ /**
+ * Creates the segment ZK metadata for a new segment.
+ */
+ public static SegmentZKMetadata createSegmentZKMetadata(String
tableNameWithType, SegmentMetadata segmentMetadata,
+ String downloadUrl, @Nullable String crypter, long segmentSizeInBytes) {
+ SegmentZKMetadata segmentZKMetadata = new
SegmentZKMetadata(segmentMetadata.getName());
+ updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata,
segmentMetadata, downloadUrl, crypter,
+ segmentSizeInBytes);
+ segmentZKMetadata.setPushTime(System.currentTimeMillis());
+ return segmentZKMetadata;
+ }
+
+ /**
+ * Refreshes the segment ZK metadata for a segment being replaced.
+ */
+ public static void refreshSegmentZKMetadata(String tableNameWithType,
SegmentZKMetadata segmentZKMetadata,
+ SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String
crypter, long segmentSizeInBytes) {
+ updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata,
segmentMetadata, downloadUrl, crypter,
+ segmentSizeInBytes);
+ segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
+ }
+
+ private static void updateSegmentZKMetadata(String tableNameWithType,
SegmentZKMetadata segmentZKMetadata,
+ SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String
crypter, long segmentSizeInBytes) {
if (segmentMetadata.getTimeInterval() != null) {
segmentZKMetadata.setStartTime(segmentMetadata.getStartTime());
segmentZKMetadata.setEndTime(segmentMetadata.getEndTime());
segmentZKMetadata.setTimeUnit(segmentMetadata.getTimeUnit());
+ } else {
+ segmentZKMetadata.setStartTime(-1);
+ segmentZKMetadata.setEndTime(-1);
+ segmentZKMetadata.setTimeUnit(null);
+ }
+ if (segmentMetadata.getVersion() != null) {
+ segmentZKMetadata.setIndexVersion(segmentMetadata.getVersion().name());
+ } else {
+ segmentZKMetadata.setIndexVersion(null);
}
segmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
- segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
+ segmentZKMetadata.setSizeInBytes(segmentSizeInBytes);
segmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
- SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
- new
SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
- segmentZKMetadata.getCustomMap());
-
segmentZKMetadata.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(segmentMetadata.getCustomMap()));
+ segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
+ segmentZKMetadata.setDownloadUrl(downloadUrl);
+ segmentZKMetadata.setCrypterName(crypter);
- // Extract column partition metadata (if any), and set it into segment ZK
metadata.
+ // Set partition metadata
Map<String, ColumnPartitionMetadata> columnPartitionMap = new HashMap<>();
- if (segmentMetadata instanceof SegmentMetadataImpl) {
- for (Map.Entry<String, ColumnMetadata> entry :
segmentMetadata.getColumnMetadataMap().entrySet()) {
- String column = entry.getKey();
- ColumnMetadata columnMetadata = entry.getValue();
- PartitionFunction partitionFunction =
columnMetadata.getPartitionFunction();
-
- if (partitionFunction != null) {
- ColumnPartitionMetadata columnPartitionMetadata =
- new ColumnPartitionMetadata(partitionFunction.getName(),
partitionFunction.getNumPartitions(),
- columnMetadata.getPartitions(),
partitionFunction.getFunctionConfig());
- columnPartitionMap.put(column, columnPartitionMetadata);
- }
+ segmentMetadata.getColumnMetadataMap().forEach((column, columnMetadata) ->
{
+ PartitionFunction partitionFunction =
columnMetadata.getPartitionFunction();
+ if (partitionFunction != null) {
+ ColumnPartitionMetadata columnPartitionMetadata =
+ new ColumnPartitionMetadata(partitionFunction.getName(),
partitionFunction.getNumPartitions(),
+ columnMetadata.getPartitions(),
partitionFunction.getFunctionConfig());
+ columnPartitionMap.put(column, columnPartitionMetadata);
}
- }
-
+ });
if (!columnPartitionMap.isEmpty()) {
segmentZKMetadata.setPartitionMetadata(new
SegmentPartitionMetadata(columnPartitionMap));
+ } else {
+ segmentZKMetadata.setPartitionMetadata(null);
+ }
+
+ // Update custom metadata
+ // NOTE: Do not remove existing keys because they can be set by the HTTP
header from the segment upload request
+ Map<String, String> customMap = segmentZKMetadata.getCustomMap();
+ if (customMap == null) {
+ customMap = segmentMetadata.getCustomMap();
+ } else {
+ customMap.putAll(segmentMetadata.getCustomMap());
+ }
+ segmentZKMetadata.setCustomMap(customMap);
+
+ // Set fields specific to realtime table
+ if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+
segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.UPLOADED);
+
+ // NOTE:
+ // - If start/end offset is available in the uploaded segment, update
them in the segment ZK metadata
+ // - If not, keep the existing start/end offset in the segment ZK
metadata unchanged
+ if (segmentMetadata.getStartOffset() != null) {
+ segmentZKMetadata.setStartOffset(segmentMetadata.getStartOffset());
+ }
+ if (segmentMetadata.getEndOffset() != null) {
+ segmentZKMetadata.setEndOffset(segmentMetadata.getEndOffset());
+ }
}
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index ce902636a8..b872f70d06 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -23,7 +23,9 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixAdmin;
+import org.apache.helix.ZNRecord;
import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.PinotMetricUtils;
@@ -33,16 +35,12 @@ import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import org.apache.pinot.controller.helix.core.SegmentDeletionManager;
-import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
-import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.joda.time.Duration;
-import org.joda.time.Interval;
import org.mockito.ArgumentMatchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -66,18 +64,14 @@ public class RetentionManagerTest {
final int numOlderSegments = 10;
List<String> removedSegments = new ArrayList<>();
for (int i = 0; i < numOlderSegments; i++) {
- SegmentMetadata segmentMetadata = mockSegmentMetadata(pastTimeStamp,
pastTimeStamp, timeUnit);
- SegmentZKMetadata segmentZKMetadata = new
SegmentZKMetadata(segmentMetadata.getName());
- ZKMetadataUtils.updateSegmentMetadata(segmentZKMetadata,
segmentMetadata);
+ SegmentZKMetadata segmentZKMetadata =
mockSegmentZKMetadata(pastTimeStamp, pastTimeStamp, timeUnit);
segmentsZKMetadata.add(segmentZKMetadata);
removedSegments.add(segmentZKMetadata.getSegmentName());
}
// Create metadata for 5 segments that will not be removed.
for (int i = 0; i < 5; i++) {
- SegmentMetadata segmentMetadata =
- mockSegmentMetadata(dayAfterTomorrowTimeStamp,
dayAfterTomorrowTimeStamp, timeUnit);
- SegmentZKMetadata segmentZKMetadata = new
SegmentZKMetadata(segmentMetadata.getName());
- ZKMetadataUtils.updateSegmentMetadata(segmentZKMetadata,
segmentMetadata);
+ SegmentZKMetadata segmentZKMetadata =
+ mockSegmentZKMetadata(dayAfterTomorrowTimeStamp,
dayAfterTomorrowTimeStamp, timeUnit);
segmentsZKMetadata.add(segmentZKMetadata);
}
final TableConfig tableConfig = createOfflineTableConfig();
@@ -167,6 +161,9 @@ public class RetentionManagerTest {
final String tableNameWithType = tableConfig.getTableName();
when(resourceManager.getAllTables()).thenReturn(Collections.singletonList(tableNameWithType));
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
+ when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
+
SegmentDeletionManager deletionManager =
mock(SegmentDeletionManager.class);
// Ignore the call to SegmentDeletionManager.removeAgedDeletedSegments. we
only test that the call is made once per
// run of the retention manager
@@ -308,18 +305,13 @@ public class RetentionManagerTest {
return segmentMetadata;
}
- private SegmentMetadata mockSegmentMetadata(long startTime, long endTime,
TimeUnit timeUnit) {
+ private SegmentZKMetadata mockSegmentZKMetadata(long startTime, long
endTime, TimeUnit timeUnit) {
long creationTime = System.currentTimeMillis();
- SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
- when(segmentMetadata.getName()).thenReturn(TEST_TABLE_NAME + creationTime);
- when(segmentMetadata.getIndexCreationTime()).thenReturn(creationTime);
- when(segmentMetadata.getCrc()).thenReturn(Long.toString(creationTime));
- when(segmentMetadata.getStartTime()).thenReturn(startTime);
- when(segmentMetadata.getEndTime()).thenReturn(endTime);
- when(segmentMetadata.getTimeUnit()).thenReturn(timeUnit);
- when(segmentMetadata.getTimeInterval())
- .thenReturn(new Interval(timeUnit.toMillis(startTime),
timeUnit.toMillis(endTime)));
- when(segmentMetadata.getTimeGranularity()).thenReturn(new
Duration(timeUnit.toMillis(1)));
- return segmentMetadata;
+ SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class);
+ when(segmentZKMetadata.getSegmentName()).thenReturn(TEST_TABLE_NAME +
creationTime);
+ when(segmentZKMetadata.getCreationTime()).thenReturn(creationTime);
+
when(segmentZKMetadata.getStartTimeMs()).thenReturn(timeUnit.toMillis(startTime));
+
when(segmentZKMetadata.getEndTimeMs()).thenReturn(timeUnit.toMillis(endTime));
+ return segmentZKMetadata;
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
index fcac9a04fd..cc2dd52faf 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
@@ -89,8 +89,7 @@ public class ValidationManagerTest {
}, 30_000L, "Failed to find the segment in the ExternalView");
Mockito.when(segmentMetadata.getCrc()).thenReturn(Long.toString(System.nanoTime()));
ControllerTestUtils.getHelixResourceManager()
- .refreshSegment(offlineTableName, segmentMetadata, segmentZKMetadata,
EXPECTED_VERSION, "downloadUrl", null,
- -1);
+ .refreshSegment(offlineTableName, segmentMetadata, segmentZKMetadata,
EXPECTED_VERSION, "downloadUrl");
segmentZKMetadata =
ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TEST_TABLE_NAME,
TEST_SEGMENT_NAME);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index 86de96c170..26a4c693dc 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -64,7 +64,6 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.FieldSpec.FieldType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.TimeUtils;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
@@ -285,8 +284,8 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
// Do not create dictionary if index size with dictionary is going to be
larger than index size without dictionary
// This is done to reduce the cost of dictionary for high cardinality
columns
// Off by default and needs optimizeDictionaryEnabled to be set to true
- if (config.isOptimizeDictionaryForMetrics() && spec.getFieldType() ==
FieldType.METRIC
- && spec.isSingleValueField() && spec.getDataType().isFixedWidth()) {
+ if (config.isOptimizeDictionaryForMetrics() && spec.getFieldType() ==
FieldType.METRIC && spec.isSingleValueField()
+ && spec.getDataType().isFixedWidth()) {
long dictionarySize = info.getDistinctValueCount() *
spec.getDataType().size();
long forwardIndexSize =
((long) info.getTotalNumberOfEntries() *
PinotDataBitSet.getNumBitsPerValue(info.getDistinctValueCount() - 1)
@@ -663,8 +662,8 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
SegmentZKPropsConfig segmentZKPropsConfig =
_config.getSegmentZKPropsConfig();
if (segmentZKPropsConfig != null) {
- properties.setProperty(CommonConstants.Segment.Realtime.START_OFFSET,
segmentZKPropsConfig.getStartOffset());
- properties.setProperty(CommonConstants.Segment.Realtime.END_OFFSET,
segmentZKPropsConfig.getEndOffset());
+ properties.setProperty(Realtime.START_OFFSET,
segmentZKPropsConfig.getStartOffset());
+ properties.setProperty(Realtime.END_OFFSET,
segmentZKPropsConfig.getEndOffset());
}
properties.save();
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java
index 3e977cb88d..d1be4b8624 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java
@@ -94,6 +94,10 @@ public interface SegmentMetadata {
Map<String, String> getCustomMap();
+ String getStartOffset();
+
+ String getEndOffset();
+
default Set<String> getAllColumns() {
return getSchema().getColumnNames();
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
index fc47d53765..02049438ab 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
@@ -77,6 +77,11 @@ public class V1Constants {
public static final String SEGMENT_PADDING_CHARACTER =
"segment.padding.character";
public static final String CUSTOM_SUBSET = "custom";
+
+ public static class Realtime {
+ public static final String START_OFFSET =
"segment.realtime.startOffset";
+ public static final String END_OFFSET = "segment.realtime.endOffset";
+ }
}
public static class Column {
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
index 7096f72ece..b17f5d41ea 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
@@ -81,13 +81,19 @@ public class SegmentMetadataImpl implements SegmentMetadata
{
private SegmentVersion _segmentVersion;
private List<StarTreeV2Metadata> _starTreeV2MetadataList;
- // Caching properties around can be costly when the number of segments is
high according to the
- // finding in PR #2996. So for now, caching is used only when initializing
from input streams.
- private PropertiesConfiguration _segmentMetadataPropertiesConfiguration =
null;
private String _creatorName;
private int _totalDocs;
private final Map<String, String> _customMap = new HashMap<>();
+ // Fields specific to realtime table
+ private String _startOffset;
+ private String _endOffset;
+
+ // TODO: No need to cache this. We cannot modify the metadata if it is from
a input stream
+ // Caching properties around can be costly when the number of segments is
high according to the
+ // finding in PR #2996. So for now, caching is used only when initializing
from input streams.
+ private PropertiesConfiguration _segmentMetadataPropertiesConfiguration =
null;
+
@Deprecated
private String _rawTableName;
@@ -250,6 +256,10 @@ public class SegmentMetadataImpl implements
SegmentMetadata {
}
}
+ // Set start/end offset if available
+ _startOffset =
segmentMetadataPropertiesConfiguration.getString(Segment.Realtime.START_OFFSET,
null);
+ _endOffset =
segmentMetadataPropertiesConfiguration.getString(Segment.Realtime.END_OFFSET,
null);
+
// Set custom configs from metadata properties
setCustomConfigs(segmentMetadataPropertiesConfiguration, _customMap);
}
@@ -378,6 +388,16 @@ public class SegmentMetadataImpl implements
SegmentMetadata {
return _customMap;
}
+ @Override
+ public String getStartOffset() {
+ return _startOffset;
+ }
+
+ @Override
+ public String getEndOffset() {
+ return _endOffset;
+ }
+
@Override
public Map<String, ColumnMetadata> getColumnMetadataMap() {
return _columnMetadataMap;
@@ -427,6 +447,9 @@ public class SegmentMetadataImpl implements SegmentMetadata
{
}
segmentMetadata.set("custom", customConfigs);
+ segmentMetadata.put("startOffset", _startOffset);
+ segmentMetadata.put("endOffset", _endOffset);
+
if (_columnMetadataMap != null) {
ArrayNode columnsMetadata = JsonUtils.newArrayNode();
for (Map.Entry<String, ColumnMetadata> entry :
_columnMetadataMap.entrySet()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]