I am running into a problem when trying to update an existing Kudu table through a JDBC connection. The same table serves as both source and target, and I am trying to update a few records.
My procedure is as follows:
I read the data from an input Kudu table named db1.test_cls1 using a JDBC connection.
The table has the following attributes (pandas dtypes):
consumer_id object
merchant_id object
log_date datetime64[ns]
beginning_period datetime64[ns]
end_period datetime64[ns]
first_trx_date datetime64[ns]
last_trx_date datetime64[ns]
tse float64
rec int32
freq int32
ag int32
mv float64
p_mo float64
p_al float64
p_freq float64
The JDBC connection that reads the data from the table:
sourceDF = spark.read.jdbc(url=url, table=inputTable, properties=properties)
!! Note: the attributes named p_mo, p_al and p_freq are initially empty and are supposed to be updated after being calculated in the Python code.
sourceDF.createTempView("sourceDFView")
sql = """
SELECT consumer_id
, merchant_id
, log_date
, beginning_period
, end_period
, first_trx_date
, last_trx_date
, tse
, rec
, freq
, ag
, mv
FROM sourceDFView """
inputDF = sqlContext.sql(sql)
inputDF.dtypes
inputDF = inputDF.toPandas().reset_index()
def get_p_mo():
    ...code for calculation...
    return p_mo

def get_p_al():
    ...code for calculation...
    return p_al

def get_p_freq():
    ...code for calculation...
    return p_freq
inputDF['p_mo'] = get_p_mo()
inputDF['p_al'] = get_p_al()
inputDF['p_freq'] = get_p_freq()
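The real calculations are elided above; a minimal runnable sketch of the same pattern, with hypothetical formulas standing in for the actual ones (the toy frame and all three formulas are assumptions, not the real logic):

```python
import pandas as pd

# Toy frame standing in for inputDF; the real one comes from toPandas().
inputDF = pd.DataFrame({
    "tse": [100.0, 250.0],
    "rec": [3, 7],
    "freq": [5, 12],
})

# Hypothetical stand-ins for the real calculations: each returns a whole
# pandas Series, which is then assigned as a new column.
def get_p_mo():
    return inputDF["tse"] / inputDF["freq"]        # placeholder formula

def get_p_al():
    return inputDF["rec"] / inputDF["freq"]        # placeholder formula

def get_p_freq():
    return inputDF["freq"] / inputDF["freq"].sum() # placeholder formula

inputDF["p_mo"] = get_p_mo()
inputDF["p_al"] = get_p_al()
inputDF["p_freq"] = get_p_freq()
```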
The update of db1.test_cls1 is then done as follows:
tarDF: DataFrame = inputDF[[
'consumer_id',
'merchant_id',
'log_date',
'beginning_period',
'end_period',
'first_trx_date',
'last_trx_date',
'tse',
'rec',
'freq',
'ag',
'mv',
'p_mo',
'p_al',
'p_freq']]
inputDF_Prob = spark.createDataFrame(tarDF, schema=schema)
inputDF_Prob.createTempView("targetDF")
sqlTarget = """
UPDATE
sourceDFView
set
sourceDFView.p_al= targetDF.p_al
, sourceDFView.p_freq = targetDF.p_freq
, sourceDFView.p_mo = targetDF.p_mo
from
targetDF
inner join
sourceDFView
on
sourceDFView.consumer_id = targetDF.consumer_id
and
sourceDFView.merchant_id = targetDF.merchant_id
and
sourceDFView.log_date = targetDF.log_date
and
sourceDFView.beginning_period = targetDF.beginning_period
and
sourceDFView.end_period = targetDF.end_period
"""
targetDF = sqlContext.sql(sqlTarget)
targetDF.write \
.mode("append") \
.jdbc(url=url, table=inputTable,
properties=properties)
This results in the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o32.sql.
: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input 'UPDATE' expecting {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT', 'ROLLBACK', 'REDUCE', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'DFS', 'TRUNCATE', 'ANALYZE', 'LIST', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'EXPORT', 'IMPORT', 'LOAD'}(line 2, pos 8)
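As the parser's list of accepted keywords shows, Spark SQL (at least through 2.x) simply has no UPDATE statement, so the statement fails regardless of the join. The usual workaround is to express the update as a join that rebuilds the rows and then write the result back. A minimal sketch of that idea in pandas, on hypothetical toy data (the frames and values are assumptions, not the real tables):

```python
import pandas as pd

# Toy stand-ins: "source" has the key columns plus empty p_mo,
# "target" carries the freshly computed p_mo per key.
source = pd.DataFrame({
    "consumer_id": ["c1", "c2"],
    "merchant_id": ["m1", "m2"],
    "tse": [100.0, 250.0],
    "p_mo": [None, None],
})
target = pd.DataFrame({
    "consumer_id": ["c1", "c2"],
    "merchant_id": ["m1", "m2"],
    "p_mo": [20.0, 30.0],
})

# Drop the stale column, join on the key columns, and take p_mo
# from the computed side -- the rows come out "updated".
updated = source.drop(columns=["p_mo"]).merge(
    target, on=["consumer_id", "merchant_id"], how="inner")
```

The same join can be written as a SELECT in Spark SQL and the resulting DataFrame written back, which avoids UPDATE entirely.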
! Note: I also tried running the update directly in the Hue editor, and there it works perfectly fine.
Apart from that, I also tried storing the data in a different target table using the following part of the code, and that works fine as well:
sqlTarget = """
SELECT
cast(consumer_id as string) consumer_id
, cast(merchant_id as string) merchant_id
, cast(log_date as timestamp) log_date
, cast(beginning_period as timestamp) beginning_period
, cast(end_period as timestamp) end_period
, cast(p_mo as double) p_mo
, cast(p_al as double) p_al
, cast(p_freq as double) p_freq
FROM targetDF
"""
targetDF.write \
.mode("append") \
.jdbc(url=url, table=targetTable,
properties=properties)
Can anyone help solve this problem, i.e. how can I update a Kudu table through a JDBC connection?