Using a custom WindowFn on Google Dataflow that closes the window after an element value

Posted 2024-06-08 00:59:17


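Following another user's suggestion, I am trying to build a window that closes either when a particular event arrives (for example, when the user starts a new session) or when a specific time period has passed.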

Basically, I added this IntervalWindow subclass:

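from apache_beam.transforms.window import IntervalWindow

class StoppingIntervalWindow(IntervalWindow):
    """IntervalWindow that also remembers the event that produced it."""
    def __init__(self, start, end, event):
        super(StoppingIntervalWindow, self).__init__(start, end)
        self.event = event  # read in merge() to decide whether the session ends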

I then created a custom window by extending the WindowFn class. It assigns elements to StoppingIntervalWindow instances and merges them in a similar way to session windows.
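As a rough illustration of the merge rule described above, here is a plain-Python model (Python 3, no Beam dependency) of how such a merge could group windows: overlapping windows are combined as in Beam's Sessions.merge, except that once a window carrying a stop event has been merged, the session is closed and the next window starts a new one. STOP_EVENT, StoppingWindow, assign and merge are hypothetical names chosen for this sketch; in a real WindowFn, merge would call merge_context.merge(to_merge, merged_window) for each group instead of returning a list.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

STOP_EVENT = "logout"  # hypothetical event value that closes a session


@dataclass(frozen=True)
class StoppingWindow:
    """Plain-Python stand-in for StoppingIntervalWindow: [start, end) plus an event."""
    start: float
    end: float
    event: str


def assign(timestamp: float, event: str, gap: float) -> StoppingWindow:
    # Like Sessions.assign: every element opens a proto-session [ts, ts + gap).
    return StoppingWindow(timestamp, timestamp + gap, event)


def merge(windows: List[StoppingWindow]) -> List[Tuple[List[StoppingWindow], StoppingWindow]]:
    """Return (windows_to_merge, merged_window) pairs, one per session.

    Overlapping windows are merged as in Sessions.merge, except that after a
    window carrying STOP_EVENT is merged, the session is closed and the next
    window starts a new session even if it overlaps.
    """
    result: List[Tuple[List[StoppingWindow], StoppingWindow]] = []
    group: List[StoppingWindow] = []
    end: Optional[float] = None
    closed = False

    def flush() -> None:
        # Emit the current group; a closed session's merged window keeps the stop
        # event, otherwise (arbitrarily, for this sketch) the last event seen.
        if group:
            event = STOP_EVENT if closed else group[-1].event
            result.append((list(group), StoppingWindow(group[0].start, end, event)))

    for w in sorted(windows, key=lambda win: win.start):
        if group and not closed and w.start < end:
            group.append(w)
            end = max(end, w.end)
        else:
            flush()
            group = [w]
            end = w.end
            closed = False
        if w.event == STOP_EVENT:
            closed = True
    flush()
    return result
```

With gap=10 and the events click@0, click@3, logout@5, click@7 and click@30, the first three windows merge into StoppingWindow(0, 15, "logout"), while the click at 7 starts a new session even though its window overlaps the closed one.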

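My current problem: locally, the user data is aggregated correctly, and a separate session is started each time the event occurs or the time interval elapses. However, as soon as I run my code remotely on Google Cloud Dataflow, I get the following error: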

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
    op.start()
  File "dataflow_worker/shuffle_operations.py", line 49, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    def start(self):
  File "dataflow_worker/shuffle_operations.py", line 50, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    with self.scoped_start_state:
  File "dataflow_worker/shuffle_operations.py", line 65, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    with self.shuffle_source.reader() as reader:
  File "dataflow_worker/shuffle_operations.py", line 69, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    self.output(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "dataflow_worker/shuffle_operations.py", line 232, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
    for wvalue in self.driver.process_elements(state, vs, MIN_TIMESTAMP):
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/transforms/trigger.py", line 993, in process_elements
    self.window_fn.merge(TriggerMergeContext(all_windows))
  File "/vagrant/adjust/session.py", line 50, in merge
AttributeError: 'IntervalWindow' object has no attribute 'event'
  • Is there a way to use a custom IntervalWindow on Google Dataflow?
  • If not, is there another way to implement this use case so that it also works on Google Dataflow?
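The AttributeError suggests the window lost its extra field when it was serialized between stages. A likely explanation, not confirmed here: on Dataflow, windows are encoded with the WindowFn's window coder during the shuffle, and a coder that writes only the interval bounds (such as Beam's IntervalWindowCoder, which Sessions returns from get_window_coder()) decodes back to a plain IntervalWindow, whereas a local run may never round-trip the window through that coder. The plain-Python model below illustrates the mechanism; IntervalOnlyCoder and StoppingWindowCoder are hypothetical stand-ins, not Beam classes.

```python
import struct
from dataclasses import dataclass


@dataclass
class IntervalWindow:
    """Stand-in for Beam's IntervalWindow: only the start and end bounds."""
    start: int
    end: int


@dataclass
class StoppingIntervalWindow(IntervalWindow):
    event: str


class IntervalOnlyCoder:
    """Models a window coder that, like an interval-window coder, writes only the bounds."""

    def encode(self, w: IntervalWindow) -> bytes:
        return struct.pack(">qq", w.start, w.end)

    def decode(self, data: bytes) -> IntervalWindow:
        start, end = struct.unpack(">qq", data)
        return IntervalWindow(start, end)  # the subclass and `event` are lost here


class StoppingWindowCoder:
    """Hypothetical coder that also writes the event, so it survives a round trip."""

    HEADER = ">qqI"  # start, end, length of the UTF-8 event payload

    def encode(self, w: StoppingIntervalWindow) -> bytes:
        payload = w.event.encode("utf-8")
        return struct.pack(self.HEADER, w.start, w.end, len(payload)) + payload

    def decode(self, data: bytes) -> StoppingIntervalWindow:
        start, end, n = struct.unpack_from(self.HEADER, data)
        offset = struct.calcsize(self.HEADER)
        event = data[offset:offset + n].decode("utf-8")
        return StoppingIntervalWindow(start, end, event)
```

In Beam itself, the corresponding change would presumably be to override get_window_coder() on the custom WindowFn and return a coder (a subclass of apache_beam.coders.Coder) that also encodes event; whether the Dataflow worker honors such a custom window coder is an assumption that would need testing.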

