Cannot run Pandas on Dataflow

Published 2024-09-29 21:23:47


I'm trying to understand how to run a program on Dataflow. I've read that libraries like Pandas are built into Dataflow. The dataset I'm using is the common Amazon dataset of reviews and ratings. I have the following program:

import re

import apache_beam as beam
import pandas as pd
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import storage
beam_options = PipelineOptions(runner='DataflowRunner',
    project='Project',
    job_name='job',
    temp_location='gs://temp_loc',
    region='us-central1')

table_spec = 'table_name'

class CleaningReviews(beam.DoFn):
    def process(self,element):
        contractions = {"i'll":"I will", "we'll": "We will"...}
        text = element['reviews']
        # Convert words to lower case
        text = str(text)
        text = text.lower()

        # Replace contractions with their longer forms 
        # if True:
        text = text.split()
        new_text = []
        for word in text:
            if word in contractions:
                new_text.append(contractions[word])
            else:
                new_text.append(word)
        text = " ".join(new_text)

        # Format words and remove unwanted characters
        text = re.sub(r'https?:\/\/.*[\r\n]*', '', text,  
                      flags=re.MULTILINE)
        text = re.sub(r'\<a href', ' ', text)
        text = re.sub(r'&amp;', '', text) 
        text = re.sub(r'[_"\-;%()|+&=*%.,!?:#$@\[\]/]', ' ', text)
        text = re.sub(r'<br />', ' ', text)
        text = re.sub(r'\'', ' ', text)
        text = re.sub(r'[^A-Z a-z]+','',text)

        element['reviews'] = text
        yield element  # yield (not return) so Beam emits the dict as one element

class SaveToGCS(beam.DoFn):
    def process(self,elements):
        df = pd.DataFrame(elements, columns={'reviews': str,'ratings':int})
        client = storage.Client()
        bucket = client.get_bucket('bucker_name')
        bucket.blob(f"csv_exports.csv").upload_from_string(df.to_csv(index=False), 'text/csv')
    
with beam.Pipeline(options=beam_options) as pipeline:
    a = CleaningReviews()
    #b = SaveToGCS()
    rows = ( pipeline
            | 'QueryBQTable' >> beam.io.ReadFromBigQuery(
                query='SELECT reviews, ratings FROM `Table` LIMIT 100',
                use_standard_sql=True) 
            | 'TransformData' >> beam.ParDo(CleaningReviews())
            | 'WriteOutputToGCS' >> beam.ParDo(SaveToGCS())
           )
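Stripped of Beam, the cleaning step in `CleaningReviews` amounts to the standalone function below. This is a sketch with a toy contractions dict, since the full dict in the code above is elided with `...`:

```python
import re

# Toy contractions map; the real one in the pipeline is larger (elided above).
CONTRACTIONS = {"i'll": "I will", "we'll": "We will"}

def clean_review(text):
    """Lower-case, expand contractions, strip URLs/markup/punctuation."""
    text = str(text).lower()
    # Expand contractions word by word via dict lookup.
    text = " ".join(CONTRACTIONS.get(word, word) for word in text.split())
    # Remove URLs, HTML fragments, and unwanted characters.
    text = re.sub(r'https?:\/\/.*[\r\n]*', '', text, flags=re.MULTILINE)
    text = re.sub(r'\<a href', ' ', text)
    text = re.sub(r'&amp;', '', text)
    text = re.sub(r'[_"\-;%()|+&=*%.,!?:#$@\[\]/]', ' ', text)
    text = re.sub(r'<br />', ' ', text)
    text = re.sub(r"\'", ' ', text)
    text = re.sub(r'[^A-Z a-z]+', '', text)
    return text

print(clean_review("we'll love it! See https://example.com"))
```

Testing the transform this way, outside the pipeline, confirms the cleaning logic itself is not what fails on Dataflow.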

I'm trying to pull some sample (text) data from a table in BigQuery, run a few transformations on it, and then store the result as a CSV file in Cloud Storage.

When executed, the program runs for a while and then fails with the following error:

DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/tmp/ipykernel_3475/2505603581.py", line 122, in process
NameError: name 'pd' is not defined
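For context on the traceback: a Python function body resolves a name like `pd` only at call time, in the globals of the module the function was defined in. Here is a minimal reproduction of the same `NameError`, unrelated to Beam, where `pandas` is simply never imported in the defining namespace:

```python
# 'pd' is looked up in this module's globals only when make_df is called;
# since pandas is never imported here, the call raises NameError.
def make_df(rows):
    return pd.DataFrame(rows)

try:
    make_df([{"reviews": "good", "ratings": 5}])
except NameError as exc:
    print(exc)  # name 'pd' is not defined
```

My understanding (which I haven't been able to confirm) is that something similar happens on the Dataflow workers: the DoFn runs in a fresh process where my notebook's top-level imports may not be replayed.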

I also tried creating instances of the CleaningReviews and SaveToGCS classes and calling their methods in the Beam pipeline, but still without success.

What can I do?


Tags: data, text, name, in, from, import, re, pipeline
