<p>You can use a dictionary and a <a href="https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.udf.html" rel="nofollow noreferrer">user-defined function</a> to map each row's status to an integer representing the status order, then perform a <a href="https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html" rel="nofollow noreferrer"><code>groupBy</code></a> on <code>unique-id</code>, aggregate with <a href="https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.min.html" rel="nofollow noreferrer"><code>min</code></a> over the status order, and finally retrieve the status back from that order, as follows:</p>
<pre class="lang-py prettyprint-override"><code>from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
status_dict = {'OAOS-STP': 1, 'OAOS-nonSTP': 2, 'manual': 3}
@F.udf(returnType=IntegerType())
def get_status_order(status):
    # Statuses missing from the dict map to None (null in Spark)
    return status_dict.get(status)
data = input_df.withColumn('status_order', F.struct(get_status_order('status'), F.col('status'))) \
.groupBy('unique-id') \
.agg(F.min('status_order').alias('status')) \
.withColumn('status', F.col('status.status')) \
.orderBy('unique-id') # Optional
</code></pre>
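<p>To see why this works, here is the same idea as a plain-Python sketch (no Spark needed, with hypothetical sample rows): taking the <code>min</code> over <code>(status_order, status)</code> tuples compares the order first, so the highest-priority status wins per id, exactly like <code>F.min</code> on the struct column.</p>

```python
# Plain-Python sketch of min-over-(order, status) per unique-id.
status_dict = {'OAOS-STP': 1, 'OAOS-nonSTP': 2, 'manual': 3}

rows = [  # hypothetical (unique-id, status) pairs
    ('id1', 'manual'),
    ('id1', 'OAOS-STP'),
    ('id2', 'OAOS-nonSTP'),
    ('id2', 'manual'),
]

best = {}
for uid, status in rows:
    key = (status_dict[status], status)  # tuples compare order first
    if uid not in best or key < best[uid]:
        best[uid] = key

# Drop the order and keep only the winning status, like F.col('status.status')
result = {uid: status for uid, (_, status) in best.items()}
print(result)  # {'id1': 'OAOS-STP', 'id2': 'OAOS-nonSTP'}
```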