Sign up to take part
Registered users can ask their own questions, contribute to discussions, and be part of the Community!
Added on July 11, 2022 5:26PM
Likes: 0
Replies: 3
Hi Team,
I'm using pyspark with Dataiku after processing the data, I'm facing an issue with writing the data to the output. Could you please suggest an efficient way to write the data to the output? Dataset size: 40million(approx.) Getting Error at line 15 while writing(as the data is massive)
#Recipe
1 import dataiku
2 from dataiku import spark as dkuspark
3 from pyspark import SparkContext
4 from pyspark.sql import SQLContext, SparkSession
5 sc = SparkSession.builder.enableHiveSupport().getOrCreate()
6 sqlContext = SQLContext(sc)
# Read recipe inputs
7 Table_A= dataiku.Dataset("A")
8 Table_A_df = dkuspark.get_dataframe(sqlContext, Table_A)
9 Table_B= dataiku.Dataset("B")
10 Table_B_df = dkuspark.get_dataframe(sqlContext, Table_B)
11 live_fact_table_df.createOrReplaceTempView("Table_A_df ")
12 live_date_dimension_df.createOrReplaceTempView("Table_B_df ")
#Preprocessing
13 output_df= sqlContext.sql(f""" SELECT * FROM Table_A INNER JOIN Table_B ON Table_A.id= Table_B.id""")
# Write recipe outputs
14 output = dataiku.Dataset("output")
15 dkuspark.write_with_schema(output, output_df)
Note: output_df is a pyspark dataframe
Hi @vaibhavsoni0017
,
From the sounds of your will need to repartition your input dataset you can do this with
1) DSS partitions can be levered with Spark as well if your input dataset can be partitioned by the date you should be able to build the latest partition e.g LAST_DAY thus reducing the number of rows you need to process and write each time.
https://doc.dataiku.com/dss/latest/code_recipes/pyspark.html#anatomy-of-a-basic-pyspark-recipe
2) You may also use spark to repartition large dataset df.repartition(number_of_partitions)
As suggested here:https://doc.dataiku.com/dss/latest/spark/datasets.html#other
Let us know if either of these approaches helps.
Hi @AlexT
, Many thanks for your response,
I tried with responded options but ended up with a similar partition. Tried partitioning the dataset as a string and a date format. Please find the attached related snapshot.
Note: Input dataset fetches from Hive storage.
If these are HDFS dataset splitting the data into multiple partition should be done automatically. As explained here :
https://doc.dataiku.com/dss/latest/spark/datasets.html#interacting-with-dss-datasets
If this is failing for you can you please open a support ticket with the job diagnostics so we can look into this?
Thanks