Problem converting a Pandas DataFrame to a Spark DataFrame

Posted 2024-05-21 06:40:48


I'm a bit confused:

I have the script below. I convert a Spark DataFrame to a Pandas DataFrame so I can run my function on it.

I now have an output dataframe, df6, which is exactly what I need.

I now need to write the data back to HDFS (which Pandas can't do), so I need to convert the Pandas dataframe back to a Spark dataframe and write it out to a directory.

I used the code below to do this, but unfortunately it doesn't work:

data_spark = spark_session.createDataFrame(df6)
data_spark.show()

The error I get is:

(error traceback omitted)

The PySpark module is clearly available, because I get the correct df6 output on stdout. In other words, Spark creates the dataframe and converts it to Pandas without a problem; it is only this last conversion back to Spark that struggles.

Is there anything I can do about this?
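
For reference, the most common reason createDataFrame fails on a Pandas frame is that Spark cannot infer a type for object-dtype columns (the string manipulation in the script below produces exactly that kind of column). A minimal sketch of the usual workaround, assuming that is the cause here: cast the columns explicitly and hand Spark a schema. The column names newdomain2 and sdsf are taken from the full script below; everything else is illustrative.

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Cast the Pandas columns to unambiguous types before converting
df6['newdomain2'] = df6['newdomain2'].astype(str)
df6['sdsf'] = df6['sdsf'].astype(float)

# Pass an explicit schema so Spark does not have to infer one from the data
out_schema = StructType([
    StructField('newdomain2', StringType(), True),
    StructField('sdsf', DoubleType(), True)
])
data_spark = spark_session.createDataFrame(df6, schema=out_schema)
data_spark.show()

For context, the full script is below: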

from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlfunc
from pyspark.sql.types import *
import argparse, sys
from pyspark.sql import *
import pandas as pd
import time
from datetime import datetime
import os
import glob
def create_session(appname):
    spark_session = SparkSession\
        .builder\
        .appName(appname)\
        .master('yarn')\
        .config("hive.metastore.uris", "thrift://domain.net:9083")\
        .enableHiveSupport()\
        .getOrCreate()
    return spark_session
### START MAIN ###
if __name__ == '__main__':
    spark_session = create_session('testing_files')
    #import file into dataframe
    start = time.time()
    #--------------------------------------------------------------------------------------------
    #-----------------------------CALCULATE DATES AND TIMES FOR QUERY----------------------------
    #--------------------------------------------------------------------------------------------
    dt_now = datetime.now()
    target_hour = int(dt_now.strftime('%s')) - 60*60*12
    today_date = datetime.fromtimestamp(target_hour).strftime('%Y%m%d')
    hour = datetime.fromtimestamp(target_hour).strftime('%H')
    #--------------------------------------------------------------------------------------------
    #-----------------------------------CREATE DF FROM FILES ------------------------------------
    #--------------------------------------------------------------------------------------------
    schema = [
        StructField('dmy', StringType(), True),
        StructField('hh', StringType(), True),
        # ... very long list of fields ...
        ]
    final_structure = StructType(schema)

    df = spark_session.read\
        .option("header","false")\
        .option("delimiter", "\t")\
        .csv('hdfs://nameservice/data/data/dt=20181022/hour=11/*/*', final_structure)\
        .select('domain', 'optimisedsize')
    df2 = df.filter(df.domain != '----').groupby('domain').agg(sqlfunc.sum(df.optimisedsize).alias('sdsf'))
    df2.show()
    df3 = df2.toPandas()

#--------------------------------------------------------------------------------------------
#-----------------------------DEFINE REQUIRED LOOKUP LISTS-----------------------------------
#--------------------------------------------------------------------------------------------
tld = ('co.uk', 'com', 'org', 'gov.uk', 'co', 'net', 'news', 'it', 'in', 'es', 'tw', 'pe', 'io', 'ca', 'cat', 'com.au',
  'com.ar', 'com.mt', 'com.co', 'ws', 'to', 'es', 'de', 'us', 'br', 'im', 'gr', 'cc', 'cn', 'org.uk', 'me', 'ovh', 'be',
  'tv', 'tech', '..', 'life', 'com.mx', 'pl', 'uk', 'ru', 'cz', 'st', 'info', 'mobi', 'today', 'eu', 'fi', 'jp', 'life',
  '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'earth', 'ninja', 'ie', 'im', 'ai', 'at', 'ch', 'ly', 'market', 'click',
  'fr', 'nl', 'se')
cdns = ('akamai', 'akamaized', 'maxcdn', 'cloudflare')
cleandomain = []

#--------------------------------------------------------------------------------------------
#-----------------------------SPLIT DOMAIN AT EVERY DOT--------------------------------------
#--------------------------------------------------------------------------------------------
index = df3.domain.str.split('.').tolist()
#--------------------------------------------------------------------------------------------
#------------------DEFINE FUNCTION FOR DOMAIN MANIPULATION-----------------------------------
#--------------------------------------------------------------------------------------------
def domfunction(x):
    try:
        #if the last element is numeric (e.g. an IP address), keep the first two parts and mask the rest
        if str(x[-1]).isdigit():
            try:
                cleandomain.append(str(x[0])+'.'+str(x[1])+'.*.*')
            except IndexError:
                cleandomain.append(str(x)) 
        #if it's in the CDN list, take a subdomain as well
        elif len(x) > 3 and str(x[len(x)-2]).rstrip() in cdns:
            try:
                cleandomain.append(str(x[len(x)-3])+'.'+str(x[len(x)-2])+'.'+str(x[len(x)-1]))
            except IndexError:
                cleandomain.append(str(x))
        elif len(x) > 3 and str(x[len(x)-3]).rstrip() in cdns:
            try:
                cleandomain.append(str(x[len(x)-4])+'.'+str(x[len(x)-3])+'.'+str(x[len(x)-2])+'.'+ str(x[len(x)-1]))
            except IndexError:
                cleandomain.append(str(x))
        #if it's in the TLD list, do this
        elif len(x) > 2 and str(x[len(x)-2]).rstrip()+'.'+ str(x[len(x)-1]).rstrip() in tld:
            try:
                cleandomain.append(str(x[len(x)-3])+'.'+str(x[len(x)-2])+'.'+ str(x[len(x)-1]))
            except IndexError:
                cleandomain.append(str(x))
        elif len(x) > 2 and str(x[len(x)-1]) in tld:
            try:
                cleandomain.append(str(x[len(x)-2])+'.'+ str(x[len(x)-1]))
            except IndexError:
                cleandomain.append(str(x))
        #if it's not in the TLD list, keep the value as-is
        else:
          cleandomain.append(str(x))
    except IndexError:
          cleandomain.append(str(x))
    except TypeError:
          cleandomain.append(str(x))
#--------------------------------------------------------------------------------------------
#-------------LOOP OVER ITEMS WITHIN THE INDEX & CONCAT REQUIRED ELEMENTS--------------------
#--------------------------------------------------------------------------------------------
for x in index:
    domfunction(x)
#--------------------------------------------------------------------------------------------
#-------------------------------CONFIGURE OUTPUTS--------------------------------------------
#--------------------------------------------------------------------------------------------
#add the column to the dataframe

se = pd.Series(cleandomain)
df3['newdomain2'] = se.values
#select only the new domain column & usage & group by
df5 = df3.groupby(['newdomain2'],as_index = False)[['sdsf']].sum()
df6 = df5.sort_values(['sdsf'], ascending=[True])
print(df6)
spark_df = spark_session.createDataFrame(df6)
spark_df.show()
spark_df.coalesce(100).write.format("com.databricks.spark.csv").option("header", "false").option('sep', '\t').mode('append').save('hdfs://nameservice/user/keenek1/domainlookup')
data_spark = spark_session.createDataFrame(df6)
data_spark.show()
print(df6)
end = time.time()
print("RunTime:")
print(end-start)
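
As an aside, if this is running on Spark 2.0 or later, the external com.databricks.spark.csv format used in the write above is no longer needed; the built-in CSV writer does the same job. A sketch of the equivalent final write, reusing the path and options from the script:

spark_df.coalesce(100)\
    .write\
    .option('header', 'false')\
    .option('sep', '\t')\
    .mode('append')\
    .csv('hdfs://nameservice/user/keenek1/domainlookup')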
