Sign up to take part
Registered users can ask their own questions, contribute to discussions, and be part of the Community!
Registered users can ask their own questions, contribute to discussions, and be part of the Community!
I have a scenario with a custom python step running the below code. Essentially I'm checking if the tables are accessible (not locked for loading) and I check if the source data is newer than the destination table.
Checking the tables for access is done using the scenario function execute_sql. This is convenient because it prints a step to the scenario run log. Is there a way I can print a python step to the scenario log just the like execute_sql()?
from dataiku.scenario import Scenario
import time
import datetime
from datetime import timedelta
from dataiku.core.sql import SQLExecutor2
from dateutil.relativedelta import relativedelta
TD_executor = SQLExecutor2(connection="TD")
SSO_executor = SQLExecutor2(connection="SSO")
today = datetime.datetime.now().date()
counter = 0
max_tries = 35
sec = 60
wait_time = 10
state_list = ['QQ']
view_list = ['xxx', 'yyy', 'zzz']
scenario = Scenario()
def check_views_access(conn_name, state, view):
scenario.execute_sql(conn_name, "select max(dt) from {state}.{view}".format(state=state, view=view))
def check_eeee(conn_name):
scenario.execute_sql(conn_name,"select top 1 XX from [ddd].[TBL1]")
scenario.execute_sql(conn_name,"select top 1 XX from [ddd].[TBL2]")
def compare_sss_dt_to_iii_dt(TD_exec, SSO_exec):
sss_pd_dt = TD_exec.query_to_df("select max(dt) as dt from view.tbl").iloc[0,0]
sss_pd_dt = datetime.datetime.strptime(sss_pd_dt, "%Y-%m-%dT%H:%M:%S.%fZ")
iii_pd_dt = SSO_exec.query_to_df("select max(dt) as dt from db.schema.tbl").iloc[0,0]
iii_pd_dt = datetime.datetime.strptime(iii_pd_dt, "%Y-%m-%dT%H:%M:%S.%fZ")
if sss_pd_dt == iii_pd_dt:
raise Exception("sss and iii are same")
elif sss_pd_dt > iii_pd_dt:
pass
elif sss_pd_dt < iii_pd_dt:
raise Exception("sss is lower than iii")
while counter < max_tries:
try:
for state in state_list:
for view in view_list:
check_views_access(conn_name = 'TD',
state = state,
view = view)
check_eeee(conn_name = 'TTT')
compare_sss_dt_to_iii_dt(TD_exec=TD_executor, SSO_exec=SSO_executor)
break
except:
time.sleep(sec * wait_time)
counter += 1
if counter >= max_tries:
raise Exception("failed to run")
I want to be able to see in the scenario log that my function (compare_sss_dt_to_iii_dt) raised an exception.
One solution I came up with is to create another scenario in the project and put just one function in there. Then I have my main scenario use scenario.run_scenario("other_scenario") in a custom python step to call that auxiliary scenario so that it shows the outcome of that function as a single step within my main scenario.
Thank you for sharing your solution @sylvyr3!