Creating many jobs with a python recipe

ben_p
ben_p Neuron 2020, Registered, Neuron 2021, Neuron 2022, Dataiku Frontrunner Awards 2021 Participant Posts: 143 ✭✭✭✭✭✭✭

Hi folks!

So I have a scenario in which I have a block of python code which performs a task. I also have a list of items, and I want to run the same job for each item in the list, the exact same code, just with the item name injected in.

I could do with with a python loop, but my list contains about 30 items, and if one fails, the whole job fails and so does every other item in the list. My preference would be to create a DSS job for each item in the my list.

One way I thought about doing this is with a scenario and project variables, the variable can contain the item I want to run the code on, so the steps would be:

  1. Set variable to item 1
  2. Run code for item 1
  3. Set variable to item 2
  4. Run code for item 2
  5. ...

As you can imagine this scenario would be pretty large, and all the variables would have to be set manually.

Is there a better, more dynamic way to tackle a problem like this in DSS?

Ben

Best Answer

  • Marlan
    Marlan Neuron 2020, Neuron, Registered, Dataiku Frontrunner Awards 2021 Finalist, Neuron 2021, Neuron 2022, Dataiku Frontrunner Awards 2021 Participant, Neuron 2023 Posts: 319 Neuron
    edited July 17 Answer ✓

    OK, coincidentally I ended up needing to do pretty much this task today. Here is an custom python scenario that builds a dataset multiple times, once for each value of two variables. I'm using the scenario build_dataset method rather than running jobs as I had suggested earlier. Definitely a simpler approach.

    from __future__ import absolute_import, division, print_function
    from datetime import date
    from dateutil.relativedelta import relativedelta
    import dataiku
    from dataiku.scenario import Scenario
    
    # Set control variables
    begin_dt = date(2016, 1, 1)
    end_dt = date(2020, 10, 22)
    month_increment = 3
    dataset_name = 'CORE_FEATURES'
    
    # Step through time period in month_increment chunks
    scenario = Scenario()
    chunk_begin_dt = begin_dt
    done = False
    
    while not done:
        
        chunk_end_dt = chunk_begin_dt + relativedelta(months=month_increment, days=-1)
        if chunk_end_dt >= end_dt:
            chunk_end_dt = end_dt
            done = True
            
        print("-"*40)    
        print('Running recursive build of {} for the period {} to {}'.format(dataset_name, chunk_begin_dt, chunk_end_dt))
        print("-"*40)    
        
        # Update variables
        chunk_begin_dt_spec = "date '" + chunk_begin_dt.strftime("%Y-%m-%d") + "'"
        chunk_end_dt_spec = "date '" + chunk_end_dt.strftime("%Y-%m-%d") + "'" 
        scenario.set_scenario_variables(coreFeaturesUpdateFromDate=chunk_begin_dt_spec, coreFeaturesUpdateToDate=chunk_end_dt_spec)
    
        # Run dataset build
        scenario.build_dataset(dataset_name, build_mode='RECURSIVE_FORCED_BUILD', fail_fatal=True) # fail entire scenario if any build fails
        
        chunk_begin_dt = chunk_end_dt + relativedelta(days=1)

    Hope this helps.

    Marlan

