Save pandas dataframe to .csv in managed S3 folder
Hi Dataiku-Team,
I have a quick question related to managed S3 folders. I have a dataframe which I want to save as a .csv file in a managed S3 folder.
Reading the documentation, it sounds as though I have to store the .csv file in a local folder on the DSS server and then upload it like this:
handle = dataiku.Folder("FolderName")
handle.upload_file(file_path="local_path_to_file", path=path_upload_file)
It works like this, however, I feel that there must be a better way of doing it.
So my question is, is there a way to write a dataframe directly to a managed S3 folder?
Thanks a lot for your help!
Best,
Oliver
Answers
-
Hello Oliver,
The Folder API also lets you retrieve a writer directly, which allows you to write incrementally to a specific path in the managed folder.
This writer can then be passed straight to pandas to save the dataframe.
If your managed folder is S3-based, the dataframe is then saved directly to S3.
In your case, the code would look like:
handle = dataiku.Folder("FolderName")
path_upload_file = "path/in/folder/s3"
with handle.get_writer(path_upload_file) as writer:
    your_df.to_csv(writer, ...)
# where ... is replaced by the other params you want for "to_csv"
Regards,
Nicolas Servel
-
Hi Nicolas,
Thanks a lot for your help!
Best,
Oliver
-
Hi @Nicolas_Servel,
I tried to replicate this in my project, but I get an error saying that a bytes-like object is expected instead of str.
Code:
handle = dataiku.Folder("Foldername")
paths = handle.list_paths_in_partition()
path="/additional"
with handle.get_writer(path) as writer:
    Final_df1.to_csv(writer)
Error:
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-142-7b82b94d5811> in <module>
      1 path="/additionalsites"
      2 with handle.get_writer(path) as writer:
----> 3     Final_df1.to_csv(writer)

/app/dataiku/DSS_DATA_DIR/code-envs/python/Python36_Default/lib/python3.6/site-packages/pandas/core/frame.py in to_csv(self, path_or_buf, sep, na_rep, float_format, columns, header, index, index_label, mode, encoding, compression, quoting, quotechar, line_terminator, chunksize, tupleize_cols, date_format, doublequote, escapechar, decimal)
   1743             doublequote=doublequote,
   1744             escapechar=escapechar, decimal=decimal)
-> 1745         formatter.save()
   1746
   1747         if path_or_buf is None:

/app/dataiku/DSS_DATA_DIR/code-envs/python/Python36_Default/lib/python3.6/site-packages/pandas/io/formats/csvs.py in save(self)
    169                 self.writer = UnicodeWriter(f, **writer_kwargs)
    170
--> 171             self._save()
    172
    173         finally:

/app/dataiku/DSS_DATA_DIR/code-envs/python/Python36_Default/lib/python3.6/site-packages/pandas/io/formats/csvs.py in _save(self)
    270     def _save(self):
    271
--> 272         self._save_header()
    273
    274         nrows = len(self.data_index)

/app/dataiku/DSS_DATA_DIR/code-envs/python/Python36_Default/lib/python3.6/site-packages/pandas/io/formats/csvs.py in _save_header(self)
    238         if not has_mi_columns or has_aliases:
    239             encoded_labels += list(write_cols)
--> 240             writer.writerow(encoded_labels)
    241         else:
    242             # write out the mi

/app/dataiku/dataiku-dss-8.0.2/python/dataiku/core/managed_folder.py in write(self, b)
     44
     45     def write(self, b):
---> 46         self.piping_thread.write(b)
     47
     48     def close(self):

/app/dataiku/dataiku-dss-8.0.2/python/dataiku/core/dkuio.py in write(self, data)
    200         # logging.info("Pipe to generator thread writes: %s" % data)
    201         self._check_health()
--> 202         self.buffer.write(data)
    203         if self.buffer.tell() > self.chunk_size:
    204             self.flush()

TypeError: a bytes-like object is required, not 'str'
Is there a specific method in Dataiku to convert the DataFrame to bytes? I tried using the pickle library to dump it and export it to the managed folder (S3), but the resulting format is not right.
Thanks,
Vinothkumar M
-
Hello Vinothkumar,
The code previously provided only works with Python 2, and from the error you are getting, it seems that you are working with Python 3.
Python 2 and 3 handle strings and bytes very differently.
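To illustrate the difference, here is a minimal sketch that does not touch DSS at all. It assumes the folder writer behaves like a binary stream, which is what the traceback above suggests (`self.buffer.write(data)` rejects a `str`); `io.BytesIO` is used purely as a stand-in for that writer, and the CSV text is made up.

```python
import io

# Stand-in for the managed-folder writer: a binary buffer that only accepts bytes.
buffer = io.BytesIO()

# In Python 3, CSV text (such as what to_csv() returns) is a str, not bytes.
csv_text = "site,value\nA,1\n"

try:
    buffer.write(csv_text)  # writing a str into a binary stream
    str_accepted = True
except TypeError:
    # Same kind of failure as in the traceback: bytes-like object required.
    str_accepted = False

# Encoding turns the str into bytes, which the binary stream accepts.
buffer.write(csv_text.encode("utf-8"))
written = buffer.getvalue()
```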
For Python 3, modifying your code to:
handle = dataiku.Folder("Foldername")
paths = handle.list_paths_in_partition()
path="/additional"
with handle.get_writer(path) as writer:
    writer.write(Final_df1.to_csv().encode("utf-8"))
should do the trick.
Hope this helps,
Best regards
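
If this pattern is needed in several recipes, it could be wrapped in a small helper. The sketch below is only an illustration: `write_df_as_csv` and `InMemoryFolder` are hypothetical names, not part of the Dataiku API, and `InMemoryFolder` exists only so the helper can be tried outside DSS. The only thing assumed about the real folder object is what this thread already shows, namely that `get_writer(path)` works as a context manager whose `write` accepts bytes.

```python
import contextlib
import io

import pandas as pd


def write_df_as_csv(folder, path, df, encoding="utf-8", **to_csv_kwargs):
    """Serialize df to CSV text, encode it, and write the bytes to path in folder."""
    payload = df.to_csv(**to_csv_kwargs).encode(encoding)
    with folder.get_writer(path) as writer:
        writer.write(payload)
    return len(payload)


class InMemoryFolder:
    """Hypothetical stand-in for dataiku.Folder, used only to run this sketch."""

    def __init__(self):
        self.files = {}

    @contextlib.contextmanager
    def get_writer(self, path):
        buf = io.BytesIO()
        yield buf
        # Keep the written bytes once the "with" block finishes.
        self.files[path] = buf.getvalue()


folder = InMemoryFolder()
df = pd.DataFrame({"site": ["A", "B"], "value": [1, 2]})
n_bytes = write_df_as_csv(folder, "/additional", df, index=False)
roundtrip = pd.read_csv(io.BytesIO(folder.files["/additional"]))
```

In a recipe, the call would presumably look like `write_df_as_csv(dataiku.Folder("Foldername"), "/additional", Final_df1)`, with any extra `to_csv` parameters passed as keyword arguments.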
-
@Nicolas_Servel
Awesome! You saved my day!!
-
@Nicolas_Servel
We have a very similar use case, but instead of a pandas dataframe we have Spark datasets, since the volume of data is large. Could you please help us achieve this in a PySpark recipe?
-
@sagar_dubey
This will work if your Python env is 2.x:
with dataiku.Folder("your dataiku folder id in s3").get_writer("filename_for_CSV") as w:
    w.write(your_dataframe.toPandas().to_csv(sep="\t",header=False,index=False))
For 3.x use this:
with dataiku.Folder("your dataiku folder id in s3").get_writer("filename_for_CSV") as w:
    w.write(your_dataframe.toPandas().to_csv(sep="\t",header=False,index=False).encode())
-
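A note on the PySpark snippets above: `toPandas()` collects the whole Spark DataFrame on the driver, so they only suit data that fits in driver memory. One possible alternative, sketched here under assumptions, is to stream rows through the writer in batches. The helper name `write_rows_as_csv` and the batch size are made up for illustration. In a PySpark recipe the rows could come from `your_dataframe.toLocalIterator()` and the optional header from `your_dataframe.columns`, with `out` being the object returned by `get_writer`. An `io.BytesIO` stands in for that writer so the sketch runs without Spark or DSS. Values are formatted by Python's `csv` module, so the output may differ in detail from what `to_csv` produces.

```python
import csv
import io
from itertools import islice


def write_rows_as_csv(out, rows, header=None, batch_size=10000, delimiter="\t"):
    """Write rows (any iterable of tuples) to a binary writer, one encoded batch at a time."""
    rows = iter(rows)
    # The header, if any, is written together with the first batch.
    pending = [tuple(header)] if header is not None else []
    n_rows = 0
    while True:
        chunk = list(islice(rows, batch_size))
        if not chunk and not pending:
            break
        text = io.StringIO()
        csv.writer(text, delimiter=delimiter, lineterminator="\n").writerows(pending + chunk)
        out.write(text.getvalue().encode("utf-8"))
        n_rows += len(chunk)
        pending = []
    return n_rows


# Stand-in writer and rows; with Spark, rows would be your_dataframe.toLocalIterator().
stand_in = io.BytesIO()
count = write_rows_as_csv(
    stand_in,
    [("A", 1), ("B", 2), ("C", 3)],
    header=["site", "value"],
    batch_size=2,
)
```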
Carl
Hi, I've tried the code with Python 3 and it's not working.
Error : None: b'"AMEX/train_amex_all_prep_for_score.json" is not a valid file/directory name (forbidden characters or too long)
Can you please provide accurate info?
-
Hi @Nicolas_Servel,
Can I use this method to write my pandas DataFrame directly to a Dataiku managed folder in append mode?
My Python recipe processes data in chunks, and I need to save the resulting DataFrame into a Dataiku managed folder in CSV format. Processing each chunk produces a resulting DataFrame; however, in the output folder I need only one CSV file (the outputs of all chunks appended together in one CSV).
In a regular Python script I could do:
for chunk in get_dataframe_chunk():
    ...  # Process the chunk. Result is in df
    df.to_csv('filename.csv', mode='a', header=not os.path.isfile('filename.csv'))
To accomplish the same in a Python recipe on Dataiku, can I pass mode='a' like this?
for chunk in get_dataframe_chunk():
    ...  ## Process chunk and store resulting DataFrame in df
    handle = dataiku.Folder("Foldername")
    paths = handle.list_paths_in_partition()
    path="/additional"
    with handle.get_writer(path) as writer:
        writer.write(df.to_csv(mode='a').encode("utf-8"))
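
One possible approach, offered as a sketch rather than a confirmed answer: as far as I can tell, `mode='a'` only matters when `to_csv` is given a file path, so it would not append anything when `to_csv()` just returns a string. Re-opening the writer for every chunk would also most likely replace the file each time, although that behaviour is not confirmed in this thread. Since the writer is described above as supporting incremental writes, it could instead be opened once, outside the loop, with the header emitted only for the first chunk. The helper name `append_chunks_as_csv` and the sample data are hypothetical, and `io.BytesIO` stands in for the folder writer so the sketch runs outside DSS.

```python
import io

import pandas as pd


def append_chunks_as_csv(writer, chunks, encoding="utf-8", **to_csv_kwargs):
    """Write every DataFrame in chunks to one binary writer, with the header only once.

    Do not pass header= in to_csv_kwargs; it is set here per chunk.
    """
    n_chunks = 0
    for df in chunks:
        text = df.to_csv(header=(n_chunks == 0), **to_csv_kwargs)
        writer.write(text.encode(encoding))
        n_chunks += 1
    return n_chunks


# Stand-in chunks and writer. In a recipe this would presumably become:
#   with handle.get_writer(path) as writer:
#       append_chunks_as_csv(writer, processed_chunks(), index=False)
# where processed_chunks() is a hypothetical generator yielding one df per chunk.
chunks = [
    pd.DataFrame({"site": ["A", "B"], "value": [1, 2]}),
    pd.DataFrame({"site": ["C"], "value": [3]}),
]
stand_in_writer = io.BytesIO()
n_written = append_chunks_as_csv(stand_in_writer, chunks, index=False)
combined = pd.read_csv(io.BytesIO(stand_in_writer.getvalue()))
```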