Bug when updating a flow with partitioned data?

Tanguy

We have been using partitions in order to archive data, but we started noticing some weird behaviour, as if the partitioning configuration was being ignored when updating our flow.

Here is a workflow demo:

1.jpg

The table « screenshot_data » changes regularly, and we want to save a version of it at a given frequency (in our case, every month).

Step 1

For the demo, suppose « screenshot_data » starts with the following data:

2.jpg

For reproducibility purposes, the SparkSQL code used to generate this simple table is the following:

SELECT
   1 AS N
  ,1 AS N_2_partition

We then want to archive this version of « screenshot_data » into a partition of the « archived_data_partitioned » table. Having anticipated this, we created a « N_2_partition » column in the input data. This column is then used as a partitioning column in the « archived_data_partitioned » table, as shown in the following screenshot:

3.jpg

In the sync recipe (which is meant to save a copy of our « screenshot_data »), we configure the partition to be built to the last value of « N_2_partition » (here, as there is only one row, it is trivially set to 1).

4.jpg

Note that:

  • the option « redispatch partitioning according to input columns » is deactivated as we want to create (or update) a partition containing a full copy of our input data, and not split the input data into partitions according to the input column
  • the option « append instead of overwrite » is also deactivated because we want to update a partition if it has already been created

From there, once we run the sync recipe, everything works fine: we get a single new partition containing the data in our « archived_data_partitioned » table:

5.jpg
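As an aside, this manual run of the sync recipe for a single target partition can also be triggered programmatically. Below is a minimal sketch using the Python API (assuming the demo's dataset names, a recent DSS version, and that the code runs on the same DSS instance, e.g. in a notebook); it force-builds only partition « 1 » of the output, without recursing into upstream datasets:

import dataiku

# Sketch only: connect to the current DSS instance and project
client = dataiku.api_client()
project = client.get_default_project()

# Force-build only partition "1" of the partitioned archive,
# without recursing into the upstream datasets
project.get_dataset("archived_data_partitioned").build(
    job_type="NON_RECURSIVE_FORCED_BUILD",
    partitions="1",
)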

Step 2

Now, suppose a new row is added to our « screenshot_data », as follows:

6.jpg

For reproducibility purposes, here is the SparkSQL code adding the new row:

SELECT
   1 AS N
  ,1 AS N_2_partition
UNION ALL
SELECT
   2 AS N
  ,2 AS N_2_partition

We then run the sync recipe by pointing to the last value of the column « N_2_partition »:

7.jpg
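Side note: the « last value » does not have to be typed in by hand. Here is a minimal sketch of how a custom Python scenario step could compute it from the input and build the matching partition (assuming the demo's dataset names; Scenario() is only available inside a scenario Python step):

import dataiku
from dataiku.scenario import Scenario

# Find the latest value of the partitioning column in the input data
df = dataiku.Dataset("screenshot_data").get_dataframe()
last_partition = str(int(df["N_2_partition"].max()))

# Build only that partition of the partitioned archive
Scenario().build_dataset(
    "archived_data_partitioned",
    build_mode="NON_RECURSIVE_FORCED_BUILD",
    partitions=last_partition,
)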

At this point, everything is still fine, as we get our new copy of « screenshot_data » in an additional partition:

8.jpg

The record counts in each archive are as expected:

  • 1 record in the first partition
  • 2 records in the second partition

And now for the fun part

Step 3

Suppose our « screenshot_data » is updated with a third additional row:

9.jpg

For reproducibility purposes, here is the SparkSQL code to generate this data:

SELECT
   1 AS N
  ,1 AS N_2_partition
UNION ALL
SELECT
   2 AS N
  ,2 AS N_2_partition
UNION ALL
SELECT
   3 AS N
  ,3 AS N_2_partition

Suppose we also add another table downstream of the partitioned dataset (here, we just use a « SELECT * » to retrieve all the data in an unpartitioned format):

10.jpg

As our screenshot data has changed for the third time, the sync recipe's partition configuration is updated to build a third partition:

11.jpg

We now build the new « archived_data_unpartitioned » table using a recursive build with smart reconstruction (these two steps, specifying the partition in the sync recipe and building the last table using a recursive build, are typically defined inside a scenario).

12.jpg
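For reference, the build triggered here is roughly what the following custom Python scenario step would express (a sketch only, assuming the demo's dataset names):

from dataiku.scenario import Scenario

# Recursive ("smart") build of the final, unpartitioned table.
# One would expect the partition configured in the sync recipe to be
# built along the way; the screenshots below show that it is not.
Scenario().build_dataset(
    "archived_data_unpartitioned",
    build_mode="RECURSIVE_BUILD",
)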

And watch what happens in the partitioned dataset:

13.jpg

Not only has the 3rd partition not been built, but additionally the two previous partitions have been overwritten by the last version of our « screenshot_data » dataset!

It is as if Dataiku ignored our partition configuration in the sync recipe.

This problem has been tricky for us because the behaviour worked as expected when we ran the sync recipe manually, but all our archives were overwritten once we started automating things with scenarios (without being notified by any errors, of course). This is a serious issue, as it may result in unrecoverable data loss...

Side note: I investigated a workaround using a Python recipe instead of a sync recipe, but to date I have not managed to make things work properly. See my reply in this thread: https://community.dataiku.com/t5/Using-Dataiku/Writing-to-partitions-Partitions/m-p/18028


Operating system used: Windows 10


Best Answer

  • Tanguy

    Thanks to Dataiku's support team (thank you Clément Stenac), I finally understood why Dataiku was behaving this way.

    TL;DR:
    With a recursive build, Dataiku indeed ignores the partition configuration in the sync recipe. Instead, it relies on its dependency management, which, in this case, leads it to rebuild all the partitions.

    In detail:
    The last recipe builds a non-partitioned (NP) dataset from a partitioned (P) dataset. In this configuration, Dataiku by default applies the "ALL AVAILABLE" dependency between the P dataset and the NP dataset (which, at least in this case, makes sense, as I want to retrieve all the partitions).

    See the following screenshot to visualize the dependency function between the P dataset and the final NP dataset:
    14.jpg

    So when asking Dataiku to build the last dataset recursively (e.g. with a smart build), it will seek to rebuild all partitions of the previous dataset.
    15.jpg

    So when it reaches the sync recipe, the partitions are rebuilt from the current version of the first dataset (recall that the redispatch option is deactivated in the sync recipe, so the NP input dataset overwrites each partition of the P output dataset).

    Apart from using Dataiku's API (as I did in my other reply), one can prevent the P dataset from being rebuilt recursively by setting its build behaviour to "explicit". However, this will prevent Dataiku from automatically updating the flow from the P dataset onwards.

    Note that when solving the flow dependencies, Dataiku restricts itself to existing partitions, so it will not try to build a new partition (such as the one configured in the sync recipe).
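    To illustrate the point above, a scenario built around explicit, non-recursive builds avoids the problem entirely: build the target partition of the P dataset first, then refresh the NP dataset without recursion, so the dependency solver never walks back through the P dataset. This is only a sketch based on the demo's dataset names (not the exact code from my other reply):

    import dataiku
    from dataiku.scenario import Scenario

    scenario = Scenario()

    # Compute the partition to archive from the current input data
    df = dataiku.Dataset("screenshot_data").get_dataframe()
    target = str(int(df["N_2_partition"].max()))

    # Step 1: build only the new partition of the P dataset, non-recursively,
    # so that the existing partitions are left untouched
    scenario.build_dataset(
        "archived_data_partitioned",
        build_mode="NON_RECURSIVE_FORCED_BUILD",
        partitions=target,
    )

    # Step 2: refresh the NP dataset, again non-recursively, so that the
    # "ALL AVAILABLE" dependency does not trigger a rebuild of the old partitions
    scenario.build_dataset(
        "archived_data_unpartitioned",
        build_mode="NON_RECURSIVE_FORCED_BUILD",
    )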

Answers

  • Tanguy

    So I eventually managed to perform the partitioning using Dataiku's API, but this did not help solve the problem. The code is enclosed in case you are interested.

    The only advantage of using Python was being able to perform a test verifying that the correct partition is being built. Raising an error can then prevent old partitions from being overwritten (but having a job end full of errors because of all the partitions that failed the test is not what I would call a solution).
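    As an illustration (this is a sketch, not the exact enclosed code), such a guard could look like the following, assuming the demo's dataset and column names and a partitioned Python recipe in place of the sync recipe:

    import dataiku

    input_ds = dataiku.Dataset("screenshot_data")
    output_ds = dataiku.Dataset("archived_data_partitioned")

    df = input_ds.get_dataframe()

    # The partition being built is exposed to the recipe as a flow variable
    # named after the partitioning dimension (here "N_2_partition")
    target = dataiku.dku_flow_variables["DKU_DST_N_2_partition"]

    # Guard: only write if the input data actually corresponds to the target
    # partition; otherwise fail rather than silently overwrite an old archive
    if str(int(df["N_2_partition"].max())) != str(target):
        raise ValueError(
            "Refusing to build partition %s: input data is at version %s"
            % (target, df["N_2_partition"].max())
        )

    output_ds.write_with_schema(df)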

  • CoreyS

    Thank you for sharing your solution with us @tanguy. This entire post thread demonstrates a level of detail and transparency that no doubt will be helpful to other users!
