sync to GCS/BQ via API

Meirkhan
Meirkhan Registered Posts: 20 ✭✭✭✭

Hello,

I have the following data flow, which I created in the DSS interface: Postgres table -> sync to GCS -> sync to BQ

I would like to do the same thing via the Python API.

How can I do that? (I haven't found docs on it.)

P.S. DSS v7

Thanks,

Meirkhan

Best Answer

  • Meirkhan
    Meirkhan Registered Posts: 20 ✭✭✭✭
    Answer ✓

    I've found the reason.

    Apparently, new connections differ from old ones (I don't know when the old one was created).

    The old connection did not have the "Default path" option in Dataiku, hence the inconsistency.

    Creating a new connection solved the problem.
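
To see which setting an older connection is missing, one option is to compare the parameter dictionaries of the old and the new connection. The helper below is a minimal sketch that only diffs two plain dicts; it does not assume the name of the key that stores the default path. One way to obtain the dicts, assuming admin rights and that `DSSConnection.get_definition()` is available on your DSS version, would be `client.get_connection("CONNECTION_NAME").get_definition()["params"]`.

```python
def diff_connection_params(old_params, new_params):
    """Return the keys that differ between two connection 'params' dicts."""
    old_keys, new_keys = set(old_params), set(new_params)
    return {
        # Keys present in only one of the two connections.
        "only_in_old": sorted(old_keys - new_keys),
        "only_in_new": sorted(new_keys - old_keys),
        # Keys present in both but holding different values.
        "changed": sorted(
            key for key in old_keys & new_keys
            if old_params[key] != new_params[key]
        ),
    }
```

A key that shows up under `only_in_new` or `changed` would be a candidate for the setting the server complains about.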

Answers

  • HarizoR
    HarizoR Dataiker, Alpha Tester, Registered Posts: 138 Dataiker
    edited July 17

    Hi,

    Programmatic recipe creation is available via the DSS API. More specifically, you can start from a dataset handle and use the available recipe builders. Here is a simple example:

    import dataiku
    client = dataiku.api_client()
    project = client.get_project("PROJECT_KEY")
    
    # Define input dataset
    dataset_a = project.get_dataset("INPUT_DATASET_NAME")
    
    # Create recipe and output dataset
    builder = dataset_a.new_recipe("sync")
    builder.with_new_output("OUTPUT_DATASET_NAME", "OUTPUT_DATASET_CONNECTION_ID")
    sync_recipe = builder.create()
    

    After running this code, the sync recipe and its (empty) output dataset should be visible in your Flow. For more information, you can refer to our documentation.

    Best,

    Harizo
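
The flow from the original question (Postgres table -> sync to GCS -> sync to BQ) could be built by repeating the builder calls from the example above, feeding each new output into the next sync. This is a sketch under assumptions: it relies on the newer-style `new_recipe` builder shown above (not the DSS 7 `SyncRecipeCreator`), `chain_sync_recipes` is a hypothetical helper name, and file-based outputs such as GCS may need extra arguments to `with_new_output` that are not covered here.

```python
def chain_sync_recipes(project, input_dataset_name, steps):
    """Create one sync recipe per step, each reading the previous output.

    `project` is expected to be a project handle such as
    dataiku.api_client().get_project("PROJECT_KEY").
    `steps` is a list of (output_dataset_name, connection_id) pairs.
    """
    recipes = []
    current_name = input_dataset_name
    for output_name, connection_id in steps:
        # Same builder calls as in the single-recipe example.
        builder = project.get_dataset(current_name).new_recipe("sync")
        builder.with_new_output(output_name, connection_id)
        recipes.append(builder.create())
        # The dataset just created becomes the input of the next sync.
        current_name = output_name
    return recipes
```

For example, `chain_sync_recipes(project, "INPUT_DATASET_NAME", [("GCS_DATASET_NAME", "GCS_CONNECTION_ID"), ("BQ_DATASET_NAME", "BQ_CONNECTION_ID")])` would create two sync recipes in sequence; all names here are placeholders. The recipes are only created, not run.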

  • Meirkhan
    Meirkhan Registered Posts: 20 ✭✭✭✭
    edited July 17

    Hi,

    I think this code is for v9, while I have Dataiku v7.

    Anyway, I'm getting an error when creating a sync recipe to GCS programmatically (syncing to Postgres works fine, and the visual sync recipe to GCS works fine too):

    import dataiku

    from dataikuapi import SyncRecipeCreator

    client = dataiku.api_client()
    project = client.get_project("project_name")
    builder = SyncRecipeCreator("sync_recipe_name", project=project)

    builder = builder.with_input("input_dataset", "project_name")
    builder = builder.with_new_output("output_dataset", "connection_id")
    builder.build()

    When I run this code, I get:
    DataikuException: com.dataiku.dip.exceptions.CodedException: default path should not be left blank

  • HarizoR
    HarizoR Dataiker, Alpha Tester, Registered Posts: 138 Dataiker

    Hi,

    This code is indeed valid for DSS 9. Regarding your example, you should double-check that the GCS connection you are syncing to is properly configured, in particular that the path to the target bucket is valid.

    Best,

    Harizo

  • Meirkhan
    Meirkhan Registered Posts: 20 ✭✭✭✭

    Well, it is a valid connection, and the path to the bucket is valid.

    The sync works fine if I create a visual recipe with the same connection.

  • Meirkhan
    Meirkhan Registered Posts: 20 ✭✭✭✭
    edited July 17

    Here is the full error

    ---------------------------------------------------------------------------
    HTTPError                                 Traceback (most recent call last)
    /opt/dataiku-dss-7.0.2/python/dataikuapi/dssclient.py in _perform_http(self, method, path, params, body, stream, files, raw_body)
        903                     stream = stream)
    --> 904             http_res.raise_for_status()
        905             return http_res
    
    /data/DSS/5.1/code-envs/python/MRAK_SANDBOX/lib/python3.6/site-packages/requests/models.py in raise_for_status(self)
        939         if http_error_msg:
    --> 940             raise HTTPError(http_error_msg, response=self)
        941 
    
    HTTPError: 500 Server Error: Server Error for url: http://127.0.0.1:10001/dip/publicapi/projects/PREPRAWDUCTION/recipes/
    
    During handling of the above exception, another exception occurred:
    
    DataikuException                          Traceback (most recent call last)
    <ipython-input-90-3d4478aa39b3> in <module>
    ----> 1 out_recipe = builder.build()
    
    /opt/dataiku-dss-7.0.2/python/dataikuapi/dss/recipe.py in build(self)
        285         """
        286         self._finish_creation_settings()
    --> 287         return self.project.create_recipe(self.recipe_proto, self.creation_settings)
        288 
        289     def _finish_creation_settings(self):
    
    /opt/dataiku-dss-7.0.2/python/dataikuapi/dss/project.py in create_recipe(self, recipe_proto, creation_settings)
        779         definition = {'recipePrototype': recipe_proto, 'creationSettings' : creation_settings}
        780         recipe_name = self.client._perform_json("POST", "/projects/%s/recipes/" % self.project_key,
    --> 781                        body = definition)['name']
        782         return DSSRecipe(self.client, self.project_key, recipe_name)
        783 
    
    /opt/dataiku-dss-7.0.2/python/dataikuapi/dssclient.py in _perform_json(self, method, path, params, body, files, raw_body)
        918 
        919     def _perform_json(self, method, path, params=None, body=None,files=None, raw_body=None):
    --> 920         return self._perform_http(method, path,  params=params, body=body, files=files, stream=False, raw_body=raw_body).json()
        921 
        922     def _perform_raw(self, method, path, params=None, body=None,files=None, raw_body=None):
    
    /opt/dataiku-dss-7.0.2/python/dataikuapi/dssclient.py in _perform_http(self, method, path, params, body, stream, files, raw_body)
        909             except ValueError:
        910                 ex = {"message": http_res.text}
    --> 911             raise DataikuException("%s: %s" % (ex.get("errorType", "Unknown error"), ex.get("message", "No message")))
        912 
        913     def _perform_empty(self, method, path, params=None, body=None, files = None, raw_body=None):
    
    DataikuException: com.dataiku.dip.exceptions.CodedException: default path should not be left blank
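
The last frame of the traceback shows the client formatting the exception text as `errorType: message`. When logging or branching on such failures, the two parts can be separated again. This is a minimal sketch; `split_dss_error` is a hypothetical helper, and it assumes the text follows that format.

```python
def split_dss_error(text):
    """Split 'errorType: message' into an (error_type, message) pair."""
    error_type, separator, message = text.partition(": ")
    if not separator:
        # No 'type: ' prefix found; mirror the client's fallback label.
        return "Unknown error", text
    return error_type, message
```

It could be called as `split_dss_error(str(exc))` inside an `except` block that catches the exception raised by `builder.build()`.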