{1如果用户在一个特定的时间段内启动了一个新的会话,则根据一个用户的建议,如果一个新的会话在某个特定的时间段内启动,则返回该窗口。在
基本上我添加了这个IntervalWindow类:
class StoppingIntervalWindow(IntervalWindow):
def __init__(self, start, end, event):
super(StoppingIntervalWindow, self).__init__(start, end)
self.event = event
然后我通过扩展WindowFn类创建了一个自定义窗口。 我将窗口分配给StoppingIntervalWindow,合并的方式与会话窗口中的类似:
^{2}$我当前的问题是:本地用户数据被正确聚合,并且每次事件发生或时间间隔过去时都会启动不同的会话。然而,一旦我在google云数据流中远程运行我的代码,我会得到以下错误:
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
op.start()
File "dataflow_worker/shuffle_operations.py", line 49, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
def start(self):
File "dataflow_worker/shuffle_operations.py", line 50, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/shuffle_operations.py", line 65, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
with self.shuffle_source.reader() as reader:
File "dataflow_worker/shuffle_operations.py", line 69, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "dataflow_worker/shuffle_operations.py", line 232, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
for wvalue in self.driver.process_elements(state, vs, MIN_TIMESTAMP):
File "/usr/local/lib/python2.7/dist-packages/apache_beam/transforms/trigger.py", line 993, in process_elements
self.window_fn.merge(TriggerMergeContext(all_windows))
File "/vagrant/adjust/session.py", line 50, in merge
AttributeError: 'IntervalWindow' object has no attribute 'event'
目前没有回答
相关问题 更多 >
编程相关推荐