ApacheFlink 1.11无法通过Java Flink Streaming作业中的SQL函数DDL使用Python UDF

2024-09-23 22:23:40 发布

您现在位置:Python中文网/ 问答频道 /正文

Flip-106中有一个示例,说明如何通过SQL函数DDL调用批处理作业java应用程序中的用户定义python函数

BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
tEnv.getConfig().getConfiguration().setString("python.files", "/home/my/test1.py");
tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");

tEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");
Table table = tEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str").select("func1(str)");
tEnv.toDataSet(table, String.class).collect();

我一直在尝试在流作业java应用程序中重现相同的示例,下面是我的代码:

final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(EnvironmentConfiguration.getEnv(), fsSettings);
fsTableEnv.getConfig().getConfiguration().setString("python.files", "/Users/jf/Desktop/flink/fca/test.py");
fsTableEnv.getConfig().getConfiguration().setString("python.client.executable", "/Users/jf/opt/anaconda3/bin/python");

fsTableEnv.sqlUpdate("CREATE TEMPORARY SYSTEM FUNCTION func1 AS 'test.func1' LANGUAGE PYTHON");
Table table = fsTableEnv.fromValues("1", "2", "3").as("str").select("func1(str)");
/* Missing line */

对于批处理作业中的此特定行:

tEnv.toDataSet(table, String.class).collect();

我还没有找到一个流媒体工作的等价物

1。您能帮我将这个flip-106示例从批次映射到流吗?

最后,我想用flink 1.11在流化作业java flink应用程序中调用python函数,如下所示:

final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(EnvironmentConfiguration.getEnv(), fsSettings);
fsTableEnv.getConfig().getConfiguration().setString("python.files", "/Users/jf/Desktop/flink/fca/test.py");
fsTableEnv.getConfig().getConfiguration().setString("python.client.executable", "/Users/jf/opt/anaconda3/bin/python");

fsTableEnv.sqlUpdate("CREATE TEMPORARY SYSTEM FUNCTION func1 AS 'test.func1' LANGUAGE PYTHON");
final Table table = fsTableEnv.fromDataStream(stream_filtered.map(x->x.idsUmid)).select("func1(f0)").as("umid");
System.out.println("Result --> " + table.select($("umid")) + " --> End of Result");

并使用该自定义项的结果进行进一步处理(不必在控制台中打印)

我编辑了test.py文件,以查看是否至少在不考虑未命名表的情况下,python正在执行某些操作

from pyflink.table.types import DataTypes
from pyflink.table.udf import udf
from os import getcwd

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def func1(line):
    print(line)
    print(getcwd())
    with open("test.txt", "a") as myfile:
        myfile.write(line)
    return line

并且不会打印任何内容,也不会创建test.txt文件,也不会将值返回到流作业。所以基本上这个python函数没有被调用

2。我错过了什么?

感谢David、Wei和Xingbo到目前为止的支持,因为每一个细节都对我有用

致以最良好的祝愿

乔纳森


Tags: 函数pytestascreate作业linetable
1条回答
网友
1楼 · 发布于 2024-09-23 22:23:40

您可以尝试以下方法:

final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(EnvironmentConfiguration.getEnv(), fsSettings);
fsTableEnv.getConfig().getConfiguration().setString("python.files", "/Users/jf/Desktop/flink/fca/test.py");
fsTableEnv.getConfig().getConfiguration().setString("python.client.executable", "/Users/jf/opt/anaconda3/bin/python");

// You need to specify the python interpreter used to run the python udf on cluster.
// I assume this is a local program so it is the same as the "python.client.executable".
fsTableEnv.getConfig().getConfiguration().setString("python.executable", "/Users/jf/opt/anaconda3/bin/python");

fsTableEnv.sqlUpdate("CREATE TEMPORARY SYSTEM FUNCTION func1 AS 'test.func1' LANGUAGE PYTHON");
final Table table = fsTableEnv.fromDataStream(stream_filtered.map(x->x.idsUmid)).select("func1(f0)").as("umid");

// 'table.select($("umid"))' will not trigger job execution. You need to call the "execute()" method explicitly.
table.execute().print();

相关问题 更多 >