我有点困惑:
我有下面的脚本。我把Spark DataFrame转换成Pandas DataFrame来执行我的函数。
我现在有一个输出数据帧df6,这正是我需要的。
我现在需要将数据写回HDFS(Pandas无法做到),所以我需要将Pandas数据帧转换回Spark并将其写入目录。
我使用了下面的函数来完成这个任务,但不幸的是,它不起作用:
data_spark = spark_session.createDataFrame(df6)
data_spark.show()
我得到的错误是:
(原文此处的错误信息在抓取时丢失,仅留下占位符。)显然存在名为PySpark的模块,因为我在stdout中得到了正确的df6输出。也就是说,Spark已经创建了DataFrame并将其转换为Pandas,只是最后这一步(把Pandas转换回Spark)无法正常工作。
对此我能做些什么吗?
from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlfunc
from pyspark.sql.types import *
import argparse, sys
from pyspark.sql import *
import pyspark.sql.functions as sqlfunc
import pandas as pd
import time
from datetime import datetime
import os
import glob
def create_session(appname):
    """Build (or reuse) a Hive-enabled SparkSession running on YARN.

    Args:
        appname: Application name passed to the session builder.

    Returns:
        The SparkSession produced by ``getOrCreate()``.
    """
    builder = SparkSession.builder.appName(appname).master('yarn')
    # Point the session at the Hive metastore before enabling Hive support.
    builder = builder.config("hive.metastore.uris", "thrift://domain.net:9083")
    hive_builder = builder.enableHiveSupport()
    return hive_builder.getOrCreate()
