I am collecting data from a datasource over a long period of time. The dataset is pretty big (10+ million rows).
Because I don't want to recreate the dataset every time, I am using "append instead of overwrite" to only add the last day of data.
At the same time, I don't want my dataset to keep growing, so I want to remove the data for the oldest date. I cannot find a way to do this with visual recipes. I also tried overwriting the data with a Python recipe, but I am not allowed to write data into the same dataset I am reading from.
Thanks for your help!
Hi @deguodedongxi,
You should be able to use a Python recipe with a dummy output; you need to pass ignore_flow=True when defining the dataset for writing:
# -*- coding: utf-8 -*-
import dataiku
import pandas as pd, numpy as np
from dataiku import pandasutils as pdu
# Read recipe inputs
crm_history = dataiku.Dataset("crm_history")
df = crm_history.get_dataframe()
# Keep only the most recent rows; replace 1000000 with the number of rows you want to keep
truncated_df = df.tail(1000000)
# Write recipe outputs
# ignore_flow=True allows writing back to the same dataset the recipe reads from
crm_history_out = dataiku.Dataset("crm_history", ignore_flow=True)
crm_history_out.write_with_schema(truncated_df)
I have a similar use case. My dataset is stored in an appended, CSV-based managed partitioned dataset. Every hour or so I add a partition of new or changed records. My data source is very slow, so what you are showing here might be useful.
However, this leaves me with a few questions.
Thanks for any insights you can share.
Hi @tgb417 ,
The tail() was a quick example, with the assumption that the logic for selecting the rows to keep would be customized and that the dataset would first be sorted (with sort_values()).
Another example: if you want to keep only the rows whose date column falls within the last year:
import pandas as pd
from datetime import datetime, timedelta
# Convert the date column to datetime if needed
df['date'] = pd.to_datetime(df['date'])
# Keep only rows from the last 365 days
one_year_ago = datetime.now() - timedelta(days=365)
df = df[df['date'] >= one_year_ago]
If you write to CSV from DSS, you can preserve the row order by selecting the corresponding option.
When reading from a SQL dataset, the order is not guaranteed if an ORDER BY clause is not included.
In that case, you may want to use SQLExecutor2 with an ORDER BY clause.
https://doc.dataiku.com/dss/latest/python-api/sql.html#executing-queries-dataikuapi-variant
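For illustration, here is a minimal sketch of reading the rows back in a deterministic order with SQLExecutor2. The dataset name reuses "crm_history" from the example above; the table name "CRM_HISTORY" and the column "date_column" are placeholders you would replace with your own:
import dataiku
from dataiku import SQLExecutor2
# Assumes "crm_history" is a SQL dataset; CRM_HISTORY and date_column are placeholder names
crm_history = dataiku.Dataset("crm_history")
executor = SQLExecutor2(dataset=crm_history)
# Sort in the database so the row order is guaranteed
df = executor.query_to_df("SELECT * FROM CRM_HISTORY ORDER BY date_column")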
But if you are already dealing with SQL datasets, it's probably easier to use a SQL Script recipe to delete the old rows directly (TRUNCATE cannot take a WHERE clause, so a DELETE is needed):
DELETE FROM table_name WHERE date_column <= '2023-05-17';
If memory usage is a concern, you would need to stream the data using chunked reading and only write back the rows to keep.
https://doc.dataiku.com/dss/latest/python-api/datasets-data.html#chunked-reading-and-writing-with-pa...
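In case it helps, here is a minimal sketch of the chunked approach for the "keep the last year" logic above. To keep the sketch safe, it writes to a separate (hypothetical) output dataset named "crm_history_filtered" instead of overwriting the dataset while it is still being read:
import dataiku
import pandas as pd
from datetime import datetime, timedelta
one_year_ago = datetime.now() - timedelta(days=365)
crm_history = dataiku.Dataset("crm_history")
# "crm_history_filtered" is a hypothetical output dataset for this sketch
crm_history_filtered = dataiku.Dataset("crm_history_filtered")
crm_history_filtered.write_schema(crm_history.read_schema())
# Stream the input in chunks so the full dataset never has to fit in memory
with crm_history_filtered.get_writer() as writer:
    for chunk in crm_history.iter_dataframes(chunksize=100000):
        chunk['date'] = pd.to_datetime(chunk['date'])
        writer.write_dataframe(chunk[chunk['date'] >= one_year_ago])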
Thanks,
That worked perfectly! The dummy dataset seemed a bit weird, but as long as it works, that's fine with me 🙂 Maybe in future versions it would be a nice feature to have some kind of macro to automate this step.
Thanks for your quick help!