Importing the Google Firestore Python client in Apache Beam


I'm trying to write a small DoFn to write data from a Dataflow pipeline into Cloud Firestore. Locally, everything works as expected, but as soon as I try to run it on Dataflow, everything blows up!

My function is:

import logging

import apache_beam as beam


class FirestoreWriteDoFn(beam.DoFn):
  def __init__(self):
    super(FirestoreWriteDoFn, self).__init__()

  def start_bundle(self):
    # Imported here (rather than at module level) so the Dataflow worker
    # binds the module when each bundle starts.
    import google.cloud
    self.db = google.cloud.firestore.Client(project='ag-audience')

  def process(self, element):
    # Shape the incoming element into the Firestore document payload.
    fb_data = {
      'topics': element.get('page_keywords').split(','),
      'title': element.get('page_title')
    }
    logging.info('Inserting into Firebase: %s', fb_data)
    # One document per element, keyed by the element's 'key' field.
    fb_doc = self.db.document('totallyNotBigtable', element.get('key'))
    result = fb_doc.create(fb_data)
    yield result
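
For context on "works locally": this is roughly how I exercise the DoFn with the DirectRunner. The sample element below is made up and only illustrates the shape the DoFn expects (note that this really writes to Firestore, so it needs local credentials):

import apache_beam as beam

# Minimal local run; the element below is a hypothetical sample.
with beam.Pipeline() as p:
  _ = (p
    | beam.Create([{'key': 'doc-1',
                    'page_keywords': 'beam,dataflow,firestore',
                    'page_title': 'Example page'}])
    | beam.ParDo(FirestoreWriteDoFn()))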

Here is the command I deployed it with:

^{pr2}$

And this is my requirements.txt:

google-cloud-firestore>=1.3.0
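
For reference (my literal deploy command is the elided block above, so this is an assumed, typical set of options rather than my exact invocation): the --requirements_file pipeline option is what installs requirements.txt onto the Dataflow workers. Something along these lines:

# Hypothetical pipeline options; the real deploy command is elided above.
pipeline_args = [
  '--runner=DataflowRunner',
  '--project=ag-audience',                 # same project as the Firestore client
  '--region=us-central1',                  # assumed region
  '--temp_location=gs://my-bucket/tmp',    # assumed staging bucket
  '--requirements_file=requirements.txt',  # ships google-cloud-firestore to workers
  '--streaming',                           # the pipeline reads from Pub/Sub
]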

I have tried lots of things:

- importing the firestore module globally, at the top of the file;
- importing it in different ways (from x import y, import y); a sketch of what I mean follows below;
- importing it in various parts of the code.
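
Concretely, these are the kinds of import variants I mean (a sketch; none of them changed the outcome on Dataflow):

# Variant 1: top-level package import, then attribute access in the code.
import google.cloud
# Variant 2: explicit submodule import.
from google.cloud import firestore
# Variant 3: fully-qualified module import.
import google.cloud.firestore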

The error is always that something is undefined:

NameError: global name 'google' is not defined [while running 'generatedPtransform-480']

EDIT: (adding the pipeline code)

with beam.Pipeline(argv=pipeline_args) as p:

    crawled_features = (p 
      | 'ReadPubsubCrawls' >> ReadFromPubSub(topic=PUBSUB_TOPIC_CRAWLED_FEATURES).with_output_types(bytes)
      | 'DebugLogInput' >> beam.ParDo(LogResults())
      | 'JSONParse2' >> beam.Map(lambda x: json.loads(x))
    )

    firebase_stream = (crawled_features
      | 'WriteFirebase' >> beam.ParDo(FirestoreWriteDoFn())
      | 'LogFirebaseWriteResult' >> beam.ParDo(LogResults())
    )

    bigquery_stream = (crawled_features 
      | 'ShapeRow' >> beam.Map(ShapeBQRow)
      | 'LogShapedBQRow' >> beam.ParDo(LogResults())
      | 'WriteBigQuery' >> beam.io.WriteToBigQuery(
        table=BIGQUERY_TABLE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
