我使用pyspark进行一些数据转换:如下所示:
df_systems_tree_users = sqlContext.read.format("jdbc") \
.option("dbtable",
"(select ID as SYSTEMUID,M_EXTERNAL_ID,metric,DATATRANSMISSIONFREQUENCY,MODEL,BRAND,BUILDING FROM SYSTEM INNER JOIN SENSOR ON SYSTEM.ID=SENSOR.SYSTEMID WHERE LPWANOPERATOR='Objenious' AND M_EXTERNAL_ID!='None' )") \
.option("url", "jdbc:phoenix:master1:2181:/hbase-unsecure") \
.option("driver", "org.apache.phoenix.jdbc.PhoenixDriver") \
.load()
objRDD = df_systems_tree_users.rdd.map(lambda x: getStatesAndUplink(x))
getStatesAndUplink方法使用请求python库对外部api执行httpget请求。在
我在4个执行器上运行这个spark作业,每个执行器有4个核心,但是运行30分钟需要很多时间。在
我的问题是如何优化我的代码,以高效的方式并行化我的http请求?在
如documentation中所述,必须指定4个参数:
partitionColumn
lowerBound
upperBound
numPartitions
只有有了这些选项Spark才会并行读取-在其他情况下,它将在一个线程中完成。在
编辑:存在Phoenix Spark plugin。它将并行读取而不指定这4个参数。在
编辑2:如果数据不平衡,方法
getStatesAndUplink
可能正在限制外部服务。它可能只是“卡住”在一个节点上处理相关问题 更多 >
编程相关推荐