Example of spark broadcast

bkostya

Hello!
Please help me build a working example of using a broadcast variable in a Dataiku Spark-Scala recipe. The code below, run from a recipe, throws a NullPointerException. The goal is to make dfA available inside each RDD or partition of dfB.

DKU_SPARK_VERSION 2.3.0.2.6.5.106

val dfA = sqlContext.createDataFrame(Seq(
  (1L, 501L),
  (2L, 502L)
)).toDF("uid", "data")

val dfB = sqlContext.createDataFrame(Seq(
  (10L, 1001L),
  (20L, 1002L),
  (30L, 1003L),
  (40L, 1004L)
)).toDF("uid", "data")

val broadcastA = sparkContext.broadcast(dfA.collect)

dfB
  .repartition(2)
  .rdd
  .foreach(rdd => {
    import sqlContext.implicits._
    import scala.collection.JavaConverters._

    val schema5 = new StructType()
      .add(StructField("uid", LongType, false))
      .add(StructField("data", LongType, false))

    val whoami = sqlContext.getClass.toString // OK, can access sqlContext
    val data5 = broadcastA.value.toList.asJava // OK

    // Fail here: NullPointerException
    val df5 = sqlContext.createDataFrame(data5, schema5) //.toDF("id","data")
  })

Job aborted due to stage failure: Task 1 in stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in stage 1.0 (TID 8, strplpa62gv10.fg.rbc.com, executor 1): java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:139)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:137)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
    at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:376)
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:457)
    at CustomScalaRecipe$$anonfun$apply$mcV$sp$1.apply((inline):73)
    at
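
The trace shows the NullPointerException raised in SparkSession.sessionState, reached from createDataFrame inside the foreach closure. That is consistent with the closure running on an executor, where the session and SQLContext are not usable: DataFrames can only be created on the driver. The sketch below is one possible way around it, not a confirmed fix for this recipe. It keeps the broadcast value as plain rows read inside mapPartitions, and also shows the `broadcast` join hint as an alternative. The names `BroadcastSketch`, `run`, `aTotal` and the renamed columns `a_uid`/`a_data` are hypothetical, and summing dfA's `data` column is only a stand-in for whatever per-partition logic is actually needed.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.broadcast

// Hypothetical helper object; not part of the original recipe.
object BroadcastSketch {

  // Returns (rows of dfB enriched with a value derived from dfA, cross-join row count).
  def run(spark: SparkSession): (List[(Long, Long)], Long) = {
    import spark.implicits._

    val dfA = Seq((1L, 501L), (2L, 502L)).toDF("uid", "data")
    val dfB = Seq((10L, 1001L), (20L, 1002L), (30L, 1003L), (40L, 1004L)).toDF("uid", "data")

    // Option 1: broadcast plain collected rows, never a DataFrame or the session.
    val broadcastA = spark.sparkContext.broadcast(dfA.collect())

    val enriched = dfB
      .repartition(2)
      .rdd
      .mapPartitions { rows =>
        // Runs on the executor: only ordinary Scala collections are used here.
        val aRows: Array[Row] = broadcastA.value
        val aTotal = aRows.map(_.getLong(1)).sum // illustrative use of dfA's data
        rows.map(r => (r.getLong(0), r.getLong(1) + aTotal))
      }
      .collect()
      .sortBy(_._1)
      .toList

    // Option 2: stay in the DataFrame API and let Spark ship the small side
    // to every executor via the broadcast hint.
    val joined = dfB.crossJoin(broadcast(dfA.toDF("a_uid", "a_data")))

    (enriched, joined.count())
  }
}
```

In a recipe where `sqlContext` is already provided, this could presumably be called as `BroadcastSketch.run(sqlContext.sparkSession)`. If dfA is really meant to be matched against dfB by key, a regular `join` with the same `broadcast` hint would replace the `crossJoin`.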