Dynamic SQL creation using SQLExecutor2 fails

info-rchitect · Registered, Posts: 186
edited July 16 in Using Dataiku

Hi,

I am trying to dynamically create SQL that creates N tables in my database and then finally creates a view that concatenates them all together.

I can't show all of the SQL as it is proprietary but I can show the general concept:

create or replace table mydatabase.myschema.mytable1 as
<proprietary sql>
;

create or replace table mydatabase.myschema.mytable2 as
<proprietary sql>
;

create or replace view mydatabase.myschema.myview as
select * from mydatabase.myschema.mytable1
union all
select * from mydatabase.myschema.mytable2
;

select * from mydatabase.myschema.myview;

Here is how I execute the SQL:

SQLExecutor2.exec_recipe_fragment(my_output_dataset, final_sql)

It results in this error:

Job failed: Error in Python process: At line 78: <class 'NameError'>: name 'my_output_dataset' is not defined


Operating system used: Windows 10

Answers

  • Jean-Yves · Dataiker, Posts: 14
    edited July 17

    Hi,

    Thank you for your question! One limitation of exec_recipe_fragment() is that it can only execute one main query. That said, you can execute as many pre-queries as you want by passing a list of queries to the pre_queries argument (see the example below).

    Another limitation is that you will need to specify an input dataset for that recipe, even if you don't end up using the data from that dataset.

    Example:

    import dataiku
    from dataiku.core.sql import SQLExecutor2

    # exec_recipe_fragment() requires an input dataset, even if you
    # never read from it
    input_dataset = dataiku.Dataset("my_dataset")
    executor = SQLExecutor2(dataset=input_dataset)

    create_table_1 = """create or replace table mydatabase.myschema.mytable1 as
        <proprietary sql>;"""

    create_table_2 = """create or replace table mydatabase.myschema.mytable2 as
        <proprietary sql>;"""

    create_view = """create or replace view mydatabase.myschema.myview as
        select * from mydatabase.myschema.mytable1
        union all
        select * from mydatabase.myschema.mytable2;"""

    # the one allowed main query; everything else runs as pre-queries
    final_sql = """
    SELECT * FROM mydatabase.myschema.myview;"""

    my_output_dataset = dataiku.Dataset("my_output_dataset")
    executor.exec_recipe_fragment(
        my_output_dataset,
        final_sql,
        pre_queries=[create_table_1, create_table_2, create_view])


    The error you saw arose because the my_output_dataset variable was not defined. It needs to be a dataiku.Dataset object.

    Hope this helps!




  • Marlan · Neuron, Posts: 320

    Hi @info-rchitect,

    I know @Jean-Yves already provided a solution for you.

    Just wanted to also mention that I typically use the query_to_df method to execute database operations, as it is more flexible (e.g., no need to specify an input dataset). I believe the main query is optional as well, but in many cases I specify a main query that returns some information about the operations I just executed (e.g., a row count or something like that); see the sketch below.

    Also, you may need to include commits as noted here: https://doc.dataiku.com/dss/latest/python-api/sql.html#queries-with-side-effects
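    Something like this, roughly (the connection and table names are placeholders - adjust for your own database):

    from dataiku.core.sql import SQLExecutor2

    # no input dataset needed - just point the executor at a connection
    executor = SQLExecutor2(connection="my_connection_name")

    create_table = """create or replace table mydatabase.myschema.mytable1 as
        <proprietary sql>;"""

    # the main query returns something about the operation just executed,
    # e.g., a row count
    df = executor.query_to_df(
        "select count(*) from mydatabase.myschema.mytable1;",
        pre_queries=[create_table],
        post_queries=['COMMIT'])  # commit so the side effects persist
    print(df.iloc[0, 0])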

    Marlan

  • info-rchitect · Registered, Posts: 186
    edited July 17

    Thanks for clarifying the single-statement limitation and showing that I had to create a `Dataset` instance. So if I converted my SQL from using temporary tables to using CTEs, would it work?

    Back to the current solution path... I do have an input dataset defined, which provides the list of column names I use in my SQL aggregations. It is based on loading a CSV file (more on that in a second).

    yield_agg_by_fields = dataiku.Dataset("yield_agg_by_fields")
    yield_agg_by_fields_df = yield_agg_by_fields.get_dataframe()

    So I stitched up your solution and got this error:

    Query failed: b'Input dataset yield_agg_by_fields is not a SQL table

    Why does Dataiku care how my input dataset is created if it is converting it to a pandas dataframe anyway?

    Anyway, I switched to using a SQL dataset for my input query and then got this message:

    <class 'Exception'>: Query failed: b'Actual statement count 3 did not match the desired statement count 1.'

    I realized I was setting the role and the warehouse as separate SQL statements within the SQL I passed, so I removed those. Then I got an error that the temporary table I created did not exist; this is likely because Dataiku executes the queries in different sessions. So I converted all of my table creation to permanent tables and it worked!
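
    For anyone else who hits this, the change was essentially the following (placeholder names, same as above):

    # before: temporary tables are session-scoped, so a later query
    # running in a different session cannot see them
    create_table_1 = """create or replace temporary table mydatabase.myschema.mytable1 as
        <proprietary sql>;"""

    # after: permanent tables persist across sessions
    create_table_1 = """create or replace table mydatabase.myschema.mytable1 as
        <proprietary sql>;"""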

    thanks much





  • Jean-Yves · Dataiker, Posts: 14
    edited July 17

    Hi,
    So DSS throws an error about the input dataset not being a SQL table because, when executing exec_recipe_fragment(), it runs checks on the input dataset and requires that it be a SQL table. This is true even if you don't use the dataset in your script. If that's an issue, you can use the query_to_df() method, as @Marlan rightly pointed out - you won't need to specify an input dataset. On the downside, you will need to handle the drop/creation of the output table yourself.

    Regarding CTEs, yes, you can use them in any query. Here is an example of CTE usage with the query_to_df() method (I'm using PostgreSQL). Note that you don't need an output dataiku.Dataset() instance in your code:

    from dataiku.core.sql import SQLExecutor2

    executor = SQLExecutor2(connection="my_connection_name")

    pre_query = """ DROP TABLE IF EXISTS "my_output_table"; """
    main_query = """
    CREATE TABLE "my_output_table" AS
    WITH
    "CTE1" as (<proprietary sql>),
    "CTE2" as (<proprietary sql>),
    "CTE3" as (SELECT * FROM "CTE1" UNION ALL SELECT * FROM "CTE2")
    SELECT * FROM "CTE3";
    """

    # COMMIT is needed so the CREATE TABLE side effect persists
    executor.query_to_df(
        main_query,
        pre_queries=[pre_query],
        post_queries=['COMMIT'])


    Hope this helps!
