zeroshade commented on code in PR #414: URL: https://github.com/apache/iceberg-go/pull/414#discussion_r2130041841
########## catalog/sql/sql.go: ########## @@ -809,3 +823,350 @@ func (c *Catalog) UpdateNamespaceProperties(ctx context.Context, namespace table func (c *Catalog) CheckNamespaceExists(ctx context.Context, namespace table.Identifier) (bool, error) { return c.namespaceExists(ctx, strings.Join(namespace, ".")) } + +// CreateView creates a new view in the catalog. +func (c *Catalog) CreateView(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, viewSQL string, props iceberg.Properties) error { + nsIdent := catalog.NamespaceFromIdent(identifier) + viewIdent := catalog.TableNameFromIdent(identifier) + ns := strings.Join(nsIdent, ".") + + exists, err := c.namespaceExists(ctx, ns) + if err != nil { + return err + } + if !exists { + return fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, ns) + } + + exists, err = c.CheckViewExists(ctx, identifier) + if err != nil { + return err + } + if exists { + return fmt.Errorf("%w: %s", catalog.ErrViewAlreadyExists, identifier) + } + + loc, err := internal.ResolveTableLocation(ctx, "", ns, viewIdent, c.props, c.LoadNamespaceProperties) + if err != nil { + return err + } + + timestampMs := time.Now().UnixMilli() + versionId := int64(1) + + viewVersion := struct { + VersionID int64 `json:"version-id"` + TimestampMs int64 `json:"timestamp-ms"` + SchemaID int `json:"schema-id"` + Summary map[string]string `json:"summary"` + Operation string `json:"operation"` + Representations []struct { + Type string `json:"type"` + SQL string `json:"sql"` + Dialect string `json:"dialect"` + } `json:"representations"` + DefaultCatalog string `json:"default-catalog"` + DefaultNamespace []string `json:"default-namespace"` + }{ + VersionID: versionId, + TimestampMs: timestampMs, + SchemaID: schema.ID, + Summary: map[string]string{"sql": viewSQL}, + Operation: "create", + Representations: []struct { + Type string `json:"type"` + SQL string `json:"sql"` + Dialect string `json:"dialect"` + }{ + {Type: "sql", SQL: viewSQL, Dialect: "default"}, + }, + DefaultCatalog: c.name, + DefaultNamespace: nsIdent, + } + + viewVersionBytes, err := json.Marshal(viewVersion) + if err != nil { + return fmt.Errorf("failed to marshal view version: %w", err) + } + + if props == nil { + props = iceberg.Properties{} + } + props["view-version"] = string(viewVersionBytes) + props["view-format"] = "iceberg" + props["view-sql"] = viewSQL + + metadataLocation := loc + "/metadata/view-" + uuid.New().String() + ".metadata.json" + + viewUUID := uuid.New().String() + props["view-uuid"] = viewUUID + + viewMetadata := map[string]interface{}{ + "view-uuid": viewUUID, + "format-version": 1, + "location": loc, + "schema": schema, + "current-version-id": versionId, + "versions": map[string]interface{}{ + "1": viewVersion, + }, + "properties": props, + "version-log": []map[string]interface{}{ + { + "timestamp-ms": timestampMs, + "version-id": versionId, + }, + }, + } + + viewMetadataBytes, err := json.Marshal(viewMetadata) + if err != nil { + return fmt.Errorf("failed to marshal view metadata: %w", err) + } + + fs, err := io.LoadFS(ctx, c.props, metadataLocation) + if err != nil { + return fmt.Errorf("failed to load filesystem for view metadata: %w", err) + } + + wfs, ok := fs.(io.WriteFileIO) + if !ok { + return errors.New("filesystem IO does not support writing") + } + + out, err := wfs.Create(metadataLocation) + if err != nil { + return fmt.Errorf("failed to create view metadata file: %w", err) + } + defer out.Close() + + if _, err := out.Write(viewMetadataBytes); err != nil { + return fmt.Errorf("failed to write view metadata: %w", err) + } + + err = withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) error { + _, err := tx.NewInsert().Model(&sqlIcebergTable{ + CatalogName: c.name, + TableNamespace: ns, + TableName: viewIdent, + IcebergType: ViewType, + MetadataLocation: sql.NullString{String: metadataLocation, Valid: true}, + }).Exec(ctx) + if err != nil { + return fmt.Errorf("failed to create view: %w", err) + } + + return nil + }) + + return err +} + +// ListViews returns a list of view identifiers in the catalog. +func (c *Catalog) ListViews(ctx context.Context, namespace table.Identifier) iter.Seq2[table.Identifier, error] { + views, err := c.listViewsAll(ctx, namespace) + if err != nil { + return func(yield func(table.Identifier, error) bool) { + yield(table.Identifier{}, err) + } + } + + return func(yield func(table.Identifier, error) bool) { + for _, v := range views { + if !yield(v, nil) { + return + } + } + } +} + +func (c *Catalog) listViewsAll(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error) { + if len(namespace) > 0 { + exists, err := c.namespaceExists(ctx, strings.Join(namespace, ".")) + if err != nil { + return nil, err + } + if !exists { + return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, strings.Join(namespace, ".")) + } + } + + ns := strings.Join(namespace, ".") + views, err := withReadTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) ([]sqlIcebergTable, error) { + var views []sqlIcebergTable + err := tx.NewSelect().Model(&views). + Where("catalog_name = ?", c.name). + Where("table_namespace = ?", ns). + Where("iceberg_type = ?", ViewType). + Scan(ctx) + + return views, err + }) + if err != nil { + return nil, fmt.Errorf("error listing views for namespace '%s': %w", namespace, err) + } + + ret := make([]table.Identifier, len(views)) + for i, v := range views { + ret[i] = append(strings.Split(v.TableNamespace, "."), v.TableName) + } + + return ret, nil +} + +// DropView deletes a view from the catalog. +func (c *Catalog) DropView(ctx context.Context, identifier table.Identifier) error { + ns := strings.Join(catalog.NamespaceFromIdent(identifier), ".") + viewName := catalog.TableNameFromIdent(identifier) + + metadataLocation := "" + view, err := withReadTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) (*sqlIcebergTable, error) { + v := new(sqlIcebergTable) + err := tx.NewSelect().Model(v). + Where("catalog_name = ?", c.name). + Where("table_namespace = ?", ns). + Where("table_name = ?", viewName). + Where("iceberg_type = ?", ViewType). + Scan(ctx) + if errors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchView, identifier) + } + if err != nil { + return nil, fmt.Errorf("error encountered loading view %s: %w", identifier, err) + } + + return v, nil + }) + if err != nil { + return err + } + + if view.MetadataLocation.Valid { + metadataLocation = view.MetadataLocation.String + } + + err = withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) error { + res, err := tx.NewDelete().Model(&sqlIcebergTable{ + CatalogName: c.name, + TableNamespace: ns, + TableName: viewName, + }).WherePK().Where("iceberg_type = ?", ViewType).Exec(ctx) + if err != nil { + return fmt.Errorf("failed to delete view entry: %w", err) + } + + n, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("error encountered when deleting view entry: %w", err) + } + + if n == 0 { + return fmt.Errorf("%w: %s", catalog.ErrNoSuchView, identifier) + } + + return nil + }) + if err != nil { + return err + } + + if metadataLocation != "" { + fs, err := io.LoadFS(ctx, c.props, metadataLocation) + if err != nil { + return nil + } + + _ = fs.Remove(metadataLocation) + } + + return nil +} + +// CheckViewExists returns true if a view exists in the catalog. +func (c *Catalog) CheckViewExists(ctx context.Context, identifier table.Identifier) (bool, error) { + ns := strings.Join(catalog.NamespaceFromIdent(identifier), ".") + viewName := catalog.TableNameFromIdent(identifier) + + return withReadTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) (bool, error) { + exists, err := tx.NewSelect().Model(&sqlIcebergTable{ + CatalogName: c.name, + TableNamespace: ns, + TableName: viewName, + }).WherePK().Where("iceberg_type = ?", ViewType).Exists(ctx) + if err != nil { + return false, fmt.Errorf("error checking view existence: %w", err) + } + + return exists, nil + }) +} + +// LoadView loads a view from the catalog. +func (c *Catalog) LoadView(ctx context.Context, identifier table.Identifier) (map[string]interface{}, error) { + ns := strings.Join(catalog.NamespaceFromIdent(identifier), ".") + viewName := catalog.TableNameFromIdent(identifier) + + view, err := withReadTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) (*sqlIcebergTable, error) { + v := new(sqlIcebergTable) + err := tx.NewSelect().Model(v). + Where("catalog_name = ?", c.name). + Where("table_namespace = ?", ns). + Where("table_name = ?", viewName). + Where("iceberg_type = ?", ViewType). + Scan(ctx) + if errors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchView, identifier) + } + if err != nil { + return nil, fmt.Errorf("error encountered loading view %s: %w", identifier, err) + } + + return v, nil + }) + if err != nil { + return nil, err + } + + if !view.MetadataLocation.Valid { + return nil, fmt.Errorf("%w: %s, metadata location is missing", catalog.ErrNoSuchView, identifier) + } + + viewMetadata := map[string]interface{}{ + "name": viewName, + "namespace": ns, + "metadata-location": view.MetadataLocation.String, + } + + fs, err := io.LoadFS(ctx, c.props, view.MetadataLocation.String) + if err != nil { + return nil, fmt.Errorf("error loading view metadata: %w", err) + } + inputFile, err := fs.Open(view.MetadataLocation.String) + if err != nil { + return viewMetadata, fmt.Errorf("error encountered loading view metadata %s: %w", identifier, err) + } + defer inputFile.Close() + + var fullViewMetadata map[string]interface{} + if err := json.NewDecoder(inputFile).Decode(&fullViewMetadata); err != nil { + return viewMetadata, fmt.Errorf("error encountered decoding view metadata %s: %w", identifier, err) + } + + fullViewMetadata["name"] = viewName + fullViewMetadata["namespace"] = ns + fullViewMetadata["metadata-location"] = view.MetadataLocation.String + + if props, ok := fullViewMetadata["properties"].(map[string]interface{}); ok { + strProps := make(map[string]string) + for k, v := range props { + if str, ok := v.(string); ok { + strProps[k] = str + } else if vJson, err := json.Marshal(v); err == nil { + strProps[k] = string(vJson) + } + } + fullViewMetadata["properties"] = strProps + } Review Comment: same with this section, let's pull it out into a helper function that the other catalogs would be able to leverage too. We already have an internal package for shared utilities with catalogs that this could get added to. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org