This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 7ec47c4 Add ignoreMerger for partial upsert (#7907) 7ec47c4 is described below commit 7ec47c420be9c6aee6c8e95644266fe9b7fe7a2b Author: deemoliu <qiao...@uber.com> AuthorDate: Fri Jan 14 18:20:18 2022 -0800 Add ignoreMerger for partial upsert (#7907) --- .../segment/local/upsert/PartialUpsertHandler.java | 8 ++++++ ...lUpsertMergerFactory.java => IgnoreMerger.java} | 32 ++++++---------------- .../upsert/merger/PartialUpsertMergerFactory.java | 3 ++ .../upsert/merger/PartialUpsertMergerTest.java | 7 +++++ .../pinot/spi/config/table/UpsertConfig.java | 2 +- 5 files changed, 28 insertions(+), 24 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java index 4822339..7bc6e6a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java @@ -119,6 +119,14 @@ public class PartialUpsertHandler { /** * Merges 2 records and returns the merged record. + * We use a map to indicate all configured fields for partial upsert. For these fields: + * (1) If the prev value is null, return the new value. + * (2) If the prev value is not null and the new value is null, return the prev value. + * (3) If neither value is null, then merge the values and return the result. + * Un-configured fields use the default override behavior, regardless of null values. + * + * For example, overwrite merger will only override the prev value if the new value is not null. + * Null values will override existing values if not configured. They can be ignored by using ignoreMerger. * * @param previousRecord the last derived full record during ingestion. * @param newRecord the new consumed record. 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IgnoreMerger.java similarity index 50% copy from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java copy to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IgnoreMerger.java index 47fae1c..c9bb16f 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IgnoreMerger.java @@ -18,30 +18,16 @@ */ package org.apache.pinot.segment.local.upsert.merger; -import org.apache.pinot.spi.config.table.UpsertConfig; - - -public class PartialUpsertMergerFactory { - private PartialUpsertMergerFactory() { +/** + * Merges 2 records and returns the merged record. + * By default, ignore the new value from incoming row. Then return the merged record. 
+ */ +public class IgnoreMerger implements PartialUpsertMerger { + IgnoreMerger() { } - private static final AppendMerger APPEND_MERGER = new AppendMerger(); - private static final IncrementMerger INCREMENT_MERGER = new IncrementMerger(); - private static final OverwriteMerger OVERWRITE_MERGER = new OverwriteMerger(); - private static final UnionMerger UNION_MERGER = new UnionMerger(); - - public static PartialUpsertMerger getMerger(UpsertConfig.Strategy strategy) { - switch (strategy) { - case APPEND: - return APPEND_MERGER; - case INCREMENT: - return INCREMENT_MERGER; - case OVERWRITE: - return OVERWRITE_MERGER; - case UNION: - return UNION_MERGER; - default: - throw new IllegalStateException("Unsupported partial upsert strategy: " + strategy); - } + @Override + public Object merge(Object previousValue, Object currentValue) { + return previousValue; } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java index 47fae1c..23dc985 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java @@ -27,6 +27,7 @@ public class PartialUpsertMergerFactory { private static final AppendMerger APPEND_MERGER = new AppendMerger(); private static final IncrementMerger INCREMENT_MERGER = new IncrementMerger(); + private static final IgnoreMerger IGNORE_MERGER = new IgnoreMerger(); private static final OverwriteMerger OVERWRITE_MERGER = new OverwriteMerger(); private static final UnionMerger UNION_MERGER = new UnionMerger(); @@ -36,6 +37,8 @@ public class PartialUpsertMergerFactory { return APPEND_MERGER; case INCREMENT: return INCREMENT_MERGER; + case IGNORE: + return IGNORE_MERGER; case OVERWRITE: return OVERWRITE_MERGER; 
case UNION: diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java index 002b4a4..7a2c398 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java @@ -42,6 +42,13 @@ public class PartialUpsertMergerTest { } @Test + public void testIgnoreMergers() { + IgnoreMerger ignoreMerger = new IgnoreMerger(); + assertEquals(null, ignoreMerger.merge(null, 3)); + assertEquals(3, ignoreMerger.merge(3, null)); + } + + @Test public void testOverwriteMergers() { OverwriteMerger overwriteMerger = new OverwriteMerger(); assertEquals("newValue", overwriteMerger.merge("oldValue", "newValue")); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java index 7e0be94..9184bff 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java @@ -37,7 +37,7 @@ public class UpsertConfig extends BaseJsonConfig { public enum Strategy { // Todo: add CUSTOM strategies - APPEND, INCREMENT, OVERWRITE, UNION + APPEND, IGNORE, INCREMENT, OVERWRITE, UNION } public enum HashFunction { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org