2024-09-29 01:34:02 发布
网友
我有以下格式的数据。在
abc, x1, x2, x3 def, x1, x3, x4,x8,x9 ghi, x7, x10, x11
我想要的输出是
您的数据不是CSV格式。CSV表示具有固定模式的逗号分隔文本文件。您的数据的CSV将是:
abc,x1,x2,x3,, def,x1,x3,x4,x8,x9 ghi,x7,x10,x11,,
请注意第1行和第3行中的尾随逗号,它们不在您的数据中。在
由于您有一个不是CSV的文本文件,所以在Spark中获取所需模式的方法是在Python中读取整个文件,解析成您想要的内容,然后使用spark.crateDataFrame()。或者,如果在一个目录中有多个这样的文件,请使用SparkContext.wholeTextFiles,然后使用flatMap解析函数。在
spark.crateDataFrame()
SparkContext.wholeTextFiles
flatMap
假设您已经完成了open("Your File.txt").readlines之类的操作,剩下的就简单了:
open("Your File.txt").readlines
如果数据以文件形式出现,可以这样实现:
在Scala上可以这样实现:
val df = spark.read.option("header", "false").csv("non-csv.txt") val remainingColumns = df.columns.tail df.withColumn("id", monotonically_increasing_id). select( col("id"), col(df.columns(0)), array(remainingColumns.head, remainingColumns.tail: _*) ).show(false)
输出:
您可以做的是首先使用zipWithIndex生成id,然后在map函数中使用r[0].split(",")[0]获取字符串的第一部分,使用r[0].split(",")[1:]获取第二部分。在
zipWithIndex
r[0].split(",")[0]
r[0].split(",")[1:]
代码如下:
from pyspark.sql.types import StringType lines = ["abc, x1, x2, x3", "def, x1, x3, x4,x8,x9", "ghi, x7, x10, x11"] df = spark.createDataFrame(lines, StringType()) df = df.rdd.zipWithIndex() \ .map(lambda (r, indx): (indx, r[0].split(",")[0], r[0].split(",")[1:])) \ .toDF(["id", "name", "x_col"]) df.show(10, False)
以及输出:
您的数据不是CSV格式。CSV表示具有固定模式的逗号分隔文本文件。您的数据的CSV将是:
请注意第1行和第3行中的尾随逗号,它们不在您的数据中。在
由于您有一个不是CSV的文本文件,所以在Spark中获取所需模式的方法是在Python中读取整个文件,解析成您想要的内容,然后使用
spark.crateDataFrame()
。或者,如果在一个目录中有多个这样的文件,请使用SparkContext.wholeTextFiles
,然后使用flatMap
解析函数。在假设您已经完成了
^{pr2}$open("Your File.txt").readlines
之类的操作,剩下的就简单了:如果数据以文件形式出现,可以这样实现:
在Scala上可以这样实现:
输出:
^{pr2}$您可以做的是首先使用
zipWithIndex
生成id,然后在map函数中使用r[0].split(",")[0]
获取字符串的第一部分,使用r[0].split(",")[1:]
获取第二部分。在代码如下:
以及输出:
^{pr2}$相关问题 更多 >
编程相关推荐