How to compute a sum of values with PySpark, Kafka, and Spark Streaming

Published 2024-05-14 22:13:18


Currently I receive 4 or more vehicle IoT sensor data records every second, and to keep things simple I want to start by adding together the 4 velocity readings. Most of the code examples I have found demonstrate counting, which I can already do, but how do I simply add up the 4 or more individual velocity values? Right now the output shows a 1-second timestamp and the 4 extracted velocity values.


from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import pyspark.sql.functions as sf
from pyspark.sql.functions import udf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

conf = SparkConf().setAppName("rjws-sparkstreams")

#Pauses for Context Load
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")

ssc = StreamingContext(sc, 1)
kafkaStream = KafkaUtils.createStream(ssc, '172.16.10.1:2181', 'spark-streaming', {'vehicle_events':1})
#Presents JSON formatted data
KafkaStream_json = kafkaStream.map(lambda x: json.loads(x[1]))

#Parses the Velocity column of data
velocity_dstream = KafkaStream_json.map(lambda vehicle_events_fast_testdata: vehicle_events_fast_testdata["velocity"])
velocity_readings = velocity_dstream.countByValue()
top_reads = velocity_readings.transform(lambda rdd:sc.parallelize(rdd.take(50)))

ssc.start()
ssc.awaitTermination()
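For reference, `countByValue()` counts occurrences rather than summing. The legacy DStream API has a `reduce(func)` transformation, so the per-batch sum could instead be expressed as `velocity_dstream.reduce(lambda a, b: a + b)`. A minimal plain-Python sketch of that reduction logic, using a hypothetical batch of four velocity readings:

```python
from functools import reduce

# Hypothetical batch of four velocity readings, standing in for one
# 1-second micro-batch of the velocity DStream:
batch = [12.5, 13.0, 12.8, 13.2]

# This mirrors what velocity_dstream.reduce(lambda a, b: a + b)
# would compute for each batch interval:
batch_sum = reduce(lambda a, b: a + b, batch)
print(batch_sum)  # 51.5
```

In the streaming job, the reduced DStream can then be printed with `.pprint()` or further transformed, one sum per batch interval.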

I also tried adding the following code:

total = 0
def velParse(vehicle_events_fast_testdata):
    total = sum(vehicle_events_fast_testdata["velocity"]) + (total)
    return vehicle_events_fast_testdata["velocity"]

velocity_dstream = KafkaStream_json.map(lambda vehicle_events_fast_testdata: velParse(vehicle_events_fast_testdata))

However, this does not correctly compute the sum of the velocity readings; it reports that the item is not iterable. Thanks.


Tags: lambda, from, import, json, sql, events, velocity, pyspark
