Multiple outputs in a single Python recipe are only writing data from the first dataset
I am trying to write multiple outputs from a single Python recipe, like so:
# Write recipe outputs
user_exclusions_df = dataiku.Dataset("user_exclusions")
user_exclusions_df.write_schema_from_dataframe(excluded)
user_exclusions_df.write_from_dataframe(excluded)
#user_exclusions_df.write_with_schema(excluded)
manual_touches_df = dataiku.Dataset("manual_touches")
manual_touches_df.write_schema_from_dataframe(man_touches)
manual_touches_df.write_from_dataframe(man_touches)
#manual_touches_df.write_with_schema(man_touches)
employee_assignment_df = dataiku.Dataset("employee_assignment")
employee_assignment_df.write_schema_from_dataframe(z)
employee_assignment_df.write_from_dataframe(z)
The referenced datasets already exist, and I have confirmed the dataframes exist and contain the desired data. I have also tried the following output format:
manual_touches_df = dataiku.Dataset("manual_touches")
manual_touches_df.write_with_schema(man_touches)
In either case, the first dataset written this way appears correctly, with the correct schema and data. However, subsequent datasets, while having the correct columns, contain data from the first dataframe instead of the data from the dataframe actually referenced in the code.
This is occurring in both recipes and notebooks.
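For reference, here is a trimmed-down, self-contained version of what the recipe does (the input dataset name and the transformations that build the three dataframes are placeholders, not my real logic):
import dataiku

# Read recipe input (placeholder name)
source = dataiku.Dataset("source_data")
source_df = source.get_dataframe()

# Placeholder transformations standing in for the real logic
excluded = source_df[source_df["exclude_flag"] == 1]
man_touches = source_df[source_df["touch_type"] == "manual"]
z = source_df[["employee_id", "assignment"]]

# Write the three recipe outputs
dataiku.Dataset("user_exclusions").write_with_schema(excluded)
dataiku.Dataset("manual_touches").write_with_schema(man_touches)
dataiku.Dataset("employee_assignment").write_with_schema(z)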
Best Answer
-
Hello again rtaylor,
Maybe you can try one thing: if you have just one recipe, go to Advanced and set Concurrent activities to 1.
It seems like DSS spawns one thread for each output dataset, and they all access the PostgreSQL dataset at the same time, which breaks it... I'm not sure if DSS works like that (maybe Clément Stenac knows) and I've never used a PostgreSQL dataset in DSS.
Another solution, with isolation, is to create a scenario that builds all these datasets (place them in different steps and don't run a step if the previous one failed). With this configuration you avoid the problem.
I usually use scenarios to execute whole projects; once you get used to working with them, you save time.
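If you want to keep everything in one recipe, another thing you could try (just a sketch; I'm assuming Dataset.get_writer() and its write_dataframe() method are available in your DSS version) is to write each output through an explicit writer and close it before touching the next dataset, so the writes cannot overlap:
import dataiku

def write_output(dataset_name, df):
    # Open an explicit writer, push the dataframe, and let the
    # with-block close it before the next output is opened.
    out = dataiku.Dataset(dataset_name)
    out.write_schema_from_dataframe(df)
    with out.get_writer() as writer:
        writer.write_dataframe(df)

# Using the dataframes from your recipe
write_output("user_exclusions", excluded)
write_output("manual_touches", man_touches)
write_output("employee_assignment", z)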
Answers
-
Hi rtaylor,
Have you added all the outputs to the recipe?
I usually do this (and it works):
output_dataset = dataiku.Dataset(output_dataset_name, project_key=project_name, ignore_flow=True)
output_dataset_df = input_dataset.get_dataframe()
output_dataset.write_with_schema(output_dataset_df)
# If you want to modify the schema
output_dataset.write_schema(schema)
-
I tried to test this on one of the dataframes I am trying to export to a dataset:
employee_assignment = dataiku.Dataset("employee_assignment", project_key='PRODODATADUMPS', ignore_flow=True)
employee_assignment_df = z.get_dataframe()
employee_assignment.write_with_schema(employee_assignment_df)
I think the ignore_flow keyword is not valid in my version, as I get the following error:
Error in Python process: At line 148: : __init__() got an unexpected keyword argument 'ignore_flow'
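For what it's worth, I could guard the constructor call like this (just a sketch, not something I have actually tested on 5.0.4, and it only dodges the keyword error rather than fixing the write issue):
import dataiku

# Fall back gracefully if this DSS version's Dataset constructor
# does not accept the ignore_flow keyword.
try:
    employee_assignment = dataiku.Dataset(
        "employee_assignment", project_key="PRODODATADUMPS", ignore_flow=True
    )
except TypeError:
    employee_assignment = dataiku.Dataset(
        "employee_assignment", project_key="PRODODATADUMPS"
    )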
-
And to confirm, I do have my imports, outputs, and datasets in order. Using the methods in my original comment, the issue is that the data inserted into the resulting datasets does not match the source dataframes, while the schemas themselves are correct.
-
Also, I am using DSS 5.0.4
-
I tried Alan's answer and ran into an error, please see my follow-up comments.
I also tested splitting the recipe into three copies and running them in parallel. This did not work if the recipes were all run concurrently. However, if I run each recipe individually, in isolation, it appears to work and outputs the proper schema and data.
The caveat is that I am not sure how this will interact with building the entire Flow, as the recipes only work when run in isolation. The image below shows the final layout of this portion of the Flow. Each of the final three recipes must be run and allowed to complete before running another.
-
Hi,
This behavior is extremely weird. How did you create the output datasets? Are you sure that all 3 output datasets are not accidentally pointing to the same location, or some similar kind of funkiness?
-
I first created the datasets in the Flow. The pandas dataframes and Python recipe code were tested in a notebook, to ensure the dataframes were correct. I then created the initial single recipe with two inputs (one shown in the image above, one offscreen) and the three outputs shown above. I used the export format I usually use:
result_df =
output_df = dataiku.Dataset("dataframe_name")
output_df.write_with_schema(result_df)
I tried that output code for each of the three dataframes and datasets. It gave the odd results described above.
I also tried to write out the dataframes with:
output_df = dataiku.Dataset("dataset_name")
output_df.write_schema_from_dataframe(result_df)
output_df.write_from_dataframe(result_df)
That also gave odd results.
I then split the recipe into three copies, each with both inputs and a single output. That also gave odd results if I ran the recipes at the same time.
This morning I decided to just run each recipe in isolation, and that... worked.
-
This worked!
In case anyone is confused, within a Python Recipe, go to the Advanced tab and set the Concurrent Activities value to 1.
Based on this, I assume that multiple dataset write actions within a recipe, if allowed to run concurrently, can exhibit the issue Alan described with SQL. In my case, this was with Hive datasets.
-
I have a similar use case. Since I don't have access to our corporate license, I am testing the free version. I think partitioning by a discrete column value would solve my problem, but partitioning is not allowed on the free license.
2nd potential option: I'm familiar with Pandas' functionality of referencing a groupby as all of its records, not just as an aggregation. When I've tried the Group recipe in a DSS flow without code, I don't see a way to access all the records; I only see aggregations. I would hope the Dataiku Group recipe could accomplish something like this:
import pandas as pd

df = pd.DataFrame([[1, 2, 'A'], [2, 3, 'A'], [1, 2, 'B'], [2, 3, 'B']], columns=['x', 'y', 'grp'])
g = df.groupby('grp')
dfg = g.get_group('A')
dfg  # all records of group 'A', not an aggregation
3rd potential option: a short piece of Python code as above.
Can anyone provide a successful example of splitting one dataset into several with the 2nd or 3rd option? Other ideas?
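To make the 3rd option concrete, this is the kind of recipe I have in mind (a sketch only; it assumes an input dataset and one pre-created output dataset per group value, here named split_A and split_B, all declared on the recipe; names are placeholders):
import dataiku

# Read the input dataset (placeholder name)
input_df = dataiku.Dataset("my_input").get_dataframe()

# Map each value of the grouping column to an existing output dataset
outputs = {"A": "split_A", "B": "split_B"}

# Write each group's full set of records to its own output dataset
for grp_value, dataset_name in outputs.items():
    group_df = input_df[input_df["grp"] == grp_value]
    dataiku.Dataset(dataset_name).write_with_schema(group_df)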