Python API - Select all upstream and downstream datasets

Bader

Hello all,  

The Flow feature that selects all upstream or all downstream datasets is great. I would like to do the same thing with the Python API.

Is this supported? If not, is there an alternative?

Kind regards

1 Solution
SarinaS
Dataiker

Hi @Bader ,

Here's an example of how you could traverse the flow to get upstream + downstream datasets with the Python API.

I am using flow graph nodes to traverse the flow in this example, but you may also want to take a look at the flow API functions: Flow creation and management.  

So that you can see my flow in this example, here's a screenshot of my project, using the Dataiku TShirts "sample" project:

Screen Shot 2021-01-28 at 5.18.55 PM.png

I created the function get_nodes in this example, which can pull either all "predecessors" of a given element in your flow (i.e. upstream) or all "successors" of a given element in your flow (i.e. downstream). Note that in this example I am only pulling dataset elements, as it sounds like that's your goal, but that condition could easily be removed or modified.

import dataiku

client = dataiku.api_client()
# enter your project key
project = client.get_project('DKU_TSHIRTS')
flow = project.get_flow()

# gets the graph for a given project flow 
graph = flow.get_graph()

def get_nodes(node, node_array, direction='successors'):
    """
    direction should be either 'successors' for downstream or 'predecessors' for upstream
    """
    # graph.nodes is a dictionary keyed by node ID; .get() returns None for an unknown ID
    nodeval = graph.nodes.get(node)
    if nodeval:
        # only collect datasets; recipes and other elements are traversed but not recorded
        if nodeval['type'] == 'COMPUTABLE_DATASET':
            node_array.append(nodeval['ref'])
        for connected_node in nodeval[direction]:
            get_nodes(connected_node, node_array, direction)
    return node_array

 

Here's an example of how I would call the function for the above flow.  Note that revenue_prediction is my "rightmost" dataset, so when I get all "predecessors" on it, I get the five datasets that are upstream of it in the flow.  web_last_month is my "leftmost" dataset, so when I use the "successors" parameter, I get the two datasets that are downstream of it in the flow.

Screen Shot 2021-01-28 at 5.34.31 PM.png

Note that if I call the "successors" version of the call on the dataset revenue_prediction, only ['revenue_prediction'] would get returned, as it is the downstream-most dataset. 
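
As a rough sketch of what those calls look like, here is a version that runs outside DSS. It replaces graph.nodes with a small hand-written dictionary, so the node IDs, the recipe names and the 'RUNNABLE_RECIPE' type are assumptions for illustration and not the real TShirts flow. The helper get_nodes_from is also hypothetical: it follows the same idea as get_nodes, but takes the node dictionary as an argument and skips datasets it has already recorded.

```python
# Toy stand-in for graph.nodes (hypothetical node IDs, not the real TShirts flow).
toy_nodes = {
    "raw": {"type": "COMPUTABLE_DATASET", "ref": "raw",
            "predecessors": [], "successors": ["prepare"]},
    "prepare": {"type": "RUNNABLE_RECIPE", "ref": "prepare",
                "predecessors": ["raw"], "successors": ["clean"]},
    "clean": {"type": "COMPUTABLE_DATASET", "ref": "clean",
              "predecessors": ["prepare"], "successors": ["score"]},
    "score": {"type": "RUNNABLE_RECIPE", "ref": "score",
              "predecessors": ["clean"], "successors": ["predictions"]},
    "predictions": {"type": "COMPUTABLE_DATASET", "ref": "predictions",
                    "predecessors": ["score"], "successors": []},
}


def get_nodes_from(nodes, node, node_array, direction="successors"):
    """Walk the toy graph in one direction and collect dataset names once each."""
    nodeval = nodes.get(node)
    if nodeval:
        if nodeval["type"] == "COMPUTABLE_DATASET" and nodeval["ref"] not in node_array:
            node_array.append(nodeval["ref"])
        for connected_node in nodeval[direction]:
            get_nodes_from(nodes, connected_node, node_array, direction)
    return node_array


# Upstream of the rightmost dataset, downstream of the leftmost one.
upstream = get_nodes_from(toy_nodes, "predictions", [], "predecessors")
downstream = get_nodes_from(toy_nodes, "raw", [], "successors")
print(upstream)    # ['predictions', 'clean', 'raw']
print(downstream)  # ['raw', 'clean', 'predictions']
```

Note that the starting dataset is always included in the result, which is why calling "successors" on the last dataset returns only that dataset.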

Thanks,

Sarina 

7 Replies
Bader
Author

Is it possible to get all downstream datasets while also taking into account datasets that are shared across projects?

SarinaS
Dataiker

Hi @Bader ,

The exact details will depend a little on your use case, but you could cross-reference the "exposed objects" for a project with the "downstream datasets" to pull all downstream datasets that are also exposed.

For an example, let's assume I have the following downstream datasets:

Screen Shot 2021-03-03 at 1.59.08 PM.png

I also have the following exposed datasets for this project.  Note that "web_new_customers" and "revenue_prediction" are both exposed:

Screen Shot 2021-03-03 at 1.58.50 PM.png

If you wanted the "intersection" of these two datasets (downstream datasets that are also exposed), you could add some code to get the exposed datasets, like this:

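# `downstream` is the list returned by get_nodes(..., 'successors') earlier
exposed_objects = project.get_settings().get_raw()['exposedObjects']['objects']
exposed_datasets = []
for obj in exposed_objects:
    if obj['type'] == 'DATASET':
        exposed_datasets.append(obj['localName'])
# keep only the downstream datasets that are also exposed
exposed_downstream = set(exposed_datasets) & set(downstream)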

 

That will give the final set of the two exposed downstream datasets, "revenue_prediction" and "web_new_customers":

Screen Shot 2021-03-03 at 2.09.26 PM.png
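
For reference, here is a small self-contained sketch of the same intersection that runs without a DSS instance. The raw_settings dictionary is a hand-written stand-in for what project.get_settings().get_raw() returns, using only the keys shown in this thread; the target project key, the saved model entry and the contents of downstream are made up for illustration. It builds a list instead of a set so that the traversal order of downstream is preserved.

```python
# Hand-written stand-in for project.get_settings().get_raw(); values are hypothetical.
raw_settings = {
    "exposedObjects": {
        "objects": [
            {"type": "DATASET", "localName": "web_new_customers",
             "rules": [{"targetProject": "OTHER_PROJECT"}]},
            {"type": "DATASET", "localName": "revenue_prediction",
             "rules": [{"targetProject": "OTHER_PROJECT"}]},
            {"type": "SAVED_MODEL", "localName": "some_model", "rules": []},
        ]
    }
}

# Stand-in for the list returned by get_nodes(..., 'successors').
downstream = ["web_last_month", "web_new_customers", "revenue_prediction"]

# Names of all exposed objects that are datasets.
exposed_datasets = {
    obj["localName"]
    for obj in raw_settings["exposedObjects"]["objects"]
    if obj["type"] == "DATASET"
}

# Keep the order of `downstream` rather than relying on set ordering.
exposed_downstream = [name for name in downstream if name in exposed_datasets]
print(exposed_downstream)  # ['web_new_customers', 'revenue_prediction']
```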

Hope this helps,

Sarina

Bader
Author

Hi Sarina,  

Thanks for the help and support, I really appreciate it.

Let me clarify the question.

Let's assume we have three projects:

1- Project 1 has the following datasets (A -> B -> C -> D)

2- Project 2 has the following datasets (project1.B -> E -> F -> G)

3- Project 3 has the following datasets (project2.E -> H -> I -> J)

 

When I call get_nodes(A, downstream, 'successors'), I expect the following result:

[A,B,C,D,E,F,G,E,H,I,J]  

Because E is downstream of B, and B is downstream of A.

I hope that is clear.

SarinaS
Dataiker

Hi @Bader ,

Thank you for clarifying!  I think for your use case you can still use the project's "exposed objects" setting, in combination with going down the flow across projects.  

The idea here is that, for each of these datasets, you check whether it is exposed to another project.  If it is, you then fetch all downstream datasets in that project (or set of projects).  If it's not, you don't need to search any further.  

So taking that approach, let's create a function that tells us whether the dataset has been shared with another project, and returns the list of target projects if so (or an empty list if not): 

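def is_exposed(project, dataset_name):
    """Return the keys of the projects that dataset_name is exposed to (empty list if none)."""
    projects_to_check = []
    exposed_objects = project.get_settings().get_raw()['exposedObjects']['objects']
    for obj in exposed_objects:
        # dataset_name is a plain string: get_nodes() passes nodeval['ref']
        if obj['type'] == 'DATASET' and obj['localName'] == dataset_name:
            # each rule names one project the dataset is shared with
            for rule in obj['rules']:
                projects_to_check.append(rule['targetProject'])
    return projects_to_check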

 

Then you can update your get_nodes() function to go into any "downstream projects" and get the downstream datasets from there.  Please note that this gets complicated pretty quickly and will probably be difficult to troubleshoot.  You could give something like this a shot though.  

def get_nodes(project, node, node_array, direction='successors'):
    # relies on the module-level client created earlier with dataiku.api_client()
    flow = project.get_flow()
    # gets the graph for a given project flow 
    graph = flow.get_graph()
    if node in graph.nodes:
        nodeval = graph.nodes[node]
        if nodeval['type'] == 'COMPUTABLE_DATASET':
            node_array.append(nodeval['ref'])
            # check whether the dataset is exposed to any other projects
            projects = is_exposed(project, nodeval['ref'])
            # if it is, get each target project and call get_nodes() on that project and the current node
            for p in projects:
                project_object = client.get_project(p)
                get_nodes(project_object, nodeval['fullId'], node_array, direction)
        for connected_node in nodeval[direction]:
            get_nodes(project, connected_node, node_array, direction)
    return node_array
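
To make the traversal order easier to follow, here is a toy simulation of the three-project example from the question that runs without DSS. Everything in it is an assumption for illustration: the project keys P1 to P3, the edges and exposed_to dictionaries, and the helper downstream_across_projects are hypothetical stand-ins for the flow graphs and the exposed objects settings. Recipes are left out, dataset names are assumed to be unique across projects, and each dataset is recorded only once.

```python
# Dataset-to-dataset edges per project, following the three-project example
# from the question (recipes are omitted to keep the sketch short).
edges = {
    "P1": {"A": ["B"], "B": ["C"], "C": ["D"], "D": []},
    "P2": {"B": ["E"], "E": ["F"], "F": ["G"], "G": []},
    "P3": {"E": ["H"], "H": ["I"], "I": ["J"], "J": []},
}

# (owning project, dataset) -> projects the dataset is exposed to.
exposed_to = {("P1", "B"): ["P2"], ("P2", "E"): ["P3"]}


def downstream_across_projects(project, dataset, seen=None):
    """Collect downstream datasets, following exposed datasets into other projects."""
    if seen is None:
        seen = []
    if dataset not in seen:
        seen.append(dataset)
    # First continue down the flow of the current project.
    for nxt in edges[project].get(dataset, []):
        downstream_across_projects(project, nxt, seen)
    # Then follow the dataset into every project it is shared with.
    for target in exposed_to.get((project, dataset), []):
        for nxt in edges[target].get(dataset, []):
            downstream_across_projects(target, nxt, seen)
    return seen


result = downstream_across_projects("P1", "A")
print(result)  # ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J']
```

In a real flow, shared datasets are referenced with their project key, so deduplicating on the bare dataset name as done here would not be safe.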

  

Hope that helps, 

Sarina

Bader
Author

Perfect! That solved my problem. Many thanks.

Bader
Author

Excellent !!! Many thanks
