Reading a file with 40M+ records using PySpark
Hello,
I'm trying to read a file with 40M+ records and around 70 columns. After reading the file, when I try to display the record count using the df.count() method, it takes a very long time to execute. Last time I checked, the statement had been running for 30+ minutes with no output.
I'm very new to the Dataiku platform and wanted to understand whether the platform has any limitation on processing large files. Do we need an infrastructure upgrade to process large files?
Regards,
RB
Answers
-
Hi,
It's important to recognize here that Dataiku will simply execute whatever Spark code you ask it to run on the Spark cluster you have configured. In the Spark code you've described, the operations before count() are "lazy" and only register a transformation, rather than actually force a computation. When you call count(), the computation is triggered. This is when Spark reads your data, performs all previously-registered transformations and calculates the result that you requested (in this case a count).
Here, that process is taking quite a while. Some options for speeding it up:
- Ensure your data is stored in a data store like HDFS or a cloud bucket (S3, ADLS, GCS) which Spark can read from in a distributed fashion.
- If it is not stored in one of the above data stores, ensure that your data is being distributed across your Spark cluster. You can read more here: https://stackoverflow.com/questions/38084293/spark-how-does-it-distribute-data-around-the-nodes
- Ensure that your cluster is appropriately sized and that your Spark configuration (for example executor memory and cores) is tuned for the workload.
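To see where the time goes, it can help to check how many partitions Spark planned for the read and to time the count itself. Below is a minimal sketch, assuming df is a PySpark DataFrame. The helper name profile_count is hypothetical, and the path in the usage comment is a placeholder. A very low partition count for a 40M-row file would suggest that the read is not being parallelized.

```python
import time


def profile_count(df):
    """Report the partition count and time the count() action on a DataFrame."""
    # Inspecting the partitioning normally only needs the query plan,
    # so this is usually cheap compared with the count below.
    num_partitions = df.rdd.getNumPartitions()

    # count() is an action: this is the point where Spark actually reads the data.
    start = time.perf_counter()
    row_count = df.count()
    elapsed = time.perf_counter() - start

    return {"partitions": num_partitions, "rows": row_count, "seconds": elapsed}


# Hypothetical usage inside an existing PySpark session (placeholder path):
# df = spark.read.csv("path/to/file.csv", header=True)
# print(profile_count(df))
```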
-
Hi Jediv,
Thanks a lot for the informative response. It looks like we already have a distributed environment and might have to update the memory and core settings. For carrying out computation on ~1 billion records, is there any recommended setting in Dataiku? I was thinking of manually setting some of the configuration parameters and running the script just to get an idea of the execution time. I'm currently using the following:
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.executor.cores", "3")
spark.conf.set("spark.cores.max", "3")
spark.conf.set("spark.driver.memory", "8g")
Thanks once again for the support.
Regards,
RB
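A note on the settings above: in plain Spark, spark.executor.memory, spark.executor.cores and spark.driver.memory are read when the application starts, so calling spark.conf.set on an already-running session generally does not change them. Also, spark.cores.max applies to standalone (and Mesos) deployments, and with both it and spark.executor.cores set to 3 the application would be limited to 3 cores in total. Below is a minimal sketch of passing the same values when the session is built. The helper apply_settings and the app name are hypothetical, and this is a plain PySpark sketch rather than Dataiku-specific guidance; how Dataiku creates and configures the session may differ.

```python
# Values copied from the post above; treat them as a starting point, not a recommendation.
LAUNCH_SETTINGS = {
    "spark.executor.memory": "8g",
    "spark.executor.cores": "3",
    "spark.cores.max": "3",
    "spark.driver.memory": "8g",
}


def apply_settings(builder, settings):
    """Apply each key/value pair to a SparkSession builder and return the builder."""
    for key, value in settings.items():
        # builder.config(key, value) returns the builder, so calls can be chained.
        builder = builder.config(key, value)
    return builder


# Hypothetical usage, run before any SparkSession exists in the process:
# from pyspark.sql import SparkSession
# spark = apply_settings(SparkSession.builder.appName("count-test"), LAUNCH_SETTINGS).getOrCreate()
```

Driver memory in particular may still need to be set outside the script (for example in the cluster or job configuration), since the driver process can already be running by the time this code executes.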