<p>You can use the <strong>relational_db.Write</strong> and <strong>relational_db.Read</strong> transforms from <a href="https://github.com/mohaseeb/beam-nuggets" rel="nofollow noreferrer">beam-nuggets</a> as follows:</p>
<p>First install beam-nuggets:</p>
<pre><code>pip install beam-nuggets
</code></pre>
<p>For reading:</p>
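<p>A minimal sketch of the read side, assuming the same PostgreSQL connection settings as the write example. The names <code>SOURCE_SETTINGS</code> and <code>run</code> are hypothetical helpers introduced here. The read transform's name is also an assumption: recent beam-nuggets releases appear to expose it as <code>ReadFromDB</code>, while this answer refers to <code>Read</code>, so the sketch uses whichever one the installed version provides.</p>

```python
# Connection settings, mirroring the write example (illustrative values).
SOURCE_SETTINGS = {
    "drivername": "postgresql+pg8000",
    "host": "localhost",
    "port": 5432,
    "username": "postgres",
    "password": "password",
    "database": "calendar",
}


def run(table_name="months"):
    """Read every row of `table_name` and print each record."""
    # Imported lazily so that defining run() does not require Beam.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from beam_nuggets.io import relational_db

    # Assumption: the transform is ReadFromDB in newer releases and Read in
    # older ones; pick whichever attribute exists in the installed version.
    read_transform = (
        getattr(relational_db, "ReadFromDB", None) or relational_db.Read
    )

    with beam.Pipeline(options=PipelineOptions()) as p:
        source_config = relational_db.SourceConfiguration(**SOURCE_SETTINGS)
        records = p | "Reading records from db" >> read_transform(
            source_config=source_config,
            table_name=table_name,
        )
        records | "Writing to stdout" >> beam.Map(print)
```

<p>Calling <code>run()</code> requires a reachable PostgreSQL server that already holds the <code>calendar</code> database and the <code>months</code> table.</p>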
<p>For writing:</p>
<pre><code>import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

with beam.Pipeline(options=PipelineOptions()) as p:
    # Two in-memory records to write; each dict becomes one table row.
    months = p | "Reading month records" >> beam.Create([
        {'name': 'Jan', 'num': 1},
        {'name': 'Feb', 'num': 2},
    ])
    # Connection details for the target database.
    source_config = relational_db.SourceConfiguration(
        drivername='postgresql+pg8000',
        host='localhost',
        port=5432,
        username='postgres',
        password='password',
        database='calendar',
        create_if_missing=True,  # create the database if it does not exist
    )
    table_config = relational_db.TableConfiguration(
        name='months',
        create_if_missing=True  # create the table if it does not exist
    )
    months | 'Writing to DB' >> relational_db.Write(
        source_config=source_config,
        table_config=table_config
    )
</code></pre>
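<p>The <code>drivername</code> value <code>postgresql+pg8000</code> follows SQLAlchemy's <code>dialect+driver</code> naming, so the configuration fields presumably correspond to the parts of a SQLAlchemy database URL. The helper <code>build_db_url</code> below is hypothetical (it is not part of beam-nuggets) and only illustrates that mapping, which can help when checking the same credentials with another tool.</p>

```python
from urllib.parse import quote_plus


def build_db_url(drivername, username, password, host, port, database):
    """Assemble a SQLAlchemy-style URL from individual connection fields."""
    # Credentials are percent-encoded so characters such as '@' stay valid.
    return (
        f"{drivername}://{quote_plus(username)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


url = build_db_url(
    drivername="postgresql+pg8000",
    username="postgres",
    password="password",
    host="localhost",
    port=5432,
    database="calendar",
)
print(url)  # postgresql+pg8000://postgres:password@localhost:5432/calendar
```

<p>Switching databases would then mostly be a matter of changing <code>drivername</code> (for example to a MySQL dialect), provided the matching driver package is installed.</p>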