Creating many jobs with a python recipe

Solved!
ben_p
Level 5
Creating many jobs with a python recipe

Hi folks!

So I have a scenario in which I have a block of python code which performs a task. I also have a list of items, and I want to run the same job for each item in the list, the exact same code, just with the item name injected in.

I could do with with a python loop, but my list contains about 30 items, and if one fails, the whole job fails and so does every other item in the list. My preference would be to create a DSS job for each item in the my list.

One way I thought about doing this is with a scenario and project variables, the variable can contain the item I want to run the code on, so the steps would be:

  1. Set variable to item 1
  2. Run code for item 1
  3. Set variable to item 2
  4. Run code for item 2
  5. ...

As you can imagine this scenario would be pretty large, and all the variables would have to be set manually.

Is there a better, more dynamic way to tackle a problem like this in DSS?

Ben

0 Kudos
1 Solution
Marlan

OK, coincidentally I ended up needing to do pretty much this task today. Here is an custom python scenario that builds a dataset multiple times, once for each value of two variables. I'm using the scenario build_dataset method rather than running jobs as I had suggested earlier. Definitely a simpler approach.

from __future__ import absolute_import, division, print_function
from datetime import date
from dateutil.relativedelta import relativedelta
import dataiku
from dataiku.scenario import Scenario

# Set control variables
begin_dt = date(2016, 1, 1)
end_dt = date(2020, 10, 22)
month_increment = 3
dataset_name = 'CORE_FEATURES'

# Step through time period in month_increment chunks
scenario = Scenario()
chunk_begin_dt = begin_dt
done = False

while not done:
    
    chunk_end_dt = chunk_begin_dt + relativedelta(months=month_increment, days=-1)
    if chunk_end_dt >= end_dt:
        chunk_end_dt = end_dt
        done = True
        
    print("-"*40)    
    print('Running recursive build of {} for the period {} to {}'.format(dataset_name, chunk_begin_dt, chunk_end_dt))
    print("-"*40)    
    
    # Update variables
    chunk_begin_dt_spec = "date '" + chunk_begin_dt.strftime("%Y-%m-%d") + "'"
    chunk_end_dt_spec = "date '" + chunk_end_dt.strftime("%Y-%m-%d") + "'" 
    scenario.set_scenario_variables(coreFeaturesUpdateFromDate=chunk_begin_dt_spec, coreFeaturesUpdateToDate=chunk_end_dt_spec)

    # Run dataset build
    scenario.build_dataset(dataset_name, build_mode='RECURSIVE_FORCED_BUILD', fail_fatal=True) # fail entire scenario if any build fails
    
    chunk_begin_dt = chunk_end_dt + relativedelta(days=1)

 

Hope this helps.

Marlan 

View solution in original post

10 Replies
tim-wright
Level 5

@ben_p   Can you use some exception handling within Python? Something like bellow:

 

for item in list_of_items:

       try:

               # Do something to your item (this logic will be the same for all items)

       except AnticipatedException as e:  # Catch expected failures (exceptions) you dont want to kill the scenario

                # Do something for the item when there was a failure  in the "try". Could be `pass` but more  likely will include some logic to record that information or trigger other actions with Dataiku API

       finally: 

               # any cleanup code you need regardless of the success of the try statement

Marlan

Hi @ben_p,

If you can just use Python for the task then @tim-wright's suggestion seems like a good option.

If you want to execute a DSS job each time (also in a Python loop), here's some example code that shows how:

import dataiku

client = dataiku.api_client()
project = client.get_project(dataiku.default_project_key())

job_def = project.new_job('NON_RECURSIVE_FORCED_BUILD')
job_def.with_output(nz_work_dataset_name)
project.start_job_and_wait(job_def.get_definition())

 

Note that this came from a process written for DSS 6 and hopefully updated to DSS 8. I say hopefully because I don't have 8 yet and so can't test it there. I updated per changes in documentation. 

The other step of course would be updating a project variable via Python before each job start. I think there are examples of this on the board if you need it or I have an example I can share too. 

