PySpark notebook env

TimLC
TimLC Dataiku DSS Core Designer, Registered Posts: 1

Hello,

I'm trying to use the Dataiku platform to run PySpark code in a notebook, but I keep getting the error below. I have tried Python versions from 3.6 to 3.10, but none of them work.

The error appears after I initialize my Spark session, as soon as I try to use PySpark functions.

Py4JJavaError: An error occurred while calling o74.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (10.147.253.101 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ml-app/INSTALL/spark-standalone-home/python/lib/pyspark.zip/pyspark/worker.py", line 547, in main
    % ("%d.%d" % sys.version_info[:2], version)
RuntimeError: Python in worker has different version 3.7 than that in driver 3.10, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:268)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
    at scala.collection.AbstractIterator.to(Iterator.scala:1431)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
    at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1470)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
    at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1470)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1443)
    at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1578)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
    at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1578)
    at org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:960)
    at org.apache.spark.ml.recommendation.ALS.$anonfun$fit$1(ALS.scala:722)
    at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
    at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:704)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:833)

Answers

  • Alexandru
    Alexandru Dataiker, Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer, Registered Posts: 1,209 Dataiker

    Hi,
    You seem to be using a local standalone Spark installation and not Spark-on-K8s. Is that correct? In that case your Spark config would have spark.master set to local[1] or similar.
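
    A quick way to confirm this from the notebook is something like the following (a sketch; spark_session stands for whatever variable holds the SparkSession you already created):

    import sys

    # Master URL: local[...] means local standalone Spark, not Spark-on-K8s.
    print(spark_session.sparkContext.master)
    # Python version recorded by the SparkContext (driver side) vs. the notebook kernel itself.
    print(spark_session.sparkContext.pythonVer, "%d.%d" % sys.version_info[:2])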


    To use a specific Python code environment in your PySpark notebook, you must build the code environment for Spark-on-K8s from here:

    Screenshot 2023-11-28 at 2.08.26 PM.png

    If you are indeed using local Spark, then the Python used will be the built-in code env. Unless absolutely necessary, you should stick with using the built-in code env, which should be the same version as the Spark driver's Python.
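
    You can also ask an executor directly which Python it runs; with local Spark and the built-in code env this should print the same minor version as the notebook kernel. A sketch (not a Dataiku API, just plain PySpark; if the versions mismatch, this small job will fail with the same error as above):

    # Run a trivial task on one partition and return the worker's Python version.
    worker_py = (
        spark_session.sparkContext
        .parallelize([0], 1)
        .map(lambda _: __import__("sys").version.split()[0])
        .first()
    )
    print(worker_py)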

    Otherwise, you can try adjusting the recipe-level Spark configuration to point to the actual Python of the env you are trying to use:

    spark.pyspark.python -> /dataiku/app/design/pyenv/bin/python
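
    The error message also points at PYSPARK_PYTHON; if you manage the session yourself, the same override can be expressed through that environment variable. A sketch, reusing the placeholder path above (it must run before the SparkContext is created):

    import os

    # Point the workers at the interpreter of the env you want; placeholder path.
    # The driver Python is the notebook kernel itself, so only the worker side
    # needs overriding here, and this must happen before the session is built.
    os.environ["PYSPARK_PYTHON"] = "/dataiku/app/design/pyenv/bin/python"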




  • Alexandru
    Alexandru Dataiker, Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer, Registered Posts: 1,209 Dataiker

    Also, I should have noted that for notebooks the Spark config is determined by the setting under:
    Screenshot 2023-11-28 at 2.22.11 PM.png

    You can override it at the notebook level when creating the Spark context:


    from pyspark import SparkContext
    from pyspark.sql import SQLContext, SparkSession

    # Build the session with notebook-level overrides; replace the placeholder
    # path with the Python of the env you actually want the workers to use.
    spark_session = (
        SparkSession.builder
        .config("spark.executor.memory", "4g")
        .config("spark.executor.cores", "1")
        .config("spark.pyspark.python", "/change_with_actual_path/python")
        .getOrCreate()
    )
    sqlContext = SQLContext(spark_session)
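
    Once the session is up, you can check that the override was picked up, for example (a sketch):

    # Returns the configured worker Python, or "not set" if the override was not applied.
    print(spark_session.sparkContext.getConf().get("spark.pyspark.python", "not set"))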