ipython笔记本中的spark结构化流式多线程

nbthread_spark的Python项目详细描述


ipython笔记本中的spark多线程。

现在,在Jupyter笔记本电脑中执行Spark结构化流很简单

安装

pip install nbthread_spark --process-dependency-links

用法

显示停止按钮和用户界面:

fromnbthread_spark.sparkimportSparkRunnerspark=SparkRunner.builder.getOrCreate()# same as original SparkSession## you will see buttons ;)

给定一个套接字流:

TCP_IP="localhost"TCP_PORT=9005frompyspark.sql.functionsimportfrom_jsonfrompyspark.sqlimportSparkSessionfrompyspark.sql.typesimportStructField,StructType,IntegerTypeschema=StructType([StructField("bip",IntegerType(),True),StructField("is_on",IntegerType(),True)])spark=SparkSession \
    .builder \
    .appName("IOTStreamApp") \
    .getOrCreate()iot_stream=spark \
    .readStream \
    .format("socket") \
    .option("host",TCP_IP) \
    .option("port",TCP_PORT) \
    .load()iot_expanded=iot_stream.withColumn('value_json',from_json('value',schema)).drop('value').select('value_json.*')query=iot_expanded \
    .writeStream \
    .outputMode("update") \
    .format("memory") \
    .queryName("iot_table") \
    .start()

您可以使用以下命令运行查询:

fromnbthread_spark.streamimportStreamRunnerrunner=StreamRunner(query)runner.controls()## you will see buttons ;)runner.start()# start without controlsrunner.status()# show stream statusrunner.stop()# stop streaming and thread

对于流管理器,您可以轻松地控制许多流:

fromnbthread_spark.managerimportStreamManagersm=StreamManager()sm.append(runner)sm.append(runner1)sm.append(runner2)sm.all_controls()## you will see all buttons from streams ;)sm.start_all()# start all streamssm.stop_all()# stop all streams

特别感谢

Here参与此模块的学生列表。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java Spring框架服务单元测试   在Java中遍历hashmaps的hashmap以检索字符串值   如何使用CodeQL检查Java注释是否具有特定属性?   java为什么在Spring Boot中访问此资源而不是登录弹出窗口需要始终获得完全身份验证   处理将多集计数转换为列表的过程   java另一个线性布局,没有出现按钮   eclipse Java映像加载未显示在jar中   java Junit类无法加载基本测试类ApplicationContext   java如何在main中使用my getvalues()方法打印列表   java Sonar,S128:切换案例应该以无条件的“中断”语句结束,而不是继续   java从socket读取字符串错误连接重置错误   java使用新数据刷新任意图表饼图   java通过异步运行lambda访问方法参数   java错误的结果一旦我处理try and catch