如何将CSV作为流表源加载到PyFlink中?

2024-06-07 03:17:44 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试设置一个简单的游乐场环境来使用Flink Python表API。我最终试图写的作业将来自卡夫卡或肯尼斯队列,但这使得玩弄想法(和测试)非常困难

我可以愉快地从CSV加载并以批处理模式处理它。但我无法让它在流模式下工作。我将如何在StreamingExecutionEnvironment中做类似的事情(主要是为了让我可以使用windows)

我知道我需要让系统使用EventTime(因为ProcTime会一次全部进入),但我找不到任何方法来设置它。原则上,我应该能够将CSV中的一列设置为事件时间,但文档中不清楚如何设置(或者如果可能)

为了让批处理执行测试运行,我使用了下面的代码,它从input.csv读取并输出到output.csv

from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (
    TableConfig,
    DataTypes,
    BatchTableEnvironment,
    StreamTableEnvironment,
)
from pyflink.table.descriptors import Schema, Csv, OldCsv, FileSystem
from pathlib import Path

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)
root = Path(__file__).parent.resolve()
out_path = root / "output.csv"

try:
    out_path.unlink()
except:
    pass

from pyflink.table.window import Tumble

(
    t_env.connect(FileSystem().path(str(root / "input.csv")))
    .with_format(Csv())
    .with_schema(
        Schema().field("time", DataTypes.TIMESTAMP(3)).field("word", DataTypes.STRING())
    )
    .create_temporary_table("mySource")
)

(
    t_env.connect(FileSystem().path(str(out_path)))
    .with_format(Csv())
    .with_schema(
        Schema().field("word", DataTypes.STRING()).field("count", DataTypes.BIGINT())
    )
    .create_temporary_table("mySink")
)

(
    t_env.from_path("mySource")
    .group_by("word")
    .select("word, count(1) as count")
    .filter("count > 1")
    .insert_into("mySink")
)

t_env.execute("tutorial_job")

而input.csv是

2000-01-01 00:00:00.000000000,james
2000-01-01 00:00:00.000000000,james
2002-01-01 00:00:00.000000000,steve

因此,我的问题是如何设置它,使其从相同的CSV读取,但使用第一列作为事件时间,并允许我编写如下代码:

(
    t_env.from_path("mySource")
    .window(Tumble.over("10.minutes").on("time").alias("w"))
    .group_by("w, word")
    .select("w, word, count(1) as count")
    .filter("count > 1")
    .insert_into("mySink")
)

任何帮助都将不胜感激,我无法从文件中解决这个问题。我正在使用python 3.7flink 1.11.1


Tags: csvpathfromimportenvfieldinputschema
2条回答

您是否尝试过使用水印策略?正如前面提到的here,您需要使用水印策略来使用事件时间。对于pyflink,我个人认为用ddl格式(如this)声明它更容易

如果使用描述符API,则可以通过架构将字段指定为事件时间字段:

.with_schema(  # declare the schema of the table
             Schema()
             .field("rowtime", DataTypes.TIMESTAMP())
             .rowtime(
                Rowtime()
                .timestamps_from_field("time")
                .watermarks_periodic_bounded(60000))
             .field("a", DataTypes.STRING())
             .field("b", DataTypes.STRING())
             .field("c", DataTypes.STRING())
         )

但是我仍然建议您使用DDL,一方面它更易于使用,另一方面现有的描述符API中存在一些bug,社区正在讨论重构描述符API

相关问题 更多 >