zeroshade commented on code in PR #414:
URL: https://github.com/apache/iceberg-go/pull/414#discussion_r2130041841


##########
catalog/sql/sql.go:
##########
@@ -809,3 +823,350 @@ func (c *Catalog) UpdateNamespaceProperties(ctx 
context.Context, namespace table
 func (c *Catalog) CheckNamespaceExists(ctx context.Context, namespace 
table.Identifier) (bool, error) {
        return c.namespaceExists(ctx, strings.Join(namespace, "."))
 }
+
+// CreateView creates a new view in the catalog.
+func (c *Catalog) CreateView(ctx context.Context, identifier table.Identifier, 
schema *iceberg.Schema, viewSQL string, props iceberg.Properties) error {
+       nsIdent := catalog.NamespaceFromIdent(identifier)
+       viewIdent := catalog.TableNameFromIdent(identifier)
+       ns := strings.Join(nsIdent, ".")
+
+       exists, err := c.namespaceExists(ctx, ns)
+       if err != nil {
+               return err
+       }
+       if !exists {
+               return fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, ns)
+       }
+
+       exists, err = c.CheckViewExists(ctx, identifier)
+       if err != nil {
+               return err
+       }
+       if exists {
+               return fmt.Errorf("%w: %s", catalog.ErrViewAlreadyExists, 
identifier)
+       }
+
+       loc, err := internal.ResolveTableLocation(ctx, "", ns, viewIdent, 
c.props, c.LoadNamespaceProperties)
+       if err != nil {
+               return err
+       }
+
+       timestampMs := time.Now().UnixMilli()
+       versionId := int64(1)
+
+       viewVersion := struct {
+               VersionID       int64             `json:"version-id"`
+               TimestampMs     int64             `json:"timestamp-ms"`
+               SchemaID        int               `json:"schema-id"`
+               Summary         map[string]string `json:"summary"`
+               Operation       string            `json:"operation"`
+               Representations []struct {
+                       Type    string `json:"type"`
+                       SQL     string `json:"sql"`
+                       Dialect string `json:"dialect"`
+               } `json:"representations"`
+               DefaultCatalog   string   `json:"default-catalog"`
+               DefaultNamespace []string `json:"default-namespace"`
+       }{
+               VersionID:   versionId,
+               TimestampMs: timestampMs,
+               SchemaID:    schema.ID,
+               Summary:     map[string]string{"sql": viewSQL},
+               Operation:   "create",
+               Representations: []struct {
+                       Type    string `json:"type"`
+                       SQL     string `json:"sql"`
+                       Dialect string `json:"dialect"`
+               }{
+                       {Type: "sql", SQL: viewSQL, Dialect: "default"},
+               },
+               DefaultCatalog:   c.name,
+               DefaultNamespace: nsIdent,
+       }
+
+       viewVersionBytes, err := json.Marshal(viewVersion)
+       if err != nil {
+               return fmt.Errorf("failed to marshal view version: %w", err)
+       }
+
+       if props == nil {
+               props = iceberg.Properties{}
+       }
+       props["view-version"] = string(viewVersionBytes)
+       props["view-format"] = "iceberg"
+       props["view-sql"] = viewSQL
+
+       metadataLocation := loc + "/metadata/view-" + uuid.New().String() + 
".metadata.json"
+
+       viewUUID := uuid.New().String()
+       props["view-uuid"] = viewUUID
+
+       viewMetadata := map[string]interface{}{
+               "view-uuid":          viewUUID,
+               "format-version":     1,
+               "location":           loc,
+               "schema":             schema,
+               "current-version-id": versionId,
+               "versions": map[string]interface{}{
+                       "1": viewVersion,
+               },
+               "properties": props,
+               "version-log": []map[string]interface{}{
+                       {
+                               "timestamp-ms": timestampMs,
+                               "version-id":   versionId,
+                       },
+               },
+       }
+
+       viewMetadataBytes, err := json.Marshal(viewMetadata)
+       if err != nil {
+               return fmt.Errorf("failed to marshal view metadata: %w", err)
+       }
+
+       fs, err := io.LoadFS(ctx, c.props, metadataLocation)
+       if err != nil {
+               return fmt.Errorf("failed to load filesystem for view metadata: 
%w", err)
+       }
+
+       wfs, ok := fs.(io.WriteFileIO)
+       if !ok {
+               return errors.New("filesystem IO does not support writing")
+       }
+
+       out, err := wfs.Create(metadataLocation)
+       if err != nil {
+               return fmt.Errorf("failed to create view metadata file: %w", 
err)
+       }
+       defer out.Close()
+
+       if _, err := out.Write(viewMetadataBytes); err != nil {
+               return fmt.Errorf("failed to write view metadata: %w", err)
+       }
+
+       err = withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) error 
{
+               _, err := tx.NewInsert().Model(&sqlIcebergTable{
+                       CatalogName:      c.name,
+                       TableNamespace:   ns,
+                       TableName:        viewIdent,
+                       IcebergType:      ViewType,
+                       MetadataLocation: sql.NullString{String: 
metadataLocation, Valid: true},
+               }).Exec(ctx)
+               if err != nil {
+                       return fmt.Errorf("failed to create view: %w", err)
+               }
+
+               return nil
+       })
+
+       return err
+}
+
+// ListViews returns a list of view identifiers in the catalog.
+func (c *Catalog) ListViews(ctx context.Context, namespace table.Identifier) 
iter.Seq2[table.Identifier, error] {
+       views, err := c.listViewsAll(ctx, namespace)
+       if err != nil {
+               return func(yield func(table.Identifier, error) bool) {
+                       yield(table.Identifier{}, err)
+               }
+       }
+
+       return func(yield func(table.Identifier, error) bool) {
+               for _, v := range views {
+                       if !yield(v, nil) {
+                               return
+                       }
+               }
+       }
+}
+
+func (c *Catalog) listViewsAll(ctx context.Context, namespace 
table.Identifier) ([]table.Identifier, error) {
+       if len(namespace) > 0 {
+               exists, err := c.namespaceExists(ctx, strings.Join(namespace, 
"."))
+               if err != nil {
+                       return nil, err
+               }
+               if !exists {
+                       return nil, fmt.Errorf("%w: %s", 
catalog.ErrNoSuchNamespace, strings.Join(namespace, "."))
+               }
+       }
+
+       ns := strings.Join(namespace, ".")
+       views, err := withReadTx(ctx, c.db, func(ctx context.Context, tx 
bun.Tx) ([]sqlIcebergTable, error) {
+               var views []sqlIcebergTable
+               err := tx.NewSelect().Model(&views).
+                       Where("catalog_name = ?", c.name).
+                       Where("table_namespace = ?", ns).
+                       Where("iceberg_type = ?", ViewType).
+                       Scan(ctx)
+
+               return views, err
+       })
+       if err != nil {
+               return nil, fmt.Errorf("error listing views for namespace '%s': 
%w", namespace, err)
+       }
+
+       ret := make([]table.Identifier, len(views))
+       for i, v := range views {
+               ret[i] = append(strings.Split(v.TableNamespace, "."), 
v.TableName)
+       }
+
+       return ret, nil
+}
+
+// DropView deletes a view from the catalog.
+func (c *Catalog) DropView(ctx context.Context, identifier table.Identifier) 
error {
+       ns := strings.Join(catalog.NamespaceFromIdent(identifier), ".")
+       viewName := catalog.TableNameFromIdent(identifier)
+
+       metadataLocation := ""
+       view, err := withReadTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) 
(*sqlIcebergTable, error) {
+               v := new(sqlIcebergTable)
+               err := tx.NewSelect().Model(v).
+                       Where("catalog_name = ?", c.name).
+                       Where("table_namespace = ?", ns).
+                       Where("table_name = ?", viewName).
+                       Where("iceberg_type = ?", ViewType).
+                       Scan(ctx)
+               if errors.Is(err, sql.ErrNoRows) {
+                       return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchView, 
identifier)
+               }
+               if err != nil {
+                       return nil, fmt.Errorf("error encountered loading view 
%s: %w", identifier, err)
+               }
+
+               return v, nil
+       })
+       if err != nil {
+               return err
+       }
+
+       if view.MetadataLocation.Valid {
+               metadataLocation = view.MetadataLocation.String
+       }
+
+       err = withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) error 
{
+               res, err := tx.NewDelete().Model(&sqlIcebergTable{
+                       CatalogName:    c.name,
+                       TableNamespace: ns,
+                       TableName:      viewName,
+               }).WherePK().Where("iceberg_type = ?", ViewType).Exec(ctx)
+               if err != nil {
+                       return fmt.Errorf("failed to delete view entry: %w", 
err)
+               }
+
+               n, err := res.RowsAffected()
+               if err != nil {
+                       return fmt.Errorf("error encountered when deleting view 
entry: %w", err)
+               }
+
+               if n == 0 {
+                       return fmt.Errorf("%w: %s", catalog.ErrNoSuchView, 
identifier)
+               }
+
+               return nil
+       })
+       if err != nil {
+               return err
+       }
+
+       if metadataLocation != "" {
+               fs, err := io.LoadFS(ctx, c.props, metadataLocation)
+               if err != nil {
+                       return nil
+               }
+
+               _ = fs.Remove(metadataLocation)
+       }
+
+       return nil
+}
+
+// CheckViewExists returns true if a view exists in the catalog.
+func (c *Catalog) CheckViewExists(ctx context.Context, identifier 
table.Identifier) (bool, error) {
+       ns := strings.Join(catalog.NamespaceFromIdent(identifier), ".")
+       viewName := catalog.TableNameFromIdent(identifier)
+
+       return withReadTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) 
(bool, error) {
+               exists, err := tx.NewSelect().Model(&sqlIcebergTable{
+                       CatalogName:    c.name,
+                       TableNamespace: ns,
+                       TableName:      viewName,
+               }).WherePK().Where("iceberg_type = ?", ViewType).Exists(ctx)
+               if err != nil {
+                       return false, fmt.Errorf("error checking view 
existence: %w", err)
+               }
+
+               return exists, nil
+       })
+}
+
+// LoadView loads a view from the catalog.
+func (c *Catalog) LoadView(ctx context.Context, identifier table.Identifier) 
(map[string]interface{}, error) {
+       ns := strings.Join(catalog.NamespaceFromIdent(identifier), ".")
+       viewName := catalog.TableNameFromIdent(identifier)
+
+       view, err := withReadTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) 
(*sqlIcebergTable, error) {
+               v := new(sqlIcebergTable)
+               err := tx.NewSelect().Model(v).
+                       Where("catalog_name = ?", c.name).
+                       Where("table_namespace = ?", ns).
+                       Where("table_name = ?", viewName).
+                       Where("iceberg_type = ?", ViewType).
+                       Scan(ctx)
+               if errors.Is(err, sql.ErrNoRows) {
+                       return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchView, 
identifier)
+               }
+               if err != nil {
+                       return nil, fmt.Errorf("error encountered loading view 
%s: %w", identifier, err)
+               }
+
+               return v, nil
+       })
+       if err != nil {
+               return nil, err
+       }
+
+       if !view.MetadataLocation.Valid {
+               return nil, fmt.Errorf("%w: %s, metadata location is missing", 
catalog.ErrNoSuchView, identifier)
+       }
+
+       viewMetadata := map[string]interface{}{
+               "name":              viewName,
+               "namespace":         ns,
+               "metadata-location": view.MetadataLocation.String,
+       }
+
+       fs, err := io.LoadFS(ctx, c.props, view.MetadataLocation.String)
+       if err != nil {
+               return nil, fmt.Errorf("error loading view metadata: %w", err)
+       }
+       inputFile, err := fs.Open(view.MetadataLocation.String)
+       if err != nil {
+               return viewMetadata, fmt.Errorf("error encountered loading view 
metadata %s: %w", identifier, err)
+       }
+       defer inputFile.Close()
+
+       var fullViewMetadata map[string]interface{}
+       if err := json.NewDecoder(inputFile).Decode(&fullViewMetadata); err != 
nil {
+               return viewMetadata, fmt.Errorf("error encountered decoding 
view metadata %s: %w", identifier, err)
+       }
+
+       fullViewMetadata["name"] = viewName
+       fullViewMetadata["namespace"] = ns
+       fullViewMetadata["metadata-location"] = view.MetadataLocation.String
+
+       if props, ok := 
fullViewMetadata["properties"].(map[string]interface{}); ok {
+               strProps := make(map[string]string)
+               for k, v := range props {
+                       if str, ok := v.(string); ok {
+                               strProps[k] = str
+                       } else if vJson, err := json.Marshal(v); err == nil {
+                               strProps[k] = string(vJson)
+                       }
+               }
+               fullViewMetadata["properties"] = strProps
+       }

Review Comment:
   Same with this section: let's pull it out into a helper function that the 
other catalogs can leverage too. We already have an internal package for 
utilities shared across catalogs, and this could be added to it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to