knowxyz commented on issue #12802:
URL: https://github.com/apache/iceberg/issues/12802#issuecomment-2806998151

   Code I am trying:
   # Databricks notebook source
   # Mainly used variables
   from pyspark.sql import SparkSession
   # Azure Storage Account Details
   container_name = "csv"
   mount_point = "/mnt/icebergdata8" # Choose a mount point
   mount_point= "dbfs:/FileStore/tables"
   storage_account_name = "abc"
   storage_account_key = "zZHI+AStlGv+3w=="
   
   file_name = "data.csv"
   catalog = "catalog5"
   store = "default"
   table = "sampleoutput24"
   warehouseName ="warehousenew9"
   icebergtable_name = catalog + "." + store + "." + table
   tempview ="tempviewice12"
   # Initialize Spark Session (if not already initialized)
   spark = SparkSession.builder \
       .appName("IcebergSave") \
       .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
       .config("spark.sql.catalog.catalog5", "org.apache.iceberg.spark.SparkCatalog") \
       .config("spark.sql.catalog.catalog5.type", "hadoop") \
       .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1,org.apache.iceberg:iceberg-azure-bundle:1.7.1") \
       .config("spark.sql.catalog.catalog5.warehouse", f"{mount_point}/{warehouseName}") \
       .getOrCreate()
   
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
 storage_account_key)
   spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
   spark.conf.set("spark.sql.iceberg.vectorization.enabled", "false")
   spark.conf.set("spark.databricks.io.cache.enabled","false")
   spark.conf.set("spark.sql.files.useFsCache", "false")
   spark.conf.set("spark.sql.execution.photon.enabled", "false")
   spark.conf.set("spark.sql.catalog.catalog5.io-impl", 
"org.apache.iceberg.azure.AzureBlobStorageIO")
   spark.conf.set("spark.sql.catalog.catalog5.io-impl", 
"org.apache.iceberg.azure.adlsv2.ADLSFileIO")
   
   
   file_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{file_name}"
   try:
       dbutils.fs.unmount(mount_point)
       print(f"{mount_point} unmounted successfully.")
   except Exception:
       print(f"{mount_point} was not mounted or unmounting failed.")
   try:
       # dbutils.fs.mount(
       #     source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/",
       #     mount_point=mount_point,
       #     extra_configs={f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key}
       # )
       print(f"Mounted {container_name} to {mount_point} using Access Key.")
   except Exception as e:
       print(f"Error mounting using Access Key: {e}")
   print("fs.azure.account.key ready to execute")
   
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
 storage_account_key)
   print(f"fs.azure.account.key  Excuted")
   
   # COMMAND ----------
   
   sc = spark.sparkContext
   print(spark.conf.get("spark.jars.packages"))
   spark.conf.get("spark.sql.catalog.catalog5.warehouse")
   
   
   # COMMAND ----------
   
   #spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
   # Create the Spark configuration
   # Read the CSV file into a Spark DataFrame
   df = spark.read.format("csv").option("header", "true").load(file_path)
   # Show the DataFrame
   #df.show()
   
   # COMMAND ----------
   
   # Write to Iceberg table with partitioning
   #df2 = spark.createDataFrame(df.collect(), df.schema)
   df.createOrReplaceTempView(tempview)
   
   # Query the temporary view using SQL
   sql = f"SELECT * FROM {tempview}"
   resultdf = spark.sql(sql)
   #print(icebergtable_name)
   # only iceberg here
   
   #resultdf.write.format("iceberg").mode("overwrite").saveAsTable(icebergtable_name)
   #resultdf.unpersist()
   
   # COMMAND ----------
   
   schema = resultdf.schema
   columns_sql= ""
   columns_sql = ", ".join(f"{field.name} 
{field.dataType.simpleString().upper()}" for field in schema)
   
   query =f"""
   CREATE TABLE {icebergtable_name}({columns_sql}) TBLPROPERTIES(
   'delta.columnMapping.mode' = 'name',
   'delta.enableIcebergCompatV2' = 'true',
   'delta.universalFormat.enabledFormats' = 'iceberg'
   );"""
   #print (query)
   spark.sql(query)
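   # Hedged note: the delta.* TBLPROPERTIES above are Delta Lake UniForm settings;
   # a table created through an Iceberg Hadoop catalog is plain Iceberg, so an
   # Iceberg-native sketch of the same DDL would be:
   # spark.sql(f"CREATE TABLE {icebergtable_name} ({columns_sql}) USING iceberg")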
   
   query1 = f"INSERT INTO {icebergtable_name} SELECT * FROM {tempview}"
   #r =spark.sql(query1)
   #print(query1)
   spark.sql(query1)
   dfread = spark.read.format("org.apache.iceberg.spark.source.IcebergSource").load(icebergtable_name)
   count = dfread.count()
   #print(count)
   dfread.show()
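   # A sketch of the more common spellings for reading the table back, equivalent
   # to the IcebergSource class name used above:
   # dfread = spark.read.format("iceberg").load(icebergtable_name)
   # dfread = spark.table(icebergtable_name)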
   
   

