Update table schema as part of a scenario

Solved!
Pascal_B
Level 2
Update table schema as part of a scenario

Hello,
I am playing scenarios that update a part of my flow, based on some global variables
As part of the computation involved at some step of the process, there is a varying number of columns in some of the datased generated.
How can we have an update of the scheme of the tables that are concerned as part of a scenario so the scenario can run till the end ?
Thanks for your help
Pascal

1 Solution
tanguy

Haven't tried your solution, but updating the schema via the (SQL) recipe rather than directly on the dataset worked (and sounds simpler ;-)):

 

import dataiku

project_key = dataiku.get_custom_variables()['projectKey']
project = dataiku.api_client().get_project(project_key)

# apply schema updates before running recipe 
recipe = project.get_recipe('RECIPE_NAME')
recipe.compute_schema_updates().apply()
recipe.run()

 

 Thank you for your help!

View solution in original post

5 Replies
AlexT
Dataiker

Hi Pascal,

In DSS 9 we introduced the "Propagate schema" step within a scenario. Which should help with your case : 

https://doc.dataiku.com/dss/latest/release_notes/9.0.html#schema-reload-and-propagation-as-scenario-...

Screenshot 2021-07-26 at 16.22.54.png

This would behave in the same manner as the schema propagation tool explained here : 

https://knowledge.dataiku.com/latest/courses/flow-views-and-actions/schema-propagation-concept-summa...

Pascal_B
Level 2
Author

Hello Alex, thanks very much for the prompt answer.

Unfortunately we are on V8 for the next two months.

What could be done on that regard based on V8 features ?

0 Kudos
tanguy

Hi Alex,

Thanks for the quick answer. I am working with Pascal (the OP) and unfortunately we currently only have DSS v8 (so if am correct the "Propagate schema" is not included in our scenario steps).

Instead, I was investigating the dataiku api using `DSSDataset.settings.autodetect_settings().save()`. However this solution does not seem to work. It is as if dataiku caches the previous schema and returns it with the `autodetect_settings()` method (although the schema should obviously change). Even after deleting the columns schema (as suggested here), dataiku is still returning me the last schema the table had.

The current configuration of our problem is the following:

  • the output dataset (for which we would like the automatic schema detection feature) is an S3 dataset
  • it is built using JOINs in an SQL recipe which runs on AWS Athena
  • there are several S3 tables as inputs, of which one has a changing schema

The schema update works perfectly fine using the dataiku's GUI (for example by clicking on the "validate schema" button of the SQL recipe), but as suggested above, we fail to do this programmatically.

0 Kudos
AlexT
Dataiker

Hi,

Just to clarify does the dataset with the changing schema actually change the schema or does it actually contain multiple schemas?

When new files are added to S3 do they contain new column? . In that case, the autodetect_settings will use pick up the first file( hence the behavior you are describing) 

If you don't know the exact file name with the latest schema then it wouldn't be possible to automate this. If you do not this or can determinate it by date for example then you could do something like : 

dataset = project.get_dataset(INPUT_DATASET)
settings = dataset.get_settings()
settings.get_raw()['schema'] = {"columns":[], 'userModified': False}
settings.get_raw_params()["previewFile"] = CSV_PATH_CSV_NAME # path within s3 dataset and file name of latest file with latest schema /2020/06/01/test.csv
settings.save()
detected_settings = dataset.autodetect_settings()
detected_settings.save()

 

0 Kudos
tanguy

Haven't tried your solution, but updating the schema via the (SQL) recipe rather than directly on the dataset worked (and sounds simpler ;-)):

 

import dataiku

project_key = dataiku.get_custom_variables()['projectKey']
project = dataiku.api_client().get_project(project_key)

# apply schema updates before running recipe 
recipe = project.get_recipe('RECIPE_NAME')
recipe.compute_schema_updates().apply()
recipe.run()

 

 Thank you for your help!