Copilot commented on code in PR #10874:
URL: https://github.com/apache/gravitino/pull/10874#discussion_r3159261206


##########
server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java:
##########
@@ -570,6 +629,8 @@ private void loadOwnerPolicy(String metalake, 
MetadataObject metadataObject, Lon
           }
         }
       }
+    } catch (NoSuchEntityException nse) {
+      LOG.debug("No owner entity found for metalake {}", metalake);

Review Comment:
   In `loadOwnerPolicy`, the `NoSuchEntityException` path only logs and returns 
without populating `ownerRel`. That means repeated checks for the same 
`metadataId` will keep re-hitting the store and `checkOwnership` will continue 
to see the cache as “not loaded”. Consider caching `Optional.empty()` for this 
`metadataId` in the `NoSuchEntityException` branch (and include the metadata 
object in the debug message).
   ```suggestion
         ownerRel.put(metadataId, Optional.empty());
         LOG.debug(
             "No owner entity found for metalake {} and metadata object {}",
             metalake,
             metadataObject);
   ```



##########
server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java:
##########
@@ -238,8 +238,8 @@ public static String replaceAnyPrivilege(String expression) 
{
     expression =
         expression.replaceAll(
             "ANY_CREATE_SCHEMA",
-            "((ANY(CREATE_SCHEMA, METALAKE, CATALOG)) "
-                + "&& !(ANY(DENY_CREATE_SCHEMA, METALAKE, CATALOG)))");
+            "((ANY(CREATE_SCHEMA, METALAKE, CATALOG, SCHEMA)) "
+                + "&& !(ANY(DENY_CREATE_SCHEMA, METALAKE, CATALOG, SCHEMA)))");

Review Comment:
   `replaceAnyPrivilege` behavior for `ANY_CREATE_SCHEMA` changed to include 
`SCHEMA`, but there is no unit test covering the `replaceAnyPrivilege` 
expansion for this token. Please add a test case in 
`TestAuthorizationExpressionConverter` to assert the expanded expression 
includes the `SCHEMA` scope and the corresponding deny clause, to prevent 
regressions.



##########
core/src/test/java/org/apache/gravitino/hook/TestSchemaHookDispatcher.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerDispatcher;
+import org.apache.gravitino.catalog.SchemaDispatcher;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestSchemaHookDispatcher {
+
+  private static final String METALAKE = "test_metalake";
+  private static final String CATALOG = "test_catalog";
+  private static final String SEPARATOR = ":";
+
+  private SchemaDispatcher mockDispatcher;
+  private OwnerDispatcher mockOwnerDispatcher;
+  private SchemaHookDispatcher hookDispatcher;
+
+  @BeforeEach
+  public void setUp() throws IllegalAccessException {
+    mockDispatcher = mock(SchemaDispatcher.class);
+    mockOwnerDispatcher = mock(OwnerDispatcher.class);
+
+    Config mockConfig = mock(Config.class);
+    when(mockConfig.get(Configs.SCHEMA_SEPARATOR)).thenReturn(SEPARATOR);
+
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "config", mockConfig, 
true);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "ownerDispatcher", 
mockOwnerDispatcher, true);
+
+    hookDispatcher = new SchemaHookDispatcher(mockDispatcher);
+  }
+
+  @AfterEach
+  public void tearDown() throws IllegalAccessException {
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "config", null, true);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "ownerDispatcher", null, 
true);
+  }
+
+  @Test
+  public void testCreateFlatSchemaCreatesNoParents() {
+    NameIdentifier ident = NameIdentifier.of(METALAKE, CATALOG, "myschema");
+    Schema mockSchema = mock(Schema.class);
+    when(mockDispatcher.createSchema(eq(ident), any(), 
any())).thenReturn(mockSchema);
+
+    hookDispatcher.createSchema(ident, "comment", Collections.emptyMap());
+
+    // Only the schema itself should be created, no parent creation calls
+    verify(mockDispatcher, times(1)).createSchema(eq(ident), any(), any());
+    verify(mockDispatcher, never()).schemaExists(any());
+
+    // Owner should be set for the schema
+    verify(mockOwnerDispatcher, times(1))
+        .setOwner(
+            eq(METALAKE),
+            eq(NameIdentifierUtil.toMetadataObject(ident, 
Entity.EntityType.SCHEMA)),
+            any(),
+            eq(Owner.Type.USER));
+  }
+
+  @Test
+  public void testCreateNestedSchemaAutoCreatesMissingParents() {
+    NameIdentifier ident = NameIdentifier.of(METALAKE, CATALOG, "A:B:C");
+    NameIdentifier parentA = NameIdentifier.of(METALAKE, CATALOG, "A");
+    NameIdentifier parentAB = NameIdentifier.of(METALAKE, CATALOG, "A:B");
+    Schema mockSchema = mock(Schema.class);
+
+    when(mockDispatcher.schemaExists(parentA)).thenReturn(false);
+    when(mockDispatcher.schemaExists(parentAB)).thenReturn(false);
+    when(mockDispatcher.createSchema(eq(ident), any(), 
any())).thenReturn(mockSchema);
+
+    hookDispatcher.createSchema(ident, "comment", Collections.emptyMap());
+
+    // Parents are auto-created by catalog; hook dispatcher only creates 
target schema.
+    verify(mockDispatcher, never()).createSchema(eq(parentA), any(), any());
+    verify(mockDispatcher, never()).createSchema(eq(parentAB), any(), any());
+    verify(mockDispatcher, times(1)).createSchema(eq(ident), any(), any());
+
+    // Verify owner is set for the requested schema.
+    verify(mockOwnerDispatcher, times(1)).setOwner(eq(METALAKE), any(), any(), 
eq(Owner.Type.USER));
+    verify(mockOwnerDispatcher, times(1))
+        .setOwner(
+            eq(METALAKE),
+            eq(NameIdentifierUtil.toMetadataObject(ident, 
Entity.EntityType.SCHEMA)),
+            any(),
+            eq(Owner.Type.USER));
+  }

