I'm trying to use Apache Beam's GroupBy with a key that includes a datetime.date, but Beam is unable to encode the date.
Here is a simplified code block that demonstrates the problem:
import logging
import random
import apache_beam as beam
from apache_beam.io import WriteToText
from datetime import date


def random_record():
    return {
        'account_id' : random.randint(1004, 1009),
        'activity_date': date(2021, 10, random.randint(1, 4)),
        'region' : random.randint(30, 40),
        'largest_sale' : random.randint(10000, 40000),
        'total_sales' : random.randint(100000, 900000)
    }


def main(argv=None):
    random.seed(2349090823434)
    records = [random_record() for i in range(100)]
    with beam.Pipeline() as p:
        output = (
            p
            | "Source" >> beam.Create(records)
            | "Typed" >> beam.Map(lambda d: beam.Row(**d))
            | "Rollup" >> (beam.GroupBy('account_id', 'activity_date')
                           .aggregate_field('total_sales', sum, 'total_sales')
                           .aggregate_field('largest_sale', max, 'largest_sale')
                           )
            | "Output" >> WriteToText('learn-output', file_name_suffix='.csv')
        )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
When I run it:
% python learn.py
I get the following errors:
WARNING:apache_beam.coders.coder_impl:Using fallback deterministic coder for type '<class 'apache_beam.transforms.core.Key'>' in 'Rollup/CombinePerKey(TupleCombineFn)/GroupByKey'.
Traceback (most recent call last):
...
TypeError: Unable to deterministically encode '2021-10-01' of type '<class 'datetime.date'>', please provide a type hint for the input of 'Rollup/CombinePerKey(TupleCombineFn)/GroupByKey'
...
TypeError: Unable to deterministically encode 'Key(account_id=1007, activity_date=datetime.date(2021, 10, 1))' of type '<class 'apache_beam.transforms.core.Key'>',
please provide a type hint for the input of 'Rollup/CombinePerKey(TupleCombineFn)/GroupByKey'
How do I "provide a type hint for the input of 'Rollup/CombinePerKey(TupleCombineFn)/GroupByKey'"?
It seems that GroupBy calls GroupByKey internally. So I either need to tell GroupBy how to handle my 2-tuple key, or register a coder that supports datetime.date.
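What a deterministic coder for datetime.date has to guarantee can be sketched in plain Python: equal dates must always produce exactly the same bytes. The helpers encode_date and decode_date below are hypothetical, and the fixed-width ordinal format is an assumption, not something Beam prescribes. To use this in a pipeline you would wrap the two functions in a beam.coders.Coder subclass (implementing encode, decode and is_deterministic) and register it for datetime.date with beam.coders.registry.register_coder. That wiring is not shown here, and GroupBy may still not pick it up because of how it builds its schema-based Key.

```python
import struct
from datetime import date


def encode_date(d: date) -> bytes:
    # Hypothetical format: the proleptic Gregorian ordinal packed as a
    # big-endian signed 64-bit integer. One date -> one ordinal -> one
    # fixed 8-byte string, so the encoding is deterministic.
    return struct.pack(">q", d.toordinal())


def decode_date(b: bytes) -> date:
    # Inverse of encode_date: unpack the ordinal and rebuild the date.
    (ordinal,) = struct.unpack(">q", b)
    return date.fromordinal(ordinal)


if __name__ == "__main__":
    d = date(2021, 10, 1)
    encoded = encode_date(d)
    print(encoded.hex())
    # Round trip recovers the original value.
    assert decode_date(encoded) == d
```

A coder only needs determinism here, but for positive ordinals the big-endian layout also happens to keep byte order consistent with date order.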
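In general, you should be able to do something like … to handle this, but there are some unfortunate interactions with schemas that make using beam.GroupBy harder in this case. One problem is that beam.Row(**d) doesn't actually let Beam work out the column names and types (the schema) of these Row objects at construction time; for that, the keywords need to be passed explicitly. (Future work should make this better.) Converting the date to a string is clearly the simple option, as the comments suggest.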
For grouping purposes, you can convert the date to a string.
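The string-key workaround can be sketched without Beam so it is easy to check locally. The names stringify_date and rollup are hypothetical: stringify_date is the per-element conversion you might run in a beam.Map step before the "Typed" and "Rollup" steps, and rollup is only a plain-Python stand-in that mimics what GroupBy('account_id', 'activity_date') with the two aggregate_field calls computes. The choice of ISO-8601 via date.isoformat() is an assumption; any stable string form would do.

```python
from collections import defaultdict
from datetime import date


def stringify_date(record: dict) -> dict:
    # Copy the record (leaving the input untouched) and replace the date
    # with its ISO-8601 string, e.g. "2021-10-01", which is a plain str.
    out = dict(record)
    out['activity_date'] = record['activity_date'].isoformat()
    return out


def rollup(records):
    # Local stand-in for GroupBy('account_id', 'activity_date')
    #   .aggregate_field('total_sales', sum, 'total_sales')
    #   .aggregate_field('largest_sale', max, 'largest_sale')
    groups = defaultdict(list)
    for r in records:
        groups[(r['account_id'], r['activity_date'])].append(r)
    return {
        key: {
            'total_sales': sum(r['total_sales'] for r in rs),
            'largest_sale': max(r['largest_sale'] for r in rs),
        }
        for key, rs in groups.items()
    }


if __name__ == '__main__':
    sample = [
        {'account_id': 1007, 'activity_date': date(2021, 10, 1),
         'largest_sale': 12000, 'total_sales': 150000},
        {'account_id': 1007, 'activity_date': date(2021, 10, 1),
         'largest_sale': 30000, 'total_sales': 200000},
        {'account_id': 1008, 'activity_date': date(2021, 10, 2),
         'largest_sale': 15000, 'total_sales': 500000},
    ]
    print(rollup([stringify_date(r) for r in sample]))
```

In the pipeline this would amount to something like | "Stringify" >> beam.Map(stringify_date) ahead of the Row construction; if you need a date again after grouping, date.fromisoformat turns the string back into a datetime.date.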