Partition specification for data updated every X hours and Scenario on latest partition.

kresten
kresten Dataiku DSS Core Designer, Registered Posts: 5 ✭✭✭✭
edited July 16 in Using Dataiku

Working with data stored in ADLS where a new partition is written every X hour. The Partition specification string is:

%Y-%M-%DT%{forecasthour}/.*

Where the forecasthour is [0,6,12,18].

The issue arises when a scenario is set to run when new data arrives. How to specify it to read only the newest partition when the partition is specified by a mix of time and discrete variables?

I have tried to calculate a scenario variable like this:

today = now()
hour = datePart(today, "hour")
forecasthour = if(0 < hour && hour < 6, "00", if(6 < hour && hour < 12, "06", if(12 < hour && hour < 18, "12", if(18 < hour && hour < 24, "18", "99"))))

I can then specify the partition in build steps using

CURRENT_DAY|${forecasthour}

ETL(done outside DSS) may be delayed and the computed forecasthour is not guaranteed to match the newest partition.

Any Idea how to solve this?

Best Answer

  • kresten
    kresten Dataiku DSS Core Designer, Registered Posts: 5 ✭✭✭✭
    edited July 17 Answer ✓

    Inserting a custom Python step is the solution I have arrived at. After listing all partitions in a dataset and selecting the lates one can calculate a number of relevant variables and save them back to the project variable. One can reference those in the Build steps etc.

    import dataiku
    import pandas as pd
    from dataiku import pandasutils as pdu
    from datetime import datetime
    
    client  = dataiku.api_client()
    project = client.get_project("wkjehf") 
    
    dataset = project.get_dataset('lkwejf')
    partitions = dataset.list_partitions()
    partitions.sort()
    
    latest_partition = partitions[-1]
    
    inittime_date, inittime_hour = latest_partition.split("|")
    latest_inittime = f"{inittime_date} {inittime_hour}:00:00"
    inittime = pd.to_datetime(latest_inittime, format='%Y-%m-%d %H:%M:%S')
    time_diff = inittime - datetime.now()
    innittime_delay = time_diff.seconds / 3600
    
    vars = project.get_variables()
    vars['standard']['latest_partition']     = latest_partition
    vars['standard']['latest_inittime']      = latest_inittime
    vars['standard']['latest_inittime_hour'] = inittime_hour
    vars['standard']['innittime_delay']      = innittime_delay
    
    project.set_variables(vars)

Setup Info
    Tags
      Help me…