This is an automated email from the ASF dual-hosted git repository.

richardstartin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 681da61 Refresh ZK metadata when dimension table is updated (#8133) 681da61 is described below commit 681da619a3921e6cd92179a0d638d5b4270100b6 Author: Mark Needham <m.h.need...@gmail.com> AuthorDate: Mon Mar 21 12:03:10 2022 +0000 Refresh ZK metadata when dimension table is updated (#8133) * Refresh ZK metadata when dimension table is updated * Update DimensionTableDataManagerTest.java * all fields into a volatile class in the CAS loop (as per Richard's feedback) * license missing * Return DimensionTable instead of passing it in * Don't mutate the state of DimensionTable --- .../core/data/manager/offline/DimensionTable.java | 57 ++++++++++++++++++++++ .../manager/offline/DimensionTableDataManager.java | 44 ++++++++++------- .../offline/DimensionTableDataManagerTest.java | 57 ++++++++++++++++++++-- 3 files changed, 136 insertions(+), 22 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTable.java new file mode 100644 index 0000000..d738b5f --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTable.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.manager.offline; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.PrimaryKey; + + +class DimensionTable { + + private final Map<PrimaryKey, GenericRow> _lookupTable; + private final Schema _tableSchema; + private final List<String> _primaryKeyColumns; + + DimensionTable(Schema tableSchema, List<String> primaryKeyColumns) { + this(tableSchema, primaryKeyColumns, new HashMap<>()); + } + + DimensionTable(Schema tableSchema, List<String> primaryKeyColumns, Map<PrimaryKey, GenericRow> lookupTable) { + _lookupTable = lookupTable; + _tableSchema = tableSchema; + _primaryKeyColumns = primaryKeyColumns; + } + + List<String> getPrimaryKeyColumns() { + return _primaryKeyColumns; + } + + GenericRow get(PrimaryKey pk) { + return _lookupTable.get(pk); + } + + FieldSpec getFieldSpecFor(String columnName) { + return _tableSchema.getFieldSpecFor(columnName); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java index f9a87c4..46a3b11 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java @@ -26,6 
+26,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.annotation.concurrent.ThreadSafe; +import org.apache.helix.ZNRecord; +import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; @@ -48,6 +50,7 @@ import org.apache.pinot.spi.data.readers.PrimaryKey; */ @ThreadSafe public class DimensionTableDataManager extends OfflineTableDataManager { + // Storing singletons per table in a HashMap private static final Map<String, DimensionTableDataManager> INSTANCES = new ConcurrentHashMap<>(); @@ -73,19 +76,18 @@ public class DimensionTableDataManager extends OfflineTableDataManager { } @SuppressWarnings("rawtypes") - private static final AtomicReferenceFieldUpdater<DimensionTableDataManager, Map> UPDATER = - AtomicReferenceFieldUpdater.newUpdater(DimensionTableDataManager.class, Map.class, "_lookupTable"); - private volatile Map<PrimaryKey, GenericRow> _lookupTable = new HashMap<>(); - private Schema _tableSchema; - private List<String> _primaryKeyColumns; + private static final AtomicReferenceFieldUpdater<DimensionTableDataManager, DimensionTable> UPDATER = + AtomicReferenceFieldUpdater.newUpdater(DimensionTableDataManager.class, + DimensionTable.class, "_dimensionTable"); + + private volatile DimensionTable _dimensionTable; @Override protected void doInit() { super.doInit(); - // dimension tables should always have schemas with primary keys - _tableSchema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); - _primaryKeyColumns = _tableSchema.getPrimaryKeyColumns(); + Schema tableSchema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); + _dimensionTable = new DimensionTable(tableSchema, tableSchema.getPrimaryKeyColumns()); } 
@Override @@ -118,17 +120,17 @@ public class DimensionTableDataManager extends OfflineTableDataManager { */ private void loadLookupTable() throws Exception { - Map<PrimaryKey, GenericRow> snapshot; - Map<PrimaryKey, GenericRow> replacement; + DimensionTable snapshot; + DimensionTable replacement; do { - snapshot = _lookupTable; - replacement = new HashMap<>(snapshot.size()); - populate(replacement); + snapshot = _dimensionTable; + replacement = createDimensionTable(); } while (!UPDATER.compareAndSet(this, snapshot, replacement)); } - private void populate(Map<PrimaryKey, GenericRow> map) + private DimensionTable createDimensionTable() throws Exception { + Map<PrimaryKey, GenericRow> map = new HashMap<>(); List<SegmentDataManager> segmentManagers = acquireAllSegments(); try { for (SegmentDataManager segmentManager : segmentManagers) { @@ -137,10 +139,16 @@ public class DimensionTableDataManager extends OfflineTableDataManager { indexSegment.getSegmentMetadata().getIndexDir())) { while (reader.hasNext()) { GenericRow row = reader.next(); - map.put(row.getPrimaryKey(_primaryKeyColumns), row); + map.put(row.getPrimaryKey(_dimensionTable.getPrimaryKeyColumns()), row); } } } + + ZkHelixPropertyStore<ZNRecord> propertyStore = _helixManager.getHelixPropertyStore(); + Schema tableSchema = ZKMetadataProvider.getTableSchema(propertyStore, _tableNameWithType); + List<String> primaryKeyColumns = tableSchema.getPrimaryKeyColumns(); + return new DimensionTable(tableSchema, primaryKeyColumns, map); + } finally { for (SegmentDataManager segmentManager : segmentManagers) { releaseSegment(segmentManager); @@ -149,14 +157,14 @@ public class DimensionTableDataManager extends OfflineTableDataManager { } public GenericRow lookupRowByPrimaryKey(PrimaryKey pk) { - return _lookupTable.get(pk); + return _dimensionTable.get(pk); } public FieldSpec getColumnFieldSpec(String columnName) { - return _tableSchema.getFieldSpecFor(columnName); + return _dimensionTable.getFieldSpecFor(columnName); } 
public List<String> getPrimaryKeyColumns() { - return _primaryKeyColumns; + return _dimensionTable.getPrimaryKeyColumns(); } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java index 0daf200..dbce8cb 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java @@ -100,7 +100,22 @@ public class DimensionTableDataManagerTest { return propertyStore; } - private DimensionTableDataManager makeTestableManager() { + private ZkHelixPropertyStore mockPropertyStoreWithNewColumn() { + String baseballTeamsSchemaStr = + "{\"schemaName\":\"dimBaseballTeams\",\"dimensionFieldSpecs\":[{\"name\":\"teamID\",\"dataType\":\"STRING\"}," + + "{\"name\":\"teamName\",\"dataType\":\"STRING\"}, {\"name\":\"teamCity\",\"dataType\":\"STRING\"}]," + + "\"primaryKeyColumns\":[\"teamID\"]}"; + ZNRecord zkSchemaRec = new ZNRecord("dimBaseballTeams"); + zkSchemaRec.setSimpleField("schemaJSON", baseballTeamsSchemaStr); + + ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); + when(propertyStore.get("/SCHEMAS/dimBaseballTeams", null, AccessOption.PERSISTENT)). 
+ thenReturn(zkSchemaRec); + + return propertyStore; + } + + private DimensionTableDataManager makeTestableManager(HelixManager helixManager) { DimensionTableDataManager tableDataManager = DimensionTableDataManager.createInstanceByTableName(TABLE_NAME); TableDataManagerConfig config; { @@ -109,7 +124,7 @@ public class DimensionTableDataManagerTest { when(config.getDataDir()).thenReturn(INDEX_DIR.getAbsolutePath()); } tableDataManager.init(config, "dummyInstance", mockPropertyStore(), - new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null); + new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), helixManager, null); tableDataManager.start(); return tableDataManager; @@ -118,7 +133,10 @@ public class DimensionTableDataManagerTest { @Test public void instantiationTests() throws Exception { - DimensionTableDataManager mgr = makeTestableManager(); + HelixManager helixManager = mock(HelixManager.class); + ZkHelixPropertyStore propertyStore = mockPropertyStore(); + when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore); + DimensionTableDataManager mgr = makeTestableManager(helixManager); Assert.assertEquals(mgr.getTableName(), TABLE_NAME); // fetch the same instance via static method @@ -144,7 +162,10 @@ public class DimensionTableDataManagerTest { @Test public void lookupTests() throws Exception { - DimensionTableDataManager mgr = makeTestableManager(); + HelixManager helixManager = mock(HelixManager.class); + ZkHelixPropertyStore propertyStore = mockPropertyStore(); + when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore); + DimensionTableDataManager mgr = makeTestableManager(helixManager); // try fetching data BEFORE loading segment GenericRow resp = mgr.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"})); @@ -177,4 +198,32 @@ public class DimensionTableDataManagerTest { resp = mgr.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"})); Assert.assertNull(resp, "Response should be 
null if no segment is loaded"); } + + @Test + public void onRefreshDimensionTable() + throws Exception { + HelixManager helixManager = mock(HelixManager.class); + ZkHelixPropertyStore<ZNRecord> propertyStore = mockPropertyStoreWithNewColumn(); + when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore); + + DimensionTableDataManager mgr = makeTestableManager(helixManager); + + mgr.addSegment(_indexDir, _indexLoadingConfig); + + // Confirm table is loaded and available for lookup + GenericRow resp = mgr.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"})); + Assert.assertNotNull(resp, "Should return response after segment load"); + Assert.assertEquals(resp.getValue("teamName"), "San Francisco Giants"); + + // WHEN (segment is refreshed) + + mgr.addSegment(_indexDir, _indexLoadingConfig); + + // THEN + FieldSpec teamCitySpec = mgr.getColumnFieldSpec("teamCity"); + Assert.assertNotNull(teamCitySpec, "Should return spec for existing column"); + Assert.assertEquals(teamCitySpec.getDataType(), FieldSpec.DataType.STRING, + "Should return correct data type for teamCity column"); + + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org