正在将pyspark偏移滞后动态值检索到其他datafram

2024-09-30 01:26:59 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用的是pyspark 2.1。下面是我的输入数据帧。我被困在从不同的数据帧获取动态偏移值请帮助

df1=

价值类别

13个

2 2个

4 5个

df2型

类别年-月-周数-滞后-属性运行

0 0 2 0

2019年1月11日

2019年1月2日20

2019年1月3日30

2019年1月4日1

2019年1 5 5 2

2019年1月6日3

2019年1月7 7 4

2019年1月18日5

2019年1月9 9 6

2 0 0 0 9 0

2018年2月1 1 1 2 0

2 2018年1 2 3 2

2 2018年1 3 4 3

2 2018年1 3 5 4

如上例所示,df1是我的查找表,它有偏移值,对于1,偏移值是3,对于类别2,偏移值是2。在

在df2中,runs是我的输出列,因此对于df1中的每个类别值,如果lag值是3,那么从dataframe2[df2]应该考虑lag_attbute和lag down 3个值,因此可以看到每3个lag_属性值重复运行

我试过下面的代码没用。请帮忙

df1=df1.registerTempTable("df1")
df2=df2.registerTempTable("df2")
sqlCtx.sql("select st.category,st.Year,st.Month,st.weekyear,st.lag_attribute,LAG(st.lag_attribute,df1.value, 0) OVER (PARTITION BY st.cagtegory ORDER BY st.Year,st.Month,st.weekyear) as return_test from df1 st,df2 lkp where df1.category=df2.category")

请帮我跨过这个障碍


Tags: 数据by属性attribute类别yearpysparklag
1条回答
网友
1楼 · 发布于 2024-09-30 01:26:59

lag接受一个column对象和一个整数(python integer),如函数签名所示: 在

Signature: psf.lag(col, count=1, default=None)

count的值不能是pysparkIntegerType(列对象)。不过,还是有一些变通办法,让我们从示例数据开始:

^{pr2}$
  1. 如果df1不是太大(意味着少量的categories,并且每个category中可能有很多值),则可以将df1转换为一个列表,并创建一个if elif elif。。。基于其值的条件:

    list1 = df1.collect()
    sc.broadcast(list1)
    
    import pyspark.sql.functions as psf
    from pyspark.sql import Window
    w = Window.partitionBy("category").orderBy("year", "month", "weeknumber")
    cond = eval('psf' + ''.join(['.when(df2.category == ' + str(c) + ', psf.lag("lag_attribute", ' + str(l) + ', 0).over(w))' for c, l in list1]))
    

    注意:如果cl是整数,如果它们是字符串,则:

    cond = eval('psf' + ''.join(['.when(df2.category == "' + str(c) + '", psf.lag("lag_attribute", "' + str(l) + '", 0).over(w))' for c, l in list1]))
    

    现在我们可以应用条件:

    df2.select("*", cond.alias("return_test")).show()
    
        +    +  +  -+     +      -+  +     -+
        |category|year|month|weeknumber|lag_attribute|runs|return_test|
        +    +  +  -+     +      -+  +     -+
        |       1|   0|    0|         0|            0|   2|          0|
        |       1|2019|    1|         1|            1|   0|          0|
        |       1|2019|    1|         2|            2|   0|          0|
        |       1|2019|    1|         3|            3|   0|          0|
        |       1|2019|    1|         4|            4|   1|          1|
        |       1|2019|    1|         5|            5|   2|          2|
        |       1|2019|    1|         6|            6|   3|          3|
        |       1|2019|    1|         7|            7|   4|          4|
        |       1|2019|    1|         8|            8|   5|          5|
        |       1|2019|    1|         9|            9|   6|          6|
        |       2|   0|    0|         0|            9|   0|          0|
        |       2|2018|    1|         1|            2|   0|          0|
        |       2|2018|    1|         2|            3|   2|          9|
        |       2|2018|    1|         3|            4|   3|          2|
        |       2|2018|    1|         3|            5|   4|          3|
        +    +  +  -+     +      -+  +     -+
    
  2. 如果df1大的,那么您可以在一个内置的lag列上自连接df2

    首先,我们将使用一个连接将valuesdf1带到{}:

    df = df2.join(df1, "category")
    

    如果df1不是太大,则应该broadcast它:

    import pyspark.sql.functions as psf
    df = df2.join(psf.broadcast(df1), "category")
    

    现在我们将枚举每个partition中的行,并构建一个lag列:

    from pyspark.sql import Window
    w = Window.partitionBy("category").orderBy("year", "month", "weeknumber")
    left = df.withColumn('rn', psf.row_number().over(w))
    right = left.select((left.rn + left.value).alias("rn"), left.lag_attribute.alias("return_test"))
    
    left.join(right, ["category", "rn"], "left")\
        .na.fill(0)\
        .sort("category", "rn").show()
    
        +    + -+  +  -+     +      -+  +  -+     -+
        |category| rn|year|month|weeknumber|lag_attribute|runs|value|return_test|
        +    + -+  +  -+     +      -+  +  -+     -+
        |       1|  1|   0|    0|         0|            0|   2|    3|          0|
        |       1|  2|2019|    1|         1|            1|   0|    3|          0|
        |       1|  3|2019|    1|         2|            2|   0|    3|          0|
        |       1|  4|2019|    1|         3|            3|   0|    3|          0|
        |       1|  5|2019|    1|         4|            4|   1|    3|          1|
        |       1|  6|2019|    1|         5|            5|   2|    3|          2|
        |       1|  7|2019|    1|         6|            6|   3|    3|          3|
        |       1|  8|2019|    1|         7|            7|   4|    3|          4|
        |       1|  9|2019|    1|         8|            8|   5|    3|          5|
        |       1| 10|2019|    1|         9|            9|   6|    3|          6|
        |       2|  1|   0|    0|         0|            9|   0|    2|          0|
        |       2|  2|2018|    1|         1|            2|   0|    2|          0|
        |       2|  3|2018|    1|         2|            3|   2|    2|          9|
        |       2|  4|2018|    1|         3|            4|   3|    2|          2|
        |       2|  5|2018|    1|         3|            5|   4|    2|          3|
        +    + -+  +  -+     +      -+  +  -+     -+
    

注意:您的runs滞后值有问题,例如catagory=2它只是滞后1,而不是{}。还有一些行在数据帧中具有相同的顺序(例如,示例数据帧中的最后两行df2具有相同的category, year, month and weeknumber),因为涉及到洗牌,所以每次运行代码时可能会得到不同的结果。在

相关问题 更多 >

    热门问题