尝试理解spark流媒体工作

2024-03-29 05:40:12 发布

您现在位置:Python中文网/ 问答频道 /正文

试图弄清楚如何编写流式处理作业依赖于一些第三方模块和查找表,如下所示:

# custom.py
# this is the 3rd party or user defined python module,
# there're some module-level variables
# and some functions which rely on the moduel-level variables to work
VAR_A = ...
VAR_B = ...


# load external data files to initialize VAR_A and VAR_B
def init_var(external_file):
  with open(external_file, 'r') as f:
    for l in f:
      VAR_A.append(l)
      VAR_B.check(l)
      ....

# relies on VAR_A and VAR_B to work
def process(x):
  if x in VAR_A:
    ...
  if VAR_B.check(x):
    ...

流驱动程序如下所示,基本上,对于每个rdd,我想通过handle应用customprocess函数,但是在process函数中依赖一些查找变量来工作,即VAR_AVAR_B,所以我必须在Spark contenxt中显式地广播这些查找变量吗?你知道吗

# driver.py
import custom

def handle(x):
  ...
  custom = shared.value
  return custom.process(x)


if __name__ == '__main__':
  sc = SparkContext(appName='porn_score_on_name')
  ssc = StreamingContext(sc, 2)

  custom.init('/path/to/external_file')

  # since each task node will use custom, so I try to make it a shared one
  # HOWEVER, this won't work, since module cannot be pickled
  shared = sc.broadcast(custom)

  # get stream data
  data = ...
  processed = data.map(handle)

  # further processing
  ...

  ssc.start()
  ssc.awaitTermination()

我想知道如何使它工作,如果我必须使用第三方模块?你知道吗

更新

假设实时流式输入有几行字,例如

word1 word2
word3
word5 word7 word1
...

我想找出在一个预先计算的词汇表(V)中有单词的行。你知道吗

所以我有个想法: 写一个流作业来处理传入的数据,这意味着我有多个并行运行的执行器来使用数据,对于每个执行器,预计算的vocav应该一直可用。 现在的问题是我该怎么做?你知道吗

以下是我的初步看法: 我制作了一个包含词汇表和我的自定义代码的zip包。包装.zip,然后我提交这个包装.zip通过spark-submit,所以包装.zip在每个执行器机器上都是可用的,那么我应该做一些事情让每个执行器从中加载词汇表包装.zip到内存中的查找表中,因此,现在每个执行器都可以访问词汇表,以便在驱动程序开始运行时正确处理实时流数据。你知道吗

然而,事实证明,上面的想法是可行的,但是每个执行者一次又一次地为每个批加载词汇表,这是不可接受的。 这是我的第二个观点: 我应该将词汇表加载到驱动程序中(这样就在本地机器上发生了,而不是在执行器上),然后我将词汇表查找表广播给所有执行器,然后执行任务。你知道吗


Tags: andto词汇表dataonvardefcustom
1条回答
网友
1楼 · 发布于 2024-03-29 05:40:12

你的例子看起来并不是一个流问题,只是如何加载一个全局变量。。。你知道吗

我不会试图广播整个模块,只是个别需要的变量。你知道吗

例如,您应该能够像这样使用广播变量。(代码未测试)

# One of the first things you do
vocab = sc.broadcast(open('vocab.txt').readlines())  # broadcast to all executors

def vocab_filter(line):
    words = line.split()
    return [w for w in words if w in vocab.value]

ssc = StreamingContext(sc, 1)  # Some streaming context
lines = ssc.socketTextStream("localhost", 9999)  # Some stream
# remove extraneous words from the lines and flatten all words in the stream
lines_filtered = lines.flatMap(vocab_filter)  

相关问题 更多 >