如何在pyspark的本地模式下读取S3?

2024-09-24 22:28:52 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用PyCharm 2018.1,使用python3.4,在virtualenv中通过pip安装spark2.3。本地主机上没有hadoop安装,因此没有Spark安装(因此没有Spark_HOME、hadoop_HOME等)

当我尝试这个:

from pyspark import SparkConf
from pyspark import SparkContext
conf = SparkConf()\
    .setMaster("local")\
    .setAppName("pyspark-unittests")\
    .set("spark.sql.parquet.compression.codec", "snappy")
sc = SparkContext(conf = conf)
inputFile = sparkContext.textFile("s3://somebucket/file.csv")

我得到:

^{pr2}$

在本地模式下运行pyspark而不在本地安装完整的Hadoop,如何读取s3?在

FWIW-当我在非本地模式下在EMR节点上执行它时,效果非常好。在

以下操作不起作用(相同的错误,尽管它确实解决并下载了依赖项):

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.hadoop:hadoop-aws:3.1.0" pyspark-shell'
from pyspark import SparkConf
from pyspark import SparkContext
conf = SparkConf()\
    .setMaster("local")\
    .setAppName("pyspark-unittests")\
    .set("spark.sql.parquet.compression.codec", "snappy")
sc = SparkContext(conf = conf)
inputFile = sparkContext.textFile("s3://somebucket/file.csv")

结果相同(不好):

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars "/path/to/hadoop-aws-3.1.0.jar" pyspark-shell'
from pyspark import SparkConf
from pyspark import SparkContext
conf = SparkConf()\
    .setMaster("local")\
    .setAppName("pyspark-unittests")\
    .set("spark.sql.parquet.compression.codec", "snappy")
sc = SparkContext(conf = conf)
inputFile = sparkContext.textFile("s3://somebucket/file.csv")

Tags: fromimporthadoops3oslocalconfspark
3条回答

所以格伦尼的回答很接近,但对你的情况却没有什么效果。关键是选择正确版本的依赖项。如果你看看虚拟环境

Jars

所有内容都指向一个版本2.7.3,您还需要使用这个版本

os.environ['PYSPARK_SUBMIT_ARGS'] = ' packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell'

您应该通过检查项目的虚拟环境中的路径venv/Lib/site-packages/pyspark/jars来验证安装使用的版本

然后,您可以在默认情况下使用s3a,或者通过为其定义处理程序类来使用s3

^{pr2}$

输出量低于

OutputSpark

在本地访问S3时,应该使用s3a协议。请确保先将密钥和密钥添加到SparkContext。像这样:

sc = SparkContext(conf = conf)
sc._jsc.hadoopConfiguration().set('fs.s3a.access.key', 'awsKey')
sc._jsc.hadoopConfiguration().set('fs.s3a.secret.key', 'awsSecret')

inputFile = sparkContext.textFile("s3a://somebucket/file.csv")

准备工作:

在spark配置文件中添加以下行,对于我的本地pyspark,它是/usr/local/spark/conf/spark-default.conf

spark.hadoop.fs.s3a.access.key=<your access key>
spark.hadoop.fs.s3a.secret.key=<your secret key>

python文件内容:

^{pr2}$

提交:

spark-submit  master local \
 packages org.apache.hadoop:hadoop-aws:2.7.3,\
com.amazonaws:aws-java-sdk:1.7.4,\
org.apache.hadoop:hadoop-common:2.7.3 \
<path to the py file above>

相关问题 更多 >