Is it possible to get notified when a job exceeds a pre-determined duration?

Solved!
wengkin_chew
Level 1
Hi,

I was wondering if it is possible in Dataiku to get notified or emailed when a job exceeds a pre-determined duration.

For example, as an administrator, I want to receive an email if any job has been running for more than 1 hour.

 

Regards,

Weng Kin

1 Solution
AlexT
Dataiker

Hi,
This is not possible at the job level, but you can use the Python API in a scenario to find long-running jobs and send emails.

https://doc.dataiku.com/dss/latest/python-api/jobs.html

Here is an example you can use and modify as needed:

import dataiku
import datetime
from dataiku.core.message_sender import MessageSender

client = dataiku.api_client()
projects = client.list_projects()

# Adjust the threshold here: this example looks for jobs started 4 or more hours ago that are still running
four_hours_ago = datetime.datetime.now() - datetime.timedelta(hours=4)
timestamp = int(four_hours_ago.timestamp() * 1000)  # epoch milliseconds
job_info_str = ""

for p in projects:
    pk = p["projectKey"]
    project = client.get_project(pk)
    jobs = project.list_jobs()
    # Keep jobs that are still running and were initiated at or before the cutoff
    running_job_list = [job for job in jobs if job['state'] == 'RUNNING' and job['def']['initiationTimestamp'] <= timestamp]
    if running_job_list:
        job_info_str += pk + "\n" + "\n".join([str(job) for job in running_job_list]) + "\n"

print(job_info_str)
# Now send either one email per long-running job, or a single email with the project keys and job info collected above.

s = MessageSender(channel_id='my-email-channel', type='mail-scenario', configuration={})
s.send(message=job_info_str, subject="test", recipient='recipient@domain.com', sender="abc@abc.com")
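
The raw `str(job)` dump can be hard to read in an email. As a rough sketch only, the records could be turned into one summary line each. This assumes each record returned by `list_jobs()` carries `def.id`, `def.initiator` and `def.initiationTimestamp` (epoch milliseconds); the helper name `format_job_line` is hypothetical and not part of the Dataiku API.

```python
import datetime


def format_job_line(project_key, job, now_ms):
    """Return a one-line summary of a job record (hypothetical helper).

    Assumes `job` is a dict shaped like a project.list_jobs() record, with
    def.id, def.initiator and def.initiationTimestamp in epoch milliseconds.
    """
    job_def = job.get("def", {})
    elapsed_ms = now_ms - job_def.get("initiationTimestamp", now_ms)
    # Truncate to whole seconds so the duration prints as H:MM:SS
    elapsed = datetime.timedelta(seconds=int(elapsed_ms // 1000))
    return "{} | {} | started by {} | running for {}".format(
        project_key,
        job_def.get("id", "?"),
        job_def.get("initiator", "?"),
        elapsed,
    )
```

In the loop above, `job_info_str` could then be built from `format_job_line(pk, job, now_ms)` for each job, with `now_ms = int(datetime.datetime.now().timestamp() * 1000)`.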

 

7 Replies
wengkin_chew
Level 1
Author

Hi,

Thanks for the prompt reply.

Much appreciated.

 

Regards,

Weng Kin

ecerulm
Level 4

It seems that in 12.4.2 the jobs returned by project.list_jobs() only contain the "state" key if they have finished.

For running jobs the "state" key is missing:

{'def': {'autoUpdateSchemaBeforeEachRecipeRun': False,
         'id': 'Build_DATAHUB_PROD_CORE_RGU_filtered2__NP__2024-02-29T08-06-19.371',
         'initiationTimestamp': 1709193979371,
         'initiator': 'rubelagu',
         'mailNotification': False,
         'name': 'Build DATAHUB_PROD_CORE_RGU_filtered2 (NP)',
         'outputs': [{'targetDataset': 'DATAHUB_PROD_CORE_RGU_filtered2',
                      'targetDatasetProjectKey': 'DATAIKUUPGRADEPOSTTEST',
                      'targetPartition': 'NP',
                      'type': 'DATASET'}],
         'projectKey': 'DATAIKUUPGRADEPOSTTEST',
         'recipe': 'compute_DATAHUB_PROD_CORE_RGU_filtered2',
         'refreshHiveMetastore': True,
         'reverseStartingPoints': [],
         'stopAtFlowZoneBoundary': False,
         'triggeredFrom': 'RECIPE',
         'type': 'NON_RECURSIVE_FORCED_BUILD'},
 'endTime': 0,
 'kernelPid': 0,
 'stableState': False,
 'startTime': 1709193982748,
 'warningsCount': 0}

 

So what is the correct way of checking for RUNNING jobs now?

Is checking endTime == 0 enough?

0 Kudos

You might want to vote for this idea:

https://community.dataiku.com/t5/Product-Ideas/API-to-obtain-list-of-currently-running-jobs-in-a-DSS...

which requests a new API for running jobs. 

ecerulm
Level 4

Sure, but I'm assuming that @AlexT's solution was working at some point on 2023-03-14, so it was possible to get the running jobs by iterating over each project, calling list_jobs() on it, and then checking whether each job was running with job['state'] == "RUNNING".

But now they have removed the "state" key for running jobs, or so it seems to me.

 

 

0 Kudos

I wasn't aware of any changes around this, but to be honest, running list_jobs() for every project is unfeasible for us, as we have over 2000 projects in some instances. If you really want this data, you could get it from the "private" API that the [dss_host]/admin/monitoring/background-tasks/ screen uses: [dss_host]/dip/api/running/list-all

Obviously, calling private APIs is unsupported, but there is a reason why developers use private APIs: there is no public API! If you are interested, post a new thread and I will add the code required to get the authentication cookie and the XSRF token needed to call private APIs.

ecerulm
Level 4

I opened a ticket with Dataiku and they confirmed:

You are correct, the metadata of jobs (among others) have been subject to some changes in the last year, and this particular key has been altogether removed for running jobs.

So now the options are:

  • use project.list_jobs() and check each job for the following:
    • .endTime == 0
    • .stableState == False
    • job.get('state') is None
  • use project.get_job(job['def']['id']).get_status() and then check for:
    • .baseStatus.state == "RUNNING"
    • .globalState.running > 0
  • use the undocumented/private API endpoint [dss_host]/dip/api/running/list-all
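
The first two options above could be sketched roughly as follows. This is only a sketch under assumptions: the field names (`endTime`, `stableState`, `state`, `baseStatus.state`) are taken from the record and the support reply quoted in this thread and may change between versions, the helper names are hypothetical, and a `state` of "RUNNING" is also accepted in case an older version still returns it.

```python
def looks_running(job):
    """Heuristic on a list_jobs() record: no end time, not in a stable
    state, and either no 'state' key or an explicit 'RUNNING' state."""
    return (
        job.get("endTime", 0) == 0
        and not job.get("stableState", False)
        and job.get("state") in (None, "RUNNING")
    )


def long_running_jobs(jobs, now_ms, max_minutes=60):
    """Return the records that look running and were initiated more
    than max_minutes ago (timestamps are epoch milliseconds)."""
    cutoff_ms = now_ms - max_minutes * 60 * 1000
    return [
        job for job in jobs
        if looks_running(job) and job["def"]["initiationTimestamp"] <= cutoff_ms
    ]


def confirmed_running(project, job):
    """Second option: ask the job for its status, at the cost of one
    extra API call per job. `project` is expected to be the object
    returned by client.get_project(project_key)."""
    status = project.get_job(job["def"]["id"]).get_status()
    return status.get("baseStatus", {}).get("state") == "RUNNING"
```

Inside a scenario this might be called as `long_running_jobs(project.list_jobs(), int(time.time() * 1000), max_minutes=60)`, using `confirmed_running` only on the few candidates that remain, so the extra status calls stay cheap.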

I don't understand why they decided to remove .state for running jobs. It's also not clear to me whether any of these APIs are stable at all: if I upgrade from 12.5.1 to 12.5.2, can all of these APIs change? It seems that there is no guarantee.
