Dynamic SQL creation using SQLExecutor2 fails
Hi,
I am trying to dynamically create SQL that creates N tables in my database and then finally creates a view that concatenates them all together.
I can't show all of the SQL as it is proprietary but I can show the general concept:
create or replace table in mydatabase.myschema.mytable1 as
<proprietary sql>
;
create or replace table in mydatabase.myschema.mytable2 as
<proprietary sql>
;
create or replace view in mydatabase.myschema.myview as
select * from mydatabase.myschema.mytable1
union all
select * from mydatabase.myschema.mytable2
;
select * from mydatabase.myschema.myview;
Here is how I execute the SQL:
SQLExecutor2.exec_recipe_fragment(my_output_dataset, final_sql)
It results in this error:
Job failed: Error in Python process: At line 78: <class 'NameError'>: name 'my_output_dataset' is not defined
Operating system used: Windows 10
Answers
-
Hi,
Thank you for your question! One limitation of exec_recipe_fragment() is that it can execute only one main query. That said, you can execute as many pre-queries as you want by passing a list of queries to the pre_queries argument (see the example below).
Another limitation is that you will need to specify an input dataset for that recipe, even if you don't end up using the data from that dataset.
Example:
import dataiku
from dataiku.core.sql import SQLExecutor2

# An input dataset is required even if its data is never used
input_dataset = dataiku.Dataset("my_dataset")
executor = SQLExecutor2(dataset=input_dataset)

create_table_1 = """create or replace table in mydatabase.myschema.mytable1 as <proprietary sql>;"""
create_table_2 = """create or replace table in mydatabase.myschema.mytable2 as <proprietary sql>;"""
# Despite the variable name, this statement creates the view
create_table_3 = """create or replace view in mydatabase.myschema.myview as select * from mydatabase.myschema.mytable1 union all select * from mydatabase.myschema.mytable2;"""
final_sql = """ SELECT * FROM mydatabase.myschema.myview;"""

# The output must be a dataiku.Dataset object
my_output_dataset = dataiku.Dataset("my_output_dataset")
executor.exec_recipe_fragment(
    my_output_dataset,
    final_sql,
    pre_queries=[create_table_1, create_table_2, create_table_3])
The error you saw arose because the my_output_dataset variable was not defined. It needs to be a dataiku.Dataset object.
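Since the original question is about generating the statements for N tables dynamically, the pre_queries list can also be built in a loop. The following is only a minimal sketch: build_queries is a hypothetical helper (not part of the Dataiku API), the mytable1..N naming follows the question, the SELECT bodies are placeholders for the proprietary SQL, and the statements are written in the usual "create or replace table <name> as" form on the assumption that this is what the target database accepts.

```python
def build_queries(table_bodies, database="mydatabase", schema="myschema"):
    """Return (pre_queries, main_query) for N tables plus one union view.

    table_bodies: list of SELECT statements, one per table (placeholders here).
    This helper is hypothetical; it only assembles SQL strings.
    """
    prefix = f"{database}.{schema}"
    table_names = [
        f"{prefix}.mytable{i}" for i in range(1, len(table_bodies) + 1)
    ]

    # One CREATE statement per table; each becomes its own pre-query.
    pre_queries = [
        f"create or replace table {name} as\n{body}"
        for name, body in zip(table_names, table_bodies)
    ]

    # The view concatenates every table with UNION ALL.
    union_sql = "\nunion all\n".join(
        f"select * from {name}" for name in table_names
    )
    pre_queries.append(f"create or replace view {prefix}.myview as\n{union_sql}")

    # Exactly one main query, matching the one-main-query limitation.
    main_query = f"select * from {prefix}.myview"
    return pre_queries, main_query


# Placeholder bodies standing in for the proprietary SQL.
pre_queries, main_query = build_queries(["select 1 as x", "select 2 as x"])
```

The two results could then be passed on as executor.exec_recipe_fragment(my_output_dataset, main_query, pre_queries=pre_queries).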
Hope this helps!
-
Hi @info-rchitect,
I know @Jean-Yves already provided a solution for you. I just wanted to also mention that I typically use the query_to_df method to execute database operations, as it is more flexible (e.g., no need to specify an input dataset). I believe that the main query is optional as well, but in many cases I specify a main query that returns some information about the operations I just executed (e.g., a row count or something like that).
Also, you may need to include commits as noted here: https://doc.dataiku.com/dss/latest/python-api/sql.html#queries-with-side-effects
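As a rough illustration of that pattern, here is a minimal sketch. run_with_row_count is a hypothetical helper, and the table name and connection name in the usage comments are assumptions; the only Dataiku call it relies on is query_to_df with its pre_queries and post_queries arguments.

```python
def run_with_row_count(executor, statements, count_table):
    """Run side-effect statements, then return a row count as the main query.

    executor: an object exposing query_to_df (e.g. a SQLExecutor2 instance).
    statements: SQL statements with side effects, run as pre-queries.
    count_table: table whose row count is reported by the main query.
    """
    main_query = f"SELECT COUNT(*) AS row_count FROM {count_table}"
    return executor.query_to_df(
        main_query,
        pre_queries=list(statements),
        post_queries=["COMMIT"],  # commit the side effects, per the linked docs
    )


# Usage inside DSS (assumed names, shown as comments so the sketch stays
# importable outside DSS):
# from dataiku.core.sql import SQLExecutor2
# executor = SQLExecutor2(connection="my_connection_name")
# df = run_with_row_count(executor, [create_table_1], "mydatabase.myschema.mytable1")
```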
Marlan
-
Thanks for clarifying the single SQL statement and showing that I had to create a `Dataset` instance. So if I converted my SQL from using temporary tables to using CTEs would it work?
Back to the current solution path... I do have an input dataset defined, which passes in the list of column names I use in my SQL aggregations. This is based on loading a CSV file (more on that in a second).
yield_agg_by_fields = dataiku.Dataset("yield_agg_by_fields")
yield_agg_by_fields_df = yield_agg_by_fields.get_dataframe()
So I stitched in your solution and got this error:
Query failed: b'Input dataset yield_agg_by_fields is not a SQL table
Why does Dataiku care how my input dataset is created if it is converting it to a pandas DataFrame anyway?
Anyway, I switched to using a SQL dataset for my input query and then got this message:
<class 'Exception'>: Query failed: b'Actual statement count 3 did not match the desired statement count 1.'
I realized I was setting the role and the warehouse as separate SQL statements within the SQL I passed, so I removed those and then got an error that the temporary table I created does not exist. This is likely because Dataiku executes the statements in different sessions. So I converted all of my table creation to permanent tables and it worked!
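The statement-count message above suggests that each query string should hold exactly one statement. One way to get there from a single generated script is to split it and treat the last statement as the main query. This is only a sketch: split_script is a hypothetical helper, and the split on ";" is deliberately naive (it would break on semicolons inside string literals or comments).

```python
def split_script(sql_script):
    """Naively split a SQL script on ';' into (pre_queries, main_query).

    Every statement except the last becomes a pre-query; the last one is
    returned as the single main query. Empty fragments are dropped.
    """
    statements = [s.strip() for s in sql_script.split(";") if s.strip()]
    if not statements:
        raise ValueError("no SQL statements found")
    return statements[:-1], statements[-1]


# Placeholder script with three statements.
script = """
create or replace table t1 as select 1 as x;
create or replace view v as select * from t1;
select * from v;
"""
pre_queries, main_query = split_script(script)
```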
thanks much
-
Hi,
DSS throws an error about the input dataset not being a SQL table because, when executing exec_recipe_fragment(), DSS runs checks on the input dataset and requires that it be a SQL table. This is true even if you don't use the dataset in your script. If that's an issue, you can use the query_to_df() method, as @Marlan rightly pointed out: you won't need to specify an input dataset. On the downside, you will need to handle the drop/creation of the output table yourself.
Regarding CTEs, yes, you can use them in any query. Here is an example of CTE usage with the query_to_df() function (I'm using PostgreSQL). Note that you don't need to have an output dataiku.Dataset() instance in your code:
from dataiku.core.sql import SQLExecutor2

executor = SQLExecutor2(connection="my_connection_name")

# Drop the output table first, since DSS is not managing it here
pre_query = """ DROP TABLE IF EXISTS "my_output_table"; """

# "CTE3" is quoted in the final SELECT so it matches the quoted CTE name
main_query = """
CREATE TABLE "my_output_table" AS
WITH "CTE1" as (<proprietary sql>),
"CTE2" as (<proprietary sql>),
"CTE3" as (SELECT * FROM "CTE1" UNION ALL SELECT * FROM "CTE2")
SELECT * FROM "CTE3";
"""

executor.query_to_df(
    main_query,
    pre_queries=[pre_query],
    post_queries=['COMMIT'])
Hope this helps!