如何在k8s上运行pyspark作业?

2024-06-28 22:00:26 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在k8s集群上运行hello world spark应用程序。我已经在标准pyspark docker映像的基础上使用脚本构建了自己的docker映像,现在我正尝试在k8s集群上运行此映像,但出现以下错误。DNS吊舱日志正常

我当前的Dockerfile:

FROM semenchukou/spark-py:v2.4.1
COPY . /app
WORKDIR /app

我用于在k8s上部署作业的命令:

bin/spark-submit 
      --master k8s://https://172.20.234.174:6443
      --deploy-mode cluster
      --conf spark.executor.instances=2
      --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark
      --conf spark.kubernetes.container.image=semenchukou/pyspark-k8s-example:likeEx3
      --name spark_k8s_hello_world_0
      --conf spark.kubernetes.pyspark.pythonVersion=3
      local:///app/HelloWorldSpark.py

错误:

Traceback (most recent call last):
  File "/app/HelloWorldSpark.py", line 10, in <module>
    .appName("PythonPi")\
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 173, in getOrCreate
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 367, in getOrCreate
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 136, in __init__
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 198, in _do_init
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 306, in _initialize_context
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1525, in __call__
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: External scheduler cannot be instantiated
        at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2794)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:493)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:238)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]  for kind: [Pod]  with name: [sparkk8shelloworld0-1583151334880-driver]  in namespace: [default]  failed.
        at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
        at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
        at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:229)
        at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:162)
        at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$$anonfun$1.apply(ExecutorPodsAllocator.scala:57)
        at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$$anonfun$1.apply(ExecutorPodsAllocator.scala:55)
        at scala.Option.map(Option.scala:146)
        at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.<init>(ExecutorPodsAllocator.scala:55)
        at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:89)
        at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2788)
        ... 13 more
Caused by: java.net.UnknownHostException: kubernetes.default.svc: Try again
        at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method)
        at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929)
        at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324)
        at java.net.InetAddress.getAllByName0(InetAddress.java:1277)
        at java.net.InetAddress.getAllByName(InetAddress.java:1193)
        at java.net.InetAddress.getAllByName(InetAddress.java:1127)
        at okhttp3.Dns$1.lookup(Dns.java:39)
        at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:171)
        at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:137)
        at okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:82)
        at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:171)
        at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)
        at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
        at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
        at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
        at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
        at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
        at io.fabric8.kubernetes.client.utils.BackwardsCompatibilityInterceptor.intercept(BackwardsCompatibilityInterceptor.java:119)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
        at io.fabric8.kubernetes.client.utils.ImpersonatorInterceptor.intercept(ImpersonatorInterceptor.java:68)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
        at io.fabric8.kubernetes.client.utils.HttpClientUtils.lambda$createHttpClient$3(HttpClientUtils.java:110)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
        at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
        at okhttp3.RealCall.execute(RealCall.java:69)
        at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:404)
        at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:365)
        at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleGet(OperationSupport.java:330)
        at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleGet(OperationSupport.java:311)
        at io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleGet(BaseOperation.java:810)
        at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:218)
        ... 20 more

我做错了什么

helloWorkd脚本:


from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .appName("PythonEx")\
        .getOrCreate()
    txt = spark.sparkContext.textFile('hdfs://172.20.234.174:1515/testing/testFile.txt')
    first = txt.first()
    spark.sparkContext.parallelize(first).saveAsTextFile('hdfs://172.20.234.174:9000/testing/result.txt')
    spark.stop()

Tags: ioorgclienthttpapachejavakubernetesat
1条回答
网友
1楼 · 发布于 2024-06-28 22:00:26

您的错误跟踪显示:

Caused by: java.net.UnknownHostException: kubernetes.default.svc: Try again

这意味着由于某种原因,您在命名空间default中没有服务kubernetes,或者您的集群中存在与DNS相关的问题

此外,已经与你讨论了这个问题

相关问题 更多 >