Hi all,
Within a scenario, we would like to notify customers by email if dataset checks result in warnings. At minimum, we'd like to do this if there are any warnings. It would be better still to name the specific checks that resulted in warnings.
We've been able to do this with a Python step after a Run Checks step in a scenario.
However, the Python script (included below) is fairly messy. Is there an easier way? Ideally a Python step would not be required. If Python is required, could the script I included be simplified?
Thanks!
Marlan
from dataiku.scenario import Scenario

# Create the main handle to interact with the scenario
scenario_obj = Scenario()
step_output = scenario_obj.get_previous_steps_outputs()

check_data = []
ok_cnt = 0
warn_cnt = 0
error_cnt = 0

for step in step_output:
    # Each step has a name and result
    step_name = step['stepName']
    result = step['result']  # result is a dict

    # Determine the type of step (unfortunately, there is apparently no field that specifies type)
    if 'success' in result:
        # result dict has key 'success'
        step_type = 'execute_sql'  # there may be another step type that has this (haven't checked all of them)
    else:
        some_value_in_result = result[list(result.keys())[0]]  # arbitrary value in result dict (which itself may be a dict)
        if 'computed' in some_value_in_result:
            step_type = 'compute_metrics'
        elif 'results' in some_value_in_result:
            step_type = 'run_checks'
        else:
            step_type = 'other'

    # Create variables for run checks (may be multiple checks for each dataset)
    if step_type == 'run_checks':
        # datasets are keys in result dict
        for dataset_info in result:
            # parse and clean up dataset name
            dataset_info_list = dataset_info.split('.')
            project_key = dataset_info_list[0]
            dataset_spec = dataset_info_list[1]
            if dataset_spec[-3:] == '_NP':
                # remove no partition text
                dataset_name = dataset_spec[:-3]
            else:
                dataset_name = dataset_spec

            # Extract check results
            dataset_value = result[dataset_info]
            checks_results = dataset_value['results']  # list of check results
            for check_result in checks_results:
                check_spec = check_result['check']
                check_value = check_result['value']
                check_label = check_spec['meta'].get('label', '<Unlabelled>')
                check_metric = check_spec['metricId'].split(':', 1)[1]  # strip off metric type (type:metric or type:metric:otherinfo)
                check_message = check_value['message']
                check_outcome = check_value['outcome']
                if check_outcome == 'OK':
                    ok_cnt += 1
                elif check_outcome == 'WARNING':
                    warn_cnt += 1
                elif check_outcome == 'ERROR':  # won't get here unless check step is set to ignore failure
                    error_cnt += 1
                check_dict = {'dataset': dataset_name, 'metric': check_metric, 'label': check_label,
                              'outcome': check_outcome, 'message': check_message}
                check_data.append(check_dict)

# Assign to variables (or could send email or take appropriate action here)
scenario_obj.set_scenario_variables(datasetCheckResults=check_data, okCount=ok_cnt, warnCount=warn_cnt, errorCount=error_cnt)
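
For comparison, here is one possible way the parsing could be condensed. This is only a sketch: it assumes the step outputs have exactly the shape the script above relies on (a list of dicts with a 'result' dict, where run-checks results are keyed by dataset and hold a 'results' list). The function name summarize_checks is hypothetical and not part of the Dataiku API. Instead of guessing the step type, it simply skips any result value that has no 'results' list, and it uses collections.Counter for the tallies.

```python
from collections import Counter


def summarize_checks(step_outputs):
    """Collect check results from step outputs shaped like those parsed above.

    Hypothetical helper: the input shape is an assumption taken from the
    original script, not from documented Dataiku behaviour.
    """
    rows = []
    for step in step_outputs:
        result = step.get('result') or {}
        if not isinstance(result, dict):
            continue
        for dataset_ref, dataset_value in result.items():
            # Only run-checks outputs are assumed to carry a 'results' list
            if not isinstance(dataset_value, dict) or 'results' not in dataset_value:
                continue
            # Drop the project key, then the '_NP' (no partition) suffix if present
            dataset_spec = dataset_ref.split('.', 1)[-1]
            dataset_name = dataset_spec[:-3] if dataset_spec.endswith('_NP') else dataset_spec
            for check_result in dataset_value['results']:
                check = check_result['check']
                value = check_result['value']
                rows.append({
                    'dataset': dataset_name,
                    # Strip the metric type prefix (type:metric or type:metric:otherinfo)
                    'metric': check['metricId'].split(':', 1)[-1],
                    'label': check.get('meta', {}).get('label', '<Unlabelled>'),
                    'outcome': value['outcome'],
                    'message': value.get('message', ''),
                })
    # Tally outcomes; missing outcomes read as 0
    counts = Counter(row['outcome'] for row in rows)
    return rows, counts
```

In the scenario step, this would be called on scenario_obj.get_previous_steps_outputs(), with the returned rows and counts['OK'], counts['WARNING'], and counts['ERROR'] passed to set_scenario_variables as in the script above.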
Operating system used: Linux Red Hat