java优先考虑某些Flink操作
我正在使用Flink v1.4.0
我正在利用批处理API进行一些ETL,我有一个DataSet<Employee >
,其中Employee
的形式如下:
public class Employee implements Serializable {
private String name;
private double baseSalary;
private double bonus;
private double totalComp;
...
}
假设所有变量都有一个构造函数、setter和getter
现在,我有一些操作被应用于,我认为是,一种顺序的方式,根据这种方式,我试图丰富DataSet<Employee> employees
,如下所示:
...
DataSet<String> employees = env.fromCollection(employeesList);
DataSet<Employee> initEmployees = employees.map(new InitMapFunction());
DataSet<Employee> employeesEnrichedWithSalaryData = initEmployees.map(new SalaryMapFunction(salaryEnrichmentData));
DataSet<Employee> employeesEnrichedWithBonusData = employeesEnrichedWithSalaryData.map(new BonusMapFunction(bonusEnrichmentData));
DataSet<Employee> finalEmployeesData = employeesEnrichedWithSalaryData.map(new TotalCompMapFunction());
...
假设我的包中有以下MapFunction
实现:
final class InitMapFunction implements MapFunction<String, Employee>, Serializable {
@Override
public Employee map(String name) {
Employee employee = Employee.newBuilder().build();
employee.setName(name)
return employee;
}
}
final class SalaryMapFunction implements MapFunction<Employee, Employee>, Serializable {
private Map<String, double> mapOfEmployeeVsSalary;
SalaryMapFunction(Map<String, double> mapOfEmployeeVsSalary) {
this.mapOfEmployeeVsSalary = mapOfEmployeeVsSalary;
}
@Override
public Employee map(Employee employee) {
if(mapOfEmployeeVsSalary.containsKey(employee.getName())) {
employee.setSalary(mapOfEmployeeVsSalary.get(employee.getName()))
}
return employee;
}
}
final class BonusMapFunction implements MapFunction<Employee, Employee>, Serializable {
private Map<String, double> mapOfEmployeeVsBonus;
SalaryMapFunction(Map<String, double> mapOfEmployeeVsBonus) {
this.mapOfEmployeeVsBonus = mapOfEmployeeVsBonus;
}
@Override
public Employee map(Employee employee) {
if(mapOfEmployeeVsBonus.containsKey(employee.getName())) {
employee.setBonus(mapOfEmployeeVsBonus.get(employee.getName()))
}
return employee;
}
}
final class TotalCompMapFunction implements MapFunction<Employee, Employee>, Serializable {
@Override
public Employee map(Employee employee) {
employee.setTotalComp(employee.getSalary + employee.getBonus);
return employee;
}
}
问题是:最终的DataSet
(finalEmployeesData)是否包含正确的值?我知道我可以一次完成这一切,但这不是问题的重点。我所实现的代码的功能要求在不同的步骤中进行充实。我已经确定了在处理上述数据集时,特定字段未使用正确值进行充实的情况。我理解/怀疑这是由于懒惰的评估,并依赖于Flink
进行的优化来计算最佳执行顺序(识别独立操作等)。对吗
最后,我如何保证某个操作优先于另一个操作?如果将这些操作按如下方式链接在一起,输出会发生变化吗
DataSet<Employee> finalEmployessData = env.fromCollection(employeesList)
.map(new InitMapFunction())
.map(new SalaryMapFunction(salaryEnrichmentData))
.map(new BonusMapFunction(bonusEnrichmentData))
.map(new TotalCompMapFunction());
# 1 楼答案
弗林克不会改变行动顺序。如果您将程序定义为
然后
Map2()
将始终应用于Map1()
的结果此外,无论是在不同的对象上逐个应用函数,还是在上一个代码段中以流畅的方式应用函数,都没有区别
你说,你观察到一些数值设置不正确的情况。假设您运行的代码与此处显示的不完全相同,其中一个原因可能是Flink如何连接运算符,以及它如何在运算符之间传送记录。在某些情况下(例如映射函数序列),Flink通过方法调用传递记录,以避免序列化成本。我们称之为函数链。链接函数被融合到一个操作符中(例如,你可以在web UI中看到)。显然,函数必须注意它们如何与接收和发射的对象交互。否则,两个函数可能会同时修改同一条记录。我建议仔细看看Flink文档中关于object reusage的部分