PySpark Recipes persist DataFrame

Tate_fr Registered Posts: 8 ✭✭✭✭

Hi,

I'm using PySpark recipes. To reduce execution time and memory usage, I would like to use the following functions:

DataFrame.persist()

DataFrame.unpersist()
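
For reference, the pattern I have in mind looks roughly like this in plain PySpark (a simplified sketch with a toy DataFrame; my real recipe reads a Dataiku dataset, as shown further down the thread):

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy DataFrame standing in for the real dataset
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

df.persist(StorageLevel.MEMORY_AND_DISK)  # cache once, reuse across several actions
print(df.count())  # the first action materialises the cache
print(df.count())  # later actions read from the cached blocks
df.unpersist()     # release the cached blocks when done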

But I have this error message: 'Job failed: Pyspark code failed: At line 186: <type 'exceptions.AttributeError'>: 'SparkSession' object has no attribute '_getJavaStorageLevel'

Any ideas? Thank you for your help!

Best Answers

  • Clément_Stenac Dataiker, Dataiku DSS Core Designer, Registered Posts: 753 Dataiker
    edited July 17 Answer ✓

    It seems that Spark does not like mixing old- and new-style APIs (a SQLContext created from a SparkSession instead of from a SparkContext). Could you please try creating a SparkContext instead of a SparkSession?

    sc = SparkContext(conf=config)
    sqlContext = SQLContext(sc)
    df = dkuspark.get_dataframe(sc, dataset)
    
  • Tate_fr Registered Posts: 8 ✭✭✭✭
    Answer ✓

    Hi Clément,

    Ok, it works great! Just for future readers of this post: when creating your dataframe, use sqlContext (a combined sketch of the full recipe is shown just after these best answers):

    df = dkuspark.get_dataframe(sqlContext, dataset)

    Thank you Clément, it's nice to have help from the CTO of DSS. It's not always easy to deal with the old and the new Spark APIs between notebooks and recipes.

    Best regards! (See you soon!)
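
For future readers: putting the two answers above together, the full recipe looks roughly like this (a minimal sketch assembled from the code shared in this thread; the dataset name and memory settings are the original poster's values):

    import dataiku
    from dataiku import spark as dkuspark
    import pyspark
    from pyspark import SparkContext, StorageLevel
    from pyspark.sql import SQLContext

    # Spark configuration as shared further down the thread
    config = pyspark.SparkConf().setAll([
        ('spark.executor.memory', '64g'),
        ('spark.executor.cores', '8'),
        ('spark.cores.max', '8'),
        ('spark.driver.memory', '64g')])

    # Old-style entry points: a SparkContext, then a SQLContext built from it
    sc = SparkContext(conf=config)
    sqlContext = SQLContext(sc)

    # Pass the SQLContext (not the SparkContext) to dkuspark
    dataset = dataiku.Dataset("my_dataset")
    df = dkuspark.get_dataframe(sqlContext, dataset)

    df.persist(StorageLevel.MEMORY_AND_DISK)  # the storage level now resolves correctly
    # ... transformations and writes that reuse df ...
    df.unpersist()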

Answers

  • Clément_Stenac Dataiker, Dataiku DSS Core Designer, Registered Posts: 753 Dataiker

    Hi,

    Are you using a SparkSession or a SQLContext to create your dataframes? Whichever you are using, can you please try the other one?

  • Tate_fr Registered Posts: 8 ✭✭✭✭

    This is part of my code:

    import dataiku
    from dataiku import spark as dkuspark
    from pyspark.conf import SparkConf
    from pyspark.sql import SparkSession, SQLContext
    import pyspark

    from pyspark import StorageLevel

    config = pyspark.SparkConf().setAll([
        ('spark.executor.memory', '64g'),
        ('spark.executor.cores', '8'),
        ('spark.cores.max', '8'),
        ('spark.driver.memory', '64g')])

    spark = SparkSession.builder.config(conf=config).getOrCreate()
    sc = SQLContext(spark)

    dataset = dataiku.Dataset("my_dataset")
    df = dkuspark.get_dataframe(sc, dataset)

    df.persist(StorageLevel.MEMORY_AND_DISK)

    => I've got an error on the persist function.

    Again thank you for your help.
