How to run a SQL query in-database, from a Python recipe, and append to a dataset?

VFL
VFL Registered Posts: 11 ✭✭✭

Hi there,

Here is what I do:

  • I generate a SQL query programmatically with Python (in a Python recipe),
  • Then I run it with exec_recipe_fragment() so that it executes directly on the Snowflake SQL engine,
  • And, even though I checked the "append instead of overwrite" box in Input/Output, the result overwrites the table.

Note:

  • It works if I use:
    df = SQLExecutor2.query_to_df(sql)
    and then
    with dataiku.Dataset("my_dataset").get_writer() as writer:
        writer.write_dataframe(df)
  • However, the problem with this approach is that I need to load the df into memory and then send it back to Snowflake, which is impossible because the table is very large.

So my question is: how can I run the SQL query directly in-database, from a Python recipe, and then append to the output dataset?

NB: should this question be posted to Dataiku Support instead, since I would expect the table to be appended in the first place?

Answers

  • StanG
    StanG Dataiker, Registered Posts: 52 Dataiker

    Hi,
    Indeed, it's not currently possible to append data computed using exec_recipe_fragment().

    If you can't use a SQL recipe, then a solution could be to use partitions and write each new table into a partition.
    Another idea could be to use SQLExecutor2.query_to_iter so that the complete dataframe is not loaded into memory at once (a rough sketch of that is below).
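
    Just to illustrate the second idea, a minimal sketch could look like the following. The connection, query, and dataset names are placeholders, and it assumes query_to_iter returns a reader exposing iter_tuples() and that the dataset writer accepts write_tuple, as in the examples I have seen; please check the DSS API documentation for your version.

    import dataiku
    from dataiku.core.sql import SQLExecutor2

    # Placeholder connection name
    executor = SQLExecutor2(connection="SQL_CONNECTION_NAME")

    # Stream the query results instead of materializing a full dataframe
    query_reader = executor.query_to_iter("SELECT * FROM my_source_table")  # placeholder query

    # The output dataset schema must already be defined (or written beforehand)
    output = dataiku.Dataset("my_dataset")
    with output.get_writer() as writer:
        # Rows are yielded one at a time, so memory usage stays bounded
        for row in query_reader.iter_tuples():
            writer.write_tuple(row)

    Note that this keeps memory bounded, but the rows still travel through the Python process, so it does not keep the computation itself in the database.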



  • VFL
    VFL Registered Posts: 11 ✭✭✭

    OK thanks!

    (I am also considering using a temporary table as an output of the recipe, before stacking)

  • sylvyr3
    sylvyr3 Registered Posts: 21 ✭✭✭✭
    edited July 17

    I've come across a similar situation where I wanted to create a series of volatile tables, join them all together in-database, and save that final table. The way I accomplished that was to put all my SQL statements in a list and pass them to the pre_queries argument of query_to_df.

    Below is a framework of how I managed to process the data entirely in-database.

    import dataiku

    # Import the class that allows us to execute SQL on the Studio connections
    from dataiku.core.sql import SQLExecutor2

    executor = SQLExecutor2(connection="SQL_CONNECTION_NAME")

    # Each pre-query builds an intermediate volatile table in the database
    first_sql_statement = """
        create volatile table vt1...
    """

    second_sql_statement = """
        create volatile table vt2...
    """

    # The final statement joins the volatile tables and appends the result
    # to the output table, entirely in-database
    final_sql_statement = """
        insert into dbname.{tblname}
        select a.*
        from vt1 as a
        inner join vt2 as b
            on a.keyname = b.keyname
    """

    list_of_pre_queries = [first_sql_statement, second_sql_statement]

    executor.query_to_df(query=final_sql_statement.format(tblname='${projectKey}_output_table_name'),
                         pre_queries=list_of_pre_queries)

  • VFL
    VFL Registered Posts: 11 ✭✭✭

    Great idea!

    So, when using "INSERT INTO" with query_to_df(), it actually inserts/appends and returns an empty df?

  • Marlan
    Marlan Neuron 2020, Neuron, Registered, Dataiku Frontrunner Awards 2021 Finalist, Neuron 2021, Neuron 2022, Dataiku Frontrunner Awards 2021 Participant, Neuron 2023 Posts: 320 Neuron

    Hi @VFL,

    I have needed to do the same thing you describe. I have used two approaches, both of which have already come up in this topic, so while not new, maybe it'll be helpful to hear that others have come to the same place.

    One is the one you allude to: writing the output to a SQL table (that is dropped and recreated each time) and then using a subsequent SQL recipe to append the rows from that table to the final table that you want them in. This is my preferred approach because it gives me more control over the append (for example, I can use SQL to handle rows to be appended that already exist in the destination table).

    The other is the one that @sylvyr3 describes. You can do the append in one step, but it can be a bit trickier to set up (for example, some databases need commit statements after each pre-query statement and some don't - see the DSS documentation on this) and test. This method can also be super useful in other situations where you need to run arbitrary SQL statements. I haven't tried the approach of putting the insert statement in the final query; I usually do something like a select count(*) as the final query and then check the result in the returned dataframe to make sure everything worked. A rough sketch of that variant is below.
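
    For reference, a minimal sketch of that pattern could look like this. The connection and table names are placeholders, and whether the explicit COMMIT is needed depends on your database, as mentioned above.

    from dataiku.core.sql import SQLExecutor2

    # Placeholder connection name
    executor = SQLExecutor2(connection="SQL_CONNECTION_NAME")

    # Do the append as pre-queries; the COMMIT may or may not be required
    # depending on the database (see the DSS documentation)
    pre_queries = [
        "insert into dbname.output_table select * from dbname.staging_table",
        "COMMIT",
    ]

    # Use a cheap SELECT as the final query and check the returned dataframe
    check_df = executor.query_to_df("select count(*) as row_count from dbname.output_table",
                                    pre_queries=pre_queries)
    print(check_df["row_count"].iloc[0])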

    You might also check out the slide deck from the Using the Python API to Interface with SQL Databases presentation on the community site. Slide 21 covers the first approach and slide 25 the second approach.

    Marlan

  • psvnm
    psvnm Dataiku DSS Core Designer, Dataiku DSS & SQL, Dataiku DSS Core Concepts, Dataiku DSS Adv Designer, Registered Posts: 7 ✭✭✭✭

    You will have to provide the option post_queries=['COMMIT'] in your query_to_df call, for example as sketched below.
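
    A minimal sketch, with placeholder connection and table names:

    from dataiku.core.sql import SQLExecutor2

    executor = SQLExecutor2(connection="SQL_CONNECTION_NAME")  # placeholder connection

    # The COMMIT runs after the main query, so the INSERT is persisted
    executor.query_to_df("insert into dbname.output_table select * from dbname.staging_table",
                         post_queries=['COMMIT'])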
