petartushev opened a new issue, #11997:
URL: https://github.com/apache/iceberg/issues/11997
### Query engine
I'm using Spark as the processing engine.
### Question
I have the following Docker Compose file:
```
version: "3"

services:
  spark-iceberg:
    image: tabulario/spark-iceberg
    container_name: spark-iceberg
    build: spark/
    networks:
      iceberg_net:
    depends_on:
      - rest
      - minio
    volumes:
      - ./warehouse:/home/iceberg/warehouse
      - ./notebooks:/home/iceberg/notebooks/notebooks
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    ports:
      - 8888:8888
      - 8080:8080
      - 10000:10000
      - 10001:10001

  zookeeper:
    image: wurstmeister/zookeeper:latest
    expose:
      - "2181"

  kafka:
    image: wurstmeister/kafka:2.13-2.8.1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE

  postgres:
    image: postgres:16
    container_name: pg-catalog-store-spark
    networks:
      iceberg_net:
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=iceberg_postgres_user
      - POSTGRES_PASSWORD=iceberg_postgres_password
      - POSTGRES_DB=iceberg
    volumes:
      - pg_catalog_volume_spark:/var/lib/postgresql/data

  rest:
    image: tabulario/iceberg-rest
    container_name: iceberg-rest-spark
    networks:
      iceberg_net:
    ports:
      - 8181:8181
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=eu-west-1
      - CATALOG_WAREHOUSE=s3://warehouse/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_CATALOG_IMPL=org.apache.iceberg.jdbc.JdbcCatalog
      - CATALOG_URI=jdbc:postgresql://postgres:5432/iceberg
      - CATALOG_JDBC_USER=iceberg_postgres_user
      - CATALOG_JDBC_PASSWORD=iceberg_postgres_password
      - CATALOG_S3_ENDPOINT=http://minio:9000
      - CATALOG_S3_PATH__STYLE_ACCESS=true
    depends_on:
      - postgres
      - minio

  minio:
    image: minio/minio
    container_name: minio-store-spark
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    networks:
      iceberg_net:
        aliases:
          - warehouse.minio
    ports:
      - 9001:9001
      - 9000:9000
    command: ["server", "/data", "--console-address", ":9001"]

  mc:
    depends_on:
      - minio
    image: minio/mc
    container_name: mc-spark
    networks:
      iceberg_net:
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=eu-west-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc mb --ignore-existing minio/warehouse;
      /usr/bin/mc mb --ignore-existing minio/metadata;
      /usr/bin/mc policy set public minio/warehouse;
      /usr/bin/mc policy set public minio/metadata;
      tail -f /dev/null
      "

volumes:
  pg_catalog_volume_spark:

networks:
  iceberg_net:
```
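(For context: as far as I understand from the tabulario/iceberg-rest README, each `CATALOG_...` environment variable above is translated into an Iceberg catalog property, with `__` becoming `-` and `_` becoming `.`, so e.g. `CATALOG_S3_PATH__STYLE_ACCESS=true` turns into `s3.path-style-access=true`. Here is a minimal sketch of connecting to this catalog directly from Java with the equivalent properties, assuming the code runs on the Docker host so the published `localhost` ports are reachable:)
```
import java.util.HashMap;
import java.util.Map;

import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.rest.RESTCatalog;

public class RestCatalogSmokeTest {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // REST catalog endpoint published by the `rest` service
        props.put(CatalogProperties.URI, "http://localhost:8181");
        // Same FileIO the server side is configured with
        props.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO");
        // Point S3FileIO at MinIO instead of AWS S3
        props.put("s3.endpoint", "http://localhost:9000");
        props.put("s3.path-style-access", "true");
        props.put("s3.access-key-id", "admin");
        props.put("s3.secret-access-key", "password");
        props.put("client.region", "eu-west-1");

        RESTCatalog catalog = new RESTCatalog();
        catalog.initialize("rest", props);

        // List namespaces as a simple connectivity check
        System.out.println(catalog.listNamespaces(Namespace.empty()));
    }
}
```
This at least lets me confirm the catalog and MinIO are reachable before involving Spark.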
What I'm trying to do is stream data from a Python script through Kafka, process it with Spark, and then write the results to an Iceberg table. However, I'm having trouble appending data to an Iceberg table even in a simple batch job. Here is my code:
```
package src;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class Sandbox {

    public static void main(String[] args) {
        // Initialize Spark session with Iceberg REST catalog configuration
        SparkSession spark = SparkSession.builder()
                .appName("IcebergCatalogExample")
                .master("local[*]") // Use local mode for testing
                .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.rest.type", "rest")
                .config("spark.sql.catalog.rest.uri", "http://localhost:8181") // REST catalog endpoint
                .config("spark.sql.catalog.rest.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
                .config("spark.sql.catalog.rest.client-type", "rest")
                .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") // MinIO endpoint
                .config("spark.hadoop.fs.s3a.access.key", "admin")
                .config("spark.hadoop.fs.s3a.secret.key", "password")
                .config("spark.hadoop.fs.s3a.path.style.access", "true") // Required for MinIO
                .config("spark.hadoop.fs.s3a.endpoint.region", "eu-west-1") // Region of the MinIO deployment
                .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") // S3A implementation
                .getOrCreate();

        // Define the namespace and table name
        String namespace = "example_namespace";
        String tableName = "example_table";

        // Define the schema (currently unused; the table is created via SQL below)
        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.required(2, "name", Types.StringType.get()),
                Types.NestedField.optional(3, "email", Types.StringType.get()),
                Types.NestedField.optional(4, "age", Types.IntegerType.get())
        );

        String fullTableName = String.format("rest.%s.%s", namespace, tableName);

        try {
            // Create the table if it doesn't exist
            spark.sql(
                    String.format(
                            "CREATE TABLE IF NOT EXISTS %s " +
                            "(id BIGINT, name STRING, email STRING, age INT) " +
                            "USING iceberg " +
                            "PARTITIONED BY (age)",
                            fullTableName
                    )
            );
            System.out.println("Table created: " + fullTableName);

            // Insert example data
            Dataset<Row> data = spark.sql(
                    "SELECT 1 AS id, 'John Doe' AS name, '[email protected]' AS email, 30 AS age");
            data.writeTo(fullTableName).append();

            // Read and display the data
            Dataset<Row> tableData = spark.read().format("iceberg").load(fullTableName);
            tableData.show();
        } catch (Exception e) {
            System.err.println("Error while creating or accessing the table: " + e.getMessage());
            e.printStackTrace();
        }

        // Stop the Spark session
        spark.stop();
    }
}
```
The `minio` endpoint is replaced with `localhost` because this runs as a standalone Spark application outside of the network defined in the Docker Compose file, against the published ports. When I run this, I get the error:
```
software.amazon.awssdk.services.s3.model.S3Exception: The bucket you are attempting to access must be addressed using the specified endpoint. Please send all future requests to this endpoint. (Service: S3, Status Code: 301, Request ID: 332NMTVVN450E0A2, Extended Request ID: MvTvqnln5tD/O3N1B4iyBc+8bzR0BiNro2Frh5/NH09KiZAHUj/UbOMtYZUdBWvFjlkAX0RMSzOF+Ik/SRnltA5H5P1dciD8zzUq0O68dKU=)
    at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleErrorResponse(AwsXmlPredicatedResponseHandler.java:156)
    at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleResponse(AwsXmlPredicatedResponseHandler.java:108)
    at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:85)
    at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:43)
    at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler$Crc32ValidationResponseHandler.handle(AwsSyncClientHandler.java:93)
    at software.amazon.awssdk.core.internal.handler.BaseClientHandler.lambda$successTransformationResponseHandler$7(BaseClientHandler.java:279)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:50)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:38)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:74)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:43)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:79)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:41)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:55)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:39)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2.executeRequest(RetryableStage2.java:93)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2.execute(RetryableStage2.java:56)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2.execute(RetryableStage2.java:36)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:53)
    at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:35)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:82)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:62)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:43)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
    at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:210)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:173)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:80)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:182)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:74)
    at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
    at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:53)
    at software.amazon.awssdk.services.s3.DefaultS3Client.putObject(DefaultS3Client.java:10628)
    at org.apache.iceberg.aws.s3.S3OutputStream.completeUploads(S3OutputStream.java:444)
    at org.apache.iceberg.aws.s3.S3OutputStream.close(S3OutputStream.java:270)
    at org.apache.iceberg.aws.s3.S3OutputStream.close(S3OutputStream.java:256)
    at org.apache.iceberg.shaded.org.apache.parquet.io.DelegatingPositionOutputStream.close(DelegatingPositionOutputStream.java:38)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:1204)
    at org.apache.iceberg.parquet.ParquetWriter.close(ParquetWriter.java:257)
    at org.apache.iceberg.io.DataWriter.close(DataWriter.java:82)
    at org.apache.iceberg.io.RollingFileWriter.closeCurrentWriter(RollingFileWriter.java:122)
    at org.apache.iceberg.io.RollingFileWriter.close(RollingFileWriter.java:147)
    at org.apache.iceberg.io.RollingDataWriter.close(RollingDataWriter.java:32)
    at org.apache.iceberg.io.FanoutWriter.closeWriters(FanoutWriter.java:81)
    at org.apache.iceberg.io.FanoutWriter.close(FanoutWriter.java:73)
    at org.apache.iceberg.io.FanoutDataWriter.close(FanoutDataWriter.java:31)
    at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.close(SparkWrite.java:804)
    at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.commit(SparkWrite.java:786)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$5(WriteToDataSourceV2Exec.scala:475)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:491)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:430)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:496)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:393)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
```
I've found similar/identical problems here:
https://stackoverflow.com/questions/77099097/error-when-reading-s3-data-with-spark-using-iceberg-the-bucket-you-are-attempt
https://stackoverflow.com/questions/78846530/error-when-append-data-to-iceberg-using-spark
But I cannot figure out what I am doing wrong. Any help is appreciated.
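One thing I noticed while reading the Iceberg AWS docs: the `spark.hadoop.fs.s3a.*` settings only configure Hadoop's S3A filesystem, while `S3FileIO` reads its own `s3.*` keys from the catalog properties, so without an `s3.endpoint` on the catalog it presumably falls back to the real AWS S3 endpoint. A minimal sketch of what I mean, passing the S3 settings through the catalog prefix instead (property names taken from the Iceberg AWS integration docs; untested beyond my setup above):
```
// Sketch: same SparkSession, but S3FileIO configured via catalog-prefixed
// properties rather than the Hadoop fs.s3a.* settings. Endpoints and
// credentials match the compose file above.
SparkSession spark = SparkSession.builder()
        .appName("IcebergCatalogExample")
        .master("local[*]")
        .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.rest.type", "rest")
        .config("spark.sql.catalog.rest.uri", "http://localhost:8181")
        .config("spark.sql.catalog.rest.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
        // Everything after the catalog prefix is handed to the catalog/FileIO:
        .config("spark.sql.catalog.rest.s3.endpoint", "http://localhost:9000") // MinIO, not AWS
        .config("spark.sql.catalog.rest.s3.path-style-access", "true")
        .config("spark.sql.catalog.rest.s3.access-key-id", "admin")
        .config("spark.sql.catalog.rest.s3.secret-access-key", "password")
        .config("spark.sql.catalog.rest.client.region", "eu-west-1")
        .getOrCreate();
```
Is this the right direction, or is something else off in my setup?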