Marlan

Marlan

OK, coincidentally I ended up needing to do pretty much this task today. Here is an custom python scenario that builds a dataset multiple times, once for each value of two variables. I'm using the scenario build_dataset method rather than running jobs as I had suggested earlier. Definitely a simpler approach.

from __future__ import absolute_import, division, print_function
from datetime import date
from dateutil.relativedelta import relativedelta
import dataiku
from dataiku.scenario import Scenario

# Set control variables
begin_dt = date(2016, 1, 1)
end_dt = date(2020, 10, 22)
month_increment = 3
dataset_name = 'CORE_FEATURES'

# Step through time period in month_increment chunks
scenario = Scenario()
chunk_begin_dt = begin_dt
done = False

while not done:
    
    chunk_end_dt = chunk_begin_dt + relativedelta(months=month_increment, days=-1)
    if chunk_end_dt >= end_dt:
        chunk_end_dt = end_dt
        done = True
        
    print("-"*40)    
    print('Running recursive build of {} for the period {} to {}'.format(dataset_name, chunk_begin_dt, chunk_end_dt))
    print("-"*40)    
    
    # Update variables
    chunk_begin_dt_spec = "date '" + chunk_begin_dt.strftime("%Y-%m-%d") + "'"
    chunk_end_dt_spec = "date '" + chunk_end_dt.strftime("%Y-%m-%d") + "'" 
    scenario.set_scenario_variables(coreFeaturesUpdateFromDate=chunk_begin_dt_spec, coreFeaturesUpdateToDate=chunk_end_dt_spec)

    # Run dataset build
    scenario.build_dataset(dataset_name, build_mode='RECURSIVE_FORCED_BUILD', fail_fatal=True) # fail entire scenario if any build fails
    
    chunk_begin_dt = chunk_end_dt + relativedelta(days=1)

 

Hope this helps.

Marlan 

ben_p
Level 5
Author

Amazing, thanks @Marlan!

0 Kudos
ben_p
Level 5
Author

Hi @Marlan,

Just getting around to putting this together and I have a couple of question - does the scenario used I the code have to already exist? Same question for the dataset too, are you creating these as 'shells' then automating them in your code?

Ben

0 Kudos
Marlan

Hi @ben_p,

I'm not sure I'm understanding your first question. The code I shared would go into the script part of a custom python scenario. The Scenario() call in the script gets a reference to the currently executing scenario object.

Yes, the dataset that is getting built each time already exists in my project. In this case I was planning to use a regular scenario to rebuild the dataset but ran into issues with the jdbc driver throwing an error if number of records processed exceeds 2.1 billion (i.e., the max of integer value). So had to update the table in chunks.

You could certainly create datasets and associated recipes using the API as well. If you need to do that, I have some example code I can share.

Marlan

0 Kudos
ben_p
Level 5
Author

Hi @Marlan 

I am a little farther ahead in my understanding now - I didn't consider the differences between the scenario API and the public API. 

What I would like to do is change a project variable inside a python recipe step in my flow. Is this possible?

Ben

0 Kudos
ben_p
Level 5
Author

Finally got there! I worked out how to correctly set a project variable, which I am using inside a loop to create multiple jobs. Thanks @Marlan for all your guidance here!

Here is a snippet of my code should anyone else every find this useful:

project = client.get_project(dataiku.default_project_key())
proj_vars = project.get_variables()
audiences = proj_vars['standard']['aud_name']

# get the dataset
dataset = project.get_dataset('test_bq_output')

# senario = Scenario()
scenario = project.get_scenario("testscenario")

for x in audiences:
    print(x)  
    proj_vars['standard']['current_aud'] = x
    project.set_variables(proj_vars)
    # Run dataset build
    job = dataset.build()
    dataset.build(job_type="RECURSIVE_FORCED_BUILD")
0 Kudos
Marlan

Glad you got it figured out!

Marlan

0 Kudos
buqing
Level 1

It only trigger the job once and the code exits. Do you see similar issue?

0 Kudos