knowxyz commented on issue #12802: URL: https://github.com/apache/iceberg/issues/12802#issuecomment-2806998151
Code I am trying:

```python
# Databricks notebook source

# Mainly used variables
from pyspark.sql import SparkSession

# Azure Storage Account details
container_name = "csv"
mount_point = "/mnt/icebergdata8"  # choose a mount point
mount_point = "dbfs:/FileStore/tables"  # note: this overrides the mount point above
storage_account_name = "abc"
storage_account_key = "zZHI+AStlGv+3w=="
file_name = "data.csv"
catalog = "catalog5"
store = "default"
table = "sampleoutput24"
warehouseName = "warehousenew9"
icebergtable_name = catalog + "." + store + "." + table
tempview = "tempviewice12"

# Initialize Spark session (if not already initialized)
spark = SparkSession.builder \
    .appName("IcebergSave") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.catalog5", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.catalog5.type", "hadoop") \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1,org.apache.iceberg:iceberg-azure-bundle:1.7.1") \
    .config("spark.sql.catalog.catalog5.warehouse", f"{mount_point}/{warehouseName}") \
    .getOrCreate()

spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_key)
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.conf.set("spark.sql.iceberg.vectorization.enabled", "false")
spark.conf.set("spark.databricks.io.cache.enabled", "false")
spark.conf.set("spark.sql.files.useFsCache", "false")
spark.conf.set("spark.sql.execution.photon.enabled", "false")
# note: the second io-impl setting overrides the first
spark.conf.set("spark.sql.catalog.catalog5.io-impl", "org.apache.iceberg.azure.AzureBlobStorageIO")
spark.conf.set("spark.sql.catalog.catalog5.io-impl", "org.apache.iceberg.azure.adlsv2.ADLSFileIO")

file_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{file_name}"

try:
    dbutils.fs.unmount(mount_point)
    print(f"{mount_point} unmounted successfully.")
except Exception:
    print(f"{mount_point} was not mounted or unmounting failed.")

try:
    # dbutils.fs.mount(
    #     source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/",
    #     mount_point=mount_point,
    #     extra_configs={f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key}
    # )
    print(f"Mounted {container_name} to {mount_point} using Access Key.")
except Exception as e:
    print(f"Error mounting using Access Key: {e}")

print("fs.azure.account.key ready to execute")
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_key)
print("fs.azure.account.key executed")

# COMMAND ----------

sc = spark.sparkContext
print(spark.conf.get("spark.jars.packages"))
spark.conf.get("spark.sql.catalog.catalog5.warehouse")

# COMMAND ----------

# spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

# Read the CSV file into a Spark DataFrame
df = spark.read.format("csv").option("header", "true").load(file_path)
# df.show()

# COMMAND ----------

# Write to Iceberg table with partitioning
# df2 = spark.createDataFrame(df.collect(), df.schema)
df.createOrReplaceTempView(tempview)

# Query the temporary view using SQL
sql = f"SELECT * FROM {tempview}"
resultdf = spark.sql(sql)
# print(icebergtable_name)

# only iceberg here
# resultdf.write.format("iceberg").mode("overwrite").saveAsTable(icebergtable_name)
# resultdf.unpersist()

# COMMAND ----------

# Build a CREATE TABLE statement from the DataFrame schema
schema = resultdf.schema
columns_sql = ", ".join(f"{field.name} {field.dataType.simpleString().upper()}" for field in schema)
query = f"""CREATE TABLE {icebergtable_name} ({columns_sql})
TBLPROPERTIES (
  'delta.columnMapping.mode' = 'name',
  'delta.enableIcebergCompatV2' = 'true',
  'delta.universalFormat.enabledFormats' = 'iceberg'
)"""
# print(query)
spark.sql(query)

query1 = f"INSERT INTO {icebergtable_name} SELECT * FROM {tempview}"
# print(query1)
spark.sql(query1)

dfread = spark.read.format("org.apache.iceberg.spark.source.IcebergSource").load(icebergtable_name)
count = dfread.count()
# print(count)
dfread.show()
```
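For reference, this is a minimal sketch of what I would expect the plain Iceberg write path through the `catalog5` Hadoop catalog above to look like, using the standard `DataFrameWriterV2` API (`writeTo(...).using("iceberg")`) instead of the Delta UniForm `TBLPROPERTIES`. It reuses `resultdf` and `icebergtable_name` from the snippet above and is only a sketch of the documented Iceberg-on-Spark path, not a confirmed Databricks behavior:

```python
# Sketch: create/replace the table as a plain Iceberg table through the
# catalog5 Hadoop catalog configured above (reuses resultdf and
# icebergtable_name from the snippet above).
resultdf.writeTo(icebergtable_name).using("iceberg").createOrReplace()

# Read it back via the "iceberg" short name, which resolves to the same
# source as the fully qualified IcebergSource class name used above.
spark.read.format("iceberg").load(icebergtable_name).show()
```

With `writeTo`, the column DDL string does not need to be assembled by hand, since the table schema is taken directly from the DataFrame.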