jerryshao commented on code in PR #10879:
URL: https://github.com/apache/gravitino/pull/10879#discussion_r3165441881


##########
core/src/main/java/org/apache/gravitino/storage/relational/po/ViewPO.java:
##########
@@ -18,94 +18,246 @@
  */
 package org.apache.gravitino.storage.relational.po;
 
+import static 
org.apache.gravitino.storage.relational.utils.POConverters.DEFAULT_DELETED_AT;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.type.CollectionType;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.ToString;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.dto.rel.ColumnDTO;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.NamespacedEntityId;
+import org.apache.gravitino.meta.ViewEntity;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Representation;
+import org.apache.gravitino.rel.SQLRepresentation;
+import org.apache.gravitino.storage.relational.service.EntityIdService;
 
 @Getter
-@EqualsAndHashCode
+@EqualsAndHashCode(exclude = "viewVersionInfoPO")
 @ToString
 public class ViewPO {
+
+  public static final Long INITIAL_VERSION = 1L;
+
   private Long viewId;
   private String viewName;
   private Long metalakeId;
   private Long catalogId;
   private Long schemaId;
+  private String auditInfo;
   private Long currentVersion;
   private Long lastVersion;
   private Long deletedAt;
+  private ViewVersionInfoPO viewVersionInfoPO;
 
-  public static class Builder {
-    private final ViewPO viewPO;
+  public ViewPO() {}
 
-    private Builder() {
-      viewPO = new ViewPO();
-    }
+  public static class ViewPOBuilder {
+    // Lombok will generate builder methods.
+  }
 
-    public Builder withViewId(Long viewId) {
-      viewPO.viewId = viewId;
-      return this;
-    }
+  @lombok.Builder(setterPrefix = "with")
+  private ViewPO(
+      Long viewId,
+      String viewName,
+      Long metalakeId,
+      Long catalogId,
+      Long schemaId,
+      String auditInfo,
+      Long currentVersion,
+      Long lastVersion,
+      Long deletedAt,
+      ViewVersionInfoPO viewVersionInfoPO) {
+    Preconditions.checkArgument(viewId != null, "View id is required");
+    Preconditions.checkArgument(viewName != null, "View name is required");
+    Preconditions.checkArgument(metalakeId != null, "Metalake id is required");
+    Preconditions.checkArgument(catalogId != null, "Catalog id is required");
+    Preconditions.checkArgument(schemaId != null, "Schema id is required");
+    Preconditions.checkArgument(auditInfo != null, "Audit info is required");
+    Preconditions.checkArgument(currentVersion != null, "Current version is 
required");
+    Preconditions.checkArgument(lastVersion != null, "Last version is 
required");
+    Preconditions.checkArgument(deletedAt != null, "Deleted at is required");
 
-    public Builder withViewName(String viewName) {
-      viewPO.viewName = viewName;
-      return this;
-    }
+    this.viewId = viewId;
+    this.viewName = viewName;
+    this.metalakeId = metalakeId;
+    this.catalogId = catalogId;
+    this.schemaId = schemaId;
+    this.auditInfo = auditInfo;
+    this.currentVersion = currentVersion;
+    this.lastVersion = lastVersion;
+    this.deletedAt = deletedAt;
+    this.viewVersionInfoPO = viewVersionInfoPO;
+  }
 
-    public Builder withMetalakeId(Long metalakeId) {
-      viewPO.metalakeId = metalakeId;
-      return this;
-    }
+  // ============================ PO Converters ============================
 
-    public Builder withCatalogId(Long catalogId) {
-      viewPO.catalogId = catalogId;
-      return this;
-    }
+  public static ViewEntity fromViewPO(ViewPO viewPO, Namespace namespace) {
+    try {
+      ViewVersionInfoPO versionPO = viewPO.getViewVersionInfoPO();
+      List<ColumnDTO> columnDTOs =
+          JsonUtils.anyFieldMapper()
+              .readValue(
+                  versionPO.columns(),
+                  JsonUtils.anyFieldMapper()
+                      .getTypeFactory()
+                      .constructCollectionType(List.class, ColumnDTO.class));
+      Column[] columns = columnDTOs.toArray(new Column[0]);
 
-    public Builder withSchemaId(Long schemaId) {
-      viewPO.schemaId = schemaId;
-      return this;
-    }
+      Representation[] representations = 
deserializeRepresentations(versionPO.representations());
+
+      Map<String, String> properties =
+          versionPO.properties() == null
+              ? Collections.emptyMap()
+              : JsonUtils.anyFieldMapper()
+                  .readValue(
+                      versionPO.properties(),
+                      JsonUtils.anyFieldMapper()
+                          .getTypeFactory()
+                          .constructMapType(Map.class, String.class, 
String.class));
 
-    public Builder withCurrentVersion(Long currentVersion) {
-      viewPO.currentVersion = currentVersion;
-      return this;
+      return ViewEntity.builder()
+          .withId(viewPO.getViewId())
+          .withName(viewPO.getViewName())
+          .withNamespace(namespace)
+          .withComment(versionPO.viewComment())
+          .withColumns(columns)
+          .withRepresentations(representations)
+          .withDefaultCatalog(versionPO.defaultCatalog())
+          .withDefaultSchema(versionPO.defaultSchema())
+          .withProperties(properties)
+          .withAuditInfo(
+              JsonUtils.anyFieldMapper().readValue(viewPO.getAuditInfo(), 
AuditInfo.class))
+          .build();
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to deserialize json object:", e);
     }
+  }
 
-    public Builder withLastVersion(Long lastVersion) {
-      viewPO.lastVersion = lastVersion;
-      return this;
+  public static ViewPO initializeViewPO(ViewEntity viewEntity, ViewPOBuilder 
builder) {
+    
builder.withCurrentVersion(INITIAL_VERSION).withLastVersion(INITIAL_VERSION);
+    return buildViewPO(viewEntity, builder, INITIAL_VERSION.intValue());
+  }
+
+  public static ViewVersionInfoPO initializeViewVersionInfoPO(
+      ViewEntity viewEntity, NamespacedEntityId namespacedEntityId, Integer 
version) {
+    try {
+      List<ColumnDTO> columnDTOs =
+          Arrays.stream(viewEntity.columns() == null ? new Column[0] : 
viewEntity.columns())
+              .map(ViewPO::toColumnDTO)
+              .collect(Collectors.toList());
+      String propertiesJson =
+          viewEntity.properties() == null || viewEntity.properties().isEmpty()
+              ? null
+              : 
JsonUtils.anyFieldMapper().writeValueAsString(viewEntity.properties());
+
+      return ViewVersionInfoPO.builder()
+          .withViewId(viewEntity.id())
+          .withMetalakeId(namespacedEntityId.namespaceIds()[0])
+          .withCatalogId(namespacedEntityId.namespaceIds()[1])
+          .withSchemaId(namespacedEntityId.entityId())
+          .withVersion(version)
+          .withViewComment(viewEntity.comment())
+          
.withColumns(JsonUtils.anyFieldMapper().writeValueAsString(columnDTOs))
+          .withProperties(propertiesJson)
+          .withDefaultCatalog(viewEntity.defaultCatalog())
+          .withDefaultSchema(viewEntity.defaultSchema())
+          
.withRepresentations(serializeRepresentations(viewEntity.representations()))
+          
.withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(viewEntity.auditInfo()))
+          .withDeletedAt(DEFAULT_DELETED_AT)
+          .build();
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to serialize json object:", e);
     }
+  }
 
-    public Builder withDeletedAt(Long deletedAt) {
-      viewPO.deletedAt = deletedAt;
-      return this;
+  public static ViewPO buildViewPO(ViewEntity viewEntity, ViewPOBuilder 
builder, Integer version) {
+    try {
+      NamespacedEntityId namespacedEntityId =
+          EntityIdService.getEntityIds(
+              NameIdentifier.of(viewEntity.namespace().levels()), 
Entity.EntityType.SCHEMA);
+      ViewVersionInfoPO versionPO =
+          initializeViewVersionInfoPO(viewEntity, namespacedEntityId, version);
+      return builder
+          .withViewId(viewEntity.id())
+          .withViewName(viewEntity.name())
+          .withMetalakeId(namespacedEntityId.namespaceIds()[0])
+          .withCatalogId(namespacedEntityId.namespaceIds()[1])
+          .withSchemaId(namespacedEntityId.entityId())
+          
.withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(viewEntity.auditInfo()))
+          .withViewVersionInfoPO(versionPO)
+          .withDeletedAt(DEFAULT_DELETED_AT)
+          .build();
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to serialize json object:", e);
     }
+  }
+
+  // ============================ Representation Serde 
============================
 
-    private void validate() {
-      Preconditions.checkArgument(viewPO.viewId != null, "View id is 
required");
-      Preconditions.checkArgument(viewPO.viewName != null, "View name is 
required");
-      Preconditions.checkArgument(viewPO.metalakeId != null, "Metalake id is 
required");
-      Preconditions.checkArgument(viewPO.catalogId != null, "Catalog id is 
required");
-      Preconditions.checkArgument(viewPO.schemaId != null, "Schema id is 
required");
-      Preconditions.checkArgument(viewPO.currentVersion != null, "Current 
version is required");
-      Preconditions.checkArgument(viewPO.lastVersion != null, "Last version is 
required");
-      Preconditions.checkArgument(viewPO.deletedAt != null, "Deleted at is 
required");
+  static String serializeRepresentations(Representation[] representations)
+      throws JsonProcessingException {
+    List<Map<String, Object>> out =
+        Arrays.stream(representations == null ? new Representation[0] : 
representations)
+            .map(ViewPO::toMap)
+            .collect(Collectors.toList());
+    return JsonUtils.anyFieldMapper().writeValueAsString(out);
+  }
+
+  static Representation[] deserializeRepresentations(String json) throws 
JsonProcessingException {
+    CollectionType type =
+        
JsonUtils.anyFieldMapper().getTypeFactory().constructCollectionType(List.class, 
Map.class);
+    List<Map<String, Object>> list = 
JsonUtils.anyFieldMapper().readValue(json, type);
+    return list.stream().map(ViewPO::fromMap).toArray(Representation[]::new);
+  }
+
+  private static Map<String, Object> toMap(Representation rep) {
+    if (rep instanceof SQLRepresentation) {
+      SQLRepresentation sql = (SQLRepresentation) rep;
+      ImmutableMap.Builder<String, Object> m = ImmutableMap.builder();
+      m.put("type", Representation.TYPE_SQL);
+      m.put("dialect", sql.dialect());
+      m.put("sql", sql.sql());
+      return m.build();

Review Comment:
   Or, you can figure out a more explicit way to do this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to