sync to GCS/BQ via API
Hello,
I have the following data flow, which I created in the interface: Postgres table -> sync to GCS -> sync to BQ
I would like to do the same thing via the Python API.
How can I do so? (I haven't found docs on it.)
P.S. DSS v7
Thanks,
Meirkhan
Best Answer
-
I've found the cause.
Apparently there are differences between newer connections and older ones (I don't know when the old one was created).
The old connection did not have the "Default path" option in Dataiku, hence the inconsistency.
The problem was solved by creating a new connection.
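To see what actually differs between an old and a new connection, one option is to compare their definitions through the API. This is only a sketch: it assumes admin rights, assumes `get_definition()` returns a dict with a `params` entry, and the connection names in the usage comment are placeholders. It makes no assumption about which key holds the default path; it just lists every setting that differs.

```python
def diff_connection_params(old_params, new_params):
    """Return {key: (old_value, new_value)} for every setting that differs."""
    keys = set(old_params) | set(new_params)
    return {
        key: (old_params.get(key), new_params.get(key))
        for key in sorted(keys)
        if old_params.get(key) != new_params.get(key)
    }


def compare_connections(client, old_name, new_name):
    """Fetch both connection definitions and diff their 'params' dicts.

    Assumption: the definition dict has a 'params' entry; a missing entry
    is treated as an empty dict.
    """
    old_def = client.get_connection(old_name).get_definition()
    new_def = client.get_connection(new_name).get_definition()
    return diff_connection_params(
        old_def.get("params", {}), new_def.get("params", {})
    )


# Inside DSS (connection names are placeholders):
# import dataiku
# print(compare_connections(dataiku.api_client(), "OLD_GCS_CONN", "NEW_GCS_CONN"))
```

A setting that exists only on the new connection shows up as `(None, value)`, which is how a missing default path would appear.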
Answers
-
Hi,
Programmatic recipe creation is available via the DSS API. More specifically, you can start from a Dataset handle and use the available recipe builders. Here is a simple example:
import dataiku

client = dataiku.api_client()
project = client.get_project("PROJECT_KEY")

# Define input dataset
dataset_a = project.get_dataset("INPUT_DATASET_NAME")

# Create recipe and output dataset
builder = dataset_a.new_recipe("sync")
builder.with_new_output("OUTPUT_DATASET_NAME", "OUTPUT_DATASET_CONNECTION_ID")
sync_recipe = builder.create()
After running this code the sync recipe and its (empty) output dataset should be visible in your Flow. For more information, you can refer to our documentation.
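For the Postgres -> GCS -> BQ flow from the question, the same builder calls can be chained so that each sync reads the output of the previous one. The sketch below is a minimal illustration, not an official recipe: `plan_sync_chain` and `create_sync_chain` are hypothetical helper names, every dataset name and connection id is a placeholder, and it assumes a DSS version where `new_recipe` is available on a Dataset handle. It only creates the recipes; it does not run them, and the GCS output may still need format settings.

```python
def plan_sync_chain(source_dataset, hops):
    """Build one step per sync recipe from a source dataset and a list of
    (output_dataset, connection_id) hops; each step reads the previous output."""
    steps = []
    current_input = source_dataset
    for output_dataset, connection_id in hops:
        steps.append({
            "input": current_input,
            "output": output_dataset,
            "connection": connection_id,
        })
        current_input = output_dataset
    return steps


def create_sync_chain(project, steps):
    """Create one sync recipe per step with the dataset-handle builder."""
    recipes = []
    for step in steps:
        builder = project.get_dataset(step["input"]).new_recipe("sync")
        builder.with_new_output(step["output"], step["connection"])
        recipes.append(builder.create())
    return recipes


# Placeholder names: replace with your own datasets and connection ids.
steps = plan_sync_chain(
    "POSTGRES_TABLE",
    [("GCS_COPY", "GCS_CONNECTION_ID"), ("BQ_COPY", "BQ_CONNECTION_ID")],
)

# Inside DSS:
# import dataiku
# project = dataiku.api_client().get_project("PROJECT_KEY")
# create_sync_chain(project, steps)
```

Keeping the plan separate from the API calls makes it easy to print and check the chain before anything is created in the Flow.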
Best,
Harizo
-
Hi,
I think this code is for v9, while I have Dataiku v7.
In any case, I am getting an error when creating a sync to GCS programmatically (a programmatic sync to Postgres works fine, and a visual sync recipe to GCS works fine too):
import dataiku
from dataikuapi import SyncRecipeCreator
client = dataiku.api_client()
project = client.get_project("project_name")
builder = SyncRecipeCreator("sync_recipe_name", project=project)
builder = builder.with_input("input_dataset", "project_name")
builder = builder.with_new_output("output_dataset", "connection_id")
builder.build()
While running this code, I am getting
DataikuException: com.dataiku.dip.exceptions.CodedException: default path should not be left blank
-
Hi,
This code is indeed valid for DSS 9. Regarding your example, you should double-check that the GCS connection you are syncing to is properly configured, in particular that the path to the target bucket is valid.
Best,
Harizo
-
Well, the connection is valid and the path to the bucket is valid.
The sync works fine if I create a visual recipe with the same connection.
-
Here is the full error
---------------------------------------------------------------------------
HTTPError                                 Traceback (most recent call last)
/opt/dataiku-dss-7.0.2/python/dataikuapi/dssclient.py in _perform_http(self, method, path, params, body, stream, files, raw_body)
    903                     stream = stream)
--> 904             http_res.raise_for_status()
    905             return http_res

/data/DSS/5.1/code-envs/python/MRAK_SANDBOX/lib/python3.6/site-packages/requests/models.py in raise_for_status(self)
    939         if http_error_msg:
--> 940             raise HTTPError(http_error_msg, response=self)
    941

HTTPError: 500 Server Error: Server Error for url: http://127.0.0.1:10001/dip/publicapi/projects/PREPRAWDUCTION/recipes/

During handling of the above exception, another exception occurred:

DataikuException                          Traceback (most recent call last)
<ipython-input-90-3d4478aa39b3> in <module>
----> 1 out_recipe = builder.build()

/opt/dataiku-dss-7.0.2/python/dataikuapi/dss/recipe.py in build(self)
    285         """
    286         self._finish_creation_settings()
--> 287         return self.project.create_recipe(self.recipe_proto, self.creation_settings)
    288
    289     def _finish_creation_settings(self):

/opt/dataiku-dss-7.0.2/python/dataikuapi/dss/project.py in create_recipe(self, recipe_proto, creation_settings)
    779         definition = {'recipePrototype': recipe_proto, 'creationSettings' : creation_settings}
    780         recipe_name = self.client._perform_json("POST", "/projects/%s/recipes/" % self.project_key,
--> 781                        body = definition)['name']
    782         return DSSRecipe(self.client, self.project_key, recipe_name)
    783

/opt/dataiku-dss-7.0.2/python/dataikuapi/dssclient.py in _perform_json(self, method, path, params, body, files, raw_body)
    918
    919     def _perform_json(self, method, path, params=None, body=None,files=None, raw_body=None):
--> 920         return self._perform_http(method, path, params=params, body=body, files=files, stream=False, raw_body=raw_body).json()
    921
    922     def _perform_raw(self, method, path, params=None, body=None,files=None, raw_body=None):

/opt/dataiku-dss-7.0.2/python/dataikuapi/dssclient.py in _perform_http(self, method, path, params, body, stream, files, raw_body)
    909             except ValueError:
    910                 ex = {"message": http_res.text}
--> 911             raise DataikuException("%s: %s" % (ex.get("errorType", "Unknown error"), ex.get("message", "No message")))
    912
    913     def _perform_empty(self, method, path, params=None, body=None, files = None, raw_body=None):

DataikuException: com.dataiku.dip.exceptions.CodedException: default path should not be left blank