apachenifi:使用ExecuteScript进程处理多个csv

2024-10-01 15:45:46 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个70列的csv。第60列包含一个值,该值决定记录是valid还是invalid。如果第60列有0、1、6或7,则为valid。如果它包含任何其他值,那么它的invalid。你知道吗

我意识到这个功能不可能完全依赖于apachenifi中处理器属性的改变。因此,我决定使用executeScript processor并添加这个python代码作为文本体。你知道吗

import csv

valid =0
invalid =0
total =0
file2 = open("invalid.csv","w")
file1 = open("valid.csv","w")

with  open('/Users/himsaragallage/Desktop/redder/Regexo_2019101812750.dat.csv') as f:
    r = csv.reader(f)
    for row in f:
        # print row[1]
        total +=1

        if row[59] == "0" or row[59] == "1" or row[59] == "6" or row[59] == "7":
            valid +=1
            file1.write(row)
        else:
            invalid += 1
            file2.write(row)
file1.close()
file2.close()
print("Total : " + str(total))
print("Valid : " + str(valid))
print("Invalid : " + str(invalid))

我不知道如何在executeScript处理器中使用会话和代码,如this question所示。所以我只编写了一个简单的python代码,并将有效和无效的数据定向到不同的文件中。我使用的这种方法有很多限制。你知道吗

  1. 我想能够动态处理csv的不同文件名。你知道吗
  2. 将无效数据发送到的csv也必须与输入csv具有相同的文件名。你知道吗
  3. 在我的redder文件夹中大约有20个csv。所有这些都必须一次性处理。你知道吗

希望你能给我建议一个方法来做下面的事情。请随意为我提供一个解决方案,编辑我使用过的python代码,甚至完全使用不同的处理器集,并且完全不使用ExecuteScript Processer


Tags: orcsv代码open处理器file1file2row
2条回答

下面是关于how to use ^{} processor的完整分步说明

基本上,您需要设置突出显示的属性

enter image description here

您希望基于一列中的值路由记录。在NiFi中有多种方法可以实现这一点。我能想到以下几点:

我将向您展示如何使用PartitionRecord处理器解决您的问题。因为您没有提供任何示例数据,所以我创建了一个示例用例。我想把欧洲的城市和其他地方的城市区别开来。给出了以下数据:

id,city,country
1,Berlin,Germany
2,Paris,France
3,New York,USA
4,Frankfurt,Germany

流量:

enter image description here

生成流文件:

enter image description here

分区记录:

enter image description here

CSVReader应该设置为推断模式,CSVRecordSetWriter应该设置为继承模式。PartitionRecord将按国家对记录进行分组,并将它们与具有国家值的属性country一起传递。您将看到以下记录组:

id,city,country
1,Berlin,Germany
4,Frankfurt,Germany

id,city,country
2,Paris,France

id,city,country
3,New York,USA

每个组都是一个流文件,并具有country属性,您将使用该属性来路由组。你知道吗

路由属性:

enter image description here

所有来自欧洲的国家都将被安排加入is\ U欧洲关系。现在您可以将相同的策略应用到您的用例中。你知道吗

相关问题 更多 >

    热门问题