试图弄清楚如何编写流式处理作业依赖于一些第三方模块和查找表,如下所示:
# custom.py
# this is the 3rd party or user defined python module,
# there're some module-level variables
# and some functions which rely on the moduel-level variables to work
VAR_A = ...
VAR_B = ...
# load external data files to initialize VAR_A and VAR_B
def init_var(external_file):
with open(external_file, 'r') as f:
for l in f:
VAR_A.append(l)
VAR_B.check(l)
....
# relies on VAR_A and VAR_B to work
def process(x):
if x in VAR_A:
...
if VAR_B.check(x):
...
流驱动程序如下所示,基本上,对于每个rdd,我想通过handle
应用custom
的process
函数,但是在process
函数中依赖一些查找变量来工作,即VAR_A
和VAR_B
,所以我必须在Spark contenxt中显式地广播这些查找变量吗?你知道吗
# driver.py
import custom
def handle(x):
...
custom = shared.value
return custom.process(x)
if __name__ == '__main__':
sc = SparkContext(appName='porn_score_on_name')
ssc = StreamingContext(sc, 2)
custom.init('/path/to/external_file')
# since each task node will use custom, so I try to make it a shared one
# HOWEVER, this won't work, since module cannot be pickled
shared = sc.broadcast(custom)
# get stream data
data = ...
processed = data.map(handle)
# further processing
...
ssc.start()
ssc.awaitTermination()
我想知道如何使它工作,如果我必须使用第三方模块?你知道吗
更新
假设实时流式输入有几行字,例如
word1 word2
word3
word5 word7 word1
...
我想找出在一个预先计算的词汇表(V)中有单词的行。你知道吗
所以我有个想法: 写一个流作业来处理传入的数据,这意味着我有多个并行运行的执行器来使用数据,对于每个执行器,预计算的vocav应该一直可用。 现在的问题是我该怎么做?你知道吗
以下是我的初步看法:
我制作了一个包含词汇表和我的自定义代码的zip包。包装.zip,然后我提交这个包装.zip通过spark-submit
,所以包装.zip在每个执行器机器上都是可用的,那么我应该做一些事情让每个执行器从中加载词汇表包装.zip到内存中的查找表中,因此,现在每个执行器都可以访问词汇表,以便在驱动程序开始运行时正确处理实时流数据。你知道吗
然而,事实证明,上面的想法是可行的,但是每个执行者一次又一次地为每个批加载词汇表,这是不可接受的。 这是我的第二个观点: 我应该将词汇表加载到驱动程序中(这样就在本地机器上发生了,而不是在执行器上),然后我将词汇表查找表广播给所有执行器,然后执行任务。你知道吗
你的例子看起来并不是一个流问题,只是如何加载一个全局变量。。。你知道吗
我不会试图广播整个模块,只是个别需要的变量。你知道吗
例如,您应该能够像这样使用广播变量。(代码未测试)
相关问题 更多 >
编程相关推荐