# Code for custom code recipe Calc_Completeness (imported from a Python recipe)
# ================================================================================================
# Author(s): Sadhana Singh
# Creation date: 24.01.2020
# Version: 01.4
# Description: Calculates the completeness of the values per MeasurementPeriodID in the dataset.
#              The attributes for which the calculation should be performed can be selected via
#              parameter. In addition, the fields on which the results should (optionally) be
#              grouped can be defined via parameter as well.
#
# STEP 1: Import Dataiku classes and map/connect input, output and parameters
# STEP 2: Set up variables to execute the test
# STEP 3: Execute (loop) the SQL core logic for every selected field in the connected datasets
# STEP 4: Write data to the (persistent) output tables
#
# STATUS: DEV
#
# INPUT:  input_A_datasets: list of tables (1 or multiple) which are connected to the recipe plugin
# OUTPUT: output_A_datasets: 1 connected table for test results
#         output_B_datasets: 1 connected table for error results
# ================================================================================================
# CHANGELOG:
# v01.4 - 24.07.2020 - AHA
#   + Added new parameter for group-by logic (not hard coded as Portfolio, MeasurementPeriodID)
# v01.3 - 30.03.2020 - AHA
#   + Added CSV column input logic
#   + Split calculation for Portfolio
# v01.2 - 17.02.2020 - AHA
#   + Removed isnumeric check in SQL core logic
# v01.1 - 11.02.2020 - AHA
#   + Changed core logic to compute completeness for whole data
# v01.0 - 24.01.2020 - SSI
#   + Initial version
# ================================================================================================

# ================================================================================================
# STEP 1: Import Dataiku classes and map/connect input, output and parameters
# ================================================================================================
import dataiku
import pandas as pd, numpy as np
import logging
from dataiku_info import dataset_info
from dqf import completeness
from dqf import calc_functions as calc
from dqf import log_message as logmsg
from dataiku.customrecipe import *
from collections import OrderedDict
from dataiku import pandasutils as pdu
from datetime import datetime

# Import the class that allows us to execute SQL on the Studio connections
from dataiku.core.sql import SQLExecutor2

# Retrieve the datasets of the input role named 'input_A_role' as an array of dataset names
input_A_names = get_input_names_for_role('input_A_role')
# The dataset objects themselves can then be created like this:
input_A_datasets = [dataiku.Dataset(name) for name in input_A_names]

# The column input can only have 1 source table (not multiple)
input_columns = get_input_names_for_role('input_columns')
# As this input is optional, check whether the user has connected a file
if len(input_columns) != 0:
    ds_input_columns = dataiku.Dataset(input_columns[0])

# For outputs, the process is the same:
output_A_names = get_output_names_for_role('main_output')
output_A_datasets = [dataiku.Dataset(name) for name in output_A_names]

output_B_names = get_output_names_for_role('errors_output')
output_B_datasets = [dataiku.Dataset(name) for name in output_B_names]

# The configuration is a map of parameters; retrieve the values needed for this recipe
param_useColFileInput = get_recipe_config()['param_useColFileInput']
param_useColInputCaseInsensitive = get_recipe_config()['param_useColInputCaseInsensitive']
param_columns = get_recipe_config()['param_columns']
param_grouping = get_recipe_config()['param_grouping']

# Set up the logger
log = logging.getLogger()
log.setLevel(logging.INFO)
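
# ------------------------------------------------------------------------------------------------
# Illustrative example (comment only, not executed): possible parameter values for a run.
# 'Portfolio' and 'MeasurementPeriodID' appear elsewhere in this recipe; the attribute names in
# param_columns are hypothetical placeholders, not taken from any real project configuration.
# ------------------------------------------------------------------------------------------------
# param_useColFileInput            = False                                 # take columns from param_columns
# param_useColInputCaseInsensitive = True                                  # match column names case-insensitively
# param_columns                    = ['CustomerID', 'ContractStartDate']   # attributes tested for completeness
# param_grouping                   = ['Portfolio', 'MeasurementPeriodID']  # one result row per grouping combination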

# ================================================================================================
# STEP 2: Set up variables to execute the test
# ================================================================================================
outputs = []
outputs_error = pd.DataFrame(columns=['Attribute', 'Schema', 'Table', 'DQ_TestType', 'Run_Timestamp', 'ErrorMessage'])

# Set up the system timestamp for this run
run_timestamp = str(datetime.now())
dq_testType = 'DQ_Completeness'
dq_IDShort = 'COM'

# Get the column list on which the test should be performed (either from the input file or from the parameter)
if param_useColFileInput:
    input_testCols = calc.getColumnListFromInput(ds_input_columns, dq_testType)
else:
    input_testCols = param_columns

# Change the column list to lowercase if param_useColInputCaseInsensitive is set
if param_useColInputCaseInsensitive:
    input_testCols = [col.lower() for col in input_testCols]

# ================================================================================================
# STEP 3: Execute (loop) the SQL core logic for every selected field in the connected datasets
# ================================================================================================
for input_dataset in input_A_datasets:
    xmap = {}

    # Get the dataset connection info
    cInfo = dataset_info.connectioninfo(input_dataset)

    # Create a SQL connection to the specific dataset (table)
    executor = SQLExecutor2(connection=cInfo['connectionName'])

    # Get the list of columns which are available in the connected table/dataset.
    # Change to lower case if the case-insensitive param is enabled
    if param_useColInputCaseInsensitive:
        datasetcols = [names['name'].lower() for names in input_dataset.read_schema()]
    else:
        datasetcols = [names['name'] for names in input_dataset.read_schema()]

    # Compare the attributes from the parameter list with the columns in the specific dataset
    # and only perform the calculation on the intersection
    attrsToConsider = list(set(input_testCols) & set(datasetcols))

    # Test whether there are values in the column list which are not present in the current table
    missingColumns = list(set(input_testCols) - set(datasetcols))
    if len(missingColumns) != 0:
        log.warning(logmsg.getLogMsg_MissingColumns(cInfo['table'], missingColumns))

    # Define status/log values
    progressTo = len(attrsToConsider)
    progressCount = 1

    # Loop through every relevant column/attribute and perform the specific SQL query
    for att in attrsToConsider:
        log.info(logmsg.getLogMsg_Progress(cInfo['table'], att, progressCount, progressTo))
        try:
            Test_ID = calc.getIDValue(dq_IDShort, att, cInfo['schema'], cInfo['table'])

            # Get the results from the core query for the specific attribute.
            # The grouping in the SQL query can be defined via param_grouping
            query = completeness.getCompleteness(cInfo, att, param_grouping)
            log.debug(query)
            df = executor.query_to_df(query)

            xmap = {'Test_ID': Test_ID,
                    'Attribute': att,
                    'Schema': cInfo['schema'],
                    'Table': cInfo['table'],
                    'DQ_TestType': dq_testType,
                    'Run_Timestamp': run_timestamp,
                    'No_Of_EMPTY': df['No_Of_EMPTY'],
                    'No_Of_NA': df['No_Of_NA'],
                    'No_Of_NULL': df['No_Of_NULL'],
                    'CountRows': df['CountRows'],
                    'Total_Missing_Val': df['Total_Missing_Val'],
                    'Per_Missing_val': df['Per_Missing_val']}

            # Add the grouping values given as parameter (to be part of the output table)
            for groupValue in param_grouping:
                xmap.update({groupValue: df[groupValue]})

            df_loopResult = pd.DataFrame(xmap)

            # Append the loop result to the output table
            outputs.append(df_loopResult)
        except Exception as detail:
            log.warning(logmsg.getLogMsg_Error(cInfo['table'], att, detail))
            xmap = {'Attribute': att,
                    'Schema': cInfo['schema'],
                    'Table': cInfo['table'],
                    'DQ_TestType': dq_testType,
                    'Run_Timestamp': run_timestamp,
                    'ErrorMessage': str(detail)}
            # Append the error record via pd.concat (DataFrame.append was removed in pandas 2.0)
            outputs_error = pd.concat([outputs_error, pd.DataFrame([xmap])], ignore_index=True)
        finally:
            progressCount += 1
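
# ------------------------------------------------------------------------------------------------
# For reference (comment only, not executed): the core SQL is built in dqf.completeness.getCompleteness(),
# which is not part of this file. The sketch below is only an assumption about its general shape,
# inferred from the result columns consumed in STEP 3; in particular the 'NA' literal and the
# percentage formula are guesses, and the actual statement may differ.
#
#   SELECT <grouping columns>,
#          SUM(CASE WHEN <att> = ''    THEN 1 ELSE 0 END)  AS No_Of_EMPTY,
#          SUM(CASE WHEN <att> = 'NA'  THEN 1 ELSE 0 END)  AS No_Of_NA,
#          SUM(CASE WHEN <att> IS NULL THEN 1 ELSE 0 END)  AS No_Of_NULL,
#          COUNT(*)                                        AS CountRows,
#          <No_Of_EMPTY + No_Of_NA + No_Of_NULL>           AS Total_Missing_Val,
#          <100.0 * Total_Missing_Val / CountRows>         AS Per_Missing_val
#   FROM <schema>.<table>
#   GROUP BY <grouping columns>
# ------------------------------------------------------------------------------------------------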

# ================================================================================================
# STEP 4: Write data to the (persistent) output tables
# ================================================================================================
columnOrder = ['Test_ID', 'Attribute', 'Schema', 'Table', 'DQ_TestType', 'Run_Timestamp']

# Add the grouping values given as parameter (in the middle of the table)
for groupValue in param_grouping:
    columnOrder.append(groupValue)

# Add the calculated fields to the output
columnOrder.extend(['No_Of_EMPTY', 'No_Of_NA', 'No_Of_NULL', 'CountRows', 'Total_Missing_Val', 'Per_Missing_val'])

# Only write to the result table if outputs is not empty
if len(outputs) != 0:
    df_finalOutput = pd.concat(outputs)[columnOrder]
    output_A_datasets[0].write_with_schema(df_finalOutput)
else:
    log.warning(logmsg.getLogMsg_EmptyOutput())
    output_A_datasets[0].write_with_schema(pd.DataFrame(columns=columnOrder))

# Only write the error output if it was defined by the user
if len(output_B_names) != 0:
    output_B_datasets[0].write_with_schema(outputs_error)
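
# ------------------------------------------------------------------------------------------------
# For reference (comment only): a minimal, assumed sketch of the plugin recipe.json that would
# expose the roles and parameters read above. The role and parameter names are taken from this
# file; the labels, arity values and parameter types are illustrative assumptions and should be
# checked against the recipe.json actually shipped with the plugin.
#
# {
#     "meta": {"label": "Calc_Completeness"},
#     "kind": "PYTHON",
#     "inputRoles": [
#         {"name": "input_A_role",  "arity": "NARY",  "required": true,  "acceptsDataset": true},
#         {"name": "input_columns", "arity": "UNARY", "required": false, "acceptsDataset": true}
#     ],
#     "outputRoles": [
#         {"name": "main_output",   "arity": "UNARY", "required": true,  "acceptsDataset": true},
#         {"name": "errors_output", "arity": "UNARY", "required": false, "acceptsDataset": true}
#     ],
#     "params": [
#         {"name": "param_useColFileInput",            "type": "BOOLEAN"},
#         {"name": "param_useColInputCaseInsensitive", "type": "BOOLEAN"},
#         {"name": "param_columns",                    "type": "STRINGS"},
#         {"name": "param_grouping",                   "type": "STRINGS"}
#     ]
# }
# ------------------------------------------------------------------------------------------------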