Jackie-Jiang commented on code in PR #15515:
URL: https://github.com/apache/pinot/pull/15515#discussion_r2058931303


##########
pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java:
##########
@@ -66,6 +71,8 @@ public class TableCache implements PinotConfigProvider {
   private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
   private static final String SCHEMA_PARENT_PATH = "/SCHEMAS";
   private static final String SCHEMA_PATH_PREFIX = "/SCHEMAS/";
+  private static final String LOGICAL_TABLE_PARENT_PATH = "/LOGICAL";

Review Comment:
   Consider adding this under `/LOGICAL/TABLE` in case we want to add other logical abstractions (e.g. logical segments).
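
   A minimal sketch of how the constants might look under that layout. The class name, the `LOGICAL_PARENT_PATH` constant, and both helper methods are hypothetical and only illustrate the nesting; they are not taken from the PR.

   ```java
   // Hypothetical sketch: path constants if logical tables live under /LOGICAL/TABLE.
   final class LogicalTablePathsSketch {
     // Assumed parent for all logical abstractions (tables now, possibly segments later).
     static final String LOGICAL_PARENT_PATH = "/LOGICAL";
     static final String LOGICAL_TABLE_PARENT_PATH = LOGICAL_PARENT_PATH + "/TABLE";
     static final String LOGICAL_TABLE_PATH_PREFIX = LOGICAL_TABLE_PARENT_PATH + "/";

     private LogicalTablePathsSketch() {
     }

     // Builds the property store path for a logical table name.
     static String pathFor(String logicalTableName) {
       return LOGICAL_TABLE_PATH_PREFIX + logicalTableName;
     }

     // Recovers the logical table name from a path built by pathFor().
     static String nameFrom(String path) {
       return path.substring(LOGICAL_TABLE_PATH_PREFIX.length());
     }

     public static void main(String[] args) {
       System.out.println(pathFor("orders"));
     }
   }
   ```

   With this layout, another abstraction could later get its own sibling node under `/LOGICAL` without moving existing logical table records.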



##########
pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java:
##########
@@ -390,6 +423,78 @@ private List<Schema> getSchemas() {
     return schemas;
   }
 
+  private void addLogicalTables(List<String> paths) {
+    // Subscribe data changes before reading the data to avoid missing changes
+    for (String path : paths) {
+      _propertyStore.subscribeDataChanges(path, _zkLogicalTableChangeListener);
+    }
+    List<ZNRecord> znRecords = _propertyStore.get(paths, null, AccessOption.PERSISTENT);
+    for (ZNRecord znRecord : znRecords) {
+      if (znRecord != null) {
+        try {
+          putLogicalTable(znRecord);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while adding logical table for ZNRecord: {}", znRecord.getId(), e);
+        }
+      }
+    }
+  }
+
+  private void putLogicalTable(ZNRecord znRecord)
+      throws IOException {
+    LogicalTable logicalTable = LogicalTableUtils.fromZNRecord(znRecord);
+    String logicalTableName = logicalTable.getTableName();
+    _logicalTableNameMap.put(logicalTableName, logicalTable);
+  }
+
+  private void removeLogicalTable(String path) {
+    _propertyStore.unsubscribeDataChanges(path, _zkLogicalTableChangeListener);
+    String logicalTableName = path.substring(LOGICAL_TABLE_PATH_PREFIX.length());
+    _logicalTableNameMap.remove(logicalTableName);
+  }
+
+  private void notifyLogicalTableChangeListeners() {
+    if (!_logicalTableChangeListeners.isEmpty()) {
+      List<LogicalTable> logicalTables = new ArrayList<>(_logicalTableNameMap.values());
+      for (LogicalTableChangeListener listener : _logicalTableChangeListeners) {
+        listener.onChange(logicalTables);
+      }
+    }
+  }
+
+  @Nullable
+  public LogicalTable getLogicalTable(String logicalTableName) {
+    return _logicalTableNameMap.get(logicalTableName);
+  }
+
+  public boolean isLogicalTable(String logicalTableName) {
+    return _logicalTableNameMap.containsKey(logicalTableName);
+  }
+
+  public List<LogicalTable> getLogicalTables() {
+    return new ArrayList<>(_logicalTableNameMap.values());
+  }
+
+  public Schema getLogicalTableSchema(String logicalTableName) {

Review Comment:
   We want to first look up the schema using the raw `logicalTableName`, then fall back to the schema of the first physical table.
   @vrajat Do we want to enforce a schema for logical tables?
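
   A rough sketch of that lookup order, using plain maps as stand-ins for the cache state. Everything here is an assumption for illustration: schemas are represented as strings, the two maps and the `rawTableName` helper are hypothetical, and the real code would presumably use the existing schema map and table name utilities.

   ```java
   import java.util.List;
   import java.util.Map;

   // Hypothetical sketch of "logical name first, then first physical table".
   final class LogicalSchemaLookupSketch {
     private LogicalSchemaLookupSketch() {
     }

     // Returns the schema registered under the logical table name if present,
     // otherwise the schema of the first physical table, otherwise null.
     static String resolveSchema(String logicalTableName, Map<String, String> schemaByName,
         Map<String, List<String>> physicalTablesByLogicalName) {
       String schema = schemaByName.get(logicalTableName);
       if (schema != null) {
         return schema;
       }
       List<String> physicalTables = physicalTablesByLogicalName.get(logicalTableName);
       if (physicalTables == null || physicalTables.isEmpty()) {
         return null;
       }
       return schemaByName.get(rawTableName(physicalTables.get(0)));
     }

     // Strips an assumed _OFFLINE/_REALTIME type suffix, if any.
     static String rawTableName(String tableNameWithType) {
       for (String suffix : new String[] {"_OFFLINE", "_REALTIME"}) {
         if (tableNameWithType.endsWith(suffix)) {
           return tableNameWithType.substring(0, tableNameWithType.length() - suffix.length());
         }
       }
       return tableNameWithType;
     }

     public static void main(String[] args) {
       Map<String, String> schemas = Map.of("orders", "ordersSchema");
       Map<String, List<String>> physical = Map.of("allOrders", List.of("orders_OFFLINE"));
       System.out.println(resolveSchema("allOrders", schemas, physical));
     }
   }
   ```

   If a schema ends up being enforced for logical tables, the fallback branch would go away and a missing schema would simply return `null`.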



##########
pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java:
##########
@@ -216,11 +241,19 @@ public boolean registerTableConfigChangeListener(TableConfigChangeListener table
   }
 
   /**
-   * Returns the schema for the given table, or {@code null} if it does not exist.
+   * Returns the schema for the given logical or physical table, or {@code null} if it does not exist.
    */
   @Nullable
   @Override
   public Schema getSchema(String rawTableName) {
+    if (isLogicalTable(rawTableName)) {
+      return getLogicalTableSchema(rawTableName);
+    } else {
+      return getPhysicalTableSchema(rawTableName);
+    }
+  }
+
+  private Schema getPhysicalTableSchema(String rawTableName) {

Review Comment:
   (minor)
   ```suggestion
     private Schema getSchema(String schemaName) {
   ```



##########
pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java:
##########
@@ -390,6 +423,78 @@ private List<Schema> getSchemas() {
     return schemas;
   }
 
+  private void addLogicalTables(List<String> paths) {
+    // Subscribe data changes before reading the data to avoid missing changes
+    for (String path : paths) {
+      _propertyStore.subscribeDataChanges(path, _zkLogicalTableChangeListener);
+    }
+    List<ZNRecord> znRecords = _propertyStore.get(paths, null, AccessOption.PERSISTENT);
+    for (ZNRecord znRecord : znRecords) {
+      if (znRecord != null) {
+        try {
+          putLogicalTable(znRecord);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while adding logical table for ZNRecord: {}", znRecord.getId(), e);
+        }
+      }
+    }
+  }
+
+  private void putLogicalTable(ZNRecord znRecord)
+      throws IOException {
+    LogicalTable logicalTable = LogicalTableUtils.fromZNRecord(znRecord);
+    String logicalTableName = logicalTable.getTableName();

Review Comment:
   We also need to handle `_ignoreCase` for logical tables.
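
   One possible shape for this, as a sketch only: keep a name map whose key is lower-cased when case-insensitive mode is on and whose value is the name as written in the config. The class and method names below are hypothetical, and the actual change may fit better into the existing table name map handling.

   ```java
   import java.util.Locale;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical sketch of case-insensitive logical table name resolution.
   final class LogicalTableNameIndexSketch {
     private final boolean _ignoreCase;
     // Key: lookup key (lower-cased when _ignoreCase is set); value: name as stored in the config.
     private final Map<String, String> _logicalTableNameMap = new ConcurrentHashMap<>();

     LogicalTableNameIndexSketch(boolean ignoreCase) {
       _ignoreCase = ignoreCase;
     }

     // Normalizes a name into the key used by the map.
     private String toKey(String logicalTableName) {
       return _ignoreCase ? logicalTableName.toLowerCase(Locale.ROOT) : logicalTableName;
     }

     void put(String logicalTableName) {
       _logicalTableNameMap.put(toKey(logicalTableName), logicalTableName);
     }

     void remove(String logicalTableName) {
       _logicalTableNameMap.remove(toKey(logicalTableName));
     }

     // Returns the name as stored in the config, or null if the table is unknown.
     String getActualName(String logicalTableName) {
       return _logicalTableNameMap.get(toKey(logicalTableName));
     }

     boolean isLogicalTable(String logicalTableName) {
       return _logicalTableNameMap.containsKey(toKey(logicalTableName));
     }
   }
   ```

   The same normalization would need to apply on put, remove, and lookup, otherwise a table added as `MyTable` could not be removed through a differently cased path.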



##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java:
##########
@@ -809,4 +819,45 @@ private static Map<String, Double> toApplicationQpsQuotas(Map<String, String> qu
       return result;
     }
   }
+
+  public static void setLogicalTable(ZkHelixPropertyStore<ZNRecord> propertyStore, LogicalTable logicalTable) {

Review Comment:
   (minor) Put logical table APIs next to physical table ones for readability



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableBuilder.java:
##########
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils.builder;
+
+import java.util.Map;
+import org.apache.pinot.spi.data.LogicalTable;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
+
+
+public class LogicalTableBuilder {

Review Comment:
   (minor) Since the config is very simple, we don't need to introduce a builder. We can introduce a builder when it becomes more complex.



##########
pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java:
##########
@@ -476,6 +581,49 @@ public synchronized void handleDataDeleted(String path) {
     }
   }
 
+  private class ZkLogicalTableChangeListener implements IZkChildListener, IZkDataListener {
+
+    @Override
+    public synchronized void handleChildChange(String path, List<String> logicalTableNames) {
+      if (CollectionUtils.isEmpty(logicalTableNames)) {
+        return;
+      }
+
+      // Only process new added logical tables. Changed/removed logical tables are handled by other callbacks.
+      List<String> pathsToAdd = new ArrayList<>();
+      for (String logicalTableName : logicalTableNames) {
+        if (!_logicalTableNameMap.containsKey(logicalTableName)) {
+          pathsToAdd.add(LOGICAL_TABLE_PATH_PREFIX + logicalTableName);
+        }
+      }
+      if (!pathsToAdd.isEmpty()) {
+        addLogicalTables(pathsToAdd);
+      }
+      notifyLogicalTableChangeListeners();
+    }
+
+    @Override
+    public synchronized void handleDataChange(String path, Object data) {
+      if (data != null) {
+        ZNRecord znRecord = (ZNRecord) data;
+        try {
+          putLogicalTable(znRecord);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while refreshing logical table for ZNRecord: {}", znRecord.getId(), e);
+        }
+        notifyLogicalTableChangeListeners();
+      }
+    }
+
+    @Override
+    public synchronized void handleDataDeleted(String path) {
+      // NOTE: The path here is the absolute ZK path instead of the relative path to the property store.
+      String logicalTableName = path.substring(path.lastIndexOf('/') + 1);
+      removeLogicalTable(LOGICAL_TABLE_PATH_PREFIX + logicalTableName);
+      notifyLogicalTableChangeListeners();
+    }
+  }
+
   private static class TableConfigInfo {

Review Comment:
   We will likely need an entry similar to this for logical tables. These are all query-related properties.
   
   (minor) Since logical table config handling is very similar to table config handling, consider putting all new methods next to the table config methods for readability.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTable.java:
##########
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class LogicalTable {

Review Comment:
   Should this class extend `BaseJsonConfig`?



