在两个Spark数据框中比较字符串值

2024-10-01 13:26:39 发布

您现在位置:Python中文网/ 问答频道 /正文

我有两个数据帧命名-品牌名称和景点名称。在

数据帧1(品牌名称):-

+-------------+
|brand_stop[0]|
+-------------+
|TOASTMASTERS |
|USBORNE      |
|ARBONNE      |
|USBORNE      |
|ARBONNE      |
|ACADEMY      |
|ARBONNE      |
|USBORNE      |
|USBORNE      |
|PILLAR       |
+-------------+

数据帧2:-(poi峎u name)

^{pr2}$

我想检查dataframe 1的brand_stop列中的字符串是否存在于dataframe 2的Name列中。应该按行进行匹配,然后如果匹配成功,则应将该特定记录存储在新列中。在

我试着用加入:-在

from pyspark.sql.functions import udf, col 
from pyspark.sql.types import BooleanType

contains = udf(lambda s, q: q in s, BooleanType())

like_with_python_udf = (poi_names.join(brand_names1)
    .where(contains(col("Name"), col("brand_stop[0]")))
    .select(col("Name")))
like_with_python_udf.show()

但这显示了一个错误

"AnalysisException: u'Detected cartesian product for INNER join between logical plans"

我是PySpark新手。请帮我拿这个。在

谢谢你


Tags: 数据namefrom名称dataframesqlcolpyspark
2条回答

The matching should be done row wise

在这种情况下,您必须添加某种形式的索引和连接

from pyspark.sql.types import *

def index(df):
    schema = StructType(df.schema.fields + [(StructField("_idx", LongType()))])
    rdd = df.rdd.zipWithIndex().map(lambda x: x[0] +(x[1], ))
    return rdd.toDF(schema)

brand_name = spark.createDataFrame(["TOASTMASTERS", "USBORNE"], "string").toDF("brand_stop")
poi_name = spark.createDataFrame(["TOASTMASTERS DISTRICT 48", "USBORNE BOOKS AND MORE"], "string").toDF("poi_name")

index(brand_name).join(index(poi_name), ["_idx"]).selectExpr("*", "poi_name rlike brand_stop").show()
# +  +      +          +            -+              
# |_idx|  brand_stop|            poi_name|poi_name RLIKE brand_stop|
# +  +      +          +            -+
# |   0|TOASTMASTERS|TOASTMASTERS DIST...|                     true|
# |   1|     USBORNE|USBORNE BOOKS AND...|                     true|
# +  +      +          +            -+

scala代码如下:

val d1 = Array(("TOASTMASTERS"),("USBORNE"),("ARBONNE"),("USBORNE"),("ARBONNE"),("ACADEMY"),("ARBONNE"),("USBORNE"),("USBORNE"),("PILLAR"))
val rdd1 = sc.parallelize(d1)
val df1 = rdd1.toDF("brand_stop")

val d2 = Array(("TOASTMASTERS DISTRICT 48"),("USBORNE BOOKS AND MORE"),("ARBONNE"),("USBORNE BOOKS AT HOME"),("ARBONNE"),("ACADEMY, LTD."),("ARBONNE"),("USBORNE BOOKS AT HOME"),("USBORNE BOOKS & MORE"),("PILLAR TO POST HOME INSPECTION SERVICES")) 
val rdd2 =sc.parallelize(d2)
val df2 = rdd2.toDF("names")


def matchFunc(s1:String,s2:String) : Boolean ={ 
if(s2.contains(s1)) true
else false
}
val contains = udf(matchFunc _)

val like_with_python_udf = (df1.join(df2).where(contains(col("brand_stop"), col("names"))).select(col("brand_stop"), col("names")))
like_with_python_udf.show()

Python代码:

^{pr2}$

我要出局了:

++ |品牌停车| + + |演讲会| |厄斯本| |阿尔本| ++

相关问题 更多 >