ipython笔记本中的spark结构化流式多线程

nbthread_spark的Python项目详细描述


ipython笔记本中的spark多线程。

现在,在Jupyter笔记本电脑中执行Spark结构化流很简单

安装

pip install nbthread_spark --process-dependency-links

用法

显示停止按钮和用户界面:

fromnbthread_spark.sparkimportSparkRunnerspark=SparkRunner.builder.getOrCreate()# same as original SparkSession## you will see buttons ;)

给定一个套接字流:

TCP_IP="localhost"TCP_PORT=9005frompyspark.sql.functionsimportfrom_jsonfrompyspark.sqlimportSparkSessionfrompyspark.sql.typesimportStructField,StructType,IntegerTypeschema=StructType([StructField("bip",IntegerType(),True),StructField("is_on",IntegerType(),True)])spark=SparkSession \
    .builder \
    .appName("IOTStreamApp") \
    .getOrCreate()iot_stream=spark \
    .readStream \
    .format("socket") \
    .option("host",TCP_IP) \
    .option("port",TCP_PORT) \
    .load()iot_expanded=iot_stream.withColumn('value_json',from_json('value',schema)).drop('value').select('value_json.*')query=iot_expanded \
    .writeStream \
    .outputMode("update") \
    .format("memory") \
    .queryName("iot_table") \
    .start()

您可以使用以下命令运行查询:

fromnbthread_spark.streamimportStreamRunnerrunner=StreamRunner(query)runner.controls()## you will see buttons ;)runner.start()# start without controlsrunner.status()# show stream statusrunner.stop()# stop streaming and thread

对于流管理器,您可以轻松地控制许多流:

fromnbthread_spark.managerimportStreamManagersm=StreamManager()sm.append(runner)sm.append(runner1)sm.append(runner2)sm.all_controls()## you will see all buttons from streams ;)sm.start_all()# start all streamssm.stop_all()# stop all streams

特别感谢

Here参与此模块的学生列表。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java理解泛型   java Guava:如何自定义减少多重映射?   java无法构建实体管理器工厂JPA/Hibernate   不区分大小写的LDAP搜索   在java中同时调用所有类对象中的方法   java做高级数字计算?2.1k等于2100等。。?   java Camel netty组件:未能创建选择器   exceljava。lang.ClassCastException:ExcelStreamAction无法强制转换为com。开放交响乐团。xwork2。行动   java避免对嵌套a4j:区域进行验证   java如何使一帧在1秒内显示50次,每次显示时消失   java一个HashMap的遍历,我得到NullPointerException   windows HP Stream 8平板电脑。。。Java swing JScrollPane滚动在触摸屏上不工作   java如何在运行时根据用户/程序员的需要自动增加数组的大小?