运行python数据流作业时出错:

2024-10-06 07:16:27 发布

您现在位置:Python中文网/ 问答频道 /正文

我尝试使用GCP数据流Python处理输入文本文件的第一个字符。如果条目的第一个字符是'A',我希望将文件存储在A.txt中,依此类推。类似地,我有一个与每个字符相关联的数字。我为此存储了两个hashmap。以下是我的代码:

splitHashMap={'A':1,'F':4, 'J':4, 'Z':4, 'G':10, 'I':11};
fileHashMap= {'A':'A.txt','B':'B.txt','F':'F.txt','J':'J.txt','Z':'Z.txt','G':'G.txt','I':'I.txt'};
def to_table_row(x):
  firstChar=x[0][0];
  global splitHashMap
  global fileHashMap
  print splitHashMap[firstChar];
  x | WriteToText(fileHashMap[firstChar]);
  return {firstChar}

WriteToText函数的错误如下:

^{pr2}$

有人能帮我解决这个问题吗?在

编辑:包含管道的其余代码如下:

arser = argparse.ArgumentParser()
parser.add_argument('--input',
                  dest='input',
                  default='gs://dataflow-samples/shakespeare/kinglear.txt',
                  help='Input file to process.')
parser.add_argument('--output',
                  dest='output',
                  help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(None)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)


lines = p | 'read' >> ReadFromText(known_args.input)


lines | 'ToTableRows' >> beam.Map(to_table_row);

result = p.run()

我请求你现在就帮我解决这个问题。我用来tun python文件的命令是:

python File_parse.py ---input temp.txt

在温度文本具体如下:

Aasadasd asdasd adsad af
Jdsad asdasd asd as
A asdd ad agfsfg sfg 
Z afsdfrew320pjpoji
Idadfsd w8480ujfds

所需的输出是所有以“A”开头的文件转到“A.txt”,“B”转到“B.txt”,依此类推。如果你在你的回复中写下代码那就太好了。在


Tags: 文件to代码txtparserinputpipelinetable
1条回答
网友
1楼 · 发布于 2024-10-06 07:16:27

您使用WriteToText不合适。不能将字符串传递给PTransform。相反,您需要将PCollections传递到PTransforms中。在下面的代码中,可以为第一个字符的每个大小写创建单独的PCollections,并将其传递给

在这种情况下,您可以这样做:

file_hash_map= {'A':'A.txt','B':'B.txt','F':'F.txt',
                'J':'J.txt','Z':'Z.txt','G':'G.txt','I':'I.txt'}
existing_chars = file_hash_map.keys()

class ToTableRowDoFn(beam.DoFn):
  def process(self, element):
    first_char = element[0][0]
    if first_char in file_hash_map:
      yield pvalue.TaggedOutput(first_char, element)
    else:
      # When the first char of the word is not from the allowed
      # characters, we just send it to the main output.
      yield element 

lines = p | 'read' >> ReadFromText(known_args.input)

multiple_outputs = (
    lines | 
    'ToTableRows' >> beam.ParDo(ToTableRowDoFn())
                           .with_outputs(*existing_chars, main='main'));

for pcollection_name in existing_chars:
  char_pcollection = getattr(multiple_outputs, pcollection_name)
  char_pcollection | WriteToFile(file_hash_map[pcollection_name])

这段代码的关键在于for循环,在这个循环中我们迭代每个输出PCollections,并将它们的内容分别写入不同的文件。在

相关问题 更多 >