Re: [PR] Add SqlCatalog _commit_table support [iceberg-python]
HonahX commented on code in PR #265:
URL: https://github.com/apache/iceberg-python/pull/265#discussion_r1451680767

## pyiceberg/catalog/sql.py: ##

@@ -268,16 +269,32 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
         identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
         database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
         with Session(self.engine) as session:
-            res = session.execute(
-                delete(IcebergTables).where(
-                    IcebergTables.catalog_name == self.name,
-                    IcebergTables.table_namespace == database_name,
-                    IcebergTables.table_name == table_name,
+            if self.engine.dialect.supports_sane_rowcount:
+                res = session.execute(
+                    delete(IcebergTables).where(
+                        IcebergTables.catalog_name == self.name,
+                        IcebergTables.table_namespace == database_name,
+                        IcebergTables.table_name == table_name,
+                    )
                 )
-            )
+                if res.rowcount < 1:
+                    raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}")
+            else:
+                try:
+                    tbl = (
+                        session.query(IcebergTables)
+                        .with_for_update(of=IcebergTables, nowait=True)

Review Comment:
Thanks for the explanation! Reflecting further, I think using neither NOWAIT nor SKIP_LOCKED is the better choice for consistency. When the engine supports accurate rowcounts, we issue UPDATE or DELETE statements, which inherently wait for locks. If the engine lacks rowcount support, sticking with a plain SELECT FOR UPDATE keeps the behavior consistent with that UPDATE/DELETE path, as it also waits for locks. This consistency eliminates the need to handle additional exceptions, since any concurrent transaction naturally leads to a `NoResultFound` once the table is dropped, renamed, or updated.

Shifting focus to potential alternatives: if we later decide that avoiding lock waits is preferable in the fallback path, we could consider the following modifications:

1. For `drop_table` and `rename_table`, I think you're right that we can catch `sqlalchemy.exc.OperationalError` and re-raise it as a new exception indicating that another process is about to delete the table. Using `CommitFailedException` doesn't seem appropriate here, as it's primarily meant for failed commits on Iceberg tables.
2. For `_commit_table`, `skip_locked` might still be useful. At the point when the SQL command is executed, we're assured of the table's existence, so encountering `NoResultFound` would directly imply a concurrent commit attempt.

Do you think the concern about maintaining consistency between the cases that do and do not support rowcount is valid? If not, what are your thoughts on adopting NOWAIT for `drop_table` and `rename_table`, and SKIP_LOCKED for `_commit_table`?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org
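[Editor's illustration] The sketch below makes the `skip_locked` alternative from point 2 concrete. It is not code from the PR: the helper name `lock_iceberg_row_or_fail` is hypothetical, and it assumes `IcebergTables` is importable from `pyiceberg.catalog.sql` and `CommitFailedException` from `pyiceberg.exceptions`, as referenced in the discussion above. The idea is that, since the table is known to exist at this point, an empty result from `SELECT ... FOR UPDATE SKIP LOCKED` can only mean another transaction holds the row lock, i.e. a concurrent commit.

```python
# Hypothetical sketch only -- not the PR's implementation.
# Shows how skip_locked could surface a concurrent commit in _commit_table:
# a row locked by another transaction is skipped, .one() raises NoResultFound,
# and we translate that into CommitFailedException.
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import Session

from pyiceberg.catalog.sql import IcebergTables          # model from the quoted diff
from pyiceberg.exceptions import CommitFailedException   # exception mentioned above


def lock_iceberg_row_or_fail(
    session: Session, catalog_name: str, database_name: str, table_name: str
) -> IcebergTables:
    """Lock the catalog row for update, treating a skipped (locked) row as a concurrent commit."""
    try:
        return (
            session.query(IcebergTables)
            .with_for_update(of=IcebergTables, skip_locked=True)  # skip rows locked by others
            .filter(
                IcebergTables.catalog_name == catalog_name,
                IcebergTables.table_namespace == database_name,
                IcebergTables.table_name == table_name,
            )
            .one()
        )
    except NoResultFound as exc:
        # The table exists, so "no row" means another transaction holds the lock.
        raise CommitFailedException(
            f"Concurrent commit detected for {database_name}.{table_name}"
        ) from exc
```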
Re: [PR] Add SqlCatalog _commit_table support [iceberg-python]
syun64 commented on code in PR #265:
URL: https://github.com/apache/iceberg-python/pull/265#discussion_r1451754672

## pyiceberg/catalog/sql.py: ##

@@ -268,16 +269,32 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
         identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
         database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
         with Session(self.engine) as session:
-            res = session.execute(
-                delete(IcebergTables).where(
-                    IcebergTables.catalog_name == self.name,
-                    IcebergTables.table_namespace == database_name,
-                    IcebergTables.table_name == table_name,
+            if self.engine.dialect.supports_sane_rowcount:
+                res = session.execute(
+                    delete(IcebergTables).where(
+                        IcebergTables.catalog_name == self.name,
+                        IcebergTables.table_namespace == database_name,
+                        IcebergTables.table_name == table_name,
+                    )
                 )
-            )
+                if res.rowcount < 1:
+                    raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}")
+            else:
+                try:
+                    tbl = (
+                        session.query(IcebergTables)
+                        .with_for_update(of=IcebergTables, nowait=True)

Review Comment:
Hi @HonahX. Thank you for explaining so patiently.

> If the engine lacks rowcount support, sticking with a plain SELECT FOR UPDATE keeps the behavior consistent with that UPDATE/DELETE path, as it also waits for locks. This consistency eliminates the need to handle additional exceptions, since any concurrent transaction naturally leads to a NoResultFound once the table is dropped, renamed, or updated.

I agree with your analysis here, and I think dropping **nowait** and **skip_locked** is the best way to mimic the behavior of the UPDATE/DELETE path as closely as possible.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org
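[Editor's illustration] To make the agreed direction concrete, here is a minimal sketch of the fallback branch with neither `nowait` nor `skip_locked`: the plain `SELECT ... FOR UPDATE` simply waits for the lock, and a row that disappeared while waiting surfaces as `NoResultFound`. This is an illustration of the idea under discussion, not the PR's final code; the function name is hypothetical, and `IcebergTables` / `NoSuchTableError` are assumed to be importable as in the quoted diff.

```python
# Minimal sketch of the fallback drop_table path without nowait/skip_locked.
# Not the PR's final code; assumes the IcebergTables model and NoSuchTableError
# referenced in the diff above.
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import Session

from pyiceberg.catalog.sql import IcebergTables
from pyiceberg.exceptions import NoSuchTableError


def drop_table_fallback(engine, catalog_name: str, database_name: str, table_name: str) -> None:
    with Session(engine) as session:
        try:
            tbl = (
                session.query(IcebergTables)
                .with_for_update(of=IcebergTables)  # plain SELECT ... FOR UPDATE: waits for the lock
                .filter(
                    IcebergTables.catalog_name == catalog_name,
                    IcebergTables.table_namespace == database_name,
                    IcebergTables.table_name == table_name,
                )
                .one()
            )
        except NoResultFound as exc:
            # If a concurrent transaction dropped or renamed the row while we waited,
            # the read after the lock is released finds nothing.
            raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from exc
        session.delete(tbl)
        session.commit()
```

Because the locked read blocks rather than failing fast, this mirrors how the rowcount-based UPDATE/DELETE branch behaves, which is the consistency argument made above.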
Re: [PR] support python 3.12 [iceberg-python]
MehulBatra commented on code in PR #254:
URL: https://github.com/apache/iceberg-python/pull/254#discussion_r1451758129

## pyproject.toml: ##

@@ -29,7 +29,8 @@ classifiers = [
     "Programming Language :: Python :: 3.8",
     "Programming Language :: Python :: 3.9",
     "Programming Language :: Python :: 3.10",
-    "Programming Language :: Python :: 3.11"
+    "Programming Language :: Python :: 3.11",
+    "Programming Language :: Python :: 3.12"

Review Comment:
sure, it wouldn't harm.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] support python 3.12 [iceberg-python]
MehulBatra commented on code in PR #254:
URL: https://github.com/apache/iceberg-python/pull/254#discussion_r1451758589

## pyproject.toml: ##

@@ -70,6 +71,9 @@ adlfs = { version = ">=2023.1.0,<2024.1.0", optional = true }
 gcsfs = { version = ">=2023.1.0,<2024.1.0", optional = true }
 psycopg2-binary = { version = ">=2.9.6", optional = true }
 sqlalchemy = { version = "^2.0.18", optional = true }
+numpy = { version = "^1.26.3", python = "3.12" }
+greenlet = {version = "^3.0.3", python = "3.12" }

Review Comment:
Since we are already on the latest versions of both libraries, we should be good here; support for upcoming Python versions should land in these release lines as well.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]
rdblue commented on code in PR #9466: URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451796402 ## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java: ## @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import static java.util.stream.Collectors.toSet; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.common.DynClasses; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.common.DynMethods.BoundMethod; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.primitives.Ints; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.kafka.connect.data.Struct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Utilities { + + private static final Logger LOG = LoggerFactory.getLogger(Utilities.class.getName()); + private static final List HADOOP_CONF_FILES = + ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml"); + + public static Catalog loadCatalog(IcebergSinkConfig config) { +return CatalogUtil.buildIcebergCatalog( +config.catalogName(), config.catalogProps(), loadHadoopConfig(config)); + } + + // use reflection here to avoid requiring Hadoop as a dependency + private static Object loadHadoopConfig(IcebergSinkConfig config) { +Class configClass = + 
DynClasses.builder().impl("org.apache.hadoop.hdfs.HdfsConfiguration").orNull().build(); +if (configClass == null) { + configClass = + DynClasses.builder().impl("org.apache.hadoop.conf.Configuration").orNull().build(); +} + +if (configClass == null) { + LOG.info("Hadoop not found on classpath, not creating Hadoop config"); + return null; +} + +try { + Object result = configClass.getDeclaredConstructor().newInstance(); Review Comment: What about using `DynConstructors` here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]
rdblue commented on code in PR #9466: URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451796523 ## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java: ## @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import static java.util.stream.Collectors.toSet; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.common.DynClasses; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.common.DynMethods.BoundMethod; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.primitives.Ints; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.kafka.connect.data.Struct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Utilities { + + private static final Logger LOG = LoggerFactory.getLogger(Utilities.class.getName()); + private static final List HADOOP_CONF_FILES = + ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml"); + + public static Catalog loadCatalog(IcebergSinkConfig config) { +return CatalogUtil.buildIcebergCatalog( +config.catalogName(), config.catalogProps(), loadHadoopConfig(config)); + } + + // use reflection here to avoid requiring Hadoop as a dependency + private static Object loadHadoopConfig(IcebergSinkConfig config) { +Class configClass = + 
DynClasses.builder().impl("org.apache.hadoop.hdfs.HdfsConfiguration").orNull().build(); Review Comment: You should be able to chain the `impl` calls to do the same thing without logic here. It will use the first one that is found. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]
rdblue commented on code in PR #9466: URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451797043 ## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java: ## @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import static java.util.stream.Collectors.toSet; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.common.DynClasses; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.common.DynMethods.BoundMethod; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.primitives.Ints; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.kafka.connect.data.Struct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Utilities { + + private static final Logger LOG = LoggerFactory.getLogger(Utilities.class.getName()); + private static final List HADOOP_CONF_FILES = + ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml"); + + public static Catalog loadCatalog(IcebergSinkConfig config) { +return CatalogUtil.buildIcebergCatalog( +config.catalogName(), config.catalogProps(), loadHadoopConfig(config)); + } + + // use reflection here to avoid requiring Hadoop as a dependency + private static Object loadHadoopConfig(IcebergSinkConfig config) { +Class configClass = + 
DynClasses.builder().impl("org.apache.hadoop.hdfs.HdfsConfiguration").orNull().build(); +if (configClass == null) { + configClass = + DynClasses.builder().impl("org.apache.hadoop.conf.Configuration").orNull().build(); +} + +if (configClass == null) { + LOG.info("Hadoop not found on classpath, not creating Hadoop config"); + return null; +} + +try { + Object result = configClass.getDeclaredConstructor().newInstance(); + BoundMethod addResourceMethod = + DynMethods.builder("addResource").impl(configClass, URL.class).build(result); + BoundMethod setMethod = + DynMethods.builder("set").impl(configClass, String.class, String.class).build(result); + + // load any config files in the specified config directory + String hadoopConfDir = config.hadoopConfDir(); + if (hadoopConfDir != null) { +HADOOP_CONF_FILES.forEach( +confFile -> { + Path path = Paths.get(hadoopConfDir, confFile); + if (Files.exists(path)) { +try { + addResourceMethod.invoke(path.toUri().toURL()); +} catch (IOException e) { + LOG.warn("Error adding Hadoop resource {}, resource was not added", path, e); +} + } +}); + } + + // set any Hadoop properties specified in the s
Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]
rdblue commented on code in PR #9466: URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451797289 ## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java: ## @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import static java.util.stream.Collectors.toSet; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.common.DynClasses; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.common.DynMethods.BoundMethod; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.primitives.Ints; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.kafka.connect.data.Struct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Utilities { + + private static final Logger LOG = LoggerFactory.getLogger(Utilities.class.getName()); + private static final List HADOOP_CONF_FILES = + ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml"); + + public static Catalog loadCatalog(IcebergSinkConfig config) { +return CatalogUtil.buildIcebergCatalog( +config.catalogName(), config.catalogProps(), loadHadoopConfig(config)); + } + + // use reflection here to avoid requiring Hadoop as a dependency + private static Object loadHadoopConfig(IcebergSinkConfig config) { +Class configClass = + 
DynClasses.builder().impl("org.apache.hadoop.hdfs.HdfsConfiguration").orNull().build(); +if (configClass == null) { + configClass = + DynClasses.builder().impl("org.apache.hadoop.conf.Configuration").orNull().build(); +} + +if (configClass == null) { + LOG.info("Hadoop not found on classpath, not creating Hadoop config"); + return null; +} + +try { + Object result = configClass.getDeclaredConstructor().newInstance(); + BoundMethod addResourceMethod = + DynMethods.builder("addResource").impl(configClass, URL.class).build(result); + BoundMethod setMethod = + DynMethods.builder("set").impl(configClass, String.class, String.class).build(result); + + // load any config files in the specified config directory + String hadoopConfDir = config.hadoopConfDir(); + if (hadoopConfDir != null) { +HADOOP_CONF_FILES.forEach( +confFile -> { + Path path = Paths.get(hadoopConfDir, confFile); + if (Files.exists(path)) { +try { + addResourceMethod.invoke(path.toUri().toURL()); +} catch (IOException e) { + LOG.warn("Error adding Hadoop resource {}, resource was not added", path, e); +} + } +}); + } + + // set any Hadoop properties specified in the s
Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]
rdblue commented on code in PR #9466: URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451797484 ## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java: ## @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import static java.util.stream.Collectors.toSet; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.common.DynClasses; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.common.DynMethods.BoundMethod; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.primitives.Ints; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.kafka.connect.data.Struct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Utilities { + + private static final Logger LOG = LoggerFactory.getLogger(Utilities.class.getName()); + private static final List HADOOP_CONF_FILES = + ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml"); + + public static Catalog loadCatalog(IcebergSinkConfig config) { +return CatalogUtil.buildIcebergCatalog( +config.catalogName(), config.catalogProps(), loadHadoopConfig(config)); + } + + // use reflection here to avoid requiring Hadoop as a dependency + private static Object loadHadoopConfig(IcebergSinkConfig config) { +Class configClass = + 
DynClasses.builder().impl("org.apache.hadoop.hdfs.HdfsConfiguration").orNull().build(); +if (configClass == null) { + configClass = + DynClasses.builder().impl("org.apache.hadoop.conf.Configuration").orNull().build(); +} + +if (configClass == null) { + LOG.info("Hadoop not found on classpath, not creating Hadoop config"); + return null; +} + +try { + Object result = configClass.getDeclaredConstructor().newInstance(); + BoundMethod addResourceMethod = + DynMethods.builder("addResource").impl(configClass, URL.class).build(result); + BoundMethod setMethod = + DynMethods.builder("set").impl(configClass, String.class, String.class).build(result); + + // load any config files in the specified config directory + String hadoopConfDir = config.hadoopConfDir(); + if (hadoopConfDir != null) { +HADOOP_CONF_FILES.forEach( +confFile -> { + Path path = Paths.get(hadoopConfDir, confFile); + if (Files.exists(path)) { +try { + addResourceMethod.invoke(path.toUri().toURL()); +} catch (IOException e) { + LOG.warn("Error adding Hadoop resource {}, resource was not added", path, e); +} + } +}); + } + + // set any Hadoop properties specified in the s
Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]
rdblue commented on code in PR #9466: URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451798132 ## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java: ## @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import static java.util.stream.Collectors.toSet; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.common.DynClasses; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.common.DynMethods.BoundMethod; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.primitives.Ints; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.kafka.connect.data.Struct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Utilities { + + private static final Logger LOG = LoggerFactory.getLogger(Utilities.class.getName()); + private static final List HADOOP_CONF_FILES = + ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml"); + + public static Catalog loadCatalog(IcebergSinkConfig config) { +return CatalogUtil.buildIcebergCatalog( +config.catalogName(), config.catalogProps(), loadHadoopConfig(config)); + } + + // use reflection here to avoid requiring Hadoop as a dependency + private static Object loadHadoopConfig(IcebergSinkConfig config) { +Class configClass = + 
DynClasses.builder().impl("org.apache.hadoop.hdfs.HdfsConfiguration").orNull().build(); +if (configClass == null) { + configClass = + DynClasses.builder().impl("org.apache.hadoop.conf.Configuration").orNull().build(); +} + +if (configClass == null) { + LOG.info("Hadoop not found on classpath, not creating Hadoop config"); + return null; +} + +try { + Object result = configClass.getDeclaredConstructor().newInstance(); + BoundMethod addResourceMethod = + DynMethods.builder("addResource").impl(configClass, URL.class).build(result); + BoundMethod setMethod = + DynMethods.builder("set").impl(configClass, String.class, String.class).build(result); + + // load any config files in the specified config directory + String hadoopConfDir = config.hadoopConfDir(); + if (hadoopConfDir != null) { +HADOOP_CONF_FILES.forEach( +confFile -> { + Path path = Paths.get(hadoopConfDir, confFile); + if (Files.exists(path)) { +try { + addResourceMethod.invoke(path.toUri().toURL()); +} catch (IOException e) { + LOG.warn("Error adding Hadoop resource {}, resource was not added", path, e); +} + } +}); + } + + // set any Hadoop properties specified in the s
Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]
rdblue commented on code in PR #9466: URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451798927 ## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java: ## @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import static java.util.stream.Collectors.toSet; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.common.DynClasses; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.common.DynMethods.BoundMethod; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.primitives.Ints; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.kafka.connect.data.Struct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Utilities { + + private static final Logger LOG = LoggerFactory.getLogger(Utilities.class.getName()); + private static final List HADOOP_CONF_FILES = + ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml"); + + public static Catalog loadCatalog(IcebergSinkConfig config) { +return CatalogUtil.buildIcebergCatalog( +config.catalogName(), config.catalogProps(), loadHadoopConfig(config)); + } + + // use reflection here to avoid requiring Hadoop as a dependency + private static Object loadHadoopConfig(IcebergSinkConfig config) { +Class configClass = + 
DynClasses.builder().impl("org.apache.hadoop.hdfs.HdfsConfiguration").orNull().build(); +if (configClass == null) { + configClass = + DynClasses.builder().impl("org.apache.hadoop.conf.Configuration").orNull().build(); +} + +if (configClass == null) { + LOG.info("Hadoop not found on classpath, not creating Hadoop config"); + return null; +} + +try { + Object result = configClass.getDeclaredConstructor().newInstance(); + BoundMethod addResourceMethod = + DynMethods.builder("addResource").impl(configClass, URL.class).build(result); + BoundMethod setMethod = + DynMethods.builder("set").impl(configClass, String.class, String.class).build(result); + + // load any config files in the specified config directory + String hadoopConfDir = config.hadoopConfDir(); + if (hadoopConfDir != null) { +HADOOP_CONF_FILES.forEach( +confFile -> { + Path path = Paths.get(hadoopConfDir, confFile); + if (Files.exists(path)) { +try { + addResourceMethod.invoke(path.toUri().toURL()); +} catch (IOException e) { + LOG.warn("Error adding Hadoop resource {}, resource was not added", path, e); +} + } +}); + } + + // set any Hadoop properties specified in the s
Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]
rdblue commented on code in PR #9466: URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451799004 ## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java: ## @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import static java.util.stream.Collectors.toSet; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.common.DynClasses; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.common.DynMethods.BoundMethod; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.primitives.Ints; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.kafka.connect.data.Struct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Utilities { + + private static final Logger LOG = LoggerFactory.getLogger(Utilities.class.getName()); + private static final List HADOOP_CONF_FILES = + ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml"); + + public static Catalog loadCatalog(IcebergSinkConfig config) { +return CatalogUtil.buildIcebergCatalog( +config.catalogName(), config.catalogProps(), loadHadoopConfig(config)); + } + + // use reflection here to avoid requiring Hadoop as a dependency + private static Object loadHadoopConfig(IcebergSinkConfig config) { +Class configClass = + 
DynClasses.builder().impl("org.apache.hadoop.hdfs.HdfsConfiguration").orNull().build(); +if (configClass == null) { + configClass = + DynClasses.builder().impl("org.apache.hadoop.conf.Configuration").orNull().build(); +} + +if (configClass == null) { + LOG.info("Hadoop not found on classpath, not creating Hadoop config"); + return null; +} + +try { + Object result = configClass.getDeclaredConstructor().newInstance(); + BoundMethod addResourceMethod = + DynMethods.builder("addResource").impl(configClass, URL.class).build(result); + BoundMethod setMethod = + DynMethods.builder("set").impl(configClass, String.class, String.class).build(result); + + // load any config files in the specified config directory + String hadoopConfDir = config.hadoopConfDir(); + if (hadoopConfDir != null) { +HADOOP_CONF_FILES.forEach( +confFile -> { + Path path = Paths.get(hadoopConfDir, confFile); + if (Files.exists(path)) { +try { + addResourceMethod.invoke(path.toUri().toURL()); +} catch (IOException e) { + LOG.warn("Error adding Hadoop resource {}, resource was not added", path, e); +} + } +}); + } + + // set any Hadoop properties specified in the s
Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]
rdblue commented on code in PR #9466: URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451799323 ## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java: ## @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import static java.util.stream.Collectors.toSet; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.common.DynClasses; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.common.DynMethods.BoundMethod; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.primitives.Ints; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.kafka.connect.data.Struct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Utilities { + + private static final Logger LOG = LoggerFactory.getLogger(Utilities.class.getName()); + private static final List HADOOP_CONF_FILES = + ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml"); + + public static Catalog loadCatalog(IcebergSinkConfig config) { +return CatalogUtil.buildIcebergCatalog( +config.catalogName(), config.catalogProps(), loadHadoopConfig(config)); + } + + // use reflection here to avoid requiring Hadoop as a dependency + private static Object loadHadoopConfig(IcebergSinkConfig config) { +Class configClass = + 
DynClasses.builder().impl("org.apache.hadoop.hdfs.HdfsConfiguration").orNull().build(); +if (configClass == null) { + configClass = + DynClasses.builder().impl("org.apache.hadoop.conf.Configuration").orNull().build(); +} + +if (configClass == null) { + LOG.info("Hadoop not found on classpath, not creating Hadoop config"); + return null; +} + +try { + Object result = configClass.getDeclaredConstructor().newInstance(); + BoundMethod addResourceMethod = + DynMethods.builder("addResource").impl(configClass, URL.class).build(result); + BoundMethod setMethod = + DynMethods.builder("set").impl(configClass, String.class, String.class).build(result); + + // load any config files in the specified config directory + String hadoopConfDir = config.hadoopConfDir(); + if (hadoopConfDir != null) { +HADOOP_CONF_FILES.forEach( +confFile -> { + Path path = Paths.get(hadoopConfDir, confFile); + if (Files.exists(path)) { +try { + addResourceMethod.invoke(path.toUri().toURL()); +} catch (IOException e) { + LOG.warn("Error adding Hadoop resource {}, resource was not added", path, e); +} + } +}); + } + + // set any Hadoop properties specified in the s
Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]
rdblue commented on code in PR #9466: URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451799486 ## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java: ## @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import static java.util.stream.Collectors.toSet; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.common.DynClasses; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.common.DynMethods.BoundMethod; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.primitives.Ints; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.kafka.connect.data.Struct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Utilities { + + private static final Logger LOG = LoggerFactory.getLogger(Utilities.class.getName()); + private static final List HADOOP_CONF_FILES = + ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml"); + + public static Catalog loadCatalog(IcebergSinkConfig config) { +return CatalogUtil.buildIcebergCatalog( +config.catalogName(), config.catalogProps(), loadHadoopConfig(config)); + } + + // use reflection here to avoid requiring Hadoop as a dependency + private static Object loadHadoopConfig(IcebergSinkConfig config) { +Class configClass = + 
DynClasses.builder().impl("org.apache.hadoop.hdfs.HdfsConfiguration").orNull().build(); +if (configClass == null) { + configClass = + DynClasses.builder().impl("org.apache.hadoop.conf.Configuration").orNull().build(); +} + +if (configClass == null) { + LOG.info("Hadoop not found on classpath, not creating Hadoop config"); + return null; +} + +try { + Object result = configClass.getDeclaredConstructor().newInstance(); + BoundMethod addResourceMethod = + DynMethods.builder("addResource").impl(configClass, URL.class).build(result); + BoundMethod setMethod = + DynMethods.builder("set").impl(configClass, String.class, String.class).build(result); + + // load any config files in the specified config directory + String hadoopConfDir = config.hadoopConfDir(); + if (hadoopConfDir != null) { +HADOOP_CONF_FILES.forEach( +confFile -> { + Path path = Paths.get(hadoopConfDir, confFile); + if (Files.exists(path)) { +try { + addResourceMethod.invoke(path.toUri().toURL()); +} catch (IOException e) { + LOG.warn("Error adding Hadoop resource {}, resource was not added", path, e); +} + } +}); + } + + // set any Hadoop properties specified in the s
Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]
rdblue commented on code in PR #9466: URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451800122 ## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedAppendWriter.java: ## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.InternalRecordWrapper; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.PartitionedFanoutWriter; + +public class PartitionedAppendWriter extends PartitionedFanoutWriter { Review Comment: Should this live in `data` since it is using Iceberg's generic `Record` class? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]
rdblue commented on code in PR #9466: URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451800555 ## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java: ## @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import static java.util.stream.Collectors.toList; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.format.DateTimeParseException; +import java.time.temporal.Temporal; +import java.util.Base64; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.mapping.MappedField; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Type.PrimitiveType; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.DecimalType; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.MapType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.types.Types.TimestampType; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.kafka.connect.data.Struct; + +public class RecordConverter { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private static final DateTimeFormatter OFFSET_TS_FMT = + new DateTimeFormatterBuilder() + .append(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + .appendOffset("+HHmm", "Z") + .toFormatter(); + + private final Schema tableSchema; + private final NameMapping nameMapping; + private final IcebergSinkConfig config; + private final Map> structNameMap = Maps.newHashMap(); + + public RecordConverter(Table table, IcebergSinkConfig config) { Review Comment: Looks like the purpose of this is to 
create a new Iceberg generic `Record` from Kafka's object model (`Struct`). Is that needed? In Flink and Spark, we use writers that are adapted to the in-memory object model for those engines to avoid copying a record and then writing. I'm not familiar enough with the KC object model to know whether this is needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]
rdblue commented on code in PR #9466: URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451800701 ## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java: ## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import java.util.List; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.kafka.connect.sink.SinkRecord; + +public interface RecordWriter extends Cloneable { + + default void write(SinkRecord record) {} + + default List complete() { Review Comment: This default implementation seems dangerous to me. Why not force the implementation to provide `complete` and `close` to ensure that they are implemented and not overlooked? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]
rdblue commented on code in PR #9466: URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451800872 ## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWrapper.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import java.util.Map; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Types.StructType; + +public class RecordWrapper implements Record { Review Comment: What is the purpose of this class? Why would you not just use the underlying `Record` directly? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]
rdblue commented on code in PR #9466: URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451801429 ## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java: ## @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect; + +import static java.util.stream.Collectors.toList; + +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.regex.Pattern; +import org.apache.iceberg.IcebergBuild; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.base.Splitter; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; +import org.apache.kafka.connect.storage.ConverterConfig; +import org.apache.kafka.connect.storage.ConverterType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IcebergSinkConfig extends AbstractConfig { + + private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkConfig.class.getName()); + + public static final String INTERNAL_TRANSACTIONAL_SUFFIX_PROP = + "iceberg.coordinator.transactional.suffix"; + private static final String ROUTE_REGEX = "route-regex"; + private static final String ID_COLUMNS = "id-columns"; + private static final String PARTITION_BY = "partition-by"; + private static final String COMMIT_BRANCH = "commit-branch"; + + private static final String CATALOG_PROP_PREFIX = "iceberg.catalog."; + private static final String HADOOP_PROP_PREFIX = "iceberg.hadoop."; + private static final String KAFKA_PROP_PREFIX = "iceberg.kafka."; + private static final String TABLE_PROP_PREFIX = "iceberg.table."; + private static final String AUTO_CREATE_PROP_PREFIX = "iceberg.tables.auto-create-props."; + private static final String WRITE_PROP_PREFIX = "iceberg.table.write-props."; + + private static final String CATALOG_NAME_PROP = "iceberg.catalog"; + private static final String TABLES_PROP = "iceberg.tables"; + private static final String 
TABLES_DYNAMIC_PROP = "iceberg.tables.dynamic-enabled"; + private static final String TABLES_ROUTE_FIELD_PROP = "iceberg.tables.route-field"; + private static final String TABLES_DEFAULT_COMMIT_BRANCH = "iceberg.tables.default-commit-branch"; + private static final String TABLES_DEFAULT_ID_COLUMNS = "iceberg.tables.default-id-columns"; + private static final String TABLES_DEFAULT_PARTITION_BY = "iceberg.tables.default-partition-by"; + private static final String TABLES_CDC_FIELD_PROP = "iceberg.tables.cdc-field"; + private static final String TABLES_UPSERT_MODE_ENABLED_PROP = + "iceberg.tables.upsert-mode-enabled"; + private static final String TABLES_AUTO_CREATE_ENABLED_PROP = + "iceberg.tables.auto-create-enabled"; + private static final String TABLES_EVOLVE_SCHEMA_ENABLED_PROP = + "iceberg.tables.evolve-schema-enabled"; + private static final String TABLES_SCHEMA_FORCE_OPTIONAL_PROP = + "iceberg.tables.schema-force-optional"; + private static final String TABLES_SCHEMA_CASE_INSENSITIVE_PROP = + "iceberg.tables.schema-case-insensitive"; + private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic"; + private static final String CONTROL_GROUP_ID_PROP = "iceberg.control.group-id"; + private static final String COMMIT_INTERVAL_MS_PROP = "iceberg.control.commit.interval-ms"; + private static final int COMMIT_INTERVAL_MS_DEFAULT = 30
Re: [PR] Kafka Connect: Sink connector with data writers and converters [iceberg]
rdblue commented on code in PR #9466: URL: https://github.com/apache/iceberg/pull/9466#discussion_r1451801651 ## kafka-connect/build.gradle: ## @@ -30,3 +30,30 @@ project(":iceberg-kafka-connect:iceberg-kafka-connect-events") { useJUnitPlatform() } } + +project(":iceberg-kafka-connect:iceberg-kafka-connect") { Review Comment: Why the duplicate name? Could this be `:iceberg-kafka-connect:iceberg-sink`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] AES GCM Stream changes [iceberg]
rdblue merged PR #9453: URL: https://github.com/apache/iceberg/pull/9453 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Avro data encryption [iceberg]
rdblue commented on PR #9436: URL: https://github.com/apache/iceberg/pull/9436#issuecomment-1891079101 Looks like this is ready to commit when it is rebased on top of the AES GCM stream changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Parquet: Add system config for unsafe Parquet ID fallback. [iceberg]
rdblue commented on PR #9324: URL: https://github.com/apache/iceberg/pull/9324#issuecomment-1891081451 @jackye1995, @danielcweeks, @RussellSpitzer, could you look at this? I'd like to ideally get it into the next release since we have been allowing unsafe reads. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451809069 ## pyiceberg/io/pyarrow.py: ## @@ -1565,13 +1564,54 @@ def fill_parquet_file_metadata( del upper_bounds[field_id] del null_value_counts[field_id] -df.file_format = FileFormat.PARQUET df.record_count = parquet_metadata.num_rows -df.file_size_in_bytes = file_size df.column_sizes = column_sizes df.value_counts = value_counts df.null_value_counts = null_value_counts df.nan_value_counts = nan_value_counts df.lower_bounds = lower_bounds df.upper_bounds = upper_bounds df.split_offsets = split_offsets + + +def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: +task = next(tasks) + +try: +_ = next(tasks) +# If there are more tasks, raise an exception +raise ValueError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208";) +except StopIteration: +pass + +df = task.df + +file_path = f'{table.location()}/data/{_generate_datafile_filename("parquet")}' +file_schema = schema_to_pyarrow(table.schema()) + +collected_metrics: List[pq.FileMetaData] = [] +fo = table.io.new_output(file_path) +with fo.create() as fos: +with pq.ParquetWriter(fos, schema=file_schema, version="1.0", metadata_collector=collected_metrics) as writer: +writer.write_table(df) + +df = DataFile( +content=DataFileContent.DATA, +file_path=file_path, +file_format=FileFormat.PARQUET, +partition=Record(), +record_count=len(df), +file_size_in_bytes=len(fo), +# Just copy these from the table for now +sort_order_id=table.sort_order().order_id, +spec_id=table.spec().spec_id, +equality_ids=table.schema().identifier_field_ids, +key_metadata=None, +) +fill_parquet_file_metadata( +df=df, +parquet_metadata=collected_metrics[0], Review Comment: So if Arrow decides to write multiple files, there will be one entry per file in this list? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
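A small sanity check illustrating the question above, under the assumption (consistent with the diff) that `metadata_collector` receives one `FileMetaData` entry per physical Parquet file written; the file path and data are examples only.

```python
from typing import List

import pyarrow as pa
import pyarrow.parquet as pq

# One writer writing one file: expect exactly one collected metadata entry.
table = pa.table({"id": [1, 2, 3]})
collected_metrics: List[pq.FileMetaData] = []

with pq.ParquetWriter(
    "/tmp/example.parquet", schema=table.schema, metadata_collector=collected_metrics
) as writer:
    writer.write_table(table)

assert len(collected_metrics) == 1
```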
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451829404 ## pyiceberg/table/__init__.py: ## @@ -797,6 +850,9 @@ def location(self) -> str: def last_sequence_number(self) -> int: return self.metadata.last_sequence_number +def next_sequence_number(self) -> int: +return self.last_sequence_number + 1 if self.metadata.format_version > 1 else INITIAL_SEQUENCE_NUMBER Review Comment: Why have both `next_sequence_number` and `_next_sequence_number` just below this? They also have slightly different logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451831352 ## pyiceberg/table/__init__.py: ## @@ -831,6 +887,46 @@ def history(self) -> List[SnapshotLogEntry]: def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema: return UpdateSchema(self, allow_incompatible_changes=allow_incompatible_changes, case_sensitive=case_sensitive) +def append(self, df: pa.Table) -> None: +if len(self.spec().fields) > 0: +raise ValueError("Cannot write to partitioned tables") + +snapshot_id = self.new_snapshot_id() + +data_files = _dataframe_to_data_files(self, df=df) +merge = _MergeAppend(operation=Operation.APPEND, table=self, snapshot_id=snapshot_id) +for data_file in data_files: +merge.append_datafile(data_file) Review Comment: Minor: looks like we have some inconsistency in naming. The method uses `datafile` as a single word, while the variable (and other places) use two words, `data_file`. Not a big deal, but I generally prefer using the same form in all cases so it's predictable whenever possible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] How to detect if the partition's data is ready to consume [iceberg]
github-actions[bot] closed issue #6725: How to detect if the partition's data is ready to consume URL: https://github.com/apache/iceberg/issues/6725 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451832048 ## pyiceberg/table/__init__.py: ## @@ -831,6 +887,46 @@ def history(self) -> List[SnapshotLogEntry]: def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema: return UpdateSchema(self, allow_incompatible_changes=allow_incompatible_changes, case_sensitive=case_sensitive) +def append(self, df: pa.Table) -> None: +if len(self.spec().fields) > 0: +raise ValueError("Cannot write to partitioned tables") + +snapshot_id = self.new_snapshot_id() + +data_files = _dataframe_to_data_files(self, df=df) +merge = _MergeAppend(operation=Operation.APPEND, table=self, snapshot_id=snapshot_id) +for data_file in data_files: +merge.append_datafile(data_file) + +if current_snapshot := self.current_snapshot(): +for manifest in current_snapshot.manifests(io=self.io): +for entry in manifest.fetch_manifest_entry(io=self.io): +merge.append_datafile(entry.data_file, added=False) Review Comment: I think that the `_MergeAppend` should be responsible for handling the existing data. It doesn't make sense to me that an append operation would require the caller to re-add the data files that were in the table already. That puts too much on the caller, which should just add files and not worry about existing data or state. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
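A rough sketch of the shape this suggestion implies: the snapshot producer resolves the existing table state itself, so callers only register newly written files. The class and method names here are hypothetical, and import paths are assumed from pyiceberg's layout.

```python
from typing import List

from pyiceberg.manifest import DataFile, ManifestFile


class _FastAppend:  # hypothetical name, not the PR's API
    def __init__(self, table, snapshot_id: int) -> None:
        self._table = table
        self._snapshot_id = snapshot_id
        self._added_data_files: List[DataFile] = []

    def append_data_file(self, data_file: DataFile) -> "_FastAppend":
        # Callers only hand over newly written files.
        self._added_data_files.append(data_file)
        return self

    def _existing_manifests(self) -> List[ManifestFile]:
        # Existing state is read from the current snapshot internally,
        # instead of being re-appended by the caller one entry at a time.
        snapshot = self._table.current_snapshot()
        return list(snapshot.manifests(io=self._table.io)) if snapshot else []
```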
Re: [I] Add checkstyle rule to ensure AssertJ assertions always check for underlying exception message [iceberg]
github-actions[bot] commented on issue #7040: URL: https://github.com/apache/iceberg/issues/7040#issuecomment-1891124126 This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] How to detect if the partition's data is ready to consume [iceberg]
github-actions[bot] commented on issue #6725: URL: https://github.com/apache/iceberg/issues/6725#issuecomment-1891124143 This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale' -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] [DOC] Reorder pages under Spark in the nav bar [iceberg]
github-actions[bot] commented on issue #6724: URL: https://github.com/apache/iceberg/issues/6724#issuecomment-1891124149 This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451832516 ## pyiceberg/table/__init__.py: ## @@ -831,6 +887,46 @@ def history(self) -> List[SnapshotLogEntry]: def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema: return UpdateSchema(self, allow_incompatible_changes=allow_incompatible_changes, case_sensitive=case_sensitive) +def append(self, df: pa.Table) -> None: +if len(self.spec().fields) > 0: +raise ValueError("Cannot write to partitioned tables") + +snapshot_id = self.new_snapshot_id() + +data_files = _dataframe_to_data_files(self, df=df) +merge = _MergeAppend(operation=Operation.APPEND, table=self, snapshot_id=snapshot_id) +for data_file in data_files: +merge.append_datafile(data_file) + +if current_snapshot := self.current_snapshot(): +for manifest in current_snapshot.manifests(io=self.io): +for entry in manifest.fetch_manifest_entry(io=self.io): Review Comment: I can see why you'd want to merge all of the manifests for a PyIceberg table because we're assuming that the table is small and written only by PyIceberg. But if this code ever attempts to append to a larger table, it's going to take a really long time because it will rewrite all table metadata. I think this should take the `FastAppend` approach at first and create a new manifest for the new data files. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
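A sketch of the fast-append strategy described in this comment: write one new manifest for just the added files and carry the current snapshot's manifests over untouched. The `write_manifest` and `ManifestEntry` usage follows the diff; the helper name and manifest path are illustrative, and the import paths are assumptions.

```python
import uuid
from typing import List

from pyiceberg.manifest import (
    DataFile,
    ManifestEntry,
    ManifestEntryStatus,
    ManifestFile,
    write_manifest,
)


def _fast_append_manifests(table, snapshot_id: int, added_data_files: List[DataFile]) -> List[ManifestFile]:
    commit_uuid = uuid.uuid4()
    new_manifest_path = f"{table.location()}/metadata/{commit_uuid}-m0.avro"

    # New data files go into a single freshly written manifest.
    with write_manifest(
        format_version=table.format_version,
        spec=table.spec(),
        schema=table.schema(),
        output_file=table.io.new_output(new_manifest_path),
        snapshot_id=snapshot_id,
    ) as writer:
        for data_file in added_data_files:
            writer.add_entry(
                ManifestEntry(
                    status=ManifestEntryStatus.ADDED,
                    snapshot_id=snapshot_id,
                    data_sequence_number=None,
                    file_sequence_number=None,
                    data_file=data_file,
                )
            )
    new_manifest = writer.to_manifest_file()

    # Existing manifests are reused as-is instead of being rewritten.
    previous = table.current_snapshot()
    existing = list(previous.manifests(io=table.io)) if previous else []
    return [new_manifest] + existing
```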
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451833666 ## pyiceberg/io/pyarrow.py: ## @@ -1565,13 +1565,56 @@ def fill_parquet_file_metadata( del upper_bounds[field_id] del null_value_counts[field_id] -df.file_format = FileFormat.PARQUET -df.record_count = parquet_metadata.num_rows -df.file_size_in_bytes = file_size -df.column_sizes = column_sizes -df.value_counts = value_counts -df.null_value_counts = null_value_counts -df.nan_value_counts = nan_value_counts -df.lower_bounds = lower_bounds -df.upper_bounds = upper_bounds -df.split_offsets = split_offsets +data_file.record_count = parquet_metadata.num_rows +data_file.column_sizes = column_sizes +data_file.value_counts = value_counts +data_file.null_value_counts = null_value_counts +data_file.nan_value_counts = nan_value_counts +data_file.lower_bounds = lower_bounds +data_file.upper_bounds = upper_bounds +data_file.split_offsets = split_offsets + + +def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: +task = next(tasks) + +try: +_ = next(tasks) +# If there are more tasks, raise an exception +raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208";) +except StopIteration: +pass + +file_path = f'{table.location()}/data/{task.generate_datafile_filename("parquet")}' +file_schema = schema_to_pyarrow(table.schema()) + +collected_metrics: List[pq.FileMetaData] = [] +fo = table.io.new_output(file_path) +with fo.create() as fos: Review Comment: I think this should call `overwrite=True` to avoid the `exists` check in `create`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
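A minimal sketch of the suggested change, using `PyArrowFileIO` with a local path purely as an example: passing `overwrite=True` skips the existence check that `create()` performs by default.

```python
from pyiceberg.io.pyarrow import PyArrowFileIO

io = PyArrowFileIO()
fo = io.new_output("/tmp/example-output.bin")  # example path only

# overwrite=True avoids the exists() check (and the error if the file is already there)
with fo.create(overwrite=True) as fos:
    fos.write(b"payload")
```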

Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451834512 ## pyiceberg/io/pyarrow.py: ## @@ -1565,13 +1564,54 @@ def fill_parquet_file_metadata( del upper_bounds[field_id] del null_value_counts[field_id] -df.file_format = FileFormat.PARQUET df.record_count = parquet_metadata.num_rows -df.file_size_in_bytes = file_size df.column_sizes = column_sizes df.value_counts = value_counts df.null_value_counts = null_value_counts df.nan_value_counts = nan_value_counts df.lower_bounds = lower_bounds df.upper_bounds = upper_bounds df.split_offsets = split_offsets + + +def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: +task = next(tasks) + +try: +_ = next(tasks) +# If there are more tasks, raise an exception +raise ValueError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208";) +except StopIteration: +pass + +df = task.df + +file_path = f'{table.location()}/data/{_generate_datafile_filename("parquet")}' +file_schema = schema_to_pyarrow(table.schema()) + +collected_metrics: List[pq.FileMetaData] = [] +fo = table.io.new_output(file_path) +with fo.create() as fos: +with pq.ParquetWriter(fos, schema=file_schema, version="1.0", metadata_collector=collected_metrics) as writer: +writer.write_table(df) + +df = DataFile( +content=DataFileContent.DATA, +file_path=file_path, +file_format=FileFormat.PARQUET, +partition=Record(), +record_count=len(df), +file_size_in_bytes=len(fo), +# Just copy these from the table for now +sort_order_id=table.sort_order().order_id, +spec_id=table.spec().spec_id, Review Comment: Since `write_file` is a public method, we can't guarantee that the caller did this check. I agree that it is safe when called from `append` or `overwrite`, but a caller could use this method directly to create a data file for a partitioned table right? Wouldn't it be easy to just pass the spec ID and partition tuple (an empty `Record`) through `WriteTask` for now? I think it would make sense if a `WriteTask` were for a single partition. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
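A hedged sketch of what carrying the partition information on `WriteTask` might look like; the added field names are illustrative, not the PR's final API.

```python
import uuid
from dataclasses import dataclass, field
from typing import Optional

import pyarrow as pa

from pyiceberg.typedef import Record


@dataclass(frozen=True)
class WriteTask:
    write_uuid: uuid.UUID
    task_id: int
    df: pa.Table
    # Illustrative additions: one WriteTask per partition of a single spec.
    spec_id: int = 0
    partition: Record = field(default_factory=Record)  # empty Record == unpartitioned
    sort_order_id: Optional[int] = None
```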
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451835863 ## pyiceberg/table/__init__.py: ## @@ -831,6 +887,46 @@ def history(self) -> List[SnapshotLogEntry]: def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema: return UpdateSchema(self, allow_incompatible_changes=allow_incompatible_changes, case_sensitive=case_sensitive) +def append(self, df: pa.Table) -> None: +if len(self.spec().fields) > 0: +raise ValueError("Cannot write to partitioned tables") + +snapshot_id = self.new_snapshot_id() + +data_files = _dataframe_to_data_files(self, df=df) +merge = _MergeAppend(operation=Operation.APPEND, table=self, snapshot_id=snapshot_id) +for data_file in data_files: +merge.append_datafile(data_file) + +if current_snapshot := self.current_snapshot(): +for manifest in current_snapshot.manifests(io=self.io): +for entry in manifest.fetch_manifest_entry(io=self.io): +merge.append_datafile(entry.data_file, added=False) + +merge.commit() + +def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE) -> None: Review Comment: Should these methods have docstrings? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
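For example, a docstring for `append` might read as follows (Google style is assumed here, as an illustration rather than a requirement):

```python
import pyarrow as pa


def append(self, df: pa.Table) -> None:
    """Append the rows of a PyArrow table to this Iceberg table in a new snapshot.

    Args:
        df: The Arrow table to write; its schema must match the table schema.

    Raises:
        ValueError: If the table is partitioned, which is not yet supported.
    """
```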
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451836112 ## pyiceberg/table/__init__.py: ## @@ -1910,3 +2006,137 @@ def _generate_snapshot_id() -> int: snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1 return snapshot_id + + +@dataclass(frozen=True) +class WriteTask: +write_uuid: uuid.UUID +task_id: int +df: pa.Table +sort_order_id: Optional[int] = None + +# Later to be extended with partition information + +def generate_datafile_filename(self, extension: str) -> str: +# Mimics the behavior in the Java API: +# https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 +return f"0-{self.task_id}-{self.write_uuid}.{extension}" + + +def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: +return f'{location}/metadata/{commit_uuid}-m{num}.avro' + + +def _generate_manifest_list_filename(snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: +# Mimics the behavior in Java: +# https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 +return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro" + + +def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]: Review Comment: I think the intent is for this to eventually support partitioning, so I think it would make sense to check `table.spec()` is unpartitioned. Good to be more defensive and open things up later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
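A sketch of that defensive check applied to `_dataframe_to_data_files` itself, assuming `WriteTask` ends up importable from `pyiceberg.table` as in this diff:

```python
import itertools
import uuid
from typing import Iterable

import pyarrow as pa

from pyiceberg.manifest import DataFile
from pyiceberg.table import Table, WriteTask


def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
    # Be defensive now; open this up once partitioned writes are supported.
    if len(table.spec().fields) > 0:
        raise ValueError("Cannot write to partitioned tables")

    # Local import mirrors the diff (presumably to avoid a circular import).
    from pyiceberg.io.pyarrow import write_file

    write_uuid = uuid.uuid4()
    counter = itertools.count(0)
    yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)]))
```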
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451836277 ## pyiceberg/table/__init__.py: ## @@ -1910,3 +2006,137 @@ def _generate_snapshot_id() -> int: snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1 return snapshot_id + + +@dataclass(frozen=True) +class WriteTask: +write_uuid: uuid.UUID +task_id: int +df: pa.Table +sort_order_id: Optional[int] = None + +# Later to be extended with partition information + +def generate_datafile_filename(self, extension: str) -> str: +# Mimics the behavior in the Java API: +# https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 +return f"0-{self.task_id}-{self.write_uuid}.{extension}" + + +def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: +return f'{location}/metadata/{commit_uuid}-m{num}.avro' + + +def _generate_manifest_list_filename(snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: Review Comment: Nit: this produces a file name, but the method above produces a full path. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451836798 ## pyiceberg/table/__init__.py: ## @@ -1910,3 +2006,137 @@ def _generate_snapshot_id() -> int: snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1 return snapshot_id + + +@dataclass(frozen=True) +class WriteTask: +write_uuid: uuid.UUID +task_id: int +df: pa.Table +sort_order_id: Optional[int] = None + +# Later to be extended with partition information + +def generate_datafile_filename(self, extension: str) -> str: +# Mimics the behavior in the Java API: +# https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 +return f"0-{self.task_id}-{self.write_uuid}.{extension}" + + +def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: +return f'{location}/metadata/{commit_uuid}-m{num}.avro' + + +def _generate_manifest_list_filename(snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: +# Mimics the behavior in Java: +# https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 +return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro" + + +def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]: +from pyiceberg.io.pyarrow import write_file + +write_uuid = uuid.uuid4() +counter = itertools.count(0) + +# This is an iter, so we don't have to materialize everything every time +# This will be more relevant when we start doing partitioned writes +yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)])) + + +class _MergeAppend: +_operation: Operation +_table: Table +_snapshot_id: int +_parent_snapshot_id: Optional[int] +_added_datafiles: List[DataFile] +_existing_datafiles: List[DataFile] +_commit_uuid: uuid.UUID + +def __init__(self, operation: Operation, table: Table, snapshot_id: int) -> None: +self._operation = operation +self._table = table +self._snapshot_id = snapshot_id +# Since we only support the main branch for now +self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := self._table.current_snapshot()) else None +self._added_datafiles = [] +self._existing_datafiles = [] +self._commit_uuid = uuid.uuid4() + +def append_datafile(self, data_file: DataFile, added: bool = True) -> _MergeAppend: Review Comment: As I noted above, I don't think the caller should be responsible for adding the existing data files. I also think the cleanest way to implement simple appends is a fast append -- create a new manifest and preserve the existing ones. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451837508 ## pyiceberg/table/__init__.py: ## @@ -1910,3 +2006,137 @@ def _generate_snapshot_id() -> int: snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1 return snapshot_id + + +@dataclass(frozen=True) +class WriteTask: +write_uuid: uuid.UUID +task_id: int +df: pa.Table +sort_order_id: Optional[int] = None + +# Later to be extended with partition information + +def generate_datafile_filename(self, extension: str) -> str: +# Mimics the behavior in the Java API: +# https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 +return f"0-{self.task_id}-{self.write_uuid}.{extension}" + + +def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: +return f'{location}/metadata/{commit_uuid}-m{num}.avro' + + +def _generate_manifest_list_filename(snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: +# Mimics the behavior in Java: +# https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 +return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro" + + +def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]: +from pyiceberg.io.pyarrow import write_file + +write_uuid = uuid.uuid4() +counter = itertools.count(0) + +# This is an iter, so we don't have to materialize everything every time +# This will be more relevant when we start doing partitioned writes +yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)])) + + +class _MergeAppend: +_operation: Operation +_table: Table +_snapshot_id: int +_parent_snapshot_id: Optional[int] +_added_datafiles: List[DataFile] +_existing_datafiles: List[DataFile] +_commit_uuid: uuid.UUID + +def __init__(self, operation: Operation, table: Table, snapshot_id: int) -> None: +self._operation = operation +self._table = table +self._snapshot_id = snapshot_id +# Since we only support the main branch for now +self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := self._table.current_snapshot()) else None +self._added_datafiles = [] +self._existing_datafiles = [] +self._commit_uuid = uuid.uuid4() + +def append_datafile(self, data_file: DataFile, added: bool = True) -> _MergeAppend: +if added: +self._added_datafiles.append(data_file) +else: +self._existing_datafiles.append(data_file) +return self + +def _manifests(self) -> Tuple[Dict[str, str], List[ManifestFile]]: +ssc = SnapshotSummaryCollector() +manifests = [] + +if self._added_datafiles: +output_file_location = _new_manifest_path(location=self._table.location(), num=0, commit_uuid=self._commit_uuid) +with write_manifest( +format_version=self._table.format_version, +spec=self._table.spec(), +schema=self._table.schema(), +output_file=self._table.io.new_output(output_file_location), +snapshot_id=self._snapshot_id, +) as writer: +for data_file in self._added_datafiles + self._existing_datafiles: +writer.add_entry( +ManifestEntry( +status=ManifestEntryStatus.ADDED, Review Comment: This status isn't correct for the existing data files. Those should use `EXISTING` and should preserve the original snapshot ID in which they were added to the table. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451837579 ## pyiceberg/table/__init__.py: ## @@ -1910,3 +2006,137 @@ def _generate_snapshot_id() -> int: snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1 return snapshot_id + + +@dataclass(frozen=True) +class WriteTask: +write_uuid: uuid.UUID +task_id: int +df: pa.Table +sort_order_id: Optional[int] = None + +# Later to be extended with partition information + +def generate_datafile_filename(self, extension: str) -> str: +# Mimics the behavior in the Java API: +# https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 +return f"0-{self.task_id}-{self.write_uuid}.{extension}" + + +def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: +return f'{location}/metadata/{commit_uuid}-m{num}.avro' + + +def _generate_manifest_list_filename(snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: +# Mimics the behavior in Java: +# https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 +return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro" + + +def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]: +from pyiceberg.io.pyarrow import write_file + +write_uuid = uuid.uuid4() +counter = itertools.count(0) + +# This is an iter, so we don't have to materialize everything every time +# This will be more relevant when we start doing partitioned writes +yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)])) + + +class _MergeAppend: +_operation: Operation +_table: Table +_snapshot_id: int +_parent_snapshot_id: Optional[int] +_added_datafiles: List[DataFile] +_existing_datafiles: List[DataFile] +_commit_uuid: uuid.UUID + +def __init__(self, operation: Operation, table: Table, snapshot_id: int) -> None: +self._operation = operation +self._table = table +self._snapshot_id = snapshot_id +# Since we only support the main branch for now +self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := self._table.current_snapshot()) else None +self._added_datafiles = [] +self._existing_datafiles = [] +self._commit_uuid = uuid.uuid4() + +def append_datafile(self, data_file: DataFile, added: bool = True) -> _MergeAppend: +if added: +self._added_datafiles.append(data_file) +else: +self._existing_datafiles.append(data_file) +return self + +def _manifests(self) -> Tuple[Dict[str, str], List[ManifestFile]]: +ssc = SnapshotSummaryCollector() +manifests = [] + +if self._added_datafiles: +output_file_location = _new_manifest_path(location=self._table.location(), num=0, commit_uuid=self._commit_uuid) +with write_manifest( +format_version=self._table.format_version, +spec=self._table.spec(), +schema=self._table.schema(), +output_file=self._table.io.new_output(output_file_location), +snapshot_id=self._snapshot_id, +) as writer: +for data_file in self._added_datafiles + self._existing_datafiles: +writer.add_entry( +ManifestEntry( +status=ManifestEntryStatus.ADDED, +snapshot_id=self._snapshot_id, +data_sequence_number=None, +file_sequence_number=None, Review Comment: Both sequence numbers should be preserved for existing data files. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
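To make the point about sequence numbers concrete, here is a minimal sketch (not the PR's final code) of how the entry-writing loop could treat added and existing files differently. The `existing_entries` parameter is hypothetical, standing in for manifest entries read back from the parent snapshot, since the class in the diff only tracks `DataFile` objects; the class and attribute names otherwise follow pyiceberg's manifest module.

```python
from typing import Iterable

from pyiceberg.manifest import DataFile, ManifestEntry, ManifestEntryStatus, ManifestWriter


def add_entries(
    writer: ManifestWriter,
    snapshot_id: int,
    added_datafiles: Iterable[DataFile],
    existing_entries: Iterable[ManifestEntry],  # hypothetical: entries read from parent manifests
) -> None:
    # Added files: sequence numbers are left as None so they are inherited
    # from the snapshot that commits them.
    for data_file in added_datafiles:
        writer.add_entry(
            ManifestEntry(
                status=ManifestEntryStatus.ADDED,
                snapshot_id=snapshot_id,
                data_sequence_number=None,
                file_sequence_number=None,
                data_file=data_file,
            )
        )

    # Existing files: keep the snapshot id and both sequence numbers that were
    # recorded when the file was first added, instead of resetting them.
    for entry in existing_entries:
        writer.add_entry(
            ManifestEntry(
                status=ManifestEntryStatus.EXISTING,
                snapshot_id=entry.snapshot_id,
                data_sequence_number=entry.data_sequence_number,
                file_sequence_number=entry.file_sequence_number,
                data_file=entry.data_file,
            )
        )
```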
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451837941 ## pyiceberg/table/__init__.py: ## @@ -1910,3 +2006,137 @@ def _generate_snapshot_id() -> int: snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1 return snapshot_id + + +@dataclass(frozen=True) +class WriteTask: +write_uuid: uuid.UUID +task_id: int +df: pa.Table +sort_order_id: Optional[int] = None + +# Later to be extended with partition information + +def generate_datafile_filename(self, extension: str) -> str: +# Mimics the behavior in the Java API: +# https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 +return f"0-{self.task_id}-{self.write_uuid}.{extension}" + + +def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: +return f'{location}/metadata/{commit_uuid}-m{num}.avro' + + +def _generate_manifest_list_filename(snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: +# Mimics the behavior in Java: +# https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 +return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro" + + +def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]: +from pyiceberg.io.pyarrow import write_file + +write_uuid = uuid.uuid4() +counter = itertools.count(0) + +# This is an iter, so we don't have to materialize everything every time +# This will be more relevant when we start doing partitioned writes +yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)])) + + +class _MergeAppend: +_operation: Operation +_table: Table +_snapshot_id: int +_parent_snapshot_id: Optional[int] +_added_datafiles: List[DataFile] +_existing_datafiles: List[DataFile] +_commit_uuid: uuid.UUID + +def __init__(self, operation: Operation, table: Table, snapshot_id: int) -> None: +self._operation = operation +self._table = table +self._snapshot_id = snapshot_id +# Since we only support the main branch for now +self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := self._table.current_snapshot()) else None +self._added_datafiles = [] +self._existing_datafiles = [] +self._commit_uuid = uuid.uuid4() + +def append_datafile(self, data_file: DataFile, added: bool = True) -> _MergeAppend: +if added: +self._added_datafiles.append(data_file) +else: +self._existing_datafiles.append(data_file) +return self + +def _manifests(self) -> Tuple[Dict[str, str], List[ManifestFile]]: +ssc = SnapshotSummaryCollector() +manifests = [] + +if self._added_datafiles: +output_file_location = _new_manifest_path(location=self._table.location(), num=0, commit_uuid=self._commit_uuid) +with write_manifest( +format_version=self._table.format_version, +spec=self._table.spec(), +schema=self._table.schema(), +output_file=self._table.io.new_output(output_file_location), +snapshot_id=self._snapshot_id, +) as writer: +for data_file in self._added_datafiles + self._existing_datafiles: +writer.add_entry( +ManifestEntry( +status=ManifestEntryStatus.ADDED, +snapshot_id=self._snapshot_id, +data_sequence_number=None, +file_sequence_number=None, +data_file=data_file, +) +) + +for data_file in self._added_datafiles: +ssc.add_file(data_file=data_file) + +manifests.append(writer.to_manifest_file()) + +return ssc.build(), manifests Review Comment: I don't think that this method needs to be responsible for both updating the snapshot summary and for 
writing manifest files. It's easy enough to update the summary in a separate method, especially since it is mostly handled independently. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
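As a sketch of that separation, the summary collection could live in its own helper so that `_manifests()` only deals with writing manifest files. This is illustrative only: the helper name is made up, and the import path for `SnapshotSummaryCollector` is assumed; the calls themselves mirror the ones already used in the diff above.

```python
from typing import Dict, List

from pyiceberg.manifest import DataFile
from pyiceberg.table.snapshots import SnapshotSummaryCollector


def collect_summary(added_datafiles: List[DataFile]) -> Dict[str, str]:
    # Build the snapshot summary properties from the added files only,
    # independently of when or how the manifest files get written.
    ssc = SnapshotSummaryCollector()
    for data_file in added_datafiles:
        ssc.add_file(data_file=data_file)
    return ssc.build()
```

With a helper like this, `_manifests()` can return just the list of `ManifestFile` objects and `commit()` can combine the two results itself.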
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451838116 ## pyiceberg/table/__init__.py: ## @@ -1910,3 +2006,137 @@ def _generate_snapshot_id() -> int: snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1 return snapshot_id + + +@dataclass(frozen=True) +class WriteTask: +write_uuid: uuid.UUID +task_id: int +df: pa.Table +sort_order_id: Optional[int] = None + +# Later to be extended with partition information + +def generate_datafile_filename(self, extension: str) -> str: +# Mimics the behavior in the Java API: +# https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 +return f"0-{self.task_id}-{self.write_uuid}.{extension}" + + +def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: +return f'{location}/metadata/{commit_uuid}-m{num}.avro' + + +def _generate_manifest_list_filename(snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: +# Mimics the behavior in Java: +# https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 +return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro" + + +def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]: +from pyiceberg.io.pyarrow import write_file + +write_uuid = uuid.uuid4() +counter = itertools.count(0) + +# This is an iter, so we don't have to materialize everything every time +# This will be more relevant when we start doing partitioned writes +yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)])) + + +class _MergeAppend: +_operation: Operation +_table: Table +_snapshot_id: int +_parent_snapshot_id: Optional[int] +_added_datafiles: List[DataFile] +_existing_datafiles: List[DataFile] +_commit_uuid: uuid.UUID + +def __init__(self, operation: Operation, table: Table, snapshot_id: int) -> None: +self._operation = operation +self._table = table +self._snapshot_id = snapshot_id +# Since we only support the main branch for now +self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := self._table.current_snapshot()) else None +self._added_datafiles = [] +self._existing_datafiles = [] +self._commit_uuid = uuid.uuid4() + +def append_datafile(self, data_file: DataFile, added: bool = True) -> _MergeAppend: +if added: +self._added_datafiles.append(data_file) +else: +self._existing_datafiles.append(data_file) +return self + +def _manifests(self) -> Tuple[Dict[str, str], List[ManifestFile]]: +ssc = SnapshotSummaryCollector() +manifests = [] + +if self._added_datafiles: +output_file_location = _new_manifest_path(location=self._table.location(), num=0, commit_uuid=self._commit_uuid) +with write_manifest( +format_version=self._table.format_version, +spec=self._table.spec(), +schema=self._table.schema(), +output_file=self._table.io.new_output(output_file_location), +snapshot_id=self._snapshot_id, +) as writer: +for data_file in self._added_datafiles + self._existing_datafiles: +writer.add_entry( +ManifestEntry( +status=ManifestEntryStatus.ADDED, +snapshot_id=self._snapshot_id, +data_sequence_number=None, +file_sequence_number=None, +data_file=data_file, +) +) + +for data_file in self._added_datafiles: +ssc.add_file(data_file=data_file) + +manifests.append(writer.to_manifest_file()) + +return ssc.build(), manifests + +def commit(self) -> Snapshot: +new_summary, manifests = self._manifests() + +previous_snapshot = 
self._table.snapshot_by_id(self._parent_snapshot_id) if self._parent_snapshot_id is not None else None +summary = update_snapshot_summaries( +summary=Summary(operation=self._operation, **new_summary), +previous_summary=previous_snapshot.summary if previous_snapshot is not None else None, +truncate_full_table=self._operation == Operation.OVERWRITE, +) + +manifest_list_filename = _generate_manifest_list_filename( +snapshot_id=self._snapshot_id, attempt=0, commit_uuid=self._commit_uuid Review Comment: There are currently no retries? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
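For reference, a retry layer around `commit()` might look roughly like the sketch below. `CommitFailedException` is pyiceberg's existing exception for failed commits; the wrapper function, its defaults, and the backoff policy are hypothetical.

```python
import time

from pyiceberg.exceptions import CommitFailedException


def commit_with_retries(merge_append, num_retries: int = 3, backoff_s: float = 0.5):
    """Hypothetical retry wrapper around a _MergeAppend-style commit."""
    for attempt in range(num_retries + 1):
        try:
            # A real implementation would refresh the table metadata before each
            # attempt and thread the attempt number into
            # _generate_manifest_list_filename(..., attempt=attempt, ...) so that
            # every try writes a distinct manifest list.
            return merge_append.commit()
        except CommitFailedException:
            if attempt == num_retries:
                raise
            time.sleep(backoff_s * (2**attempt))
```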
Re: [PR] Write support [iceberg-python]
rdblue commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1451838385 ## pyiceberg/table/__init__.py: ## @@ -1910,3 +2006,137 @@ def _generate_snapshot_id() -> int: snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1 return snapshot_id + + +@dataclass(frozen=True) +class WriteTask: +write_uuid: uuid.UUID +task_id: int +df: pa.Table +sort_order_id: Optional[int] = None + +# Later to be extended with partition information + +def generate_datafile_filename(self, extension: str) -> str: +# Mimics the behavior in the Java API: +# https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 +return f"0-{self.task_id}-{self.write_uuid}.{extension}" + + +def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: +return f'{location}/metadata/{commit_uuid}-m{num}.avro' + + +def _generate_manifest_list_filename(snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: +# Mimics the behavior in Java: +# https://github.com/apache/iceberg/blob/c862b9177af8e2d8310764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 +return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro" + + +def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]: +from pyiceberg.io.pyarrow import write_file + +write_uuid = uuid.uuid4() +counter = itertools.count(0) + +# This is an iter, so we don't have to materialize everything every time +# This will be more relevant when we start doing partitioned writes +yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)])) + + +class _MergeAppend: +_operation: Operation +_table: Table +_snapshot_id: int +_parent_snapshot_id: Optional[int] +_added_datafiles: List[DataFile] +_existing_datafiles: List[DataFile] +_commit_uuid: uuid.UUID + +def __init__(self, operation: Operation, table: Table, snapshot_id: int) -> None: +self._operation = operation +self._table = table +self._snapshot_id = snapshot_id +# Since we only support the main branch for now +self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := self._table.current_snapshot()) else None +self._added_datafiles = [] +self._existing_datafiles = [] +self._commit_uuid = uuid.uuid4() + +def append_datafile(self, data_file: DataFile, added: bool = True) -> _MergeAppend: +if added: +self._added_datafiles.append(data_file) +else: +self._existing_datafiles.append(data_file) +return self + +def _manifests(self) -> Tuple[Dict[str, str], List[ManifestFile]]: +ssc = SnapshotSummaryCollector() +manifests = [] + +if self._added_datafiles: +output_file_location = _new_manifest_path(location=self._table.location(), num=0, commit_uuid=self._commit_uuid) +with write_manifest( +format_version=self._table.format_version, +spec=self._table.spec(), +schema=self._table.schema(), +output_file=self._table.io.new_output(output_file_location), +snapshot_id=self._snapshot_id, +) as writer: +for data_file in self._added_datafiles + self._existing_datafiles: +writer.add_entry( +ManifestEntry( +status=ManifestEntryStatus.ADDED, +snapshot_id=self._snapshot_id, +data_sequence_number=None, +file_sequence_number=None, +data_file=data_file, +) +) + +for data_file in self._added_datafiles: +ssc.add_file(data_file=data_file) + +manifests.append(writer.to_manifest_file()) + +return ssc.build(), manifests + +def commit(self) -> Snapshot: +new_summary, manifests = self._manifests() + +previous_snapshot = 
self._table.snapshot_by_id(self._parent_snapshot_id) if self._parent_snapshot_id is not None else None +summary = update_snapshot_summaries( +summary=Summary(operation=self._operation, **new_summary), +previous_summary=previous_snapshot.summary if previous_snapshot is not None else None, +truncate_full_table=self._operation == Operation.OVERWRITE, +) + +manifest_list_filename = _generate_manifest_list_filename( +snapshot_id=self._snapshot_id, attempt=0, commit_uuid=self._commit_uuid +) +manifest_list_file_path = f'{self._table.location()}/metadata/{manifest_list_filename}' +with write_manifest_list( +
Re: [I] On iceberg11 getting s3 connection reset error [iceberg]
javrasya commented on issue #4457: URL: https://github.com/apache/iceberg/issues/4457#issuecomment-1891155838 I am using Flink to consume from a table which gets upserts, and I am getting this error too. I tried tweaking the HTTP client socket timeouts and related settings, but nothing has worked so far for me. How did reducing the number of partitions with Spark help here? Was it maybe S3 in your case that was closing the socket 🤔? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Spark: Fix SparkTable to use name and effective snapshotID for comparing [iceberg]
wooyeong commented on PR #9455: URL: https://github.com/apache/iceberg/pull/9455#issuecomment-1891165135 @ajantha-bhat gentle ping, could you review this change, please? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Flink: Added error handling and default logic for Flink version detection [iceberg]
stevenzwu commented on code in PR #9452: URL: https://github.com/apache/iceberg/pull/9452#discussion_r1451920250 ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java: ## @@ -19,15 +19,31 @@ package org.apache.iceberg.flink.util; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; public class FlinkPackage { - /** Choose {@link DataStream} class because it is one of the core Flink API. */ - private static final String VERSION = DataStream.class.getPackage().getImplementationVersion(); + + public static final String FLINK_UNKNOWN_VERSION = "Flink-UNKNOWN"; private FlinkPackage() {} /** Returns Flink version string like x.y.z */ public static String version() { -return VERSION; +try { + String version = getVersionFromJar(); + /* If we can't detect the exact implementation version from the jar (this can happen if the DataStream class + appears multiple times in the same classpath such as with shading), then the best we can do is say it's + unknown + */ + return version != null ? version : FLINK_UNKNOWN_VERSION; Review Comment: +1. this method can be used to initialize the static `VERSION` variable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[I] kerberos beeline insert iceberg fail error: Job commit failed: org.apache.iceberg.hive.RuntimeMetaException: Failed to connect to Hive Metastore [iceberg]
xiaolan-bit opened a new issue, #9475: URL: https://github.com/apache/iceberg/issues/9475 ### Apache Iceberg version 1.3.1 ### Query engine Hive ### Please describe the bug 🐞 *version: hive-3.1.3 iceberg-1.3.1 kerberos-1.15.1 hadoop-3.3.6 user: hadoop *hive.server2.transport.mode: binary *hive.server2.thrift.port: 1 shell: kinit -kt hadoop.keytab hadoop/h...@hadoop.com beeline !connect jdbc:hive2://host:1/default;httpPath=cliservice;principal=hadoop/h...@hadoop.com beeline sql: create database iceberg_hive_example; use iceberg_hive_example; create table test_table_hive1( id int, name string, age int) partitioned by (dt string) stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'; set hive.execution.engine=mr; add jar /hive/lib/iceberg-hive-runtime-1.3.1.jar; add jar /hive/lib/libfb303-0.9.3.jar; insert into test_table_hive1 values (1,"hivezs",18,"20231204"); error appear:  the error as follow: Job commit failed: org.apache.iceberg.hive.RuntimeMetaException: Failed to connect to Hive Metastore at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:84) at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:34) at org.apache.iceberg.ClientPoolImpl.get(ClientPoolImpl.java:125) at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:56) at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51) at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122) at org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:158) at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:97) at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:80) at org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:47) at org.apache.iceberg.mr.Catalogs.loadTable(Catalogs.java:124) at org.apache.iceberg.mr.Catalogs.loadTable(Catalogs.java:111) at org.apache.iceberg.mr.hive.HiveIcebergOutputCommitter.commitTable(HiveIcebergOutputCommitter.java:320) at org.apache.iceberg.mr.hive.HiveIcebergOutputCommitter.lambda$commitJob$2(HiveIcebergOutputCommitter.java:214) at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413) at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219) at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203) at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196) at org.apache.iceberg.mr.hive.HiveIcebergOutputCommitter.commitJob(HiveIcebergOutputCommitter.java:207) at org.apache.hadoop.mapred.OutputCommitter.commitJob(OutputCommitter.java:291) at org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler$EventProcessor.handleJobCommit(CommitterEventHandler.java:286) at org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler$EventProcessor.run(CommitterEventHandler.java:238) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.metastore.utils.JavaUtils.newInstance(JavaUtils.java:86) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.(RetryingMetaStoreClient.java:95) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:148) at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:119) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:112) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.iceberg.common.DynMethods$UnboundMethod.invokeChecked(DynMethods.java:60) at org.apache.iceberg.common.DynMethods$UnboundMethod.invoke(DynMethods.java:72) at org.apache.iceberg.common.DynMethods$StaticMethod.invoke(DynMethods.java:185) at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:63) ... 24 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingCon
[I] flink has implemented the delete and update syntax support in batch mode in a later version. Will the iceberg community implement this feature [iceberg]
BlackPigHe opened a new issue, #9476: URL: https://github.com/apache/iceberg/issues/9476 ### Feature Request / Improvement Flink has implemented DELETE and UPDATE syntax support in batch mode in a later version. Will the Iceberg community implement this feature? If Iceberg is interested in implementing it, I can provide a commit here. ### Query engine Flink -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Fix ParallelIterable memory leak because queue continues to be added even if iterator exited [iceberg]
Heltman commented on PR #9402: URL: https://github.com/apache/iceberg/pull/9402#issuecomment-1891512061 @findepi @electrum cc -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org