petartushev opened a new issue, #11997:
URL: https://github.com/apache/iceberg/issues/11997

   ### Query engine
   
   I'm using Spark as the processing engine.
   
   ### Question
   
   I have the following docker compose file:
   ```
   version: "3"
   
   services:
       spark-iceberg:
           image: tabulario/spark-iceberg
           container_name: spark-iceberg
           build: spark/
           networks:
               iceberg_net:
           depends_on:
               - rest
               - minio
           volumes:
               - ./warehouse:/home/iceberg/warehouse
               - ./notebooks:/home/iceberg/notebooks/notebooks
           environment:
              - AWS_ACCESS_KEY_ID=admin
              - AWS_SECRET_ACCESS_KEY=password
              - AWS_REGION=us-east-1
           ports:
              - 8888:8888
              - 8080:8080
              - 10000:10000
              - 10001:10001
           
       zookeeper:
           image: wurstmeister/zookeeper:latest
           expose:
               - "2181"
   
       kafka:
           image: wurstmeister/kafka:2.13-2.8.1
           depends_on:
               - zookeeper
           ports:
               - "9092:9092"
           expose:
               - "9093"
           environment:
                KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
                KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
               KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
               KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
               KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
   
   
       postgres:
           image: postgres:16
           container_name: pg-catalog-store-spark
           networks:
               iceberg_net:
           ports:
               - "5432:5432"
           environment:
               - POSTGRES_USER=iceberg_postgres_user
               - POSTGRES_PASSWORD=iceberg_postgres_password
               - POSTGRES_DB=iceberg
           volumes:
               - pg_catalog_volume_spark:/var/lib/postgresql/data
   
       rest:
           image: tabulario/iceberg-rest
           container_name: iceberg-rest-spark
           networks:
               iceberg_net:
           ports:
               - 8181:8181
           environment:
               - AWS_ACCESS_KEY_ID=admin
               - AWS_SECRET_ACCESS_KEY=password
               - AWS_REGION=eu-west-1
               - CATALOG_WAREHOUSE=s3://warehouse/
               - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
               - CATALOG_CATALOG_IMPL=org.apache.iceberg.jdbc.JdbcCatalog
               - CATALOG_URI=jdbc:postgresql://postgres:5432/iceberg
               - CATALOG_JDBC_USER=iceberg_postgres_user
               - CATALOG_JDBC_PASSWORD=iceberg_postgres_password
               - CATALOG_S3_ENDPOINT=http://minio:9000
               - CATALOG_S3_PATH__STYLE_ACCESS=true
           depends_on:
               - postgres
               - minio
       
       minio:
           image: minio/minio
           container_name: minio-store-spark
           environment:
               - MINIO_ROOT_USER=admin
               - MINIO_ROOT_PASSWORD=password
               - MINIO_DOMAIN=minio
           networks:
               iceberg_net:
                   aliases:
                       - warehouse.minio
           ports:
               - 9001:9001
               - 9000:9000
           command: ["server", "/data", "--console-address", ":9001"]
   
       mc:
           depends_on:
               - minio
           image: minio/mc
           container_name: mc-spark
           networks:
               iceberg_net:
           environment:
               - AWS_ACCESS_KEY_ID=admin
               - AWS_SECRET_ACCESS_KEY=password
               - AWS_REGION=eu-west-1
           entrypoint: >
               /bin/sh -c "
                until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
               /usr/bin/mc mb --ignore-existing minio/warehouse;
               /usr/bin/mc mb --ignore-existing minio/metadata;
               /usr/bin/mc policy set public minio/warehouse;
               /usr/bin/mc policy set public minio/metadata;
               tail -f /dev/null
               "
   
   volumes:
       pg_catalog_volume_spark:
   
   networks:
       iceberg_net:
   ```
   What I'm trying to do is stream some data from a Python script using Kafka as the broker, do some processing on that data with Spark, and then write it to an Iceberg table. However, I'm having trouble appending data to an Iceberg table even in the simple batch test below (a rough sketch of the streaming path I'm aiming for follows my code). Here is my code:
   ```
    package src;

    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    public class Sandbox {
        public static void main(String[] args) {

            // Initialize Spark Session with Iceberg REST catalog configuration
            SparkSession spark = SparkSession.builder()
                    .appName("IcebergCatalogExample")
                    .master("local[*]") // Use local mode for testing
                    .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
                    .config("spark.sql.catalog.rest.type", "rest")
                    .config("spark.sql.catalog.rest.uri", "http://localhost:8181") // REST catalog endpoint
                    .config("spark.sql.catalog.rest.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
                    .config("spark.sql.catalog.rest.client-type", "rest")
                    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") // MinIO endpoint
                    .config("spark.hadoop.fs.s3a.access.key", "admin")
                    .config("spark.hadoop.fs.s3a.secret.key", "password")
                    .config("spark.hadoop.fs.s3a.path.style.access", "true") // Required for MinIO
                    .config("spark.hadoop.fs.s3a.endpoint.region", "eu-west-1") // Use the region of your MinIO deployment
                    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") // S3A implementation
                    .getOrCreate();

            // Define the namespace and table name
            String namespace = "example_namespace";
            String tableName = "example_table";

            // Define the schema (not actually used below; the table is created via SQL instead)
            Schema schema = new Schema(
                    Types.NestedField.required(1, "id", Types.LongType.get()),
                    Types.NestedField.required(2, "name", Types.StringType.get()),
                    Types.NestedField.optional(3, "email", Types.StringType.get()),
                    Types.NestedField.optional(4, "age", Types.IntegerType.get())
            );

            // Create the table if it doesn't exist
            String fullTableName = String.format("rest.%s.%s", namespace, tableName);
            try {
                // Create the table
                spark.sql(
                        String.format(
                                "CREATE TABLE IF NOT EXISTS %s " +
                                "(id BIGINT, name STRING, email STRING, age INT) " +
                                "USING iceberg " +
                                "PARTITIONED BY (age)",
                                fullTableName
                        )
                );
                System.out.println("Table created: " + fullTableName);

                // Insert example data
                Dataset<Row> data = spark.sql("SELECT 1 AS id, 'John Doe' AS name, 'john....@example.com' AS email, 30 AS age");
                data.writeTo(fullTableName).append();

                // Read and display the data
                Dataset<Row> tableData = spark.read().format("iceberg").load(fullTableName);
                tableData.show();
            } catch (Exception e) {
                System.err.println("Error while creating or accessing the table: " + e.getMessage());
                e.printStackTrace();
            }

            // Stop the Spark session
            spark.stop();
        }
    }
   
   ```
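   For context, this is roughly the streaming path I'm aiming for once the batch append above works. It is only a sketch, not tested code: the topic name, checkpoint location, and target table are placeholders, and it assumes the spark-sql-kafka-0-10 package is on the classpath; the catalog settings are the same as in the batch example.
    ```
    package src;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;

    public class StreamingSandbox {
        public static void main(String[] args) throws Exception {

            // Same REST catalog configuration as in Sandbox above (abbreviated here).
            SparkSession spark = SparkSession.builder()
                    .appName("KafkaToIcebergExample")
                    .master("local[*]")
                    .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
                    .config("spark.sql.catalog.rest.type", "rest")
                    .config("spark.sql.catalog.rest.uri", "http://localhost:8181")
                    .getOrCreate();

            // Read raw Kafka records via the OUTSIDE listener; "events" is a placeholder topic name.
            Dataset<Row> events = spark.readStream()
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "localhost:9092")
                    .option("subscribe", "events")
                    .load()
                    .selectExpr("CAST(value AS STRING) AS raw");

            // Append each micro-batch to an Iceberg table; the raw value would still
            // need to be parsed into proper columns to match a real table schema.
            StreamingQuery query = events.writeStream()
                    .format("iceberg")
                    .outputMode("append")
                    .option("checkpointLocation", "/tmp/checkpoints/raw_events") // placeholder path
                    .toTable("rest.example_namespace.raw_events");               // placeholder table

            query.awaitTermination();
        }
    }
    ```
   The checkpoint location would have to be a real, writable path; everything else mirrors the batch example.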
   The `minio` hostname is replaced with `localhost` in the endpoints because this program runs as a standalone Spark application on the host, outside of the network defined in the docker compose file (a sketch of the catalog-level S3 settings I think are involved follows the stack trace below). When I run this, I get the following error:
   ```
    software.amazon.awssdk.services.s3.model.S3Exception: The bucket you are attempting to access must be addressed using the specified endpoint. Please send all future requests to this endpoint. (Service: S3, Status Code: 301, Request ID: 332NMTVVN450E0A2, Extended Request ID: MvTvqnln5tD/O3N1B4iyBc+8bzR0BiNro2Frh5/NH09KiZAHUj/UbOMtYZUdBWvFjlkAX0RMSzOF+Ik/SRnltA5H5P1dciD8zzUq0O68dKU=)
        at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleErrorResponse(AwsXmlPredicatedResponseHandler.java:156)
        at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleResponse(AwsXmlPredicatedResponseHandler.java:108)
        at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:85)
        at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:43)
        at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler$Crc32ValidationResponseHandler.handle(AwsSyncClientHandler.java:93)
        at software.amazon.awssdk.core.internal.handler.BaseClientHandler.lambda$successTransformationResponseHandler$7(BaseClientHandler.java:279)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:50)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:38)
        at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:74)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:43)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:79)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:41)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:55)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:39)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2.executeRequest(RetryableStage2.java:93)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2.execute(RetryableStage2.java:56)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2.execute(RetryableStage2.java:36)
        at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:53)
        at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:35)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:82)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:62)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:43)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32)
        at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
        at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:210)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:173)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:80)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:182)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:74)
        at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
        at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:53)
        at software.amazon.awssdk.services.s3.DefaultS3Client.putObject(DefaultS3Client.java:10628)
        at org.apache.iceberg.aws.s3.S3OutputStream.completeUploads(S3OutputStream.java:444)
        at org.apache.iceberg.aws.s3.S3OutputStream.close(S3OutputStream.java:270)
        at org.apache.iceberg.aws.s3.S3OutputStream.close(S3OutputStream.java:256)
        at org.apache.iceberg.shaded.org.apache.parquet.io.DelegatingPositionOutputStream.close(DelegatingPositionOutputStream.java:38)
        at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:1204)
        at org.apache.iceberg.parquet.ParquetWriter.close(ParquetWriter.java:257)
        at org.apache.iceberg.io.DataWriter.close(DataWriter.java:82)
        at org.apache.iceberg.io.RollingFileWriter.closeCurrentWriter(RollingFileWriter.java:122)
        at org.apache.iceberg.io.RollingFileWriter.close(RollingFileWriter.java:147)
        at org.apache.iceberg.io.RollingDataWriter.close(RollingDataWriter.java:32)
        at org.apache.iceberg.io.FanoutWriter.closeWriters(FanoutWriter.java:81)
        at org.apache.iceberg.io.FanoutWriter.close(FanoutWriter.java:73)
        at org.apache.iceberg.io.FanoutDataWriter.close(FanoutDataWriter.java:31)
        at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.close(SparkWrite.java:804)
        at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.commit(SparkWrite.java:786)
        at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$5(WriteToDataSourceV2Exec.scala:475)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
        at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:491)
        at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:430)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:496)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:393)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
   ```
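   For what it's worth, the failing write in the stack trace goes through org.apache.iceberg.aws.s3.S3OutputStream, i.e. the catalog's S3FileIO, rather than the Hadoop S3A layer that my fs.s3a.* settings configure. Below is a hedged sketch of how I understand the same MinIO endpoint and path-style settings would be expressed on the catalog itself, mirroring the CATALOG_S3_ENDPOINT and CATALOG_S3_PATH__STYLE_ACCESS values the REST container gets in the compose file; the exact Spark-side property names are my assumption, not something I have verified.
    ```
    package src;

    import org.apache.spark.sql.SparkSession;

    public class CatalogEndpointSketch {
        public static void main(String[] args) {
            // Sketch only: MinIO endpoint and path-style access set as catalog
            // properties for S3FileIO (property names assumed, not verified).
            SparkSession spark = SparkSession.builder()
                    .appName("IcebergCatalogEndpointSketch")
                    .master("local[*]")
                    .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
                    .config("spark.sql.catalog.rest.type", "rest")
                    .config("spark.sql.catalog.rest.uri", "http://localhost:8181")
                    .config("spark.sql.catalog.rest.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
                    .config("spark.sql.catalog.rest.s3.endpoint", "http://localhost:9000")  // MinIO as reached from the host
                    .config("spark.sql.catalog.rest.s3.path-style-access", "true")
                    .getOrCreate();

            // Simple smoke test against the table created by the batch example.
            spark.sql("SELECT * FROM rest.example_namespace.example_table").show();

            spark.stop();
        }
    }
    ```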
   I've found similar/identical problems here:
   
   https://stackoverflow.com/questions/77099097/error-when-reading-s3-data-with-spark-using-iceberg-the-bucket-you-are-attempt
   
   https://stackoverflow.com/questions/78846530/error-when-append-data-to-iceberg-using-spark
   
   But I cannot figure out what I am doing wrong. Any help is appreciated.

