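Spark Structured Streaming multithreading in IPython notebooks
Detailed description of the nbthread_spark Python project
Spark multithreading in IPython notebooks.
Running Spark Structured Streaming in a Jupyter notebook is now straightforward.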
Installation
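    pip install nbthread_spark --process-dependency-links
Note: the --process-dependency-links flag was removed in pip 19.0. If your pip rejects it, run the command without the flag.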
Usage
To display the stop button and the user interface:
    from nbthread_spark.spark import SparkRunner

    spark = SparkRunner.builder.getOrCreate()  # same as original SparkSession
    ## you will see buttons ;)
Given a socket stream:
    TCP_IP = "localhost"
    TCP_PORT = 9005

    from pyspark.sql.functions import from_json
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructField, StructType, IntegerType

    schema = StructType([
        StructField("bip", IntegerType(), True),
        StructField("is_on", IntegerType(), True)
    ])

    spark = SparkSession \
        .builder \
        .appName("IOTStreamApp") \
        .getOrCreate()

    iot_stream = spark \
        .readStream \
        .format("socket") \
        .option("host", TCP_IP) \
        .option("port", TCP_PORT) \
        .load()

    iot_expanded = iot_stream \
        .withColumn('value_json', from_json('value', schema)) \
        .drop('value') \
        .select('value_json.*')

    query = iot_expanded \
        .writeStream \
        .outputMode("update") \
        .format("memory") \
        .queryName("iot_table") \
        .start()
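The socket source connects as a client to the given host and port, so something must already be listening there before the query starts. As a minimal sketch for local testing, the helper below serves newline-terminated JSON records with the `bip` and `is_on` fields from the schema above. The function name `serve_readings` and its `ready` parameter are assumptions made for this illustration and are not part of nbthread_spark or PySpark.

```python
import json
import socket


def serve_readings(readings, host="localhost", port=9005, ready=None):
    """Accept one client and send each reading as one JSON line.

    Hypothetical test helper: `readings` is an iterable of dicts such as
    {"bip": 1, "is_on": 0}; `ready` is an optional threading.Event that is
    set once the port is open.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        # Allow quick restarts of the helper on the same port.
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(1)
        if ready is not None:
            ready.set()  # signal that clients may now connect
        conn, _ = server.accept()  # blocks until a client connects
        with conn:
            for reading in readings:
                line = json.dumps(reading) + "\n"  # one record per line
                conn.sendall(line.encode("utf-8"))
```

Calling `serve_readings([{"bip": 1, "is_on": 0}, {"bip": 2, "is_on": 1}])` in a separate process or thread before starting the query should give the stream something to read. The connection closes once all readings are sent, so a longer-lived feed would need a generator that keeps yielding records.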
You can run the query as follows:
    from nbthread_spark.stream import StreamRunner

    runner = StreamRunner(query)
    runner.controls()  ## you will see buttons ;)
    runner.start()     # start without controls
    runner.status()    # show stream status
    runner.stop()      # stop streaming and thread
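The source does not describe how `StreamRunner` works internally. As a rough illustration of the general pattern behind a start/status/stop interface in a notebook, the sketch below runs a polling callable on a background thread and stops it through a `threading.Event`. The class `PollingRunner` and all of its parameters are hypothetical and do not reflect nbthread_spark's actual implementation.

```python
import threading


class PollingRunner:
    """Hypothetical stand-in for a notebook stream runner."""

    def __init__(self, poll, interval=0.05):
        self._poll = poll          # callable invoked on every tick
        self._interval = interval  # seconds between ticks
        self._stop_event = threading.Event()
        self._thread = None

    def start(self):
        """Start the background thread unless it is already running."""
        if self._thread is not None and self._thread.is_alive():
            return
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        # Event.wait returns True as soon as stop() sets the event;
        # otherwise it times out after `interval` and we poll again.
        while not self._stop_event.wait(self._interval):
            self._poll()

    def status(self):
        """Report whether the background thread is alive."""
        alive = self._thread is not None and self._thread.is_alive()
        return "running" if alive else "stopped"

    def stop(self):
        """Signal the thread to stop and wait for it to finish."""
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join()
```

Because the loop blocks on the event rather than on a plain sleep, `stop()` takes effect without waiting out the full interval, which keeps a notebook stop button responsive.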
With the stream manager, you can easily control many streams at once:
    from nbthread_spark.manager import StreamManager

    sm = StreamManager()
    # runner1 and runner2 are assumed to be additional StreamRunner instances
    sm.append(runner)
    sm.append(runner1)
    sm.append(runner2)
    sm.all_controls()  ## you will see all buttons from streams ;)
    sm.start_all()     # start all streams
    sm.stop_all()      # stop all streams
Special thanks
Here is the list of students who took part in this module.