<p>您可以使用如下所示的自连接（self-join），将同一 id 的两行（一行包含 col_*，另一行包含 sf_*）合并为一行：</p>
<pre><code>from pyspark.sql.types import IntegerType, StructField, StructType
# Sample data: each id appears twice -- once with sf_1..sf_3 populated
# (col_* null) and once with col_1..col_3 populated (sf_* null).
values_arr = [
    (2, None, None, None, 102, 202, 302),
    (4, None, None, None, 104, 204, 304),
    (1, None, None, None, 101, 201, 301),
    (3, None, None, None, 103, 203, 303),
    (1, 11, 21, 31, None, None, None),
    (2, 12, 22, 32, None, None, None),
    (4, 14, 24, 34, None, None, None),
    (3, 13, 23, 33, None, None, None),
]
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("col_1", IntegerType(), True),
    StructField("col_2", IntegerType(), True),
    StructField("col_3", IntegerType(), True),
    StructField("sf_1", IntegerType(), True),
    StructField("sf_2", IntegerType(), True),
    StructField("sf_3", IntegerType(), True),
])
# createDataFrame accepts a local list of tuples directly, so there is no need
# to build an RDD with sparkContext.parallelize first.
# NOTE: assumes `spark` is an existing SparkSession (e.g. the one provided by
# the pyspark shell or a notebook) -- it is not created in this snippet.
df = spark.createDataFrame(values_arr, schema)
df.show()
# Sample input
# +---+-----+-----+-----+----+----+----+
# | id|col_1|col_2|col_3|sf_1|sf_2|sf_3|
# +---+-----+-----+-----+----+----+----+
# |  2| null| null| null| 102| 202| 302|
# |  4| null| null| null| 104| 204| 304|
# |  1| null| null| null| 101| 201| 301|
# |  3| null| null| null| 103| 203| 303|
# |  1|   11|   21|   31|null|null|null|
# |  2|   12|   22|   32|null|null|null|
# |  4|   14|   24|   34|null|null|null|
# |  3|   13|   23|   33|null|null|null|
# +---+-----+-----+-----+----+----+----+
# Solution
# Register the DataFrame for SQL access. createOrReplaceTempView, unlike
# createTempView, does not raise when the snippet is re-run in the same
# session and the view "my_table" already exists.
df.createOrReplaceTempView("my_table")
# Self-join on id: alias `l` supplies the row whose sf_* columns are set and
# alias `r` the row whose col_* columns are set. This assumes exactly one row
# of each kind per id; duplicates would multiply the output rows (inner join).
query = """
    SELECT l.id    AS id,
           r.col_1 AS col_1,
           r.col_2 AS col_2,
           r.col_3 AS col_3,
           l.sf_1  AS sf_1,
           l.sf_2  AS sf_2,
           l.sf_3  AS sf_3
    FROM my_table l
    JOIN my_table r
      ON l.id = r.id
    WHERE r.col_1 IS NOT NULL
      AND l.sf_1 IS NOT NULL
"""
spark.sql(query).show()
# Sample output (row order is not guaranteed without ORDER BY):
# +---+-----+-----+-----+----+----+----+
# | id|col_1|col_2|col_3|sf_1|sf_2|sf_3|
# +---+-----+-----+-----+----+----+----+
# |  1|   11|   21|   31| 101| 201| 301|
# |  3|   13|   23|   33| 103| 203| 303|
# |  4|   14|   24|   34| 104| 204| 304|
# |  2|   12|   22|   32| 102| 202| 302|
# +---+-----+-----+-----+----+----+----+
</code></pre>