<p>因为这是一个非常定制的用例,最好的方法是扩展Hive操作符(或者创建自己的Hive2CSVOperator)。实现将取决于您是否可以通过CLI或HiveServer2访问配置单元。在</p>
<p><strong>配置单元CLI</strong></p>
<p>我将首先尝试配置配置单元CLI连接并添加<code>hive_cli_params</code>,如<a href="https://github.com/apache/incubator-airflow/blob/5127ea34e110891c56e1ba9f70211091d13fa553/airflow/hooks/hive_hooks.py#L9" rel="nofollow noreferrer">Hive CLI hook code</a>所示,如果这不起作用,请扩展钩子(这将使您能够访问所有内容)。在</p>
<p><strong>HiveServer2</strong></p>
<p>这个例子有一个单独的钩子(<a href="https://github.com/apache/incubator-airflow/blob/5127ea34e110891c56e1ba9f70211091d13fa553/airflow/hooks/hive_hooks.py#L753" rel="nofollow noreferrer">link</a>)。它更方便一些,因为它有一个<code>get_results</code>方法(<a href="https://github.com/apache/incubator-airflow/blob/5127ea34e110891c56e1ba9f70211091d13fa553/airflow/hooks/hive_hooks.py#L834" rel="nofollow noreferrer">source</a>)或{<cd3>}方法(<a href="https://github.com/apache/incubator-airflow/blob/5127ea34e110891c56e1ba9f70211091d13fa553/airflow/hooks/hive_hooks.py#L852" rel="nofollow noreferrer">source</a>)。在</p>
<p>运算符代码中的<code>execute</code>可能看起来与此类似:</p>
<pre><code>def execute():
...
self.hook = HiveServer2Hook(...)
self.conn = self.hook.get_conn()
self.conn.to_csv(hql=self.hql, csv_filepath=self.output_filepath, ...)
</code></pre>