I am trying to understand how to run programs on Dataflow. I have read that libraries like pandas are built into Dataflow. The dataset I am using is the common Amazon reviews dataset, with review text and ratings. I have the following program:
from google.cloud import storage
import re
import apache_beam as beam
import pandas as pd
from apache_beam.options.pipeline_options import PipelineOptions
beam_options = PipelineOptions(runner='DataflowRunner',
                               project='Project',
                               job_name='job',
                               temp_location='gs://temp_loc',
                               region='us-central1')
table_spec = 'table_name'
class CleaningReviews(beam.DoFn):
    def process(self, element):
        contractions = {"i'll": "I will", "we'll": "We will", ...}
        text = element['reviews']
        # Convert words to lower case
        text = str(text)
        text = text.lower()
        # Replace contractions with their longer forms
        text = text.split()
        new_text = []
        for word in text:
            if word in contractions:
                new_text.append(contractions[word])
            else:
                new_text.append(word)
        text = " ".join(new_text)
        # Format words and remove unwanted characters
        text = re.sub(r'https?:\/\/.*[\r\n]*', '', text,
                      flags=re.MULTILINE)
        text = re.sub(r'\<a href', ' ', text)
        text = re.sub(r'&', '', text)
        text = re.sub(r'[_"\-;%()|+&=*%.,!?:#$@\[\]/]', ' ', text)
        text = re.sub(r'<br />', ' ', text)
        text = re.sub(r'\'', ' ', text)
        text = re.sub(r'[^A-Z a-z]+', '', text)
        element['reviews'] = text
        # process() must return an iterable of outputs, so yield the element
        yield element
class SaveToGCS(beam.DoFn):
    def process(self, element):
        # process() receives one element per call, so wrap it in a list
        df = pd.DataFrame([element], columns=['reviews', 'ratings'])
        client = storage.Client()
        bucket = client.get_bucket('bucker_name')
        bucket.blob("csv_exports.csv").upload_from_string(df.to_csv(index=False), 'text/csv')
with beam.Pipeline(options=beam_options) as pipeline:
    a = CleaningReviews()
    # b = SaveToGCS()
    rows = (pipeline
            | 'QueryBQTable' >> beam.io.ReadFromBigQuery(
                query='SELECT reviews, ratings FROM `Table` LIMIT 100',
                use_standard_sql=True)
            | 'TransformData' >> beam.ParDo(CleaningReviews())
            | 'WriteOutputToGCS' >> beam.ParDo(SaveToGCS())
            )
I am trying to pull some sample data (text data) from a table in BigQuery, run a few transformations on it, and then store it in Cloud Storage as a CSV file.
On execution, the program runs for a while and then fails with the following error:
DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "/tmp/ipykernel_3475/2505603581.py", line 122, in process
NameError: name 'pd' is not defined
I also tried creating objects of the CleaningReviews and SaveToGCS classes and calling their methods inside the Beam pipeline, but still without success.
What can I do?
I believe you are looking for dataframe functionality in Apache Beam. Please follow the official Beam DataFrame documentation and change your code accordingly. The pandas DataFrame API cannot be imported and used directly as a Beam transform; Beam provides its own DataFrame API that mirrors pandas. As for the NameError itself: on Dataflow, top-level imports from your main module are not automatically available on the workers, so either import pandas inside the DoFn's process method or set the save_main_session=True pipeline option.