PySpark & SQL: fixing a type error when running a higher-order function

Published 2024-09-30 10:36:19


I am trying to run a SQL query against the following PySpark DataFrame:

+--------------------+
|              values|
+--------------------+
|[1.09125882, 0.97...|
|[1.0, 1.0, 1.0, 1...|
|[1.06119951, 1.04...|
|[1.0, 1.0, 1.0, 1...|
|[1.0, 1.0, 1.0, 1...|
|[1.0, 1.12954037,...|
|[1.0, 1.08907695,...|
|[1.0, 1.0, 1.0, 1...|
|[1.017957352, 0.9...|
|[1.015306123, 1.0...|
|[1.0, 1.0, 1.0, 1...|
|[1.015306123, 1.0...|
|[1.07177033, 1.00...|
|[1.0, 1.09094099,...|
|[1.061907984, 1.0...|
|[1.072550215, 1.0...|
|[1.0, 1.0, 1.0, 1...|
|[1.0, 1.0, 1.0, 1...|
|[1.08173935, 1.04...|
|[1.039907582, 1.0...|
+--------------------+
only showing top 20 rows

I am referencing code from this Databricks tutorial (it is the last example there). The query is as follows:

query = """SELECT values,
         AGGREGATE(values,
           (1.0 AS product, 0 AS N),
           (buffer, value) -> (value * buffer.product, buffer.N + 1),
           buffer -> Power(buffer.product, 1.0 / buffer.N)) geomean FROM df_table"""
data_fin = spark.sql(query)

Here is the schema of the DataFrame for reference:

root
 |-- values: array (nullable = true)
 |    |-- element: double (containsNull = true)

I registered the table as:

data5.registerTempTable("df_table")
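
For reference, registerTempTable has been deprecated since Spark 2.0 in favor of createOrReplaceTempView, which does the same thing (a minimal equivalent, assuming data5 is the DataFrame shown above):

# Register the DataFrame as a temporary view so it can be queried with spark.sql
data5.createOrReplaceTempView("df_table")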

However, I get a type error:

An error was encountered:
"cannot resolve 'aggregate(df_table.`values`, named_struct('product', 1.0BD, 'N', 0), lambdafunction(named_struct('col1', (namedlambdavariable() * CAST(namedlambdavariable().`product` AS DOUBLE)), 'col2', (namedlambdavariable().`N` + 1)), namedlambdavariable(), namedlambdavariable()), lambdafunction(POWER(CAST(namedlambdavariable().`product` AS DOUBLE), CAST((CAST(1.0BD AS DECIMAL(11,1)) / CAST(CAST(namedlambdavariable().`N` AS DECIMAL(10,0)) AS DECIMAL(11,1))) AS DOUBLE)), namedlambdavariable()))' due to data type mismatch: argument 3 requires struct<product:decimal(2,1),N:int> type, however, 'lambdafunction(named_struct('col1', (namedlambdavariable() * CAST(namedlambdavariable().`product` AS DOUBLE)), 'col2', (namedlambdavariable().`N` + 1)), namedlambdavariable(), namedlambdavariable())' is of struct<col1:double,col2:int> type.; line 2 pos 9;\nProject [values#514, aggregate(values#514, named_struct(product, 1.0, N, 0), lambdafunction(named_struct(col1, (lambda value#565 * cast(lambda buffer#564.product as double)), col2, (lambda buffer#564.N + 1)), lambda buffer#564, lambda value#565, false), lambdafunction(POWER(cast(lambda buffer#566.product as double), cast(CheckOverflow((promote_precision(cast(1.0 as decimal(11,1))) / promote_precision(cast(cast(lambda buffer#566.N as decimal(10,0)) as decimal(11,1)))), DecimalType(13,12)) as double)), lambda buffer#566, false)) AS geomean#563]\n+- SubqueryAlias `df_table`\n   +- Project [zpid#26, zip_#455, values#514]\n      +- Project [_id#0, address#1, cmaToolCompCandidates#2, comps#3, data#4, description#5, hiResImageLink#6, homeType#7, hugePhotos#8, latitude#9, location#10, longitude#11, nearbyHomes#12, nearbySales#13, no#14, priceHist#15, priceHistory#16, propertyTaxRate#17, rentZestimate#18L, resoFacts#19, responsivePhotos#20, streetViewTileImageUrlMediumAddress#21, taxHistory#22, tourPhotos#23, ... 6 more fields]\n         +- Project [_id#0, address#1, cmaToolCompCandidates#2, comps#3, data#4, description#5, hiResImageLink#6, homeType#7, hugePhotos#8, latitude#9, location#10, longitude#11, nearbyHomes#12, nearbySales#13, no#14, priceHist#15, priceHistory#16, propertyTaxRate#17, rentZestimate#18L, resoFacts#19, responsivePhotos#20, streetViewTileImageUrlMediumAddress#21, taxHistory#22, tourPhotos#23, ... 5 more fields]\n            +- Filter (zip_#455 = 02138)\n               +- Project [_id#0, address#1, cmaToolCompCandidates#2, comps#3, data#4, description#5, hiResImageLink#6, homeType#7, hugePhotos#8, latitude#9, location#10, longitude#11, nearbyHomes#12, nearbySales#13, no#14, priceHist#15, priceHistory#16, propertyTaxRate#17, rentZestimate#18L, resoFacts#19, responsivePhotos#20, streetViewTileImageUrlMediumAddress#21, taxHistory#22, tourPhotos#23, ... 4 more fields]\n                  +- Repartition 40, false\n                     +- Relation[_id#0,address#1,cmaToolCompCandidates#2,comps#3,data#4,description#5,hiResImageLink#6,homeType#7,hugePhotos#8,latitude#9,location#10,longitude#11,nearbyHomes#12,nearbySales#13,no#14,priceHist#15,priceHistory#16,propertyTaxRate#17,rentZestimate#18L,resoFacts#19,responsivePhotos#20,streetViewTileImageUrlMediumAddress#21,taxHistory#22,tourPhotos#23,... 3 more fields] parquet\n"
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 767, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: "cannot resolve 'aggregate(df_table.`values`, named_struct('product', 1.0BD, 'N', 0), lambdafunction(named_struct('col1', (namedlambdavariable() * CAST(namedlambdavariable().`product` AS DOUBLE)), 'col2', (namedlambdavariable().`N` + 1)), namedlambdavariable(), namedlambdavariable()), lambdafunction(POWER(CAST(namedlambdavariable().`product` AS DOUBLE), CAST((CAST(1.0BD AS DECIMAL(11,1)) / CAST(CAST(namedlambdavariable().`N` AS DECIMAL(10,0)) AS DECIMAL(11,1))) AS DOUBLE)), namedlambdavariable()))' due to data type mismatch: argument 3 requires struct<product:decimal(2,1),N:int> type, however, 'lambdafunction(named_struct('col1', (namedlambdavariable() * CAST(namedlambdavariable().`product` AS DOUBLE)), 'col2', (namedlambdavariable().`N` + 1)), namedlambdavariable(), namedlambdavariable())' is of struct<col1:double,col2:int> type.; line 2 pos 9;\nProject [values#514, aggregate(values#514, named_struct(product, 1.0, N, 0), lambdafunction(named_struct(col1, (lambda value#565 * cast(lambda buffer#564.product as double)), col2, (lambda buffer#564.N + 1)), lambda buffer#564, lambda value#565, false), lambdafunction(POWER(cast(lambda buffer#566.product as double), cast(CheckOverflow((promote_precision(cast(1.0 as decimal(11,1))) / promote_precision(cast(cast(lambda buffer#566.N as decimal(10,0)) as decimal(11,1)))), DecimalType(13,12)) as double)), lambda buffer#566, false)) AS geomean#563]\n+- SubqueryAlias `df_table`\n   +- Project [zpid#26, zip_#455, values#514]\n      +- Project [_id#0, address#1, cmaToolCompCandidates#2, comps#3, data#4, description#5, hiResImageLink#6, homeType#7, hugePhotos#8, latitude#9, location#10, longitude#11, nearbyHomes#12, nearbySales#13, no#14, priceHist#15, priceHistory#16, propertyTaxRate#17, rentZestimate#18L, resoFacts#19, responsivePhotos#20, streetViewTileImageUrlMediumAddress#21, taxHistory#22, tourPhotos#23, ... 6 more fields]\n         +- Project [_id#0, address#1, cmaToolCompCandidates#2, comps#3, data#4, description#5, hiResImageLink#6, homeType#7, hugePhotos#8, latitude#9, location#10, longitude#11, nearbyHomes#12, nearbySales#13, no#14, priceHist#15, priceHistory#16, propertyTaxRate#17, rentZestimate#18L, resoFacts#19, responsivePhotos#20, streetViewTileImageUrlMediumAddress#21, taxHistory#22, tourPhotos#23, ... 5 more fields]\n            +- Filter (zip_#455 = 02138)\n               +- Project [_id#0, address#1, cmaToolCompCandidates#2, comps#3, data#4, description#5, hiResImageLink#6, homeType#7, hugePhotos#8, latitude#9, location#10, longitude#11, nearbyHomes#12, nearbySales#13, no#14, priceHist#15, priceHistory#16, propertyTaxRate#17, rentZestimate#18L, resoFacts#19, responsivePhotos#20, streetViewTileImageUrlMediumAddress#21, taxHistory#22, tourPhotos#23, ... 4 more fields]\n                  +- Repartition 40, false\n                     +- Relation[_id#0,address#1,cmaToolCompCandidates#2,comps#3,data#4,description#5,hiResImageLink#6,homeType#7,hugePhotos#8,latitude#9,location#10,longitude#11,nearbyHomes#12,nearbySales#13,no#14,priceHist#15,priceHistory#16,propertyTaxRate#17,rentZestimate#18L,resoFacts#19,responsivePhotos#20,streetViewTileImageUrlMediumAddress#21,taxHistory#22,tourPhotos#23,... 3 more fields] parquet\n"

Is there a way to preserve the data types when preparing the SQL query? I assumed the schema of the original DataFrame would be correct for the SQL query.


1 Answer

User
#1 · Posted 2024-09-30 10:36:19

When you run a function in the form of AGGREGATE or REDUCE (the two are aliases), the first argument is the array column and the second argument defines the initial (zero) value and, implicitly, its type. You can write a bare 1.0 (which could be a Decimal, Double, or Float) or 0 (which could be a Boolean, Byte, Short, Integer, or Long), but that leaves it to Spark to pick among those options. In your example, Spark parses the literal 1.0 as DECIMAL(2,1) and 0 as INT, so the initial buffer is struct<product:decimal(2,1), N:int>, while the merge lambda returns a struct of DOUBLE and INT because the array elements are doubles. The error is raised because these implicitly chosen data types do not match.
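
A quick way to see which types Spark infers for those bare literals is to select them and print the schema (a minimal check, assuming an active spark session):

# Inspect the types Spark assigns to the bare literals used as the zero value.
# 1.0 is parsed as DECIMAL(2,1) and 0 as INT, which is where the
# struct<product:decimal(2,1), N:int> in the error message comes from.
spark.sql("SELECT 1.0 AS product, 0 AS N").printSchema()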

To guarantee that the data types are correct and the query always runs, change (1.0 AS product, 0 AS N) to (CAST(1 AS DOUBLE) AS product, CAST(0 AS DOUBLE) AS N).
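
Putting it together, a minimal sketch of the corrected query (assuming the same df_table registration as above):

# Geometric mean per row: the zero value is now explicitly typed, so the
# result type of the merge lambda (double, double) matches the buffer type.
query = """SELECT values,
         AGGREGATE(values,
           (CAST(1 AS DOUBLE) AS product, CAST(0 AS DOUBLE) AS N),
           (buffer, value) -> (value * buffer.product, buffer.N + 1),
           buffer -> POWER(buffer.product, 1.0 / buffer.N)) AS geomean
       FROM df_table"""
data_fin = spark.sql(query)
data_fin.show(truncate=False)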
