Trigger a flow when variables are set

Options
MJL
MJL Registered Posts: 5

I'm still quite new to Dataiku.

I have a project where I want the flow to run when two conditions are met. The flow should run when the underlying data warehouse (which is refreshed nightly) has been refreshed (There is a table that reports the time when the refresh is complete) AND when one of the tables in the data warehouse has new records in it.

I've set up two SQL data change scenarios to detect those conditions. I see that it is possible to create a variable in the step of the scenario. What I think I'd like to do is create a third scenario that tests for the existence of those two variables. I'm presuming that the trigger in the third scenario would be a custom (Python) trigger but I don't know what the syntax of the python code would look like i.e. what is the syntax of the statement that would reference the two variables.

Can someone help me?


Operating system used: Windows

Best Answer

  • Turribeach
    Turribeach Dataiku DSS Core Designer, Neuron, Dataiku DSS Adv Designer, Registered, Neuron 2023 Posts: 1,717 Neuron
    edited 4:22PM Answer ✓
    Options

    Hi, thank you for posting a more detailed update, it gives me an opportunity to comment on other things. Note that I used a CTE in the sQL but this could also be solved using UNION and a CASE statement.

    Query 1 is the perfect output for the SQL dataset change trigger as it has a date/time that will change when the ETL is complete. Therefore all you really need to do is to make sure you only return the new ExecutionEndInstant when there is new data in your second table. For that I would assume it's possible to see all the data loaded on DOMENICOROBBINS_OUTPUTS for a specific ETL load. You therefore can do something like this (this is pseudo code):

    SELECT current ExecutionEndInstant
    SELECT previous ExecutionEndInstant
    SELECT count of rows loaded AS part of current ExecutionEndInstant
    Final SELECT does a CASE statement as follows:
    CASE WHEN "count of rows loaded AS part of current ExecutionEndInstant" > 0 THEN current ExecutionEndInstant ELSE previous ExecutionEndInstant

    Anyway there are many ways to solve this in a single SQL statement. So looking at your solution I don't see how it can work. You have defined your two variables as a string. And you set NewDRs to "true" but the if checks it's "True" so it should never execute. Personally I would define the variables as boolean ("RDWRefreshedBoolean": false) and then use asBool(1) to assign True to the variable which if you look at the scenario step logs you will will actually define it as a boolean (--> Evaluated to true (class java.lang.Boolean)).

    There are other things about your solution I don't like:

    1. You are keeping a "state" in your project variables. If any of your scenarios fail this state will be "remembered" which is probably not a good thing. For instance if your third scenario fails and you re-run it you may have started a new ETL DWH load at which point maybe you shouldn't be attempting to refresh your flow. An better alternative could to use Scenario variables. Scenario variables work like Project Variables but are only available at run time so "state" is not preserved after a failure. It is a much more natural way of executing a program since it doesn't depend on any prior state.
    2. Running custom triggers every 10 seconds is inneficient as it consumes CPU cycles as Dataiku has to evaluate the code every time. Given that DSS is a Java backend and the trigger is in Python there is quite a bit of overhead for DSS to get a piece of Python to run. A better solution will be to run the trigger in a constant loop and use the sleep function while you wait for the conditions to be met. We also found a bug in Dataiku that means that each custom trigger execution leaves temp files behind. This can quickly cause your server to run out of inodes, space or became very slow to do anything as the disk operations become slow due to the large number of small temp files left behind. I suggest you check for DATA_DIR/scenarios/project_id/scenario_id/ and see how many files you have in there...
    3. Finally the whole concept of polling for your ETL DWH to be done is really the problem as it is very inneficient even if you can get it to work with a single scenario / single SQL dataset change trigger. It will be much better to integrate your ETL DWH process with Dataiku so you can simple request the Dataiku scenario to be started once the ETL DWH load is finished. Triggering a scenario using the Dataiku Python API is very simple: https://developer.dataiku.com/latest/concepts-and-examples/scenarios.html#variant-2-run-then-poll-while-doing-other-stuff

    Hope that helps.

