无法在Pysp中填充空表

2024-05-03 11:43:27 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图用for填充一个空表,我不得不说一个数据帧非常大,所以它需要很长时间,但函数在15分钟左右后什么也不做。。你知道吗

我尝试了一个for来表示我想要包含在表中的值,但是它一直在思考

    #Formato exportacion
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType

#field = ['Codigo_campania', 'Descripcion', 'Definicion exito','Total publico','Total Exitos','Canal']


esquema = StructType([
    StructField("Codigo_campania",StringType(),True),
    StructField("Descripcion",StringType(),True),
    StructField("Definicion exito",StringType(),True),
    StructField("Total publico",IntegerType(),True),
    StructField("Total Exitos",IntegerType(),True),
    StructField("Canal",StringType(),True)
])

resultado_exito2 = sqlContext.createDataFrame(sc.emptyRDD(), esquema)

resultado_exito2.show()



+---------------+-----------+----------------+-------------+------------+-----+
|Codigo_campania|Descripcion|Definicion exito|Total publico|Total Exitos|Canal|
+---------------+-----------+----------------+-------------+------------+-----+
+---------------+-----------+----------------+-------------+------------+-----+



canales_lst2 = [['MD19-080','email'],['MD19-081','email'],['MD19-083','sms'],['MD19-084','sms']]



schema_canal2 = StructType([
    StructField("CampaignID",StringType(),True),
    StructField("canal",StringType(),True)    
])

df_canal2 = sqlContext.createDataFrame(canales_lst2,  schema_canal2) 






for codigo in ['MD19-080','MD19-083','MD19-084','MD19-081'] :

    tmp3 = exito_bd_reciente.filter((exito_bd_reciente["CampaignID"] == codigo))
    tmp4 = exito_bd_6m.filter((exito_bd_6m["CampaignID"] == codigo))

    ok = df_canal2.filter(df_canal2["CampaignID"] == codigo)
    canal_str=ok.select('canal').collect()[0]["canal"]

    newRow2 = [[ codigo,'Que tengan BD sin acceso en 2anios','Uso BD despues del aviso',tmp3.count(),tmp4.count(), canal_str]]

    newDF2 = spark.createDataFrame(newRow2,resultado_exito.schema)
    resultado_exito = resultado_exito.union(newDF2)

i dont get any error, it keeps the kernel busy all time

Tags: fromimporttruesqlpysparkcodigotypestotal