Apache Beam job fails when applying session windows to large data

Posted on 2024-04-25 14:14:36


I'm working on a Python Apache Beam job that applies session windows to a bounded dataset. It works fine on smaller datasets, but the job gets killed when I increase the size of the input data.

The job ID is 2019-06-10_07_28_32-2942508228086251217.

elements = (p | 'IngestData' >> beam.io.Read(big_query_source))

(elements
    | 'AddEventTimestamp' >> beam.ParDo(AddTimestampDoFn())
    | 'SessionWindow' >> beam.WindowInto(window.Sessions(10 * 60))
    | 'CreateTuple' >> beam.Map(lambda row: (row['id'], {'attribute1': row['attribute1'], 'date': row['date']}))
    | 'GroupById1' >> beam.GroupByKey()
    | 'AggregateSessions' >> beam.ParDo(AggregateTransactions())
    | 'MergeWindows' >> beam.WindowInto(window.GlobalWindows())
    | 'GroupById2' >> beam.GroupByKey()
    | 'MapSessionsToLists' >> beam.Map(lambda x: (x[0], [y for y in x[1]]))
    | 'BiggestSession' >> beam.ParDo(MaximumSession())
    | 'PrepForWrite' >> beam.Map(lambda x: x[1].update({'id': x[0]}) or x[1])
    | 'WriteResult' >> WriteToText(known_args.output))

The DoFn classes (not shown).

The job fails with the following error: The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. The work item was attempted on these workers:

The logs for those specific workers in Stackdriver don't hint at anything. I just get a mix of these entries:

processing lull for over 431.44 seconds in state process-msecs in step s5

Refusing to split <dataflow_worker.shuffle.GroupedShuffleRangeTracker object at 0x7f82e970cbd0> at '\n\xaaG\t\x00\x01': proposed split position is out of range

Retry with exponential backoff: waiting for 4.69305060273 seconds before retrying lease_work because we caught exception: SSLError: ('The read operation timed out',)

The remaining entries are informational.

The latest memory usage reported for that particular worker was 43413 MB. Since I'm using n1-highmem-32 machines, I don't think memory is the problem.

On the client side, in Cloud Shell where I triggered the job, I get a lot of

INFO:oauth2client.transport:Refreshing due to a 401 (attempt 1/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 2/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 1/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 1/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 2/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 2/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 1/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 2/2)

before the job fails.

Any ideas?

Thanks


1 Answer

#1 · Posted on 2024-04-25 14:14:36

By default, Dataflow retries a failed work item 4 times when any error occurs in batch mode, and retries indefinitely when running in streaming mode.

Please create a dashboard in Stackdriver for the Compute Engine machines used by the pipeline, to analyze the memory, CPU consumption, and volume of IO operations taking place. The pipeline's machine configuration should be raised only after careful analysis of those factors.

Please make sure that all of your transforms work correctly against the data you provide, and apply exception handling.
