带有干净api的大客户机包装器

biggerquer的Python项目详细描述


biggerquery-bigquery的python库

biggerquery是一个python库,它简化了bigquery数据集的使用。它包装了bigquery客户端,提供了优雅的 最常见用例的api。它还提供了简化数据流管道创建的api。

安装

pip install biggerquery

兼容性

biggerquery与python 2.7兼容。

教程

任务定义

为了指导您了解biggerquery提供的所有功能,我们准备了一个简单的任务。有一个表事务,如下所示:

<表><广告>用户ID交易值 分区时间戳 < /广告><正文>约翰1232019-01-01 00:00:00史密斯99100002019-01-01 00:00:00史密斯99300002019-01-01 00:00:00

该表包含用户在特定日期进行的所有事务。您的任务是为每个用户计算两个度量: 每日用户交易值和每日用户交易计数。

最终结果应为表用户交易度量

<表><广告>用户ID公制值 公制类型 分区时间戳 < /广告><正文>约翰123用户交易值2019-01-01 00:00:00史密斯9940000用户交易值2019-01-01 00:00:00约翰123用户交易计数2019-01-01 00:00:00史密斯99用户交易计数2019-01-01 00:00:00

设置测试环境

在开始使用biggerquery之前,需要安装google cloud sdk

安装gcloud后,设置默认gcloud凭据:

gcloud auth应用程序默认登录名

接下来,使用biggerquery设置virtualenv:

mkdir test_biggerquery
cd test_biggerquery
pip install virtualenv
virtualenv -p /usr/bin/python2.7 venv
source venv/bin/activate
pip install biggerquery

然后,准备数据集。首先创建一个新的python模块:

touch user_transaction_metrics.py

编辑使用您喜爱的编辑器创建模块,并添加以下行:

frombiggerqueryimportcreate_dataset_managerPROJECT_ID='your-project-id'USER_TRANSACTION_METRICS_DATASET_NAME='user_transaction_metrics'TRANSACTION_DATASET_NAME='transactions'defsetup_test_transactions_table(project_id,dataset_name):dataset_id,dataset_manager=create_dataset_manager(project_id,'2019-01-01',dataset_name,internal_tables=['transactions'])dataset_manager.create_table("""        CREATE TABLE IF NOT EXISTS transactions (            user_id STRING,            transaction_value FLOAT64,            partition_timestamp TIMESTAMP)        PARTITION BY DATE(partition_timestamp)""")dataset_manager.write_truncate('transactions',"""        SELECT 'john123' as user_id, 800.0 as transaction_value, TIMESTAMP('2019-01-01') as partition_timestamp        """)dataset_manager.write_append('transactions',"""        SELECT 'smith99' as user_id, 10000.0 as transaction_value, TIMESTAMP('2019-01-01') as partition_timestamp        """)dataset_manager.write_append('transactions',"""        SELECT 'smith99' as user_id, 30000.0 as transaction_value, TIMESTAMP('2019-01-01') as partition_timestamp        """)return'{dataset_id}.transactions'.format(dataset_id=dataset_id)TRANSACTIONS_TABLE_ID=setup_test_transactions_table(PROJECT_ID,TRANSACTION_DATASET_NAME)user_transaction_dataset_id,user_transaction_metrics_dataset_manager=create_dataset_manager(project_id=PROJECT_ID,runtime='2019-01-01',dataset_name=USER_TRANSACTION_METRICS_DATASET_NAME,internal_tables=['user_transaction_metrics'],external_tables={'transactions':TRANSACTIONS_TABLE_ID})

此代码创建两个数据集:

  • 事务包含要处理的源数据表的数据集,
  • 用户交易指标包含我们处理结果表的数据集。

创建数据集管理器

数据集管理器是一个对象,它允许您使用以下基本操作操作给定数据集中的表:write\u truncate写入附加创建表收集写入tmp。让我们通过几个例子来说明每个操作。