### START MAIN ###
if __name__ == '__main__':
    # Script entry point: read the raw files from HDFS with Spark, aggregate
    # usage per domain, normalise the domains in pandas, then convert the
    # result back to a Spark DataFrame and append it to HDFS.
    spark_session = create_session('testing_files')
    start = time.time()  # wall-clock start, reported as RunTime at the end

    # ----------------------------------------------------------------------
    # CALCULATE DATES AND TIMES FOR QUERY
    # ----------------------------------------------------------------------
    dt_now = datetime.now()
    # time.mktime() converts local time to epoch seconds on every platform;
    # strftime('%s') is a non-standard extension and is not portable.
    target_hour = int(time.mktime(dt_now.timetuple())) - 60 * 60 * 12
    target_dt = datetime.fromtimestamp(target_hour)
    today_date = target_dt.strftime('%Y%m%d')
    hour = target_dt.strftime('%H')
    # NOTE(review): today_date and hour are not used below; the input path
    # is hard-coded to dt=20181022/hour=11 -- confirm whether that is intended.

    # ----------------------------------------------------------------------
    # CREATE DF FROM FILES
    # ----------------------------------------------------------------------
    schema = [
        StructField('dmy', StringType(), True),
        StructField('hh', StringType(), True),
        # ... very long list of fields (elided in the original post) ...
    ]
    final_structure = StructType(schema)
    df = spark_session.read\
        .option("header", "false")\
        .option("delimiter", "\t")\
        .csv('hdfs://nameservice/data/data/dt=20181022/hour=11/*/*', final_structure)\
        .select('domain', 'optimisedsize')
    df2 = df.filter(df.domain != '----')\
        .groupby('domain')\
        .agg(sqlfunc.sum(df.optimisedsize).alias('sdsf'))
    df2.show()
    df3 = df2.toPandas()

    # ----------------------------------------------------------------------
    # DEFINE REQUIRED LOOKUP LISTS
    # ----------------------------------------------------------------------
    # 'in' and 'es' are separate entries: without the comma between them the
    # two literals concatenate to 'ines' and the 'in' TLD is never matched.
    tld = (
        'co.uk', 'com', 'org', 'gov.uk', 'co', 'net', 'news', 'it', 'in', 'es', 'tw', 'pe', 'io', 'ca', 'cat', 'com.au',
        'com.ar', 'com.mt', 'com.co', 'ws', 'to', 'es', 'de', 'us', 'br', 'im', 'gr', 'cc', 'cn', 'org.uk', 'me', 'ovh', 'be',
        'tv', 'tech', '..', 'life', 'com.mx', 'pl', 'uk', 'ru', 'cz', 'st', 'info', 'mobi', 'today', 'eu', 'fi', 'jp', 'life',
        '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'earth', 'ninja', 'ie', 'im', 'ai', 'at', 'ch', 'ly', 'market', 'click',
        'fr', 'nl', 'se',
    )
    cdns = ('akamai', 'akamaized', 'maxcdn', 'cloudflare')

    # ----------------------------------------------------------------------
    # SPLIT DOMAIN AT EVERY DOT
    # ----------------------------------------------------------------------
    index = df3.domain.str.split('.').tolist()

    # ----------------------------------------------------------------------
    # DEFINE FUNCTION FOR DOMAIN MANIPULATION
    # ----------------------------------------------------------------------
    def domfunction(x):
        """Reduce one dot-split domain to the key it should be grouped under.

        Args:
            x: The list of labels produced by splitting a domain on '.',
               or a non-list value when the split produced no list.

        Returns:
            The normalised domain string:
            * non-list input      -> ``str(x)`` unchanged;
            * last label numeric  -> first two labels followed by '.*.*';
            * CDN name second- or third-from-last (more than 3 labels)
                                  -> that CDN label, one label before it,
                                     and everything after it;
            * two-label suffix in ``tld`` -> last three labels;
            * last label in ``tld``       -> last two labels;
            * anything else       -> the labels re-joined with '.'.
        """
        if not isinstance(x, list):
            # Not a list of labels, so there is nothing to normalise.
            return str(x)

        def tail(count):
            # Join the last `count` labels back into a dotted name.
            return '.'.join(str(label) for label in x[-count:])

        # Fallback is the re-joined domain, not the list repr, so unmatched
        # domains group together with their normalised counterparts.
        whole = '.'.join(str(label) for label in x)
        size = len(x)
        if size == 0:
            return whole

        last = str(x[-1])
        if last.isdigit():
            if size < 2:
                return whole
            return str(x[0]) + '.' + str(x[1]) + '.*.*'
        # If it's in the CDN list, keep a subdomain as well.
        if size > 3 and str(x[-2]).rstrip() in cdns:
            return tail(3)
        if size > 3 and str(x[-3]).rstrip() in cdns:
            return tail(4)
        # Two-label public suffix such as 'co.uk'.
        if size > 2 and str(x[-2]).rstrip() + '.' + last.rstrip() in tld:
            return tail(3)
        # Single-label public suffix such as 'com'.
        if size > 2 and last in tld:
            return tail(2)
        return whole

    # ----------------------------------------------------------------------
    # LOOP OVER ITEMS WITHIN THE INDEX & CONCAT REQUIRED ELEMENTS
    # ----------------------------------------------------------------------
    cleandomain = [domfunction(x) for x in index]

    # ----------------------------------------------------------------------
    # CONFIGURE OUTPUTS
    # ----------------------------------------------------------------------
    # Add the normalised domain as a new column (one value per row, in order).
    df3['newdomain2'] = pd.Series(cleandomain).values
    # Keep only the new domain column and the usage, summed per domain.
    df5 = df3.groupby(['newdomain2'], as_index=False)[['sdsf']].sum()
    # ascending must be a real bool; a string inside a list only worked by
    # truthiness and is rejected by newer pandas versions.
    df6 = df5.sort_values(['sdsf'], ascending=True)
    print(df6)

    # Convert back to Spark once and reuse the result for display and write.
    spark_df = spark_session.createDataFrame(df6)
    spark_df.show()
    spark_df.coalesce(100)\
        .write.format("com.databricks.spark.csv")\
        .option("header", "false")\
        .option('sep', '\t')\
        .mode('append')\
        .save('hdfs://nameservice/user/keenek1/domainlookup')

    end = time.time()
    print("RunTime:")
    print(end - start)
目前没有回答
相关问题 更多 >
编程相关推荐