python api - Select all upstream and downstream datasets

Bader Registered Posts: 46 ✭✭✭✭✭

Hello all,

The feature for getting all upstream and downstream datasets of a dataset from the flow is amazing. I would like to do the same using the Python API.

Could you please help with this? If it's not possible, is there any alternative?

Kind regards

Best Answer

  • Sarina
    Sarina Dataiker, Dataiku DSS Core Designer, Dataiku DSS Adv Designer Posts: 315 Dataiker
    edited July 17 Answer ✓

    Hi @Bader,

    Here’s an example of how you could traverse the flow to get upstream + downstream datasets with the Python API.

    I am using flow graph nodes to traverse the flow in this example, but you may also want to take a look at the flow API functions: Flow creation and management.

    So that you can see my flow in this example, here’s a screenshot of my project, using the Dataiku TShirts "sample" project: 

    [Screenshot: flow of the DKU_TSHIRTS sample project]

    In this example I created a function, get_nodes, that can pull either all “predecessors” of a given element in your flow (i.e. upstream) or all “successors” of that element (i.e. downstream). Note that I limit it to dataset elements, since it sounds like that's your goal, but that condition could easily be removed or modified.

    import dataiku

    client = dataiku.api_client()
    # enter your project key
    project = client.get_project('DKU_TSHIRTS')
    flow = project.get_flow()

    # get the graph for the project flow
    graph = flow.get_graph()

    def get_nodes(node, node_array, direction='successors'):
        """Recursively collect dataset refs; direction should be either
        'successors' for downstream or 'predecessors' for upstream."""
        nodeval = graph.nodes.get(node)
        if nodeval:
            if nodeval['type'] == 'COMPUTABLE_DATASET':
                node_array.append(nodeval['ref'])
            for connected_node in nodeval[direction]:
                get_nodes(connected_node, node_array, direction)
        return node_array

    Here’s an example of how I would call the function for the above flow. Note that revenue_prediction is my "rightmost" dataset, so when I get all "predecessors" on it, I get the five datasets that are upstream of it in the flow. web_last_month is my "leftmost" dataset, so when I use the "successors" parameter, I get the two datasets that are downstream of it in the flow.

    [Screenshot: notebook cells calling get_nodes for the predecessors of revenue_prediction and the successors of web_last_month, with their results]
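
    For instance, the calls might look something like the following sketch (the exact node keys here are an assumption; print graph.nodes.keys() to see the keys on your instance and pass whichever one corresponds to your dataset):

    # inspect the node keys first and use whichever one matches your dataset
    print(list(graph.nodes.keys()))

    # all datasets upstream of revenue_prediction (the rightmost dataset in my flow)
    upstream = get_nodes('revenue_prediction', [], direction='predecessors')
    print(upstream)

    # all datasets downstream of web_last_month (the leftmost dataset in my flow)
    downstream = get_nodes('web_last_month', [], direction='successors')
    print(downstream)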

    Note that if I call the "successors" version of the call on the dataset revenue_prediction, only ['revenue_prediction'] would get returned, as it is the downstream-most dataset.

    Thanks,

    Sarina

Answers

  • Bader
    Bader Registered Posts: 46 ✭✭✭✭✭

    Excellent !!! Many thanks

  • Bader
    Bader Registered Posts: 46 ✭✭✭✭✭

    Is it possible to get all downstream datasets while also taking into account datasets that are shared across projects?

  • Sarina
    Sarina Dataiker, Dataiku DSS Core Designer, Dataiku DSS Adv Designer Posts: 315 Dataiker
    edited July 17

    Hi @Bader,

    The exact details will depend a little on your use case, but you could cross-reference the "exposed objects" for a project with the "downstream datasets" to pull all downstream datasets that are also exposed.

    For an example, let's assume I have the following downstream datasets:

    [Screenshot: list of downstream datasets returned by get_nodes]

    I also have the following exposed datasets for this project. Note that "web_new_customers" and "revenue_prediction" are both exposed:

    [Screenshot: the project's exposed (shared) datasets, including web_new_customers and revenue_prediction]

    If you wanted the intersection of these two sets (downstream datasets that are also exposed), you could add some code to get the exposed datasets, like this:

    # 'downstream' is the list returned by get_nodes(..., direction='successors') above
    exposed_objects = project.get_settings().get_raw()['exposedObjects']['objects']
    exposed_datasets = []
    for obj in exposed_objects:
        if obj['type'] == 'DATASET':
            exposed_datasets.append(obj['localName'])
    exposed_downstream = set(exposed_datasets) & set(downstream)

    That will give the final set of the two exposed downstream datasets, "revenue_prediction" and "web_new_customers":

    [Screenshot: output showing {'revenue_prediction', 'web_new_customers'}]
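
    Putting the pieces together, the whole sequence might look like this sketch (reusing the get_nodes function from the first answer; the starting node key 'web_last_month' is an assumption, so check graph.nodes for the exact key on your instance):

    # all datasets downstream of web_last_month, using get_nodes() from the first answer
    downstream = get_nodes('web_last_month', [], direction='successors')

    # datasets this project exposes to other projects
    exposed_objects = project.get_settings().get_raw()['exposedObjects']['objects']
    exposed_datasets = [obj['localName'] for obj in exposed_objects if obj['type'] == 'DATASET']

    # downstream datasets that are also exposed
    exposed_downstream = set(exposed_datasets) & set(downstream)
    print(exposed_downstream)  # e.g. {'revenue_prediction', 'web_new_customers'}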

    Hope this helps,

    Sarina 

  • Bader
    Bader Registered Posts: 46 ✭✭✭✭✭

    Hi Sarina,

    Thanks for the help and support, I really appreciate it.

    Let me clarify the question a bit more.

    Let's assume we have three projects:

    1- Project 1 has the following datasets (A -> B -> C -> D)

    2- Project 2 has the following datasets (project1.B -> E -> F -> G)

    3- Project 3 has the following datasets (project2.E -> H -> I -> J)

    When I call get_nodes(A, downstream, 'successors'), I'm expecting the following result:

    [A,B,C,D,E,F,G,E,H,I,J]

    Because E is downstream of B, and B is downstream of A.

    Hope that's clear.

  • Sarina
    Sarina Dataiker, Dataiku DSS Core Designer, Dataiku DSS Adv Designer Posts: 315 Dataiker
    edited July 17

    Hi @Bader,

    Thank you for clarifying! I think for your use case you can still utilize the exposed-objects project setting, in combination with walking down the flow across projects.

    The idea here would be: for each of these datasets, check whether it is exposed to another project. If it is, you can then fetch all downstream datasets in that project (or set of projects). If it's not, then you won't need to search any further.

    So taking that approach, let's create a function that tells us whether a dataset has been shared with another project, and returns the list of target projects if so:

    def is_exposed(project, dataset_name):
        """Return the list of project keys that dataset_name is exposed to."""
        projects_to_check = []
        exposed_objects = project.get_settings().get_raw()['exposedObjects']['objects']
        for obj in exposed_objects:
            if obj['type'] == 'DATASET' and obj['localName'] == dataset_name:
                # collect every project this dataset is exposed to
                for rule in obj['rules']:
                    projects_to_check.append(rule['targetProject'])
        return projects_to_check
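
    As a quick check, you could call it directly. For example, with the projects from your scenario (the project key 'PROJECT1' and dataset name 'B' are just placeholders here):

    project1 = client.get_project('PROJECT1')
    # should return e.g. ['PROJECT2'] if dataset B is exposed to project 2
    print(is_exposed(project1, 'B'))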

    Then you can update your get_nodes() function to go into any "downstream projects" and get the downstream datasets from there. Please do note that this gets complicated pretty quickly and will probably be difficult to troubleshoot. You could give something like this a shot, though.

    def get_nodes(project, node, node_array, direction='successors'):
        flow = project.get_flow()
        # get the graph for this project's flow
        graph = flow.get_graph()
        if node in graph.nodes:
            nodeval = graph.nodes[node]
            if nodeval['type'] == 'COMPUTABLE_DATASET':
                node_array.append(nodeval['ref'])
                # check whether the dataset is exposed to any other projects
                projects = is_exposed(project, nodeval['ref'])
                # if it is, recurse into each of those projects with the current node
                for p in projects:
                    project_object = client.get_project(p)
                    get_nodes(project_object, nodeval['fullId'], node_array, direction)
            for connected_node in nodeval[direction]:
                get_nodes(project, connected_node, node_array, direction)
        return node_array
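
    And here's how you might kick off the cross-project traversal for the flow you described (the project key 'PROJECT1' and the node key 'A' are placeholders from your scenario; as before, check graph.nodes in each project for the exact key format):

    import dataiku

    client = dataiku.api_client()

    # start from dataset A in project 1 and walk downstream across projects
    start_project = client.get_project('PROJECT1')
    all_downstream = get_nodes(start_project, 'A', [], direction='successors')
    print(all_downstream)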

    Hope that helps,

    Sarina

  • Bader
    Bader Registered Posts: 46 ✭✭✭✭✭

    Perfect!! It solved my problem, many thanks.
