使用python sp映射另一个文件

2024-10-01 13:38:47 发布

您现在位置:Python中文网/ 问答频道 /正文

作为spark和python的新手,尝试一些基本的东西来打印员工数据的count和max

from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pyspark.sql.functions as psf

spark = SparkSession \
    .builder \
    .appName("Hello") \
    .config("World") \
    .getOrCreate()


sc = spark.sparkContext
sqlContext = SQLContext(sc)
df = spark.createDataFrame(
    sc.textFile("employee.txt").map(lambda l: l.split('::')),
    ["employeeid","deptid","salary"]
)
df.registerTempTable("df")

mostEmpDept = sqlContext.sql("""select deptid, cntDept from (
                                            select deptid, count(*) as cntDept, max(count(*)) over () as maxcnt 
                                            from df 
                                            group by deptid) as tmp
                                            where tmp.cntDept = tmp.maxcnt""")

mostEmpDept.show()

上面的代码给出了雇员人数最多的deptid,如下所示

+-------+--------+                                                              
|deptid |cntDept |
+-------+--------+
|    10 |       7|
+-------+--------+

现在,我有另一个包含所有deptid及其名称的文件,如何将这个结果映射到另一个文件并打印deptid 10名称?另一个文件如下所示

10::Marketing
20::Finance
30::HumanResource
40::HouseKeeping

Tags: 文件fromimportdfsqlascounttmp
1条回答
网友
1楼 · 发布于 2024-10-01 13:38:47

请在下面使用:

sc = spark.sparkContext
sqlContext = SQLContext(sc)
df = spark.createDataFrame(
    sc.textFile("employee.txt").map(lambda l: l.split('::')),
    ["employeeid","deptid","salary"]
)
df.registerTempTable("df")

dept = spark.createDataFrame(
    sc.textFile("dept.txt").map(lambda l: l.split('::')),
    ["deptid","deptname"]
)
dept.registerTempTable("dept")

mostEmpDept = sqlContext.sql("""select deptid, cntDept from (
                                            select deptid, count(*) as cntDept, max(count(*)) over () as maxcnt 
                                            from df 
                                            group by deptid) as tmp
                                            where tmp.cntDept = tmp.maxcnt""")

mostEmpDept.registerTempTable('mostEmpDept')

final_df= sqlContext.sql("select a.deptid, b.deptname from mostEmpDept a inner join dept b on a.deptid=b.deptid")

final_df.show()

如果要保存,请使用

final_df.saveAsTextFile('Location')

相关问题 更多 >