从创建数据集管理器对象开始。参数project_iddataset_name定义要使用的数据集。 参数internal_tables指定由project_iddataset_name指定的数据集中的表。 参数external_tables指定由project_iddataset_name指定的外部数据集的表。 外部表必须用完整的表id来描述,例如:

external_tables={'transactions':'dataset.id.transactions','some_external_table':'dataset.id2.external_table'}

参数runtime用于确定正在处理的分区。

user_transaction_dataset_id,user_transaction_metrics_dataset_manager=create_dataset_manager(project_id=PROJECT_ID,runtime='2019-01-01',dataset_name=USER_TRANSACTION_METRICS_DATASET_NAME,internal_tables=['user_transaction_metrics'],external_tables={'transactions':TRANSACTIONS_TABLE_ID})

创建表格

现在,创建一个可以用来存储度量的表。您可以使用普通SQL创建此表。将以下行添加到user_transaction_metrics.py

user_transaction_metrics_dataset_manager.create_table("""CREATE TABLE IF NOT EXISTS user_transaction_metrics (    user_id STRING,    metric_value FLOAT64,    metric_type STRING,    partition_timestamp TIMESTAMP)PARTITION BY DATE(partition_timestamp)""")

写入截断

接下来,计算第一个度量-用户事务值。添加E以下行:

user_transaction_metrics_dataset_manager.write_truncate('user_transaction_metrics',"""SELECT user_id,    sum(transaction_value) as metric_value,    'USER_TRANSACTION_VALUE' as metric_type,    TIMESTAMP('{dt}') as partition_timestampFROM `{transactions}`WHERE DATE(partition_timestamp) = '{dt}'GROUP BY user_id""")

结果:

<表><广告>用户ID公制值 公制类型 分区时间戳 < /广告><正文>约翰123用户交易值2019-01-01 00:00:00史密斯9940000用户交易值2019-01-01 00:00:00

write_truncate函数将所提供查询的结果写入指定的表,在本例中为user_transaction_metrics。 此函数用于在写入新数据之前删除给定表中的所有数据。

在查询中,不必编写完整的表id。您可以使用参数内部_表外部_表中提供的名称。 参数runtime在查询中也可用作{dt}

写入附加

那么将数据添加到表中呢?计算另一个度量-用户交易计数。添加以下行:

user_transaction_metrics_dataset_manager.write_append('user_transaction_metrics',"""SELECT user_id,   count(transaction_value) * 1.0 as metric_value,   'USER_TRANSACTION_COUNT' as metric_type,   TIMESTAMP('{dt}') as partition_timestampFROM `{transactions}`WHERE DATE(partition_timestamp) = '{dt}'GROUP BY user_id""")

结果:

<表><广告>用户ID公制值 公制类型 分区时间戳 < /广告><正文>约翰123用户交易值2019-01-01 00:00:00史密斯9940000用户交易值2019-01-01 00:00:00约翰123用户交易计数2019-01-01 00:00:00史密斯99用户交易计数2019-01-01 00:00:00

write-append与write-append的区别在于write-append不会在写入新数据之前从给定的表中删除数据。

临时写入

有时创建一个额外的表来存储一些中间结果是很有用的。 write-tmp函数允许从查询结果创建表(write-truncatewrite-append只能写入已经存在的表)。

您可以使用write\u tmp函数重构现有代码:

