petartushev opened a new issue, #11997: URL: https://github.com/apache/iceberg/issues/11997
### Query engine

I'm using Spark as the processing engine.

### Question

I have the following Docker Compose file:

```
version: "3"

services:
  spark-iceberg:
    image: tabulario/spark-iceberg
    container_name: spark-iceberg
    build: spark/
    networks:
      iceberg_net:
    depends_on:
      - rest
      - minio
    volumes:
      - ./warehouse:/home/iceberg/warehouse
      - ./notebooks:/home/iceberg/notebooks/notebooks
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    ports:
      - 8888:8888
      - 8080:8080
      - 10000:10000
      - 10001:10001
  zookeeper:
    image: wurstmeister/zookeeper:latest
    expose:
      - "2181"
  kafka:
    image: wurstmeister/kafka:2.13-2.8.1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
  postgres:
    image: postgres:16
    container_name: pg-catalog-store-spark
    networks:
      iceberg_net:
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=iceberg_postgres_user
      - POSTGRES_PASSWORD=iceberg_postgres_password
      - POSTGRES_DB=iceberg
    volumes:
      - pg_catalog_volume_spark:/var/lib/postgresql/data
  rest:
    image: tabulario/iceberg-rest
    container_name: iceberg-rest-spark
    networks:
      iceberg_net:
    ports:
      - 8181:8181
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=eu-west-1
      - CATALOG_WAREHOUSE=s3://warehouse/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_CATALOG_IMPL=org.apache.iceberg.jdbc.JdbcCatalog
      - CATALOG_URI=jdbc:postgresql://postgres:5432/iceberg
      - CATALOG_JDBC_USER=iceberg_postgres_user
      - CATALOG_JDBC_PASSWORD=iceberg_postgres_password
      - CATALOG_S3_ENDPOINT=http://minio:9000
      - CATALOG_S3_PATH__STYLE_ACCESS=true
    depends_on:
      - postgres
      - minio
  minio:
    image: minio/minio
    container_name: minio-store-spark
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    networks:
      iceberg_net:
        aliases:
          - warehouse.minio
    ports:
      - 9001:9001
      - 9000:9000
    command: ["server", "/data", "--console-address", ":9001"]
  mc:
    depends_on:
      - minio
    image: minio/mc
    container_name: mc-spark
    networks:
      iceberg_net:
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=eu-west-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc mb --ignore-existing minio/warehouse;
      /usr/bin/mc mb --ignore-existing minio/metadata;
      /usr/bin/mc policy set public minio/warehouse;
      /usr/bin/mc policy set public minio/metadata;
      tail -f /dev/null
      "

volumes:
  pg_catalog_volume_spark:

networks:
  iceberg_net:
```

What I'm trying to do is stream some data from a Python script, using Kafka as the broker, then process that data with Spark and write it to an Iceberg table. However, I'm having trouble appending data to an Iceberg table.
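For context, the consumer side of that pipeline would look roughly like the sketch below. The topic name `events`, the JSON payload layout, and the checkpoint path are placeholders, and the catalog configuration is the same as in the code further down:

```
// Rough sketch of the intended Kafka -> Spark -> Iceberg flow (not the failing
// code below). Requires the spark-sql-kafka connector on the classpath.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

public class KafkaToIcebergSketch {
    public static void main(String[] args) throws Exception {
        // Catalog configuration omitted; same as in the Sandbox code below
        SparkSession spark = SparkSession.builder()
                .appName("KafkaToIcebergSketch")
                .getOrCreate();

        // Expected layout of the JSON messages produced by the Python script
        StructType schema = new StructType()
                .add("id", DataTypes.LongType)
                .add("name", DataTypes.StringType)
                .add("email", DataTypes.StringType)
                .add("age", DataTypes.IntegerType);

        // Read from the OUTSIDE listener exposed on localhost:9092 and
        // parse each Kafka value into typed columns
        Dataset<Row> parsed = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "events") // placeholder topic name
                .load()
                .selectExpr("CAST(value AS STRING) AS json")
                .select(from_json(col("json"), schema).alias("data"))
                .select("data.*");

        // Append each micro-batch to the Iceberg table
        StreamingQuery query = parsed.writeStream()
                .format("iceberg")
                .outputMode("append")
                .option("checkpointLocation", "/tmp/checkpoints/events") // placeholder path
                .toTable("rest.example_namespace.example_table");

        query.awaitTermination();
    }
}
```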
Here is my code:

```
package src;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class Sandbox {
    public static void main(String[] args) {
        // Initialize Spark session with Iceberg REST catalog configuration
        SparkSession spark = SparkSession.builder()
                .appName("IcebergCatalogExample")
                .master("local[*]") // Use local mode for testing
                .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.rest.type", "rest")
                .config("spark.sql.catalog.rest.uri", "http://localhost:8181") // REST catalog endpoint
                .config("spark.sql.catalog.rest.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
                .config("spark.sql.catalog.rest.client-type", "rest")
                .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") // MinIO endpoint
                .config("spark.hadoop.fs.s3a.access.key", "admin")
                .config("spark.hadoop.fs.s3a.secret.key", "password")
                .config("spark.hadoop.fs.s3a.path.style.access", "true") // Required for MinIO
                .config("spark.hadoop.fs.s3a.endpoint.region", "eu-west-1") // Region of the MinIO deployment
                .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") // S3A implementation
                .getOrCreate();

        // Define the namespace and table name
        String namespace = "example_namespace";
        String tableName = "example_table";

        // Define the schema
        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.required(2, "name", Types.StringType.get()),
                Types.NestedField.optional(3, "email", Types.StringType.get()),
                Types.NestedField.optional(4, "age", Types.IntegerType.get())
        );

        String fullTableName = String.format("rest.%s.%s", namespace, tableName);
        try {
            // Create the table if it doesn't exist
            spark.sql(
                    String.format(
                            "CREATE TABLE IF NOT EXISTS %s " +
                            "(id BIGINT, name STRING, email STRING, age INT) " +
                            "USING iceberg " +
                            "PARTITIONED BY (age)",
                            fullTableName
                    )
            );
            System.out.println("Table created: " + fullTableName);

            // Insert example data
            Dataset<Row> data = spark.sql("SELECT 1 AS id, 'John Doe' AS name, 'john....@example.com' AS email, 30 AS age");
            data.writeTo(fullTableName).append();

            // Read and display the data
            Dataset<Row> tableData = spark.read().format("iceberg").load(fullTableName);
            tableData.show();
        } catch (Exception e) {
            System.err.println("Error while creating or accessing the table: " + e.getMessage());
            e.printStackTrace();
        }

        // Stop the Spark session
        spark.stop();
    }
}
```

The `minio` endpoint is replaced with `localhost` because this code runs standalone, outside of the cluster defined in the Docker Compose file. When I run it, I get the following error:

```
software.amazon.awssdk.services.s3.model.S3Exception: The bucket you are attempting to access must be addressed using the specified endpoint. Please send all future requests to this endpoint.
(Service: S3, Status Code: 301, Request ID: 332NMTVVN450E0A2, Extended Request ID: MvTvqnln5tD/O3N1B4iyBc+8bzR0BiNro2Frh5/NH09KiZAHUj/UbOMtYZUdBWvFjlkAX0RMSzOF+Ik/SRnltA5H5P1dciD8zzUq0O68dKU=)
    at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleErrorResponse(AwsXmlPredicatedResponseHandler.java:156)
    at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleResponse(AwsXmlPredicatedResponseHandler.java:108)
    at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:85)
    at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:43)
    at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler$Crc32ValidationResponseHandler.handle(AwsSyncClientHandler.java:93)
    at software.amazon.awssdk.core.internal.handler.BaseClientHandler.lambda$successTransformationResponseHandler$7(BaseClientHandler.java:279)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:50)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:38)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:74)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:43)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:79)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:41)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:55)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:39)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2.executeRequest(RetryableStage2.java:93)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2.execute(RetryableStage2.java:56)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2.execute(RetryableStage2.java:36)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:53)
    at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:35)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:82)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:62)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:43)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
    at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:210)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:173)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:80)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:182)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:74)
    at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
    at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:53)
    at software.amazon.awssdk.services.s3.DefaultS3Client.putObject(DefaultS3Client.java:10628)
    at org.apache.iceberg.aws.s3.S3OutputStream.completeUploads(S3OutputStream.java:444)
    at org.apache.iceberg.aws.s3.S3OutputStream.close(S3OutputStream.java:270)
    at org.apache.iceberg.aws.s3.S3OutputStream.close(S3OutputStream.java:256)
    at org.apache.iceberg.shaded.org.apache.parquet.io.DelegatingPositionOutputStream.close(DelegatingPositionOutputStream.java:38)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:1204)
    at org.apache.iceberg.parquet.ParquetWriter.close(ParquetWriter.java:257)
    at org.apache.iceberg.io.DataWriter.close(DataWriter.java:82)
    at org.apache.iceberg.io.RollingFileWriter.closeCurrentWriter(RollingFileWriter.java:122)
    at org.apache.iceberg.io.RollingFileWriter.close(RollingFileWriter.java:147)
    at org.apache.iceberg.io.RollingDataWriter.close(RollingDataWriter.java:32)
    at org.apache.iceberg.io.FanoutWriter.closeWriters(FanoutWriter.java:81)
    at org.apache.iceberg.io.FanoutWriter.close(FanoutWriter.java:73)
    at org.apache.iceberg.io.FanoutDataWriter.close(FanoutDataWriter.java:31)
    at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.close(SparkWrite.java:804)
    at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.commit(SparkWrite.java:786)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$5(WriteToDataSourceV2Exec.scala:475)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:491)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:430)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:496)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:393)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
```

I've found similar/identical problems here:

https://stackoverflow.com/questions/77099097/error-when-reading-s3-data-with-spark-using-iceberg-the-bucket-you-are-attempt
https://stackoverflow.com/questions/78846530/error-when-append-data-to-iceberg-using-spark

But I cannot figure out what I'm doing wrong. Any help is appreciated.
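Could the problem be that the `fs.s3a.*` settings never reach the writer? The stack trace shows the upload going through `org.apache.iceberg.aws.s3.S3OutputStream`, i.e. Iceberg's `S3FileIO`, and as far as I understand `S3FileIO` is configured through `s3.*` catalog properties rather than the Hadoop `fs.s3a.*` settings (those only apply to `S3AFileSystem`). For reference, this is the variant I would try, a minimal sketch assuming the same MinIO endpoint and credentials as above:

```
// Sketch: scope the S3 settings to the Iceberg catalog so S3FileIO picks them
// up, instead of the fs.s3a.* settings that only configure S3AFileSystem.
SparkSession spark = SparkSession.builder()
        .appName("IcebergCatalogExample")
        .master("local[*]")
        .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.rest.type", "rest")
        .config("spark.sql.catalog.rest.uri", "http://localhost:8181")
        .config("spark.sql.catalog.rest.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
        // S3FileIO configuration: MinIO endpoint, path-style access, credentials
        .config("spark.sql.catalog.rest.s3.endpoint", "http://localhost:9000")
        .config("spark.sql.catalog.rest.s3.path-style-access", "true")
        .config("spark.sql.catalog.rest.s3.access-key-id", "admin")
        .config("spark.sql.catalog.rest.s3.secret-access-key", "password")
        .config("spark.sql.catalog.rest.client.region", "eu-west-1")
        .getOrCreate();
```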