
Apache Spark: Exception in thread "broadcast-exchange-0" java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes

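I am running a Spark application on the following configuration:

1 master node, 2 worker nodes

  • Each worker has 88 cores, so the total number of cores is 176

  • Each worker has 502 GB of memory, so the total available memory is 1004 GB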

I get the following exception when running the application:

Exception in thread "broadcast-exchange-0" java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:115)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:73)
        at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:97)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
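The frames above are in BroadcastExchangeExec and the thread is named broadcast-exchange-0, which points to a broadcast hash join that Spark SQL planned for a Dataset. Roughly, the rows of the smaller join side are collected to the driver, built into a lookup table there, and that table is then sent to every executor, which probes it with rows of the larger side. The single-JVM simulation below is a hypothetical illustration of that flow, not Spark's implementation; the class and method names are invented and the sample rows are made up. It shows why the whole build side has to fit in one JVM's heap (the driver's), however much memory the cluster has in total.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-JVM simulation of a broadcast hash join; NOT Spark's code.
public class BroadcastJoinSketch {

    // "Driver" step: gather every partition of the smaller side into ONE hash map.
    // Rows are "key,value" strings. The whole build side lives in this single map,
    // so it is limited by one JVM's heap, not by the total cluster memory.
    static Map<String, Integer> buildBroadcastTable(List<List<String>> partitions) {
        Map<String, Integer> table = new HashMap<>();
        for (List<String> partition : partitions) {
            for (String row : partition) {
                String[] parts = row.split(",");
                table.put(parts[0], Integer.parseInt(parts[1]));
            }
        }
        return table;
    }

    // "Executor" step: each row of the larger side probes its copy of the table.
    static List<String> probe(List<String> largeSideKeys, Map<String, Integer> table) {
        List<String> joined = new ArrayList<>();
        for (String key : largeSideKeys) {
            Integer value = table.get(key);
            if (value != null) { // inner join: rows without a match are dropped
                joined.add(key + "=" + value);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Smaller side split over two partitions: currency code -> decimal positions
        List<List<String>> currencyPartitions = Arrays.asList(
                Arrays.asList("USD,2", "JPY,0"),
                Arrays.asList("BHD,3"));

        Map<String, Integer> broadcastTable = buildBroadcastTable(currencyPartitions);
        System.out.println(probe(Arrays.asList("USD", "EUR", "BHD"), broadcastTable)); // [USD=2, BHD=3]
    }
}
```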

The error message itself suggests two workarounds:

  1. Disable broadcast joins by setting spark.sql.autoBroadcastJoinThreshold to -1

  2. Increase the Spark driver memory by setting spark.driver.memory to a higher value
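Why a threshold of -1 disables the automatic broadcast can be seen from a simplified model of the size check in Spark SQL's planner: roughly, in Spark 2.x a join side is broadcast only when its estimated size in bytes is non-negative and at most spark.sql.autoBroadcastJoinThreshold (default 10 MB). The class below is a hypothetical sketch of that comparison only; real size estimates come from Spark's plan statistics, and an explicit broadcast hint is not modelled.

```java
// Hypothetical, simplified model of Spark SQL's automatic broadcast-join size check.
// It is NOT Spark's implementation; it only mirrors the size comparison.
public class BroadcastThresholdModel {

    // Default value of spark.sql.autoBroadcastJoinThreshold: 10 MB
    static final long DEFAULT_THRESHOLD_BYTES = 10L * 1024 * 1024;

    // A join side qualifies only if its estimated size is known (>= 0) and does not
    // exceed the threshold. With a threshold of -1, no estimate can pass both tests.
    static boolean canBroadcast(long estimatedSizeBytes, long thresholdBytes) {
        return estimatedSizeBytes >= 0 && estimatedSizeBytes <= thresholdBytes;
    }

    public static void main(String[] args) {
        long smallTable = 2L * 1024 * 1024;   // 2 MB estimate
        long largeTable = 500L * 1024 * 1024; // 500 MB estimate

        System.out.println(canBroadcast(smallTable, DEFAULT_THRESHOLD_BYTES)); // true
        System.out.println(canBroadcast(largeTable, DEFAULT_THRESHOLD_BYTES)); // false
        System.out.println(canBroadcast(smallTable, -1));                      // false
    }
}
```

In a real application the setting itself is changed through Spark's configuration, for example with spark-submit --conf spark.sql.autoBroadcastJoinThreshold=-1 or spark.conf().set("spark.sql.autoBroadcastJoinThreshold", "-1") on the SparkSession.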

I tried setting more driver memory at runtime, but I want to understand the root cause of this problem. Can anyone explain it?

I am using Java in my code.

Edit 1

I am using broadcast variables in my code.

Edit 2

Adding the code that contains the broadcast variables:

// 1. Currency codes: read over JDBC, cache, and collect the codes to the driver
Dataset<Row> currencySet1 = sparkSession.read().format("jdbc").option("url", connection).option("dbtable", CI_CURRENCY_CD).load();
currencySetCache = currencySet1.select(CURRENCY_CD, DECIMAL_POSITIONS).persist(StorageLevel.MEMORY_ONLY());
Dataset<Row> currencyCodes = currencySetCache.select(CURRENCY_CD);
currencySet = currencyCodes.as(Encoders.STRING()).collectAsList();

// 2. Divisions: read over JDBC, cache, and collect to the driver
Dataset<Row> divisionSet = sparkSession.read().format("jdbc").option("url", connection).option("dbtable", CI_CIS_DIVISION).load();
divisionSetCache = divisionSet.select(CIS_DIVISION).persist(StorageLevel.MEMORY_ONLY());
divisionList = divisionSetCache.as(Encoders.STRING()).collectAsList();

// 3. User IDs: read over JDBC, cache, and collect to the driver
Dataset<Row> userIdSet = sparkSession.read().format("jdbc").option("url", connection).option("dbtable", SC_USER).load();
userIdSetCache = userIdSet.select(USER_ID).persist(StorageLevel.MEMORY_ONLY());
userIdList = userIdSetCache.as(Encoders.STRING()).collectAsList();

// Broadcast the three driver-side lists to the executors
ClassTag<List<String>> evidenceForDivision = scala.reflect.ClassTag$.MODULE$.apply(List.class);
Broadcast<List<String>> broadcastVarForDiv = context.broadcast(divisionList, evidenceForDivision);

ClassTag<List<String>> evidenceForCurrency = scala.reflect.ClassTag$.MODULE$.apply(List.class);
Broadcast<List<String>> broadcastVarForCurrency = context.broadcast(currencySet, evidenceForCurrency);

ClassTag<List<String>> evidenceForUserID = scala.reflect.ClassTag$.MODULE$.apply(List.class);
Broadcast<List<String>> broadcastVarForUserID = context.broadcast(userIdList, evidenceForUserID);

//Validation -- Start
Encoder<RuleParamsBean> encoder = Encoders.bean(RuleParamsBean.class);
Dataset<RuleParamsBean> ds = new Dataset<RuleParamsBean>(sparkSession, finalJoined.logicalPlan(), encoder);

Dataset<RuleParamsBean> validateDataset = ds.map(ruleParamsBean -> validateTransaction(ruleParamsBean, broadcastVarForDiv.value(), broadcastVarForCurrency.value(),
        broadcastVarForUserID.value()), encoder);
validateDataset.persist(StorageLevel.MEMORY_ONLY());

1 Answer

  1. # Answer 1

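    Possible root cause: the default value of "spark.driver.memory" is only 1 GB (depending on the distribution), which is very small. For a broadcast join, the table is built on the driver before it is sent to the workers, so if a lot of data ends up on the driver, an OutOfMemoryError can easily occur, and the suggestion in the exception message is correct.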

    Solution: increase "spark.driver.memory" and "spark.executor.memory" to at least 16 GB.
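Note that spark.driver.memory is used as the driver JVM's maximum heap, so it only takes effect if it is set before that JVM starts. According to the Spark configuration documentation, in client mode it must not be set through SparkConf inside the application; it should be passed with spark-submit --driver-memory or put in the default properties file instead. This may explain why setting it at runtime did not help. One way to confirm that a larger value was actually applied is to print the heap limit from the driver code. The sketch below uses only the JDK's Runtime.maxMemory(); the class and method names are hypothetical, and the reported value can be somewhat lower than the configured size.

```java
// Hypothetical helper: report the maximum heap of the JVM this code runs in.
// Called from the Spark driver's main(), it shows whether the driver memory
// setting actually reached the driver JVM.
public class DriverHeapCheck {

    // Returns the max heap in MiB, or -1 if the JVM reports no limit.
    static long maxHeapMiB() {
        // maxMemory() returns Long.MAX_VALUE when there is no inherent limit
        long max = Runtime.getRuntime().maxMemory();
        return max == Long.MAX_VALUE ? -1 : max / (1024 * 1024);
    }

    public static void main(String[] args) {
        long mib = maxHeapMiB();
        if (mib < 0) {
            System.out.println("No explicit heap limit reported");
        } else {
            System.out.println("Driver max heap: " + mib + " MiB");
        }
    }
}
```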