frombiggerqueryimportcreate_dataset_managerPROJECT_ID='your-project-id'USER_TRANSACTION_METRICS_DATASET_NAME='user_transaction_metrics'TRANSACTION_DATASET_NAME='transactions'defsetup_test_transactions_table(project_id,dataset_name):dataset_id,dataset_manager=create_dataset_manager(project_id,'2019-01-01',dataset_name,internal_tables=['transactions'])dataset_manager.create_table("""       CREATE TABLE IF NOT EXISTS transactions (           user_id STRING,           transaction_value FLOAT64,           partition_timestamp TIMESTAMP)       PARTITION BY DATE(partition_timestamp)""")dataset_manager.write_truncate('transactions',"""       SELECT 'john123' as user_id, 800.0 as transaction_value, TIMESTAMP('2019-01-01') as partition_timestamp       """)dataset_manager.write_append('transactions',"""       SELECT 'smith99' as user_id, 10000.0 as transaction_value, TIMESTAMP('2019-01-01') as partition_timestamp       """)dataset_manager.write_append('transactions',"""       SELECT 'smith99' as user_id, 30000.0 as transaction_value, TIMESTAMP('2019-01-01') as partition_timestamp       """)return'{dataset_id}.transactions'.format(dataset_id=dataset_id)# creating source dataset and table- transactionsTRANSACTIONS_TABLE_ID=setup_test_transactions_table(PROJECT_ID,TRANSACTION_DATASET_NAME)# creating processing dataset- user_transaction_metricsuser_transaction_dataset_id,user_transaction_metrics_dataset_manager=create_dataset_manager(project_id=PROJECT_ID,runtime='2019-01-01',dataset_name=USER_TRANSACTION_METRICS_DATASET_NAME,internal_tables=['user_transaction_metrics'],external_tables={'transactions':TRANSACTIONS_TABLE_ID})defcalculate_user_transaction_metrics(dataset_manager):dataset_manager.create_table("""   CREATE TABLE IF NOT EXISTS user_transaction_metrics (       user_id STRING,       metric_value FLOAT64,       metric_type STRING,       partition_timestamp TIMESTAMP)   PARTITION BY DATE(partition_timestamp)   """)dataset_manager.write_tmp('daily_user_transaction_value',"""   SELECT user_id,       sum(transaction_value) as metric_value,       'USER_TRANSACTION_VALUE' as metric_type,       TIMESTAMP('{dt}') as partition_timestamp   FROM `{transactions}`   WHERE DATE(partition_timestamp) = '{dt}'   GROUP BY user_id   """)dataset_manager.write_tmp('daily_user_transaction_count',"""   SELECT user_id,       count(transaction_value) as metric_value,       'USER_TRANSACTION_COUNT' as metric_type,       TIMESTAMP('{dt}') as partition_timestamp   FROM `{transactions}`   WHERE DATE(partition_timestamp) = '{dt}'   GROUP BY user_id   """)dataset_manager.write_truncate('user_transaction_metrics',"""   SELECT * FROM `{daily_user_transaction_value}`   UNION ALL   SELECT * FROM `{daily_user_transaction_count}`   """)calculate_user_transaction_metrics(user_transaction_metrics_dataset_manager)

将一系列相关查询放入一个函数中是一个好的实践,您可以使用指定的数据集管理器对其进行调度、测试或运行。 在本例中,它是用户事务度量函数。通过检查结果步骤,临时表对于调试代码非常有用 循序渐进。将一个大查询分成更小的块也更容易阅读。

收集

您可以使用collect从bigquery中将数据提取到内存中。例如,通过http:

calculate_user_transaction_metrics(user_transaction_metrics_dataset_manager)rows=user_transaction_metrics_dataset_manager.collect("""SELECT * FROM `{user_transaction_metrics}`WHERE DATE(partition_timestamp) = '{dt}'""")importrequestsforrowinrows:requests.post('http://example.com/user-metrics',json={'userMetric':row})

凭证

如果要指定对数据集进行操作的凭据,可以在创建数据集管理器时执行此操作,例如:

mkdir test_biggerquery
cd test_biggerquery
pip install virtualenv
virtualenv -p /usr/bin/python2.7 venv
source venv/bin/activate
pip install biggerquery
0

测试

不幸的是,无法在本地运行bigquery进行测试。但是你仍然可以为你的 查询如下所示。运行测试之前,请记住设置测试项目ID。

mkdir test_biggerquery
cd test_biggerquery
pip install virtualenv
virtualenv -p /usr/bin/python2.7 venv
source venv/bin/activate
pip install biggerquery
1