Review Comment:
   These tests don’t verify the new behavior added in `SchemaHookDispatcher` to 
assign owners to auto-created parent schemas. Consider stubbing 
`schemaExists(parent)` to return `false` before `createSchema` and `true` 
after, then asserting `ownerDispatcher.setOwner(...)` is called for each 
missing parent (and that existing parents are not reassigned). This will guard 
against regressions in the parent-ownership logic.



##########
core/src/main/java/org/apache/gravitino/catalog/HierarchicalSchemaUtil.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.GravitinoEnv;
+
+/**
+ * Utility class for hierarchical schema name conversions.
+ *
+ * <p>Gravitino supports nested namespace semantics where a logical schema 
name like {@code A:B:C}
+ * (using a configurable external separator, default {@code :}) is mapped to a 
physical schema name
+ * {@code A.B.C} stored in {@code EntityStore} using {@code .} as the internal 
separator.
+ *
+ * <p>Schema names in {@code EntityStore} never contain the external 
separator; the external
+ * separator is only used at the API boundary and in catalog capability 
validation.
+ */
+public final class HierarchicalSchemaUtil {
+
+  /** The internal separator used in EntityStore for nested schema names. */
+  private static final String PHYSICAL_SEPARATOR = ".";
+
+  private HierarchicalSchemaUtil() {}
+
+  /**
+   * Returns the configured external schema separator from the server 
configuration. All code that
+   * needs the separator should use this method instead of reading the config 
directly.
+   *
+   * @return the configured separator string (default {@code ":"})
+   */
+  public static String schemaSeparator() {
+    Config config = GravitinoEnv.getInstance().config();
+    if (config == null) {
+      return Configs.SCHEMA_SEPARATOR.getDefaultValue();
+    }
+    String separator = config.get(Configs.SCHEMA_SEPARATOR);
+    return StringUtils.defaultIfBlank(separator, 
Configs.SCHEMA_SEPARATOR.getDefaultValue());
+  }
+
+  /**
+   * Converts a logical schema path (using the external separator) to a 
physical schema name (using
+   * {@code .} as separator) suitable for storage in EntityStore.
+   *
+   * <p>Example: {@code "A:B:C"} with separator {@code ":"} → {@code "A.B.C"}
+   *
+   * @param logicalPath the logical schema path using the external separator
+   * @param separator the external separator configured on the server
+   * @return the physical schema name using {@code .} as separator
+   */
+  public static String logicalToPhysical(String logicalPath, String separator) 
{
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(logicalPath), "logicalPath must not be blank");
+    Preconditions.checkArgument(StringUtils.isNotBlank(separator), "separator 
must not be blank");
+    return logicalPath.replace(separator, PHYSICAL_SEPARATOR);
+  }
+
+  /**
+   * Converts a physical schema name (using {@code .} as separator) back to 
the logical schema path
+   * using the configured external separator.
+   *
+   * <p>Example: {@code "A.B.C"} with separator {@code ":"} → {@code "A:B:C"}
+   *
+   * @param physicalName the physical schema name using {@code .} as separator
+   * @param separator the external separator configured on the server
+   * @return the logical schema path using the external separator
+   */
+  public static String physicalToLogical(String physicalName, String 
separator) {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(physicalName), "physicalName must not be 
blank");
+    Preconditions.checkArgument(StringUtils.isNotBlank(separator), "separator 
must not be blank");
+    return physicalName.replace(PHYSICAL_SEPARATOR, separator);
+  }
+
+  /**
+   * Returns whether a schema name is a nested path (contains the external 
separator).
+   *
+   * @param name the schema name to test
+   * @param separator the external separator
+   * @return {@code true} if the name contains the external separator
+   */
+  public static boolean isNested(String name, String separator) {
+    return StringUtils.isNotBlank(name) && name.contains(separator);
+  }
+
+  /**
+   * Returns all ancestor schema names of the given schema name, ordered from 
outermost to innermost
+   * (but excluding the name itself). Returns an empty list for top-level 
(non-nested) schemas.
+   *
+   * <p>Example: {@code "A:B:C"} with separator {@code ":"} → {@code ["A", 
"A:B"]}
+   *
+   * @param schemaName the schema name to find ancestors for
+   * @param separator the external separator
+   * @return ancestor names from outermost to innermost, or empty list if not 
nested
+   */
+  public static List<String> getAncestorNames(String schemaName, String 
separator) {
+    Preconditions.checkArgument(StringUtils.isNotBlank(schemaName), 
"schemaName must not be blank");
+    Preconditions.checkArgument(StringUtils.isNotBlank(separator), "separator 
must not be blank");
+    String[] parts = schemaName.split(Pattern.quote(separator), -1);
+    List<String> ancestors = new ArrayList<>();
+    for (int i = 1; i < parts.length; i++) {
+      ancestors.add(String.join(separator, java.util.Arrays.copyOf(parts, i)));
+    }

Review Comment:
   Avoid using `java.util.Arrays` via a fully-qualified reference inside the 
method body (`java.util.Arrays.copyOf(...)`). Import `java.util.Arrays` (or 
refactor to avoid the copy) to keep consistent with the project guideline of 
using standard imports instead of FQNs in code.



##########
core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java:
##########
@@ -185,7 +198,8 @@ public void insertSchema(SchemaEntity schemaEntity, boolean 
overwrite) throws IO
       baseMetricName = "updateSchema")
   public <E extends Entity & HasIdentifier> SchemaEntity updateSchema(
       NameIdentifier identifier, Function<E, E> updater) throws IOException {
-    SchemaPO oldSchemaPO = getSchemaPOByIdentifier(identifier);
+    // Use physical identifier for DB lookup; expose logical entity to the 
updater.
+    SchemaPO oldSchemaPO = 
getSchemaPOByIdentifier(toPhysicalIdentifier(identifier));
 
     SchemaEntity oldSchemaEntity = POConverters.fromSchemaPO(oldSchemaPO, 
identifier.namespace());
     SchemaEntity newEntity = (SchemaEntity) updater.apply((E) oldSchemaEntity);

Review Comment:
   `updateSchema` claims it “expose[s] logical entity to the updater”, but 
`oldSchemaEntity` is built directly from the PO and therefore still carries the 
*physical* schema name (dot-separated). This will leak physical names to 
callers and can cause `newEntity` (returned to the caller) to be physical as 
well, which also contradicts the new tests expecting logical names. Convert 
`oldSchemaEntity` to logical (and ideally normalize `newEntity` to logical) 
before invoking/returning the updater result, while still persisting the 
physical form.



##########
core/src/main/java/org/apache/gravitino/Configs.java:
##########
@@ -500,6 +500,20 @@ private Configs() {}
           .checkValue(value -> value > 0, 
ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
           .createWithDefault(5 * 60 * 1000L); // Default is 5 minutes
 
+  public static final ConfigEntry<String> SCHEMA_SEPARATOR =
+      new ConfigBuilder("gravitino.schema.separator")
+          .doc(
+              "The separator used to represent nested namespace hierarchy in 
schema names at the "
+                  + "API boundary (e.g. ':' for 'A:B:C'). Schema names are 
stored internally in "
+                  + "EntityStore using '.' as the physical separator. The 
configured separator is "
+                  + "only used at external API and catalog capability 
validation layer. "
+                  + "The '.' character is reserved as the internal physical 
separator and must "
+                  + "not be used as the external separator.")
+          .version(ConfigConstants.VERSION_1_3_0)
+          .stringConf()
+          .checkValue(StringUtils::isNotBlank, 
ConfigConstants.NOT_BLANK_ERROR_MSG)
+          .createWithDefault(":");

Review Comment:
   The SCHEMA_SEPARATOR doc says "." is reserved as the physical separator and 
must not be used as the external separator, but the config validation only 
checks non-blank. Please enforce this in `checkValue` (e.g., reject "." and 
possibly any value containing ".") so misconfiguration can’t silently corrupt 
name conversions.



##########
server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java:
##########
@@ -222,15 +224,31 @@ public boolean isOwner(
       String metalake,
       MetadataObject metadataObject,
       AuthorizationRequestContext requestContext) {
-    boolean result;
-    try {
+    boolean result = false;
+    if (metadataObject == null) {
+      return false;
+    }
+
+    if (metadataObject.type() == MetadataObject.Type.SCHEMA) {
+      for (MetadataObject scopeObject : 
buildSchemaInheritanceChain(metadataObject)) {
+        Long metadataId = MetadataIdConverter.getID(scopeObject, metalake);
+        if (metadataId == null) {
+          continue;
+        }
+        loadOwnerPolicy(metalake, scopeObject, metadataId);
+        if (checkOwnership(principal, metalake, metadataId)) {
+          result = true;
+          break;
+        }
+      }
+    } else {
       Long metadataId = MetadataIdConverter.getID(metadataObject, metalake);
-      loadOwnerPolicy(metalake, metadataObject, metadataId);
-      result = checkOwnership(principal, metalake, metadataId);
-    } catch (Exception e) {
-      LOG.debug("Can not get entity id", e);
-      result = false;
+      if (metadataId != null) {
+        loadOwnerPolicy(metalake, metadataObject, metadataId);
+        result = checkOwnership(principal, metalake, metadataId);
+      }
     }

Review Comment:
   `isOwner` no longer guards calls to `MetadataIdConverter.getID(...)` / 
`loadOwnerPolicy(...)` with a try/catch. Since `MetadataIdConverter.getID` can 
throw a `RuntimeException` on I/O failures, authorization evaluation can now 
fail by throwing instead of failing closed (returning false). Please restore 
exception handling (at least around ID resolution) so errors deny ownership 
rather than bubbling up and breaking request processing.



##########
core/src/main/java/org/apache/gravitino/hook/SchemaHookDispatcher.java:
##########
@@ -57,20 +59,51 @@ public NameIdentifier[] listSchemas(Namespace namespace) 
throws NoSuchCatalogExc
   @Override
   public Schema createSchema(NameIdentifier ident, String comment, Map<String, 
String> properties)
       throws NoSuchCatalogException, SchemaAlreadyExistsException {
+    // Collect missing parent identifiers BEFORE the underlying call; the 
catalog itself is
+    // responsible for auto-creating them. We only need to assign ownership 
afterwards.
+    List<NameIdentifier> missingParents = findMissingParents(ident);
     Schema schema = dispatcher.createSchema(ident, comment, properties);
 
-    // Set the creator as the owner of the schema.
+    String metalake = ident.namespace().level(0);
     OwnerDispatcher ownerManager = 
GravitinoEnv.getInstance().ownerDispatcher();
     if (ownerManager != null) {
+      String creator = PrincipalUtils.getCurrentUserName();
+      for (NameIdentifier parent : missingParents) {
+        if (dispatcher.schemaExists(parent)) {
+          ownerManager.setOwner(
+              metalake,
+              NameIdentifierUtil.toMetadataObject(parent, 
Entity.EntityType.SCHEMA),
+              creator,
+              Owner.Type.USER);
+        }
+      }

Review Comment:
   The new parent-owner assignment can overwrite ownership in a race: 
`missingParents` is computed before `createSchema`, and `OwnerManager#setOwner` 
overwrites existing owners. If another request creates a parent schema between 
the pre-check and this call, this code can incorrectly reset that parent’s 
owner to the current user. To avoid privilege/ownership corruption, check 
`ownerManager.getOwner(...)` and only call `setOwner` when the parent currently 
has no owner (or otherwise ensure you only set owner for parents created by 
this request).



##########
core/src/test/java/org/apache/gravitino/storage/relational/service/TestSchemaMetaService.java:
##########
@@ -170,6 +177,186 @@ public void testMetaLifeCycleFromCreationToDeletion() 
throws IOException {
     assertFalse(legacyRecordExistsInDB(schema.id(), Entity.EntityType.SCHEMA));
   }
 
+  @TestTemplate
+  public void testNestedSchemaInsertStoresPhysicalNameAndReturnsLogical() 
throws IOException {
+    createAndInsertMakeLake(metalakeName);
+    createAndInsertCatalog(metalakeName, catalogName);
+
+    SchemaMetaService schemaMetaService = SchemaMetaService.getInstance();
+    String logicalName = "teamA:sales:reports";
+    SchemaEntity schema =
+        createSchemaEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofSchema(metalakeName, catalogName),
+            logicalName,
+            AUDIT_INFO);
+    schemaMetaService.insertSchema(schema, false);
+
+    // Reading back via getSchemaByIdentifier must return the logical name.
+    SchemaEntity loaded =
+        schemaMetaService.getSchemaByIdentifier(
+            NameIdentifierUtil.ofSchema(metalakeName, catalogName, 
logicalName));
+    Assertions.assertEquals(logicalName, loaded.name());
+
+    // The raw row stored in the DB must use the physical separator ".".
+    String rawStoredName = readRawSchemaNameFromDb(schema.id());
+    Assertions.assertEquals("teamA.sales.reports", rawStoredName);
+  }
+
+  @TestTemplate
+  public void testFlatSchemaNameStoredAndReturnedUnchanged() throws 
IOException {
+    createAndInsertMakeLake(metalakeName);
+    createAndInsertCatalog(metalakeName, catalogName);
+
+    SchemaMetaService schemaMetaService = SchemaMetaService.getInstance();
+    String flatName = "flat_schema";
+    SchemaEntity schema =
+        createSchemaEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofSchema(metalakeName, catalogName),
+            flatName,
+            AUDIT_INFO);
+    schemaMetaService.insertSchema(schema, false);
+
+    SchemaEntity loaded =
+        schemaMetaService.getSchemaByIdentifier(
+            NameIdentifierUtil.ofSchema(metalakeName, catalogName, flatName));
+    Assertions.assertEquals(flatName, loaded.name());
+
+    // Flat name: raw DB value must equal the name as-is.
+    Assertions.assertEquals(flatName, readRawSchemaNameFromDb(schema.id()));
+  }
+
+  @TestTemplate
+  public void testListSchemasReturnsLogicalNames() throws IOException {
+    createAndInsertMakeLake(metalakeName);
+    createAndInsertCatalog(metalakeName, catalogName);
+
+    SchemaMetaService schemaMetaService = SchemaMetaService.getInstance();
+
+    SchemaEntity flat =
+        createSchemaEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofSchema(metalakeName, catalogName),
+            "flat",
+            AUDIT_INFO);
+    SchemaEntity nested2 =
+        createSchemaEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofSchema(metalakeName, catalogName),
+            "A:B",
+            AUDIT_INFO);
+    SchemaEntity nested3 =
+        createSchemaEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofSchema(metalakeName, catalogName),
+            "A:B:C",
+            AUDIT_INFO);
+    schemaMetaService.insertSchema(flat, false);
+    schemaMetaService.insertSchema(nested2, false);
+    schemaMetaService.insertSchema(nested3, false);
+
+    List<SchemaEntity> listed =
+        
schemaMetaService.listSchemasByNamespace(NamespaceUtil.ofSchema(metalakeName, 
catalogName));
+
+    List<String> names =
+        
listed.stream().map(SchemaEntity::name).collect(java.util.stream.Collectors.toList());
+    Assertions.assertTrue(names.contains("flat"), "flat name must be present");

Review Comment:
   Avoid using fully-qualified class names inside method bodies. Here 
`java.util.stream.Collectors.toList()` can be replaced by importing 
`Collectors` and using `Collectors.toList()` to match the project’s Java 
import/style guidelines.



##########
core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java:
##########
@@ -572,7 +573,11 @@ public static Map<Long, String> 
getSchemaObjectsFullName(List<Long> schemaIds) {
             return;
           }
 
-          String fullName = DOT_JOINER.join(catalogName, 
schemaPO.getSchemaName());
+          String fullName =
+              DOT_JOINER.join(
+                  catalogName,
+                  HierarchicalSchemaUtil.physicalToLogical(
+                      schemaPO.getSchemaName(), 
HierarchicalSchemaUtil.schemaSeparator()));

Review Comment:
   `getSchemaObjectsFullName` now converts schema names from physical 
(dot-separated) to logical, but there’s no test covering schema objects 
(especially nested schemas) in `TestMetadataObjectService`. Please add coverage 
that inserts a nested schema and verifies the returned full name uses the 
configured logical separator (e.g., `catalog.A:B:C`) and does not expose the 
physical `A.B.C`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to