Update partitioned table schema without dropping old partitions

lnguyen
lnguyen Registered Posts: 7 ✭✭✭✭

Hi community,

I am working on a data flow whose final datasets are partitioned (HDFS on Hadoop).

Every time I make a change in the flow that changes the schema of the final datasets, the tables are dropped and I have to recompute all partitions of those datasets, which takes a lot of time and resources.

This is still OK for now since I am in the design phase of the project, but once the project goes to production we risk having to recompute years of data, and the impact would be unbearable.

Is there any way to update the schema of the partitioned dataset, for example by adding the new column with empty values for all old partitions, and then recompute only the selected partitions?

I guess what I'm asking is whether there is a way to update the schema of a partitioned dataset without dropping the whole dataset.

Thanks a lot for your help


Operating system used: Windows 10

Answers

  • Tanguy
    Tanguy Dataiku DSS Core Designer, Dataiku DSS & SQL, Dataiku DSS ML Practitioner, Dataiku DSS Core Concepts, Neuron, Dataiku DSS Adv Designer, Registered, Dataiku DSS Developer, Neuron 2023 Posts: 113 Neuron
    edited July 17

    I have managed to do this using Dataiku's API (see the screenshot below for the change in the number of columns across partitions).

    Note: in this example I was using partitions to archive different versions of a dataset. More info here.

    [Screenshot: Sans titre.png]

    Here is a piece of the code that I used to get the above result:

    import dataiku

    def in_ipynb():
      """
      The dataiku module has the dictionary dku_flow_variables available when a Python recipe is run from the flow itself.
      If the same code is run outside of the flow, dku_flow_variables won't be available.
      https://community.dataiku.com/t5/Using-Dataiku/is-there-a-way-to-check-if-python-code-is-ran-in-a-recipe-vs/m-p/30028#M11202
      """
      # True when running outside the flow (e.g. in a notebook), False inside a recipe
      return not hasattr(dataiku, 'dku_flow_variables')


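    # PROJECT_NAME, INPUT_DATASET_NAME, OUTPUT_DATASET_NAME and PARTITION_NAME must be defined beforehand
    client = dataiku.api_client()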
    project = client.get_project(PROJECT_NAME)

    input_dataset = dataiku.Dataset(INPUT_DATASET_NAME)
    output_dataset_api = project.get_dataset(OUTPUT_DATASET_NAME)
    output_dataset = output_dataset_api.get_as_core_dataset()

    df = input_dataset.get_dataframe(infer_with_pandas=False)

    if in_ipynb():
      # the following line is unnecessary inside a recipe (but not in a notebook!) as partition setting is configured by the flow
      # https://doc.dataiku.com/dss/latest/python-api/datasets-reference.html#dataiku.Dataset.set_write_partition :
      # --> "Setting the write partition is not allowed in Python recipes, where write is controlled by the Flow."
      # see also : https://doc.dataiku.com/dss/latest/partitions/variables.html#python
      # --> "Since read and write is done through Dataiku DSS, you don’t need to specify the source or destination partitions in your code for that, using “get_dataframe()” will automatically give you only the relevant partitions.
      # For other purposes than reading/writing dataframes, all variables are available in a dictionary called dku_flow_variables in the dataiku module."
      output_dataset.set_write_partition(PARTITION_NAME)

    output_dataset.write_with_schema(df)

    output_dataset_api.synchronize_hive_metastore()
    output_dataset_api.compute_metrics() # compute metrics on whole dataset
    output_dataset_api.compute_metrics(partition=PARTITION_NAME) # compute metrics on partition

    Hope it will solve your problem!
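
For the original question (adding a column while keeping the old partitions), it may help to check what actually changed between the existing schema and the new one before writing a partition. The following is only a minimal sketch in plain Python, not Dataiku's own mechanism: `merge_schema` is a hypothetical helper, and it assumes each schema is a list of column dictionaries with `name` and `type` keys. It appends new columns at the end and refuses type changes on existing columns. Whether old partitions then read the appended columns as empty depends on the storage format, so that part should be verified on a test dataset first.

```python
def merge_schema(old_columns, new_columns):
    """Hypothetical helper: keep the old columns in their original order
    and append any column that only exists in new_columns.

    Raises ValueError if a column present in both schemas changed type,
    since that would likely require recomputing the old partitions.
    """
    merged = [dict(col) for col in old_columns]
    known_types = {col["name"]: col["type"] for col in old_columns}

    for col in new_columns:
        name = col["name"]
        if name not in known_types:
            # New column: append at the end so existing columns keep their position
            merged.append(dict(col))
            known_types[name] = col["type"]
        elif known_types[name] != col["type"]:
            raise ValueError(
                f"Column {name!r} changed type: "
                f"{known_types[name]} -> {col['type']}"
            )
    return merged


# Example data (made up for illustration)
old_schema = [
    {"name": "id", "type": "bigint"},
    {"name": "amount", "type": "double"},
]
new_schema = [
    {"name": "id", "type": "bigint"},
    {"name": "amount", "type": "double"},
    {"name": "country", "type": "string"},
]

merged_schema = merge_schema(old_schema, new_schema)
added_columns = [col["name"] for col in merged_schema[len(old_schema):]]
print("Columns added:", added_columns)
```

If the only difference is a set of appended columns, writing just the selected partitions as in the code above is the case described in this thread. A type change or a removed column is a different situation that this sketch does not cover.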
