有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java优先考虑某些Flink操作

我正在使用Flink v1.4.0

我正在利用批处理API进行一些ETL,我有一个DataSet<Employee >,其中Employee的形式如下:

public class Employee implements Serializable {

private String name;
private double baseSalary;
private double bonus;
private double totalComp;

...
}

假设所有变量都有一个构造函数、setter和getter

现在,我有一些操作被应用于,我认为是,一种顺序的方式,根据这种方式,我试图丰富DataSet<Employee> employees,如下所示:

...
DataSet<String> employees = env.fromCollection(employeesList);

DataSet<Employee> initEmployees = employees.map(new InitMapFunction());

DataSet<Employee> employeesEnrichedWithSalaryData = initEmployees.map(new SalaryMapFunction(salaryEnrichmentData));

DataSet<Employee> employeesEnrichedWithBonusData = employeesEnrichedWithSalaryData.map(new BonusMapFunction(bonusEnrichmentData));

DataSet<Employee> finalEmployeesData = employeesEnrichedWithSalaryData.map(new TotalCompMapFunction());
...

假设我的包中有以下MapFunction实现:

final class InitMapFunction implements MapFunction<String, Employee>, Serializable {

    @Override
    public Employee map(String name) {

        Employee employee = Employee.newBuilder().build();
        employee.setName(name)
        return employee;
    }
}

final class SalaryMapFunction implements MapFunction<Employee, Employee>, Serializable {

    private Map<String, double> mapOfEmployeeVsSalary;

    SalaryMapFunction(Map<String, double> mapOfEmployeeVsSalary) {
        this.mapOfEmployeeVsSalary = mapOfEmployeeVsSalary;
    }

    @Override
    public Employee map(Employee employee) {

        if(mapOfEmployeeVsSalary.containsKey(employee.getName())) {
           employee.setSalary(mapOfEmployeeVsSalary.get(employee.getName()))
        }

        return employee;
    }
}

final class BonusMapFunction implements MapFunction<Employee, Employee>, Serializable {

    private Map<String, double> mapOfEmployeeVsBonus;

    SalaryMapFunction(Map<String, double> mapOfEmployeeVsBonus) {
        this.mapOfEmployeeVsBonus = mapOfEmployeeVsBonus;
    }

    @Override
    public Employee map(Employee employee) {

        if(mapOfEmployeeVsBonus.containsKey(employee.getName())) {
           employee.setBonus(mapOfEmployeeVsBonus.get(employee.getName()))
        }

        return employee;
    }
}

final class TotalCompMapFunction implements MapFunction<Employee, Employee>, Serializable {

    @Override
    public Employee map(Employee employee) {
        employee.setTotalComp(employee.getSalary + employee.getBonus);
        return employee;
    }
}

问题是:最终的DataSet(finalEmployeesData)是否包含正确的值?我知道我可以一次完成这一切,但这不是问题的重点。我所实现的代码的功能要求在不同的步骤中进行充实。我已经确定了在处理上述数据集时,特定字段未使用正确值进行充实的情况。我理解/怀疑这是由于懒惰的评估,并依赖于Flink进行的优化来计算最佳执行顺序(识别独立操作等)。对吗

最后,我如何保证某个操作优先于另一个操作?如果将这些操作按如下方式链接在一起,输出会发生变化吗

DataSet<Employee> finalEmployessData = env.fromCollection(employeesList)
                  .map(new InitMapFunction())
                  .map(new SalaryMapFunction(salaryEnrichmentData))
                  .map(new BonusMapFunction(bonusEnrichmentData))
                  .map(new TotalCompMapFunction());

共 (1) 个答案

  1. # 1 楼答案

    弗林克不会改变行动顺序。如果您将程序定义为

    DataSet<Y> result = input
      .map(new Map1())
      .map(new Map2())
    

    然后Map2()将始终应用于Map1()的结果

    此外,无论是在不同的对象上逐个应用函数,还是在上一个代码段中以流畅的方式应用函数,都没有区别

    你说,你观察到一些数值设置不正确的情况。假设您运行的代码与此处显示的不完全相同,其中一个原因可能是Flink如何连接运算符,以及它如何在运算符之间传送记录。在某些情况下(例如映射函数序列),Flink通过方法调用传递记录,以避免序列化成本。我们称之为函数链。链接函数被融合到一个操作符中(例如,你可以在web UI中看到)。显然,函数必须注意它们如何与接收和发射的对象交互。否则,两个函数可能会同时修改同一条记录。我建议仔细看看Flink文档中关于object reusage的部分