How to run a SQL query in-database, from a Python recipe, and append to a dataset?

VFL
Level 2

Hi there,

Here is what I do:

  • I generate a SQL query programmatically with Python (in a Python recipe),
  • then I run it with exec_recipe_fragment() so that it runs directly on the Snowflake SQL engine (see the sketch below),
  • and, although I checked the "Append instead of overwrite" box in Input/Output, the result overwrites the table.
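
Roughly, the recipe looks like this (the dataset name and the query are simplified placeholders for my real, programmatically generated SQL):

import dataiku
from dataiku import SQLExecutor2

# Output dataset of the recipe (a Snowflake table); the name is a placeholder
output_dataset = dataiku.Dataset("my_output_dataset")

# In the real recipe this string is built programmatically
sql = "SELECT * FROM my_schema.my_source_table WHERE load_date = CURRENT_DATE"

# Runs the query directly on the Snowflake engine and writes the result
# into the output dataset
SQLExecutor2.exec_recipe_fragment(output_dataset, sql)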

Note:

  • It works if I use:
    df = SQLExecutor2.query_to_df(sql)
    and then:
    with dataiku.Dataset("my_dataset").get_writer() as writer:
        writer.write_dataframe(df)
  • However, the problem with this approach is that I need to load the whole DataFrame in memory and then send it back to Snowflake, which is not feasible because the table is very large.

So my question is: how can I run the SQL query directly in-database, from a Python recipe, and then append to the output dataset?

NB: should this question rather be posted to Dataiku Support, since I would expect the table to be appended in the first place?

6 Replies
StanG
Dataiker

Hi,
Indeed, it's not currently possible to append data computed using exec_recipe_fragment().

If you can't use a SQL recipe, a solution could be to use partitions and write each new table into a partition.
Another idea could be to use SQLExecutor2.query_to_iter to avoid loading the complete dataframe into memory at once (see the sketch below).
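
For example, a rough sketch of that streaming idea (this assumes query_to_iter returns a result object exposing iter_tuples(), that the output dataset is configured to append, and all names are placeholders - please double-check the SQLExecutor2 and DatasetWriter documentation for the exact API):

import dataiku
from dataiku import SQLExecutor2

executor = SQLExecutor2(connection="SNOWFLAKE_CONNECTION")  # placeholder connection name
output = dataiku.Dataset("my_output_dataset")               # placeholder dataset name

# Stream the rows instead of materializing the whole DataFrame in memory,
# writing each row to the output dataset as it arrives
with output.get_writer() as writer:
    for row in executor.query_to_iter("SELECT * FROM my_schema.my_large_table").iter_tuples():
        writer.write_tuple(row)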



VFL
Level 2
Author

OK thanks!

(I am also considering using a temporary table as an output of the recipe, before stacking)

sylvyr3
Level 3

I've come across a similar situation where I want to create a series of volatile tables, join them all together in-database, and save that final table. The way I've accomplished that is to put all my SQL statements in a list and pass them to the pre_queries argument of query_to_df.

Below is a framework of how I managed to process the data entirely in-database.

 

import dataiku

# Import the class that allows us to execute SQL on the Studio connections
from dataiku.core.sql import SQLExecutor2

executor = SQLExecutor2(connection="SQL_CONNECTION_NAME")

# Each pre-query builds an intermediate volatile table in-database
first_sql_statement = """
    create volatile table vt1...
"""

second_sql_statement = """
    create volatile table vt2...
"""

# The final statement joins the volatile tables and inserts the result
# into the output table
final_sql_statement = """
    insert into dbname.{tblname}
    select a.*
    from vt1 as a
    inner join vt2 as b
        on a.keyname = b.keyname
"""

# Pre-queries run in the same session, before the final query
list_of_pre_queries = [first_sql_statement, second_sql_statement]

executor.query_to_df(query = final_sql_statement.format(tblname = '${projectKey}_output_table_name'),
                     pre_queries = list_of_pre_queries)

 

VFL
Level 2
Author

Great idea!

So, when using "INSERT INTO" with query_to_df(), it actually inserts/appends, and returns an empty df?

psvnm
Level 2

You will have to provide the option post_queries=['COMMIT'] in your query_to_df() call.
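
For example (connection and table names are placeholders, and depending on the database the explicit COMMIT may or may not be required):

import dataiku
from dataiku import SQLExecutor2

executor = SQLExecutor2(connection="SNOWFLAKE_CONNECTION")  # placeholder connection name

# The INSERT runs in-database; the COMMIT in post_queries persists it.
# query_to_df is only used here as a way to execute the statement, so the
# returned DataFrame is not meaningful in itself.
insert_sql = "INSERT INTO my_schema.final_table SELECT * FROM my_schema.staging_table"
executor.query_to_df(insert_sql, post_queries=['COMMIT'])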

Marlan

Hi @VFL,

I have needed to do the same thing you describe. I have used two approaches, both of which have already come up on this topic. So while not new maybe it'll be helpful to hear that others have come to the same place.

One is the one you allude to: writing the output to a SQL table (that is dropped and recreated each time) and then using a subsequent SQL recipe to append the rows from that table to the final table that you want them in. This is my preferred approach because it gives me more control over the append (for example, I can use SQL to handle rows to be appended that already exist in the destination table).

The other is the one that @sylvyr3 describes. You can do the append in one step, but it can be a bit trickier to set up and test (for example, some databases need commit statements after each pre-query statement and some don't - see the DSS documentation on this). This method can also be super useful in other situations where you need to run arbitrary SQL statements. I haven't tried putting the insert statement in the final query; I usually do something like a select count(*) as the final query and then check the result in the returned dataframe to make sure everything worked (see the sketch below).
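
For illustration, a sketch of that last pattern (all names are placeholders): the insert goes into pre_queries and the final query is a cheap select whose result I can check afterwards.

import dataiku
from dataiku import SQLExecutor2

executor = SQLExecutor2(connection="SQL_CONNECTION_NAME")  # placeholder connection name

insert_sql = "INSERT INTO my_schema.final_table SELECT * FROM my_schema.staging_table"

# The insert runs as a pre-query; the final query just returns a row count
# that can be checked in the returned DataFrame
df = executor.query_to_df("SELECT COUNT(*) AS row_count FROM my_schema.final_table",
                          pre_queries=[insert_sql],
                          post_queries=['COMMIT'])  # some databases need the explicit commit
print(df.iloc[0, 0])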

You might also check out the slide deck from the "Using the Python API to Interface with SQL Databases" presentation on the community site. Slide 21 covers the first approach and slide 25 the second.

Marlan