
Not able to merge Spark Scala code in a Spark pipeline

Solved!
AshishM
Level 1

We are merging our Spark Scala code into a Spark pipeline. If I run the code step individually it runs fine (in both function mode and free mode), but when it is merged into the Spark pipeline it gives an error:

 



java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace:, caused by: ClassNotFoundException: CustomScalaRecipe$$anonfun$1

Scala code:

 




import org.apache.spark.sql.functions._

// Recipe inputs
val iups_prepared_joined_by_time_cell_joined = inputDatasets("iups_prepared_joined_by_time_cell_joined")

def redaction(data: String, d_cnt: Int): String = {
  // Check for null before calling isEmpty, otherwise a null value throws a NullPointerException
  if (data == null || data.isEmpty) return data

  val dataTrim = data.trim

  if (d_cnt == 1) {
    // Mask the whole value. Note: replaceAll interprets its first argument as a regex,
    // so building the mask directly is both simpler and safer.
    "*" * dataTrim.length
  } else if (d_cnt > 5) {
    dataTrim
  } else {
    val redacLength = 6 - d_cnt
    val lengthR = math.min(redacLength, dataTrim.length)
    dataTrim.slice(0, dataTrim.length - lengthR) + "*" * (lengthR + 3)
  }
}

val redaction_udf = udf((data: String, d_cnt: Int) => redaction(data, d_cnt))

val iups_pre_anonymized = iups_prepared_joined_by_time_cell_joined.withColumn(
  "price_plan",
  when(col("price_plan").isNull, lit(null))
    .otherwise(redaction_udf(col("price_plan"), col("price_plan_distinct")))
)

// Recipe outputs
Map("iups_pre_anonymized" -> iups_pre_anonymized)


 

1 Solution
AdrienL
Dataiker

 



Hello,

Thanks for reporting this. It seems to be a bug in the way Spark pipelines and Spark serialization interact; I'll check whether we can fix it in a future DSS version.

In the meantime, you can work around it by either:

  • (Simplest) disabling pipelining for this recipe, in the recipe's Advanced tab

  • Converting the UDF to Spark SQL (a bit difficult for your use case)

  • Compiling your UDF outside of DSS, placing the resulting jar in <DATA_DIR>/lib/java, and calling the UDF from your code
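For the second workaround, here is a rough sketch of what the same redaction logic could look like with Spark's built-in SQL functions instead of a UDF, so no custom anonymous class has to be serialized to the executors. The column names are taken from the code above; the expression itself is an untested illustration, not the exact conversion.

```scala
import org.apache.spark.sql.functions.expr

// Sketch: the UDF's redaction rules expressed with built-in SQL functions.
// Built-ins run natively on executors, avoiding the ClassNotFoundException.
val iups_pre_anonymized = iups_prepared_joined_by_time_cell_joined.withColumn(
  "price_plan",
  expr("""
    CASE
      WHEN price_plan IS NULL OR price_plan = '' THEN price_plan
      WHEN price_plan_distinct = 1 THEN repeat('*', length(trim(price_plan)))
      WHEN price_plan_distinct > 5 THEN trim(price_plan)
      ELSE concat(
        substring(trim(price_plan), 1,
                  length(trim(price_plan)) - least(6 - price_plan_distinct, length(trim(price_plan)))),
        repeat('*', least(6 - price_plan_distinct, length(trim(price_plan))) + 3)
      )
    END
  """)
)
```

This fragment assumes it runs inside the same recipe context as the original code (it reuses the `inputDatasets`-provided DataFrame), so it is not standalone-runnable.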



Best
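For the third workaround, a minimal sketch of what the externally compiled UDF could look like. The object name `RedactionUdf` is hypothetical; the idea is to compile it outside DSS, package it into a jar placed in <DATA_DIR>/lib/java, and call it from the recipe.

```scala
// Hypothetical standalone object, compiled outside DSS and packaged into a jar
// placed in <DATA_DIR>/lib/java. Because RedactionUdf is a named top-level
// object rather than an anonymous function compiled inside the recipe, the
// class can be found on the executors' classpath.
object RedactionUdf extends Serializable {
  def redact(data: String, dCnt: Int): String = {
    if (data == null || data.isEmpty) return data
    val trimmed = data.trim
    if (dCnt == 1) "*" * trimmed.length        // mask the whole value
    else if (dCnt > 5) trimmed                 // distinct enough: keep as-is
    else {
      val lengthR = math.min(6 - dCnt, trimmed.length)
      trimmed.slice(0, trimmed.length - lengthR) + "*" * (lengthR + 3)
    }
  }
}
```

In the recipe you would then register it with something like `val redaction_udf = udf(RedactionUdf.redact _)`.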


7 Replies
AshishM
Level 1
Author
Thanks for the answer, Adrien. Hopefully it will be fixed in a future release.

Meanwhile, I have one more question: why does my Spark pipeline job run only 8 tasks in parallel even though I can run 1200 tasks in parallel? Any clue about this and how I can improve it?
AdrienL
Dataiker
Are you talking about DSS activities, or Spark tasks in a single Spark job? Where did you get those 8 / 1200 numbers?
AshishM
Level 1
Author
Hi Adrien,

I have created another question for this since it's a completely different issue; please use the link below:

https://answers.dataiku.com/5883/pipeline-running-spark-tasks-whereas-spark-recipe-culster
bkostya
Level 1

Hello Dataiker,
Are any solutions available since 2019? I'm using Dataiku 7.0.2 and I am hitting the same problem of ClassNotFoundException: CustomScalaRecipe$$anonfun$1.

CoreyS
Dataiker Alumni

Hi @bkostya, I can confirm that the original issue was fixed in version 5.1.5. Do you see the same distinction between "it runs fine individually" and "it fails in a pipeline"?

bkostya
Level 1

It fails in a recipe inside a pipeline. The CustomScalaRecipe in the stack trace caught my eye. Please look at that ticket.

