Redispatch partitioning an S3 parquet dataset using a column value - how to run it optimally?

gt Registered Posts: 6 ✭✭✭

Hi,
I am working with data in S3 that is partitioned by a timestamp in the filename. I need to repartition the data using a column value, because the files contain unordered timestamp data. I tried redispatch partitioning with a Sync recipe as described in https://knowledge.dataiku.com/latest/courses/advanced-partitioning/hands-on-file-based-partitioning.html. This approach didn't work for me: I run out of memory when running redispatch partitioning even on 2 hours of data (~3 GB). Is there a way I can run it on Spark or Snowflake?

Answers

  • Sarina Dataiker, Dataiku DSS Core Designer, Dataiku DSS Adv Designer, Registered Posts: 317
    edited July 17

    Hi @gt,

    It is indeed the case that redispatch partitioning can only be run on the DSS engine.

    The redispatch partitioning function in DSS is very convenient, but you can also perform the redispatch programmatically: separate your data into slices, one per new output partition, and then write the slices out partition by partition. I am including an example of doing this in Python below.

    If you are open to using Snowflake, then this is actually quite easy. First, create a Sync recipe from your S3 dataset to Snowflake using the S3-to-Snowflake engine. In the output Snowflake dataset, you can then set the partitioning the same way you would with an external Snowflake table: under the dataset connection settings > Activate Partitioning, enter your partitioning dimensions:

    [Screenshot: Snowflake dataset connection settings with Activate Partitioning enabled and a partitioning dimension defined]
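
    If you would rather script that configuration step than do it in the UI, the same partitioning settings can also be set through the Python API. The sketch below is only a hedged example: the project key, dataset name, partitioning column, and the exact shape of the raw partitioning settings are assumptions, so the safest approach is to configure one dataset in the UI first and mirror the JSON it produces.

    import dataiku
    
    # connect to the DSS instance this code runs on
    client = dataiku.api_client()
    project = client.get_project("YOUR_PROJECT_KEY")
    
    # the Snowflake dataset that is the output of the Sync recipe (name is a placeholder)
    dataset = project.get_dataset("YOUR_SNOWFLAKE_DATASET")
    settings = dataset.get_settings()
    
    # activate partitioning on a single discrete dimension, e.g. an 'order_date' column
    # (this raw JSON layout is an assumption - copy what the UI writes for your own dataset)
    settings.get_raw()["partitioning"] = {
        "dimensions": [{"name": "order_date", "type": "value"}]
    }
    settings.save()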

    Any subsequent recipes that use the Snowflake table as input will use the partitioning scheme defined in the Snowflake dataset, so this is an easy way to skip redispatch partitioning entirely, if switching to Snowflake works for your use case. With this method, your flow could look like this:

    [Screenshot: example flow - S3 dataset > Sync recipe > partitioned Snowflake dataset > downstream recipes]

    If you need to pull the S3 filename partitioning information into the dataset before syncing to Snowflake, you could also use the enrich processor in a Prepare recipe to pull the partition info into a column, and then partition the Snowflake dataset by that column.
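
    Purely as an illustration of that last step (the 'file_path' column name and the filename pattern below are hypothetical, not taken from your data), deriving the partition column from the enriched file path in a Python recipe could look like this:

    import dataiku
    
    # read the dataset that already contains the enriched file path column
    df = dataiku.Dataset("YOUR_ENRICHED_DATASET").get_dataframe()
    
    # hypothetical filename pattern: .../data_2022-10-05.parquet
    # extract the date part to use as the partitioning column on the Snowflake side
    df["partition_date"] = df["file_path"].str.extract(r"(\d{4}-\d{2}-\d{2})", expand=False)
    
    output = dataiku.Dataset("YOUR_OUTPUT_DATASET")
    output.write_with_schema(df)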

    The other option is to generate your output partitions programmatically in Python, for example in a scenario Python step, with code similar to the below. This is easiest if you can read the full dataset into memory, though:

    import dataiku
    
    input_dataset = dataiku.Dataset("YOUR_INPUT_DATASET")
    input_df = input_dataset.get_dataframe()
    
    output = dataiku.Dataset("YOUR_OUTPUT_DATASET")
    
    # here I'm manually setting my partitions based on the 'order_id' column - this logic will vary based on your use case 
    for order in input_df.order_id.unique():
        # retrieving just the dataframe for a partition 
        temp_df = input_df.loc[input_df.order_id == order]
        # if the output dataset does not have a schema yet, you must set the schema before writing the data
        output.write_schema_from_dataframe(temp_df)
        # now, select the partition to write to with set_write_partition (partition identifiers are strings)
        output.set_write_partition(str(order))
        # get the writer and use writer.write_dataframe() to write out the individual partition 
        with output.get_writer() as w:
            print("writing out partition ", order)
            w.write_dataframe(temp_df)
    


    This would allow you to go from a non-partitioned dataset to a partitioned dataset without going through the redispatch partitioning function. You should be able to take a similar approach using PySpark as well, if the above two options don't work for you; a sketch of that is below.
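
    For reference, here is a minimal PySpark sketch of the same redispatch idea, written against plain Spark rather than the DSS dataset API; the S3 paths and the 'order_date' column are placeholder assumptions to adapt to your data:

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.appName("redispatch-by-column").getOrCreate()
    
    # read all of the timestamp-named parquet files at once
    df = spark.read.parquet("s3a://YOUR_BUCKET/input/")
    
    # write the data back out foldered by the partitioning column:
    # Spark creates one directory per distinct value (order_date=.../part-*.parquet)
    df.write.mode("overwrite").partitionBy("order_date").parquet("s3a://YOUR_BUCKET/output/")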

    Let me know if you have questions about either approach given your specific use case.

    Thanks,
    Sarina

  • gt Registered Posts: 6 ✭✭✭

    Hi @SarinaS,

    Thank you for your response.

    In this case, the smallest slice of data I can read at a time is one hour of data. Repartitioning each hour of data (~7M records) is also slow when using the DSS engine, so I tried PySpark and it is performing better.

    Best,

    Gowtham.
