Schema evolution: CSVs with varying headers

aburkh
Level 1

When creating a dataset from a list of CSV files, Dataiku detects the schema from the first file (or it can be overridden with the schema of one manually selected file).

I have collections of files where the schema slightly changes over time:

  • New columns are added
  • Some columns are removed
  • Some columns change position

With Python, it is easy to handle schema resolution with pd.concat (see the sketch below).

Screenshot 2024-01-29 091022.jpg
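For illustration, here is a minimal pandas sketch of what I mean, using the same column names as my sample data: pd.concat aligns columns by name and fills missing ones with NaN.

import pandas as pd

# First file: has a "size" column, columns in one order
df1 = pd.DataFrame({
    "color": ["Red", "Blue"],
    "size": ["S", "M"],
    "quantity": [10, 15],
    "unit_price": [100, 150],
    "product_name": ["Shirt", "Pants"],
})

# Second file: no "size" column, different column order
df2 = pd.DataFrame({
    "product_name": ["Ball", "Racket"],
    "unit_price": [50, 120],
    "quantity": [5, 10],
    "color": ["Red", "Blue"],
})

# pd.concat aligns on column names, not position; missing values become NaN
combined = pd.concat([df1, df2], ignore_index=True, sort=False)
print(combined)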

The problem is that Dataiku is not able to read those files properly (ignoring data, or mixing two different columns into one because it matches by position, not by column name).

Screenshot 2024-01-29 091004.jpg

In the above example:

  • The column "size" is ignored
  • The columns are loaded in ordinal position, therefore mixing and matching "product_name" with "color", etc.
  • A second column "product_name" is created.

 

How can I perform full schema resolution within Dataiku? Is it possible to automatically resolve schema evolution, extending the dataset schema when new columns are encountered and reordering columns when they appear in a different order?


Operating system used: Windows 10

4 Replies
JordanB
Dataiker

Hi @aburkh,

I recommend taking a look at the following documentation on text-based files datasets and schema modification: 

https://doc.dataiku.com/dss/latest/schemas/datasets.html#text-based-files-datasets

https://doc.dataiku.com/dss/latest/schemas/datasets.html#modifying-the-schema

DSS never automatically modifies the schema of a dataset without an explicit user action: navigate to the Settings tab of the dataset > Schema > Check schema or Reload schema > Save.

Most recipes in DSS can automatically generate the output schema of their datasets, either:

  • When you click on the Save button

  • When you click on the Validate button

Screenshot 2024-01-30 at 12.31.02 PM.png

You can also check the consistency of a schema by right-clicking on the dataset or recipe from the flow:

Screenshot 2024-01-30 at 12.32.38 PM.png

If you are still experiencing issues with your dataset schemas, could you please provide more information on how your data was imported and what recipes, if any, were used to get the output shown in the last image you shared?

Thanks!

 

Turribeach

As you have already noted, letting Dataiku detect the schema will lead to problems if your CSV files are changing. However, as you also noted, a pandas concat can handle dataframes with different schemas easily. So instead of loading these CSV files as traditional Dataiku datasets, you should put them in a Dataiku managed folder, read them via pandas using the read_csv() method, and then use concat(). You can also perform any column ordering that you wish and finally write back to a Dataiku dataset when you are done. Here is a sample post showing how to read all files from a Dataiku managed folder, and a sketch of the approach below.
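A minimal sketch of that approach, assuming a managed folder named "csv_input" and an output dataset named "combined" (both names are placeholders, adjust them to your project):

import dataiku
import pandas as pd

# Placeholders: "csv_input" is the managed folder holding the CSV files,
# "combined" is the output dataset.
folder = dataiku.Folder("csv_input")

dfs = []
for path in folder.list_paths_in_partition():
    if not path.lower().endswith((".csv", ".csv.gz")):
        continue
    # Stream each file out of the managed folder and parse it with pandas.
    # pandas cannot infer compression from a stream, so pass it explicitly.
    with folder.get_download_stream(path) as stream:
        compression = "gzip" if path.lower().endswith(".gz") else None
        dfs.append(pd.read_csv(stream, compression=compression))

# concat aligns columns by name across all files; missing columns become NaN
combined_df = pd.concat(dfs, ignore_index=True, sort=False)

# Optional: enforce a stable column order before writing back
combined_df = combined_df.reindex(sorted(combined_df.columns), axis=1)

output = dataiku.Dataset("combined")
output.write_with_schema(combined_df)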

aburkh
Level 1
Author

Thanks for your replies and suggestions!

1. Totally agree that having varying headers is not best practice, but sometimes that's what users get.

2. @JordanB The files above are just sample data; I have attached the Python script to generate the sample data plus the CSV files (love ChatGPT for generating sample data just by describing the issue at hand). Then I uploaded both csv.gz files into an "Uploaded Files" dataset.

I'm worried that working with multiple files (CSV, Parquet, etc.) in Dataiku inherently carries the risk of mixing and matching data without even realizing it. If the column order is different and the data types match, users would end up with gibberish data and might take a long time to realize it.

It would be great to have an option to resolve the schema not by ordinal position based on the first file, but using the headers/schemas of all files. For now, I think I will recommend that our users do not import multiple files into a single dataset, but go through a plugin or a "cleaning" process to ensure schema consistency.

Turribeach

A few things I want to comment on:

"For now, I think I will recommend our users do not import multiple files into a single dataset, but go through a plugin or a "cleaning" process to ensure schema consistency."

This is the correct advice for this particular case, where you can't ensure the file structure will be consistent across all files. If the files all have the same structure, then you should be using the "Files in folder" dataset, which has some neat functionality that I documented in this post and which can give full record traceability, a great feature to have when you load lots of data from different files.

"I'm worried that working with multiple files (csv, parquet, etc.) in dataiku inherently carries the risk of mixing and matching data without even realizing it."

This should not be an issue if you deal with these files correctly. I think your mistake is expecting that by uploading multiple files with different structures to the same dataset, Dataiku will be able to deal with them gracefully. It won't, and it really shouldn't. I have worked with many ETL tools over my IT career and I have yet to see one that can load multiple files with different structures into the same dataset. Most ETL tools I know will fail if you add/remove a column from a previously known CSV file that the ETL tool has been coded to load. Some ETL tools may ignore new columns if they are added at the end of the CSV file. So in the Dataiku world, as in most other ETL tools, a different schema = a different dataset. That is the approach users should take, and the one that will most clearly result in a flow that other users can easily understand. On that note, I have taken your Python file (*) and created a proper Dataiku Python recipe with it:

# -*- coding: utf-8 -*-
import dataiku
import pandas as pd

# First sample dataset: includes a "size" column
data1 = {
    "color": ["Red", "Blue"],
    "size": ["S", "M"],
    "quantity": [10, 15],
    "unit_price": [100, 150],
    "product_name": ["Shirt", "Pants"]
}

dataset_1_df = pd.DataFrame(data1)

# Write recipe output dataset_1
dataset_1 = dataiku.Dataset("dataset_1")
dataset_1.write_with_schema(dataset_1_df)

# Second sample dataset: no "size" column and a different column order
data2 = {
    "product_name": ["Ball", "Racket"],
    "unit_price": [50, 120],
    "quantity": [5, 10],
    "color": ["Red", "Blue"]
}

dataset_2_df = pd.DataFrame(data2)

# Write recipe output dataset_2
dataset_2 = dataiku.Dataset("dataset_2")
dataset_2.write_with_schema(dataset_2_df)

This Python recipe needs two outputs created: dataset_1 and dataset_2. Once you create the recipe and its outputs and run it, you will have two populated datasets with your dummy data. Now you can use the Stack visual recipe to join them together, which is the visual equivalent of pd.concat() for the "clicker users" as opposed to the "coder users". Once you create the Stack recipe you will end up with a flow like this:

Capture.PNG

And by default the Stack recipe will deal with uneven schemas and give you a consolidated dataset:

Capture.PNG

You can customise how the Stack recipe should handle the merged columns in the "Columns selection" drop-down. It can take all columns (the default), it can take only the ones that match, it can take the ones from a specific dataset, plus a few more options. However, none of this will be able to deal with the impact of columns appearing or disappearing from your input datasets (more on this below).

"Is it possible to automatically resolve schema evolution, extend the dataset schema when new columns are encountered and reordering columns when they are in a different order?"

The solution to your question has been provided in my earlier response. Use a Python recipe, read all the files using the read_csv() method, and then use concat() to join them as you want. You can also perform any column ordering that you wish and finally write back to a Dataiku dataset when you are done. You could even go a step further and automate the propagation of the schema changes over your flow using the Dataiku Python API (a rough sketch follows below). But I seriously doubt this is a reasonable path you want to take. No matter how clever your Python recipe could be in dealing with new/deleted columns and reordering the schema, you will never get away from the fact that you will not be able to refer to any new/deleted columns in any subsequent recipes. You can't have a Group By recipe group by a column that doesn't exist.
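For completeness, and only as a rough sketch: to the best of my knowledge the public API exposes a schema propagation helper for this, but treat the exact method names as an assumption to verify against the API documentation for your DSS version. It would start from a dataset, here a hypothetical "combined", and push schema changes downstream:

import dataiku

# Rough sketch only: "combined" is a hypothetical starting dataset; verify
# these helpers against the Dataiku API docs for your DSS version.
client = dataiku.api_client()
project = client.get_default_project()
flow = project.get_flow()

# Build and start a schema propagation from the given dataset downstream,
# then wait for it to finish.
propagation = flow.new_schema_propagation("combined")
future = propagation.start()
print(future.wait_for_result())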

So in summary, your requirement is a sort of generic file ingestion layer that can deal with files of any schema. While you may be able to build that, it will not remove the fact that once columns are added and removed, this could have an impact on the rest of the flow, and someone will need to take care of those changes. So having something that "automatically" breaks a flow sounds like a recipe for disaster to me (pun intended!). Furthermore, while you may be able to deal with new columns in a more graceful manner, what happens when a column is deleted or doesn't exist? In your initial post you had "size" only in one dataset. Even if you end up with a "size" column in your merged dataset, what happens to the rows that have no size value? In machine learning, empty/null columns are a massive problem since models can't measure distance / approximate to null. So in my view the path you have taken is mistaken and you should rethink your approach.

(*): Please post code using the code block (the </> icon in the toolbar) rather than in attached files, so it can easily be seen and copied/pasted. Also select the right language for nice syntax colouring.
