Spark SQL for loop error: 'bool' object has no attribute 'alias'



I'm trying to create new columns in a Spark SQL DataFrame that compare two existing columns and return True if they are equal and False otherwise. I have to do this for a dataset with thousands of columns. As a sample problem I've included all of my code here, but the important problem is in the second for loop at the end of the code block.

from pyspark.sql import SQLContext
from pyspark.sql.types import *    
data = sc.parallelize([[1, None, 'BND'], [2, None, 'RMP'], [3, None, 'SWP'], [4, None, "IRS"], [5, None, "SWP"], [6, None, "IRS"]])
match = sc.parallelize([[1, 2,  100], [3, 5, 101], [4, 6, 102]])

trade_schema_string = 'trade_id,match_id,product'
trade_fields = [StructField(field_name, StringType(), True) for field_name in trade_schema_string.split(',')]
trade_fields[0].dataType = IntegerType()
trade_fields[1].dataType = IntegerType()
trade_schema = StructType(trade_fields)

match_schema_string = "pri_netting_id,sec_netting_id,match_id"
match_fields = [StructField(field_name, IntegerType(), True) for field_name in match_schema_string.split(',')]
match_schema = StructType(match_fields)

sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(data, trade_schema)
odf = sqlContext.createDataFrame(match, match_schema)
df.registerTempTable("trade")
odf.registerTempTable("match")

# Get match_ids so you can match up front office and back office records
# Change column names for fo and bo dataframes so that they say "bo_product" and "fo_product", etc.
fo = sqlContext.sql("SELECT t.trade_id,t.product,m.match_id FROM trade t INNER JOIN match m WHERE t.trade_id = m.pri_netting_id")
bo = sqlContext.sql("SELECT t.trade_id,t.product,m.match_id FROM trade t INNER JOIN match m WHERE t.trade_id = m.sec_netting_id")
col_names = fo.columns
for x in range(0, len(col_names)):
    col_name = col_names[x]
    fo = fo.withColumnRenamed(col_name, "fo_" + col_name)
    bo = bo.withColumnRenamed(col_name, "bo_" + col_name)

fo.registerTempTable("front_office")
bo.registerTempTable("back_office")

fobo = sqlContext.sql("SELECT f.fo_trade_id,f.fo_product,b.bo_trade_id,b.bo_product FROM front_office f INNER JOIN back_office b WHERE f.fo_match_id = b.bo_match_id")
fobo = fobo.repartition(5)
# How to create diff columns
num_cols = len(fobo.columns)
fobo_names = fobo.columns
first = fobo.first()

for x in range(0, num_cols / 2):
    new_name = "\'diff_" + fobo_names[x][3:] + "\'"
    old_column_fo = "fobo." + fobo_names[x]
    old_column_bo = "fobo." + fobo_names[x + (num_cols / 2)]
    fobo = fobo.withColumn(new_name, old_column_fo == old_column_bo)

The error I get is:

Traceback (most recent call last):
  File "<stdin>", line 8, in <module>
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/lib/spark/python/pyspark/sql/dataframe.py", line 695, in withColumn
    return self.select('*', col.alias(colName))
AttributeError: 'bool' object has no attribute 'alias'

Strangely, if I do the following by hand:

fobo = fobo.withColumn("diff_product", fobo.fo_product == fobo.bo_product)

and

fobo = fobo.withColumn("diff_trade_id", fobo.fo_trade_id == fobo.bo_trade_id)

then everything works perfectly. However, that isn't practical for my real use case, which has many columns.


1 Answer
old_column_fo = "fobo." + fobo_names[x]
old_column_bo = "fobo." + fobo_names[x + (num_cols / 2)]
fobo = fobo.withColumn(new_name, old_column_fo == old_column_bo)

old_column_fo and old_column_bo are strings. They look like the names of the attributes you are trying to access, but they are not the actual attributes, so the == here only compares two Python strings and produces a plain bool. Try using getattr instead.

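A minimal sketch of that getattr-based fix (assuming, as in the question's code, that the first half of fobo_names are the fo_ columns and the second half the matching bo_ columns):

half = num_cols // 2
for x in range(0, half):
    # no need to embed literal quotes in the new column name
    new_name = "diff_" + fobo_names[x][3:]
    # getattr(fobo, name) returns the actual Column object, so == builds a
    # Column expression that withColumn can alias, instead of a plain bool
    fobo = fobo.withColumn(new_name,
                           getattr(fobo, fobo_names[x]) == getattr(fobo, fobo_names[x + half]))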

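An alternative sketch of the same idea uses pyspark.sql.functions.col, which avoids attribute lookup on the DataFrame entirely:

from pyspark.sql.functions import col

half = len(fobo.columns) // 2
for x in range(0, half):
    # col(name) builds a Column expression from the column name string
    fobo = fobo.withColumn("diff_" + fobo.columns[x][3:],
                           col(fobo.columns[x]) == col(fobo.columns[x + half]))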