zeroshade commented on code in PR #1034:
URL: https://github.com/apache/iceberg-go/pull/1034#discussion_r3203044406


##########
catalog/hadoop/hadoop.go:
##########
@@ -296,12 +298,208 @@ func (c *Catalog) findVersion(ident table.Identifier) 
(int, error) {
        return c.scanForward(ident, maxVer), nil
 }
 
-func (c *Catalog) CreateTable(_ context.Context, _ table.Identifier, _ 
*iceberg.Schema, _ ...catalog.CreateTableOpt) (*table.Table, error) {
-       return nil, errors.New("hadoop catalog: CreateTable not yet 
implemented")
+func (c *Catalog) CreateTable(ctx context.Context, ident table.Identifier, sc 
*iceberg.Schema, opts ...catalog.CreateTableOpt) (*table.Table, error) {
+       var cfg catalog.CreateTableCfg
+       for _, opt := range opts {
+               opt(&cfg)
+       }
+
+       if len(ident) < 2 {
+               return nil, errors.New("hadoop catalog: table identifier must 
have at least a namespace and table name")
+       }
+
+       ns := catalog.NamespaceFromIdent(ident)
+       nsPath := c.namespaceToPath(ns)
+
+       info, err := os.Stat(nsPath)
+       if os.IsNotExist(err) || (err == nil && !info.IsDir()) {
+               return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, 
strings.Join(ns, "."))
+       }
+
+       if err != nil {
+               return nil, fmt.Errorf("hadoop catalog: failed to stat 
namespace directory: %w", err)
+       }
+
+       loc := c.defaultTableLocation(ident)
+       if cfg.Location != "" && cfg.Location != loc {
+               return nil, errors.New("hadoop catalog: custom table locations 
are not supported")
+       }
+
+       if isTableDir(loc) {
+               return nil, fmt.Errorf("%w: %s", catalog.ErrTableAlreadyExists, 
strings.Join(ident, "."))
+       }
+
+       metadata, err := table.NewMetadata(sc, cfg.PartitionSpec, 
cfg.SortOrder, loc, cfg.Properties)
+       if err != nil {
+               return nil, fmt.Errorf("hadoop catalog: failed to create table 
metadata: %w", err)
+       }
+
+       metaDir := c.metadataDir(ident)
+       if err := os.MkdirAll(metaDir, 0o755); err != nil {
+               return nil, fmt.Errorf("hadoop catalog: failed to create 
metadata directory: %w", err)
+       }
+
+       version := 1
+       metaPath := c.metadataFilePath(ident, version)
+       tempPath := filepath.Join(metaDir, uuid.New().String()+".metadata.json")
+
+       compression := table.MetadataCompressionDefault
+       if cfg.Properties != nil {
+               if v, ok := cfg.Properties[table.MetadataCompressionKey]; ok {
+                       compression = v
+               }
+       }
+
+       if err := internal.WriteTableMetadata(metadata, icebergio.LocalFS{}, 
tempPath, compression); err != nil {
+               os.Remove(tempPath)
+
+               return nil, fmt.Errorf("hadoop catalog: failed to write table 
metadata: %w", err)
+       }
+
+       if err := os.Rename(tempPath, metaPath); err != nil {
+               os.Remove(tempPath)
+
+               return nil, fmt.Errorf("hadoop catalog: failed to commit 
metadata file: %w", err)
+       }
+
+       c.writeVersionHint(ident, version)
+
+       tbl := table.New(
+               ident,
+               metadata,
+               metaPath,
+               icebergio.LoadFSFunc(c.props, metaPath),
+               c,
+       )
+
+       return tbl, nil
+}
+
+func (c *Catalog) LoadTable(ctx context.Context, ident table.Identifier) 
(*table.Table, error) {
+       if len(ident) < 2 {
+               return nil, errors.New("hadoop catalog: table identifier must 
have at least a namespace and table name")
+       }
+
+       ver, err := c.findVersion(ident)
+       if err != nil {
+               return nil, err
+       }
+
+       metaPath := c.metadataFilePath(ident, ver)
+
+       return table.NewFromLocation(ctx, ident, metaPath, 
icebergio.LoadFSFunc(c.props, metaPath), c)
 }
 
-func (c *Catalog) CommitTable(_ context.Context, _ table.Identifier, _ 
[]table.Requirement, _ []table.Update) (table.Metadata, string, error) {
-       return nil, "", errors.New("hadoop catalog: CommitTable not yet 
implemented")
+func (c *Catalog) CheckTableExists(_ context.Context, ident table.Identifier) 
(bool, error) {
+       if len(ident) < 2 {
+               return false, nil
+       }
+
+       return isTableDir(c.tableToPath(ident)), nil
+}
+
+// writeMetadataLocationKey is the Java Iceberg property that allows relocating
+// metadata to a custom path. The Hadoop catalog forbids this because it 
derives
+// metadata paths from the table identifier.
+const writeMetadataLocationKey = "write.metadata.location"

Review Comment:
   Should this constant live at the catalog package level, or alongside the
   table properties, rather than in the hadoop package?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to