Testing SQL Connections

Turribeach
Turribeach Dataiku DSS Core Designer, Neuron, Dataiku DSS Adv Designer, Registered, Neuron 2023 Posts: 2,160 Neuron
edited July 16 in Using Dataiku

We want to test database connections on all our instances. Our internal security policy requires database passwords to be changed yearly, and this invariably leads to some user database connections being missed: the password expires, flows fail, data stops being updated, etc. So we are trying to be more proactive and test connections programmatically. I looked at the REST API and there is no public API for this at the moment. I have raised a Product Idea here, so please vote for it if you like it.

As DSS has this feature built into the GUI, I did some digging and noticed that the GUI calls the /dip/api/admin/connections/test-sql internal/private API when I click the Test button of a SQL connection. I am now trying to get this internal/private API working. I understand that internal/private APIs are unsupported and could change, break or disappear at any point, but I see no harm in trying.

I am stuck as I keep getting "500 - Server Error". Looking at the backend log I can see what the issue is:

[2023/05/31-14:13:24.295] [qtp1900366749-124139] [ERROR] [dip.controllers]  - API call '/dip/api/admin/connections/test-sql' failed
org.springframework.web.bind.MissingServletRequestParameterException: Required String parameter 'data' is not present

This happens because the API expects all the connection details to be sent in a URL-encoded data parameter, but I have not been able to replicate this. My code is below. Does anyone know how to get this working? Thanks!

import dataikuapi
import requests
import json
import urllib.parse  # needed for urllib.parse.urlencode below
requests.packages.urllib3.disable_warnings()

api_key = ''
base_url = 'https://dss_url/'
test_connection_url = base_url + 'dip/api/admin/connections/test-sql'
headers = {'Accept': 'application/json, text/plain, */*',
           'Accept-Encoding': 'gzip, deflate, br',
           'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8', 
           'Authorization': 'Bearer {}'.format(api_key)}

client = dataikuapi.DSSClient(base_url, api_key)
client._session.verify = False
connections = client.list_connections()

for connection in connections:
    if connection == 'pick_one_sql_connection':
        try:
            payload_str = urllib.parse.urlencode(connections[connection], safe=':+')
            response = requests.post(test_connection_url, headers=headers, data=payload_str, verify=False, timeout=(1, 3))
            
            status_code = response.status_code
            status_reason = response.reason
            status_text = str(status_code) + ' - ' + str(status_reason)
        
            # Raise an exception if the response status code is not successful
            response.raise_for_status()
    
        except requests.exceptions.BaseHTTPError:
            error_text = "Base HTTP Error: " + status_text
        except requests.exceptions.HTTPError:
            error_text = "HTTP Error: " + status_text
        except requests.exceptions.Timeout:
            error_text = "The request timed out"
        except requests.exceptions.ConnectionError:
            error_text = "Connection Error"
        except requests.exceptions.RequestException:
            error_text = "Unknown error occurred: " + str(status_code)

        # If the request was successful
        if status_code == 200:
            # Parse the response as JSON
            json_object = response.json()

            # Print JSON object
            print(json.dumps(json_object, indent=4))

        else:
            print(status_reason)
            print(error_text)
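
The backend error above complains about a missing 'data' parameter, so one reading is that the whole connection definition has to travel as JSON inside a single form field called data. Below is a minimal sketch of building such a body with the standard library only. The helper name build_test_sql_body and the example connection dictionary are hypothetical, and whether the private endpoint accepts exactly this body is an assumption, not something confirmed by Dataiku.

```python
import json
import urllib.parse


def build_test_sql_body(connection_definition):
    """Return a form-encoded body holding the connection JSON in one 'data' field."""
    # Compact separators avoid spaces, so nothing is turned into '+' by urlencode
    compact_json = json.dumps(connection_definition, separators=(',', ':'))
    return urllib.parse.urlencode({'data': compact_json})


# Hypothetical connection definition, for illustration only
example_connection = {
    'name': 'my_sql_conn',
    'type': 'PostgreSQL',
    'params': {'host': 'db.example.com', 'port': 5432},
}

body = build_test_sql_body(example_connection)
print(body)
```

The resulting string would be passed as the request body together with the 'application/x-www-form-urlencoded' content type already set in the headers above.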

Answers

  • Turribeach

    I managed to get this working so if anyone is interested let me know and I will post the solution.

  • thirupathiduddilla
    thirupathiduddilla Registered Posts: 1 ✭✭

    @Turribeach Could you please share the working process for testing any Dataiku connection? Did you use any solution provided by the Dataiku API?

  • Grixis
    Grixis PartnerApplicant, Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer, Registered Posts: 82 ✭✭✭✭✭

    Hello @Turribeach, do you still manage your connection tests this way?

    On my side, I usually create a fake API service and use its test queries feature on each SQL connection whose availability I need to monitor.

  • Turribeach

    Here is the code. It has been tested on v10/v11/v12/v13, so it should work on most versions. You need both an Admin API key and an Admin user. This is because there is no public API to test connections, so I use the private API that Dataiku itself uses.

    import dataikuapi
    import requests
    import json, datetime, time, zoneinfo
    import urllib.parse
    import pandas as pd  # used for the results data frame further down

    # Dataiku URL and Authentication details
    dataiku_url = 'http://dss_host:dss_port/'
    # Admin level key needed
    dataiku_api_key = 'xxxxxxxxxx'
    # Admin level user required
    dataiku_username = 'xxxxxx'
    dataiku_password = 'xxxxxx'

    # Create a Dataiku client
    dataiku_client = dataikuapi.DSSClient(dataiku_url, dataiku_api_key)
    #dataiku_client._session.verify = False
    instance_vars = dataiku_client.get_global_variables()

    # Logs in to Dataiku to obtain authentication cookies
    def dss_get_auth_cookies(dataiku_url, dataiku_username, dataiku_password):
        login_endpoint = dataiku_url + 'dip/api/login'
        config_endpoint = dataiku_url + 'dip/api/get-configuration'
        auth_cookies = {}
        payload = {'login': dataiku_username, 'password': dataiku_password}

        # Defaults so the checks below still work if the request itself fails
        status_code = None
        status_text = ''
        error_text = ''

        try:
            # Call Login endpoint to authenticate
            auth_response = requests.post(login_endpoint, data=payload, verify=False)
            status_code = auth_response.status_code
            status_reason = auth_response.reason
            status_text = str(status_code) + ' - ' + str(status_reason)

            # Raise an exception if the response status code is not successful
            auth_response.raise_for_status()

        except requests.exceptions.BaseHTTPError:
            error_text = "Base HTTP Error"
        except requests.exceptions.HTTPError:
            error_text = "HTTP Error"
        except requests.exceptions.Timeout:
            error_text = "The request timed out"
        except requests.exceptions.ConnectionError:
            error_text = "Connection Error"
        except requests.exceptions.RequestException:
            error_text = "Unknown error occurred"

        # If the request was successful
        if status_code == 200:
            # Fetch cookies
            auth_cookies = auth_response.cookies.get_dict()

            # Make sure both auth cookies were returned
            if len(auth_cookies) == 2:
                try:
                    # Call the config endpoint to get XSRF token
                    config_response = requests.get(config_endpoint, cookies=auth_cookies, verify=False)

                    # Check the config response here (not the login response)
                    status_code = config_response.status_code
                    status_reason = config_response.reason
                    status_text = str(status_code) + ' - ' + str(status_reason)

                    # Raise an exception if the response status code is not successful
                    config_response.raise_for_status()

                except requests.exceptions.BaseHTTPError:
                    error_text = "Base HTTP Error: " + status_text
                except requests.exceptions.HTTPError:
                    error_text = "HTTP Error: " + status_text
                except requests.exceptions.Timeout:
                    error_text = "The request timed out"
                except requests.exceptions.ConnectionError:
                    error_text = "Connection Error"
                except requests.exceptions.RequestException:
                    error_text = "Unknown error occurred: " + str(status_code)

                # If the request was successful
                if status_code == 200:
                    auth_cookies.update(dict(config_response.cookies.items()))

                    # Make sure both auth cookies and XSRF token are present
                    if len(auth_cookies) == 3:
                        return status_code, auth_cookies, ''
                    else:
                        error_text = "Number of cookies does not match 3: " + str(len(auth_cookies))
                        return status_code, auth_cookies, error_text
            else:
                error_text = "Number of cookies does not match 2: " + str(len(auth_cookies))
                return status_code, auth_cookies, error_text

        # Login or config call failed: always return a 3-tuple so callers can unpack it
        return status_code, '', error_text + ' - ' + status_text
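
The function above relies on the number of cookies returned, and the code that follows picks the XSRF token by its position in the cookie dictionary. A slightly more defensive sketch is to look the token up by name instead. The helper find_xsrf_token and the cookie names in the example are hypothetical, and it assumes the token cookie has "XSRF" somewhere in its name, which I have not verified across DSS versions.

```python
def find_xsrf_token(cookies):
    """Return the value of the first cookie whose name mentions XSRF, else None."""
    for name, value in cookies.items():
        if 'xsrf' in name.lower():
            return value
    return None


# Hypothetical cookie names and values, for illustration only
example_cookies = {
    'session_cookie_a': 'abc',
    'session_cookie_b': 'def',
    'XSRF-TOKEN': 'tok123',
}

print(find_xsrf_token(example_cookies))
```

A None result can then be treated the same way as a failed login, rather than sending a wrong header value to the test endpoint.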

    The rest of the code queries a DSS server, fetches all valid connections and tries to test them:

    result_df = pd.DataFrame()
    
    # Fetch all connections
    client = dataikuapi.DSSClient(dataiku_url, dataiku_api_key)
    client._session.verify = False
    connections = client.list_connections()
    
    # Call the authentication function to get Dataiku auth cookies
    return_status_code, auth_cookies, error_text = dss_get_auth_cookies(dataiku_url, dataiku_username, dataiku_password)
    
    # Make sure authentication worked
    if return_status_code == 200 and error_text == '':
        
        # Extract XSRF-TOKEN from cookies
        xsrf_token = list(auth_cookies.values())[2]
    
        # Loop through all connections and test them
        for connection in connections:
            connection_type = connections[connection]['type']
    
            # Exclude non-SQL connections
            if connection_type not in ['Filesystem', 'HDFS', 'GCS', 'EC2', 'SQ5', 'Azure']:
                
                test_connection_endpoint = dataiku_url + 'dip/api/admin/connections/test-sql'
                    
                # Measure the start time
                start_time = time.time()
                
                connection_url = dataiku_url + 'admin/connections/' + connection + '/'
                timestamp_str = datetime.datetime.now(tz=zoneinfo.ZoneInfo('Europe/London')).strftime("%d-%b-%Y %H:%M:%S")
    
                try:
                    # Insert XSRF token into the request headers
                    request_headers = {'Content-Type': 'application/x-www-form-urlencoded', 'X-XSRF-TOKEN': xsrf_token}
                    json_data = connections[connection]
                    json_dump = json.dumps(json_data)
    
                    # URL Encode connection parameters
                    encoded_string = urllib.parse.quote(json_dump.replace(" ", ""))
                    payload_string = f'data={encoded_string}'
    
                    # Test Connections
                    test_connection_response = requests.post(test_connection_endpoint, headers=request_headers, cookies=auth_cookies, data=payload_string, verify=False, timeout=(2, 30))
    
                    status_code = test_connection_response.status_code
                    status_reason = test_connection_response.reason
                    status_text = str(status_code) + ' - ' + str(status_reason)
    
                    # Raise an exception if the response status code is not successful
                    test_connection_response.raise_for_status()
                
                except requests.exceptions.BaseHTTPError:
                    error_text = "Base HTTP Error: " + status_text
                except requests.exceptions.HTTPError:
                    error_text = "HTTP Error: " + status_text
                except requests.exceptions.Timeout:
                    error_text = "The request timed out"
                except requests.exceptions.ConnectionError:
                    error_text = "Connection Error"
                except requests.exceptions.RequestException:
                    error_text = "Unknown error occurred: " + str(status_code)
    
                # If the request was successful
                if status_code == 200:
    
                    # Parse the response as JSON
                    json_object = test_connection_response.json()
    
                    # Print JSON object
                    # print(json.dumps(json_object, indent=4))
                
                    if json_object.get('connectionOK') is True or json_object.get('ok') is True:
                        connection_status = 'OK'
                        error_message = ''
                    else:
                        connection_status = 'Error'
                        error_message = json_object['connectionError']['detailedMessage']
    
                else:
                    connection_status = 'Error'
                    error_message = status_reason + '-' + error_text
    
                # Measure the end time
                end_time = time.time()
    
                # Calculate the duration in seconds
                duration_secs = round((end_time - start_time), 2)
    
                # Append the record
                record_df = pd.DataFrame({'DssURL': dataiku_url, 'ConnectionURL': connection_url, 'Timestamp': timestamp_str, 'ConnectionName': connection, 'ConnectionType': connections[connection]['type'], 'ConnectionStatus': connection_status, 'ErrorMessage': error_message, 'SecondsSpent': f'{duration_secs:.1f}'}, index=[0])
                result_df = pd.concat([result_df, record_df], ignore_index=True)
                
    else:
        print(f'Authentication failed. Error code {return_status_code} {error_text}')
        
    result_df
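
The loop above reads the test result from either a connectionOK or an ok key and, on failure, from connectionError.detailedMessage. If a failed response ever comes back without connectionError, the direct dictionary lookup would raise a KeyError. Here is a small sketch of the same logic as a helper with safe lookups. The function name interpret_test_result, the fallback message and the sample payloads are hypothetical. Only the key names come from the code above.

```python
def interpret_test_result(json_object):
    """Map a test-sql JSON response to (connection_status, error_message)."""
    is_ok = json_object.get('connectionOK') is True or json_object.get('ok') is True
    if is_ok:
        return 'OK', ''
    # Fall back to an empty dict if the error block is missing or null
    connection_error = json_object.get('connectionError') or {}
    message = connection_error.get('detailedMessage', 'No error detail returned')
    return 'Error', message


# Hypothetical payloads, for illustration only
print(interpret_test_result({'connectionOK': True}))
print(interpret_test_result({'connectionOK': False,
                             'connectionError': {'detailedMessage': 'Login failed'}}))
print(interpret_test_result({}))
```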
    

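    The output of the script is a data frame with one row per tested connection and the columns DssURL, ConnectionURL, Timestamp, ConnectionName, ConnectionType, ConnectionStatus, ErrorMessage and SecondsSpent.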