创建梁管理器

beam manager是一个允许您创建数据流管道的对象。create_dataflow_pipeline方法允许您创建 梁管理器对象。beam管理器提供实用方法,包装原始beam api:write_truncate_to_big_query写入avroread_from_big_查询read_from_avro。 让我们通过几个例子来说明每个操作。

从创建梁管理器对象开始。参数project_iddataset_name定义要使用的数据集。 参数internal_tables指定由project_iddataset_name指定的数据集中的表。 参数外部_表指定外部项目id数据集名称指定的数据集。 外部表必须用完整的表id来描述,例如:

external_tables={'transactions':'dataset.id.transactions','some_external_table':'dataset.id2.external_table'}

参数runtime用于确定正在处理的分区。 参数dataflow_bucket是用于临时和临时位置的gcs bucket。 参数requirements_file_path提供有关数据流依赖项的信息。 parameterregion是用于处理管道的数据中心的位置。默认设置为Europe-West1。 parametermachine_type是一种使用过的机器。默认情况下为n1-standard-1。有关GCP中的机器类型的详细信息: https://cloud.google.com/compute/docs/machine-types

mkdir test_biggerquery
cd test_biggerquery
pip install virtualenv
virtualenv -p /usr/bin/python2.7 venv
source venv/bin/activate
pip install biggerquery
3

创建管道

对于本例,您必须执行以下步骤:设置测试环境:https://github.com/allegro/biggerquery;rel="nofollow">https://github.com/allegro/biggerquery;设置测试环境 https://github.com/allegro/biggerquery创建表 现在,在我们创建数据流管理器的同一个文件中,我们需要创建一些代码来创建作为模块的管道。

mkdir test_biggerquery
cd test_biggerquery
pip install virtualenv
virtualenv -p /usr/bin/python2.7 venv
source venv/bin/activate
pip install biggerquery
4

创建数据流管理器后,我们可以创建管道。为此,我们需要创建一个新文件 管道.py.在这个文件中,我们需要将代码放在下面。

mkdir test_biggerquery
cd test_biggerquery
pip install virtualenv
virtualenv -p /usr/bin/python2.7 venv
source venv/bin/activate
pip install biggerquery
5

此代码将使用write-truncate-to-big-u查询方法将行放入transactions表中。现在运行数据流管理器。在我们的交易中几分钟后 表应可见三条记录。现在更新您的pipeline.py文件如下:

mkdir test_biggerquery
cd test_biggerquery
pip install virtualenv
virtualenv -p /usr/bin/python2.7 venv
source venv/bin/activate
pip install biggerquery
6

几分钟后,在user_transaction_metrics表中可以看到执行查询的结果。

示例代码beam\u manager您可以在/examples/\u example\u beam\u manager.py中找到

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java如何使用相同的模型、颜色、字体和侦听器创建JTable的副本?   JavaSpring+Thymeleaf:用户时区中的时间   java HTTP请求返回非法状态异常安卓   java xml验证JDK 1.5 JDK 1.6差异   junit如何使用Parasoft从java文件生成测试文件   java使用getSpans方法获取可扩展文件中的所有跨距   javascript无法使用bindingResult设置表单元素   java RCP应用程序活动   获取Java类中泛型字段的类型   java更新查询,从一个依赖于另一个表的表中更改字段   java错误:GWT类型中的方法setBridge(GWTBridge)不适用于参数(GWTBridge)   为什么java/安卓需要在XML元素的开头有一个空格来解析它?   java通用代码,用于将字符串转换为任何所需的类   java如何创建Liferay钩子以扩展购物车portlet的功能   java Selected()方法存在漏洞   java新行附加在我的加密字符串上   使用NaN值的指数平滑的java   使用飞碟和iText发行的java XHTML到PDF   java如何在不使用HTMLDocument的情况下在JTextPane中显示两列文本?