Checking for dataset warn outcomes within a Scenario - is there a better way?

Marlan
Marlan Neuron 2020, Neuron, Registered, Dataiku Frontrunner Awards 2021 Finalist, Neuron 2021, Neuron 2022, Dataiku Frontrunner Awards 2021 Participant, Neuron 2023 Posts: 320 Neuron
edited July 16 in Using Dataiku

Hi all,

Within a scenario we would like to notify customers by email if dataset checks result in warnings. At minimum, we'd like to do this if there any warnings. It would be better to specify the checks that resulted in warnings.

We've been able to do this with a Python step after a Run Checks step in a scenario.

However the Python script (included below) is fairly messy. Is there an easier way? Ideally a Python step would not be required. If Python is required, could the script I included be simplified?

Thanks!

Marlan

from dataiku.scenario import Scenario

# Create the main handle to interact with the scenario
scenario_obj = Scenario()

step_output = scenario_obj.get_previous_steps_outputs()

check_data = []
ok_cnt = 0
warn_cnt = 0
error_cnt = 0

for step in step_output:

    # Each step has a name and result
    step_name = step['stepName']
    result = step['result'] # result is a dict

    # Determine the type of step (unfortunately, there is apparently no field that specifies type)
    if 'success' in result:
        # result dict has key 'success'
        step_type = 'execute_sql' # there may be another step type that has this (haven't checked all of them)
    else:
        some_value_in_result = result[list(result.keys())[0]] # arbitrary value in result dict (which itself may be a dict)
        if 'computed' in some_value_in_result:
            step_type = 'compute_metrics'
        elif 'results' in some_value_in_result:
            step_type = 'run_checks'
        else:
            step_type = 'other'

    # Create variables for run checks (may be multiple checks for each dataset)
    if step_type == 'run_checks':


        # datasets are keys in result dict
        for dataset_info in result:

            # parse and clean up dataset name
            dataset_info_list = dataset_info.split('.')
            project_key = dataset_info_list[0]
            dataset_spec = dataset_info_list[1]
            if dataset_spec[-3:] == '_NP':
                # remove no partition text
                dataset_name = dataset_spec[:-3]
            else:
                dataset_name = dataset_spec

            # Extract check results
            dataset_value = result[dataset_info]
            checks_results = dataset_value['results'] # list of check results
            for check_result in checks_results: 
                check_spec = check_result['check']
                check_value = check_result['value']

                check_label = check_spec['meta'].get('label','<Unlabelled>')
                check_metric = check_spec['metricId'].split(':', 1)[1] # strip off metric type (type:metric or type:metric:otherinfo) 
                check_message = check_value['message']
                check_outcome = check_value['outcome']
                
                if check_outcome == 'OK':
                    ok_cnt += 1
                elif check_outcome == 'WARNING':
                    warn_cnt += 1
                elif check_outcome == 'ERROR': # won't get here unless check step is set to ignore failure
                    error_cnt += 1

                check_dict = {'dataset': dataset_name, 'metric':check_metric, 'label': check_label,
                              'outcome': check_outcome, 'message': check_message}
                
                check_data.append(check_dict)

# Assign to variables (or could send email or take appropriate action here)        
scenario_obj.set_scenario_variables(datasetCheckResults=check_data, okCount=ok_cnt, warnCount=warn_cnt, errorCount=error_cnt) 

Operating system used: Linux Red Hat

Tagged:
Setup Info
    Tags
      Help me…