
Not able to merge Spark Scala code into a Spark pipeline


We are merging our Spark Scala code into a Spark pipeline. If I run the code step on its own, it runs fine (in both function mode and free mode), but when it is merged into the Spark pipeline it gives this error:

 



java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:, caused by: ClassNotFoundException: CustomScalaRecipe$$anonfun$1



 



 



Scala code:

 




import org.apache.spark.sql.functions._

// Recipe inputs
val iups_prepared_joined_by_time_cell_joined = inputDatasets("iups_prepared_joined_by_time_cell_joined")

// Masks a value according to how many distinct values it has (d_cnt):
//   d_cnt == 1 -> every character of the trimmed value becomes '*'
//   d_cnt > 5  -> the trimmed value is returned unchanged
//   otherwise  -> the last length_r = min(6 - d_cnt, length) characters are dropped
//                 and length_r + 3 '*' characters are appended
def redaction(data: String, d_cnt: Int): String = {
  // Test for null first: calling data.isEmpty on a null String throws a NullPointerException
  if (data == null || data.isEmpty) {
    data
  } else {
    val data_trim = data.trim()

    if (d_cnt == 1) {
      // Build the mask directly; replaceAll(data_trim, ...) treated the value itself as a regex
      "*" * data_trim.length
    } else if (d_cnt > 5) {
      data_trim
    } else {
      val redac_length = 6 - d_cnt
      val length_r = math.min(redac_length, data_trim.length)
      data_trim.slice(0, data_trim.length - length_r) + "*" * (length_r + 3)
    }
  }
}

val redaction_udf = udf((data: String, d_cnt: Int) => redaction(data, d_cnt))

// Leave null price plans as null; mask the others with the UDF
val iups_pre_anonymized = iups_prepared_joined_by_time_cell_joined.withColumn(
  "price_plan",
  when(col("price_plan").isNull, lit(null))
    .otherwise(redaction_udf(col("price_plan"), col("price_plan_distinct"))))

// Recipe outputs
Map("iups_pre_anonymized" -> iups_pre_anonymized)
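Because `redaction` itself uses no Spark APIs, its masking rules can be checked outside DSS before debugging the pipeline. The sketch below is a standalone copy that needs only the Scala standard library; the object name `RedactionCheck` is hypothetical. It checks for null first and builds the mask directly instead of calling `replaceAll` with the value as a regex.

```scala
// Hypothetical standalone copy of the recipe's masking rules (no Spark or DSS needed)
object RedactionCheck {
  def redaction(data: String, d_cnt: Int): String =
    if (data == null || data.isEmpty) data // null or empty values pass through
    else {
      val data_trim = data.trim
      if (d_cnt == 1) "*" * data_trim.length // a single distinct value: mask everything
      else if (d_cnt > 5) data_trim          // more than 5 distinct values: keep as-is
      else {
        // drop up to (6 - d_cnt) trailing characters, append that many plus 3 asterisks
        val length_r = math.min(6 - d_cnt, data_trim.length)
        data_trim.slice(0, data_trim.length - length_r) + "*" * (length_r + 3)
      }
    }
}
```

For example, `RedactionCheck.redaction("Premium", 3)` drops the last 3 characters and appends 6 asterisks, giving `"Prem******"`, while `RedactionCheck.redaction("  Gold  ", 1)` masks the whole trimmed value and gives `"****"`.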


 

Dataiker

 



Hello,



Thanks for reporting this. It seems to be a bug in how Spark pipelines and Spark serialization work together; I'll check whether we can fix it in a future DSS version.



In the meantime, you can work around it by either:




  • (Simplest) disabling pipelining for this recipe, in the recipe's Advanced tab

  • converting the UDF to Spark SQL (a bit difficult for your use case)

  • compiling your UDF outside of DSS, placing the resulting jar in <DATA_DIR>/lib/java, and calling the UDF from your code
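The third workaround could look roughly like the sketch below. It assumes a plain Scala file compiled into a jar with scalac or sbt (this file needs no Spark dependency); the names `Redaction` and `RedactionFn` and the commented-out package are hypothetical. The idea is that the function object Spark ships to the executors is a named, top-level class from the jar rather than an anonymous function generated from the recipe code.

```scala
// package com.example.udfs  // hypothetical package name: set your own before building the jar

// The masking rules from the recipe, kept free of Spark APIs
object Redaction {
  def redact(data: String, dCnt: Int): String =
    if (data == null || data.isEmpty) data
    else {
      val trimmed = data.trim
      if (dCnt == 1) "*" * trimmed.length
      else if (dCnt > 5) trimmed
      else {
        val lengthR = math.min(6 - dCnt, trimmed.length)
        trimmed.slice(0, trimmed.length - lengthR) + "*" * (lengthR + 3)
      }
    }
}

// Named, top-level, serializable function class. Java deserialization on the
// executors looks classes up by name, so this one must be on their classpath.
class RedactionFn extends ((String, Int) => String) with Serializable {
  override def apply(data: String, dCnt: Int): String = Redaction.redact(data, dCnt)
}
```

Once the jar is in `<DATA_DIR>/lib/java`, the recipe could build the UDF from an instance instead of an inline lambda, e.g. `val redaction_udf = udf(new RedactionFn)`. This assumes the jar also ends up on the executors' classpath, which is worth verifying for your cluster setup.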



Best

Author
Thanks for the answer, Adrien. Hopefully it will be fixed in a future release.

Meanwhile, I have one more question: why does my Spark pipeline job run only 8 tasks in parallel, even though I can run 1200 tasks in parallel? Any clue about this, and how can I improve it?
Dataiker
Are you talking about DSS activities or Spark tasks in a single Spark job? Where did you get those 8 / 1200 numbers?
Author
Hi Adrien,

I have created another question for this since it's a completely different issue; please use the link below:

https://answers.dataiku.com/5883/pipeline-running-spark-tasks-whereas-spark-recipe-culster