Hi,
I was wondering if it is possible in Dataiku to get notified or emailed when a job exceeds a pre-determined duration.
For example, I am a user administrator and I want to receive an email if any job has been running for more than 1 hour.
Regards,
Weng Kin
Hi,
Not possible at the job level, but you can leverage the APIs and run a scenario to find long-running jobs and send emails.
https://doc.dataiku.com/dss/latest/python-api/jobs.html
Here is an example you can use and modify as needed:
import dataiku
import datetime
from dataiku.core.message_sender import MessageSender

client = dataiku.api_client()
projects = client.list_projects()

# Adjust the threshold here: this example finds jobs initiated 4 or more hours ago that are still running
four_hours_ago = datetime.datetime.now() - datetime.timedelta(hours=4)
# initiationTimestamp is in epoch milliseconds, so convert the cutoff to match
timestamp = int(four_hours_ago.timestamp() * 1000)

job_info_str = ""
for p in projects:
    pk = p["projectKey"]
    project = client.get_project(pk)
    jobs = project.list_jobs()
    # Keep jobs that are still running and were initiated at or before the cutoff
    running_job_list = [job for job in jobs if job['state'] == 'RUNNING' and job['def']['initiationTimestamp'] <= timestamp]
    if running_job_list:
        job_info_str += pk + "\n" + "\n".join([str(job) for job in running_job_list]) + "\n"

print(job_info_str)

# Now send either one email per long-running job, or a single email with the project key + job info collected above.
if job_info_str:
    s = MessageSender(channel_id='my-email-channel', type='mail-scenario', configuration={})
    s.send(message=job_info_str, subject="test", recipient='recipient@domain.com', sender="abc@abc.com")
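If you would rather send a readable line per job than a raw dict dump, the elapsed time can be derived from the job's initiationTimestamp (epoch milliseconds). This is only a minimal sketch: describe_job is a hypothetical helper name, and it assumes the job dict carries def.id, def.initiator and def.initiationTimestamp.

```python
import datetime

def describe_job(job, now=None):
    """Return a one-line summary with the time elapsed since the job was initiated."""
    now = now or datetime.datetime.now()
    # initiationTimestamp is in epoch milliseconds, so divide by 1000 for seconds
    started = datetime.datetime.fromtimestamp(job["def"]["initiationTimestamp"] / 1000)
    elapsed = now - started
    hours, remainder = divmod(int(elapsed.total_seconds()), 3600)
    minutes = remainder // 60
    return "{} (initiated by {}): running for {}h {:02d}m".format(
        job["def"]["id"], job["def"].get("initiator", "unknown"), hours, minutes)
```

The email body could then be built with something like "\n".join(describe_job(job) for job in running_job_list).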
Hi,
Thanks for the prompt reply.
Much appreciated.
Regards,
Weng Kin
It seems that in 12.4.2 the jobs returned by project.list_jobs() only contain the "state" key if they have finished.
For running jobs the "state" key is missing:
{'def': {'autoUpdateSchemaBeforeEachRecipeRun': False,
         'id': 'Build_DATAHUB_PROD_CORE_RGU_filtered2__NP__2024-02-29T08-06-19.371',
         'initiationTimestamp': 1709193979371,
         'initiator': 'rubelagu',
         'mailNotification': False,
         'name': 'Build DATAHUB_PROD_CORE_RGU_filtered2 (NP)',
         'outputs': [{'targetDataset': 'DATAHUB_PROD_CORE_RGU_filtered2',
                      'targetDatasetProjectKey': 'DATAIKUUPGRADEPOSTTEST',
                      'targetPartition': 'NP',
                      'type': 'DATASET'}],
         'projectKey': 'DATAIKUUPGRADEPOSTTEST',
         'recipe': 'compute_DATAHUB_PROD_CORE_RGU_filtered2',
         'refreshHiveMetastore': True,
         'reverseStartingPoints': [],
         'stopAtFlowZoneBoundary': False,
         'triggeredFrom': 'RECIPE',
         'type': 'NON_RECURSIVE_FORCED_BUILD'},
 'endTime': 0,
 'kernelPid': 0,
 'stableState': False,
 'startTime': 1709193982748,
 'warningsCount': 0}
So what is the correct way of checking for RUNNING jobs now?
Is checking endTime == 0 enough?
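Judging only from the sample above, one defensive way to write the check is to use "state" when the key is present and otherwise fall back to endTime == 0 together with stableState being False. This is a sketch under that assumption, not confirmed behavior: looks_running is a hypothetical helper, and a job that is queued but not yet started might match the fallback too.

```python
def looks_running(job):
    """Best-effort guess at whether a job dict from list_jobs() is still running."""
    state = job.get("state")
    if state is not None:
        # When the key is present (e.g. finished jobs), trust it
        return state == "RUNNING"
    # Assumption from the sample dict: no "state" key, endTime == 0 and
    # stableState False together indicate a job that has not finished yet
    return job.get("endTime") == 0 and job.get("stableState") is False
```

It could then replace the job['state'] == 'RUNNING' test, for example: running = [j for j in project.list_jobs() if looks_running(j)].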
You might want to vote for this idea:
which requests a new API for running jobs.
Sure, but I'm assuming that @AlexT's solution was working at some point around 2023-03-14, so it was possible to get the running jobs by iterating over each project, calling list_jobs() on it, and then checking whether each job was running with job['state'] == "RUNNING".
But now they have removed the "state" key for running jobs, or so it seems to me.
I wasn't aware of any changes around this, but to be honest running list_jobs() for every project is unfeasible for us, as we have over 2000 projects in some instances. If you really want this data, you could get it using the "private" API that the [dss_host]/admin/monitoring/background-tasks/ screen uses: [dss_host]/dip/api/running/list-all
Obviously calling private APIs is unsupported, but hey, there is a reason why developers use private APIs: there is no public API! If you are interested, post a new thread and I will add the code required to get the authentication cookie and the XSRF token needed to call private APIs.
I opened a ticket with Dataiku and they confirmed:
You are correct, the metadata of jobs (among others) have been subject to some changes in the last year, and this particular key has been altogether removed for running jobs.
So now the options are
I don't understand why they decided to remove the "state" key for running jobs. It's also not clear to me whether any of these APIs are stable at all. If I upgrade from 12.5.1 to 12.5.2, can all of these APIs change? It seems that there is no guarantee.