Testing SQL Connections
We want to test database connections in all our instances. Our internal security policy is to change database passwords yearly, and this invariably leads to some user database connections being missed, the password expiring, flows failing, data not being updated, etc. So we are trying to be more proactive and test connections programmatically. I looked at the REST API and there aren't any public APIs available for this at the moment. I have raised the Product Idea here, so please vote for it if you like it.
As DSS has this feature built into the GUI, I did some digging and noticed that the GUI calls the /dip/api/admin/connections/test-sql internal/private API when I click on the Test button of a SQL connection. I am now trying to see if I can get this internal/private API working. I obviously understand that internal/private APIs are unsupported and could change, break or disappear at any point, but I see no harm in trying.
I am stuck as I keep getting "500 - Server Error". Looking at the backend log I can see what the issue is:
[2023/05/31-14:13:24.295] [qtp1900366749-124139] [ERROR] [dip.controllers] - API call '/dip/api/admin/connections/test-sql' failed
org.springframework.web.bind.MissingServletRequestParameterException: Required String parameter 'data' is not present
This is happening because the API expects all the connection details to be sent in a URL-encoded parameter named data, but I have not been able to replicate this. My code is below. Does anyone know how to get this working? Thanks!
import dataikuapi
import requests
import json
import urllib.parse  # needed for urllib.parse.urlencode below

requests.packages.urllib3.disable_warnings()

api_key = ''
base_url = 'https://dss_url/'
test_connection_url = base_url + 'dip/api/admin/connections/test-sql'
headers = {'Accept': 'application/json, text/plain, */*',
           'Accept-Encoding': 'gzip, deflate, br',
           'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8',
           'Authorization': 'Bearer {}'.format(api_key)}

client = dataikuapi.DSSClient(base_url, api_key)
client._session.verify = False
connections = client.list_connections()

for connection in connections:
    if connection == 'pick_one_sql_connection':
        try:
            payload_str = urllib.parse.urlencode(connections[connection], safe=':+')
            response = requests.post(test_connection_url, headers=headers, data=payload_str,
                                     verify=False, timeout=(1, 3))
            status_code = response.status_code
            status_reason = response.reason
            status_text = str(status_code) + ' - ' + str(status_reason)
            # Raise an exception if the response status code is not successful
            response.raise_for_status()
        except requests.exceptions.BaseHTTPError:
            error_text = "Base HTTP Error: " + status_text
        except requests.exceptions.HTTPError:
            error_text = "HTTP Error: " + status_text
        except requests.exceptions.Timeout:
            error_text = "The request timed out"
        except requests.exceptions.ConnectionError:
            error_text = "Connection Error"
        except requests.exceptions.RequestException:
            error_text = "Unknown error occurred: " + str(status_code)
        # If the request was successful
        if status_code == 200:
            # Parse the response as JSON
            json_object = response.json()
            # Print JSON object
            print(json.dumps(json_object, indent=4))
        else:
            print(status_reason)
            print(error_text)
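For anyone reading along, here is a minimal sketch of what the log message seems to be asking for: the whole connection definition serialised to JSON and sent as the value of a single form field named data, rather than each key of the definition sent as its own form field. The helper name build_test_sql_body and the example definition are made up for illustration, and since this is an unsupported private API the exact payload DSS expects is an assumption.

```python
import json
import urllib.parse


def build_test_sql_body(connection_definition: dict) -> str:
    """Return a form-encoded body holding the whole definition in one 'data' field."""
    # Compact JSON keeps the encoded payload short
    json_text = json.dumps(connection_definition, separators=(",", ":"))
    # urlencode percent-encodes the JSON text and prefixes it with 'data='
    return urllib.parse.urlencode({"data": json_text})


# Hypothetical connection definition, for illustration only
example_definition = {
    "name": "my_sql_connection",
    "type": "PostgreSQL",
    "params": {"host": "db.example.com", "port": 5432},
}

body = build_test_sql_body(example_definition)
print(body)
```

The resulting string can be passed as data= to requests.post together with a Content-Type of application/x-www-form-urlencoded.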
Answers
-
Turribeach
I managed to get this working so if anyone is interested let me know and I will post the solution.
-
@Turribeach Could you please share the working process for testing any Dataiku connection? Did you use any solution provided by the Dataiku API?
-
Grixis
Hello @Turribeach, do you still manage your connection tests this way?
On my side, I usually create a dummy API service and use its test queries feature on each SQL connection whose availability I need to monitor.
-
Turribeach
Here is the code. It has been tested on v10/v11/v12/v13, so it should work on most versions. You need both an Admin API key and an Admin user. This is because there is no public API to test connections, so I use the private API that the Dataiku GUI uses.
import dataikuapi
import requests
import json, datetime, time, zoneinfo
import urllib.parse

# Dataiku URL and Authentication details
dataiku_url = 'http://dss_host:dss_port/'
# Admin level key needed
dataiku_api_key = 'xxxxxxxxxx'
# Admin level user required
dataiku_username = 'xxxxxx'
dataiku_password = 'xxxxxx'

# Create a Dataiku client
dataiku_client = dataikuapi.DSSClient(dataiku_url, dataiku_api_key)
#dataiku_client._session.verify = False
instance_vars = dataiku_client.get_global_variables()

# Logs in to Dataiku to obtain authentication cookies
def dss_get_auth_cookies(dataiku_url, dataiku_username, dataiku_password):
    login_endpoint = dataiku_url + 'dip/api/login'
    config_endpoint = dataiku_url + 'dip/api/get-configuration'
    auth_cookies = {}
    payload = {'login': dataiku_username, 'password': dataiku_password}
    # Defaults so the checks below still work if the request fails before a response arrives
    status_code = None
    status_text = ''
    error_text = ''
    try:
        # Call Login endpoint to authenticate
        auth_response = requests.post(login_endpoint, data=payload, verify=False)
        status_code = auth_response.status_code
        status_reason = auth_response.reason
        status_text = str(status_code) + ' - ' + str(status_reason)
        # Raise an exception if the response status code is not successful
        auth_response.raise_for_status()
    except requests.exceptions.BaseHTTPError:
        error_text = "Base HTTP Error"
    except requests.exceptions.HTTPError:
        error_text = "HTTP Error"
    except requests.exceptions.Timeout:
        error_text = "The request timed out"
    except requests.exceptions.ConnectionError:
        error_text = "Connection Error"
    except requests.exceptions.RequestException:
        error_text = "Unknown error occurred"
    # If the request was successful
    if status_code == 200:
        # Fetch cookies
        auth_cookies = auth_response.cookies.get_dict()
        # Make sure both auth cookies were returned
        if len(auth_cookies) == 2:
            try:
                # Call the config endpoint to get XSRF token
                config_response = requests.get(config_endpoint, cookies=auth_cookies, verify=False)
                # Check the config response here, not the earlier login response
                status_code = config_response.status_code
                status_reason = config_response.reason
                status_text = str(status_code) + ' - ' + str(status_reason)
                # Raise an exception if the response status code is not successful
                config_response.raise_for_status()
            except requests.exceptions.BaseHTTPError:
                error_text = "Base HTTP Error: " + status_text
            except requests.exceptions.HTTPError:
                error_text = "HTTP Error: " + status_text
            except requests.exceptions.Timeout:
                error_text = "The request timed out"
            except requests.exceptions.ConnectionError:
                error_text = "Connection Error"
            except requests.exceptions.RequestException:
                error_text = "Unknown error occurred: " + str(status_code)
            # If the request was successful
            if status_code == 200:
                auth_cookies.update(dict(config_response.cookies.items()))
                # Make sure both auth cookies and XSRF token are present
                if len(auth_cookies) == 3:
                    return status_code, auth_cookies, ''
                else:
                    error_text = "Number of cookies does not match 3: " + str(len(auth_cookies))
                    return status_code, auth_cookies, error_text
            else:
                # Config call failed: still return a tuple so the caller can unpack it
                return status_code, auth_cookies, error_text
        else:
            error_text = "Number of cookies does not match 2: " + str(len(auth_cookies))
            return status_code, auth_cookies, error_text
    else:
        return status_code, '', error_text + ' - ' + status_text

The rest of the code queries a DSS server, fetches all valid connections and tries to test them:
import pandas as pd  # needed for the result data frame

result_df = pd.DataFrame()

# Fetch all connections
client = dataikuapi.DSSClient(dataiku_url, dataiku_api_key)
client._session.verify = False
connections = client.list_connections()

# Call the authentication function to get Dataiku auth cookies
return_status_code, auth_cookies, error_text = dss_get_auth_cookies(dataiku_url, dataiku_username, dataiku_password)

# Make sure authentication worked
if return_status_code == 200 and error_text == '':
    # Extract XSRF-TOKEN from cookies (relies on it being the third cookie collected)
    xsrf_token = list(auth_cookies.values())[2]
    # Loop through all connections and test them
    for connection in connections:
        connection_type = connections[connection]['type']
        # Exclude non-SQL connections
        if connection_type not in ['Filesystem', 'HDFS', 'GCS', 'EC2', 'SQ5', 'Azure']:
            test_connection_endpoint = dataiku_url + 'dip/api/admin/connections/test-sql'
            # Reset per-connection state so a failed request cannot reuse values from the previous connection
            status_code = None
            status_reason = ''
            error_text = ''
            # Measure the start time
            start_time = time.time()
            connection_url = dataiku_url + 'admin/connections/' + connection + '/'
            timestamp_str = datetime.datetime.now(tz=zoneinfo.ZoneInfo('Europe/London')).strftime("%d-%b-%Y %H:%M:%S")
            try:
                # Insert XSRF token into the request headers
                request_headers = {'Content-Type': 'application/x-www-form-urlencoded', 'X-XSRF-TOKEN': xsrf_token}
                json_data = connections[connection]
                json_dump = json.dumps(json_data)
                # URL Encode connection parameters
                encoded_string = urllib.parse.quote(json_dump.replace(" ", ""))
                payload_string = f'data={encoded_string}'
                # Test Connections
                test_connection_response = requests.post(test_connection_endpoint, headers=request_headers,
                                                         cookies=auth_cookies, data=payload_string,
                                                         verify=False, timeout=(2, 30))
                status_code = test_connection_response.status_code
                status_reason = test_connection_response.reason
                status_text = str(status_code) + ' - ' + str(status_reason)
                # Raise an exception if the response status code is not successful
                test_connection_response.raise_for_status()
            except requests.exceptions.BaseHTTPError:
                error_text = "Base HTTP Error: " + status_text
            except requests.exceptions.HTTPError:
                error_text = "HTTP Error: " + status_text
            except requests.exceptions.Timeout:
                error_text = "The request timed out"
            except requests.exceptions.ConnectionError:
                error_text = "Connection Error"
            except requests.exceptions.RequestException:
                error_text = "Unknown error occurred: " + str(status_code)
            # If the request was successful
            if status_code == 200:
                # Parse the response as JSON
                json_object = test_connection_response.json()
                # Print JSON object
                # print(json.dumps(json_object, indent=4))
                if json_object.get('connectionOK') is True or json_object.get('ok') is True:
                    connection_status = 'OK'
                    error_message = ''
                else:
                    connection_status = 'Error'
                    error_message = json_object['connectionError']['detailedMessage']
            else:
                connection_status = 'Error'
                error_message = status_reason + '-' + error_text
            # Measure the end time
            end_time = time.time()
            # Calculate the duration in seconds
            duration_secs = round((end_time - start_time), 2)
            # Append the record
            record_df = pd.DataFrame({'DssURL': dataiku_url,
                                      'ConnectionURL': connection_url,
                                      'Timestamp': timestamp_str,
                                      'ConnectionName': connection,
                                      'ConnectionType': connections[connection]['type'],
                                      'ConnectionStatus': connection_status,
                                      'ErrorMessage': error_message,
                                      'SecondsSpent': f'{duration_secs:.1f}'}, index=[0])
            result_df = pd.concat([result_df, record_df], ignore_index=True)
else:
    print(f'Authentication failed. Error code {return_status_code} {error_text}')

result_df
The output is a data frame similar to this:
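As an optional hardening of the result parsing, here is a small sketch that turns a test response into a status and message without risking a KeyError when connectionError or detailedMessage is missing. The function name summarize_test_result and the fallback messages are my own. The field names connectionOK, ok, connectionError and detailedMessage are the ones used in the script above, and because the API is private they may differ between DSS versions.

```python
def summarize_test_result(status_code, json_object):
    """Map a test-sql response to (connection_status, error_message)."""
    # Anything other than HTTP 200 with a JSON object is treated as a failed test
    if status_code != 200 or not isinstance(json_object, dict):
        return "Error", f"HTTP status {status_code}"
    # The script above accepts either of these two flags as success
    if json_object.get("connectionOK") is True or json_object.get("ok") is True:
        return "OK", ""
    # Fall back to a generic message when no error details are present
    error = json_object.get("connectionError") or {}
    message = error.get("detailedMessage") or "No error details returned"
    return "Error", message


# Hypothetical response bodies, for illustration only
print(summarize_test_result(200, {"connectionOK": True}))
print(summarize_test_result(200, {"connectionOK": False,
                                  "connectionError": {"detailedMessage": "Login failed"}}))
print(summarize_test_result(500, None))
```

In the loop, the two returned values could be assigned directly to connection_status and error_message.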