Is it possible to get notified when a job exceeds a pre-determined duration

wengkin_chew
wengkin_chew Partner, Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer Posts: 2 Partner

Hi,

I was wondering if it is possible in Dataiku to get notified or emailed when a job exceeds a pre-determined duration.

For example, I am a user administrator and I want to receive an email if there are jobs running for more than 1 hour.

Regards,

Weng Kin

Best Answer

  • Alexandru
    Alexandru Dataiker, Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer, Registered Posts: 1,226 Dataiker
    edited July 17 Answer ✓

    Hi,
    Not possible at the job level, but you can leverage the APIs and run a scenario to find long-running jobs and send emails.

    The jobs API is documented at https://doc.dataiku.com/dss/latest/python-api/jobs.html. Here is an example you can use and modify as needed:

    import dataiku
    import datetime
    from dataiku.core.message_sender import MessageSender
    
    
    client = dataiku.api_client()
    projects = client.list_projects()
    
    # Adjust the threshold here: this example looks for jobs that started
    # 4 hours ago or earlier and whose state is still RUNNING.
    four_hours_ago = datetime.datetime.now() - datetime.timedelta(hours=4)
    threshold_ts = int(four_hours_ago.timestamp() * 1000)  # DSS timestamps are in milliseconds
    job_info_str = ""
    
    for p in projects:
        pk = p["projectKey"]
        project = client.get_project(pk)
        jobs = project.list_jobs()
        running_job_list = [job for job in jobs
                            if job['state'] == 'RUNNING'
                            and job['def']['initiationTimestamp'] <= threshold_ts]
        if len(running_job_list) >= 1:
            job_info_str += pk + "\n" + "\n".join([str(job) for job in running_job_list]) + "\n"
    
    print(job_info_str)
    
    # Send one email with the project keys + job info collected above
    # (channel id, recipient and sender are placeholders to adapt).
    s = MessageSender(channel_id='my-email-channel', type='mail-scenario', configuration={})
    s.send(message=job_info_str, subject="test", recipient='recipient@domain.com', sender="abc@abc.com")
    
    

Answers

  • wengkin_chew
    wengkin_chew Partner, Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer Posts: 2 Partner

    Hi,

    Thanks for the prompt reply.

    Much appreciated.

    Regards,

    Weng Kin

  • ecerulm
    ecerulm Registered Posts: 47 ✭✭✭✭✭
    edited July 17

    It seems that in 12.4.2 the jobs returned by project.list_jobs() only contain the "state" key if they have finished.

    For running jobs the "state" key is missing:

    {'def': {'autoUpdateSchemaBeforeEachRecipeRun': False,
             'id': 'Build_DATAHUB_PROD_CORE_RGU_filtered2__NP__2024-02-29T08-06-19.371',
             'initiationTimestamp': 1709193979371,
             'initiator': 'rubelagu',
             'mailNotification': False,
             'name': 'Build DATAHUB_PROD_CORE_RGU_filtered2 (NP)',
             'outputs': [{'targetDataset': 'DATAHUB_PROD_CORE_RGU_filtered2',
                          'targetDatasetProjectKey': 'DATAIKUUPGRADEPOSTTEST',
                          'targetPartition': 'NP',
                          'type': 'DATASET'}],
             'projectKey': 'DATAIKUUPGRADEPOSTTEST',
             'recipe': 'compute_DATAHUB_PROD_CORE_RGU_filtered2',
             'refreshHiveMetastore': True,
             'reverseStartingPoints': [],
             'stopAtFlowZoneBoundary': False,
             'triggeredFrom': 'RECIPE',
             'type': 'NON_RECURSIVE_FORCED_BUILD'},
     'endTime': 0,
     'kernelPid': 0,
     'stableState': False,
     'startTime': 1709193982748,
     'warningsCount': 0}

    So what is the correct way of checking for RUNNING jobs now?

    Is checking endTime == 0 enough?
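
    As a stopgap I am tempted to use something like the check below, treating a missing "state" plus endTime == 0 as "running" (both just observations from the dump above, not documented guarantees), but whether that is reliable is exactly my question:

    def is_probably_running(job):
        # Observed on 12.4.2: the 'state' key only appears once a job has
        # finished, so fall back to endTime == 0 when the key is missing.
        state = job.get('state')
        if state is not None:
            return state == 'RUNNING'
        return job.get('endTime', 0) == 0

    and then filter per project with running = [j for j in project.list_jobs() if is_probably_running(j)].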

  • Turribeach
    Turribeach Dataiku DSS Core Designer, Neuron, Dataiku DSS Adv Designer, Registered, Neuron 2023 Posts: 2,126 Neuron
  • ecerulm
    ecerulm Registered Posts: 47 ✭✭✭✭✭

    Sure, but I'm assuming that @AlexT's solution was working at some point on 2023-03-14, so it was possible to get the running jobs by iterating over each project, calling list_jobs() on it, and then checking whether a job was running with job['state'] == "RUNNING".

    But now they have removed "state" for running jobs, or so it seems to me.

  • Turribeach
    Turribeach Dataiku DSS Core Designer, Neuron, Dataiku DSS Adv Designer, Registered, Neuron 2023 Posts: 2,126 Neuron

    I wasn't aware of any changes around this, but to be honest running list_jobs() for every project is unfeasible for us, as we have over 2000 projects on some instances. If you really want this data you could get it using the "private" API that the [dss_host]/admin/monitoring/background-tasks/ screen uses: [dss_host]/dip/api/running/list-all

    Obviously calling private APIs is unsupported, but hey, there is a reason why developers use private APIs: there is no public API! If you are interested, post a new thread and I will add the code required to get the authentication cookie and the XSRF token needed to call private APIs.
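
    Very roughly, the call itself would look like the sketch below; the host is a placeholder, and you would still need to populate the session with a valid DSS authentication cookie and XSRF token first (the part I would cover in that separate thread). Entirely unsupported, of course:

    import requests

    # Placeholder host; calling private DSS endpoints is unsupported.
    DSS_HOST = "https://dss.example.com"

    with requests.Session() as s:
        # Assumption: the session must already carry a valid authentication
        # cookie and XSRF token obtained from a normal DSS login, otherwise
        # this call will be rejected.
        resp = s.get(f"{DSS_HOST}/dip/api/running/list-all")
        resp.raise_for_status()
        print(resp.json())  # response structure is undocumented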

  • ecerulm
    ecerulm Registered Posts: 47 ✭✭✭✭✭

    I opened a ticket with Dataiku and they confirmed:

    You are correct, the metadata of jobs (among others) have been subject to some changes in the last year, and this particular key has been altogether removed for running jobs.

    So now the options are

    • use project.list_jobs() but check for the following
      • .endTime==0
      • .stableState==False
      • jobs.get('state') is None
    • use project.get_job(job['def']['id']).get_status() and then check for
      • .baseStatus.state == "RUNNING"
      • .globalState.running > 0
    • the undocumented/private API endpoint [dss_host]/dip/api/running/list-all

    I don't understand why they decided to remove .state for running jobs. And it's not clear to me whether any of these APIs are stable at all. I mean, if I upgrade from 12.5.1 to 12.5.2, can all of these APIs change? It seems there is no guarantee.
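
    To make the first two options concrete, here is a rough sketch of how they could be combined (the field names are the ones from the dumps and the support answer above; I have not verified this across DSS versions):

    import dataiku

    client = dataiku.api_client()

    for p in client.list_projects():
        project = client.get_project(p["projectKey"])
        for job in project.list_jobs():
            # Option 1: infer "running" from the list_jobs() payload alone
            looks_running = (
                job.get("state") is None
                and job.get("endTime", 0) == 0
                and job.get("stableState") is False
            )
            if not looks_running:
                continue
            # Option 2: confirm with get_status() on the individual job
            status = project.get_job(job["def"]["id"]).get_status()
            if status.get("baseStatus", {}).get("state") == "RUNNING":
                print(p["projectKey"], job["def"]["id"], "is still running")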
