我正在尝试编写一个小DoFn来将数据流管道中的数据写入cloudfirestore。在本地,一切都按预期工作,但当试图在数据流上运行时,一切都会崩溃!在
我的职能是:
class FirestoreWriteDoFn(beam.DoFn):
def __init__(self):
super(FirestoreWriteDoFn, self).__init__()
def start_bundle(self):
import google.cloud
self.db = google.cloud.firestore.Client(project='ag-audience')
def process(self, element):
fb_data = {
'topics': element.get('page_keywords').split(','),
'title': element.get('page_title')
}
logging.info('Inserting into Firebase: %s', fb_data)
fb_doc = self.db.document('totallyNotBigtable', element.get('key'))
result = fb_doc.create(fb_data)
yield result
在这里部署了命令:
^{pr2}$这是我的要求.txt公司名称:
google-cloud-firestore>=1.3.0
我尝试过很多事情:
-在文件顶部全局导入firestore模块。
-以不同的方式导入它:import x from y
,import y
。
-在代码的各个部分导入它。在
错误总是有些东西是未定义的:
NameError: global name 'google' is not defined [while running 'generatedPtransform-480']
编辑:(添加管道代码)
with beam.Pipeline(argv=pipeline_args) as p:
crawled_features = (p
| 'ReadPubsubCrawls' >> ReadFromPubSub(topic=PUBSUB_TOPIC_CRAWLED_FEATURES).with_output_types(bytes)
| 'DebugLogInput' >> beam.ParDo(LogResults())
| 'JSONParse2' >> beam.Map(lambda x: json.loads(x))
)
firebase_stream = (crawled_features
| 'WriteFirebase' >> beam.ParDo(FirestoreWriteDoFn())
| 'LogFirebaseWriteResult' >> beam.ParDo(LogResults())
)
bigquery_stream = (crawled_features
| 'ShapeRow' >> beam.Map(ShapeBQRow)
| 'LogShapedBQRow' >> beam.ParDo(LogResults())
| 'WriteBigQuery' >> beam.io.WriteToBigQuery(
table=BIGQUERY_TABLE,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
问题在于Beam版本。在2.13.0中可能有一些bug,但是在2.12.0中,它可以基于Python package errors while running GCP Dataflow正常工作。我也亲自核实过。在
相关问题 更多 >
编程相关推荐