zeroshade commented on code in PR #414:
URL: https://github.com/apache/iceberg-go/pull/414#discussion_r2105433224
##########
catalog/sql/sql_test.go:
##########
@@ -299,8 +303,12 @@ func (s *SqliteCatalogTestSuite)
TestCreationAllTablesExist() {
"catalog_name" VARCHAR NOT NULL,
"table_namespace" VARCHAR NOT NULL,
"table_name" VARCHAR NOT NULL,
+ "iceberg_type" VARCHAR,
"metadata_location" VARCHAR,
"previous_metadata_location" VARCHAR,
+ "view_sql" VARCHAR,
+ "schema_json" VARCHAR,
+ "properties" JSON,
Review Comment:
Same as in the other test's schema: fix the formatting and remove the extra columns that we're no longer adding.
##########
catalog/sql/sql.go:
##########
@@ -809,3 +818,347 @@ func (c *Catalog) UpdateNamespaceProperties(ctx
context.Context, namespace table
func (c *Catalog) CheckNamespaceExists(ctx context.Context, namespace
table.Identifier) (bool, error) {
return c.namespaceExists(ctx, strings.Join(namespace, "."))
}
+
+// CreateView creates a new view in the catalog.
+func (c *Catalog) CreateView(ctx context.Context, identifier table.Identifier,
schema *iceberg.Schema, viewSQL string, props iceberg.Properties) error {
+ nsIdent := catalog.NamespaceFromIdent(identifier)
+ viewIdent := catalog.TableNameFromIdent(identifier)
+ ns := strings.Join(nsIdent, ".")
+
+ exists, err := c.namespaceExists(ctx, ns)
+ if err != nil {
+ return err
+ }
+ if !exists {
+ return fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, ns)
+ }
+
+ exists, err = c.CheckViewExists(ctx, identifier)
+ if err != nil {
+ return err
+ }
+ if exists {
+ return fmt.Errorf("%w: %s", catalog.ErrViewAlreadyExists,
identifier)
+ }
+
+ loc, err := internal.ResolveTableLocation(ctx, "", ns, viewIdent,
c.props, c.LoadNamespaceProperties)
+ if err != nil {
+ return err
+ }
+
+ timestampMs := time.Now().UnixMilli()
+ versionId := int64(1)
+
+ viewVersion := struct {
+ VersionID int64 `json:"version-id"`
+ TimestampMs int64 `json:"timestamp-ms"`
+ SchemaID int `json:"schema-id"`
+ Summary map[string]string `json:"summary"`
+ Operation string `json:"operation"`
+ Representations []struct {
+ Type string `json:"type"`
+ SQL string `json:"sql"`
+ Dialect string `json:"dialect"`
+ } `json:"representations"`
+ DefaultCatalog string `json:"default-catalog"`
+ DefaultNamespace []string `json:"default-namespace"`
+ }{
+ VersionID: versionId,
+ TimestampMs: timestampMs,
+ SchemaID: schema.ID,
+ Summary: map[string]string{"sql": viewSQL},
+ Operation: "create",
+ Representations: []struct {
+ Type string `json:"type"`
+ SQL string `json:"sql"`
+ Dialect string `json:"dialect"`
+ }{
+ {Type: "sql", SQL: viewSQL, Dialect: "default"},
+ },
+ DefaultCatalog: c.name,
+ DefaultNamespace: nsIdent,
+ }
+
+ viewVersionBytes, err := json.Marshal(viewVersion)
+ if err != nil {
+ return fmt.Errorf("failed to marshal view version: %w", err)
+ }
+
+ if props == nil {
+ props = iceberg.Properties{}
+ }
+ props["view-version"] = string(viewVersionBytes)
+ props["view-format"] = "iceberg"
+ props["view-sql"] = viewSQL
+
+ metadataLocation := loc + "/metadata/view-" + uuid.New().String() +
".metadata.json"
+
+ viewUUID := uuid.New().String()
+ props["view-uuid"] = viewUUID
+
+ viewMetadata := map[string]interface{}{
+ "view-uuid": viewUUID,
+ "format-version": 1,
+ "location": loc,
+ "schema": schema,
+ "current-version-id": versionId,
+ "versions": map[string]interface{}{
+ "1": viewVersion,
+ },
+ "properties": props,
+ "version-log": []map[string]interface{}{
+ {
+ "timestamp-ms": timestampMs,
+ "version-id": versionId,
+ },
+ },
+ }
+
+ viewMetadataBytes, err := json.Marshal(viewMetadata)
+ if err != nil {
+ return fmt.Errorf("failed to marshal view metadata: %w", err)
+ }
+
+ fs, err := io.LoadFS(ctx, c.props, metadataLocation)
+ if err != nil {
+ return fmt.Errorf("failed to load filesystem for view metadata:
%w", err)
+ }
+
+ wfs, ok := fs.(io.WriteFileIO)
+ if !ok {
+ return errors.New("filesystem IO does not support writing")
+ }
+
+ out, err := wfs.Create(metadataLocation)
+ if err != nil {
+ return fmt.Errorf("failed to create view metadata file: %w",
err)
+ }
+ defer out.Close()
+
+ if _, err := out.Write(viewMetadataBytes); err != nil {
+ return fmt.Errorf("failed to write view metadata: %w", err)
+ }
+
+ err = withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) error
{
+ _, err := tx.NewInsert().Model(&sqlIcebergTable{
+ CatalogName: c.name,
+ TableNamespace: ns,
+ TableName: viewIdent,
+ IcebergType: "VIEW",
+ MetadataLocation: sql.NullString{String:
metadataLocation, Valid: true},
+ }).Exec(ctx)
+ if err != nil {
+ return fmt.Errorf("failed to create view: %w", err)
+ }
+
+ return nil
+ })
+
+ return err
+}
+
+// ListViews returns a list of view identifiers in the catalog.
+func (c *Catalog) ListViews(ctx context.Context, namespace table.Identifier)
iter.Seq2[table.Identifier, error] {
+ views, err := c.listViewsAll(ctx, namespace)
+ if err != nil {
+ return func(yield func(table.Identifier, error) bool) {
+ yield(table.Identifier{}, err)
+ }
+ }
+
+ return func(yield func(table.Identifier, error) bool) {
+ for _, v := range views {
+ if !yield(v, nil) {
+ return
+ }
+ }
+ }
+}
+
+func (c *Catalog) listViewsAll(ctx context.Context, namespace
table.Identifier) ([]table.Identifier, error) {
+ if len(namespace) > 0 {
+ exists, err := c.namespaceExists(ctx, strings.Join(namespace,
"."))
+ if err != nil {
+ return nil, err
+ }
+ if !exists {
+ return nil, fmt.Errorf("%w: %s",
catalog.ErrNoSuchNamespace, strings.Join(namespace, "."))
+ }
+ }
+
+ ns := strings.Join(namespace, ".")
+ views, err := withReadTx(ctx, c.db, func(ctx context.Context, tx
bun.Tx) ([]sqlIcebergTable, error) {
+ var views []sqlIcebergTable
+ err := tx.NewSelect().Model(&views).
+ Where("catalog_name = ?", c.name).
+ Where("table_namespace = ?", ns).
+ Where("iceberg_type = ?", "VIEW").
+ Scan(ctx)
+
+ return views, err
+ })
+ if err != nil {
+ return nil, fmt.Errorf("error listing views for namespace '%s':
%w", namespace, err)
+ }
+
+ ret := make([]table.Identifier, len(views))
+ for i, v := range views {
+ ret[i] = append(strings.Split(v.TableNamespace, "."),
v.TableName)
+ }
+
+ return ret, nil
+}
+
+// DropView deletes a view from the catalog.
+func (c *Catalog) DropView(ctx context.Context, identifier table.Identifier)
error {
+ ns := strings.Join(catalog.NamespaceFromIdent(identifier), ".")
+ viewName := catalog.TableNameFromIdent(identifier)
+
+ metadataLocation := ""
+ view, err := withReadTx(ctx, c.db, func(ctx context.Context, tx bun.Tx)
(*sqlIcebergTable, error) {
+ v := new(sqlIcebergTable)
+ err := tx.NewSelect().Model(v).
+ Where("catalog_name = ?", c.name).
+ Where("table_namespace = ?", ns).
+ Where("table_name = ?", viewName).
+ Where("iceberg_type = ?", "VIEW").
+ Scan(ctx)
+ if errors.Is(err, sql.ErrNoRows) {
+ return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchView,
identifier)
+ }
+ if err != nil {
+ return nil, fmt.Errorf("error encountered loading view
%s: %w", identifier, err)
+ }
+
+ return v, nil
+ })
+ if err != nil {
+ return err
+ }
+
+ if view.MetadataLocation.Valid {
+ metadataLocation = view.MetadataLocation.String
+ }
+
+ err = withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) error
{
+ res, err := tx.NewDelete().Model(&sqlIcebergTable{
+ CatalogName: c.name,
+ TableNamespace: ns,
+ TableName: viewName,
+ }).WherePK().Where("iceberg_type = ?", "VIEW").Exec(ctx)
+ if err != nil {
+ return fmt.Errorf("failed to delete view entry: %w",
err)
+ }
+
+ n, err := res.RowsAffected()
+ if err != nil {
+ return fmt.Errorf("error encountered when deleting view
entry: %w", err)
+ }
+
+ if n == 0 {
+ return fmt.Errorf("%w: %s", catalog.ErrNoSuchView,
identifier)
+ }
+
+ return nil
+ })
+ if err != nil {
+ return err
+ }
+
+ if metadataLocation != "" {
+ fs, err := io.LoadFS(ctx, c.props, metadataLocation)
+ if err != nil {
+ return nil
+ }
+
+ _ = fs.Remove(metadataLocation)
+ }
+
+ return nil
+}
+
+// CheckViewExists returns true if a view exists in the catalog.
+func (c *Catalog) CheckViewExists(ctx context.Context, identifier
table.Identifier) (bool, error) {
+ ns := strings.Join(catalog.NamespaceFromIdent(identifier), ".")
+ viewName := catalog.TableNameFromIdent(identifier)
+
+ return withReadTx(ctx, c.db, func(ctx context.Context, tx bun.Tx)
(bool, error) {
+ exists, err := tx.NewSelect().Model(&sqlIcebergTable{
+ CatalogName: c.name,
+ TableNamespace: ns,
+ TableName: viewName,
+ }).WherePK().Where("iceberg_type = ?", "VIEW").Exists(ctx)
+ if err != nil {
+ return false, fmt.Errorf("error checking view
existence: %w", err)
+ }
+
+ return exists, nil
+ })
+}
+
+// LoadView loads a view from the catalog.
+func (c *Catalog) LoadView(ctx context.Context, identifier table.Identifier)
(map[string]interface{}, error) {
+ ns := strings.Join(catalog.NamespaceFromIdent(identifier), ".")
+ viewName := catalog.TableNameFromIdent(identifier)
+
+ view, err := withReadTx(ctx, c.db, func(ctx context.Context, tx bun.Tx)
(*sqlIcebergTable, error) {
+ v := new(sqlIcebergTable)
+ err := tx.NewSelect().Model(v).
+ Where("catalog_name = ?", c.name).
+ Where("table_namespace = ?", ns).
+ Where("table_name = ?", viewName).
+ Where("iceberg_type = ?", "VIEW").
+ Scan(ctx)
+ if errors.Is(err, sql.ErrNoRows) {
+ return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchView,
identifier)
+ }
+ if err != nil {
+ return nil, fmt.Errorf("error encountered loading view
%s: %w", identifier, err)
+ }
+
+ return v, nil
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ if !view.MetadataLocation.Valid {
+ return nil, fmt.Errorf("%w: %s, metadata location is missing",
catalog.ErrNoSuchView, identifier)
+ }
+
+ viewMetadata := map[string]interface{}{
+ "name": viewName,
+ "namespace": ns,
+ "metadata-location": view.MetadataLocation.String,
+ }
+
+ fs, err := io.LoadFS(ctx, c.props, view.MetadataLocation.String)
Review Comment:
As the linter says, you need to check the `err` returned by `io.LoadFS` here before using `fs` :)
##########
catalog/sql/sql_test.go:
##########
@@ -280,9 +280,13 @@ func (s *SqliteCatalogTestSuite)
TestCreationOneTableExists() {
_, err := sqldb.Exec(`CREATE TABLE "iceberg_tables" (
"catalog_name" VARCHAR NOT NULL,
"table_namespace" VARCHAR NOT NULL,
- "table_name" VARCHAR NOT NULL,
+ "table_name" VARCHAR NOT NULL,
+ "iceberg_type" VARCHAR,
"metadata_location" VARCHAR,
"previous_metadata_location" VARCHAR,
+ "view_sql" VARCHAR,
+ "schema_json" VARCHAR,
+ "properties" JSON,
Review Comment:
Remove the columns that we're no longer adding :smile:
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]