我一直在使用Python开发一些Spark流,特别是textFileStream,我注意到一个有点奇怪的行为。我想知道是否有人能帮我解释一下。在
我目前的代码设置如下:
def fileName(data):
debug = data.toDebugString()
pattern = re.compile("file:/.*\.txt")
files = pattern.findall(debug)
return files
if __name__ == "__main__":
sc = SparkContext(appName="PythonStreamingFileNamePrinter")
ssc = StreamingContext(sc, 1)
lines = ssc.textFileStream("file:///test/input/")
files = lines.foreachRDD(fileName)
print(files)
ssc.start()
ssc.awaitTermination()
fileName函数simple从调试流(Spark Streaming: How to get the filename of a processed file in Python)获取正在处理的文件的名称。但是,此代码只运行一次,只打印一次文件。当我修改函数如下:
^{pr2}$它像预期的那样每秒钟检查一次目录。似乎只有foreachRDD中有“循环”的代码。在
我的假设是正确的吗?所有的处理(包括循环、条件语句等)都必须在map函数中进行等等?在
谢谢, 米
数据流由随时间构建的许多RDD组成。 线是数据流。在
当您在行上执行foreachRDD时,流中的每个rdd都被转换为一个字符串。因此,当您打印它时,您将得到一个表示流中所有RDD的字符串列表。意思是,这发生在“流的尽头”。在
当您在fileName函数中打印字符串时,您正在对流中的每个rdd执行此操作。所以你可以在流运行时得到它。在
另外,正如我在你前面的问题中所提到的,foreachRDD在这里是不必要的。它不是“火花流的方式”这个特定的需要,也许这就是为什么它让你困惑。在
这里更直接的方法是在数据流本身上使用映射(这将影响其中的所有rdd),然后使用pprint。在
请记住,与常规rdd不同,您不能在流中收集(或任何类似的)rdd并在流运行时返回结果。您需要对该数据执行某些操作,将其保存到某个外部源(如果需要)或将其作为整个流状态的一部分进行处理。在
相关问题 更多 >
编程相关推荐