用于python cod的emr cluster spark提交选项

2024-09-28 05:26:11 发布

您现在位置:Python中文网/ 问答频道 /正文

我还有一个问题,我使用的是一个EMR集群,它有:

你知道吗船长:Running1m4.large你知道吗

你知道吗核心:Running1m4.2xlarge你知道吗

我有一个python代码可以分割一个txt文件。我的问题是,当我试图执行我的代码时,我现在只使用:

spark-submit <my_python_file>.py

结果是:

[hadoop@ip-192-168-97-253 test_0]$ spark-submit 3.py
19/07/08 13:28:09 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
19/07/08 13:28:11 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x0000000654650000, 384626688, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 384626688 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /home/hadoop/test_0/hs_err_pid30413.log
[hadoop@ip-192-168-97-253 test_0]$ ls -la
total 364
drwxrwxr-x 2 hadoop hadoop   4096 Jul  8 13:29 .
drwxr-xr-x 5 hadoop hadoop   4096 Jul  8 13:22 ..
-rw-rw-r-- 1 hadoop hadoop   1974 Jul  3 23:32 3.py

我必须在这里写些什么来利用电子病历的优势?

这是我的python代码:

from pyspark.sql import SparkSession

from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, TimestampType, StringType, DateType
from pyspark.sql.functions import array, col, explode, struct, lit



spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = spark.sparkContext


fuente_1 = "s3://someplace/data_1.txt"
save_folder = "s3://anotherplace/converted"



new_set_1 = []


def split_txt_1(str):
    global new_set_1

    start_psotion = 0
    index = 0
    prev = 0
    # str = mylines[0]
    substr = u"\u001E"
    registro = 0

    while index < len(str):
        index = str.find(substr, index)


        if index == -1:
            new_set_1.append(str[start_psotion:len(str)])
            start_psotion = index + len(substr)

            my_df_1 = spark.createDataFrame(new_set_1, StringType())
            final_2 = my_df_1.repartition(1)
            final_2.write.csv(path=save_folder, mode="append")


        else:
            new_set_1.append(str[start_psotion:index])
            start_psotion = index + len(substr)

            if registro == 100:
                my_df_1 = spark.createDataFrame(new_set_1, StringType())
                final_2 = my_df_1.repartition(1)
                final_2.write.csv(path=save_folder, mode="append")
                new_set_1 = []
                registro = 0
            else:
                registro += len(substr)

        if index == -1:
            break

        prev = index + len(substr)
        index += len(substr)


lines_1 = sc.textFile(fuente_1)

llist_1 = lines_1.collect()

split_txt_1(llist_1[0])

这是一个很小的txt文件的例子,很明显真实的文件有很多信息(都是用u“\u001E”表示的)

你知道吗**********************文件.txt**************************************你知道吗

768 | c-234r4 |胡里奥| 38 | c-123 | 123a-533r2 |安娜| 32 | c-123 | 32a-543r4 |索尼娅| 33 | c-v23 | 43

*********************结束文件.txt**********************************你知道吗

我的文件是这样的: image

请问这有什么好处吗? 我做错了什么?你知道吗

敬礼


Tags: 文件txthadoopnewindexlenmystart

热门问题