JSON表架构到bigquery.TableSchema对于BigQuerySin

2024-05-23 23:16:31 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个非常重要的表模式(包括嵌套和重复的字段),以JSON格式定义(带有name、type、mode属性)并存储在一个文件中。它已成功地用于用bqload命令填充bigquery表。在

但是当我尝试使用Dataflow Python SDK和BigQuerySink执行相同的操作时,schema参数需要是一个以逗号分隔的'name':'type'元素列表,或者是一个bigquery.TableSchema对象。在

有没有什么方便的方法可以将我的JSON模式转换为bigquery.TableSchema,还是必须将其转换为name:value列表?在


Tags: 文件name命令json列表属性定义mode
3条回答

我也有同样的问题。在我的例子中,我已经在bigquery中加载了一些json,并自动生成了一个模式。在

因此,我可以使用以下命令获得自动生成的模式:

bq show --format prettyjson my-gcp-project:my-bq-table |jq .schema > my-bq-table.json

然后可以使用这个片段将模式转换为bigquery.TableSchema

^{pr2}$

它将与bigqueryjson模式规范一起工作,如果您像我一样懒惰,您可以避免指定type和{},前提是默认情况下可以为null的字符串。在

目前,您不能直接指定JSON模式。必须将模式指定为包含逗号分隔的字段列表的字符串或bigquery.TableSchema对象。在

如果架构很复杂并且包含嵌套和/或重复的字段,我们建议构建一个bigquery.TableSchema对象。在

下面是一个带有嵌套和重复字段的bigquery.TableSchema对象的示例。在

from apitools.clients import bigquery

table_schema = bigquery.TableSchema()

# ‘string’ field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'fullName'
field_schema.type = 'string'
field_schema.mode = 'required'
table_schema.fields.append(field_schema)

# ‘integer’ field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'age'
field_schema.type = 'integer'
field_schema.mode = 'nullable'
table_schema.fields.append(field_schema)

# nested field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'phoneNumber'
field_schema.type = 'record'
field_schema.mode = 'nullable'

area_code = bigquery.TableFieldSchema()
area_code.name = 'areaCode'
area_code.type = 'integer'
area_code.mode = 'nullable'
field_schema.fields.append(area_code)

number = bigquery.TableFieldSchema()
number.name = 'number'
number.type = 'integer'
number.mode = 'nullable'
field_schema.fields.append(number)
table_schema.fields.append(field_schema)

# repeated field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'children'
field_schema.type = 'string'
field_schema.mode = 'repeated'
table_schema.fields.append(field_schema)

andreapierleoni发布的上述代码片段适用于google-cloud-bigquerypython客户机的旧版本,例如,google-cloud-bigquery0.25.0版本,它碰巧通过pip install apache-beam[gcp]安装。在

但是,bigquerypython客户机API在google-cloud-bigquery的较新版本中发生了巨大的变化,例如在我当前使用的1.8.0版本中,bigquery.TableFieldSchema()和{}都不起作用。在

如果您使用的是google-cloud-bigquery包的最新版本,下面介绍如何从JSON文件获取所需的SchemaField列表(例如,创建表所必需的)。这是AndreaPierleoni发布的代码的改编版(谢谢!)在

def _get_field_schema(field):
    name = field['name']
    field_type = field.get('type', 'STRING')
    mode = field.get('mode', 'NULLABLE')
    fields = field.get('fields', [])

    if fields:
        subschema = []
        for f in fields:
            fields_res = _get_field_schema(f)
            subschema.append(fields_res)
    else:
        subschema = []

    field_schema = bigquery.SchemaField(name=name, 
        field_type=field_type,
        mode=mode,
        fields=subschema
    )
    return field_schema


def parse_bq_json_schema(schema_filename):
    schema = []
    with open(schema_filename, 'r') as infile:
        jsonschema = json.load(infile)

    for field in jsonschema:
        schema.append(_get_field_schema(field))

    return schema

现在,假设您有一个表的schema already defined in JSON。假设您有this particular "schema.json" file,那么使用上面的helper方法,您可以获得Python客户机所需的SchemaField表示,如下所示:

^{pr2}$

现在,对于create a table having the above schema using the Python SDK,您将执行以下操作:

dataset_ref = bqclient.dataset('YOUR_DATASET')
table_ref = dataset_ref.table('YOUR_TABLE')
table = bigquery.Table(table_ref, schema=res_schema)

您可以选择按如下方式设置基于时间的分区(如果需要):

table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field='timestamp'  # name of column to use for partitioning
)

最后创建了表格:

table = bqclient.create_table(table)

print('Created table {}, partitioned on column {}'.format(
    table.table_id, table.time_partitioning.field))

相关问题 更多 >