Partition specification for data updated every X hours, and running a scenario on the latest partition
I am working with data stored in ADLS, where a new partition is written every X hours. The partitioning pattern is:
%Y-%M-%DT%{forecasthour}/.*
where forecasthour is one of [0, 6, 12, 18].
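To make the mix of a time dimension and a discrete dimension concrete, here is a minimal sketch of how a file path matching this pattern maps to a partition identifier. It assumes the "day|forecasthour" identifier format that the answer below relies on (it splits partition ids on "|" and builds CURRENT_DAY|${forecasthour}); the helper `partition_id` and the sample paths are hypothetical, not part of the DSS API.

```python
import re

# Hypothetical helper mirroring the pattern %Y-%M-%DT%{forecasthour}/.*
# Time part: a day (%Y-%M-%D); discrete part: whatever sits between "T" and the next "/".
PATTERN = re.compile(r"^/?(\d{4})-(\d{2})-(\d{2})T([^/]+)/.*$")


def partition_id(path: str) -> str:
    """Return the assumed "YYYY-MM-DD|forecasthour" id for a file path."""
    match = PATTERN.match(path)
    if match is None:
        raise ValueError(f"path does not match the partitioning pattern: {path}")
    year, month, day, forecasthour = match.groups()
    # Dimensions joined with "|", as in CURRENT_DAY|${forecasthour}
    return f"{year}-{month}-{day}|{forecasthour}"


print(partition_id("2023-01-31T06/data.csv"))  # 2023-01-31|06
```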
The issue arises when a scenario is set to run whenever new data arrives. How can I make it read only the newest partition when the partitioning combines a time dimension with a discrete one?
I have tried computing scenario variables like this:
today = now()
hour = datePart(today, "hour")
forecasthour = if(hour < 6, "00", if(hour < 12, "06", if(hour < 18, "12", "18")))
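For comparison, the same bucketing can be sketched in Python. Flooring the hour to a multiple of 6 assigns every hour from 0 to 23 to exactly one slot, including the boundary hours 0, 6, 12 and 18, so no fallback value is needed. The function name `forecast_hour` is illustrative only.

```python
from datetime import datetime


def forecast_hour(ts: datetime) -> str:
    # Floor the hour to the start of its 6-hour window:
    # 0-5 -> "00", 6-11 -> "06", 12-17 -> "12", 18-23 -> "18"
    return f"{ts.hour // 6 * 6:02d}"


for h in (0, 5, 6, 11, 12, 17, 18, 23):
    print(h, forecast_hour(datetime(2023, 1, 31, h)))
```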
I can then specify the partition in build steps using
CURRENT_DAY|${forecasthour}
However, the ETL (done outside DSS) may be delayed, so the computed forecasthour is not guaranteed to match the newest partition that actually exists.
Any idea how to solve this?
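One way to tolerate a delayed ETL is to start from the slot the clock says should be current and step back 6 hours at a time until a partition that actually exists is found. This is a sketch under stated assumptions: partition ids follow the "YYYY-MM-DD|HH" form with zero-padded hours, the list of existing ids would come from something like `dataset.list_partitions()` (as used in the answer below), and `latest_available_partition` and its `max_lookback` parameter are hypothetical names.

```python
from datetime import datetime, timedelta


def latest_available_partition(existing, now, step_hours=6, max_lookback=8):
    """Walk back from the current slot until an existing partition id is found.

    Assumes ids look like "YYYY-MM-DD|HH" with zero-padded hours.
    Returns None if nothing exists within max_lookback slots.
    """
    existing = set(existing)
    # Slot that should be current if the ETL were on time
    slot = now.replace(hour=now.hour // step_hours * step_hours,
                       minute=0, second=0, microsecond=0)
    for _ in range(max_lookback):
        pid = f"{slot:%Y-%m-%d}|{slot:%H}"
        if pid in existing:
            return pid
        # ETL is late for this slot: try the previous one (may cross midnight)
        slot -= timedelta(hours=step_hours)
    return None


print(latest_available_partition(["2023-01-31|00", "2023-01-31|06"],
                                 datetime(2023, 1, 31, 13, 5)))  # 2023-01-31|06
```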
Best Answer
The solution I have arrived at is to insert a custom Python step into the scenario. After listing all partitions of the dataset and selecting the latest one, the step calculates a number of relevant variables and saves them back to the project variables. These can then be referenced in the Build steps, etc.
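import dataiku
from datetime import datetime

client = dataiku.api_client()
project = client.get_project("wkjehf")
dataset = project.get_dataset("lkwejf")

def partition_time(partition_id):
    # Partition ids look like "YYYY-MM-DD|HH" (day and forecast hour separated by "|")
    day, hour = partition_id.split("|")
    return datetime.strptime(f"{day} {hour}", "%Y-%m-%d %H")

# Pick the newest partition by its timestamp, not by string order
partitions = dataset.list_partitions()
latest_partition = max(partitions, key=partition_time)
inittime_date, inittime_hour = latest_partition.split("|")
inittime = partition_time(latest_partition)
latest_inittime = inittime.strftime("%Y-%m-%d %H:%M:%S")

# Hours elapsed since the latest init time (positive when the data is older than now).
# datetime.now() is the server's local time; adjust if partitions are in UTC.
innittime_delay = (datetime.now() - inittime).total_seconds() / 3600

# Save as project variables so Build steps can reference ${latest_partition}, etc.
project_vars = project.get_variables()
project_vars["standard"]["latest_partition"] = latest_partition
project_vars["standard"]["latest_inittime"] = latest_inittime
project_vars["standard"]["latest_inittime_hour"] = inittime_hour
project_vars["standard"]["innittime_delay"] = innittime_delay
project.set_variables(project_vars)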
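The selection and delay logic can be checked offline, without the DSS API, with a small sketch like the one below. Two details matter: sorting partition ids as plain strings is only chronological when hours are zero-padded ("2023-01-31|6" sorts after "2023-01-31|18"), and `timedelta.seconds` holds only the seconds component rather than the full signed duration, so `total_seconds()` on `now - inittime` is used instead. The function names and sample ids are illustrative assumptions.

```python
from datetime import datetime


def parse_partition(pid):
    # Assumed id format "YYYY-MM-DD|H" or "YYYY-MM-DD|HH"; %H accepts both
    day, hour = pid.split("|")
    return datetime.strptime(f"{day} {hour}", "%Y-%m-%d %H")


def latest_partition_info(partition_ids, now):
    # Compare parsed timestamps, so unpadded hours still sort correctly
    latest = max(partition_ids, key=parse_partition)
    inittime = parse_partition(latest)
    # Full signed duration in hours, positive when the data is older than now
    delay_hours = (now - inittime).total_seconds() / 3600
    return latest, inittime, delay_hours


ids = ["2023-01-31|6", "2023-01-31|18", "2023-01-30|18"]
print(sorted(ids)[-1])  # 2023-01-31|6  (string order is wrong here)
print(latest_partition_info(ids, datetime(2023, 1, 31, 20, 30)))
```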