Bug when updating a flow with partitioned data ?
We have been using partitions in order to archive data but we started noticing some weird behaviours, as if the partitioning configuration was being ignored when updating our flow.
Here is a workflow demo :
The table « screenshot_data » changes regularly, and we want to save a version of it at a given frequency (in our case every month).
step 1
For the demo, suppose « screenshot_data » starts with the following data :
For reproducibility purposes, the SparkSQL code used to generate this simple table is the following :
SELECT
1 AS N
,1 AS N_2_partition
We then want to archive this version of « screenshot_data » into a partition of the « archived_data_partitioned » table. Having anticipated this, we created a « N_2_partition » column in the input data. This column is then used as a partitioning column in the « archived_data_partitioned » table as shown in the following screenshot :
In the sync recipe (which is meant to save a copy of our « screenshot_data ») we configure the partitions to be built to the last value of « N_2_partition » (here, as there is only one row, it is trivially set at 1).
Note that :
- the option « redispatch partitioning according to input columns » is deactivated as we want to create (or update) a partition containing a full copy of our input data, and not split the input data into partitions according to the input column
- the option « append instead of overwrite » is also deactived because we want to update a partition if it has already been created
From there, once we run the sync recipe, everything works fine : we get a single new partition containing the data in our « archived_data_partitioned » table :
step 2
Now, suppose a new row is added to our « screenshot_data » as such :
For reproducibility purpose, here is the SparkSQL code adding the new row :
SELECT
1 AS N
,1 AS N_2_partition
UNION ALL
SELECT
2 AS N
,2 AS N_2_partition
We then run the sync recipe by pointing to the last value of the column « N_2_partition » :
At this point, everything is still fine as we get our new copy of « screenshot_data » in an additional partition :
The records in each archive are behaving well :
- 1 record in the first partition
- 2 records in the second partitions
And now for the fun part
step 3
Suppose our « screenshot_data » is updated with a third additional row :
For reproducibility purposes, here is the SparkSQL code to generate this data :
SELECT
1 AS N
,1 AS N_2_partition
UNION ALL
SELECT
2 AS N
,2 AS N_2_partition
UNION ALL
SELECT
3 AS N
,3 AS N_2_partition
Suppose we also have another new table after the partitioned dataset (here, we just add a « SELECT * » to retrieve all the data in an unpartitioned format) :
As our screenshot data has changed for the third time, the sync recipe partition configuration is updated to add a third partition :
We now build the new table « archived_data_unpartitioned » table using a recursive build with a smart reconstruction (these 2 steps - specifying the partition in the sync recipe + building the last table using a recursive build – are typically defined inside a scenario).
And whatch what happens in the partitioned dataset :
Not only has the 3rd partition not been built, but additionnaly the two previous partitions have been overwritten by the last version of our « screenshot_data » dataset !
It is as if dataiku ignored our partition configuration in the sync recipe.
This problem has been tricky for us because the behaviour we have been testing worked when we have been manually running the sync recipe, but our all archives have been overwritten when we started automating things with scenarios (without being notified by errors of course
Side-note : I investigated a work-around solution by using a python recipe instead of a sync recipe, but to date I have not managed to make things work properly. See my reply in thread : https://community.dataiku.com/t5/Using-Dataiku/Writing-to-partitions-Partitions/m-p/18028
Operating system used: windows 10
Best Answer
-
Tanguy Dataiku DSS Core Designer, Dataiku DSS & SQL, Dataiku DSS ML Practitioner, Dataiku DSS Core Concepts, Neuron, Dataiku DSS Adv Designer, Registered, Dataiku DSS Developer, Neuron 2023 Posts: 118 Neuron
Thanks to dataiku's support team (thank you Clément Stenac
), I finally understood why dataiku was behaving this way.
TLDR:
with a recursive build, dataiku indeed ignores the partition configuration in the sync recipe. Instead, dataiku relies on its dependency management which, in this case, leads to rebuild all the partitions.
In detail:
The last recipe builds a non partitioned (NP) dataset from a (P) partitioned dataset. In this configuration, dataiku by default applies the "ALL AVAILABLE" dependency between the P dataset and the NP dataset (which, at least in this case, makes sense as I want to retrieve all the partitions).
See the following screenshot to visualize the dependency function between the P dataset and the NP last dataset:So when asking dataiku to build the last dataset in a recursive format (e.g. with a smart build), it will seek to rebuild all partitions in the previous dataset.
So when arriving to the sync recipe, the partitions will be rebuilt by using the version of the first dataset (recall that the redispatch option is deactivated in the sync recipe so the NP input dataset will overwride each partition of the P ouptut dataset).
Apart from using dataiku's api (as I have in my other reply), one can prevent rebuilding the P dataset in a recursive fashion by setting its building behaviour to "explicit". However, this will prevent dataiku from updating the flow from the P dataset.
Note that when solving the flow dependencies, dataiku restricts itself to existing partitions, so it will not try to build a new partition (as configured in the sync dataset).
Answers
-
Tanguy Dataiku DSS Core Designer, Dataiku DSS & SQL, Dataiku DSS ML Practitioner, Dataiku DSS Core Concepts, Neuron, Dataiku DSS Adv Designer, Registered, Dataiku DSS Developer, Neuron 2023 Posts: 118 Neuron
So I eventually managed to perform partitioning using dataiku's api, but this did not help solve the problem. Enclosed is the code in any case you are interested.
The only advantage in using python was to perform a test verifying that the correct partition is being built. Raising an error can then prevent old partitions from being overwritten (but having a job ending full of errors because of all the partitions that failed to pass the test is not what I would call a solution
).