弗林克跑步字数.py导致NullPointerException

2024-05-23 18:52:30 发布

您现在位置:Python中文网/ 问答频道 /正文

我想在windows上用flink的pythonapi处理数据。但是,当我使用命令将作业提交到本地集群时,它抛出NullPointerException

bin/flink run -py D:\workspace\python-test\flink-test.py

弗林克-测试.py地址:

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

t_env.connect(FileSystem().path('D:\\workspace\\python-test\\data.txt')) \
    .with_format(OldCsv()
                 .line_delimiter(' ')
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .register_table_source('mySource')

t_env.connect(FileSystem().path('D:\\workspace\\python-test\\result.txt')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .register_table_sink('mySink')

t_env.scan('mySource') \
    .group_by('word') \
    .select('word, count(1)') \
    .insert_into('mySink')

t_env.execute("tutorial_job")

enter image description here 有人知道为什么吗?你知道吗


Tags: frompytestimportenvfieldstringschema
1条回答
网友
1楼 · 发布于 2024-05-23 18:52:30

我已经解决了这个问题。我通过错误信息阅读源代码。你知道吗

enter image description here

NullPointerException是由flinkOptPath为空!引起的!。我使用弗林克蝙蝠提交作业,以及弗林克蝙蝠不要设置flinkOptPath。所以我在弗林克蝙蝠这样地。这个弗林克蝙蝠目前还不完整。我们应该在linux上运行flink。 enter image description here

相关问题 更多 >