Keyword for creating sql spark recipe via Python API

nmadhu20
nmadhu20 Neuron, Registered, Neuron 2022, Neuron 2023 Posts: 35 Neuron

Hi Team,

Could you please help me clear up the doubts below?

The use case is that the output of a window recipe (stored on S3) will be the input to a Spark SQL recipe. We will build that recipe's query dynamically in Python, then add the recipe and run it from Python. The output will also be stored on S3.

builder = project.new_recipe(??) #what would be the keyword for sql_spark?
builder.with_input(output_df)
builder.with_new_output("name_of_output_dataset", "s3_connection_name")
recipe = builder.create()

recipe_settings.get_json_payload() #for adding query in the recipe which keyword is used?

Thank you in advance.

Best Answer

  • AlexandreL
    AlexandreL Dataiker, Registered Posts: 36 Dataiker
    edited July 17 Answer ✓

    Hello,

    Can you explain a bit more about why you need to create a Spark SQL recipe programmatically? Creating such recipes is not easy, so there might be a simpler way to solve your issue. That said, it can be done, but not with the new_recipe method: you need to use the create_recipe method, which is harder to use. You can use the following code. If you need to find which values to use for specific parameters, such as a dataset's storage format, create such an object via the UI and then copy its settings.

    import dataiku
    
    # Instantiate the API client
    client = dataiku.api_client()
    
    project = client.get_project("PROJECT_KEY")
    output_dataset_name = "output_dataset_name"
    input_dataset_name = "input_dataset_name"
    output_dataset_connection = 'connection_name'
    recipe_name = "compute_" + output_dataset_name
    
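    # First, the output dataset needs to be created.
    # The type and params below describe an HDFS dataset; for another storage type,
    # create a dataset via the UI and copy its settings.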
    project.create_dataset(output_dataset_name, formatType="csv", params={'connection': output_dataset_connection,
      'filesSelectionRules': {'excludeRules': [],
       'explicitFiles': [],
       'includeRules': [],
       'mode': 'ALL'}}, type="HDFS")
    
    #Then, we can create the recipe
    recipe_proto = {}
    recipe_proto["type"] = "spark_sql_query"
    recipe_proto["name"] = recipe_name
    recipe_proto["inputs"] = {'main': {'items': [{'appendMode': False,
         'ref': input_dataset_name}]}}
    recipe_proto["outputs"] = {'main': {'items': [{'appendMode': False,
         'ref': output_dataset_name}]}}
    
    creation_settings = {
        "useGlobalMetastore": True,
        "forcePipelineableForTests": False,
    }
    
    spsql_recipe = project.create_recipe(recipe_proto, creation_settings)
    
    # Finally, modify the recipe's code
    recipe_def = spsql_recipe.get_definition_and_payload()
    recipe_def.set_payload("YOUR SQL CODE HERE")
    spsql_recipe.set_definition_and_payload(recipe_def)
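
Since the question mentions building the query dynamically in Python, here is a minimal sketch of one way to assemble the string that replaces "YOUR SQL CODE HERE". The helper names (quote_ident, build_query) and the column names are hypothetical, and the sketch assumes the input dataset can be referenced in the query by its dataset name, with backticks as identifier quotes. Please check both assumptions against a Spark SQL recipe created in the UI.

```python
def quote_ident(name):
    """Wrap an identifier in backticks, doubling any embedded backtick."""
    return "`" + name.replace("`", "``") + "`"


def build_query(input_dataset_name, columns, where=None):
    """Build a simple SELECT statement over a single input dataset.

    columns: list of column names to select (hypothetical names in the example)
    where:   optional raw SQL predicate; it is inserted as-is, so only pass trusted text
    """
    if not columns:
        raise ValueError("at least one column is required")
    select_list = ", ".join(quote_ident(c) for c in columns)
    query = "SELECT " + select_list + "\nFROM " + quote_ident(input_dataset_name)
    if where:
        query += "\nWHERE " + where
    return query


# Example with made-up column names
sql = build_query("input_dataset_name", ["customer_id", "amount"], where="amount > 0")
print(sql)
```

The resulting string can then be passed to recipe_def.set_payload(sql) in place of the placeholder. For running the recipe afterwards, recent versions of the API client appear to expose a run() method on the recipe object; whether it is available depends on your DSS version, so treat that as something to verify rather than a given.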
