Partition Redispatch S3 parquet dataset using column - how to run optimally?
Hi,
I am working on data in S3 which is partitioned by timestamp on filename. I need to repartition the data using a column value as the files contain unordered timestamp data. I tried redispatch partitioning using Sync recipe as mentioned in https://knowledge.dataiku.com/latest/courses/advanced-partitioning/hands-on-file-based-partitioning.html. This approach didn't work for me as I'm running out of memory when running redispatch partitioning even for 2 hrs dataset (~3 GB). Is there a way I can run it on Spark or Snowflake?.
Answers
-
Sarina Dataiker, Dataiku DSS Core Designer, Dataiku DSS Adv Designer, Registered Posts: 317 Dataiker
Hi @gt
,
It is in indeed the case that redispatch partitioning can only be run on the DSS engine.
The redispatch partitioning function in DSS is very convenient, but you can also programmatically perform the redispatch. Another option would be to programmatically redispatch your data by separating out your data slices for the new output partition, and then write out your new data slices partition-by-partition. I am including an example of doing this in Python below.
If you are up for using Snowflake, then this is actually quite an easy method. First, you could simply create a Sync recipe from your S3 dataset to Snowflake using the S3-to-Snowflake engine. In the output Snowflake dataset, you can then actually just set the partitioning value the same way you would as with an external Snowflake table under the dataset connection settings > Activate Partitioning, and then enter in your partitioning dimensions:
Any subsequent recipes that use the Snowflake table as input will use the partitioning scheme defined in the Snowflake dataset. So this is an easy method of skipping redispatch partitioning, if switching to Snowflake would work for your use case. With this method, you could have a flow that looks like this:If you needed to pull the S3 filename partitioning information into the dataset prior to syncing to Snowflake, you could also use the enrich processor in a prepare recipe to pull the partition info into the dataset and then partition the Snowflake dataset by the associated column name.
The other option is that you could programmatically generate your output partitions via Python through a scenario Python step, with code similar to the below. This would be easiest if you can read in the full dataset though into memory though:import dataiku input_dataset = dataiku.Dataset("YOUR_INPUT_DATASET") input_df = input_dataset.get_dataframe() output = dataiku.Dataset("YOUR_OUTPUT_DATASET") # here I'm manually setting my partitions based on the 'order_id' column - this logic will vary based on your use case for order in input_df.order_id.unique(): # retrieving just the dataframe for a partition temp_df = input_df.loc[input_df.order_id == order] # if the output dataset does not have a schema yet, you must set the schema before writing the data output.write_schema_from_dataframe(temp_df) # now, we use set_write_partition with the partition value output.set_write_partition(order) # get the writer and use writer.write_dataframe() to write out the individual partition with output.get_writer() as w: print("writing out partition ", order) w.write_dataframe(temp_df)
This would allow you to generate a non-partitioned dataset -> partitioned dataset without going through the redispatch partitioning function. You should be able to take a similar approach using PySpark as well, if the above two options don't work for you.
Let me know if you have questions about either approach given your specific use case.
Thanks,
Sarina -
Hi @SarinaS
,
Thank you for your response.
In this case, the smallest partitioned data I can read each time is hourly data. Repartitioning each hour data(~7M records) is also slow when using DSS. So, I tried using Pyspark and it's performing better.Best,
Gowtham.