zeroshade commented on code in PR #1034:
URL: https://github.com/apache/iceberg-go/pull/1034#discussion_r3203044406
##########
catalog/hadoop/hadoop.go:
##########
@@ -296,12 +298,208 @@ func (c *Catalog) findVersion(ident table.Identifier)
(int, error) {
return c.scanForward(ident, maxVer), nil
}
-func (c *Catalog) CreateTable(_ context.Context, _ table.Identifier, _
*iceberg.Schema, _ ...catalog.CreateTableOpt) (*table.Table, error) {
- return nil, errors.New("hadoop catalog: CreateTable not yet
implemented")
+func (c *Catalog) CreateTable(ctx context.Context, ident table.Identifier, sc
*iceberg.Schema, opts ...catalog.CreateTableOpt) (*table.Table, error) {
+ var cfg catalog.CreateTableCfg
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+
+ if len(ident) < 2 {
+ return nil, errors.New("hadoop catalog: table identifier must
have at least a namespace and table name")
+ }
+
+ ns := catalog.NamespaceFromIdent(ident)
+ nsPath := c.namespaceToPath(ns)
+
+ info, err := os.Stat(nsPath)
+ if os.IsNotExist(err) || (err == nil && !info.IsDir()) {
+ return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace,
strings.Join(ns, "."))
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("hadoop catalog: failed to stat
namespace directory: %w", err)
+ }
+
+ loc := c.defaultTableLocation(ident)
+ if cfg.Location != "" && cfg.Location != loc {
+ return nil, errors.New("hadoop catalog: custom table locations
are not supported")
+ }
+
+ if isTableDir(loc) {
+ return nil, fmt.Errorf("%w: %s", catalog.ErrTableAlreadyExists,
strings.Join(ident, "."))
+ }
+
+ metadata, err := table.NewMetadata(sc, cfg.PartitionSpec,
cfg.SortOrder, loc, cfg.Properties)
+ if err != nil {
+ return nil, fmt.Errorf("hadoop catalog: failed to create table
metadata: %w", err)
+ }
+
+ metaDir := c.metadataDir(ident)
+ if err := os.MkdirAll(metaDir, 0o755); err != nil {
+ return nil, fmt.Errorf("hadoop catalog: failed to create
metadata directory: %w", err)
+ }
+
+ version := 1
+ metaPath := c.metadataFilePath(ident, version)
+ tempPath := filepath.Join(metaDir, uuid.New().String()+".metadata.json")
+
+ compression := table.MetadataCompressionDefault
+ if cfg.Properties != nil {
+ if v, ok := cfg.Properties[table.MetadataCompressionKey]; ok {
+ compression = v
+ }
+ }
+
+ if err := internal.WriteTableMetadata(metadata, icebergio.LocalFS{},
tempPath, compression); err != nil {
+ os.Remove(tempPath)
+
+ return nil, fmt.Errorf("hadoop catalog: failed to write table
metadata: %w", err)
+ }
+
+ if err := os.Rename(tempPath, metaPath); err != nil {
+ os.Remove(tempPath)
+
+ return nil, fmt.Errorf("hadoop catalog: failed to commit
metadata file: %w", err)
+ }
+
+ c.writeVersionHint(ident, version)
+
+ tbl := table.New(
+ ident,
+ metadata,
+ metaPath,
+ icebergio.LoadFSFunc(c.props, metaPath),
+ c,
+ )
+
+ return tbl, nil
+}
+
+func (c *Catalog) LoadTable(ctx context.Context, ident table.Identifier)
(*table.Table, error) {
+ if len(ident) < 2 {
+ return nil, errors.New("hadoop catalog: table identifier must
have at least a namespace and table name")
+ }
+
+ ver, err := c.findVersion(ident)
+ if err != nil {
+ return nil, err
+ }
+
+ metaPath := c.metadataFilePath(ident, ver)
+
+ return table.NewFromLocation(ctx, ident, metaPath,
icebergio.LoadFSFunc(c.props, metaPath), c)
}
-func (c *Catalog) CommitTable(_ context.Context, _ table.Identifier, _
[]table.Requirement, _ []table.Update) (table.Metadata, string, error) {
- return nil, "", errors.New("hadoop catalog: CommitTable not yet
implemented")
+func (c *Catalog) CheckTableExists(_ context.Context, ident table.Identifier)
(bool, error) {
+ if len(ident) < 2 {
+ return false, nil
+ }
+
+ return isTableDir(c.tableToPath(ident)), nil
+}
+
+// writeMetadataLocationKey is the Java Iceberg property that allows relocating
+// metadata to a custom path. The Hadoop catalog forbids this because it derives
+// metadata paths from the table identifier.
+const writeMetadataLocationKey = "write.metadata.location"
Review Comment:
Should this be a constant at the catalog package level, or in the table
properties, instead of being in the hadoop package?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]