Re: [PR] Add SqlCatalog _commit_table support [iceberg-python]

2024-01-14 Thread via GitHub


HonahX commented on code in PR #265:
URL: https://github.com/apache/iceberg-python/pull/265#discussion_r1451680767


##
pyiceberg/catalog/sql.py:
##
@@ -268,16 +269,32 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
 identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
 database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
 with Session(self.engine) as session:
-res = session.execute(
-delete(IcebergTables).where(
-IcebergTables.catalog_name == self.name,
-IcebergTables.table_namespace == database_name,
-IcebergTables.table_name == table_name,
+if self.engine.dialect.supports_sane_rowcount:
+res = session.execute(
+delete(IcebergTables).where(
+IcebergTables.catalog_name == self.name,
+IcebergTables.table_namespace == database_name,
+IcebergTables.table_name == table_name,
+)
 )
-)
+if res.rowcount < 1:
+raise NoSuchTableError(f"Table does not exist: 
{database_name}.{table_name}")
+else:
+try:
+tbl = (
+session.query(IcebergTables)
+.with_for_update(of=IcebergTables, nowait=True)

Review Comment:
   Thanks for the explanation! Reflecting further, I think that using neither 
NOWAIT nor SKIP_LOCKED might be better for maintaining consistency. Currently, 
when the engine supports accurate rowcounts, we employ UPDATE TABLE or DELETE 
TABLE, which inherently wait for locks. If the engine lacks rowcount support, 
sticking with SELECT FOR UPDATE ensures consistent behavior with UPDATE TABLE 
operations, as it waits for locks. This consistency eliminates the need to 
handle additional exceptions, as any concurrent transaction will naturally lead 
to a `NoResultFound` scenario once the table is dropped, renamed, or updated.
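   As an illustration (not part of the PR), a minimal sketch of that blocking fallback, assuming SQLAlchemy 2.x and the `IcebergTables` / `NoSuchTableError` names from the diff above; the function name and signature here are hypothetical:

```python
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import Session

from pyiceberg.catalog.sql import IcebergTables
from pyiceberg.exceptions import NoSuchTableError


def drop_table_fallback(engine, catalog_name: str, database_name: str, table_name: str) -> None:
    with Session(engine) as session:
        try:
            # Plain SELECT ... FOR UPDATE: waits for the row lock, just like the
            # UPDATE/DELETE path used when the dialect reports sane rowcounts.
            tbl = (
                session.query(IcebergTables)
                .with_for_update(of=IcebergTables)
                .filter(
                    IcebergTables.catalog_name == catalog_name,
                    IcebergTables.table_namespace == database_name,
                    IcebergTables.table_name == table_name,
                )
                .one()
            )
        except NoResultFound as e:
            # A concurrent drop/rename commits first, releases the lock, and the
            # re-evaluated SELECT then matches no row.
            raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e
        session.delete(tbl)
        session.commit()
```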
   
   Shifting focus to potential alternatives: if we later decide that avoiding
lock waits is preferable in fallback situations, we could consider the
following modifications:
   1. For `drop_table` and `rename_table`, I think you're right that we can 
catch `sqlalchemy.exc.OperationalError` and re-raise it as a new exception 
indicating that another process is set to delete the table. Using 
`CommitFailedException` doesn't seem appropriate here, as it's primarily meant 
for failed commits on iceberg tables.
   2. For `_commit_table`, `skip_locked` might still be useful. At the point 
when the SQL command is executed, we're assured of the table's existence. 
Therefore, encountering `NoResultFound` would directly imply a concurrent 
commit attempt.
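   For reference, a hedged sketch of those two alternatives (again not part of the PR; `ConcurrentModificationError` and the helper names are hypothetical, and using `CommitFailedException` for `_commit_table` is an assumption based on the discussion above):

```python
from sqlalchemy.exc import NoResultFound, OperationalError
from sqlalchemy.orm import Session

from pyiceberg.catalog.sql import IcebergTables
from pyiceberg.exceptions import CommitFailedException


class ConcurrentModificationError(Exception):
    """Hypothetical new exception: another process holds the catalog row lock."""


def _locked_row(session: Session, catalog_name: str, database_name: str, table_name: str, **lock_kwargs) -> IcebergTables:
    # Shared SELECT ... FOR UPDATE with the caller's locking options.
    return (
        session.query(IcebergTables)
        .with_for_update(of=IcebergTables, **lock_kwargs)
        .filter(
            IcebergTables.catalog_name == catalog_name,
            IcebergTables.table_namespace == database_name,
            IcebergTables.table_name == table_name,
        )
        .one()
    )


def _row_for_drop_or_rename(session, catalog_name, database_name, table_name):
    # Alternative 1: NOWAIT makes the database raise instead of blocking on the lock.
    try:
        return _locked_row(session, catalog_name, database_name, table_name, nowait=True)
    except OperationalError as e:
        raise ConcurrentModificationError(
            f"{database_name}.{table_name} is locked by another process"
        ) from e


def _row_for_commit(session, catalog_name, database_name, table_name):
    # Alternative 2: the table is known to exist at this point, so with
    # SKIP LOCKED an empty result implies a concurrent commit on this row.
    try:
        return _locked_row(session, catalog_name, database_name, table_name, skip_locked=True)
    except NoResultFound as e:
        raise CommitFailedException(
            f"Concurrent commit detected for {database_name}.{table_name}"
        ) from e
```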
   
   Do you think the concern about maintaining consistency between cases that do 
and do not support rowcount is valid? If not, what are your thoughts on 
adopting NOWAIT for `drop_table` and `rename_table`, and SKIP_LOCKED for 
`_commit_table`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Add SqlCatalog _commit_table support [iceberg-python]

2024-01-14 Thread via GitHub


syun64 commented on code in PR #265:
URL: https://github.com/apache/iceberg-python/pull/265#discussion_r1451754672


##
pyiceberg/catalog/sql.py:
##
@@ -268,16 +269,32 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
 identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
 database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
 with Session(self.engine) as session:
-res = session.execute(
-delete(IcebergTables).where(
-IcebergTables.catalog_name == self.name,
-IcebergTables.table_namespace == database_name,
-IcebergTables.table_name == table_name,
+if self.engine.dialect.supports_sane_rowcount:
+res = session.execute(
+delete(IcebergTables).where(
+IcebergTables.catalog_name == self.name,
+IcebergTables.table_namespace == database_name,
+IcebergTables.table_name == table_name,
+)
 )
-)
+if res.rowcount < 1:
+raise NoSuchTableError(f"Table does not exist: 
{database_name}.{table_name}")
+else:
+try:
+tbl = (
+session.query(IcebergTables)
+.with_for_update(of=IcebergTables, nowait=True)

Review Comment:
   Hi @HonahX . Thank you for explaining so patiently.
   
   > If the engine lacks rowcount support, sticking with SELECT FOR UPDATE 
ensures consistent behavior with UPDATE TABLE operations, as it waits for 
locks. This consistency eliminates the need to handle additional exceptions, as 
any concurrent transaction will naturally lead to a NoResultFound scenario once 
the table is dropped, renamed, or updated.
   
   I agree with your analysis here, and I think that dropping **nowait** and
**skip_locked** will be best, so that we mimic the behavior of the UPDATE TABLE
operations as closely as possible.






Re: [PR] support python 3.12 [iceberg-python]

2024-01-14 Thread via GitHub


MehulBatra commented on code in PR #254:
URL: https://github.com/apache/iceberg-python/pull/254#discussion_r1451758129


##
pyproject.toml:
##
@@ -29,7 +29,8 @@ classifiers = [
   "Programming Language :: Python :: 3.8",
   "Programming Language :: Python :: 3.9",
   "Programming Language :: Python :: 3.10",
-  "Programming Language :: Python :: 3.11"
+  "Programming Language :: Python :: 3.11",
+  "Programming Language :: Python :: 3.12"

Review Comment:
   Sure, it wouldn't harm.






Re: [PR] support python 3.12 [iceberg-python]

2024-01-14 Thread via GitHub


MehulBatra commented on code in PR #254:
URL: https://github.com/apache/iceberg-python/pull/254#discussion_r1451758589


##
pyproject.toml:
##
@@ -70,6 +71,9 @@ adlfs = { version = ">=2023.1.0,<2024.1.0", optional = true }
 gcsfs = { version = ">=2023.1.0,<2024.1.0", optional = true }
 psycopg2-binary = { version = ">=2.9.6", optional = true }
 sqlalchemy = { version = "^2.0.18", optional = true }
+numpy = { version = "^1.26.3", python = "3.12" }
+greenlet = {version = "^3.0.3", python = "3.12" }

Review Comment:
   As we are on the latest versions of both libraries, we should be good here;
support for upcoming Python versions should be there too.






Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #9466:
URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451796402


##
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java:
##
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import static java.util.stream.Collectors.toSet;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.common.DynClasses;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.common.DynMethods.BoundMethod;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.kafka.connect.data.Struct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Utilities {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Utilities.class.getName());
+  private static final List<String> HADOOP_CONF_FILES =
+  ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml");
+
+  public static Catalog loadCatalog(IcebergSinkConfig config) {
+return CatalogUtil.buildIcebergCatalog(
+config.catalogName(), config.catalogProps(), loadHadoopConfig(config));
+  }
+
+  // use reflection here to avoid requiring Hadoop as a dependency
+  private static Object loadHadoopConfig(IcebergSinkConfig config) {
+Class<?> configClass =
+DynClasses.builder().impl("org.apache.hadoop.hdfs.HdfsConfiguration").orNull().build();
+if (configClass == null) {
+  configClass =
+  DynClasses.builder().impl("org.apache.hadoop.conf.Configuration").orNull().build();
+}
+
+if (configClass == null) {
+  LOG.info("Hadoop not found on classpath, not creating Hadoop config");
+  return null;
+}
+
+try {
+  Object result = configClass.getDeclaredConstructor().newInstance();

Review Comment:
   What about using `DynConstructors` here?






Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #9466:
URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451796523


##
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java:
##
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import static java.util.stream.Collectors.toSet;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.common.DynClasses;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.common.DynMethods.BoundMethod;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.kafka.connect.data.Struct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Utilities {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Utilities.class.getName());
+  private static final List<String> HADOOP_CONF_FILES =
+  ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml");
+
+  public static Catalog loadCatalog(IcebergSinkConfig config) {
+return CatalogUtil.buildIcebergCatalog(
+config.catalogName(), config.catalogProps(), loadHadoopConfig(config));
+  }
+
+  // use reflection here to avoid requiring Hadoop as a dependency
+  private static Object loadHadoopConfig(IcebergSinkConfig config) {
+Class<?> configClass =
+DynClasses.builder().impl("org.apache.hadoop.hdfs.HdfsConfiguration").orNull().build();

Review Comment:
   You should be able to chain the `impl` calls to do the same thing without 
logic here. It will use the first one that is found.






Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #9466:
URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451797043



Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #9466:
URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451797289



Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #9466:
URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451797484



Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #9466:
URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451798132



Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #9466:
URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451798927



Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #9466:
URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451799004



Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #9466:
URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451799323


##
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java:
##
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import static java.util.stream.Collectors.toSet;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static 
org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.common.DynClasses;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.common.DynMethods.BoundMethod;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.kafka.connect.data.Struct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Utilities {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Utilities.class.getName());
+  private static final List HADOOP_CONF_FILES =
+  ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml");
+
+  public static Catalog loadCatalog(IcebergSinkConfig config) {
+return CatalogUtil.buildIcebergCatalog(
+config.catalogName(), config.catalogProps(), loadHadoopConfig(config));
+  }
+
+  // use reflection here to avoid requiring Hadoop as a dependency
+  private static Object loadHadoopConfig(IcebergSinkConfig config) {
+Class configClass =
+DynClasses.builder().impl("org.apache.hadoop.hdfs.HdfsConfiguration").orNull().build();
+if (configClass == null) {
+  configClass =
+  DynClasses.builder().impl("org.apache.hadoop.conf.Configuration").orNull().build();
+}
+
+if (configClass == null) {
+  LOG.info("Hadoop not found on classpath, not creating Hadoop config");
+  return null;
+}
+
+try {
+  Object result = configClass.getDeclaredConstructor().newInstance();
+  BoundMethod addResourceMethod =
+  DynMethods.builder("addResource").impl(configClass, URL.class).build(result);
+  BoundMethod setMethod =
+  DynMethods.builder("set").impl(configClass, String.class, String.class).build(result);
+
+  //  load any config files in the specified config directory
+  String hadoopConfDir = config.hadoopConfDir();
+  if (hadoopConfDir != null) {
+HADOOP_CONF_FILES.forEach(
+confFile -> {
+  Path path = Paths.get(hadoopConfDir, confFile);
+  if (Files.exists(path)) {
+try {
+  addResourceMethod.invoke(path.toUri().toURL());
+} catch (IOException e) {
+  LOG.warn("Error adding Hadoop resource {}, resource was not added", path, e);
+}
+  }
+});
+  }
+
+  // set any Hadoop properties specified in the s

Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #9466:
URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451799486


##
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java:
##
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import static java.util.stream.Collectors.toSet;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static 
org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.common.DynClasses;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.common.DynMethods.BoundMethod;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.kafka.connect.data.Struct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Utilities {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Utilities.class.getName());
+  private static final List HADOOP_CONF_FILES =
+  ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml");
+
+  public static Catalog loadCatalog(IcebergSinkConfig config) {
+return CatalogUtil.buildIcebergCatalog(
+config.catalogName(), config.catalogProps(), loadHadoopConfig(config));
+  }
+
+  // use reflection here to avoid requiring Hadoop as a dependency
+  private static Object loadHadoopConfig(IcebergSinkConfig config) {
+Class configClass =
+DynClasses.builder().impl("org.apache.hadoop.hdfs.HdfsConfiguration").orNull().build();
+if (configClass == null) {
+  configClass =
+  DynClasses.builder().impl("org.apache.hadoop.conf.Configuration").orNull().build();
+}
+
+if (configClass == null) {
+  LOG.info("Hadoop not found on classpath, not creating Hadoop config");
+  return null;
+}
+
+try {
+  Object result = configClass.getDeclaredConstructor().newInstance();
+  BoundMethod addResourceMethod =
+  DynMethods.builder("addResource").impl(configClass, URL.class).build(result);
+  BoundMethod setMethod =
+  DynMethods.builder("set").impl(configClass, String.class, String.class).build(result);
+
+  //  load any config files in the specified config directory
+  String hadoopConfDir = config.hadoopConfDir();
+  if (hadoopConfDir != null) {
+HADOOP_CONF_FILES.forEach(
+confFile -> {
+  Path path = Paths.get(hadoopConfDir, confFile);
+  if (Files.exists(path)) {
+try {
+  addResourceMethod.invoke(path.toUri().toURL());
+} catch (IOException e) {
+  LOG.warn("Error adding Hadoop resource {}, resource was not added", path, e);
+}
+  }
+});
+  }
+
+  // set any Hadoop properties specified in the s

Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #9466:
URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451800122


##
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedAppendWriter.java:
##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.InternalRecordWrapper;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitionedFanoutWriter;
+
+public class PartitionedAppendWriter extends PartitionedFanoutWriter {

Review Comment:
   Should this live in `data` since it is using Iceberg's generic `Record` 
class?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #9466:
URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451800555


##
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import static java.util.stream.Collectors.toList;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.format.DateTimeParseException;
+import java.time.temporal.Temporal;
+import java.util.Base64;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mapping.MappedField;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Type.PrimitiveType;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.DecimalType;
+import org.apache.iceberg.types.Types.ListType;
+import org.apache.iceberg.types.Types.MapType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.types.Types.TimestampType;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.kafka.connect.data.Struct;
+
+public class RecordConverter {
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  private static final DateTimeFormatter OFFSET_TS_FMT =
+  new DateTimeFormatterBuilder()
+  .append(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
+  .appendOffset("+HHmm", "Z")
+  .toFormatter();
+
+  private final Schema tableSchema;
+  private final NameMapping nameMapping;
+  private final IcebergSinkConfig config;
+  private final Map> structNameMap = Maps.newHashMap();
+
+  public RecordConverter(Table table, IcebergSinkConfig config) {

Review Comment:
   Looks like the purpose of this is to create a new Iceberg generic `Record` 
from Kafka's object model (`Struct`). Is that needed? In Flink and Spark, we 
use writers that are adapted to the in-memory object model for those engines to 
avoid copying a record and then writing. I'm not familiar enough with the KC 
object model to know whether this is needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #9466:
URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451800701


##
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java:
##
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import java.util.List;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public interface RecordWriter extends Cloneable {
+
+  default void write(SinkRecord record) {}
+
+  default List complete() {

Review Comment:
   This default implementation seems dangerous to me. Why not force the 
implementation to provide `complete` and `close` to ensure that they are 
implemented and not overlooked?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #9466:
URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451800872


##
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWrapper.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import java.util.Map;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Types.StructType;
+
+public class RecordWrapper implements Record {

Review Comment:
   What is the purpose of this class? Why would you not just use the underlying 
`Record` directly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #9466:
URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451801429


##
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java:
##
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect;
+
+import static java.util.stream.Collectors.toList;
+
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Pattern;
+import org.apache.iceberg.IcebergBuild;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.storage.ConverterConfig;
+import org.apache.kafka.connect.storage.ConverterType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IcebergSinkConfig extends AbstractConfig {
+
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkConfig.class.getName());
+
+  public static final String INTERNAL_TRANSACTIONAL_SUFFIX_PROP =
+  "iceberg.coordinator.transactional.suffix";
+  private static final String ROUTE_REGEX = "route-regex";
+  private static final String ID_COLUMNS = "id-columns";
+  private static final String PARTITION_BY = "partition-by";
+  private static final String COMMIT_BRANCH = "commit-branch";
+
+  private static final String CATALOG_PROP_PREFIX = "iceberg.catalog.";
+  private static final String HADOOP_PROP_PREFIX = "iceberg.hadoop.";
+  private static final String KAFKA_PROP_PREFIX = "iceberg.kafka.";
+  private static final String TABLE_PROP_PREFIX = "iceberg.table.";
+  private static final String AUTO_CREATE_PROP_PREFIX = "iceberg.tables.auto-create-props.";
+  private static final String WRITE_PROP_PREFIX = "iceberg.table.write-props.";
+
+  private static final String CATALOG_NAME_PROP = "iceberg.catalog";
+  private static final String TABLES_PROP = "iceberg.tables";
+  private static final String TABLES_DYNAMIC_PROP = "iceberg.tables.dynamic-enabled";
+  private static final String TABLES_ROUTE_FIELD_PROP = "iceberg.tables.route-field";
+  private static final String TABLES_DEFAULT_COMMIT_BRANCH = "iceberg.tables.default-commit-branch";
+  private static final String TABLES_DEFAULT_ID_COLUMNS = "iceberg.tables.default-id-columns";
+  private static final String TABLES_DEFAULT_PARTITION_BY = "iceberg.tables.default-partition-by";
+  private static final String TABLES_CDC_FIELD_PROP = "iceberg.tables.cdc-field";
+  private static final String TABLES_UPSERT_MODE_ENABLED_PROP =
+  "iceberg.tables.upsert-mode-enabled";
+  private static final String TABLES_AUTO_CREATE_ENABLED_PROP =
+  "iceberg.tables.auto-create-enabled";
+  private static final String TABLES_EVOLVE_SCHEMA_ENABLED_PROP =
+  "iceberg.tables.evolve-schema-enabled";
+  private static final String TABLES_SCHEMA_FORCE_OPTIONAL_PROP =
+  "iceberg.tables.schema-force-optional";
+  private static final String TABLES_SCHEMA_CASE_INSENSITIVE_PROP =
+  "iceberg.tables.schema-case-insensitive";
+  private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic";
+  private static final String CONTROL_GROUP_ID_PROP = "iceberg.control.group-id";
+  private static final String COMMIT_INTERVAL_MS_PROP = "iceberg.control.commit.interval-ms";
+  private static final int COMMIT_INTERVAL_MS_DEFAULT = 30

Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #9466:
URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451801651


##
kafka-connect/build.gradle:
##
@@ -30,3 +30,30 @@ 
project(":iceberg-kafka-connect:iceberg-kafka-connect-events") {
 useJUnitPlatform()
   }  
 }
+
+project(":iceberg-kafka-connect:iceberg-kafka-connect") {

Review Comment:
   Why the duplicate name? Could this be `:iceberg-kafka-connect:iceberg-sink`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] AES GCM Stream changes [iceberg]

2024-01-14 Thread via GitHub


rdblue merged PR #9453:
URL: https://github.com/apache/iceberg/pull/9453


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Avro data encryption [iceberg]

2024-01-14 Thread via GitHub


rdblue commented on PR #9436:
URL: https://github.com/apache/iceberg/pull/9436#issuecomment-1891079101

   Looks like this is ready to commit when it is rebased on top of the AES GCM 
stream changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Parquet: Add system config for unsafe Parquet ID fallback. [iceberg]

2024-01-14 Thread via GitHub


rdblue commented on PR #9324:
URL: https://github.com/apache/iceberg/pull/9324#issuecomment-1891081451

   @jackye1995, @danielcweeks, @RussellSpitzer, could you look at this? I'd 
like to ideally get it into the next release since we have been allowing unsafe 
reads.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451809069


##
pyiceberg/io/pyarrow.py:
##
@@ -1565,13 +1564,54 @@ def fill_parquet_file_metadata(
 del upper_bounds[field_id]
 del null_value_counts[field_id]
 
-df.file_format = FileFormat.PARQUET
 df.record_count = parquet_metadata.num_rows
-df.file_size_in_bytes = file_size
 df.column_sizes = column_sizes
 df.value_counts = value_counts
 df.null_value_counts = null_value_counts
 df.nan_value_counts = nan_value_counts
 df.lower_bounds = lower_bounds
 df.upper_bounds = upper_bounds
 df.split_offsets = split_offsets
+
+
+def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
+task = next(tasks)
+
+try:
+_ = next(tasks)
+# If there are more tasks, raise an exception
+raise ValueError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
+except StopIteration:
+pass
+
+df = task.df
+
+file_path = f'{table.location()}/data/{_generate_datafile_filename("parquet")}'
+file_schema = schema_to_pyarrow(table.schema())
+
+collected_metrics: List[pq.FileMetaData] = []
+fo = table.io.new_output(file_path)
+with fo.create() as fos:
+with pq.ParquetWriter(fos, schema=file_schema, version="1.0", metadata_collector=collected_metrics) as writer:
+writer.write_table(df)
+
+df = DataFile(
+content=DataFileContent.DATA,
+file_path=file_path,
+file_format=FileFormat.PARQUET,
+partition=Record(),
+record_count=len(df),
+file_size_in_bytes=len(fo),
+# Just copy these from the table for now
+sort_order_id=table.sort_order().order_id,
+spec_id=table.spec().spec_id,
+equality_ids=table.schema().identifier_field_ids,
+key_metadata=None,
+)
+fill_parquet_file_metadata(
+df=df,
+parquet_metadata=collected_metrics[0],

Review Comment:
   So if Arrow decides to write multiple files, there will be one entry per 
file in this list?
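
   If it helps to check, here is a standalone sketch of the collector behavior (file names are just illustrative): one `FileMetaData` gets appended per writer that is closed, so with a single `ParquetWriter` per task the list should only ever hold one entry.

```python
import pyarrow as pa
import pyarrow.parquet as pq

# Each ParquetWriter appends one FileMetaData to the collector when it closes,
# i.e. one entry per physical file written.
collected = []
tbl = pa.table({"x": [1, 2, 3]})

for name in ("f1.parquet", "f2.parquet"):
    with pq.ParquetWriter(name, tbl.schema, metadata_collector=collected) as writer:
        writer.write_table(tbl)

assert len(collected) == 2  # one FileMetaData per file
```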



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451829404


##
pyiceberg/table/__init__.py:
##
@@ -797,6 +850,9 @@ def location(self) -> str:
 def last_sequence_number(self) -> int:
 return self.metadata.last_sequence_number
 
+def next_sequence_number(self) -> int:
+return self.last_sequence_number + 1 if self.metadata.format_version > 1 else INITIAL_SEQUENCE_NUMBER

Review Comment:
   Why have both `next_sequence_number` and `_next_sequence_number` just below 
this? They also have slightly different logic.
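
   A sketch of what I'd expect instead, with the two collapsed into a single helper (written in standalone form just to show the logic, not the exact class layout):

```python
INITIAL_SEQUENCE_NUMBER = 0  # assumed to match the existing constant


def next_sequence_number(format_version: int, last_sequence_number: int) -> int:
    """Sequence number for the next snapshot; V1 tables always use the initial value."""
    if format_version == 1:
        return INITIAL_SEQUENCE_NUMBER
    return last_sequence_number + 1


assert next_sequence_number(1, 5) == 0
assert next_sequence_number(2, 5) == 6
```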



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451831352


##
pyiceberg/table/__init__.py:
##
@@ -831,6 +887,46 @@ def history(self) -> List[SnapshotLogEntry]:
 def update_schema(self, allow_incompatible_changes: bool = False, 
case_sensitive: bool = True) -> UpdateSchema:
 return UpdateSchema(self, 
allow_incompatible_changes=allow_incompatible_changes, 
case_sensitive=case_sensitive)
 
+def append(self, df: pa.Table) -> None:
+if len(self.spec().fields) > 0:
+raise ValueError("Cannot write to partitioned tables")
+
+snapshot_id = self.new_snapshot_id()
+
+data_files = _dataframe_to_data_files(self, df=df)
+merge = _MergeAppend(operation=Operation.APPEND, table=self, snapshot_id=snapshot_id)
+for data_file in data_files:
+merge.append_datafile(data_file)

Review Comment:
   Minor: looks like we have some inconsistency in naming. The method uses 
`datafile` as a single word, while the variable (and other places) use two 
words, `data_file`. Not a big deal, but I generally prefer using the same form 
in all cases so it's predictable whenever possible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [I] How to detect if the partition's data is ready to consume [iceberg]

2024-01-14 Thread via GitHub


github-actions[bot] closed issue #6725: How to detect if the partition's data 
is ready to consume
URL: https://github.com/apache/iceberg/issues/6725


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451832048


##
pyiceberg/table/__init__.py:
##
@@ -831,6 +887,46 @@ def history(self) -> List[SnapshotLogEntry]:
 def update_schema(self, allow_incompatible_changes: bool = False, 
case_sensitive: bool = True) -> UpdateSchema:
 return UpdateSchema(self, 
allow_incompatible_changes=allow_incompatible_changes, 
case_sensitive=case_sensitive)
 
+def append(self, df: pa.Table) -> None:
+if len(self.spec().fields) > 0:
+raise ValueError("Cannot write to partitioned tables")
+
+snapshot_id = self.new_snapshot_id()
+
+data_files = _dataframe_to_data_files(self, df=df)
+merge = _MergeAppend(operation=Operation.APPEND, table=self, snapshot_id=snapshot_id)
+for data_file in data_files:
+merge.append_datafile(data_file)
+
+if current_snapshot := self.current_snapshot():
+for manifest in current_snapshot.manifests(io=self.io):
+for entry in manifest.fetch_manifest_entry(io=self.io):
+merge.append_datafile(entry.data_file, added=False)

Review Comment:
   I think that the `_MergeAppend` should be responsible for handling the 
existing data. It doesn't make sense to me that an append operation would 
require the caller to re-add the data files that were in the table already. 
That puts too much on the caller, which should just add files and not worry 
about existing data or state.
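
   To make the split of responsibilities concrete, here is a toy model (all names are made up, not this PR's API): the caller only registers the files it wrote, and the commit path is what pulls in whatever the current snapshot already references.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ToySnapshot:
    data_files: List[str]


@dataclass
class ToyTable:
    current: Optional[ToySnapshot] = None


@dataclass
class ToyAppend:
    table: ToyTable
    new_files: List[str] = field(default_factory=list)

    def append_data_file(self, path: str) -> "ToyAppend":
        self.new_files.append(path)
        return self

    def commit(self) -> None:
        # the append operation, not the caller, carries forward existing files
        existing = self.table.current.data_files if self.table.current else []
        self.table.current = ToySnapshot(data_files=existing + self.new_files)


table = ToyTable(current=ToySnapshot(data_files=["old-1.parquet"]))
ToyAppend(table).append_data_file("new-1.parquet").commit()
assert table.current.data_files == ["old-1.parquet", "new-1.parquet"]
```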



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [I] Add checkstyle rule to ensure AssertJ assertions always check for underlying exception message [iceberg]

2024-01-14 Thread via GitHub


github-actions[bot] commented on issue #7040:
URL: https://github.com/apache/iceberg/issues/7040#issuecomment-1891124126

   This issue has been automatically marked as stale because it has been open 
for 180 days with no activity. It will be closed in next 14 days if no further 
activity occurs. To permanently prevent this issue from being considered stale, 
add the label 'not-stale', but commenting on the issue is preferred when 
possible.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [I] How to detect if the partition's data is ready to consume [iceberg]

2024-01-14 Thread via GitHub


github-actions[bot] commented on issue #6725:
URL: https://github.com/apache/iceberg/issues/6725#issuecomment-1891124143

   This issue has been closed because it has not received any activity in the 
last 14 days since being marked as 'stale'


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [I] [DOC] Reorder pages under Spark in the nav bar [iceberg]

2024-01-14 Thread via GitHub


github-actions[bot] commented on issue #6724:
URL: https://github.com/apache/iceberg/issues/6724#issuecomment-1891124149

   This issue has been automatically marked as stale because it has been open 
for 180 days with no activity. It will be closed in next 14 days if no further 
activity occurs. To permanently prevent this issue from being considered stale, 
add the label 'not-stale', but commenting on the issue is preferred when 
possible.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451832516


##
pyiceberg/table/__init__.py:
##
@@ -831,6 +887,46 @@ def history(self) -> List[SnapshotLogEntry]:
 def update_schema(self, allow_incompatible_changes: bool = False, 
case_sensitive: bool = True) -> UpdateSchema:
 return UpdateSchema(self, 
allow_incompatible_changes=allow_incompatible_changes, 
case_sensitive=case_sensitive)
 
+def append(self, df: pa.Table) -> None:
+if len(self.spec().fields) > 0:
+raise ValueError("Cannot write to partitioned tables")
+
+snapshot_id = self.new_snapshot_id()
+
+data_files = _dataframe_to_data_files(self, df=df)
+merge = _MergeAppend(operation=Operation.APPEND, table=self, snapshot_id=snapshot_id)
+for data_file in data_files:
+merge.append_datafile(data_file)
+
+if current_snapshot := self.current_snapshot():
+for manifest in current_snapshot.manifests(io=self.io):
+for entry in manifest.fetch_manifest_entry(io=self.io):

Review Comment:
   I can see why you'd want to merge all of the manifests for a PyIceberg table 
because we're assuming that the table is small and written only by PyIceberg. 
But if this code ever attempts to append to a larger table, it's going to take 
a really long time because it will rewrite all table metadata. I think this 
should take the `FastAppend` approach at first and create a new manifest for 
the new data files.
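
   Roughly the shape I have in mind, reusing the helpers already in this PR (`write_manifest`, `ManifestEntry`, `_new_manifest_path`) and leaving the snapshot summary collection out for brevity; treat it as a sketch of the shape, not a drop-in implementation:

```python
def _manifests(self) -> List[ManifestFile]:
    manifests = []

    if self._added_datafiles:
        # one *new* manifest that contains only the newly written files
        output_file_location = _new_manifest_path(
            location=self._table.location(), num=0, commit_uuid=self._commit_uuid
        )
        with write_manifest(
            format_version=self._table.format_version,
            spec=self._table.spec(),
            schema=self._table.schema(),
            output_file=self._table.io.new_output(output_file_location),
            snapshot_id=self._snapshot_id,
        ) as writer:
            for data_file in self._added_datafiles:
                writer.add_entry(
                    ManifestEntry(
                        status=ManifestEntryStatus.ADDED,
                        snapshot_id=self._snapshot_id,
                        data_file=data_file,
                    )
                )
        manifests.append(writer.to_manifest_file())

    # existing manifests are carried over as-is instead of being rewritten
    if (snapshot := self._table.current_snapshot()) is not None:
        manifests.extend(snapshot.manifests(io=self._table.io))

    return manifests
```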



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451833666


##
pyiceberg/io/pyarrow.py:
##
@@ -1565,13 +1565,56 @@ def fill_parquet_file_metadata(
 del upper_bounds[field_id]
 del null_value_counts[field_id]
 
-df.file_format = FileFormat.PARQUET
-df.record_count = parquet_metadata.num_rows
-df.file_size_in_bytes = file_size
-df.column_sizes = column_sizes
-df.value_counts = value_counts
-df.null_value_counts = null_value_counts
-df.nan_value_counts = nan_value_counts
-df.lower_bounds = lower_bounds
-df.upper_bounds = upper_bounds
-df.split_offsets = split_offsets
+data_file.record_count = parquet_metadata.num_rows
+data_file.column_sizes = column_sizes
+data_file.value_counts = value_counts
+data_file.null_value_counts = null_value_counts
+data_file.nan_value_counts = nan_value_counts
+data_file.lower_bounds = lower_bounds
+data_file.upper_bounds = upper_bounds
+data_file.split_offsets = split_offsets
+
+
+def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
+task = next(tasks)
+
+try:
+_ = next(tasks)
+# If there are more tasks, raise an exception
+raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
+except StopIteration:
+pass
+
+file_path = f'{table.location()}/data/{task.generate_datafile_filename("parquet")}'
+file_schema = schema_to_pyarrow(table.schema())
+
+collected_metrics: List[pq.FileMetaData] = []
+fo = table.io.new_output(file_path)
+with fo.create() as fos:

Review Comment:
   I think this should call `overwrite=True` to avoid the `exists` check in 
`create`.
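
   i.e. something like this, assuming `create(overwrite=...)` keeps the signature it has today:

```python
fo = table.io.new_output(file_path)
with fo.create(overwrite=True) as fos:  # skips the exists() check inside create()
    with pq.ParquetWriter(fos, schema=file_schema, version="1.0", metadata_collector=collected_metrics) as writer:
        writer.write_table(task.df)
```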



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451834512


##
pyiceberg/io/pyarrow.py:
##
@@ -1565,13 +1564,54 @@ def fill_parquet_file_metadata(
 del upper_bounds[field_id]
 del null_value_counts[field_id]
 
-df.file_format = FileFormat.PARQUET
 df.record_count = parquet_metadata.num_rows
-df.file_size_in_bytes = file_size
 df.column_sizes = column_sizes
 df.value_counts = value_counts
 df.null_value_counts = null_value_counts
 df.nan_value_counts = nan_value_counts
 df.lower_bounds = lower_bounds
 df.upper_bounds = upper_bounds
 df.split_offsets = split_offsets
+
+
+def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
+task = next(tasks)
+
+try:
+_ = next(tasks)
+# If there are more tasks, raise an exception
+raise ValueError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
+except StopIteration:
+pass
+
+df = task.df
+
+file_path = f'{table.location()}/data/{_generate_datafile_filename("parquet")}'
+file_schema = schema_to_pyarrow(table.schema())
+
+collected_metrics: List[pq.FileMetaData] = []
+fo = table.io.new_output(file_path)
+with fo.create() as fos:
+with pq.ParquetWriter(fos, schema=file_schema, version="1.0", metadata_collector=collected_metrics) as writer:
+writer.write_table(df)
+
+df = DataFile(
+content=DataFileContent.DATA,
+file_path=file_path,
+file_format=FileFormat.PARQUET,
+partition=Record(),
+record_count=len(df),
+file_size_in_bytes=len(fo),
+# Just copy these from the table for now
+sort_order_id=table.sort_order().order_id,
+spec_id=table.spec().spec_id,

Review Comment:
   Since `write_file` is a public method, we can't guarantee that the caller 
did this check. I agree that it is safe when called from `append` or 
`overwrite`, but a caller could use this method directly to create a data file 
for a partitioned table right?
   
   Wouldn't it be easy to just pass the spec ID and partition tuple (an empty 
`Record`) through `WriteTask` for now? I think it would make sense if a 
`WriteTask` were for a single partition.
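
   Something along these lines is what I was picturing (field names are only a suggestion; `Record` is the same empty partition tuple already used for `DataFile`, and this assumes the module's existing imports plus `dataclasses.field`):

```python
@dataclass(frozen=True)
class WriteTask:
    write_uuid: uuid.UUID
    task_id: int
    df: pa.Table
    # new: a task targets exactly one partition of one spec
    spec_id: int = 0                                    # placeholder default; really the table's current spec id
    partition: Record = field(default_factory=Record)   # empty Record == unpartitioned
    sort_order_id: Optional[int] = None
```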



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451835863


##
pyiceberg/table/__init__.py:
##
@@ -831,6 +887,46 @@ def history(self) -> List[SnapshotLogEntry]:
 def update_schema(self, allow_incompatible_changes: bool = False, 
case_sensitive: bool = True) -> UpdateSchema:
 return UpdateSchema(self, 
allow_incompatible_changes=allow_incompatible_changes, 
case_sensitive=case_sensitive)
 
+def append(self, df: pa.Table) -> None:
+if len(self.spec().fields) > 0:
+raise ValueError("Cannot write to partitioned tables")
+
+snapshot_id = self.new_snapshot_id()
+
+data_files = _dataframe_to_data_files(self, df=df)
+merge = _MergeAppend(operation=Operation.APPEND, table=self, snapshot_id=snapshot_id)
+for data_file in data_files:
+merge.append_datafile(data_file)
+
+if current_snapshot := self.current_snapshot():
+for manifest in current_snapshot.manifests(io=self.io):
+for entry in manifest.fetch_manifest_entry(io=self.io):
+merge.append_datafile(entry.data_file, added=False)
+
+merge.commit()
+
+def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE) -> None:

Review Comment:
   Should these methods have docstrings?
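
   For example, something as small as this would already help (wording is just a suggestion):

```python
def append(self, df: pa.Table) -> None:
    """Append the rows of a PyArrow table to this table as a new snapshot.

    Args:
        df: The Arrow table whose rows are written as data files and committed.

    Raises:
        ValueError: If the table is partitioned, which is not supported yet.
    """
```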



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451836112


##
pyiceberg/table/__init__.py:
##
@@ -1910,3 +2006,137 @@ def _generate_snapshot_id() -> int:
 snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1
 
 return snapshot_id
+
+
+@dataclass(frozen=True)
+class WriteTask:
+write_uuid: uuid.UUID
+task_id: int
+df: pa.Table
+sort_order_id: Optional[int] = None
+
+# Later to be extended with partition information
+
+def generate_datafile_filename(self, extension: str) -> str:
+# Mimics the behavior in the Java API:
+# 
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
+return f"0-{self.task_id}-{self.write_uuid}.{extension}"
+
+
+def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
+return f'{location}/metadata/{commit_uuid}-m{num}.avro'
+
+
+def _generate_manifest_list_filename(snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str:
+# Mimics the behavior in Java:
+# https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
+return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
+
+
+def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:

Review Comment:
   I think the intent is for this to eventually support partitioning, so I 
think it would make sense to check `table.spec()` is unpartitioned. Good to be 
more defensive and open things up later.
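
   Roughly, reusing the guard that `append` already has:

```python
def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
    from pyiceberg.io.pyarrow import write_file

    if len(table.spec().fields) > 0:
        raise ValueError("Cannot write to partitioned tables")

    write_uuid = uuid.uuid4()
    counter = itertools.count(0)

    # This is an iter, so we don't have to materialize everything every time
    yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)]))
```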



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451836277


##
pyiceberg/table/__init__.py:
##
@@ -1910,3 +2006,137 @@ def _generate_snapshot_id() -> int:
 snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1
 
 return snapshot_id
+
+
+@dataclass(frozen=True)
+class WriteTask:
+write_uuid: uuid.UUID
+task_id: int
+df: pa.Table
+sort_order_id: Optional[int] = None
+
+# Later to be extended with partition information
+
+def generate_datafile_filename(self, extension: str) -> str:
+# Mimics the behavior in the Java API:
+# 
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
+return f"0-{self.task_id}-{self.write_uuid}.{extension}"
+
+
+def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
+return f'{location}/metadata/{commit_uuid}-m{num}.avro'
+
+
+def _generate_manifest_list_filename(snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str:

Review Comment:
   Nit: this produces a file name, but the method above produces a full path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451836798


##
pyiceberg/table/__init__.py:
##
@@ -1910,3 +2006,137 @@ def _generate_snapshot_id() -> int:
 snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1
 
 return snapshot_id
+
+
+@dataclass(frozen=True)
+class WriteTask:
+write_uuid: uuid.UUID
+task_id: int
+df: pa.Table
+sort_order_id: Optional[int] = None
+
+# Later to be extended with partition information
+
+def generate_datafile_filename(self, extension: str) -> str:
+# Mimics the behavior in the Java API:
+# 
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
+return f"0-{self.task_id}-{self.write_uuid}.{extension}"
+
+
+def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
+return f'{location}/metadata/{commit_uuid}-m{num}.avro'
+
+
+def _generate_manifest_list_filename(snapshot_id: int, attempt: int, 
commit_uuid: uuid.UUID) -> str:
+# Mimics the behavior in Java:
+# 
https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
+return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
+
+
+def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
+from pyiceberg.io.pyarrow import write_file
+
+write_uuid = uuid.uuid4()
+counter = itertools.count(0)
+
+# This is an iter, so we don't have to materialize everything every time
+# This will be more relevant when we start doing partitioned writes
+yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)]))
+
+
+class _MergeAppend:
+_operation: Operation
+_table: Table
+_snapshot_id: int
+_parent_snapshot_id: Optional[int]
+_added_datafiles: List[DataFile]
+_existing_datafiles: List[DataFile]
+_commit_uuid: uuid.UUID
+
+def __init__(self, operation: Operation, table: Table, snapshot_id: int) 
-> None:
+self._operation = operation
+self._table = table
+self._snapshot_id = snapshot_id
+# Since we only support the main branch for now
+self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := 
self._table.current_snapshot()) else None
+self._added_datafiles = []
+self._existing_datafiles = []
+self._commit_uuid = uuid.uuid4()
+
+def append_datafile(self, data_file: DataFile, added: bool = True) -> _MergeAppend:

Review Comment:
   As I noted above, I don't think the caller should be responsible for adding 
the existing data files. I also think the cleanest way to implement simple 
appends is a fast append -- create a new manifest and preserve the existing 
ones.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451837508


##
pyiceberg/table/__init__.py:
##
@@ -1910,3 +2006,137 @@ def _generate_snapshot_id() -> int:
 snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1
 
 return snapshot_id
+
+
+@dataclass(frozen=True)
+class WriteTask:
+write_uuid: uuid.UUID
+task_id: int
+df: pa.Table
+sort_order_id: Optional[int] = None
+
+# Later to be extended with partition information
+
+def generate_datafile_filename(self, extension: str) -> str:
+# Mimics the behavior in the Java API:
+# 
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
+return f"0-{self.task_id}-{self.write_uuid}.{extension}"
+
+
+def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
+return f'{location}/metadata/{commit_uuid}-m{num}.avro'
+
+
+def _generate_manifest_list_filename(snapshot_id: int, attempt: int, 
commit_uuid: uuid.UUID) -> str:
+# Mimics the behavior in Java:
+# 
https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
+return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
+
+
+def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
+from pyiceberg.io.pyarrow import write_file
+
+write_uuid = uuid.uuid4()
+counter = itertools.count(0)
+
+# This is an iter, so we don't have to materialize everything every time
+# This will be more relevant when we start doing partitioned writes
+yield from write_file(table, iter([WriteTask(write_uuid, next(counter), 
df)]))
+
+
+class _MergeAppend:
+_operation: Operation
+_table: Table
+_snapshot_id: int
+_parent_snapshot_id: Optional[int]
+_added_datafiles: List[DataFile]
+_existing_datafiles: List[DataFile]
+_commit_uuid: uuid.UUID
+
+def __init__(self, operation: Operation, table: Table, snapshot_id: int) 
-> None:
+self._operation = operation
+self._table = table
+self._snapshot_id = snapshot_id
+# Since we only support the main branch for now
+self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := 
self._table.current_snapshot()) else None
+self._added_datafiles = []
+self._existing_datafiles = []
+self._commit_uuid = uuid.uuid4()
+
+def append_datafile(self, data_file: DataFile, added: bool = True) -> 
_MergeAppend:
+if added:
+self._added_datafiles.append(data_file)
+else:
+self._existing_datafiles.append(data_file)
+return self
+
+def _manifests(self) -> Tuple[Dict[str, str], List[ManifestFile]]:
+ssc = SnapshotSummaryCollector()
+manifests = []
+
+if self._added_datafiles:
+output_file_location = 
_new_manifest_path(location=self._table.location(), num=0, 
commit_uuid=self._commit_uuid)
+with write_manifest(
+format_version=self._table.format_version,
+spec=self._table.spec(),
+schema=self._table.schema(),
+output_file=self._table.io.new_output(output_file_location),
+snapshot_id=self._snapshot_id,
+) as writer:
+for data_file in self._added_datafiles + 
self._existing_datafiles:
+writer.add_entry(
+ManifestEntry(
+status=ManifestEntryStatus.ADDED,

Review Comment:
   This status isn't correct for the existing data files. Those should use 
`EXISTING` and should preserve the original snapshot ID in which they were 
added to the table.
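
   If carried-over files do end up being rewritten into a new manifest, a hedged
   sketch of what preserving their metadata could look like (inside the same
   write_manifest block as above); existing_entry is a hypothetical object that
   keeps the original manifest entry rather than a bare DataFile:

   for existing_entry in self._existing_entries:  # hypothetical attribute
       writer.add_entry(
           ManifestEntry(
               status=ManifestEntryStatus.EXISTING,
               # The snapshot that originally added the file, not the one
               # currently being committed.
               snapshot_id=existing_entry.snapshot_id,
               # Both sequence numbers are carried over unchanged.
               data_sequence_number=existing_entry.data_sequence_number,
               file_sequence_number=existing_entry.file_sequence_number,
               data_file=existing_entry.data_file,
           )
       )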



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451837579


##
pyiceberg/table/__init__.py:
##
@@ -1910,3 +2006,137 @@ def _generate_snapshot_id() -> int:
 snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1
 
 return snapshot_id
+
+
+@dataclass(frozen=True)
+class WriteTask:
+write_uuid: uuid.UUID
+task_id: int
+df: pa.Table
+sort_order_id: Optional[int] = None
+
+# Later to be extended with partition information
+
+def generate_datafile_filename(self, extension: str) -> str:
+# Mimics the behavior in the Java API:
+# 
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
+return f"0-{self.task_id}-{self.write_uuid}.{extension}"
+
+
+def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
+return f'{location}/metadata/{commit_uuid}-m{num}.avro'
+
+
+def _generate_manifest_list_filename(snapshot_id: int, attempt: int, 
commit_uuid: uuid.UUID) -> str:
+# Mimics the behavior in Java:
+# 
https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
+return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
+
+
+def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
+from pyiceberg.io.pyarrow import write_file
+
+write_uuid = uuid.uuid4()
+counter = itertools.count(0)
+
+# This is an iter, so we don't have to materialize everything every time
+# This will be more relevant when we start doing partitioned writes
+yield from write_file(table, iter([WriteTask(write_uuid, next(counter), 
df)]))
+
+
+class _MergeAppend:
+_operation: Operation
+_table: Table
+_snapshot_id: int
+_parent_snapshot_id: Optional[int]
+_added_datafiles: List[DataFile]
+_existing_datafiles: List[DataFile]
+_commit_uuid: uuid.UUID
+
+def __init__(self, operation: Operation, table: Table, snapshot_id: int) 
-> None:
+self._operation = operation
+self._table = table
+self._snapshot_id = snapshot_id
+# Since we only support the main branch for now
+self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := 
self._table.current_snapshot()) else None
+self._added_datafiles = []
+self._existing_datafiles = []
+self._commit_uuid = uuid.uuid4()
+
+def append_datafile(self, data_file: DataFile, added: bool = True) -> 
_MergeAppend:
+if added:
+self._added_datafiles.append(data_file)
+else:
+self._existing_datafiles.append(data_file)
+return self
+
+def _manifests(self) -> Tuple[Dict[str, str], List[ManifestFile]]:
+ssc = SnapshotSummaryCollector()
+manifests = []
+
+if self._added_datafiles:
+output_file_location = 
_new_manifest_path(location=self._table.location(), num=0, 
commit_uuid=self._commit_uuid)
+with write_manifest(
+format_version=self._table.format_version,
+spec=self._table.spec(),
+schema=self._table.schema(),
+output_file=self._table.io.new_output(output_file_location),
+snapshot_id=self._snapshot_id,
+) as writer:
+for data_file in self._added_datafiles + 
self._existing_datafiles:
+writer.add_entry(
+ManifestEntry(
+status=ManifestEntryStatus.ADDED,
+snapshot_id=self._snapshot_id,
+data_sequence_number=None,
+file_sequence_number=None,

Review Comment:
   Both sequence numbers should be preserved for existing data files.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451837941


##
pyiceberg/table/__init__.py:
##
@@ -1910,3 +2006,137 @@ def _generate_snapshot_id() -> int:
 snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1
 
 return snapshot_id
+
+
+@dataclass(frozen=True)
+class WriteTask:
+write_uuid: uuid.UUID
+task_id: int
+df: pa.Table
+sort_order_id: Optional[int] = None
+
+# Later to be extended with partition information
+
+def generate_datafile_filename(self, extension: str) -> str:
+# Mimics the behavior in the Java API:
+# 
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
+return f"0-{self.task_id}-{self.write_uuid}.{extension}"
+
+
+def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
+return f'{location}/metadata/{commit_uuid}-m{num}.avro'
+
+
+def _generate_manifest_list_filename(snapshot_id: int, attempt: int, 
commit_uuid: uuid.UUID) -> str:
+# Mimics the behavior in Java:
+# 
https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
+return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
+
+
+def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
+from pyiceberg.io.pyarrow import write_file
+
+write_uuid = uuid.uuid4()
+counter = itertools.count(0)
+
+# This is an iter, so we don't have to materialize everything every time
+# This will be more relevant when we start doing partitioned writes
+yield from write_file(table, iter([WriteTask(write_uuid, next(counter), 
df)]))
+
+
+class _MergeAppend:
+_operation: Operation
+_table: Table
+_snapshot_id: int
+_parent_snapshot_id: Optional[int]
+_added_datafiles: List[DataFile]
+_existing_datafiles: List[DataFile]
+_commit_uuid: uuid.UUID
+
+def __init__(self, operation: Operation, table: Table, snapshot_id: int) 
-> None:
+self._operation = operation
+self._table = table
+self._snapshot_id = snapshot_id
+# Since we only support the main branch for now
+self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := 
self._table.current_snapshot()) else None
+self._added_datafiles = []
+self._existing_datafiles = []
+self._commit_uuid = uuid.uuid4()
+
+def append_datafile(self, data_file: DataFile, added: bool = True) -> 
_MergeAppend:
+if added:
+self._added_datafiles.append(data_file)
+else:
+self._existing_datafiles.append(data_file)
+return self
+
+def _manifests(self) -> Tuple[Dict[str, str], List[ManifestFile]]:
+ssc = SnapshotSummaryCollector()
+manifests = []
+
+if self._added_datafiles:
+output_file_location = 
_new_manifest_path(location=self._table.location(), num=0, 
commit_uuid=self._commit_uuid)
+with write_manifest(
+format_version=self._table.format_version,
+spec=self._table.spec(),
+schema=self._table.schema(),
+output_file=self._table.io.new_output(output_file_location),
+snapshot_id=self._snapshot_id,
+) as writer:
+for data_file in self._added_datafiles + 
self._existing_datafiles:
+writer.add_entry(
+ManifestEntry(
+status=ManifestEntryStatus.ADDED,
+snapshot_id=self._snapshot_id,
+data_sequence_number=None,
+file_sequence_number=None,
+data_file=data_file,
+)
+)
+
+for data_file in self._added_datafiles:
+ssc.add_file(data_file=data_file)
+
+manifests.append(writer.to_manifest_file())
+
+return ssc.build(), manifests

Review Comment:
   I don't think that this method needs to be responsible for both updating the 
snapshot summary and for writing manifest files. It's easy enough to update the 
summary in a separate method, especially since it is mostly handled 
independently.
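
   One way to express that split, sketched with hypothetical helper names
   (_write_manifests, _finish_commit) rather than the PR's actual methods:

   def _summary(self) -> Dict[str, str]:
       # Pure metadata aggregation: no manifest IO happens here.
       ssc = SnapshotSummaryCollector()
       for data_file in self._added_datafiles:
           ssc.add_file(data_file=data_file)
       return ssc.build()

   def commit(self) -> Snapshot:
       manifests = self._write_manifests()  # manifest IO only (hypothetical helper)
       summary = self._summary()            # built independently of the writer
       return self._finish_commit(summary, manifests)  # hypothetical helper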



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Write support [iceberg-python]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451838116


##
pyiceberg/table/__init__.py:
##
@@ -1910,3 +2006,137 @@ def _generate_snapshot_id() -> int:
 snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1
 
 return snapshot_id
+
+
+@dataclass(frozen=True)
+class WriteTask:
+write_uuid: uuid.UUID
+task_id: int
+df: pa.Table
+sort_order_id: Optional[int] = None
+
+# Later to be extended with partition information
+
+def generate_datafile_filename(self, extension: str) -> str:
+# Mimics the behavior in the Java API:
+# 
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
+return f"0-{self.task_id}-{self.write_uuid}.{extension}"
+
+
+def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
+return f'{location}/metadata/{commit_uuid}-m{num}.avro'
+
+
+def _generate_manifest_list_filename(snapshot_id: int, attempt: int, 
commit_uuid: uuid.UUID) -> str:
+# Mimics the behavior in Java:
+# 
https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
+return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
+
+
+def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
+from pyiceberg.io.pyarrow import write_file
+
+write_uuid = uuid.uuid4()
+counter = itertools.count(0)
+
+# This is an iter, so we don't have to materialize everything every time
+# This will be more relevant when we start doing partitioned writes
+yield from write_file(table, iter([WriteTask(write_uuid, next(counter), 
df)]))
+
+
+class _MergeAppend:
+_operation: Operation
+_table: Table
+_snapshot_id: int
+_parent_snapshot_id: Optional[int]
+_added_datafiles: List[DataFile]
+_existing_datafiles: List[DataFile]
+_commit_uuid: uuid.UUID
+
+def __init__(self, operation: Operation, table: Table, snapshot_id: int) 
-> None:
+self._operation = operation
+self._table = table
+self._snapshot_id = snapshot_id
+# Since we only support the main branch for now
+self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := 
self._table.current_snapshot()) else None
+self._added_datafiles = []
+self._existing_datafiles = []
+self._commit_uuid = uuid.uuid4()
+
+def append_datafile(self, data_file: DataFile, added: bool = True) -> 
_MergeAppend:
+if added:
+self._added_datafiles.append(data_file)
+else:
+self._existing_datafiles.append(data_file)
+return self
+
+def _manifests(self) -> Tuple[Dict[str, str], List[ManifestFile]]:
+ssc = SnapshotSummaryCollector()
+manifests = []
+
+if self._added_datafiles:
+output_file_location = 
_new_manifest_path(location=self._table.location(), num=0, 
commit_uuid=self._commit_uuid)
+with write_manifest(
+format_version=self._table.format_version,
+spec=self._table.spec(),
+schema=self._table.schema(),
+output_file=self._table.io.new_output(output_file_location),
+snapshot_id=self._snapshot_id,
+) as writer:
+for data_file in self._added_datafiles + 
self._existing_datafiles:
+writer.add_entry(
+ManifestEntry(
+status=ManifestEntryStatus.ADDED,
+snapshot_id=self._snapshot_id,
+data_sequence_number=None,
+file_sequence_number=None,
+data_file=data_file,
+)
+)
+
+for data_file in self._added_datafiles:
+ssc.add_file(data_file=data_file)
+
+manifests.append(writer.to_manifest_file())
+
+return ssc.build(), manifests
+
+def commit(self) -> Snapshot:
+new_summary, manifests = self._manifests()
+
+previous_snapshot = 
self._table.snapshot_by_id(self._parent_snapshot_id) if 
self._parent_snapshot_id is not None else None
+summary = update_snapshot_summaries(
+summary=Summary(operation=self._operation, **new_summary),
+previous_summary=previous_snapshot.summary if previous_snapshot is 
not None else None,
+truncate_full_table=self._operation == Operation.OVERWRITE,
+)
+
+manifest_list_filename = _generate_manifest_list_filename(
+snapshot_id=self._snapshot_id, attempt=0, 
commit_uuid=self._commit_uuid

Review Comment:
   There are currently no retries?
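
   For reference, a hedged sketch of how an attempt counter could drive both the
   retries and _generate_manifest_list_filename; MAX_COMMIT_ATTEMPTS, _try_commit
   and the refresh() call are illustrative assumptions, not part of the PR:

   from pyiceberg.exceptions import CommitFailedException

   MAX_COMMIT_ATTEMPTS = 4  # assumed bound, not from the PR

   def commit(self) -> Snapshot:
       for attempt in range(MAX_COMMIT_ATTEMPTS):
           manifest_list_filename = _generate_manifest_list_filename(
               snapshot_id=self._snapshot_id, attempt=attempt, commit_uuid=self._commit_uuid
           )
           try:
               # Hypothetical helper: writes the manifest list and swaps metadata.
               return self._try_commit(manifest_list_filename)
           except CommitFailedException:
               if attempt == MAX_COMMIT_ATTEMPTS - 1:
                   raise
               self._table.refresh()  # re-read table metadata before retrying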



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Re: [PR] Write support [iceberg-python]

2024-01-14 Thread via GitHub


rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451838385


##
pyiceberg/table/__init__.py:
##
@@ -1910,3 +2006,137 @@ def _generate_snapshot_id() -> int:
 snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1
 
 return snapshot_id
+
+
+@dataclass(frozen=True)
+class WriteTask:
+write_uuid: uuid.UUID
+task_id: int
+df: pa.Table
+sort_order_id: Optional[int] = None
+
+# Later to be extended with partition information
+
+def generate_datafile_filename(self, extension: str) -> str:
+# Mimics the behavior in the Java API:
+# 
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
+return f"0-{self.task_id}-{self.write_uuid}.{extension}"
+
+
+def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
+return f'{location}/metadata/{commit_uuid}-m{num}.avro'
+
+
+def _generate_manifest_list_filename(snapshot_id: int, attempt: int, 
commit_uuid: uuid.UUID) -> str:
+# Mimics the behavior in Java:
+# 
https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
+return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
+
+
+def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
+from pyiceberg.io.pyarrow import write_file
+
+write_uuid = uuid.uuid4()
+counter = itertools.count(0)
+
+# This is an iter, so we don't have to materialize everything every time
+# This will be more relevant when we start doing partitioned writes
+yield from write_file(table, iter([WriteTask(write_uuid, next(counter), 
df)]))
+
+
+class _MergeAppend:
+_operation: Operation
+_table: Table
+_snapshot_id: int
+_parent_snapshot_id: Optional[int]
+_added_datafiles: List[DataFile]
+_existing_datafiles: List[DataFile]
+_commit_uuid: uuid.UUID
+
+def __init__(self, operation: Operation, table: Table, snapshot_id: int) 
-> None:
+self._operation = operation
+self._table = table
+self._snapshot_id = snapshot_id
+# Since we only support the main branch for now
+self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := 
self._table.current_snapshot()) else None
+self._added_datafiles = []
+self._existing_datafiles = []
+self._commit_uuid = uuid.uuid4()
+
+def append_datafile(self, data_file: DataFile, added: bool = True) -> 
_MergeAppend:
+if added:
+self._added_datafiles.append(data_file)
+else:
+self._existing_datafiles.append(data_file)
+return self
+
+def _manifests(self) -> Tuple[Dict[str, str], List[ManifestFile]]:
+ssc = SnapshotSummaryCollector()
+manifests = []
+
+if self._added_datafiles:
+output_file_location = 
_new_manifest_path(location=self._table.location(), num=0, 
commit_uuid=self._commit_uuid)
+with write_manifest(
+format_version=self._table.format_version,
+spec=self._table.spec(),
+schema=self._table.schema(),
+output_file=self._table.io.new_output(output_file_location),
+snapshot_id=self._snapshot_id,
+) as writer:
+for data_file in self._added_datafiles + 
self._existing_datafiles:
+writer.add_entry(
+ManifestEntry(
+status=ManifestEntryStatus.ADDED,
+snapshot_id=self._snapshot_id,
+data_sequence_number=None,
+file_sequence_number=None,
+data_file=data_file,
+)
+)
+
+for data_file in self._added_datafiles:
+ssc.add_file(data_file=data_file)
+
+manifests.append(writer.to_manifest_file())
+
+return ssc.build(), manifests
+
+def commit(self) -> Snapshot:
+new_summary, manifests = self._manifests()
+
+previous_snapshot = 
self._table.snapshot_by_id(self._parent_snapshot_id) if 
self._parent_snapshot_id is not None else None
+summary = update_snapshot_summaries(
+summary=Summary(operation=self._operation, **new_summary),
+previous_summary=previous_snapshot.summary if previous_snapshot is 
not None else None,
+truncate_full_table=self._operation == Operation.OVERWRITE,
+)
+
+manifest_list_filename = _generate_manifest_list_filename(
+snapshot_id=self._snapshot_id, attempt=0, 
commit_uuid=self._commit_uuid
+)
+manifest_list_file_path = 
f'{self._table.location()}/metadata/{manifest_list_filename}'
+with write_manifest_list(
+   

Re: [I] On iceberg11 getting s3 connection reset error [iceberg]

2024-01-14 Thread via GitHub


javrasya commented on issue #4457:
URL: https://github.com/apache/iceberg/issues/4457#issuecomment-1891155838

   I am using Flink to consume from a table that receives upserts, and I am 
getting this error too. I tried tweaking the HTTP client socket timeouts and 
related settings, but nothing has worked so far for me. How did reducing the 
number of partitions in Spark help here? Was it perhaps S3 closing the 
socket in your case 🤔?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Spark: Fix SparkTable to use name and effective snapshotID for comparing [iceberg]

2024-01-14 Thread via GitHub


wooyeong commented on PR #9455:
URL: https://github.com/apache/iceberg/pull/9455#issuecomment-1891165135

   @ajantha-bhat gentle ping, could you review this change, please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Flink: Added error handling and default logic for Flink version detection [iceberg]

2024-01-14 Thread via GitHub


stevenzwu commented on code in PR #9452:
URL: https://github.com/apache/iceberg/pull/9452#discussion_r1451920250


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java:
##
@@ -19,15 +19,31 @@
 package org.apache.iceberg.flink.util;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 
 public class FlinkPackage {
-  /** Choose {@link DataStream} class because it is one of the core Flink API. 
*/
-  private static final String VERSION = 
DataStream.class.getPackage().getImplementationVersion();
+
+  public static final String FLINK_UNKNOWN_VERSION = "Flink-UNKNOWN";
 
   private FlinkPackage() {}
 
   /** Returns Flink version string like x.y.z */
   public static String version() {
-return VERSION;
+try {
+  String version = getVersionFromJar();
+  /* If we can't detect the exact implementation version from the jar 
(this can happen if the DataStream class
+   appears multiple times in the same classpath such as with shading), 
then the best we can do is say it's
+   unknown
+  */
+  return version != null ? version : FLINK_UNKNOWN_VERSION;

Review Comment:
   +1. this method can be used to initialize the static `VERSION` variable



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[I] kerberos beeline insert iceberg fail error: Job commit failed: org.apache.iceberg.hive.RuntimeMetaException: Failed to connect to Hive Metastore [iceberg]

2024-01-14 Thread via GitHub


xiaolan-bit opened a new issue, #9475:
URL: https://github.com/apache/iceberg/issues/9475

   ### Apache Iceberg version
   
   1.3.1
   
   ### Query engine
   
   Hive
   
   ### Please describe the bug 🐞
   
   *version: hive-3.1.3 iceberg-1.3.1 kerberos-1.15.1 hadoop-3.3.6
   user: hadoop
   *hive.server2.transport.mode: binary
   *hive.server2.thrift.port: 1
   
   shell:
   kinit -kt hadoop.keytab hadoop/h...@hadoop.com
   beeline
   !connect 
jdbc:hive2://host:1/default;httpPath=cliservice;principal=hadoop/h...@hadoop.com
   
   beeline sql:
   create database iceberg_hive_example;
   
   use iceberg_hive_example;
   
   create table test_table_hive1(
id int,
name string,
age int)
   partitioned by (dt string)
   stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
   set hive.execution.engine=mr;
   add jar /hive/lib/iceberg-hive-runtime-1.3.1.jar;
   add jar /hive/lib/libfb303-0.9.3.jar;
   insert into test_table_hive1 values (1,"hivezs",18,"20231204");
   
   error appear:
   
![image](https://github.com/apache/iceberg/assets/62273659/28e26e5b-6b5e-4164-a730-56fc40bd3f25)
   
   the error as follow:
   Job commit failed: org.apache.iceberg.hive.RuntimeMetaException: Failed to 
connect to Hive Metastore
   at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:84)
   at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:34)
   at org.apache.iceberg.ClientPoolImpl.get(ClientPoolImpl.java:125)
   at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:56)
   at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
   at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
   at 
org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:158)
   at 
org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:97)
   at 
org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:80)
   at 
org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:47)
   at org.apache.iceberg.mr.Catalogs.loadTable(Catalogs.java:124)
   at org.apache.iceberg.mr.Catalogs.loadTable(Catalogs.java:111)
   at 
org.apache.iceberg.mr.hive.HiveIcebergOutputCommitter.commitTable(HiveIcebergOutputCommitter.java:320)
   at 
org.apache.iceberg.mr.hive.HiveIcebergOutputCommitter.lambda$commitJob$2(HiveIcebergOutputCommitter.java:214)
   at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
   at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
   at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
   at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
   at 
org.apache.iceberg.mr.hive.HiveIcebergOutputCommitter.commitJob(HiveIcebergOutputCommitter.java:207)
   at 
org.apache.hadoop.mapred.OutputCommitter.commitJob(OutputCommitter.java:291)
   at 
org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler$EventProcessor.handleJobCommit(CommitterEventHandler.java:286)
   at 
org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler$EventProcessor.run(CommitterEventHandler.java:238)
   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.RuntimeException: Unable to instantiate 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient
   at 
org.apache.hadoop.hive.metastore.utils.JavaUtils.newInstance(JavaUtils.java:86)
   at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:95)
   at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:148)
   at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:119)
   at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:112)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:498)
   at 
org.apache.iceberg.common.DynMethods$UnboundMethod.invokeChecked(DynMethods.java:60)
   at 
org.apache.iceberg.common.DynMethods$UnboundMethod.invoke(DynMethods.java:72)
   at 
org.apache.iceberg.common.DynMethods$StaticMethod.invoke(DynMethods.java:185)
   at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:63)
   ... 24 more
   Caused by: java.lang.reflect.InvocationTargetException
   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
   at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
   at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingCon

[I] flink has implemented the delete and update syntax support in batch mode in a later version. Will the iceberg community implement this feature [iceberg]

2024-01-14 Thread via GitHub


BlackPigHe opened a new issue, #9476:
URL: https://github.com/apache/iceberg/issues/9476

   ### Feature Request / Improvement
   
   Flink has added support for the DELETE and UPDATE syntax in batch mode in a 
recent version. Will the Iceberg community implement this feature? If there is 
interest, I can provide a commit for it here.
   
   ### Query engine
   
   Flink


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Fix ParallelIterable memory leak because queue continues to be added even if iterator exited [iceberg]

2024-01-14 Thread via GitHub


Heltman commented on PR #9402:
URL: https://github.com/apache/iceberg/pull/9402#issuecomment-1891512061

   @findepi @electrum cc


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org