如何通过连接Spark中的表来管理物理内存使用?

2024-10-06 10:29:26 发布

您现在位置:Python中文网/ 问答频道 /正文

我目前正在使用Spark将所有这9个表合并到一个大数据集市中,但我的电脑只有3.5 GB的RAM,由于内存不足,很容易断开连接/返回Py4JError

# Tidy the authorized-transactions frame: prune unused columns, normalise
# the ambiguous column names, and derive the card BIN from the PAN.
authorized_df = authorized_df.drop(*authorized_drop)
for _src, _dst in [("id", "transaction_id"),
                   ("created_date", "transaction_created_date"),
                   ("card_type", "detailed_card_type")]:
    authorized_df = authorized_df.withColumnRenamed(_src, _dst)
authorized_df = authorized_df.withColumn("bin_identifier", func_bin(authorized_df["pan"]))
# [row count, column count, column names] — NOTE: count() triggers a full Spark job.
authorized_info = [authorized_df.count(), len(authorized_df.columns), authorized_df.columns]

# Load the mobile-user table over JDBC.
mobileuser_df = sqlContext.read.format('jdbc').options(
        driver='com.mysql.jdbc.Driver',
        url=jdbc_url,
        dbtable="cz_mobile_user",
        user=username,
        password=password).load()

# Columns never referenced downstream in the datamart.
mobileuser_drop = ["pin", "modified_date", "terminate_date", "wrong_password_count",
                   "modified_by", "created_by", "activation_code", "is_active", "change_pwd",
                   "aggregator_fk", "login_type", "mobileusertoken_fk",
                   "merchant_source_sales", "merchant_join_date", "merchant_pic_salutation",
                   "merchant_pic_firstname", "merchant_pic_lastname"]
mobileuser_df = mobileuser_df.drop(*mobileuser_drop)

# Prefix the generic column names so they stay unambiguous after the joins.
for _src, _dst in [("id", "mobile_user_id"),
                   ("created_date", "mobileuser_created_date"),
                   ("contact", "mobileuser_contact"),
                   ("email", "mobileuser_email"),
                   ("first_name", "mobileuser_firstname"),
                   ("middle_name", "mobileuser_middlename"),
                   ("last_name", "mobileuser_last_name"),
                   ("salutation", "mobileuser_salutation")]:
    mobileuser_df = mobileuser_df.withColumnRenamed(_src, _dst)
mobileuser_info = [mobileuser_df.count(), len(mobileuser_df.columns), mobileuser_df.columns]

# Load the back-office users table and keep only the PIC-related columns.
users_df = sqlContext.read.format('jdbc').options(
        driver='com.mysql.jdbc.Driver',
        url=jdbc_url,
        dbtable="users",
        user=username,
        password=password).load()

_users_keep = {"id", "contact", "email", "merchant_fk", "control_fk", "bank_fk", "role_fk",
               "first_name", "middle_name", "last_name", "salutation"}
# Filter against the actual schema, so a column missing from the table is skipped silently.
users_df = users_df.select([c for c in users_df.columns if c in _users_keep])

# Prefix the person-in-charge columns; merchant_fk gets a table-specific alias.
for _src, _dst in [("id", "PIC_id"),
                   ("contact", "PIC_contact"),
                   ("email", "PIC_email"),
                   ("first_name", "PIC_first_name"),
                   ("middle_name", "PIC_middle_name"),
                   ("last_name", "PIC_last_name"),
                   ("salutation", "PIC_salutation"),
                   ("merchant_fk", "users_merchant_fk")]:
    users_df = users_df.withColumnRenamed(_src, _dst)
users_info = [users_df.count(), len(users_df.columns), users_df.columns]

# Load the card-reader table; only the id, device type FK and serial number are used.
reader_df = sqlContext.read.format('jdbc').options(
        driver='com.mysql.jdbc.Driver',
        url=jdbc_url,
        dbtable="reader",
        user=username,
        password=password).load()

_reader_keep = {"id", "devicetype_fk", "serial_number"}
reader_df = reader_df.select([c for c in reader_df.columns if c in _reader_keep])
reader_df = reader_df.withColumnRenamed("id", "reader_id")
reader_info = [reader_df.count(), len(reader_df.columns), reader_df.columns]

# Load the merchant-ID (MID) table with the columns the datamart needs.
mid_df = sqlContext.read.format('jdbc').options(
        driver='com.mysql.jdbc.Driver',
        url=jdbc_url,
        dbtable="cz_mid",
        user=username,
        password=password).load()

_mid_keep = {"id", "created_date", "application_identifier", "merchant_fk", "bank_fk",
             "master_mcc_fk", "status", "bank_product"}
