我想在windows上用flink的pythonapi处理数据。但是,当我使用命令将作业提交到本地集群时,它抛出NullPointerException
bin/flink run -py D:\workspace\python-test\flink-test.py
弗林克-测试.py地址:
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)
t_env.connect(FileSystem().path('D:\\workspace\\python-test\\data.txt')) \
.with_format(OldCsv()
.line_delimiter(' ')
.field('word', DataTypes.STRING())) \
.with_schema(Schema()
.field('word', DataTypes.STRING())) \
.register_table_source('mySource')
t_env.connect(FileSystem().path('D:\\workspace\\python-test\\result.txt')) \
.with_format(OldCsv()
.field_delimiter('\t')
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT())) \
.register_table_sink('mySink')
t_env.scan('mySource') \
.group_by('word') \
.select('word, count(1)') \
.insert_into('mySink')
t_env.execute("tutorial_job")
我已经解决了这个问题。我通过错误信息阅读源代码。你知道吗
NullPointerException是由flinkOptPath为空!引起的!。我使用弗林克蝙蝠提交作业,以及弗林克蝙蝠不要设置flinkOptPath。所以我在弗林克蝙蝠这样地。这个弗林克蝙蝠目前还不完整。我们应该在linux上运行flink。
相关问题 更多 >
编程相关推荐