我正在尝试设置一个简单的游乐场环境来使用Flink Python表API。我最终试图写的作业将来自卡夫卡或肯尼斯队列,但这使得玩弄想法(和测试)非常困难
我可以愉快地从CSV加载并以批处理模式处理它。但我无法让它在流模式下工作。我将如何在StreamingExecutionEnvironment中做类似的事情(主要是为了让我可以使用windows)
我知道我需要让系统使用EventTime(因为ProcTime会一次全部进入),但我找不到任何方法来设置它。原则上,我应该能够将CSV中的一列设置为事件时间,但文档中不清楚如何设置(或者如果可能)
为了让批处理执行测试运行,我使用了下面的代码,它从input.csv
读取并输出到output.csv
from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (
TableConfig,
DataTypes,
BatchTableEnvironment,
StreamTableEnvironment,
)
from pyflink.table.descriptors import Schema, Csv, OldCsv, FileSystem
from pathlib import Path
exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)
root = Path(__file__).parent.resolve()
out_path = root / "output.csv"
try:
out_path.unlink()
except:
pass
from pyflink.table.window import Tumble
(
t_env.connect(FileSystem().path(str(root / "input.csv")))
.with_format(Csv())
.with_schema(
Schema().field("time", DataTypes.TIMESTAMP(3)).field("word", DataTypes.STRING())
)
.create_temporary_table("mySource")
)
(
t_env.connect(FileSystem().path(str(out_path)))
.with_format(Csv())
.with_schema(
Schema().field("word", DataTypes.STRING()).field("count", DataTypes.BIGINT())
)
.create_temporary_table("mySink")
)
(
t_env.from_path("mySource")
.group_by("word")
.select("word, count(1) as count")
.filter("count > 1")
.insert_into("mySink")
)
t_env.execute("tutorial_job")
而input.csv是
2000-01-01 00:00:00.000000000,james
2000-01-01 00:00:00.000000000,james
2002-01-01 00:00:00.000000000,steve
因此,我的问题是如何设置它,使其从相同的CSV读取,但使用第一列作为事件时间,并允许我编写如下代码:
(
t_env.from_path("mySource")
.window(Tumble.over("10.minutes").on("time").alias("w"))
.group_by("w, word")
.select("w, word, count(1) as count")
.filter("count > 1")
.insert_into("mySink")
)
任何帮助都将不胜感激,我无法从文件中解决这个问题。我正在使用python 3.7
和flink 1.11.1
您是否尝试过使用水印策略?正如前面提到的here,您需要使用水印策略来使用事件时间。对于pyflink,我个人认为用ddl格式(如this)声明它更容易
如果使用描述符API,则可以通过架构将字段指定为事件时间字段:
但是我仍然建议您使用DDL,一方面它更易于使用,另一方面现有的描述符API中存在一些bug,社区正在讨论重构描述符API
相关问题 更多 >
编程相关推荐