mid_df = mid_df.select([c for c in mid_df.columns if c in _mid_keep])
for _src, _dst in [("id", "mid_id"),
                   ("created_date", "mid_created_date"),
                   ("status", "mid_status")]:
    mid_df = mid_df.withColumnRenamed(_src, _dst)
mid_info = [mid_df.count(), len(mid_df.columns), mid_df.columns]

# Load the master MCC (merchant category code) lookup table.
mastermcc_df = sqlContext.read.format('jdbc').options(
        driver='com.mysql.jdbc.Driver',
        url=jdbc_url,
        dbtable="master_mcc",
        user=username,
        password=password).load()

_mastermcc_keep = {"mcc", "mcc_name_en", "ovo_mcc_name"}
mastermcc_df = mastermcc_df.select([c for c in mastermcc_df.columns if c in _mastermcc_keep])
for _src, _dst in [("mcc", "master_mcc_id"),
                   ("mcc_name_en", "master_mcc_description")]:
    mastermcc_df = mastermcc_df.withColumnRenamed(_src, _dst)
mastermcc_info = [mastermcc_df.count(), len(mastermcc_df.columns), mastermcc_df.columns]

# Load the MCC-category mapping table; only the primary key is renamed.
catmastermcc_df = sqlContext.read.format('jdbc').options(
        driver='com.mysql.jdbc.Driver',
        url=jdbc_url,
        dbtable="category_master_mcc",
        user=username,
        password=password).load()

catmastermcc_df = catmastermcc_df.withColumnRenamed("id", "category_mmcc_id")
catmastermcc_info = [catmastermcc_df.count(), len(catmastermcc_df.columns), catmastermcc_df.columns]

# Load the card-BIN lookup table as-is (every column is kept).
_cardbin_opts = {"driver": "com.mysql.jdbc.Driver",
                 "url": jdbc_url,
                 "dbtable": "cz_card_bin",
                 "user": username,
                 "password": password}
cardbin_df = sqlContext.read.format('jdbc').options(**_cardbin_opts).load()

cardbin_info = [cardbin_df.count(), len(cardbin_df.columns), cardbin_df.columns]

# Load the bank table and prefix its generic column names with "bank_".
bank_df = sqlContext.read.format('jdbc').options(
        driver='com.mysql.jdbc.Driver',
        url=jdbc_url,
        dbtable="bank",
        user=username,
        password=password).load()

_bank_keep = {"id", "created_date", "bank_name", "status", "business_address1",
              "business_address2", "business_contact", "city", "postcode", "state",
              "bank_reference"}
bank_df = bank_df.select([c for c in bank_df.columns if c in _bank_keep])
for _src, _dst in [("id", "bank_id"),
                   ("created_date", "bank_created_date"),
                   ("status", "bank_status"),
                   ("city", "bank_city"),
                   ("postcode", "bank_postcode"),
                   ("state", "bank_state")]:
    bank_df = bank_df.withColumnRenamed(_src, _dst)
bank_info = [bank_df.count(), len(bank_df.columns), bank_df.columns]

现在开始合并过程

print("Merge 1...")
# Left-join mobile users to back-office users on the merchant FK,
# then drop the (now redundant) join key from the right side.
sp_mu_comb_users = (mobileuser_df
                    .join(users_df,
                          mobileuser_df.merchant_fk == users_df.users_merchant_fk,
                          how="left")
                    .drop("users_merchant_fk"))
sp_mu_comb_users_info = [sp_mu_comb_users.count(), len(sp_mu_comb_users.columns), sp_mu_comb_users.columns]

print("Merge 2...")
# Attach reader details to each transaction via the device serial number.
sp_tr_comb_reader = (authorized_df
                     .join(reader_df,
                           authorized_df.did == reader_df.serial_number,
                           how="left")
                     .drop("serial_number"))
sp_tr_comb_reader_info = [sp_tr_comb_reader.count(), len(sp_tr_comb_reader.columns), sp_tr_comb_reader.columns]

print("Merge 3...")
# Resolve each MID's master MCC description.
sp_mid_mmcc = (mid_df
               .join(mastermcc_df,
                     mid_df.master_mcc_fk == mastermcc_df.master_mcc_id,
                     how="left")
               .drop("master_mcc_fk"))
sp_mid_mmcc_info = [sp_mid_mmcc.count(), len(sp_mid_mmcc.columns), sp_mid_mmcc.columns]

print("Merge 4...")
# Add bank details, then alias the FKs so they cannot clash in later joins.
sp_mid_mmcc_bnk = (sp_mid_mmcc
                   .join(bank_df, sp_mid_mmcc.bank_fk == bank_df.bank_id, how="left")
                   .drop("bank_id")
                   .withColumnRenamed("merchant_fk", "mcc_bank_merchant_fk")
                   .withColumnRenamed("bank_fk", "mcc_bank_bank_fk"))
