How to optimize dataset batch writer for long-running tasks?
We have a compute and memory-intensive Python recipe that we are building. The recipe both reads and writes from/to Snowflake-based datasets. We are running into an issue where the task fails at the end of the code when the context manager closes and the dataset writer attempts to commit the transaction.
Solution 1: The Dataiku Way
import dataiku

in_ds = dataiku.Dataset("in")
out_ds = dataiku.Dataset("out")

# A single writer stays open for the whole run
with out_ds.get_writer() as writer:
    for df in in_ds.iter_dataframes(chunksize=cs):
        huge_df = long_running_func(df)
        writer.write_dataframe(huge_df)
This recipe can run for a couple of hours. While it will complete successfully some of the time, other times it will fail with one of two errors:
Error 1
Error in python process: At line 24: <class 'Exception'>: Stream has been closed
Error 2
Error in python process: At line 24: <class 'Exception'>: An error occurred during dataset write (tDthmt3GsF): SnowflakeSQLException: Your transaction was aborted. All statements will be ignored until you issue a COMMIT or a ROLLBACK.
Judging by Error 2, nothing appears to be committed to Snowflake until the context manager closes. We suspect that either the SQL statement being built is too large or the connection to Snowflake is held open for too long.
The solution we are currently working on involves removing the context manager and instead creating and closing writers for each iteration.
Solution 2: The Hacky Way
import dataiku

in_ds = dataiku.Dataset("in")
out_ds = dataiku.Dataset("out")

for df in in_ds.iter_dataframes(chunksize=cs):
    huge_df = long_running_func(df)
    # Open and close a new writer for every chunk
    writer = out_ds.get_writer()
    writer.write_dataframe(huge_df)
    writer.close()
    out_ds.spec_item["appendMode"] = True  # append after first batch
This appears to be working, but we are wondering if we are missing something and there is a better way to manage long-running writes like this.
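One possible refinement of the per-batch approach, as a minimal sketch: wrapping each writer in try/finally so it is closed even when a write raises. `write_in_batches` is a hypothetical helper name, not part of the Dataiku API. It relies only on the calls already used above (`iter_dataframes`, `get_writer`, `write_dataframe`, `close`, `spec_item`), and it assumes that setting `appendMode` this way behaves as observed in Solution 2.

```python
def write_in_batches(in_ds, out_ds, transform, chunksize):
    """Write each transformed chunk with its own short-lived writer.

    Returns the number of chunks written.
    """
    batches = 0
    for df in in_ds.iter_dataframes(chunksize=chunksize):
        # Run the long-running step before a writer is opened,
        # so no writer sits idle while the computation runs.
        result = transform(df)
        writer = out_ds.get_writer()
        try:
            writer.write_dataframe(result)
        finally:
            # Always close, so a failed write does not leave a writer open.
            writer.close()
        # After the first batch, later writers should append, not overwrite.
        out_ds.spec_item["appendMode"] = True
        batches += 1
    return batches
```

In the recipe this would be called as `write_in_batches(in_ds, out_ds, long_running_func, cs)`.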
Any suggestions? Thanks!
Answers
-
Marlan
Hi,
You could write the dataframe to a text file from Python and then use a separate Sync recipe to load it into Snowflake. If you have fastload for Snowflake configured, the load of the text file into Snowflake should be pretty quick. I've done this on some of my projects. As I recall, the two steps were faster (and probably more reliable) than writing directly to Snowflake from Python.
Marlan
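The file-staging step suggested above could look roughly like the following sketch. It appends each processed chunk to one CSV file and writes the header only once. `append_chunk_to_csv` is a hypothetical helper name; where the file should live so that a Sync recipe can read it (for example a managed folder or a filesystem dataset) is an assumption that depends on your DSS setup and is not shown here.

```python
import os


def append_chunk_to_csv(chunk_df, path):
    """Append one dataframe chunk to a CSV file, writing the header only once."""
    # Emit the header only when the file does not exist yet or is still empty.
    write_header = not os.path.exists(path) or os.path.getsize(path) == 0
    # pandas DataFrame.to_csv: mode="a" appends instead of overwriting.
    chunk_df.to_csv(path, mode="a", header=write_header, index=False)
```

Inside the processing loop this would be called once per chunk, for example `append_chunk_to_csv(long_running_func(df), path)`, so no database connection is held open while the function runs.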
-
Turribeach
The first thing I would look at is to parallelise the execution of your long_running_func(). This can be done either using parallel engines like Spark (using PySpark) or plain old Python multiprocessing. I would also separate the function execution from writing the results to Snowflake. This is untested code but should work:
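import pandas as pd

# Run the function on every chunk first, keeping the results in memory
chunks = []
for df in in_ds.iter_dataframes(chunksize=cs):
    chunks.append(long_running_func(df))
huge_df = pd.concat(chunks, ignore_index=True, sort=False)

# Then write the consolidated dataframe in one go
with out_ds.get_writer() as writer:
    writer.write_dataframe(huge_df)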
I basically run your function, concat each chunk df into a consolidated dataframe and write that in one go. This may use a lot of memory depending on how big your dataset is.
You could also move away from this Python function and try to do your transformation in a visual recipe or SQL recipe. That would be "pushed down" to Snowflake and should therefore be much faster. It also avoids loading the data into DSS memory and reading/writing it via JDBC, which is much slower than what Snowflake can do directly with data inside the database.
-
LouisDHulst