<h2>选项1:一次对一列使用UDF</h2>
<p>最简单的方法是重写函数,将字符串作为参数(使其成为<code>string</code>->;<code>string</code>)并使用UDF。有一个很好的例子<a href="https://docs.databricks.com/spark/latest/spark-sql/udf-python.html" rel="noreferrer">here</a>。这种方法一次只能处理一列。因此,如果您的<code>DataFrame</code>具有合理数量的列,则可以一次将UDF应用于每个列:</p>
<pre><code>from pyspark.sql.functions import col
new_df = df.select(udf(col("col1")), udf(col("col2")), ...)
</code></pre>
<h2>示例</h2>
^{pr2}$
<h2>选项2:一次映射整个数据帧</h2>
<p><code>map</code>可用于Scala<code>DataFrame</code>s,但目前在PySpark中不可用。
低级的<a href="http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD" rel="noreferrer">RDD</a>API在PySpark中确实有一个<code>map</code>函数。因此,如果有太多的列无法一次转换一个,可以对<code>DataFrame</code>中的每个单元格进行如下操作:</p>
<pre><code>def map_fn(row):
return [api_function(x) for (column, x) in row.asDict().items()
column_names = df.columns
new_df = df.rdd.map(map_fn).toDF(df.columns)
</code></pre>
<h2>示例</h2>
<pre><code>df = sc.parallelize([[1, 4], [2,5], [3,6]]).toDF(["col1", "col2"])
def map_fn(row):
return [value + 1 for (_, value) in row.asDict().items()]
columns = df.columns
new_df = df.rdd.map(map_fn).toDF(columns)
new_df.show()
+ + +
|col1|col2|
+ + +
| 2| 5|
| 3| 6|
| 4| 7|
+ + +
</code></pre>
<h2>上下文</h2>
<p><code>foreach</code>的<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.foreach" rel="noreferrer">documentation</a>只给出了打印的示例,但是我们可以通过查看<a href="http://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/dataframe.html#DataFrame.foreach" rel="noreferrer">code</a>来验证它确实没有返回任何内容。在</p>
<p>你可以在<a href="https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html" rel="noreferrer">this post</a>中读到<code>pandas_udf</code>,但它似乎最适合于向量化函数,正如你所指出的,由于<code>api_function</code>,你不能使用它。在</p>