Not able to merge Spark Scala code in Spark pipeline

AshishM Registered Posts: 4 ✭✭✭✭

We are merging our Spark Scala code into a Spark pipeline. If I run the code step individually, it runs fine (in both function mode and free mode), but when it is merged into the Spark pipeline it gives this error:

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace:, caused by: ClassNotFoundException: CustomScalaRecipe$$anonfun$1

Scala code:

import org.apache.spark.sql.functions._

// Recipe inputs
val iups_prepared_joined_by_time_cell_joined = inputDatasets("iups_prepared_joined_by_time_cell_joined")

// Masks part of a value depending on its distinct count (d_cnt)
def redaction(data: String, d_cnt: Int): String = {
  // Check for null first: calling isEmpty on a null String throws
  if (data == null || data.isEmpty) {
    data
  } else {
    val data_trim = data.trim()
    if (d_cnt == 1) {
      // Mask the whole value (no regex needed)
      "*" * data_trim.length
    } else if (d_cnt > 5) {
      data_trim
    } else {
      val redac_length = 6 - d_cnt
      val length_r = Seq(redac_length, data_trim.length).min
      data_trim.slice(0, data_trim.length - length_r) + "*" * (length_r + 3)
    }
  }
}

val redaction_udf = udf((data: String, d_cnt: Int) => redaction(data, d_cnt))

val iups_pre_anonymized = iups_prepared_joined_by_time_cell_joined.withColumn(
  "price_plan",
  when(col("price_plan").isNull, lit(null))
    .otherwise(redaction_udf(col("price_plan"), col("price_plan_distinct"))))

// Recipe outputs
Map("iups_pre_anonymized" -> iups_pre_anonymized)

Best Answer

  • AdrienL
    AdrienL Dataiker, Alpha Tester Posts: 196 Dataiker
    Answer ✓

    Hello,

    Thanks for reporting this. It seems to be a bug in the way Spark pipelines and Spark serialization work together. I'll check whether we can fix it in a future DSS version.

    In the meantime, you can work around it in one of these ways:

    • (Simplest) disabling pipelining for this recipe, in the recipe's Advanced tab
    • Converting the UDF to Spark SQL (a bit difficult for your use case)
    • Compiling your UDF outside of DSS, placing the resulting jar in <DATA_DIR>/lib/java, and calling the UDF from your code
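
    For the third option, here is a rough sketch of what the separately compiled code might look like. It has not been tested in DSS. The object name RedactionUdf and the method name redact are placeholders for illustration, and the logic mirrors the redaction function from the question, with the null check moved first. It has no Spark dependency, so it can be compiled and unit-tested on its own.

```scala
// Hypothetical standalone object, compiled into a jar outside of DSS.
// The names RedactionUdf and redact are illustrative assumptions.
object RedactionUdf extends Serializable {

  // Masks part of `data` depending on the distinct count `dCnt`:
  //   dCnt == 1 -> the whole trimmed value is masked
  //   dCnt > 5  -> the trimmed value is returned unchanged
  //   otherwise -> the last min(6 - dCnt, length) characters are dropped
  //                and that many stars plus three are appended
  def redact(data: String, dCnt: Int): String = {
    if (data == null || data.isEmpty) {
      data
    } else {
      val trimmed = data.trim
      if (dCnt == 1) {
        "*" * trimmed.length
      } else if (dCnt > 5) {
        trimmed
      } else {
        val maskLen = math.min(6 - dCnt, trimmed.length)
        trimmed.slice(0, trimmed.length - maskLen) + "*" * (maskLen + 3)
      }
    }
  }
}
```

    In the recipe, the call would then presumably look something like val redaction_udf = udf(RedactionUdf.redact _), assuming the jar is picked up from <DATA_DIR>/lib/java. The point is that the function's class then comes from the jar rather than from the recipe's own generated classes.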

    Best
