Setting metrics to be computed programmatically
Dear Dataiku,
I have searched the Community Forum but could not find a suitable answer.
In my organization we deal with multiple projects and many tables within each project, so we are looking for a way to programmatically set specific metrics to be computed (for example, MIN for a column X) without having to open each dataset manually and tick the checkboxes.
I know your documentation almost by heart now, and I believe the compute_metrics function within the dataikuapi.dss.dataset.DSSDataset class comes closest. Nevertheless, it does not behave the way I would expect. The code:
client = dataiku.api_client()
project = client.get_project('DF_METRICS')
dataset = project.get_dataset('full_metrics')
dataset.compute_metrics(metric_ids=['col_stats:MAX:date_computed'])
returns:
{'hasResult': True,
 'aborted': False,
 'alive': False,
 'startTime': 0,
 'runningTime': 0,
 'unknown': False,
 'result': {'partition': 'NP',
            'startTime': 1579614424305,
            'endTime': 1579614424308,
            'skipped': [],
            'computed': [{'metric': {'metricType': 'METRICS_COMPUTATION_DURATION',
                                     'type': 'reporting',
                                     'id': 'reporting:METRICS_COMPUTATION_DURATION',
                                     'dataType': 'BIGINT'},
                          'metricId': 'reporting:METRICS_COMPUTATION_DURATION',
                          'dataType': 'BIGINT',
                          'value': '3'}],
            'runs': []}}
Would you have any suggestions on how to approach this? Thank you!
Filip
Best Answer
-
Hi Filip,
You're on the right track!
Let's start here: the DSSDataset class is indeed the place to begin.
## Your existing code ##
client = dataiku.api_client()
project = client.get_project('PROJECT_KEY')
dataset = project.get_dataset('DATASET_NAME')
Metrics can be changed in the dataset's definition.
dataset_def = dataset.get_definition()

## Create a new list to hold existing and new metric 'probes'
new_probes_list = []
The ['metrics']['probes'] list within the dataset's definition holds metric probes. Here's an example of how to keep all existing metric probes (not adding any new ones), and enable all of them to auto-compute after each dataset rebuild.
## Iterate through each existing metric probe. If the metric is enabled, then change its 'computeOnBuildMode' ##
for metric in dataset_def['metrics']['probes']:
    if metric['enabled'] == True:
        metric['computeOnBuildMode'] = u'PARTITION'
    new_probes_list.append(metric)

## Change the dataset_def dictionary to contain the new list of metric probes
dataset_def['metrics']['probes'] = new_probes_list

## Use the set_definition method to change the dataset definition, passing in the new, altered dictionary
dataset.set_definition(dataset_def)
If you want to create new metrics on the dataset, you'll similarly add items to this dataset_def['metrics']['probes'] list, add the names of the new probes to the dataset_def['metrics']['displayedState']['metrics'] list, and then use the dataset's set_definition method to update the definition. Please note that the dictionary keys in these lists vary depending on the type of metric added. This example narrowly covers the "column statistics" type of metrics.
## Get a list of existing metric probes ##
metric_probes = dataset_def['metrics']['probes']

## If there is no existing metric probe of type "col_stats", then add this new type ##
if not any(metric['type'] == 'col_stats' for metric in metric_probes):
    new_probe = {u'computeOnBuildMode': u'PARTITION',
                 u'configuration': {u'aggregates': []},
                 u'enabled': True,
                 u'meta': {u'level': 2, u'name': u'Columns statistics'},
                 u'type': u'col_stats'}
    metric_probes.append(new_probe)

## Here, define the new metric you'd like to create. This adds a MEAN metric (which is inherently of type "col_stats") on the "Open_CREDIT_Lines" column ##
calculation_type = 'col_stats'
calculation = 'MEAN'
column_name = 'Open_CREDIT_Lines'

## Create a new placeholder list ##
new_metric_probes = []

## Add the new metric probe to the list, along with other existing probes ##
for metric_probe in metric_probes:
    if metric_probe['type'] == calculation_type:
        if ('aggregates' not in metric_probe['configuration']) or (metric_probe['configuration']['aggregates'] is None):
            metric_probe['configuration']['aggregates'] = [{u'aggregated': calculation, u'column': column_name}]
        else:
            ## list.append modifies the list in place, so don't reassign its (None) return value
            metric_probe['configuration']['aggregates'].append({u'aggregated': calculation, u'column': column_name})
    new_metric_probes.append(metric_probe)

## Create a new displayed metric. Note the syntax separating the calc type, calc description, and column name in this case. ##
new_displayed_metric = unicode(calculation_type + ":" + calculation + ":" + column_name, "utf-8")

## Add the new displayed metric to the existing displayed metrics ##
prev_displayed_metrics = dataset_def['metrics']['displayedState']['metrics']
prev_displayed_metrics.append(new_displayed_metric)
dataset_def['metrics']['displayedState']['metrics'] = prev_displayed_metrics

## Set the definition of the dataset, passing through the new dataset_def dictionary ##
dataset.set_definition(dataset_def)
Again, this solution narrowly shows how to do this for basic column stats. You'll have to tinker around with the different dictionary formats for other types of metrics.
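For instance, here is a sketch of what a "Most frequent values" probe could look like. The 'adv_col_stats' type and the 'MODE'/'TOP10' aggregates are taken from the working example Filip posted further down this thread; verify the exact keys against an existing probe in your own dataset_def before relying on them:

## A sketch of an "adv_col_stats" (Most frequent values) probe, assuming the same
## dictionary layout as the "col_stats" example above
adv_probe = {u'computeOnBuildMode': u'PARTITION',
             u'configuration': {u'aggregates': [{u'aggregated': u'MODE', u'column': column_name},
                                                {u'aggregated': u'TOP10', u'column': column_name}]},
             u'enabled': True,
             u'meta': {u'level': 3, u'name': u'Most frequent values'},
             u'type': u'adv_col_stats'}
dataset_def['metrics']['probes'].append(adv_probe)
dataset.set_definition(dataset_def)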
Good luck!
Answers
-
Hi Filip,
You can access information about the last run of a given set of metric values from a Dataiku dataset using the metrics and checks API: https://doc.dataiku.com/dss/latest/python-api/metrics.html
Here's some detailed code that builds on your example: it computes your metrics from a larger list, then grabs the updated values and compares them to the prior ones:
from datetime import datetime

client = dataiku.api_client()
proj = client.get_project("MANIPULATEMETRICS")
ds = proj.get_dataset("crm_last_year")

# get info on the most recent metric run
metrics = ds.get_last_metric_values()

# get the list of all metric ids
ids = metrics.get_all_ids()

# for all ids (or a subset of the list), compute the metrics
for id in ids:
    ds.compute_metrics(metric_ids=[id])

# get info on the newly completed metric run
new_metrics = ds.get_last_metric_values()

# compare to the prior run if you want
for id in ids:
    new_run = new_metrics.get_global_data(id)['computed']
    old_run = metrics.get_global_data(id)['computed']
    tsf = '%Y-%m-%d %H:%M:%S'
    print("time between: " + str((new_run - old_run) / 1000))
    ts = new_run / 1000
    print(datetime.utcfromtimestamp(ts).strftime(tsf))
    print(new_metrics.get_global_value(id))
-
Hi Jediv,
Thank you for the quick response!
I did something similar already, but your solution is neat and does not require accessing elements of metrics (as I did before):
# get info on the most recent metric run
metrics = ds.get_last_metric_values()
But the question I was actually trying to figure out is how to enable/disable completely new metrics for a particular table, without accessing the menu in the project.
In other words, how to, for example, turn on the null values count for some column programmatically.
Kind regards,
Filip -
Ah, I understand now. In this case, what you actually need to modify is the definition of the dataset, which you can access with the get_definition and set_definition methods.
For example, if I want to add a "min" calculation for the column "birth" in the dataset "crm_last_year", I would do the following:
# get client and dataset
client = dataiku.api_client()
proj = client.get_project("MANIPULATEMETRICS")
ds = proj.get_dataset("crm_last_year")

# if I want to get the min of column birth
column = "birth"
agg = "MIN"
location = "Columns statistics"
probe_rep = {
    "column": column,
    "aggregated": agg
}

# get the definition of the dataset and modify it
ds_def = ds.get_definition()
for probe in ds_def['metrics']['probes']:
    if probe['meta']['name'] == location:
        probe['configuration']['aggregates'].append(probe_rep)

# activate the metric
metrics_list = ds_def['metrics']['displayedState']['metrics']
metrics_list.append('col_stats:{0}:{1}'.format(agg, column))

# save the definition and view the new version
ds.set_definition(ds_def)
ds.get_definition()['metrics']['probes']
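Once the probe has been added and the metric enabled in displayedState, you can also trigger its computation programmatically, for example with the compute_metrics call from the original question. A minimal sketch, reusing the agg and column variables above and the 'type:aggregation:column' metric id pattern:

# compute only the newly added metric and read back its value
new_metric_id = 'col_stats:{0}:{1}'.format(agg, column)
ds.compute_metrics(metric_ids=[new_metric_id])
print(ds.get_last_metric_values().get_global_value(new_metric_id))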
-
Hi Jediv, pmasiphelps.
Thank you very much for the help!
You saved me loads of time - now it works like a charm.
Below is an example of how to set almost all metrics for a single column called rec_vld_fm_dts:
# HOW THE NEW PROBES LIST SHOULD LOOK BEFORE YOU UPDATE IT.
# IT IS A LIST THAT CONSISTS OF DICTIONARIES IN A SPECIFIC FORMAT:
new_probes_list = [
    # Size, files count & columns count
    {'type': 'basic',
     'enabled': True,
     'computeOnBuildMode': 'PARTITION',
     'meta': {'name': 'Basic data', 'level': 0},
     'configuration': {}},
    # Records count
    {'type': 'records',
     'enabled': True,
     'computeOnBuildMode': 'PARTITION',
     'meta': {'name': 'Record count', 'level': 0},
     'configuration': {}},
    # Columns statistics
    {'type': 'col_stats',
     'enabled': True,
     'computeOnBuildMode': 'PARTITION',
     'meta': {'name': 'Columns statistics', 'level': 2},
     'configuration': {'aggregates': [{'column': 'rec_vld_fm_dts', 'aggregated': 'MIN'},
                                      {'column': 'rec_vld_fm_dts', 'aggregated': 'MEAN'},
                                      {'column': 'rec_vld_fm_dts', 'aggregated': 'MAX'},
                                      {'column': 'rec_vld_fm_dts', 'aggregated': 'COUNT'},
                                      {'column': 'rec_vld_fm_dts', 'aggregated': 'COUNT_NULL'},
                                      {'column': 'rec_vld_fm_dts', 'aggregated': 'STDDEV'},
                                      {'column': 'rec_vld_fm_dts', 'aggregated': 'COUNT_DISTINCT'},
                                      {'column': 'rec_vld_fm_dts', 'aggregated': 'HISTOGRAM'}]}},
    # Most frequent values
    {'type': 'adv_col_stats',
     'enabled': True,
     'computeOnBuildMode': 'PARTITION',
     'meta': {'name': 'Most frequent values', 'level': 3},
     'configuration': {'aggregates': [{'column': 'rec_vld_fm_dts', 'aggregated': 'MODE'},
                                      {'column': 'rec_vld_fm_dts', 'aggregated': 'TOP10'},
                                      {'column': 'rec_vld_fm_dts', 'aggregated': 'TOP10_WITH_COUNTS'}]}},
    # Columns percentiles
    {'type': 'percentile_stats',
     'enabled': True,
     'computeOnBuildMode': 'PARTITION',
     'meta': {'name': 'Columns percentiles', 'level': 4},
     'configuration': {'aggregates': [{'column': 'rec_vld_fm_dts', 'aggregated': 'P25'},
                                      {'column': 'rec_vld_fm_dts', 'aggregated': 'P50'},
                                      {'column': 'rec_vld_fm_dts', 'aggregated': 'P75'},
                                      {'column': 'rec_vld_fm_dts', 'aggregated': 'IQR'},
                                      {'column': 'rec_vld_fm_dts', 'aggregated': 'QUARTILES'}]}}
]

dataset_def['metrics']['probes'] = new_probes_list
dataset.set_definition(dataset_def)
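And to cover the original goal of many tables, the same update can be wrapped in a loop over all datasets of a project. A minimal sketch, assuming project.list_datasets() is available in your dataikuapi version and that build_probes_for is a hypothetical helper returning a probes list like new_probes_list above:

# A sketch: apply a probes list to every dataset in the project.
# 'build_probes_for' is a hypothetical helper that returns a list like new_probes_list above.
client = dataiku.api_client()
project = client.get_project('DF_METRICS')

for ds_item in project.list_datasets():
    # depending on the API version, entries may be dicts ({'name': ...}) or objects with a .name attribute
    ds_name = ds_item['name'] if isinstance(ds_item, dict) else ds_item.name
    dataset = project.get_dataset(ds_name)
    dataset_def = dataset.get_definition()
    dataset_def['metrics']['probes'] = build_probes_for(ds_name)
    dataset.set_definition(dataset_def)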
Thanks again Gents!
Filip