带有干净api的大客户机包装器
biggerquer的Python项目详细描述
biggerquery-bigquery的python库
biggerquery是一个python库,它简化了bigquery数据集的使用。它包装了bigquery客户端,提供了优雅的 最常见用例的api。它还提供了简化数据流管道创建的api。
安装
pip install biggerquery
兼容性
biggerquery与python 2.7兼容。
教程
任务定义
为了指导您了解biggerquery提供的所有功能,我们准备了一个简单的任务。有一个表事务,如下所示:
<表><广告>该表包含用户在特定日期进行的所有事务。您的任务是为每个用户计算两个度量: 每日用户交易值和每日用户交易计数。
最终结果应为表用户交易度量:
<表><广告>设置测试环境
在开始使用biggerquery之前,需要安装google cloud sdk
安装gcloud后,设置默认gcloud凭据:
gcloud auth应用程序默认登录名
接下来,使用biggerquery设置virtualenv:
mkdir test_biggerquery
cd test_biggerquery
pip install virtualenv
virtualenv -p /usr/bin/python2.7 venv
source venv/bin/activate
pip install biggerquery
然后,准备数据集。首先创建一个新的python模块:
touch user_transaction_metrics.py
编辑使用您喜爱的编辑器创建模块,并添加以下行:
frombiggerqueryimportcreate_dataset_managerPROJECT_ID='your-project-id'USER_TRANSACTION_METRICS_DATASET_NAME='user_transaction_metrics'TRANSACTION_DATASET_NAME='transactions'defsetup_test_transactions_table(project_id,dataset_name):dataset_id,dataset_manager=create_dataset_manager(project_id,'2019-01-01',dataset_name,internal_tables=['transactions'])dataset_manager.create_table(""" CREATE TABLE IF NOT EXISTS transactions ( user_id STRING, transaction_value FLOAT64, partition_timestamp TIMESTAMP) PARTITION BY DATE(partition_timestamp)""")dataset_manager.write_truncate('transactions',""" SELECT 'john123' as user_id, 800.0 as transaction_value, TIMESTAMP('2019-01-01') as partition_timestamp """)dataset_manager.write_append('transactions',""" SELECT 'smith99' as user_id, 10000.0 as transaction_value, TIMESTAMP('2019-01-01') as partition_timestamp """)dataset_manager.write_append('transactions',""" SELECT 'smith99' as user_id, 30000.0 as transaction_value, TIMESTAMP('2019-01-01') as partition_timestamp """)return'{dataset_id}.transactions'.format(dataset_id=dataset_id)TRANSACTIONS_TABLE_ID=setup_test_transactions_table(PROJECT_ID,TRANSACTION_DATASET_NAME)user_transaction_dataset_id,user_transaction_metrics_dataset_manager=create_dataset_manager(project_id=PROJECT_ID,runtime='2019-01-01',dataset_name=USER_TRANSACTION_METRICS_DATASET_NAME,internal_tables=['user_transaction_metrics'],external_tables={'transactions':TRANSACTIONS_TABLE_ID})
此代码创建两个数据集:
- 事务包含要处理的源数据表的数据集,
- 用户交易指标包含我们处理结果表的数据集。
创建数据集管理器
数据集管理器是一个对象,它允许您使用以下基本操作操作给定数据集中的表:write\u truncate
,
写入附加
,创建表
,收集
,写入tmp
。让我们通过几个例子来说明每个操作。
从创建数据集管理器对象开始。参数project_id
和dataset_name
定义要使用的数据集。
参数internal_tables
指定由project_id
和dataset_name
指定的数据集中的表。
参数external_tables
指定由project_id
和dataset_name
指定的外部数据集的表。
外部表必须用完整的表id来描述,例如:
external_tables={'transactions':'dataset.id.transactions','some_external_table':'dataset.id2.external_table'}
参数runtime
用于确定正在处理的分区。
user_transaction_dataset_id,user_transaction_metrics_dataset_manager=create_dataset_manager(project_id=PROJECT_ID,runtime='2019-01-01',dataset_name=USER_TRANSACTION_METRICS_DATASET_NAME,internal_tables=['user_transaction_metrics'],external_tables={'transactions':TRANSACTIONS_TABLE_ID})
创建表格
现在,创建一个可以用来存储度量的表。您可以使用普通SQL创建此表。将以下行添加到user_transaction_metrics.py
:
user_transaction_metrics_dataset_manager.create_table("""CREATE TABLE IF NOT EXISTS user_transaction_metrics ( user_id STRING, metric_value FLOAT64, metric_type STRING, partition_timestamp TIMESTAMP)PARTITION BY DATE(partition_timestamp)""")
写入截断
接下来,计算第一个度量-用户事务值
。添加E以下行:
user_transaction_metrics_dataset_manager.write_truncate('user_transaction_metrics',"""SELECT user_id, sum(transaction_value) as metric_value, 'USER_TRANSACTION_VALUE' as metric_type, TIMESTAMP('{dt}') as partition_timestampFROM `{transactions}`WHERE DATE(partition_timestamp) = '{dt}'GROUP BY user_id""")
结果:
<表><广告>write_truncate
函数将所提供查询的结果写入指定的表,在本例中为user_transaction_metrics
。
此函数用于在写入新数据之前删除给定表中的所有数据。
在查询中,不必编写完整的表id。您可以使用参数内部_表
和外部_表
中提供的名称。
参数runtime在查询中也可用作{dt}
写入附加
那么将数据添加到表中呢?计算另一个度量-用户交易计数
。添加以下行:
user_transaction_metrics_dataset_manager.write_append('user_transaction_metrics',"""SELECT user_id, count(transaction_value) * 1.0 as metric_value, 'USER_TRANSACTION_COUNT' as metric_type, TIMESTAMP('{dt}') as partition_timestampFROM `{transactions}`WHERE DATE(partition_timestamp) = '{dt}'GROUP BY user_id""")
结果:
<表><广告>write-append与write-append的区别在于write-append不会在写入新数据之前从给定的表中删除数据。
临时写入
有时创建一个额外的表来存储一些中间结果是很有用的。
write-tmp
函数允许从查询结果创建表(write-truncate
和write-append
只能写入已经存在的表)。
您可以使用write\u tmp
函数重构现有代码:
frombiggerqueryimportcreate_dataset_managerPROJECT_ID='your-project-id'USER_TRANSACTION_METRICS_DATASET_NAME='user_transaction_metrics'TRANSACTION_DATASET_NAME='transactions'defsetup_test_transactions_table(project_id,dataset_name):dataset_id,dataset_manager=create_dataset_manager(project_id,'2019-01-01',dataset_name,internal_tables=['transactions'])dataset_manager.create_table(""" CREATE TABLE IF NOT EXISTS transactions ( user_id STRING, transaction_value FLOAT64, partition_timestamp TIMESTAMP) PARTITION BY DATE(partition_timestamp)""")dataset_manager.write_truncate('transactions',""" SELECT 'john123' as user_id, 800.0 as transaction_value, TIMESTAMP('2019-01-01') as partition_timestamp """)dataset_manager.write_append('transactions',""" SELECT 'smith99' as user_id, 10000.0 as transaction_value, TIMESTAMP('2019-01-01') as partition_timestamp """)dataset_manager.write_append('transactions',""" SELECT 'smith99' as user_id, 30000.0 as transaction_value, TIMESTAMP('2019-01-01') as partition_timestamp """)return'{dataset_id}.transactions'.format(dataset_id=dataset_id)# creating source dataset and table- transactionsTRANSACTIONS_TABLE_ID=setup_test_transactions_table(PROJECT_ID,TRANSACTION_DATASET_NAME)# creating processing dataset- user_transaction_metricsuser_transaction_dataset_id,user_transaction_metrics_dataset_manager=create_dataset_manager(project_id=PROJECT_ID,runtime='2019-01-01',dataset_name=USER_TRANSACTION_METRICS_DATASET_NAME,internal_tables=['user_transaction_metrics'],external_tables={'transactions':TRANSACTIONS_TABLE_ID})defcalculate_user_transaction_metrics(dataset_manager):dataset_manager.create_table(""" CREATE TABLE IF NOT EXISTS user_transaction_metrics ( user_id STRING, metric_value FLOAT64, metric_type STRING, partition_timestamp TIMESTAMP) PARTITION BY DATE(partition_timestamp) """)dataset_manager.write_tmp('daily_user_transaction_value',""" SELECT user_id, sum(transaction_value) as metric_value, 'USER_TRANSACTION_VALUE' as metric_type, TIMESTAMP('{dt}') as partition_timestamp FROM `{transactions}` WHERE DATE(partition_timestamp) = '{dt}' GROUP BY user_id """)dataset_manager.write_tmp('daily_user_transaction_count',""" SELECT user_id, count(transaction_value) as metric_value, 'USER_TRANSACTION_COUNT' as metric_type, TIMESTAMP('{dt}') as partition_timestamp FROM `{transactions}` WHERE DATE(partition_timestamp) = '{dt}' GROUP BY user_id """)dataset_manager.write_truncate('user_transaction_metrics',""" SELECT * FROM `{daily_user_transaction_value}` UNION ALL SELECT * FROM `{daily_user_transaction_count}` """)calculate_user_transaction_metrics(user_transaction_metrics_dataset_manager)
将一系列相关查询放入一个函数中是一个好的实践,您可以使用指定的数据集管理器对其进行调度、测试或运行。
在本例中,它是用户事务度量
函数。通过检查结果步骤,临时表对于调试代码非常有用
循序渐进。将一个大查询分成更小的块也更容易阅读。
收集
您可以使用collect
从bigquery中将数据提取到内存中。例如,通过http:
calculate_user_transaction_metrics(user_transaction_metrics_dataset_manager)rows=user_transaction_metrics_dataset_manager.collect("""SELECT * FROM `{user_transaction_metrics}`WHERE DATE(partition_timestamp) = '{dt}'""")importrequestsforrowinrows:requests.post('http://example.com/user-metrics',json={'userMetric':row})
凭证
如果要指定对数据集进行操作的凭据,可以在创建数据集管理器时执行此操作,例如:
mkdir test_biggerquery
cd test_biggerquery
pip install virtualenv
virtualenv -p /usr/bin/python2.7 venv
source venv/bin/activate
pip install biggerquery
0
测试
不幸的是,无法在本地运行bigquery进行测试。但是你仍然可以为你的 查询如下所示。运行测试之前,请记住设置测试项目ID。
mkdir test_biggerquery
cd test_biggerquery
pip install virtualenv
virtualenv -p /usr/bin/python2.7 venv
source venv/bin/activate
pip install biggerquery
1
创建梁管理器
beam manager是一个允许您创建数据流管道的对象。create_dataflow_pipeline
方法允许您创建
梁管理器对象。beam管理器提供实用方法,包装原始beam api:write_truncate_to_big_query
,
写入avro
,
read_from_big_查询
read_from_avro
。
让我们通过几个例子来说明每个操作。
从创建梁管理器对象开始。参数project_id
和dataset_name
定义要使用的数据集。
参数internal_tables
指定由project_id
和dataset_name
指定的数据集中的表。
参数外部_表
指定外部由项目id
和数据集名称指定的数据集
。
外部表必须用完整的表id来描述,例如:
external_tables={'transactions':'dataset.id.transactions','some_external_table':'dataset.id2.external_table'}
参数runtime
用于确定正在处理的分区。
参数dataflow_bucket
是用于临时和临时位置的gcs bucket。
参数requirements_file_path
提供有关数据流依赖项的信息。
parameterregion
是用于处理管道的数据中心的位置。默认设置为Europe-West1。
parametermachine_type
是一种使用过的机器。默认情况下为n1-standard-1。有关GCP中的机器类型的详细信息:
https://cloud.google.com/compute/docs/machine-types
mkdir test_biggerquery
cd test_biggerquery
pip install virtualenv
virtualenv -p /usr/bin/python2.7 venv
source venv/bin/activate
pip install biggerquery
3
创建管道
对于本例,您必须执行以下步骤:设置测试环境:https://github.com/allegro/biggerquery;rel="nofollow">https://github.com/allegro/biggerquery;设置测试环境 https://github.com/allegro/biggerquery创建表 现在,在我们创建数据流管理器的同一个文件中,我们需要创建一些代码来创建作为模块的管道。
mkdir test_biggerquery
cd test_biggerquery
pip install virtualenv
virtualenv -p /usr/bin/python2.7 venv
source venv/bin/activate
pip install biggerquery
4
创建数据流管理器后,我们可以创建管道。为此,我们需要创建一个新文件 管道.py.在这个文件中,我们需要将代码放在下面。
mkdir test_biggerquery
cd test_biggerquery
pip install virtualenv
virtualenv -p /usr/bin/python2.7 venv
source venv/bin/activate
pip install biggerquery
5
此代码将使用write-truncate-to-big-u查询方法将行放入transactions表中。现在运行数据流管理器。在我们的交易中几分钟后
表应可见三条记录。现在更新您的pipeline.py
文件如下:
mkdir test_biggerquery
cd test_biggerquery
pip install virtualenv
virtualenv -p /usr/bin/python2.7 venv
source venv/bin/activate
pip install biggerquery
6
几分钟后,在user_transaction_metrics表中可以看到执行查询的结果。
示例代码beam\u manager
您可以在/examples/\u example\u beam\u manager.py中找到