Kinesis and Spark Streaming integration does not print the DStream contents

Posted 2024-06-01 03:10:02

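Using Python, I want to build a simple architecture that prints data streamed through Kinesis and then delivered to a Spark Streaming DStream object. I am running everything on an EC2 instance.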

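My data producer is a Kinesis Agent that monitors the /var/documents/ directory. According to the agent's log file, it is parsing records and sending them to the destination, but when I print the DStream object nothing shows up.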
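To confirm that records really reach the stream before Spark gets involved, the stream can be read directly. The following is only a minimal sketch: `read_sample_records` and its `limit` and `max_calls` parameters are hypothetical names introduced here, the client is assumed to behave like `boto3.client("kinesis")`, and only the first shard is inspected for simplicity.

```python
def read_sample_records(client, stream_name, limit=10, max_calls=5):
    """Return up to `limit` UTF-8 decoded records from the first shard.

    `client` is assumed to expose list_shards, get_shard_iterator and
    get_records the way a boto3 Kinesis client does.
    """
    shards = client.list_shards(StreamName=stream_name)["Shards"]
    if not shards:
        return []

    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shards[0]["ShardId"],
        ShardIteratorType="TRIM_HORIZON",  # start at the oldest retained record
    )["ShardIterator"]

    records = []
    # A single get_records call may come back empty even when the shard
    # holds data, so follow NextShardIterator a few times before giving up.
    for _ in range(max_calls):
        response = client.get_records(ShardIterator=iterator, Limit=limit)
        records.extend(r["Data"].decode("utf-8") for r in response["Records"])
        iterator = response.get("NextShardIterator")
        if len(records) >= limit or iterator is None:
            break
    return records[:limit]


# Usage (needs AWS credentials; stream name and region as in my script):
#   import boto3
#   client = boto3.client("kinesis", region_name="eu-central-1")
#   print(read_sample_records(client, "EntryPoints"))
```

If this returns records while the Spark batches stay empty, the problem is more likely on the consumer side than in the agent.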

My source code:

import boto3, random, time
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

conf = SparkConf().setAppName("KinesisSparkBigDataPipeline")

sc = SparkContext(conf = conf)
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 2)

def createStream():
    """
    Function that creates a DStream Object coming from Kinesis Stream.

    Returns:
        sparkDStream => DStream object created from records in the Kinesis Stream.
    """
    kinesisAppName = ("KinesisStreamTests-%d" % abs(random.randint(0, 10000000)))
    sparkDStream = KinesisUtils.createStream(
            ssc,
            kinesisAppName,
            "EntryPoints",
            "https://kinesis.eu-central-1.amazonaws.com",
            "eu-central-1",
            InitialPositionInStream.LATEST,
            2
    )
    return sparkDStream

if __name__ == "__main__":
    try:
        kinesisStream = createStream()
        kinesisStream.pprint()

        ssc.start()
        time.sleep(60)
        ssc.stop()
        # ssc.awaitTermination()
    except Exception as e:
        print(e)

The output of running spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.4.4 poc_bigdata_pipeline.py is:

-------------------------------------------
Time: 2020-11-03 11:09:52
-------------------------------------------

-------------------------------------------
Time: 2020-11-03 11:09:54
-------------------------------------------

...
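The headers above come from pprint(), which prints nothing else when a batch has no records. A minimal sketch of a more explicit per-batch check follows, assuming it is registered with the DStream's foreachRDD before ssc.start(); `log_batch` and `sample_size` are hypothetical names introduced here, not part of my script.

```python
def log_batch(batch_time, rdd, sample_size=5):
    """Print how many records one batch holds, plus a small sample.

    Deliberately returns None: PySpark treats a truthy return value from a
    foreachRDD callback as an RDD, so returning the count would fail.
    """
    count = rdd.count()
    print("batch %s: %d record(s)" % (batch_time, count))
    for record in rdd.take(sample_size):
        print("  %r" % (record,))


# Usage in the script, before ssc.start():
#   kinesisStream.foreachRDD(log_batch)
```

A steady run of "0 record(s)" lines would confirm that the batches are really empty, rather than the records being lost at print time.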

Am I doing something wrong? Apologies if I have left out any important information about my problem; I am new to this.

Thanks for reading.

