import dataiku
import pandas as pd


def get_dataset_in_project(dataset_name, project_name=None):
    client = dataiku.api_client()
    if project_name:
        project = client.get_project(project_name)
    else:
        # check if the dataset is shared from another project (name prefixed by that project's key)
        if len(dataset_name.split('.')) > 1:
            sharing_project, dataset_name = dataset_name.split('.')
            if project_name is None:
                project_name = sharing_project
            else:
                raise Exception(f"project_name is '{project_name}' but dataset comes from project '{sharing_project}'")
            project = client.get_project(project_name)
        else:
            project = client.get_default_project()
    dataset = project.get_dataset(dataset_name)
    return dataset


def checking_partition_corresponds_to_data(df, col_2_partition, archive_partition):
    col_2_partition_values = list(df[col_2_partition].value_counts().index)
    assert len(col_2_partition_values) == 1, f"expecting 1 value but {len(col_2_partition_values)} values found in column '{col_2_partition}': {col_2_partition_values}"
    col_2_partition_value = col_2_partition_values[0]
    assert col_2_partition_value == archive_partition, f"partition to build does not match data: partition to build is '{archive_partition}' but found '{col_2_partition_value}' in column '{col_2_partition}'"


def archive_into_partition(input_dataset_name, output_dataset_name, archive_partition,
                           col_2_partition=None, check_partition_corresponds_to_data=True):
    # get_dataset_in_project is defined above
    input_dataset = dataiku.Dataset(input_dataset_name)
    output_dataset_api = get_dataset_in_project(output_dataset_name)
    output_dataset = output_dataset_api.get_as_core_dataset()
    df = input_dataset.get_dataframe(infer_with_pandas=False)
    if check_partition_corresponds_to_data:
        checking_partition_corresponds_to_data(df, col_2_partition, archive_partition)
    # TODO: detect whether we are running in a recipe or in a notebook
    # The following line is unnecessary inside a recipe (but not in a notebook!) since the write partition is configured by the Flow.
    # https://doc.dataiku.com/dss/latest/python-api/datasets-reference.html#dataiku.Dataset.set_write_partition :
    # "Setting the write partition is not allowed in Python recipes, where write is controlled by the Flow."
    # output_dataset.set_write_partition(archive_partition)
    if col_2_partition and (col_2_partition in df.columns):
        # drop the partitioning column to avoid a duplicate column error when reading the partitioned table with Athena
        output_dataset.write_with_schema(df.drop(columns=col_2_partition))
    else:
        output_dataset.write_with_schema(df)
    output_dataset_api.synchronize_hive_metastore()
    output_dataset_api.compute_metrics()  # compute metrics on the whole dataset
    output_dataset_api.compute_metrics(partition=archive_partition)  # compute metrics on the partition


input_dataset_name = 'screenshot_data'
output_dataset_name = 'archived_data_partitioned'
archive_partition = dataiku.dku_flow_variables["DKU_DST_N_2_partition"]  # the destination partition is provided by the Flow variables
col_2_partition = 'N_2_partition'

archive_into_partition(input_dataset_name, output_dataset_name, archive_partition, col_2_partition)
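
# A minimal sketch for the TODO above (hypothetical helper, not part of the original code):
# when archive_into_partition() is called from a notebook rather than a recipe, the Flow does
# not set the write partition, so it must be set explicitly. This assumes that accessing
# dataiku.dku_flow_variables raises an exception outside of a recipe, which may differ between
# DSS versions; adapt the detection to your setup.
def set_partition_if_in_notebook(output_dataset, archive_partition):
    try:
        dataiku.dku_flow_variables  # assumed to be available only inside a recipe
        # inside a recipe: the Flow controls the write partition, nothing to do
    except Exception:
        # outside a recipe (e.g. in a notebook): set the write partition explicitly, see
        # https://doc.dataiku.com/dss/latest/python-api/datasets-reference.html#dataiku.Dataset.set_write_partition
        output_dataset.set_write_partition(archive_partition)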