PySpark notebook env

TimLC
TimLC Dataiku DSS Core Designer, Registered Posts: 1
edited July 16 in Using Dataiku

Hello,

I'm trying to use the dataiku platform to run pyspark code in a notebook but I still get the following error (below). I tried with Python versions from 3.6 to 3.10 but it doesn't work.

The error appears after initializing my Spark session, when I want to use PySpark functions.

Py4JJavaError: An error occurred while calling o74.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (10.147.253.101 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ml-app/INSTALL/spark-standalone-home/python/lib/pyspark.zip/pyspark/worker.py", line 547, in main
    % ("%d.%d" % sys.version_info[:2], version)
RuntimeError: Python in worker has different version 3.7 than that in driver 3.10, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:268)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
    at scala.collection.AbstractIterator.to(Iterator.scala:1431)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
    at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1470)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
    at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1470)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1443)
    at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1578)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
    at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1578)
    at org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:960)
    at org.apache.spark.ml.recommendation.ALS.$anonfun$fit$1(ALS.scala:722)
    at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
    at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:704)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:833)

Answers

  • Alexandru
    Alexandru Dataiker, Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer, Registered Posts: 1,225 Dataiker

    Hi,
    You seem to be using a local Spark standalone installation and not Spark-on-k8s. Is this correct? Your spark config would have spark.master set to local[1] or similar?


    To use a specific Python code environment in your Pyspark notebook, you must build the code environment for Spark-on-K8s from here:

    Screenshot 2023-11-28 at 2.08.26 PM.png

    If indeed you are using local spark, then the Python used will be the built-in code env unless absolutely necessary you should stick with using the built-in code env, which should be the same version as the spark driver python version.

    Otherwise, you can try to adjust the recipe-level spark configuration to match the actual version of the env you are trying to use:

    spark.pyspark.python -> /dataiku/app/design/pyenv/bin/python




  • Alexandru
    Alexandru Dataiker, Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer, Registered Posts: 1,225 Dataiker
    edited July 17

    Also should have noted for notebook the spark config is determined by the setting under:
    Screenshot 2023-11-28 at 2.22.11 PM.png

    You can override notebook level when creating the spark context:


    from pyspark import SparkContext
    from pyspark.sql import SQLContext, SparkSession
    
    spark_session = (
        SparkSession
        .builder
        .config("spark.executor.memory", "4g")
        .config("spark.executor.cores", "1")
        .config("spark.pyspark.python", "/change_with_actual_path/python"
        .getOrCreate()
    )
    sqlContext = SQLContext(spark_session)
Setup Info
    Tags
      Help me…