应用this或this示例来构建我的程序,每次尝试插入大查询时,都会出现以下错误:
溢出错误:日期值超出范围[运行'Format'时]
我的束流管道是:
Bigquery = (transformation
| 'Format' >> beam.ParDo(FormatBigQueryoFn())
| 'Write to BigQuery' >> beam.io.Write(beam.io.BigQuerySink(
'XXXX',
schema=TABLE_SCHEMA,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_append
)))
在类FormatBigQueryoFn中,它应该是窗口数据时间的逻辑
例1的代码:
^{pr2}$例2的代码:
class FormatDoFn(beam.DoFn):
def process(self, element, window=beam.DoFn.WindowParam):
ts_format = '%Y-%m-%d %H:%M:%S.%f UTC'
window_start = window.start.to_utc_datetime().strftime(ts_format)
window_end = window.end.to_utc_datetime().strftime(ts_format)
return [{'word': element[0],
'count': element[1],
'window_start':window_start,
'window_end':window_end}]
我的管道可能出了什么问题?在
编辑:
例如,如果我打印窗口.开始我得到:
Timestamp(-9223372036860)
问题是我在用googlepub/Sub测试之前从一个文件中读取数据
当我从文件中读取数据时,元素没有时间戳。在
元素中必须有时间戳。在
发布/订阅自动附加此时间戳。在
来自documentation:
最简单的窗口形式是使用固定时间窗口:给定一个时间戳的PCollection,它可能会持续更新,每个窗口可能会捕捉(例如)时间戳在5分钟间隔内的所有元素。
相关问题 更多 >
编程相关推荐