Answers

  • Turribeach
    Turribeach Dataiku DSS Core Designer, Neuron, Dataiku DSS Adv Designer, Registered, Neuron 2023 Posts: 1,717 Neuron
    Options

    I don't really understand why you think you need three scenarios to solve your use case. Your requirements look quite straight forward to me. Create a single scenario with SQL dataset change trigger. Then build all the logic you need in this SQL statement so that it only "changes" (ie return a new business date) when all your required conditions are met. Then simply run the scenario to refresh your flow. If I am missing something then please state your additional requirements clearly. Thanks

  • MJL
    MJL Registered Posts: 5
    Options

    @Turribeach
    thanks for taking the time to respond.

    Unfortunately, unless you know something about SQL triggers that I don't, I don't think what you propose will not work. The online documentation describing SQL triggers explains that it only evaluates to true, and executes the flow, when the single condition - the result of the current execution of the query being different than the result of the preceding execution, is true. A SQL query CAN NOT evaluate multiple conditions e.g. IF/THEN/ELSE and execute with the the THEN condition is true. I have tested and confirmed that this is true.

    In my case, the result of the evaluation of my two conditions, which must both be true could evaluate to any of four possibilities;

    - both are true (the only time I want the flow to execute)
    - both are false
    - Frist true, second false
    - Second true, first false

    A SQL query would trigger any time the result of the current run of the query was different than the previous run of the query. That's not what I need.

    For the sake of clarity, here are my two SQL queries

    Query 1 (The data warehouse refresh is complete)

    SELECT MAX([ExecutionEndInstant])
    FROM [Log].[EtlWorkflowRun]
    WHERE
    [EtlWorkflowName] = 'wf_REG_DW'
    AND [ExecutionStatus] = 'Succeeded'

    Query 2 (the table of significance contains new rows)

    SELECT MAX(SUBMISSION_SK) from [dbo].[DOMENICOROBBINS_OUTPUTS]

    As separate SQL triggers in separate scenarios (set to run once every 10 minutes), I can use the fact that when the condition is true (i.e. the value from the current run of teh query is different from the previous run), I can run a step that sets a project variable to true. Then I can use a third scenario that runs (what I expect is a simple custom python trigger) that evaluates the variables with an IF/THEN/ELSE statement. After the flow successfully completes, the variables would have to be set to false (in the last step of the flow).

    To reiterate, I think this approach will produce the desired result. I just don't know what the python syntax would be to reference the variables created by the first two scenarios.

    I hope this explanation clarifies what I'm trying to do.

  • konathan
    konathan Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer, Registered Posts: 25 ✭✭
    Options

    Hi,

    I agree with Turribeach that you can combine all the logic in just one scenario. From what I've understood, I believe that what you are trying to achieve can be done through a Trigger on dataset change (ADD TRIGGER section) in the Settings tab of the scenario. You can add there the two datasets that you want to be checked in order for the flow to be refreshed or not. I hope that solved the issue!

    -Konstantina

  • Turribeach
    Turribeach Dataiku DSS Core Designer, Neuron, Dataiku DSS Adv Designer, Registered, Neuron 2023 Posts: 1,717 Neuron
    edited 4:22PM
    Options

    OK apologies in advance to dissect your answer but there are too many things that are not correct, in my view.

    The online documentation describing SQL triggers explains that it only evaluates to true, and executes the flow, when the single condition - the result of the current execution of the query being different than the result of the preceding execution, is true.

    This is not what the documentation says. You are confusing two things here. One is the fact that if any scenario Triggers are true then the scenario executes. This statement is meant to clarify that you can not combine different triggers into a logic expression. However SQL triggers don't evaluate to true, they simply execute and have a value. When the value changes, the trigger fires and the scenario executes. There is no evaluation to true. And crucially SQL queries can evaluate multiple logical conditions which takes us to this statement:

    A SQL query CAN NOT evaluate multiple conditions e.g. IF/THEN/ELSE and execute with the THEN condition is true. I have tested and confirmed that this is true.

    This couldn't be more far from the thruth. First of all and in a more general way you can easily evaluate multiple logic conditions in a SQL using either a WHERE clause or a CASE statement. You don't say what SQL engine you are using but this sample SELECT statement should work on most SQL engines:

    WITH my_data AS
    (
    SELECT 1 AS DUMMY_VAL, True AS DUMMY_BOOL
    UNION ALL
    SELECT 2 AS DUMMY_VAL, False AS DUMMY_BOOL
    UNION ALL
    SELECT 3 AS DUMMY_VAL, True AS DUMMY_BOOL
    )
    SELECT CASE WHEN MIN(DUMMY_BOOL) = False AND SUM(DUMMY_VAL) > 5 THEN False ELSE True END AS CASE_DUMMY FROM my_data

    In the above SQL statement I show how I can conditionally return a value of column based on a IF conditional statement with two logical evaluations based on the value of multiple different SELECT statements.

    So now to your issue. You have provided your SQL queries you need to execute but unfortunatelly the columns names say very llittle about the data types and the data itself which makes harder for me to suggest a clear solution. But in a nutshell what you need to do is to combine your two queries using the WITH statement. Then evaluate each condition of each SQL statement to determine if they have met the condition or not. Finally in the final part of the SQL you check the conditions and return a value which Dataiku evaluates. This value can't be a True/False value since as previously advised Dataiku will execute every time this changes so you don't want it to execute when it changes back to False. So you need to use either the Business Date of your data or Batch ID or some sort of value that can change over time.

    So here is a sample SQL of how you can achive what you want:

    WITH first_condition AS
    (
    SELECT True AS DUMMY_BOOL
    ),
    second_condition AS
    (
    SELECT False AS DUMMY_BOOL
    ),
    previous_ExecutionEndInstant AS
    (
    SELECT 1 AS ID
    ),
    current_ExecutionEndInstant AS
    (
    SELECT 2 AS ID
    )
    SELECT CASE WHEN first_condition.DUMMY_BOOL = True AND second_condition.DUMMY_BOOL = True THEN current_ExecutionEndInstant.ID ELSE previous_ExecutionEndInstant.ID END AS SQL_TRIGGER_LOGIC 
    FROM first_condition, second_condition, previous_ExecutionEndInstant, current_ExecutionEndInstant

    As you can see I got 4 SELECT statements. I evaluate your 2 conditions to True or False. I then fetch the current_ExecutionEndInstant and the previous_ExecutionEndInstant one. Finally I simply check if both conditions are met I return the current_ExecutionEndInstant otherwise I return previous_ExecutionEndInstant. In my sample condition is True the other is False so I return the previous_ExecutionEndInstant. But if you change the second_condition to True you will see the query now returns the current_ExecutionEndInstant (ie 2) as both conditions are met.

    Asuming ExecutionEndInstant works like a batch Id this would work like this:

    1. Your batch is completed so current_ExecutionEndInstant = X and previous_ExecutionEndInstant = X - 1
    2. The SQL trigger query returns X
    3. Your batch starts so current_ExecutionEndInstant becomes X + 1 and previous_ExecutionEndInstant = X.
    4. You SQL trigger query continues to evaluate your two conditions but since the batch is not completed the SQL trigger query returns X so Dataiku does not execute anything
    5. At some point your batch completes.
    6. You SQL trigger query evaluate your two conditions and these are now met so it now returns X + 1 so Dataiku detects the change executes the scenario.
    7. The cycle repeats again ad infinitum

  • MJL
    MJL Registered Posts: 5
    Options

    @Turribeach
    Thank you so much for this excellent response. This is very helpful. I stand corrected!

    While I'm new to Dataiku, I do have experience with SQL. I had not thought of using a CTE.

    I did not intend to imply that I thought there was any limitation to what SQL was capable of. I was speaking to what I perceived as the limitation of the SQL data change trigger ultimately needing to produce a single result that gets compared with the subsequent result for difference.

    The data being returned from the two queries in my previous post is as follows.

    Query 1

    This returns the most recent date/time representing when the data warehouse ETL refresh is complete. We can't start the Dataiku flow while the data warehouse is still being refreshed. The data warehouse refreshed once daily but the completion time can vary by hours (sadly). We want to trigger the flow after the warehouse has been refreshed for the day.

    Query 2

    This returns an integer that is the PK of the latest record added to a particular table. We only need to run the flow if there are new records in this table.

    ------

    In both cases, the trigger fires when the value from the previous run of the SQL data change trigger is different from the current run. My problem was in not understanding how I could, with a single query, achieve what I need - only run the flow when the data warehouse refresh has completed AND there are new records in the table that are important for the flow.

    It will take me some time to parse your example to make it work for me. It is reassuring to learn that this can be built into one scenario/trigger.

    --------
    Based on what was clearly a flawed understanding of what could be done with a SQL data change trigger, and so looking for an alternative solution, I spent the day pursuing the direction I was headed - two scenarios that set variables and a third, with a custom trigger, that fires when both variables are set (to True). The first two scenarios were very simple. After some googling and Chat GPTing for the right syntax of for the custom python trigger, I've got a working solution! Here's what it looks like.

    Two global variables

    image.png

    A scenario with a SQL query change trigger that tests to ensure that the data warehouse refresh has completed

    image.png


    This scenario runs a step that sets the variable.

    image.png

    The 2nd scenario is similar
    image.png

    image.png

    The third scenario evaluates the two variables and executes when both are true.

    image.png

    There are multiple steps for this scenario that run the flow and clean up intermediate tables to keep the database neat 'n tidy. The final step resets the variables.

    image.png

    Again, thank you so much for helping me solve, what for me, felt like a problem without a solution.

Setup Info
    Tags
      Help me…