aimenglin commented on issue #2685: URL: https://github.com/apache/iceberg/issues/2685#issuecomment-1847892200
Hi Rym,

I understand you're asking about writing to Iceberg tables with Hive. Based on my experience and experiments, writing to Iceberg tables directly through Hive does not appear to be supported. A workable alternative is to create and populate the Iceberg tables with Apache Spark; once the tables exist and the data has been inserted via Spark, you can read them from Hive without issue (a minimal sketch of this workflow is at the end of this comment).

Best,
Menglin

On Fri, Dec 8, 2023 at 1:24 PM Rym ***@***.***> wrote:

> @aimenglin <https://github.com/aimenglin> @bitsondatadev <https://github.com/bitsondatadev> @marton-bod <https://github.com/marton-bod> @dacort <https://github.com/dacort> @electrum <https://github.com/electrum>
>
> Hello.
>
> I am using the Hive catalog to create Iceberg tables with Spark as the execution engine:
>
> import pyspark
> from pyspark.sql import SparkSession
> import os
>
> # DEFINE SENSITIVE VARIABLES
> HIVE_URI = os.environ.get("HIVE_URI", "thrift://hive-metastore:9083")
> WAREHOUSE = os.environ.get("WAREHOUSE", "s3a://warehouse/")
> AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY", "xxxxxxxxxxxxxxxx")
> AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY", "xxxxxxxxxxxxxxxxxxxxxxxxxxxx")
> AWS_S3_ENDPOINT = os.environ.get("AWS_S3_ENDPOINT", "http://minioserver:9000/")
>
> print(AWS_S3_ENDPOINT)
> print(HIVE_URI)
> print(WAREHOUSE)
>
> conf = (
>     pyspark.SparkConf()
>     .setAppName('app_name')
>     # packages
>     .set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
>     # SQL extensions
>     .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
>     # Configuring the catalog
>     .set('spark.sql.catalog.catalog_hive', 'org.apache.iceberg.spark.SparkCatalog')
>     .set('spark.sql.catalog.catalog_hive.type', 'hive')
>     .set('spark.sql.catalog.catalog_hive.uri', HIVE_URI)
>     .set('spark.sql.catalog.catalog_hive.warehouse.dir', WAREHOUSE)
>     .set('spark.sql.catalog.catalog_hive.endpoint', AWS_S3_ENDPOINT)
>     .set('spark.sql.catalog.catalog_hive.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
>     .set('spark.hadoop.fs.s3a.access.key', AWS_ACCESS_KEY)
>     .set('spark.hadoop.fs.s3a.secret.key', AWS_SECRET_KEY)
>     .set("spark.hadoop.fs.s3a.path.style.access", "true")
> )
>
> # Start Spark session
> spark = SparkSession.builder.config(conf=conf).getOrCreate()
> print("Spark Running")
>
> # Create a table
> spark.sql("CREATE TABLE catalog_hive.default.tmy_table (name STRING) USING iceberg;").show()
>
> # Insert some data
> spark.sql("INSERT INTO catalog_hive.default.my_table VALUES ('ns'), ('nd'), ('Ja')").show()
>
> # Query the data
> spark.sql("SELECT * FROM catalog_hive.default.my_table;").show()
>
> When I run the CREATE TABLE command, it fails with an exception:
>
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
>
> Py4JJavaError                             Traceback (most recent call last)
> Cell In[4], line 38
>      35 print("Spark Running")
>      37 #Create a Table
> ---> 38 spark.sql("CREATE TABLE catalog_hive.default.tmy_table (name STRING) USING iceberg;").show()
>      40 #Insert Some Data
>      41 spark.sql("INSERT INTO catalog_hive.default.my_table VALUES ('Alex Merced'), ('Dipankar Mazumdar'), ('Jason Hughes')").show()
>
> File ~/.local/lib/python3.10/site-packages/pyspark/sql/session.py:1034, in SparkSession.sql(self, sqlQuery, **kwargs)
>    1032 sqlQuery = formatter.format(sqlQuery, **kwargs)
>    1033 try:
> -> 1034     return DataFrame(self._jsparkSession.sql(sqlQuery), self)
>    1035 finally:
>    1036     if len(kwargs) > 0:
>
> File ~/.local/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
>    1315 command = proto.CALL_COMMAND_NAME +\
>    1316     self.command_header +\
>    1317     args_command +\
>    1318     proto.END_COMMAND_PART
>    1320 answer = self.gateway_client.send_command(command)
> -> 1321 return_value = get_return_value(
>    1322     answer, self.gateway_client, self.target_id, self.name)
>    1324 for temp_arg in temp_args:
>    1325     temp_arg._detach()
>
> File ~/.local/lib/python3.10/site-packages/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
>    188 def deco(*a: Any, **kw: Any) -> Any:
>    189     try:
> --> 190         return f(*a, **kw)
>    191     except Py4JJavaError as e:
>    192         converted = convert_exception(e.java_exception)
>
> File ~/.local/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
>    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
>    325 if answer[1] == REFERENCE_TYPE:
> --> 326     raise Py4JJavaError(
>    327         "An error occurred while calling {0}{1}{2}.\n".
>    328         format(target_id, ".", name), value)
>    329 else:
>    330     raise Py4JError(
>    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
>    332         format(target_id, ".", name, value))
>
> Py4JJavaError: An error occurred while calling o49.sql.
> : software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 400, Request ID: 2MBCRA6QRAF6SMBQ, Extended Request ID: s41ibIYx6fFDoMXiRK+8TRNkUT/GsiwqEzR5X2Drq9cY213HQkX19/PxSacQwo+SPX8eAqTNy7k=)
>   at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleErrorResponse(AwsXmlPredicatedResponseHandler.java:156)
>   at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleResponse(AwsXmlPredicatedResponseHandler.java:108)
>   at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:85)
>   at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:43)
>   at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler$Crc32ValidationResponseHandler.handle(AwsSyncClientHandler.java:95)
>   at software.amazon.awssdk.core.internal.handler.BaseClientHandler.lambda$successTransformationResponseHandler$7(BaseClientHandler.java:245)
>   at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40)
>   at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:30)
>   at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
>   at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73)
>   at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
>   at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
>   at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
>   at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50)
>   at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36)
>   at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
>   at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
>   at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
>   at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
>   at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
>   at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
>   at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
>   at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
>   at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48)
>   at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31)
>   at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
>   at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
>   at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
>   at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
>   at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193)
>   at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
>   at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:167)
>   at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)
>   at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:175)
>   at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
>   at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
>   at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
>   at software.amazon.awssdk.services.s3.DefaultS3Client.putObject(DefaultS3Client.java:9325)
>   at org.apache.iceberg.aws.s3.S3OutputStream.completeUploads(S3OutputStream.java:422)
>   at org.apache.iceberg.aws.s3.S3OutputStream.close(S3OutputStream.java:267)
>   at java.base/sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:341)
>   at java.base/sun.nio.cs.StreamEncoder.close(StreamEncoder.java:161)
>   at java.base/java.io.OutputStreamWriter.close(OutputStreamWriter.java:255)
>   at org.apache.iceberg.TableMetadataParser.internalWrite(TableMetadataParser.java:127)
>   at org.apache.iceberg.TableMetadataParser.overwrite(TableMetadataParser.java:110)
>   at org.apache.iceberg.BaseMetastoreTableOperations.writeNewMetadata(BaseMetastoreTableOperations.java:162)
>   at org.apache.iceberg.hive.HiveTableOperations.doCommit(HiveTableOperations.java:234)
>   at org.apache.iceberg.BaseMetastoreTableOperations.commit(BaseMetastoreTableOperations.java:133)
>   at org.apache.iceberg.BaseMetastoreCatalog$BaseMetastoreCatalogTableBuilder.create(BaseMetastoreCatalog.java:174)
>   at org.apache.iceberg.CachingCatalog$CachingTableBuilder.lambda$create$0(CachingCatalog.java:261)
>   at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
>   at java.base/java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1908)
>   at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
>   at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
>   at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
>   at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
>   at org.apache.iceberg.CachingCatalog$CachingTableBuilder.create(CachingCatalog.java:257)
>   at org.apache.iceberg.spark.SparkCatalog.createTable(SparkCatalog.java:192)
>   at org.apache.iceberg.spark.SparkCatalog.createTable(SparkCatalog.java:99)
>   at org.apache.spark.sql.execution.datasources.v2.CreateTableExec.run(CreateTableExec.scala:45)
>   at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
>   at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
>   at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
>   at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
>   at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>   at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
>   at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
>   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
>   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
>   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
>   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
>   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
>   at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
>   at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
>   at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
>   at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
>   at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
>   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
>   at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
>   at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>   at py4j.Gateway.invoke(Gateway.java:282)
>   at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>   at py4j.commands.CallCommand.execute(CallCommand.java:79)
>   at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
>   at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
>   at java.base/java.lang.Thread.run(Thread.java:829)
>
> Even if I use a Hadoop-type Iceberg catalog, I always get the same error:
>
> "Py4JJavaError: An error occurred while calling o49.sql.
> : software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 400, Request ID: 5KRB8NP7R6TJ2JC4, Extended Request ID: aHgN8vSpD8xf5/Mu9u34rRJF7fcKWlupodDS67WAuQJ5+pTiyWqltK51IJADZKYXEcTSqGbgl2Y=)"
>
> For this script:
>
> import os
> import pyspark
> from pyspark.sql import SparkSession
>
> WAREHOUSE = os.environ.get("WAREHOUSE", "s3a://warehouse/")
> AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID", "xxxxxxxxxxxxxxxxxxxxxxx")
> AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "xxxxxxxxxxxxxxxxxxxxx")
> AWS_S3_ENDPOINT = os.environ.get("AWS_S3_ENDPOINT", "http://minioserver:9000/")
>
> conf = (
>     pyspark.SparkConf()
>     .setAppName("app_name")
>     .set("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0,org.apache.hadoop:hadoop-aws:3.3.1,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178")
>     .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
>     .set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
>     .set("spark.sql.catalog.iceberg.type", "hadoop")
>     .set("spark.sql.catalog.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
>     .set("spark.sql.catalog.iceberg.warehouse", WAREHOUSE)
>     .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
>     .set("spark.hadoop.fs.s3a.endpoint", AWS_S3_ENDPOINT)
>     .set("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID)
>     .set("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
>     .set("spark.hadoop.fs.s3a.path.style.access", "true")
>     # .set("spark.hadoop.fs.s3a.requester.pays.enabled", "true")
> )
>
> spark = SparkSession.builder.config(conf=conf).getOrCreate()
> print("Spark Running")
>
> spark.sql("CREATE TABLE iceberg.tab (name string) USING iceberg;")
>
> NB: AWS_ACCESS_KEY, AWS_SECRET_KEY, AWS_S3_ENDPOINT and s3a://warehouse/ are correct; I have already tested them for reading data from MinIO.
>
> Any thoughts on what I might be missing? Thank you!
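
For reference, here is a minimal sketch of the workflow suggested above: write the Iceberg table with Spark, then read it from Hive. It assumes the SparkSession spark and the catalog_hive catalog configured as in the quoted script, and a Hive installation pointed at the same metastore; the table and values are only illustrative, and the exact Hive-side requirements (iceberg-hive-runtime on Hive's classpath, Hive compatibility enabled for the table) depend on your Hive version.

# Sketch only: assumes `spark` is a SparkSession configured with the
# catalog_hive Iceberg catalog from the quoted script.

# 1) Write side (Spark): create the table and insert a few rows.
spark.sql("CREATE TABLE IF NOT EXISTS catalog_hive.default.my_table (name STRING) USING iceberg")
spark.sql("INSERT INTO catalog_hive.default.my_table VALUES ('ns'), ('nd'), ('Ja')")
spark.sql("SELECT * FROM catalog_hive.default.my_table").show()

# 2) Read side (Hive, e.g. via Beeline), once the Spark commit has completed,
#    against the same metastore and with the Iceberg Hive runtime available:
#
#    SELECT * FROM default.my_table;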
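Separately, on the quoted S3 400 error: this is only a guess that I have not verified against your environment, but when io-impl is org.apache.iceberg.aws.s3.S3FileIO, the endpoint and credentials are taken from the catalog-level s3.* properties described in the Iceberg AWS documentation rather than from the spark.hadoop.fs.s3a.* settings, and the Spark catalog property for the warehouse location is "warehouse" rather than "warehouse.dir". A sketch of what that catalog block might look like, reusing the placeholder variables from the quoted script:

import os
import pyspark

# Placeholders copied from the quoted script; adjust to your environment.
HIVE_URI = os.environ.get("HIVE_URI", "thrift://hive-metastore:9083")
WAREHOUSE = os.environ.get("WAREHOUSE", "s3a://warehouse/")
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY", "xxxxxxxxxxxxxxxx")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY", "xxxxxxxxxxxxxxxx")
AWS_S3_ENDPOINT = os.environ.get("AWS_S3_ENDPOINT", "http://minioserver:9000/")

conf = (
    pyspark.SparkConf()
    .set("spark.sql.catalog.catalog_hive", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.catalog_hive.type", "hive")
    .set("spark.sql.catalog.catalog_hive.uri", HIVE_URI)
    .set("spark.sql.catalog.catalog_hive.warehouse", WAREHOUSE)
    .set("spark.sql.catalog.catalog_hive.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    # S3FileIO reads these catalog properties, not the spark.hadoop.fs.s3a.* keys:
    .set("spark.sql.catalog.catalog_hive.s3.endpoint", AWS_S3_ENDPOINT)
    .set("spark.sql.catalog.catalog_hive.s3.path-style-access", "true")
    .set("spark.sql.catalog.catalog_hive.s3.access-key-id", AWS_ACCESS_KEY)
    .set("spark.sql.catalog.catalog_hive.s3.secret-access-key", AWS_SECRET_KEY)
)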