Answers

  • tim-wright
    tim-wright Partner, L2 Designer, Snowflake Advanced, Neuron 2020, Registered, Neuron 2021, Neuron 2022 Posts: 77 Partner

    @ben_p
    Can you use some exception handling within Python? Something like bellow:

    for item in list_of_items:

    try:

    # Do something to your item (this logic will be the same for all items)

    except AnticipatedException as e: # Catch expected failures (exceptions) you dont want to kill the scenario

    # Do something for the item when there was a failure in the "try". Could be `pass` but more likely will include some logic to record that information or trigger other actions with Dataiku API

    finally:

    # any cleanup code you need regardless of the success of the try statement

  • Marlan
    Marlan Neuron 2020, Neuron, Registered, Dataiku Frontrunner Awards 2021 Finalist, Neuron 2021, Neuron 2022, Dataiku Frontrunner Awards 2021 Participant, Neuron 2023 Posts: 319 Neuron
    edited July 17

    Hi @ben_p
    ,

    If you can just use Python for the task then @tim-wright
    's suggestion seems like a good option.

    If you want to execute a DSS job each time (also in a Python loop), here's some example code that shows how:

    import dataiku
    
    client = dataiku.api_client()
    project = client.get_project(dataiku.default_project_key())
    
    job_def = project.new_job('NON_RECURSIVE_FORCED_BUILD')
    job_def.with_output(nz_work_dataset_name)
    project.start_job_and_wait(job_def.get_definition())

    Note that this came from a process written for DSS 6 and hopefully updated to DSS 8. I say hopefully because I don't have 8 yet and so can't test it there. I updated per changes in documentation.

    The other step of course would be updating a project variable via Python before each job start. I think there are examples of this on the board if you need it or I have an example I can share too.

    Marlan

  • ben_p
    ben_p Neuron 2020, Registered, Neuron 2021, Neuron 2022, Dataiku Frontrunner Awards 2021 Participant Posts: 143 ✭✭✭✭✭✭✭
  • ben_p
    ben_p Neuron 2020, Registered, Neuron 2021, Neuron 2022, Dataiku Frontrunner Awards 2021 Participant Posts: 143 ✭✭✭✭✭✭✭

    Hi @Marlan
    ,

    Just getting around to putting this together and I have a couple of question - does the scenario used I the code have to already exist? Same question for the dataset too, are you creating these as 'shells' then automating them in your code?

    Ben

  • Marlan
    Marlan Neuron 2020, Neuron, Registered, Dataiku Frontrunner Awards 2021 Finalist, Neuron 2021, Neuron 2022, Dataiku Frontrunner Awards 2021 Participant, Neuron 2023 Posts: 319 Neuron

    Hi @ben_p
    ,

    I'm not sure I'm understanding your first question. The code I shared would go into the script part of a custom python scenario. The Scenario() call in the script gets a reference to the currently executing scenario object.

    Yes, the dataset that is getting built each time already exists in my project. In this case I was planning to use a regular scenario to rebuild the dataset but ran into issues with the jdbc driver throwing an error if number of records processed exceeds 2.1 billion (i.e., the max of integer value). So had to update the table in chunks.

    You could certainly create datasets and associated recipes using the API as well. If you need to do that, I have some example code I can share.

    Marlan

  • ben_p
    ben_p Neuron 2020, Registered, Neuron 2021, Neuron 2022, Dataiku Frontrunner Awards 2021 Participant Posts: 143 ✭✭✭✭✭✭✭

    Hi @Marlan

    I am a little farther ahead in my understanding now - I didn't consider the differences between the scenario API and the public API.

    What I would like to do is change a project variable inside a python recipe step in my flow. Is this possible?

    Ben

  • ben_p
    ben_p Neuron 2020, Registered, Neuron 2021, Neuron 2022, Dataiku Frontrunner Awards 2021 Participant Posts: 143 ✭✭✭✭✭✭✭
    edited July 17

    Finally got there! I worked out how to correctly set a project variable, which I am using inside a loop to create multiple jobs. Thanks @Marlan
    for all your guidance here!

    Here is a snippet of my code should anyone else every find this useful:

    project = client.get_project(dataiku.default_project_key())
    proj_vars = project.get_variables()
    audiences = proj_vars['standard']['aud_name']
    
    # get the dataset
    dataset = project.get_dataset('test_bq_output')
    
    # senario = Scenario()
    scenario = project.get_scenario("testscenario")
    
    for x in audiences:
        print(x)  
        proj_vars['standard']['current_aud'] = x
        project.set_variables(proj_vars)
        # Run dataset build
        job = dataset.build()
        dataset.build(job_type="RECURSIVE_FORCED_BUILD")
  • Marlan
    Marlan Neuron 2020, Neuron, Registered, Dataiku Frontrunner Awards 2021 Finalist, Neuron 2021, Neuron 2022, Dataiku Frontrunner Awards 2021 Participant, Neuron 2023 Posts: 319 Neuron

    Glad you got it figured out!

    Marlan

  • buqing
    buqing Registered Posts: 1

    It only trigger the job once and the code exits. Do you see similar issue?

Setup Info
    Tags
      Help me…