How can I update a Kudu table through a JDBC connection in PySpark?

Published 2024-10-03 21:26:42


I am having trouble updating an existing Kudu table through a JDBC connection. The same table serves as both source and target, and I am trying to update a handful of records.

My procedure is as follows:

  1. I read the data from the input Kudu table db1.test_cls1 over a JDBC connection. The table has the following columns (pandas dtypes):

    consumer_id                   object
    merchant_id                   object
    log_date              datetime64[ns]
    beginning_period      datetime64[ns]
    end_period            datetime64[ns]
    first_trx_date        datetime64[ns]
    last_trx_date         datetime64[ns]
    tse                          float64
    rec                            int32
    freq                           int32
    ag                             int32
    mv                           float64
    p_mo                         float64
    p_al                         float64
    p_freq                       float64
    

The JDBC read that loads the data from the table:

sourceDF = spark.read.jdbc(url=url, table=inputTable, properties=properties)

!! Note: the columns p_mo, p_al and p_freq are initially empty and are supposed to be filled in after the calculation in the Python code.

  2. To do that, I convert the Spark DataFrame to a pandas DataFrame and compute the columns p_mo, p_al and p_freq:

sourceDF.createTempView("sourceDFView")

sql = """
    SELECT consumer_id
          , merchant_id
          , log_date
          , beginning_period
          , end_period
          , first_trx_date
          , last_trx_date
          , tse
          , rec
          , freq
          , ag
          , mv
        FROM sourceDFView """

inputDF = sqlContext.sql(sql)
inputDF.dtypes

inputDF = inputDF.toPandas().reset_index()


def get_p_mo():
    # ... code for calculation ...
    return p_mo

def get_p_al():
    # ... code for calculation ...
    return p_al

def get_p_freq():
    # ... code for calculation ...
    return p_freq

inputDF['p_mo'] = get_p_mo()
inputDF['p_al'] = get_p_al()
inputDF['p_freq'] = get_p_freq()
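One detail worth noting about the three helpers: as written they take no arguments, so they can only reach inputDF through the global scope. A minimal runnable sketch of the same pattern, with made-up stand-in formulas (the real calculations are elided in the question) and the frame passed in explicitly:

```python
import pandas as pd

# Toy frame standing in for inputDF; the real one comes from toPandas().
inputDF = pd.DataFrame({
    "consumer_id": ["c1", "c2"],
    "tse": [100.0, 250.0],
    "freq": [4, 10],
})

# Stand-in formulas -- the real calculations are not shown in the question.
def get_p_mo(df):
    return df["tse"] / df["freq"]

def get_p_al(df):
    return df["tse"] / 10.0

def get_p_freq(df):
    return df["freq"] / df["freq"].sum()

inputDF["p_mo"] = get_p_mo(inputDF)
inputDF["p_al"] = get_p_al(inputDF)
inputDF["p_freq"] = get_p_freq(inputDF)
```

Passing the frame explicitly keeps the helpers testable and avoids the implicit dependency on a global inputDF.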
  3. After that, I convert the pandas DataFrame back to a Spark DataFrame so that I can run the update of table db1.test_cls1 through the JDBC connection. Concretely:

tarDF: DataFrame = inputDF[[ 'consumer_id', 'merchant_id', 'log_date', 'beginning_period', 'end_period', 'first_trx_date', 'last_trx_date', 'tse', 'rec', 'freq', 'ag', 'mv', 'p_mo', 'p_al', 'p_freq']]

inputDF_Prob = spark.createDataFrame(tarDF, schema=schema)

inputDF_Prob.createTempView("targetDF")

sqlTarget = """
        UPDATE 
            sourceDFView
        set 
            sourceDFView.p_al= targetDF.p_al
            , sourceDFView.p_freq = targetDF.p_freq
            , sourceDFView.p_mo = targetDF.p_mo
        from
            targetDF
        inner join 
            sourceDFView
        on 
            sourceDFView.consumer_id = targetDF.consumer_id
        and 
            sourceDFView.merchant_id = targetDF.merchant_id
        and 
            sourceDFView.log_date = targetDF.log_date
        and 
            sourceDFView.beginning_period = targetDF.beginning_period
        and 
            sourceDFView.end_period = targetDF.end_period
    """

targetDF = sqlContext.sql(sqlTarget)

targetDF.write \
    .mode("append") \
    .jdbc(url=url, table=inputTable,
          properties=properties)

This fails with the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o32.sql.
: org.apache.spark.sql.catalyst.parser.ParseException: 
mismatched input 'UPDATE' expecting {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT', 'ROLLBACK', 'REDUCE', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'DFS', 'TRUNCATE', 'ANALYZE', 'LIST', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'EXPORT', 'IMPORT', 'LOAD'}(line 2, pos 8)
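The ParseException is expected: plain Spark SQL has no UPDATE statement at all (UPDATE/MERGE only arrived with table formats such as Delta Lake or Iceberg, and the JDBC source never supports it), so the parser rejects the keyword regardless of what Kudu itself could handle. What Spark can do is express the same logic as a SELECT join that produces the updated rows, which are then written out. The join-update semantics, sketched with pandas on toy data (column set reduced for brevity):

```python
import pandas as pd

# Toy stand-ins: source rows whose p_mo is still empty, and the freshly
# computed rows (the real frames carry more key columns).
source = pd.DataFrame({
    "consumer_id": ["c1", "c2"],
    "merchant_id": ["m1", "m1"],
    "p_mo": [None, None],
})
computed = pd.DataFrame({
    "consumer_id": ["c1", "c2"],
    "merchant_id": ["m1", "m1"],
    "p_mo": [0.4, 0.7],
})

# Equivalent of "UPDATE ... FROM ... INNER JOIN": keep the source keys,
# take the new column values from the computed frame.
updated = source.drop(columns=["p_mo"]).merge(
    computed, on=["consumer_id", "merchant_id"], how="inner"
)
```

In Spark the same thing would be a SELECT that joins sourceDFView and targetDF on the key columns and picks p_mo, p_al and p_freq from targetDF.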

! Note: I also tried running the UPDATE directly in the Hue editor, and there it works perfectly.

Apart from that, I also tried storing the data in a different target table with the next piece of code, and that works fine:

sqlTarget = """
    SELECT  
          cast(consumer_id as string) consumer_id
        , cast(merchant_id as string) merchant_id
        , cast(log_date as timestamp) log_date
        , cast(beginning_period as timestamp) beginning_period
        , cast(end_period as timestamp) end_period
        , cast(p_mo as double) p_mo
        , cast(p_al as double) p_al
        , cast(p_freq as double) p_freq
    FROM targetDF  
    """
targetDF = sqlContext.sql(sqlTarget)

targetDF.write \
    .mode("append") \
    .jdbc(url=url, table=targetTable,
          properties=properties)

Can anyone help with this, i.e. how do I update a Kudu table through a JDBC connection?
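One thing to keep in mind: even with the SQL fixed, df.write.jdbc(..., mode="append") only issues INSERTs, so rows come back duplicated rather than updated. The usual way to get genuine upsert semantics into Kudu from Spark is the native kudu-spark connector rather than JDBC. A configuration sketch, assuming the kudu-spark connector jar is on the classpath, that kudu_master holds your Kudu master addresses, and that newer connector versions supporting the kudu.operation option are in use (all assumptions, not shown in the question):

```python
# Sketch only: needs the kudu-spark connector on the classpath and a
# reachable Kudu cluster; "kudu_master" is a placeholder variable.
targetDF.write \
    .format("org.apache.kudu.spark.kudu") \
    .option("kudu.master", kudu_master) \
    .option("kudu.table", "impala::db1.test_cls1") \
    .option("kudu.operation", "upsert") \
    .mode("append") \
    .save()
```

The impala:: prefix applies when the table was created through Impala; kudu.operation=upsert is what takes the place of the unsupported UPDATE statement.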

