ms1111 opened a new issue, #10245: URL: https://github.com/apache/iceberg/issues/10245
### Apache Iceberg version

1.5.1 (latest release)

### Query engine

Flink

### Please describe the bug 🐞

`ADLSFileIO` holds an `AzureProperties` object. When `ADLS_SHARED_KEY_ACCOUNT_NAME` or `ADLS_SHARED_KEY_ACCOUNT_KEY` is set, `AzureProperties` creates a `StorageSharedKeyCredential` in its constructor. `StorageSharedKeyCredential` is not `Serializable`, so serialization fails during job startup. If the storage account key is not supplied, `DefaultAzureCredential` will try to get credentials from the Azure CLI or another source such as workload identity. That appears to work, but some environments require shared-key authentication. A sketch of one possible fix appears after the POM below.

The serialization error:

```
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not serialize object for key serializedUDF.
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:322)
	... 13 more
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not serialize object for key serializedUDF.
	at org.apache.flink.streaming.api.graph.StreamConfig.lambda$serializeAllConfigs$1(StreamConfig.java:203)
	at java.base/java.util.HashMap.forEach(HashMap.java:1421)
	at org.apache.flink.streaming.api.graph.StreamConfig.serializeAllConfigs(StreamConfig.java:197)
	at org.apache.flink.streaming.api.graph.StreamConfig.lambda$triggerSerializationAndReturnFuture$0(StreamConfig.java:174)
	at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718)
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.NotSerializableException: com.azure.storage.common.StorageSharedKeyCredential
```

A sample app that triggers it is below.
```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.azure.AzureProperties;
import org.apache.iceberg.azure.adlsv2.ADLSFileIO;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.types.Types;

/**
 * Run with environment variables:
 * <ul>
 *   <li>STORAGE_ACCOUNT=storage account name</li>
 *   <li>STORAGE_ACCOUNT_KEY=key</li>
 *   <li>CONTAINER=name of storage container</li>
 * </ul>
 */
public class ADLSSharedKeyAuthIssue {

  public static void main(String[] args) throws Exception {
    final String storageAccount = System.getenv("STORAGE_ACCOUNT");
    final String storageAccountKey = System.getenv("STORAGE_ACCOUNT_KEY");
    final String container = System.getenv("CONTAINER");

    Map<String, String> options = new HashMap<>();
    options.put("warehouse", "abfss://" + container + "@" + storageAccount + ".dfs.core.windows.net");
    options.put("uri", "http://localhost:19120/api/v1");
    options.put(CatalogProperties.FILE_IO_IMPL, ADLSFileIO.class.getCanonicalName());
    options.put(AzureProperties.ADLS_SHARED_KEY_ACCOUNT_NAME, storageAccount);
    options.put(AzureProperties.ADLS_SHARED_KEY_ACCOUNT_KEY, storageAccountKey);

    CatalogLoader catalogLoader =
        CatalogLoader.custom("flink", options, new Configuration(), CatalogUtil.ICEBERG_CATALOG_NESSIE);
    Catalog catalog = catalogLoader.loadCatalog();

    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
    PartitionSpec spec = PartitionSpec.builderFor(schema).build();

    Namespace namespace = Namespace.of("nsname_" + UUID.randomUUID().toString().substring(0, 4));
    ((SupportsNamespaces) catalog).createNamespace(namespace);
    TableIdentifier tableIdentifier = TableIdentifier.of(namespace, "t1");
    Table table = catalog.createTable(tableIdentifier, schema, spec);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Row> source =
        env.fromElements(1)
            .map(data -> {
              Row row = new Row(1);
              row.setField(0, data);
              return row;
            });

    FlinkSink.forRow(source, FlinkSchemaUtil.toSchema(schema))
        .tableLoader(TableLoader.fromCatalog(catalogLoader, tableIdentifier))
        .overwrite(true)
        .append();

    env.execute();
  }
}
```
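The Flink job isn't strictly needed to see the failure. Below is a minimal sketch (my own illustration, not part of the repro above) that round-trips the `FileIO` through plain Java serialization, which is roughly what Flink does when it serializes the operator's user code. The account name and key values are placeholders:

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.Map;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.azure.AzureProperties;
import org.apache.iceberg.io.FileIO;

public class ADLSFileIOSerializationCheck {
  public static void main(String[] args) throws Exception {
    // Placeholder credentials; any non-null values trigger the shared-key path.
    Map<String, String> props =
        Map.of(
            AzureProperties.ADLS_SHARED_KEY_ACCOUNT_NAME, "myaccount",
            AzureProperties.ADLS_SHARED_KEY_ACCOUNT_KEY, "bXlrZXk=");

    // loadFileIO instantiates ADLSFileIO reflectively and calls initialize(props),
    // which builds the AzureProperties holding the StorageSharedKeyCredential.
    FileIO io =
        CatalogUtil.loadFileIO("org.apache.iceberg.azure.adlsv2.ADLSFileIO", props, null);

    // Expected to throw:
    // java.io.NotSerializableException: com.azure.storage.common.StorageSharedKeyCredential
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(io);
    }
  }
}
```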
POM:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>adls-shared-key-auth-issue</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <iceberg.version>1.5.1</iceberg.version>
    <flink.version>1.18.1</flink.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-core</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-api</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-parquet</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <!-- Needed to load the Nessie catalog -->
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-nessie</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-azure</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-data</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-identity</artifactId>
    </dependency>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-storage-file-datalake</artifactId>
    </dependency>
    <!-- Needed to create Parquet files -->
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.11.3</version>
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>2.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-column</artifactId>
      <version>1.13.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-flink</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-flink-1.18</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>

  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-sdk-bom</artifactId>
        <version>1.2.22</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
</project>
```
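One possible direction for a fix, as mentioned above: keep the shared-key account name and key as plain `String` fields, which serialize fine, and build the non-serializable `StorageSharedKeyCredential` only when a client is being configured. This is just a sketch of the idea, not the actual `AzureProperties` source; the class shape, method signature, and property key strings here are assumptions based on the 1.5.1 API:

```java
import java.io.Serializable;
import java.util.Map;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.file.datalake.DataLakeFileSystemClientBuilder;

// Sketch of a serialization-safe AzureProperties; not the real implementation.
public class AzureProperties implements Serializable {
  // Assumed property keys, matching the constants used in the repro above.
  public static final String ADLS_SHARED_KEY_ACCOUNT_NAME = "adls.auth.shared-key.account.name";
  public static final String ADLS_SHARED_KEY_ACCOUNT_KEY = "adls.auth.shared-key.account.key";

  // Strings are Serializable, so nothing non-serializable is retained.
  private final String sharedKeyAccountName;
  private final String sharedKeyAccountKey;

  public AzureProperties(Map<String, String> properties) {
    this.sharedKeyAccountName = properties.get(ADLS_SHARED_KEY_ACCOUNT_NAME);
    this.sharedKeyAccountKey = properties.get(ADLS_SHARED_KEY_ACCOUNT_KEY);
  }

  public void applyClientConfiguration(String account, DataLakeFileSystemClientBuilder builder) {
    if (sharedKeyAccountName != null && sharedKeyAccountKey != null) {
      // Built lazily, after any deserialization, so the credential never
      // travels with the Flink job graph.
      builder.credential(new StorageSharedKeyCredential(sharedKeyAccountName, sharedKeyAccountKey));
    } else {
      builder.credential(new DefaultAzureCredentialBuilder().build());
    }
  }
}
```

Alternatively, the credential field could be marked `transient` and rebuilt on first use after deserialization.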