ms1111 opened a new issue, #10245:
URL: https://github.com/apache/iceberg/issues/10245

   ### Apache Iceberg version
   
   1.5.1 (latest release)
   
   ### Query engine
   
   Flink
   
   ### Please describe the bug 🐞
   
   `ADLSFileIO` holds an `AzureProperties` object. When `ADLS_SHARED_KEY_ACCOUNT_NAME` or `ADLS_SHARED_KEY_ACCOUNT_KEY` is set, `AzureProperties` creates a `StorageSharedKeyCredential` in its constructor. `StorageSharedKeyCredential` is not `Serializable`, so Flink job serialization fails during startup.
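
   For reference, the failure can be reproduced without Flink: plain Java serialization rejects any `Serializable` holder with a non-transient `StorageSharedKeyCredential` field. A minimal illustration (the `Holder` class below is hypothetical, not Iceberg code):

   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.ObjectOutputStream;
   import java.io.Serializable;
   import com.azure.storage.common.StorageSharedKeyCredential;

   // Hypothetical holder, illustration only: mirrors how AzureProperties
   // ends up carrying a non-serializable credential instance.
   public class Holder implements Serializable {
       private final StorageSharedKeyCredential credential =
               new StorageSharedKeyCredential("account", "a2V5"); // dummy base64 key

       public static void main(String[] args) throws Exception {
           try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
               // Throws java.io.NotSerializableException:
               // com.azure.storage.common.StorageSharedKeyCredential
               out.writeObject(new Holder());
           }
       }
   }
   ```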
   
   If the storage account key is not supplied, `DefaultAzureCredential` instead tries to obtain credentials from the Azure CLI or another source such as workload identity. That path appears to work, but some environments require shared key authentication.
   
   The serialization error is below:
   ```
   Caused by: java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not serialize object for key serializedUDF.
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
        at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:322)
        ... 13 more
   Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not serialize object for key serializedUDF.
        at org.apache.flink.streaming.api.graph.StreamConfig.lambda$serializeAllConfigs$1(StreamConfig.java:203)
        at java.base/java.util.HashMap.forEach(HashMap.java:1421)
        at org.apache.flink.streaming.api.graph.StreamConfig.serializeAllConfigs(StreamConfig.java:197)
        at org.apache.flink.streaming.api.graph.StreamConfig.lambda$triggerSerializationAndReturnFuture$0(StreamConfig.java:174)
        at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718)
        at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
   Caused by: java.io.NotSerializableException: com.azure.storage.common.StorageSharedKeyCredential
   ```
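
   One way such a field is commonly made serialization-friendly is to keep only the serializable account name and key and rebuild the credential lazily after deserialization. A rough sketch of that pattern (the class below is hypothetical, not the actual Iceberg code or fix):

   ```java
   import java.io.Serializable;
   import com.azure.storage.common.StorageSharedKeyCredential;

   // Hypothetical sketch: hold only serializable strings; the transient
   // credential is skipped by serialization and rebuilt on first use.
   public class SharedKeyCredentialSupplier implements Serializable {
       private final String accountName;
       private final String accountKey;
       private transient volatile StorageSharedKeyCredential credential;

       public SharedKeyCredentialSupplier(String accountName, String accountKey) {
           this.accountName = accountName;
           this.accountKey = accountKey;
       }

       public StorageSharedKeyCredential get() {
           if (credential == null) {
               credential = new StorageSharedKeyCredential(accountName, accountKey);
           }
           return credential;
       }
   }
   ```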
   
   A sample app that triggers the error is below.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.UUID;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.types.Row;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.iceberg.CatalogProperties;
   import org.apache.iceberg.CatalogUtil;
   import org.apache.iceberg.PartitionSpec;
   import org.apache.iceberg.Schema;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.azure.AzureProperties;
   import org.apache.iceberg.azure.adlsv2.ADLSFileIO;
   import org.apache.iceberg.catalog.Catalog;
   import org.apache.iceberg.catalog.Namespace;
   import org.apache.iceberg.catalog.SupportsNamespaces;
   import org.apache.iceberg.catalog.TableIdentifier;
   import org.apache.iceberg.flink.CatalogLoader;
   import org.apache.iceberg.flink.FlinkSchemaUtil;
   import org.apache.iceberg.flink.TableLoader;
   import org.apache.iceberg.flink.sink.FlinkSink;
   import org.apache.iceberg.types.Types;
   
   /**
    * Run with environment variables:
    * <ul>
    *     <li>STORAGE_ACCOUNT=storage account name</li>
    *     <li>STORAGE_ACCOUNT_KEY=key</li>
    *     <li>CONTAINER=name of storage container</li>
    * </ul>
    */
   public class ADLSSharedKeyAuthIssue {
       public static void main(String[] args) throws Exception {
           final String storageAccount = System.getenv("STORAGE_ACCOUNT");
           final String storageAccountKey = System.getenv("STORAGE_ACCOUNT_KEY");
           final String container = System.getenv("CONTAINER");

           Map<String, String> options = new HashMap<>();
           options.put("warehouse", "abfss://" + container + "@" + storageAccount + ".dfs.core.windows.net");
           options.put("uri", "http://localhost:19120/api/v1");
           options.put(CatalogProperties.FILE_IO_IMPL, ADLSFileIO.class.getCanonicalName());
           options.put(AzureProperties.ADLS_SHARED_KEY_ACCOUNT_NAME, storageAccount);
           options.put(AzureProperties.ADLS_SHARED_KEY_ACCOUNT_KEY, storageAccountKey);
   
           CatalogLoader catalogLoader = CatalogLoader.custom(
                   "flink",
                   options,
                   new Configuration(),
                   CatalogUtil.ICEBERG_CATALOG_NESSIE);
   
           Catalog catalog = catalogLoader.loadCatalog();
   
           Schema schema = new Schema(
                   Types.NestedField.required(1, "id", Types.LongType.get()));
   
           PartitionSpec spec = PartitionSpec.builderFor(schema).build();
           Namespace namespace = Namespace.of("nsname_" + UUID.randomUUID().toString().substring(0, 4));
           ((SupportsNamespaces) catalog).createNamespace(namespace);
           TableIdentifier tableIdentifier = TableIdentifier.of(namespace, "t1");
           Table table = catalog.createTable(tableIdentifier, schema, spec);

           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           DataStream<Row> source = env.fromElements(1)
                   .map(data -> {
                       Row row = new Row(1);
                       row.setField(0, data);
                       return row;
                   });
   
           FlinkSink.forRow(source, FlinkSchemaUtil.toSchema(schema))
                   .tableLoader(TableLoader.fromCatalog(catalogLoader, tableIdentifier))
                   .overwrite(true)
                   .append();
   
           env.execute();
       }
   }
   ```
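
   For comparison, the job serializes and runs when the two shared-key properties are simply omitted, since `ADLSFileIO` then falls back to `DefaultAzureCredential` (assuming ambient credentials such as the Azure CLI or workload identity are available). The options block in the sample above reduces to:

   ```java
   // Same catalog options minus the shared-key entries; ADLSFileIO then
   // authenticates via DefaultAzureCredential instead.
   Map<String, String> options = new HashMap<>();
   options.put("warehouse", "abfss://" + container + "@" + storageAccount + ".dfs.core.windows.net");
   options.put("uri", "http://localhost:19120/api/v1");
   options.put(CatalogProperties.FILE_IO_IMPL, ADLSFileIO.class.getCanonicalName());
   ```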
   
   POM:
   ```xml
   <?xml version="1.0" encoding="UTF-8"?>
   <project xmlns="http://maven.apache.org/POM/4.0.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>
   
       <groupId>org.example</groupId>
       <artifactId>adls-shared-key-auth-issue</artifactId>
       <version>1.0-SNAPSHOT</version>
   
       <properties>
           <maven.compiler.source>17</maven.compiler.source>
           <maven.compiler.target>17</maven.compiler.target>
           <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
           <iceberg.version>1.5.1</iceberg.version>
           <flink.version>1.18.1</flink.version>
       </properties>
   
       <dependencies>
           <dependency>
               <groupId>org.apache.iceberg</groupId>
               <artifactId>iceberg-core</artifactId>
               <version>${iceberg.version}</version>
           </dependency>
   
           <dependency>
               <groupId>org.apache.iceberg</groupId>
               <artifactId>iceberg-api</artifactId>
               <version>${iceberg.version}</version>
           </dependency>
   
           <dependency>
               <groupId>org.apache.iceberg</groupId>
               <artifactId>iceberg-parquet</artifactId>
               <version>${iceberg.version}</version>
           </dependency>
   
        <!-- Needed to load the Nessie catalog -->
           <dependency>
               <groupId>org.apache.iceberg</groupId>
               <artifactId>iceberg-nessie</artifactId>
               <version>${iceberg.version}</version>
           </dependency>
   
           <dependency>
               <groupId>org.apache.iceberg</groupId>
               <artifactId>iceberg-azure</artifactId>
               <version>${iceberg.version}</version>
           </dependency>
   
           <dependency>
               <groupId>org.apache.iceberg</groupId>
               <artifactId>iceberg-data</artifactId>
               <version>${iceberg.version}</version>
           </dependency>
   
           <dependency>
               <groupId>com.azure</groupId>
               <artifactId>azure-identity</artifactId>
           </dependency>
   
           <dependency>
               <groupId>com.azure</groupId>
               <artifactId>azure-storage-file-datalake</artifactId>
           </dependency>
   
        <!-- Needed to create Parquet files -->
           <dependency>
               <groupId>org.apache.avro</groupId>
               <artifactId>avro</artifactId>
               <version>1.11.3</version>
           </dependency>
   
           <dependency>
               <groupId>commons-io</groupId>
               <artifactId>commons-io</artifactId>
               <version>2.14.0</version>
           </dependency>
   
           <dependency>
               <groupId>org.apache.parquet</groupId>
               <artifactId>parquet-column</artifactId>
               <version>1.13.1</version>
           </dependency>
   
           <dependency>
               <groupId>org.apache.hadoop</groupId>
               <artifactId>hadoop-common</artifactId>
               <version>3.4.0</version>
           </dependency>
   
           <dependency>
               <groupId>org.apache.iceberg</groupId>
               <artifactId>iceberg-flink</artifactId>
               <version>${iceberg.version}</version>
           </dependency>
           <dependency>
               <groupId>org.apache.iceberg</groupId>
               <artifactId>iceberg-flink-1.18</artifactId>
               <version>${iceberg.version}</version>
           </dependency>
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-java</artifactId>
               <version>${flink.version}</version>
               <scope>provided</scope>
           </dependency>
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-streaming-java</artifactId>
               <version>${flink.version}</version>
               <scope>provided</scope>
           </dependency>
   
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-metrics-dropwizard</artifactId>
               <version>${flink.version}</version>
               <scope>provided</scope>
           </dependency>
   
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-table-api-java</artifactId>
               <version>${flink.version}</version>
               <scope>provided</scope>
           </dependency>
   
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-table-runtime</artifactId>
               <version>${flink.version}</version>
               <scope>provided</scope>
           </dependency>
   
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-clients</artifactId>
               <version>${flink.version}</version>
               <scope>provided</scope>
           </dependency>
       </dependencies>
   
       <dependencyManagement>
           <dependencies>
               <dependency>
                   <groupId>com.azure</groupId>
                   <artifactId>azure-sdk-bom</artifactId>
                   <version>1.2.22</version>
                   <type>pom</type>
                   <scope>import</scope>
               </dependency>
           </dependencies>
       </dependencyManagement>
   </project>
   ```

