Python API - Select all upstream and downstream datasets
Hello all,
The Flow feature for selecting all upstream datasets and all downstream datasets is amazing. I would like to do the same using the Python API.
Could you please help with this? If it is not possible, is there an alternative?
Kind regards
Best Answer
-
Sarina Dataiker, Dataiku DSS Core Designer, Dataiku DSS Adv Designer, Registered Posts: 317 Dataiker
Hi @Bader,
Here’s an example of how you could traverse the flow to get upstream + downstream datasets with the Python API.
I am using flow graph nodes to traverse the flow in this example, but you may also want to take a look at the flow API functions: Flow creation and management.
So that you can see my flow in this example, here’s a screenshot of my project, using the Dataiku TShirts "sample" project:
In this example I created the function get_nodes, which can pull either all “predecessors” of a given element in your flow (i.e. upstream) or all “successors” of a given element in your flow (i.e. downstream). Note that I limit the results to dataset elements, as it sounds like that's your goal, but that condition could easily be removed or modified.
import dataiku
from dataiku import pandasutils as pdu

client = dataiku.api_client()
# enter in your project ID
project = client.get_project('DKU_TSHIRTS')
flow = project.get_flow()
# gets the graph for a given project flow
graph = flow.get_graph()

# direction should be either 'successors' for downstream
# or 'predecessors' for upstream
def get_nodes(node, node_array, direction='successors'):
    nodeval = graph.nodes[node]
    if nodeval:
        # only collect dataset nodes; recipes and other elements are skipped
        if nodeval['type'] == 'COMPUTABLE_DATASET':
            node_array.append(nodeval['ref'])
        # keep walking in the requested direction
        for connected_node in nodeval[direction]:
            get_nodes(connected_node, node_array, direction)
    return node_array
Here’s an example of how I would call the function for the above flow. Note that revenue_prediction is my "rightmost" dataset, so when I get all "predecessors" on it, I get the five datasets that are upstream of it in the flow. web_last_month is my "leftmost" dataset, so when I use the "successors" parameter, I get the two datasets that are downstream of it in the flow.
Note that if I call the "successors" version of the call on the dataset revenue_prediction, only ['revenue_prediction'] would get returned, as it is the downstream-most dataset.
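The calls described above can be tried without a DSS instance by running the same recursion over a plain dictionary shaped like graph.nodes. This is only a minimal sketch under stated assumptions: the nodes dictionary is hand-built rather than read from a project, every name other than web_last_month and revenue_prediction is hypothetical, the recipe type string is a placeholder, and the seen set is an addition that is not part of the function above (it keeps a dataset reachable by several paths from being listed more than once).

```python
# Hand-built stand-in for graph.nodes (hypothetical flow, NOT read from DSS).
# Each node has the keys the traversal relies on:
# 'type', 'ref', 'predecessors', 'successors'.
nodes = {
    "web_last_month": {
        "type": "COMPUTABLE_DATASET", "ref": "web_last_month",
        "predecessors": [], "successors": ["prepare_web"],
    },
    "prepare_web": {  # hypothetical recipe; type string is a placeholder
        "type": "RUNNABLE_RECIPE", "ref": "prepare_web",
        "predecessors": ["web_last_month"], "successors": ["web_prepared"],
    },
    "web_prepared": {  # hypothetical dataset
        "type": "COMPUTABLE_DATASET", "ref": "web_prepared",
        "predecessors": ["prepare_web"], "successors": ["score"],
    },
    "score": {  # hypothetical recipe
        "type": "RUNNABLE_RECIPE", "ref": "score",
        "predecessors": ["web_prepared"], "successors": ["revenue_prediction"],
    },
    "revenue_prediction": {
        "type": "COMPUTABLE_DATASET", "ref": "revenue_prediction",
        "predecessors": ["score"], "successors": [],
    },
}


def collect_datasets(nodes, start, direction="successors", seen=None, found=None):
    """Depth-first walk that returns dataset refs reachable from `start`."""
    seen = set() if seen is None else seen
    found = [] if found is None else found
    # Skip unknown nodes and nodes that were already visited.
    if start in seen or start not in nodes:
        return found
    seen.add(start)
    node = nodes[start]
    if node["type"] == "COMPUTABLE_DATASET":
        found.append(node["ref"])
    for neighbour in node[direction]:
        collect_datasets(nodes, neighbour, direction, seen, found)
    return found


downstream = collect_datasets(nodes, "web_last_month", "successors")
upstream = collect_datasets(nodes, "revenue_prediction", "predecessors")
last_only = collect_datasets(nodes, "revenue_prediction", "successors")
print(downstream)
print(upstream)
print(last_only)
```

As in the original function, the starting dataset is included in the result, so calling the "successors" direction on the last dataset returns a list containing only that dataset.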
Thanks,
Sarina
Answers
-
Excellent !!! Many thanks
-
Is it possible to get all downstream datasets while also taking into account whether a dataset is shared across projects?
-
Sarina Dataiker, Dataiku DSS Core Designer, Dataiku DSS Adv Designer, Registered Posts: 317 Dataiker
Hi @Bader,
The exact details will depend a little on your use case, but you could cross-reference the "exposed objects" for a project with the "downstream datasets" to pull all downstream datasets that are also exposed.
For an example, let's assume I have the following downstream datasets:
I also have the following exposed datasets for this project. Note that "web_new_customers" and "revenue_prediction" are both exposed:
If you wanted the "intersection" of these two lists (downstream datasets that are also exposed), you could add some code to get the exposed datasets, like this:
# `downstream` is the list returned earlier by get_nodes(..., direction='successors')
exposed_objects = project.get_settings().get_raw()['exposedObjects']['objects']
exposed_datasets = []
for obj in exposed_objects:
    if obj['type'] == 'DATASET':
        exposed_datasets.append(obj['localName'])
exposed_downstream = set(exposed_datasets) & set(downstream)
That will give the final set of the two exposed downstream datasets, "revenue_prediction" and "web_new_customers":
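To see the cross-reference in isolation, the same filtering can be run on a literal dictionary shaped like the raw settings used above. This is a sketch only: raw_settings and the downstream list are hand-written for illustration, and the target project key, the saved-model entry, and any dataset name other than web_new_customers and revenue_prediction are hypothetical.

```python
# Hand-written stand-in for project.get_settings().get_raw() (hypothetical values).
raw_settings = {
    "exposedObjects": {
        "objects": [
            {"type": "DATASET", "localName": "web_new_customers",
             "rules": [{"targetProject": "OTHER_PROJECT"}]},
            {"type": "DATASET", "localName": "revenue_prediction",
             "rules": [{"targetProject": "OTHER_PROJECT"}]},
            # Non-dataset objects are ignored by the filter below.
            {"type": "SAVED_MODEL", "localName": "some_model",
             "rules": [{"targetProject": "OTHER_PROJECT"}]},
        ]
    }
}

# Hypothetical result of a downstream traversal.
downstream = ["web_last_month", "web_new_customers", "revenue_prediction"]

# Keep only exposed objects that are datasets.
exposed_datasets = {
    obj["localName"]
    for obj in raw_settings["exposedObjects"]["objects"]
    if obj["type"] == "DATASET"
}

# Datasets that are both downstream and exposed, sorted for stable output.
exposed_downstream = sorted(exposed_datasets & set(downstream))
print(exposed_downstream)
```

Sorting is only there to make the output order predictable, since a set intersection has no defined order.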
Hope this helps,
Sarina
-
Hi Sarina,
Thanks for the help and support, I really appreciate it.
Let me clarify the question.
Let's assume we have three projects:
1- Project 1 has the following datasets (A -> B -> C -> D)
2- Project 2 has the following datasets (project1.B -> E -> F -> G)
3- Project 3 has the following datasets (project2.E -> H -> I -> J)
When I call get_nodes(A, downstream, 'successors'), I'm expecting the following result:
[A,B,C,D,E,F,G,E,H,I,J]
Because E is downstream of B, and B is downstream of A.
Hope this is clear.
-
Sarina Dataiker, Dataiku DSS Core Designer, Dataiku DSS Adv Designer, Registered Posts: 317 Dataiker
Hi @Bader,
Thank you for clarifying! I think for your use case you can still use the project's exposed objects setting, in combination with going down the flow across projects.
The idea here would be that, for each of these datasets, you check whether it is exposed to another project. If it is, you can then fetch all downstream datasets in that project (or set of projects). If it's not, then you won't need to search any further.
Taking that approach, let's create a function that tells us whether a dataset has been shared with other projects and, if so, returns the list of those projects:
def is_exposed(project, dataset_name):
    # dataset_name is the dataset name as a string (get_nodes passes nodeval['ref'])
    projects_to_check = []
    exposed_objects = project.get_settings().get_raw()['exposedObjects']['objects']
    for obj in exposed_objects:
        if obj['type'] == 'DATASET':
            if obj['localName'] == dataset_name:
                # each rule names one project the dataset is exposed to
                for rule in obj['rules']:
                    projects_to_check.append(rule['targetProject'])
    return projects_to_check
Then you can update your get_nodes() function to go into any "downstream projects" and get the downstream datasets from there. Please do note that this gets complicated pretty quickly and probably will be difficult to troubleshoot. You could give something like this a shot though.
def get_nodes(project, node, node_array, direction='successors'):
    flow = project.get_flow()
    # gets the graph for a given project flow
    graph = flow.get_graph()
    if node in graph.nodes:
        nodeval = graph.nodes[node]
        if nodeval['type'] == 'COMPUTABLE_DATASET':
            node_array.append(nodeval['ref'])
            # check if the dataset is in any other projects
            projects = is_exposed(project, nodeval['ref'])
            # if this dataset is in any other projects, get each project and
            # call get_nodes() on that project and the current node
            if len(projects) > 0:
                for p in projects:
                    project_object = client.get_project(p)
                    get_nodes(project_object, nodeval['fullId'], node_array, direction)
        for connected_node in nodeval[direction]:
            get_nodes(project, connected_node, node_array, direction)
    return node_array
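Since this gets hard to troubleshoot against live projects, it can help to check the traversal logic first on the three-project example from the question, using plain dictionaries instead of the API. The following is a rough sketch, not Dataiku's API: graphs and exposed_to are hand-built stand-ins, recipes are left out so datasets link directly to datasets, and the "PROJECT.dataset" key used for a dataset shared into another project is an assumption made for this illustration. A seen set guards against revisiting a node.

```python
# Hypothetical per-project flows: dataset -> list of downstream datasets.
# A dataset shared from another project appears as "<SOURCE_PROJECT>.<name>".
graphs = {
    "P1": {"A": ["B"], "B": ["C"], "C": ["D"], "D": []},
    "P2": {"P1.B": ["E"], "E": ["F"], "F": ["G"], "G": []},
    "P3": {"P2.E": ["H"], "H": ["I"], "I": ["J"], "J": []},
}

# Hypothetical exposure rules: (project, dataset) -> target projects.
exposed_to = {("P1", "B"): ["P2"], ("P2", "E"): ["P3"]}


def downstream_across_projects(project_key, node, seen=None, found=None):
    """Collect downstream datasets, following exposed datasets into other projects."""
    seen = set() if seen is None else seen
    found = [] if found is None else found
    # Stop on already visited nodes and on nodes missing from this project's flow.
    if (project_key, node) in seen or node not in graphs[project_key]:
        return found
    seen.add((project_key, node))
    is_foreign = "." in node  # shared-in dataset, already recorded in its home project
    if not is_foreign:
        full_name = f"{project_key}.{node}"
        found.append(full_name)
        # Follow the dataset into every project it is exposed to.
        for target in exposed_to.get((project_key, node), []):
            downstream_across_projects(target, full_name, seen, found)
    # Continue down the flow inside the current project.
    for successor in graphs[project_key][node]:
        downstream_across_projects(project_key, successor, seen, found)
    return found


result = downstream_across_projects("P1", "A")
print(sorted(result))
```

Results are qualified with their project key so that datasets with the same name in different projects stay distinguishable, and each dataset is reported once even if it is reachable through more than one path.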
Hope that helps,
Sarina
-
Perfect!! It solved my problem. Many thanks.