I am building a pipeline on Dataflow that takes messages received from Pub/Sub, transforms them via an API call, and then writes them to BigQuery.
Writing from Pub/Sub to BigQuery works perfectly without the API call. However, when I try to use the requests library, it always gives me the following error:
NameError: name 'requests' is not defined [while running 'CustomParse-ptransform-11727']
passed through:
==>
dist_proc/dax/workflow/worker/fnapi_service.cc:631
generic::unknown: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 581, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1368, in apache_beam.runners.common._OutputProcessor.process_outputs
File "PubSub2BQ.py", line 43, in process
Here is the relevant code:
import argparse
import json
import requests
import os
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions


class MtToJson(beam.DoFn):
    def __init__(self):
        pass

    def process(self, message):
        x = requests.post('serverAdress', data=message)
        yield x.text


def run():
    # Parsing arguments
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_subscription",
        help='Input PubSub subscription of the form "projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."',
        default=INPUT_SUBSCRIPTION,
    )
    parser.add_argument(
        "--output_table", help="Output BigQuery Table", default=BIGQUERY_TABLE
    )
    parser.add_argument(
        "--output_schema",
        help="Output BigQuery Schema in text format",
        default=BIGQUERY_SCHEMA,
    )
    known_args, pipeline_args = parser.parse_known_args()

    # Creating pipeline options
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(StandardOptions).streaming = True

    # Defining our pipeline and its steps
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(
                subscription=known_args.input_subscription, timestamp_attribute=None)
            | "CustomParse" >> beam.ParDo(MtToJson())
            | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                known_args.output_table,
                schema=known_args.output_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )
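For context on the traceback above: Dataflow pickles the DoFn and ships it to the workers, where module-level imports from the launching script are not replayed by default, so `requests` can end up undefined inside `process`. Two commonly suggested workarounds (not confirmed by this question) are moving `import requests` inside `process`, or launching with `save_main_session` enabled. A stdlib-only sketch of the local-import pattern, where `json` stands in for `requests` and `ApiCaller` is a hypothetical stand-in for the DoFn:

```python
import pickle


class ApiCaller:
    """Hypothetical stand-in for the Beam DoFn."""

    def transform(self, message):
        # Local import: resolved when the method actually runs (on the
        # worker), not when the class is unpickled, so it does not depend
        # on the main session's module-level imports being restored.
        import json
        return json.dumps({"payload": message})


# Round-trip through pickle, analogous to Beam shipping the DoFn to a worker.
caller = pickle.loads(pickle.dumps(ApiCaller()))
print(caller.transform("hello"))  # → {"payload": "hello"}
```

The same shape applied to the question's code would be `import requests` as the first line of `MtToJson.process`.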
The server address is omitted.
I verified that the request works when made from GCP, with both Python 2.7 and 3.7.
I am running the Dataflow runner with a requirements.txt that includes the requests module, but it does not seem to get installed.
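For reference, the requirements file only reaches the workers if the job is launched with it. A sketch of a launch command under that assumption, where the project, region, bucket, and subscription names are placeholders:

```shell
python PubSub2BQ.py \
  --runner DataflowRunner \
  --project <PROJECT> \
  --region <REGION> \
  --temp_location gs://<BUCKET>/temp \
  --requirements_file requirements.txt \
  --input_subscription projects/<PROJECT>/subscriptions/<SUBSCRIPTION>
```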
EDIT:
I changed the Apache Beam SDK to a lower version:
requirements.txt
requests>=2.20.1
apache-beam[gcp]==2.25.0
and got a huge new error message:
File "/usr/lib/python3.8/subprocess.py", line 512, in run
raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', './requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1.
Pip install failed for package: -r
Output from execution of subprocess: b'Collecting requests>=2.20.1\n File was already downloaded /tmp/dataflow-requirements-cache/requests-2.25.1.tar.gz\nCollecting httplib2>=0.17.2\n File was already downloaded /tmp/dataflow-requirements-cache/httplib2-0.19.1.tar.gz\n Installing build dependencies: started\n Installing build dependencies: finished with status \'done\'\n Getting requirements to build wheel: started\n Getting requirements to build wheel: finished with status \'done\'\n Preparing wheel metadata: started\n Preparing wheel metadata: finished with status \'done\'\nCollecting apache-beam[gcp]==2.25.0\n File was already downloaded /tmp/dataflow-requirements-cache/apache-beam-2.25.0.zip\nCollecting certifi>=2017.4.17\n File was already downloaded /tmp/dataflow-requirements-cache/certifi-2021.5.30.tar.gz\nCollecting chardet<5,>=3.0.2\n File was already downloaded /tmp/dataflow-requirements-cache/chardet-4.0.0.tar.gz\nCollecting idna<3,>=2.5\n File was already downloaded /tmp/dataflow-requirements-cache/idna-2.10.tar.gz\nCollecting urllib3<1.27,>=1.21.1\n File was already downloaded /tmp/dataflow-requirements-cache/urllib3-1.26.5.tar.gz\nCollecting pyparsing<3,>=2.4.2\n File was already downloaded /tmp/dataflow-requirements-cache/pyparsing-2.4.7.tar.gz\nCollecting crcmod<2.0,>=1.7\n File was already downloaded /tmp/dataflow-requirements-cache/crcmod-1.7.tar.gz\nCollecting dill<0.3.2,>=0.3.1.1\n File was already downloaded /tmp/dataflow-requirements-cache/dill-0.3.1.1.tar.gz\nCollecting fastavro<2,>=0.21.4\n File was already downloaded /tmp/dataflow-requirements-cache/fastavro-1.4.1.tar.gz\nCollecting future<1.0.0,>=0.18.2\n File was already downloaded /tmp/dataflow-requirements-cache/future-0.18.2.tar.gz\nCollecting grpcio<2,>=1.29.0\n File was already downloaded /tmp/dataflow-requirements-cache/grpcio-1.38.0.tar.gz\nCollecting hdfs<3.0.0,>=2.1.0\n File was already downloaded /tmp/dataflow-requirements-cache/hdfs-2.6.0.tar.gz\nCollecting mock<3.0.0,>=1.0.1\n 
File was already downloaded /tmp/dataflow-requirements-cache/mock-2.0.0.tar.gz\nCollecting numpy<2,>=1.14.3\n File was already downloaded /tmp/dataflow-requirements-cache/numpy-1.20.3.zip\n Installing build dependencies: started\n Installing build dependencies: still running...\n Installing build dependencies: finished with status \'done\'\n Getting requirements to build wheel: started\n Getting requirements to build wheel: finished with status \'done\'\n Preparing wheel metadata: started\n Preparing wheel metadata: finished with status \'done\'\nCollecting oauth2client<5,>=2.0.1\n File was already downloaded /tmp/dataflow-requirements-cache/oauth2client-4.1.3.tar.gz\nCollecting protobuf<4,>=3.12.2\n File was already downloaded /tmp/dataflow-requirements-cache/protobuf-3.17.3.tar.gz\nCollecting pydot<2,>=1.2.0\n File was already downloaded /tmp/dataflow-requirements-cache/pydot-1.4.2.tar.gz\nCollecting pymongo<4.0.0,>=3.8.0\n File was already downloaded /tmp/dataflow-requirements-cache/pymongo-3.11.4.tar.gz\nCollecting python-dateutil<3,>=2.8.0\n File was already downloaded /tmp/dataflow-requirements-cache/python-dateutil-2.8.1.tar.gz\n Installing build dependencies: started\n Installing build dependencies: finished with status \'done\'\n Getting requirements to build wheel: started\n Getting requirements to build wheel: finished with status \'done\'\n Preparing wheel metadata: started\n Preparing wheel metadata: finished with status \'done\'\nCollecting pytz>=2018.3\n File was already downloaded /tmp/dataflow-requirements-cache/pytz-2021.1.tar.gz\nCollecting typing-extensions<3.8.0,>=3.7.0\n File was already downloaded /tmp/dataflow-requirements-cache/typing_extensions-3.7.4.3.tar.gz\nCollecting avro-python3!=1.9.2,<1.10.0,>=1.8.1\n File was already downloaded /tmp/dataflow-requirements-cache/avro-python3-1.9.2.1.tar.gz\nCollecting pyarrow<0.18.0,>=0.15.1\n File was already downloaded /tmp/dataflow-requirements-cache/pyarrow-0.17.1.tar.gz\n Installing build 
dependencies: started\n Installing build dependencies: still running...\n Installing build dependencies: still running...\n Installing build dependencies: finished with status \'done\'\n Getting requirements to build wheel: started\n Getting requirements to build wheel: finished with status \'done\'\n Preparing wheel metadata: started\n Preparing wheel metadata: finished with status \'done\'\nCollecting cachetools<5,>=3.1.0\n File was already downloaded /tmp/dataflow-requirements-cache/cachetools-4.2.2.tar.gz\n Installing build dependencies: started\n Installing build dependencies: finished with status \'done\'\n Getting requirements to build wheel: started\n Getting requirements to build wheel: finished with status \'done\'\n Preparing wheel metadata: started\n Preparing wheel metadata: finished with status \'done\'\nCollecting google-apitools<0.5.32,>=0.5.31\n File was already downloaded /tmp/dataflow-requirements-cache/google-apitools-0.5.31.tar.gz\nCollecting google-auth<2,>=1.18.0\n File was already downloaded /tmp/dataflow-requirements-cache/google-auth-1.30.2.tar.gz\nCollecting google-cloud-bigquery<2,>=1.6.0\n File was already downloaded /tmp/dataflow-requirements-cache/google-cloud-bigquery-1.28.0.tar.gz\nCollecting google-cloud-bigtable<2,>=0.31.1\n File was already downloaded /tmp/dataflow-requirements-cache/google-cloud-bigtable-
It seems to want to download a bunch of packages that I removed from the requirements file a long time ago. Is there a way to clear the cache?
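The cache path in the error output above (`/tmp/dataflow-requirements-cache`) lives on the machine that launches the job, since `pip download` runs locally during submission. Deleting it should force pip to re-resolve from the current requirements.txt; a sketch:

```shell
# Remove Dataflow's local pip download cache so packages dropped from
# requirements.txt are no longer picked up from stale archives.
rm -rf /tmp/dataflow-requirements-cache
```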
No answers yet.