在python中将spark Dstream附加到单个文件

2024-05-16 02:16:08 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个程序读取卡夫卡和打印输出在火花。我需要将此输出附加到单个文件中。。 我的代码写入文件夹。火花写入多个文件,然后我有另一个实用程序,它将聚合文件的结果。在

有没有简单的方法可以将数据流的多个RDD数据追加到同一个文件中? 或者 我可以将所有的数据流RDD合并到一个数据流,并将其流/附加到文件中吗

    conf = SparkConf() \
         .setAppName("PySpark Cassandra Test") \
         .setMaster("spark://host:7077") \
         .set("spark.rpc.netty.dispatcher.numThreads","2")

    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 20)

    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    parsed = kvs.map(lambda (k, v): json.loads(v))
    mapped = parsed.map(lambda event: (event['test'], 1))
    reduced = mapped.reduceByKey(lambda x,y: x + y)
    result = reduced.map(lambda x: {"test": x[0], "test2": x[1]})
    result.pprint()
    result.saveAsTextFiles("file:///test/hack")
    ssc.start()
    ssc.awaitTermination()

Tags: 文件lambdatestmaptopicconfresultspark
1条回答
网友
1楼 · 发布于 2024-05-16 02:16:08

我可以用foreachRDD来做

def tpprint(val, num=10):
    """
    Print the first num elements of each RDD generated in this DStream.
    @param num: the number of elements from the first will be printed.
    """
    def takeAndPrint(time, rdd):
        taken = rdd.take(num + 1)
        print("########################")
        print("Time: %s" % time)
        print("########################")
        for record in taken[:num]:
            print(record)
            with open("/home/ubuntu/spark-1.4.1/test.txt", "a") as myfile:
                myfile.write(str(record))
        if len(taken) > num:
            print("...")
        print("")

    val.foreachRDD(takeAndPrint)

叫它像 结果=缩小.map(lambda x:{“feddback_id”:x[0],“pageviews”:x[1]})

^{pr2}$

相关问题 更多 >