Creating many jobs with a python recipe
Hi folks!
So I have a situation where I have a block of Python code which performs a task. I also have a list of items, and I want to run the same job for each item in the list - the exact same code, just with the item name injected in.
I could do this with a Python loop, but my list contains about 30 items, and if one item fails, the whole job fails and so does every other item in the list. My preference would be to create a DSS job for each item in my list.
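To make it concrete, here is roughly the shape of the loop I have today (simplified - process_item is just a stand-in for my actual block of code):

def process_item(item):
    # Stand-in for the real block of code; assume it can raise for some items
    ...

items = ["item_1", "item_2", "item_3"]  # about 30 items in reality

for item in items:
    # If this raises for any single item, the whole job fails and the
    # remaining items never run
    process_item(item)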
One way I thought about doing this is with a scenario and project variables - the variable would contain the item I want to run the code on, so the steps would be:
- Set variable to item 1
- Run code for item 1
- Set variable to item 2
- Run code for item 2
- ...
As you can imagine this scenario would be pretty large, and all the variables would have to be set manually.
Is there a better, more dynamic way to tackle a problem like this in DSS?
Ben
Best Answer
-
Marlan
OK, coincidentally I ended up needing to do pretty much this task today. Here is a custom Python scenario that builds a dataset multiple times, updating a pair of variables before each build. I'm using the scenario build_dataset method rather than running jobs as I had suggested earlier - definitely a simpler approach.
from __future__ import absolute_import, division, print_function
from datetime import date
from dateutil.relativedelta import relativedelta
import dataiku
from dataiku.scenario import Scenario

# Set control variables
begin_dt = date(2016, 1, 1)
end_dt = date(2020, 10, 22)
month_increment = 3
dataset_name = 'CORE_FEATURES'

# Step through time period in month_increment chunks
scenario = Scenario()
chunk_begin_dt = begin_dt
done = False
while not done:
    chunk_end_dt = chunk_begin_dt + relativedelta(months=month_increment, days=-1)
    if chunk_end_dt >= end_dt:
        chunk_end_dt = end_dt
        done = True

    print("-" * 40)
    print('Running recursive build of {} for the period {} to {}'.format(dataset_name, chunk_begin_dt, chunk_end_dt))
    print("-" * 40)

    # Update variables
    chunk_begin_dt_spec = "date '" + chunk_begin_dt.strftime("%Y-%m-%d") + "'"
    chunk_end_dt_spec = "date '" + chunk_end_dt.strftime("%Y-%m-%d") + "'"
    scenario.set_scenario_variables(coreFeaturesUpdateFromDate=chunk_begin_dt_spec,
                                    coreFeaturesUpdateToDate=chunk_end_dt_spec)

    # Run dataset build
    scenario.build_dataset(dataset_name, build_mode='RECURSIVE_FORCED_BUILD',
                           fail_fatal=True)  # fail entire scenario if any build fails

    chunk_begin_dt = chunk_end_dt + relativedelta(days=1)
Hope this helps.
Marlan
Answers
-
tim-wright
@ben_p
Can you use some exception handling within Python? Something like below:

for item in list_of_items:
    try:
        # Do something with your item (this logic will be the same for all items)
        ...
    except AnticipatedException as e:
        # Catch expected failures (exceptions) you don't want to kill the scenario.
        # Could be `pass`, but more likely will include some logic to record that
        # information or trigger other actions with the Dataiku API.
        ...
    finally:
        # Any cleanup code you need regardless of the success of the try block
        ...
-
Marlan
Hi @ben_p,

If you can just use Python for the task then @tim-wright's suggestion seems like a good option. If you want to execute a DSS job each time (also in a Python loop), here's some example code that shows how:
import dataiku

client = dataiku.api_client()
project = client.get_project(dataiku.default_project_key())

# nz_work_dataset_name holds the name of the dataset to build (defined elsewhere in my process)
job_def = project.new_job('NON_RECURSIVE_FORCED_BUILD')
job_def.with_output(nz_work_dataset_name)
project.start_job_and_wait(job_def.get_definition())
Note that this came from a process written for DSS 6 and hopefully updated to DSS 8. I say hopefully because I don't have 8 yet and so can't test it there. I updated per changes in documentation.
The other step of course would be updating a project variable via Python before each job start. I think there are examples of this on the board if you need it or I have an example I can share too.
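For example, something along these lines should work (the variable name 'current_item' and its value are just placeholders):

import dataiku

client = dataiku.api_client()
project = client.get_project(dataiku.default_project_key())

# Read, modify, and write back the project's standard variables
variables = project.get_variables()
variables['standard']['current_item'] = 'item_1'  # placeholder name and value
project.set_variables(variables)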
Marlan
-
ben_p
Hi @Marlan,

Just getting around to putting this together and I have a couple of questions: does the scenario used in the code have to already exist? Same question for the dataset too - are you creating these as 'shells' and then automating them in your code?
Ben
-
Marlan
Hi @ben_p,

I'm not sure I'm understanding your first question. The code I shared would go into the script part of a custom Python scenario. The Scenario() call in the script gets a reference to the currently executing scenario object.
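For reference, a minimal skeleton of such a script would look like this (the variable and dataset names are just placeholders):

from dataiku.scenario import Scenario

# Runs inside the Script tab of a custom Python scenario, not in a recipe
scenario = Scenario()  # reference to the currently executing scenario

# Set a scenario variable (placeholder name and value)
scenario.set_scenario_variables(my_variable="some value")

# Build a dataset that already exists in the Flow
scenario.build_dataset("MY_DATASET", build_mode="RECURSIVE_FORCED_BUILD")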
Yes, the dataset that is getting built each time already exists in my project. In this case I was planning to use a regular scenario to rebuild the dataset but ran into issues with the JDBC driver throwing an error if the number of records processed exceeds 2.1 billion (i.e., the max integer value). So I had to update the table in chunks.
You could certainly create datasets and associated recipes using the API as well. If you need to do that, I have some example code I can share.
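As a rough, untested sketch of the dataset part (the connection, path, and names below are made up), the public API call looks something like this:

import dataiku

client = dataiku.api_client()
project = client.get_project(dataiku.default_project_key())

# Create a new filesystem dataset (placeholder connection, path, and format settings)
params = {'connection': 'filesystem_managed', 'path': 'MYPROJECT/my_new_dataset'}
format_params = {'separator': '\t', 'style': 'unix', 'compress': ''}
dataset = project.create_dataset('my_new_dataset', 'Filesystem',
                                 params=params,
                                 formatType='csv',
                                 formatParams=format_params)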
Marlan
-
ben_p
Hi @Marlan,

I am a little further ahead in my understanding now - I didn't consider the differences between the scenario API and the public API.
What I would like to do is change a project variable inside a python recipe step in my flow. Is this possible?
Ben
-
ben_p
Finally got there! I worked out how to correctly set a project variable, which I am using inside a loop to create multiple jobs. Thanks @Marlan for all your guidance here! Here is a snippet of my code should anyone else ever find this useful:
import dataiku

client = dataiku.api_client()
project = client.get_project(dataiku.default_project_key())
proj_vars = project.get_variables()
audiences = proj_vars['standard']['aud_name']

# get the dataset
dataset = project.get_dataset('test_bq_output')
scenario = project.get_scenario("testscenario")

for x in audiences:
    print(x)

    # update the project variable read by the recipe
    proj_vars['standard']['current_aud'] = x
    project.set_variables(proj_vars)

    # Run dataset build (waits for the job to finish before the next iteration)
    job = dataset.build(job_type="RECURSIVE_FORCED_BUILD")
-
Marlan
Glad you got it figured out!
Marlan
-
It only triggers the job once and then the code exits. Do you see a similar issue?