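I currently receive four or more vehicle IoT sensor records every second, and to keep things simple I want to start by adding up the 4 velocity readings. Most of the code examples I have found show counting, which I can already do, but how do I simply add together 4 or more individual velocity values? At the moment, the output shows the 1-second timestamp and the 4 extracted velocity values.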
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import pyspark.sql.functions as sf
from pyspark.sql.functions import udf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
conf = SparkConf().setAppName("rjws-sparkstreams")
#Pauses for Context Load
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 1)
kafkaStream = KafkaUtils.createStream(ssc, '172.16.10.1:2181', 'spark-streaming', {'vehicle_events':1})
#Presents JSON formatted data
KafkaStream_json = kafkaStream.map(lambda x: json.loads(x[1]))
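#Parses the Velocity column of data
velocity_dstream = KafkaStream_json.map(lambda vehicle_events_fast_testdata: vehicle_events_fast_testdata["velocity"])
#Prints each batch's velocity values (an output operation is required before ssc.start())
velocity_dstream.pprint()
#Counts how often each distinct velocity value occurs in a batch (a count, not a sum)
velocity_readings = velocity_dstream.countByValue()
top_reads = velocity_readings.transform(lambda rdd:sc.parallelize(rdd.take(50)))
ssc.start()
ssc.awaitTermination()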
I also tried adding the following code:
total = 0
def velParse(vehicle_events_fast_testdata):
    total = sum(vehicle_events_fast_testdata["velocity"]) + (total)
    return vehicle_events_fast_testdata["velocity"]
velocity_dstream = KafkaStream_json.map(lambda vehicle_events_fast_testdata: velParse(vehicle_events_fast_testdata))
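However, this does not correctly calculate the sum of the velocity readings; it fails with an error saying, roughly, that the item is not iterable. Thanks.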
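One possible direction, offered as a sketch rather than a confirmed fix: the attempt above calls sum() on a single record's velocity (one number, not a sequence) and tries to update a driver-side total from inside a mapped function. Summing per batch is usually expressed as a pairwise reduction instead. The pure-Python example below mimics that on one 1-second batch; the helper name batch_velocity_sum and the sample readings are hypothetical, and it assumes each record is a JSON object with a numeric "velocity" field, as in the question.

```python
import json
from functools import reduce


def batch_velocity_sum(raw_messages):
    """Sum the "velocity" field over one batch of JSON-encoded records.

    Hypothetical helper: it stands in for what a per-batch reduction
    would compute on the stream.
    """
    velocities = (json.loads(message)["velocity"] for message in raw_messages)
    # Combine values pairwise with a pure function; no shared counter needed.
    return reduce(lambda a, b: a + b, velocities, 0.0)


# Made-up sample: four readings arriving within one second.
batch = [json.dumps({"velocity": v}) for v in (10.0, 12.5, 11.0, 9.5)]
print(batch_velocity_sum(batch))  # 43.0
```

On the stream itself, the same pairwise function could be passed to the DStream's reduce method, for example velocity_dstream.reduce(lambda a, b: a + b).pprint(), which yields one summed value per batch. That line is untested against this Kafka setup and assumes every velocity value is numeric.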