How to use datetime.date values in an Apache Beam GroupBy?

Posted 2024-05-16 23:49:30


I'm trying to use Apache Beam's GroupBy construct, but I ran into a problem: it doesn't know how to encode a standard Python datetime.date.

Here is a simplified block of code that demonstrates the problem:

import logging
import random
import apache_beam as beam
from apache_beam.io import WriteToText
from datetime import date


def random_record():
    return {
        'account_id'   : random.randint(1004, 1009),
        'activity_date': date(2021, 10, random.randint(1, 4)),
        'region'       : random.randint(30, 40),
        'largest_sale' : random.randint(10000, 40000),
        'total_sales'  : random.randint(100000, 900000)
    }

def main(argv=None):
    random.seed(2349090823434)

    records = [random_record() for i in range(100)]

    with beam.Pipeline() as p:
        output = (
            p 
            | "Source" >> beam.Create(records)
            | "Typed"  >> beam.Map(lambda d: beam.Row(**d))
            | "Rollup" >> (beam.GroupBy('account_id', 'activity_date')
                                .aggregate_field('total_sales', sum, 'total_sales')
                                .aggregate_field('largest_sale', max, 'largest_sale')
                        )
            | "Output" >> WriteToText('learn-output', file_name_suffix='.csv')
        )

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()

When I run it:

% python learn.py

I get the following errors:

WARNING:apache_beam.coders.coder_impl:Using fallback deterministic coder for type '<class 'apache_beam.transforms.core.Key'>' in 'Rollup/CombinePerKey(TupleCombineFn)/GroupByKey'.
Traceback (most recent call last):
  ...
TypeError: Unable to deterministically encode '2021-10-01' of type '<class 'datetime.date'>', please provide a type hint for the input of 'Rollup/CombinePerKey(TupleCombineFn)/GroupByKey'

...

TypeError: Unable to deterministically encode 'Key(account_id=1007, activity_date=datetime.date(2021, 10, 1))' of type '<class 'apache_beam.transforms.core.Key'>', 
please provide a type hint for the input of 'Rollup/CombinePerKey(TupleCombineFn)/GroupByKey'

I'd like to know how to "provide a type hint for the input of 'Rollup/CombinePerKey(TupleCombineFn)/GroupByKey'".

It seems that GroupByKey is being called inside GroupBy. So I either need to tell GroupBy how to handle my 2-tuple key, or register some coder that supports datetime.date.


2 Answers

In general, you should be able to handle this by registering a coder for datetime.date, something like:

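import datetime
import apache_beam as beam

class DateCoder(beam.coders.Coder):
    # Encodes a datetime.date as its ISO-8601 string, e.g. b'2021-10-01'.
    def encode(self, d):
        return d.isoformat().encode('ascii')
    def decode(self, bs):
        return datetime.date.fromisoformat(bs.decode('ascii'))
    def is_deterministic(self):
        # Equal dates always encode to identical bytes, so GroupByKey may rely on it.
        return True

beam.coders.registry.register_coder(datetime.date, DateCoder)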

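However, there are some bad interactions with schemas here that make this harder to use with beam.GroupBy in this case. One issue is that beam.Row(**d) doesn't actually let Beam figure out the column names/types (the schema) of these Row objects at construction time; for that, the keywords need to be written out explicitly, as in beam.Row(account_id=d['account_id'], activity_date=d['activity_date'], ...). (Future work will make this nicer.)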

Converting to a string is, of course, an easy option, as suggested in the comments.
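
A small plain-Python check (no Beam required) of why both suggestions can work: the ISO-8601 text of a date is a deterministic, lossless representation, and str(date) produces exactly that text. The encode_date/decode_date helpers below are hypothetical stand-ins that mirror the encode/decode methods of the DateCoder above; they are not part of Beam.

```python
from datetime import date


def encode_date(d: date) -> bytes:
    # Hypothetical stand-in for DateCoder.encode: ISO-8601 text as ASCII bytes.
    return d.isoformat().encode("ascii")


def decode_date(bs: bytes) -> date:
    # Hypothetical stand-in for DateCoder.decode (date.fromisoformat needs Python 3.7+).
    return date.fromisoformat(bs.decode("ascii"))


d1 = date(2021, 10, 1)
d2 = date(2021, 10, 1)  # a distinct object with an equal value

# Deterministic: equal values always yield identical bytes.
assert encode_date(d1) == encode_date(d2) == b"2021-10-01"

# Lossless: decoding gives back an equal date.
assert decode_date(encode_date(d1)) == d1

# str(date) is the same ISO-8601 text, which is why a string key also groups correctly.
assert str(d1) == d1.isoformat()

# Zero-padded fields keep byte order consistent with chronological order.
assert encode_date(date(2021, 9, 30)) < encode_date(date(2021, 10, 1))
```

Determinism is the property GroupByKey cares about: equal keys must produce identical bytes so they land in the same group.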

For grouping purposes, you can convert the date to a string:

beam.GroupBy('account_id', date_str=lambda x: str(x.activity_date))
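
As a local sanity check (plain Python, no Beam involved), the rollup the question's pipeline asks for, keyed on account_id plus the date string, can be computed with a dict. The rollup helper and the sample records are made up for illustration; the helper only mimics what the GroupBy(...).aggregate_field(...) chain is asked to compute and is not Beam's implementation.

```python
from collections import defaultdict
from datetime import date


def rollup(records):
    # Hypothetical helper: group on (account_id, str(activity_date)), then
    # sum total_sales and take the max of largest_sale within each group.
    groups = defaultdict(list)
    for r in records:
        groups[(r['account_id'], str(r['activity_date']))].append(r)
    return {
        key: {
            'total_sales': sum(r['total_sales'] for r in rs),
            'largest_sale': max(r['largest_sale'] for r in rs),
        }
        for key, rs in groups.items()
    }


# Made-up sample records using the fields the rollup needs.
records = [
    {'account_id': 1007, 'activity_date': date(2021, 10, 1), 'largest_sale': 20000, 'total_sales': 300000},
    {'account_id': 1007, 'activity_date': date(2021, 10, 1), 'largest_sale': 35000, 'total_sales': 500000},
    {'account_id': 1005, 'activity_date': date(2021, 10, 2), 'largest_sale': 12000, 'total_sales': 150000},
]

result = rollup(records)
assert result[(1007, '2021-10-01')] == {'total_sales': 800000, 'largest_sale': 35000}
assert result[(1005, '2021-10-02')] == {'total_sales': 150000, 'largest_sale': 12000}
```

In the Beam version, the grouped output should carry the date under the key name date_str as a plain string; if a datetime.date is needed again downstream, date.fromisoformat can convert it back.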
