kcullimore opened a new issue, #44683:
URL: https://github.com/apache/arrow/issues/44683
### Describe the enhancement requested
When uploading a Parquet file created with PyArrow to Google BigQuery,
columns containing lists (e.g., List[str], List[int], List[float]) are
interpreted as RECORD types with REPEATED mode instead of the expected
primitive types (STRING, INTEGER, FLOAT) with REPEATED mode.
I was expecting BigQuery to recognize `int_column`, `str_column`, and
`float_column` as arrays of integers, strings, and floats respectively (i.e.,
primitive types with REPEATED mode). Instead, BigQuery interprets these
columns as nested RECORD types rather than simple repeated fields, which
further complicates the data handling.
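For reference, the nested layout BigQuery reports appears to mirror the three-level group structure that Parquet itself uses to encode list columns. A minimal sketch for checking this against the file written in the example below (standard PyArrow APIs only; the exact group/field names depend on the PyArrow version and writer options):
```python
import pyarrow.parquet as pq

# Print the raw Parquet schema (group structure) next to the Arrow schema.
# For list columns, the Parquet schema typically shows a group containing a
# repeated 'list' group with an 'element' field, which looks like what
# BigQuery is surfacing as nested RECORDs.
parquet_file = pq.ParquetFile('sample_data.parquet')
print(parquet_file.schema)        # Parquet-level schema
print(parquet_file.schema_arrow)  # Arrow schema, for comparison
```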
I’ve tried explicitly defining the schema in BigQuery and ensuring that the
Parquet file’s schema matches what I expect, but the behavior persists. I’m
not sure whether this is expected or whether there’s something I’m missing.
I have an alternative workaround in mind (via JSON), but I would prefer to
keep using PyArrow and Parquet.
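One option I’m still evaluating (sketched below, not yet confirmed to resolve this) is enabling list inference on the Parquet load job via `google-cloud-bigquery`’s `ParquetOptions`, which is intended to make BigQuery map Parquet LIST columns to REPEATED primitive fields instead of nested records:
```python
from google.cloud import bigquery

# Sketch: ask BigQuery to infer Parquet LIST columns as REPEATED fields
# instead of the nested record layout. Untested against the example below.
parquet_options = bigquery.ParquetOptions()
parquet_options.enable_list_inference = True

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)
job_config.parquet_options = parquet_options
```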
## Example
To reproduce, create a Parquet file using PyArrow that includes columns with
lists of integers, strings, and floats. Upload this Parquet file to BigQuery
via a bucket and inspect the resulting table schema and field values.
```python
import os
import pyarrow as pa
import pyarrow.parquet as pq
from google.cloud import bigquery, storage
# file
sample_filename = 'sample_data.parquet'
sample_filepath = f'{sample_filename}'
# Create mock data
data = {
    'id': [0, 1, 2, 3],
    'int_column': [[1], [2, 2], [], [999]],
    'str_column': [['a', 'aa'], ['b'], [], ['alpha']],
    'float_column': [[1.1], [2.2, 3.30], [], [9.029]]
}
schema = pa.schema([
    pa.field('id', pa.int64()),
    pa.field('int_column', pa.list_(pa.int64())),
    pa.field('str_column', pa.list_(pa.string())),
    pa.field('float_column', pa.list_(pa.float64())),
])
table = pa.Table.from_pydict(data, schema=schema)
print(table.schema)
"""
id: int64
int_column: list<item: int64>
  child 0, item: int64
str_column: list<item: string>
  child 0, item: string
float_column: list<item: double>
  child 0, item: double
"""
# Write and read from parquet file
pq.write_table(table, sample_filepath)
imported_table = pq.read_table(sample_filepath)
print(imported_table.schema)
# Upload to bucket
bucket_name = 'bucket_name'
blob_uri = f'gs://{bucket_name}/{sample_filename}'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(sample_filename)
blob.upload_from_filename(sample_filepath)
# Upload to BigQuery table
dataset_id = 'dataset_id'
table_id = 'table_id'
bq_client = bigquery.Client()
table_ref = bq_client.dataset(dataset_id).table(table_id)
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)
load_job = bq_client.load_table_from_uri(
    blob_uri,
    table_ref,
    job_config=job_config,
)
load_job.result()
# Review BQ table schema
loaded_table = bq_client.get_table(table_ref)
print(loaded_table.schema)
"""
[SchemaField('id', 'INTEGER', 'NULLABLE', None, None, (), None),
 SchemaField('int_column', 'RECORD', 'NULLABLE', None, None,
     (SchemaField('list', 'RECORD', 'REPEATED', None, None,
         (SchemaField('element', 'INTEGER', 'NULLABLE', None, None, (), None),), None),), None),
 SchemaField('str_column', 'RECORD', 'NULLABLE', None, None,
     (SchemaField('list', 'RECORD', 'REPEATED', None, None,
         (SchemaField('element', 'STRING', 'NULLABLE', None, None, (), None),), None),), None),
 SchemaField('float_column', 'RECORD', 'NULLABLE', None, None,
     (SchemaField('list', 'RECORD', 'REPEATED', None, None,
         (SchemaField('element', 'FLOAT', 'NULLABLE', None, None, (), None),), None),), None)]
"""
# Review BQ table data
query = f'SELECT * FROM `{dataset_id}.{table_id}`'
query_job = bq_client.query(query)
bq_table = query_job.result().to_arrow()
print(bq_table.schema)
"""
id: int64
int_column: struct<list: list<item: struct<element: int64>> not null>
  child 0, list: list<item: struct<element: int64>> not null
      child 0, item: struct<element: int64>
          child 0, element: int64
str_column: struct<list: list<item: struct<element: string>> not null>
  child 0, list: list<item: struct<element: string>> not null
      child 0, item: struct<element: string>
          child 0, element: string
float_column: struct<list: list<item: struct<element: double>> not null>
  child 0, list: list<item: struct<element: double>> not null
      child 0, item: struct<element: double>
          child 0, element: double
"""
# Optional job_config to verify enforcing schema does not help
bq_schema = [
    bigquery.SchemaField('id', 'INTEGER', mode='NULLABLE'),
    bigquery.SchemaField('int_column', 'INTEGER', mode='REPEATED'),
    bigquery.SchemaField('str_column', 'STRING', mode='REPEATED'),
    bigquery.SchemaField('float_column', 'FLOAT', mode='REPEATED'),
]
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    schema=bq_schema,
    autodetect=False,
)
load_job = bq_client.load_table_from_uri(
    blob_uri,
    table_ref,
    job_config=job_config,
)
load_job.result()
loaded_table = bq_client.get_table(table_ref)
print(loaded_table.schema)
# Clear data
os.remove(sample_filepath)
blob.delete()
bq_client.delete_table(table_ref)
```
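On the PyArrow side, the only related writer option I’m aware of is `use_compliant_nested_type` on `pyarrow.parquet.write_table` (the default in recent PyArrow releases), which controls whether list columns are written with the spec-compliant `list`/`element` group names; included here only to make the writer-side setting explicit:
```python
import pyarrow.parquet as pq

# Sketch: explicitly request the spec-compliant three-level list layout
# (a 'list' group wrapping an 'element' field). This is already the default
# in recent PyArrow releases, so the reproduction above writes this layout
# either way.
pq.write_table(table, sample_filepath, use_compliant_nested_type=True)
```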
Environment:
• Python 3.11.10
• Ubuntu 22.04.5
• pyarrow==18.0.0
• google-cloud-bigquery==3.26.0
• google-cloud-storage==2.18.2
### Component(s)
Python