在Google BigQuery中从查询结果创建表格

19 投票
3 回答
18464 浏览
提问于 2025-04-17 14:18

我们正在通过Python API使用Google BigQuery。我想知道如何从查询结果创建一个表(可以是新表,也可以覆盖旧表)?我查看了查询文档,但觉得没有什么帮助。

我们想要模拟:

“SELEC ... INTO ...” 这个ANSI SQL的语法。

3 个回答

0

在Google BigQuery中从查询结果创建一个表。假设你正在使用Python 3的Jupyter Notebook,下面我会解释以下步骤:

  1. 如何在BigQuery上创建一个新的数据集(用来保存结果)
  2. 如何运行一个查询,并将结果以表格的形式保存在新的数据集中

在BigQuery上创建一个新的数据集:my_dataset

bigquery_client  = bigquery.Client() #Create a BigQuery service object
dataset_id = 'my_dataset' 
dataset_ref = bigquery_client.dataset(dataset_id) # Create a DatasetReference using a chosen dataset ID.
dataset = bigquery.Dataset(dataset_ref)  # Construct a full Dataset object to send to the API.
dataset.location = 'US' # Specify the geographic location where the new dataset will reside. Remember this should be same location as that of source data set from where we are getting data to run a query

# Send the dataset to the API for creation. Raises google.api_core.exceptions.AlreadyExists if the Dataset already exists within the project.
dataset = bigquery_client.create_dataset(dataset)  # API request
print('Dataset {} created.'.format(dataset.dataset_id))

使用Python在BigQuery上运行查询:

这里有两种类型:

  1. 允许大结果
  2. 不提及大结果等的查询

我这里使用的是公共数据集:bigquery-public-data:hacker_news,表格ID是:comments,来运行一个查询。

允许大结果

DestinationTableName='table_id1'  #Enter new table name you want to give
!bq query --allow_large_results --destination_table=project_id:my_dataset.$DestinationTableName 'SELECT * FROM [bigquery-public-data:hacker_news.comments]'

这个查询会在需要时允许大查询结果。

不提及 --allow_large_results:

DestinationTableName='table_id2'  #Enter new table name you want to give
!bq query destination_table=project_id:my_dataset.$DestinationTableName 'SELECT * FROM [bigquery-public-data:hacker_news.comments] LIMIT 100'

这个查询适用于结果不会超过Google BigQuery文档中提到的限制的情况。

在这里输入图片描述

输出结果:

  1. 在BigQuery上创建一个名为my_dataset的新数据集
  2. 查询结果以表格的形式保存在my_dataset中

注意:

  1. 这些查询是可以在终端上运行的命令(开头没有!)。但因为我们使用Python来运行这些命令/查询,所以我们在前面加了!。这样可以让我们在Python程序中使用/运行命令。
  2. 另外,请给这个回答点赞哦 :)。谢谢。
15

这个被接受的回答是对的,但它没有提供用Python来完成这个任务的代码。下面是一个例子,是我刚写的一个小客户类里提取出来的。这个例子没有处理错误,而且里面的查询是写死的,应该根据需要修改成更有趣的内容,而不是简单的 SELECT * ...

import time

from google.cloud import bigquery
from google.cloud.bigquery.table import Table
from google.cloud.bigquery.dataset import Dataset


class Client(object):

    def __init__(self, origin_project, origin_dataset, origin_table,
                 destination_dataset, destination_table):
        """
        A Client that performs a hardcoded SELECT and INSERTS the results in a
        user-specified location.

        All init args are strings. Note that the destination project is the
        default project from your Google Cloud configuration.
        """
        self.project = origin_project
        self.dataset = origin_dataset
        self.table = origin_table
        self.dest_dataset = destination_dataset
        self.dest_table_name = destination_table
        self.client = bigquery.Client()

    def run(self):
        query = ("SELECT * FROM `{project}.{dataset}.{table}`;".format(
            project=self.project, dataset=self.dataset, table=self.table))

        job_config = bigquery.QueryJobConfig()

        # Set configuration.query.destinationTable
        destination_dataset = self.client.dataset(self.dest_dataset)
        destination_table = destination_dataset.table(self.dest_table_name)
        job_config.destination = destination_table

        # Set configuration.query.createDisposition
        job_config.create_disposition = 'CREATE_IF_NEEDED'

        # Set configuration.query.writeDisposition
        job_config.write_disposition = 'WRITE_APPEND'

        # Start the query
        job = self.client.query(query, job_config=job_config)

        # Wait for the query to finish
        job.result()
16

你可以通过在查询中指定一个目标表来实现这个功能。你需要使用 Jobs.insert 这个接口,而不是 Jobs.query。同时,你还要设置 writeDisposition=WRITE_APPEND,并填写目标表的信息。

如果你使用的是原始接口,配置大概是这样的。如果你用的是Python,Python的客户端也会提供访问这些字段的方法:

"configuration": {
  "query": {
    "query": "select count(*) from foo.bar",
    "destinationTable": {
      "projectId": "my_project",
      "datasetId": "my_dataset",
      "tableId": "my_table"
    },
    "createDisposition": "CREATE_IF_NEEDED",
    "writeDisposition": "WRITE_APPEND",
  }
}

撰写回答