How to print a Structured Streaming query to the console

Posted on 2024-04-26 11:09:03


I am learning Structured Streaming with Databricks, and I am struggling with the DataStreamWriter console mode.

My program:

  • Simulates a stream of files arriving in the folder "monitoring_dir" (a new file is transferred from "source_dir" every 10 seconds)
  • Uses DataStreamReader to fill an unbounded DataFrame "inputUDF" with the content of each new file
  • Uses DataStreamWriter to output the new rows of "inputUDF" to a valid sink

When the File sink is selected the program works (the batches are appended to text-format files in "result_dir"), but when the Console sink is selected I cannot see anything being displayed.

Moreover, when I run an equivalent version of the program on my local machine (with Spark installed), it works fine for both the File sink and the Console sink.

My question is:

  • How can I make this program output to the Console sink and display the results when using Databricks?

Thank you very much in advance.

Best regards, Nacho


My program: myTest.py

import pyspark
import pyspark.sql.functions

import time

#------------------------------------
# FUNCTION get_source_dir_file_names
#------------------------------------ 
def get_source_dir_file_names(source_dir):

    # 1. We create the output variable
    res = []

    # 2. We get the FileInfo representation of the files of source_dir
    fileInfo_objects = dbutils.fs.ls(source_dir)

    # 3. We traverse the fileInfo objects, to get the name of each file
    for item in fileInfo_objects:      
        # 3.1. We get a string representation of the fileInfo
        file_name = str(item)

        # 3.2. We look for the pattern name= to remove all useless info from the start
        lb_index = file_name.index("name='")
        file_name = file_name[(lb_index + 6):]

        # 3.3. We look for the pattern ', to remove all useless info from the end
        ub_index = file_name.index("',")
        file_name = file_name[:ub_index]

        # 3.4. We append the name to the list
        res.append(file_name)

    # 4. We sort the list in alphabetic order
    res.sort()

    # 5. We return res
    return res
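
#------------------------------------
# NOTE: simpler alternative (sketch)
#------------------------------------
# dbutils.fs.ls() returns FileInfo objects that already expose a .name attribute,
# so the string parsing above could presumably be replaced by something like:
#
#     def get_source_dir_file_names(source_dir):
#         return sorted(item.name for item in dbutils.fs.ls(source_dir))
#
# (untested sketch; it should behave the same as the parsing-based version above)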

#------------------------------------
# FUNCTION streaming_simulation
#------------------------------------ 
def streaming_simulation(source_dir, monitoring_dir, time_step_interval):
    # 1. We get the names of the files on source_dir
    files = get_source_dir_file_names(source_dir)

    # 2. We get the starting time of the process
    time.sleep(time_step_interval * 0.1)

    start = time.time()

    # 3. We set a counter for the number of files being transferred
    count = 0

    # 4. We simulate the dynamic arrival of these files from source_dir to monitoring_dir
    # (i.e., the files are moved one by one in each time period, simulating their generation).
    for file in files:
        # 4.1. We copy the file from source_dir to monitoring_dir
        dbutils.fs.cp(source_dir + file, monitoring_dir + file)

        # 4.2. We increase the counter, as we have transferred a new file
        count = count + 1

        # 4.3. We wait until the next time slot (guarding against a negative sleep
        #      in case the copy took longer than time_step_interval).
        time.sleep(max(0.0, (start + (count * time_step_interval)) - time.time()))

    # 5. We wait a last time_step_interval
    time.sleep(time_step_interval)

#------------------------------------
# FUNCTION my_main
#------------------------------------ 
def my_main():
    # 0. We set the mode
    console_sink = True

    # 1. We set the paths to the folders
    source_dir = "/FileStore/tables/my_dataset/"
    monitoring_dir = "/FileStore/tables/my_monitoring/"
    checkpoint_dir = "/FileStore/tables/my_checkpoint/"
    result_dir = "/FileStore/tables/my_result/"

    dbutils.fs.rm(monitoring_dir, True)
    dbutils.fs.rm(result_dir, True)
    dbutils.fs.rm(checkpoint_dir, True)  

    dbutils.fs.mkdirs(monitoring_dir)
    dbutils.fs.mkdirs(result_dir)
    dbutils.fs.mkdirs(checkpoint_dir)    

    # 2. We configure the Spark Session
    spark = pyspark.sql.SparkSession.builder.getOrCreate()
    spark.sparkContext.setLogLevel('WARN')    

    # 3. Operation C1: We create an Unbounded DataFrame reading the new content copied to monitoring_dir
    inputUDF = spark.readStream.format("text")\
                               .load(monitoring_dir)

    myDSW = None
    # 4. Operation A1: We create the DataStreamWriter...

    # 4.1. To either save to result_dir in append mode  
    if console_sink == False:
        myDSW = inputUDF.writeStream.format("text")\
                                    .option("path", result_dir) \
                                    .option("checkpointLocation", checkpoint_dir)\
                                    .trigger(processingTime="10 seconds")\
                                    .outputMode("append")   
    # 4.2. Or to display by console in append mode    
    else:
        myDSW = inputUDF.writeStream.format("console")\
                                    .trigger(processingTime="10 seconds")\
                                    .outputMode("append")   

    # 5. We get the StreamingQuery object derived from starting the DataStreamWriter
    mySQ = myDSW.start()

    # 6. We simulate the streaming arrival of files (i.e., one by one) from source_dir to monitoring_dir
    streaming_simulation(source_dir, monitoring_dir, 10)

    # 7. We stop the StreamingQuery to finish the application
    mySQ.stop()    

#-------------------------------
# MAIN ENTRY POINT
#-------------------------------
if __name__ == '__main__':
    my_main()

My dataset: f1.txt

First sentence

Second sentence


My dataset: f2.txt

Third sentence

Fourth sentence


My dataset: f3.txt

Fifth sentence

Sixth sentence


1 Answer
User
#1 · Posted on 2024-04-26 11:09:03

"How can I make this program to output to Console sink and display the results when using Databricks?"

The easiest way is to use the display function that Databricks provides. You can use it as follows:

# Cell 1
rateDf = (spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .option("numPartitions", 1)
  .load())

# Cell 2
display(rateDf, streamName="rate_stream")
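
The same approach should also work directly with the streaming DataFrame from the question; a minimal sketch, assuming the monitoring_dir path and the inputUDF name from the code above:

# Cell 3 (sketch): display the question's text stream instead of the rate source
inputUDF = (spark.readStream
  .format("text")
  .load("/FileStore/tables/my_monitoring/"))

display(inputUDF, streamName="input_stream")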

The console sink does not work in Databricks the way you would expect it to from running the code in an IDE or submitting it to a cluster. Instead, you can use the memory format and query the data with a %sql query:

inputUDF.writeStream \
  .format("memory") \
  .trigger(processingTime = "10 seconds") \
  .queryName("inputUDF_console") \
  .outputMode("append") \
  .start()  

In another Databricks cell you can then view the data by querying the table registered under the queryName:

%sql select * from inputUDF_console
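
If you prefer to stay in Python rather than SQL, the in-memory table registered under queryName can also be read with spark.sql from a regular notebook cell; a small sketch, reusing the inputUDF_console name from above:

# Query the in-memory table produced by the memory sink from a Python cell
df = spark.sql("select * from inputUDF_console")
display(df)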