sp_mid_mmcc_bnk_info = [sp_mid_mmcc_bnk.count(), len(sp_mid_mmcc_bnk.columns), sp_mid_mmcc_bnk.columns]

print("Merge 5...")
# Attach the MCC category lookup to the user side.
sp_mu_comb_users_catmcc = (sp_mu_comb_users
                           .join(catmastermcc_df,
                                 sp_mu_comb_users.merchant_category_mcc == catmastermcc_df.category_master_mcc,
                                 how="left")
                           .drop("category_master_mcc"))
sp_mu_comb_users_catmcc_info = [sp_mu_comb_users_catmcc.count(), len(sp_mu_comb_users_catmcc.columns), sp_mu_comb_users_catmcc.columns]

print("Merge 6...")
# Bring the MID/MCC/bank data onto the user side via the merchant FK,
# then drop both aliased FKs from the right side.
sp_mu_comb_users_catmcc_mid = (sp_mu_comb_users_catmcc
                               .join(sp_mid_mmcc_bnk,
                                     sp_mu_comb_users_catmcc.merchant_fk == sp_mid_mmcc_bnk.mcc_bank_merchant_fk,
                                     how="left")
                               .drop("mcc_bank_merchant_fk")
                               .drop("mcc_bank_bank_fk"))
sp_mu_comb_users_catmcc_mid_info = [sp_mu_comb_users_catmcc_mid.count(), len(sp_mu_comb_users_catmcc_mid.columns), sp_mu_comb_users_catmcc_mid.columns]

print("Merge 7...")
# Join the transaction side onto the fully-enriched user side.
sp_pre_final = (sp_tr_comb_reader
                .join(sp_mu_comb_users_catmcc_mid,
                      sp_tr_comb_reader.mobileuser_fk == sp_mu_comb_users_catmcc_mid.mobile_user_id,
                      how="left")
                .drop("mobile_user_id"))
sp_pre_final_info = [sp_pre_final.count(), len(sp_pre_final.columns), sp_pre_final.columns]

print("Merge 8...")
# Final enrichment: card-BIN lookup keyed on the derived bin_identifier.
sp_final = (sp_pre_final
            .join(cardbin_df, sp_pre_final.bin_identifier == cardbin_df.card_bin, how="left")
            .drop("card_bin"))
sp_final_info = [sp_final.count(), len(sp_final.columns), sp_final.columns]

合并已经完成。由于未使用的Spark数据帧会消耗大量物理RAM,我接下来将这些引用全部删除并交给垃圾收集器:

import gc

# Drop every Python-side reference to the intermediate DataFrames.
# NOTE(review): a PySpark DataFrame object is a thin wrapper around a JVM
# logical plan — `del` + gc.collect() only releases the Python wrappers,
# so the JVM-side memory win here is likely modest; presumably nothing was
# cached (no .cache()/.persist() appears above), in which case there is
# little to free — verify with the Spark UI's Storage tab.
del (sp_pre_final, sp_mu_comb_users_catmcc_mid, sp_mu_comb_users_catmcc,
     sp_mid_mmcc_bnk, sp_mid_mmcc, sp_tr_comb_reader, sp_mu_comb_users,
     bank_df, cardbin_df, authorized_df, mastermcc_df, mid_df, reader_df,
     mobileuser_df, users_df, catmastermcc_df)
gc.collect()

这只剩下最后一个大数据框(Datamart),它包含35.5M行和97列,我最后的意图是通过JDBC将其写入MySQL表,或者将其写成Parquet文件。在此之前,这里是我在垃圾回收后的RAM使用情况:java.exe的使用量为1.7GB

Physical Memory

Detailed Usage

当我试图将该表写入Parquet或JDBC时,物理内存一直在缓慢增加,直到java.exe的使用量达到2.7GB,最终因内存不足而失败:

import time

# Write the final datamart to a local Parquet file and report how long it took.
_t_start = time.time()
sp_final.write.parquet("datamart.parquet")
print("Write to parquet Finished! Takes", time.time() - _t_start, "Seconds")

# Alternative sink: write the datamart straight back to MySQL over JDBC.
#mode="overwrite"
#properties = {
#       "user" : "user_account", 
#       "password" : "mysqlworld" 
#}
#sp_final.write.jdbc(url=jdbc_url, table="datamart", mode=mode ,properties=properties)
#print("Write to JDBC Finished! Takes",time.time() - _t_start ,"Seconds")

在这种情况下,除了增加RAM之外是否别无选择?或者有没有办法在写入过程中减少内存的增长?我想知道是否有可能……提前谢谢你


Tags: columnsiddfusersspreaderbankfk