Module requests not found in Dataflow runner


I am building a pipeline on Dataflow that takes messages received from Pub/Sub, transforms them through an API call, and then writes them to BigQuery.

Writing from Pub/Sub to BigQuery without the API call works perfectly fine. However, as soon as I try to use the requests library, it always gives me the following error:

NameError: name 'requests' is not defined [while running 'CustomParse-ptransform-11727']

passed through:
==>
    dist_proc/dax/workflow/worker/fnapi_service.cc:631
generic::unknown: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 581, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1368, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "PubSub2BQ.py", line 43, in process

Here is the relevant code:

import argparse
import json
import requests
import os
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions


class MtToJson(beam.DoFn):
    def __init__(self):
        pass

    def process(self, message):
        x = requests.post('serverAdress', data=message)
        yield x.text

def run():
    # Parsing arguments
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_subscription",
        help='Input PubSub subscription of the form "projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."',
        default=INPUT_SUBSCRIPTION,
    )
    parser.add_argument(
        "--output_table", help="Output BigQuery Table", default=BIGQUERY_TABLE
    )
    parser.add_argument(
        "--output_schema",
        help="Output BigQuery Schema in text format",
        default=BIGQUERY_SCHEMA,
    )
    known_args, pipeline_args = parser.parse_known_args()

    # Creating pipeline options
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(StandardOptions).streaming = True

    # Defining our pipeline and its steps
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(
                subscription=known_args.input_subscription, timestamp_attribute=None )
            | "CustomParse" >> beam.ParDo(MtToJson())
            | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                known_args.output_table,
                schema=known_args.output_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )

The server address has been omitted.
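Note that the snippet as posted never invokes run(); the entry point was presumably trimmed along with the definitions of the INPUT_SUBSCRIPTION, BIGQUERY_TABLE, and BIGQUERY_SCHEMA constants. A minimal sketch of the assumed guard:

# Assumed entry point, omitted from the posted snippet:
if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()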

I verified that requests is available on GCP, for both Python 2.7 and 3.7.

I am running on the Dataflow runner with a requirements.txt that includes the requests module, but it does not seem to get installed.
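For what it's worth, on Dataflow this particular NameError is usually a pickling issue rather than a missing package: the worker unpickles the DoFn without re-executing the main module, so the top-level import requests is not defined inside process(). A minimal sketch of the two common workarounds (the server address is still the placeholder from above):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

class MtToJson(beam.DoFn):
    def process(self, message):
        # Importing inside process() resolves the module on the worker itself,
        # independently of how the main session was pickled.
        import requests
        x = requests.post('serverAdress', data=message)
        yield x.text

# Or keep the top-level import and ship the main module's globals to the
# workers by enabling save_main_session (equivalent to --save_main_session):
pipeline_options = PipelineOptions()
pipeline_options.view_as(SetupOptions).save_main_session = True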

Edit:

I changed the Apache Beam SDK to a lower version:

requirements.txt

requests>=2.20.1
apache-beam[gcp]==2.25.0

That gave me a huge new error message:

 File "/usr/lib/python3.8/subprocess.py", line 512, in run
    raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', './requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1.

 Pip install failed for package: -r
 Output from execution of subprocess:

Collecting requests>=2.20.1
  File was already downloaded /tmp/dataflow-requirements-cache/requests-2.25.1.tar.gz
Collecting httplib2>=0.17.2
  File was already downloaded /tmp/dataflow-requirements-cache/httplib2-0.19.1.tar.gz
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
    Preparing wheel metadata: started
    Preparing wheel metadata: finished with status 'done'
Collecting apache-beam[gcp]==2.25.0
  File was already downloaded /tmp/dataflow-requirements-cache/apache-beam-2.25.0.zip
Collecting certifi>=2017.4.17
  File was already downloaded /tmp/dataflow-requirements-cache/certifi-2021.5.30.tar.gz
Collecting chardet<5,>=3.0.2
  File was already downloaded /tmp/dataflow-requirements-cache/chardet-4.0.0.tar.gz
Collecting idna<3,>=2.5
  File was already downloaded /tmp/dataflow-requirements-cache/idna-2.10.tar.gz
Collecting urllib3<1.27,>=1.21.1
  File was already downloaded /tmp/dataflow-requirements-cache/urllib3-1.26.5.tar.gz
Collecting pyparsing<3,>=2.4.2
  File was already downloaded /tmp/dataflow-requirements-cache/pyparsing-2.4.7.tar.gz
Collecting crcmod<2.0,>=1.7
  File was already downloaded /tmp/dataflow-requirements-cache/crcmod-1.7.tar.gz
Collecting dill<0.3.2,>=0.3.1.1
  File was already downloaded /tmp/dataflow-requirements-cache/dill-0.3.1.1.tar.gz
Collecting fastavro<2,>=0.21.4
  File was already downloaded /tmp/dataflow-requirements-cache/fastavro-1.4.1.tar.gz
Collecting future<1.0.0,>=0.18.2
  File was already downloaded /tmp/dataflow-requirements-cache/future-0.18.2.tar.gz
Collecting grpcio<2,>=1.29.0
  File was already downloaded /tmp/dataflow-requirements-cache/grpcio-1.38.0.tar.gz
Collecting hdfs<3.0.0,>=2.1.0
  File was already downloaded /tmp/dataflow-requirements-cache/hdfs-2.6.0.tar.gz
Collecting mock<3.0.0,>=1.0.1
  File was already downloaded /tmp/dataflow-requirements-cache/mock-2.0.0.tar.gz
Collecting numpy<2,>=1.14.3
  File was already downloaded /tmp/dataflow-requirements-cache/numpy-1.20.3.zip
  Installing build dependencies: started
  Installing build dependencies: still running...
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
    Preparing wheel metadata: started
    Preparing wheel metadata: finished with status 'done'
Collecting oauth2client<5,>=2.0.1
  File was already downloaded /tmp/dataflow-requirements-cache/oauth2client-4.1.3.tar.gz
Collecting protobuf<4,>=3.12.2
  File was already downloaded /tmp/dataflow-requirements-cache/protobuf-3.17.3.tar.gz
Collecting pydot<2,>=1.2.0
  File was already downloaded /tmp/dataflow-requirements-cache/pydot-1.4.2.tar.gz
Collecting pymongo<4.0.0,>=3.8.0
  File was already downloaded /tmp/dataflow-requirements-cache/pymongo-3.11.4.tar.gz
Collecting python-dateutil<3,>=2.8.0
  File was already downloaded /tmp/dataflow-requirements-cache/python-dateutil-2.8.1.tar.gz
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
    Preparing wheel metadata: started
    Preparing wheel metadata: finished with status 'done'
Collecting pytz>=2018.3
  File was already downloaded /tmp/dataflow-requirements-cache/pytz-2021.1.tar.gz
Collecting typing-extensions<3.8.0,>=3.7.0
  File was already downloaded /tmp/dataflow-requirements-cache/typing_extensions-3.7.4.3.tar.gz
Collecting avro-python3!=1.9.2,<1.10.0,>=1.8.1
  File was already downloaded /tmp/dataflow-requirements-cache/avro-python3-1.9.2.1.tar.gz
Collecting pyarrow<0.18.0,>=0.15.1
  File was already downloaded /tmp/dataflow-requirements-cache/pyarrow-0.17.1.tar.gz
  Installing build dependencies: started
  Installing build dependencies: still running...
  Installing build dependencies: still running...
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
    Preparing wheel metadata: started
    Preparing wheel metadata: finished with status 'done'
Collecting cachetools<5,>=3.1.0
  File was already downloaded /tmp/dataflow-requirements-cache/cachetools-4.2.2.tar.gz
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
    Preparing wheel metadata: started
    Preparing wheel metadata: finished with status 'done'
Collecting google-apitools<0.5.32,>=0.5.31
  File was already downloaded /tmp/dataflow-requirements-cache/google-apitools-0.5.31.tar.gz
Collecting google-auth<2,>=1.18.0
  File was already downloaded /tmp/dataflow-requirements-cache/google-auth-1.30.2.tar.gz
Collecting google-cloud-bigquery<2,>=1.6.0
  File was already downloaded /tmp/dataflow-requirements-cache/google-cloud-bigquery-1.28.0.tar.gz
Collecting google-cloud-bigtable<2,>=0.31.1
  File was already downloaded /tmp/dataflow-requirements-cache/google-cloud-bigtable-

It seems to want to download a bunch of packages that I removed from the requirements file long ago. Is there a way to clear the cache?
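For context: the failing command in the log runs pip download with --no-binary :all: against that cache directory, and pip resolves the full transitive dependency tree of requests and apache-beam[gcp], which is why packages long gone from requirements.txt still show up. The cache itself is just a local directory on the machine that submits the job; a minimal sketch of clearing it, assuming the default path shown in the error output:

# Delete the local staging cache so the next submission re-downloads everything;
# /tmp/dataflow-requirements-cache is the default path from the log above.
import shutil
shutil.rmtree("/tmp/dataflow-requirements-cache", ignore_errors=True)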


Tags: build, cache, with, tar, tmp, requirements, file, beam
