Save pandas dataframe to .csv in managed S3 folder

osk
Level 1

Hi Dataiku-Team, 



I have a quick question related to managed S3 folders. I have a dataframe which I want to save as a .csv file in a managed S3 folder. 



Reading the documentation, it sounds to me that I have to store the .csv file in a local folder on the DSS server and then upload it like this:




handle = dataiku.Folder("FolderName")
handle.upload_file(file_path="local_path_to_file", path=path_upload_file)


This works; however, I feel that there must be a better way of doing it.



So my question is, is there a way to write a dataframe directly to a managed S3 folder?



Thanks a lot for your help!



Best, 



Oliver



 

9 Replies
Nicolas_Servel
Dataiker

Hello Oliver,



The Folder API also lets you retrieve a writer directly, which enables you to write incrementally to a specific path in the managed folder.



This writer can then be passed directly to pandas to save the dataframe.



It will then save the dataframe directly to S3 if your managed folder is S3-based.



In your case, the code would look like:



 




import dataiku

handle = dataiku.Folder("FolderName")
path_upload_file = "path/in/folder/s3"
with handle.get_writer(path_upload_file) as writer:
    your_df.to_csv(writer, ...)
    # where ... is replaced by the other params you want for "to_csv"


 



Regards,



Nicolas Servel

osk
Level 1
Author
Hi Nicolas,

Thanks a lot for your help!

Best,
Oliver
Vinothkumar
Level 2

Hi @Nicolas_Servel ,

I tried to replicate this in my project, but I get an error saying that a bytes-like object is expected instead of str.

 

Code:

handle = dataiku.Folder("Foldername")
paths = handle.list_paths_in_partition()
path = "/additional"
with handle.get_writer(path) as writer:
    Final_df1.to_csv(writer)

 

Error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-142-7b82b94d5811> in <module>
      1 path="/additionalsites"
      2 with handle.get_writer(path) as writer:
----> 3     Final_df1.to_csv(writer)
     


/app/dataiku/DSS_DATA_DIR/code-envs/python/Python36_Default/lib/python3.6/site-packages/pandas/core/frame.py in to_csv(self, path_or_buf, sep, na_rep, float_format, columns, header, index, index_label, mode, encoding, compression, quoting, quotechar, line_terminator, chunksize, tupleize_cols, date_format, doublequote, escapechar, decimal)
   1743                                  doublequote=doublequote,
   1744                                  escapechar=escapechar, decimal=decimal)
-> 1745         formatter.save()
   1746 
   1747         if path_or_buf is None:

/app/dataiku/DSS_DATA_DIR/code-envs/python/Python36_Default/lib/python3.6/site-packages/pandas/io/formats/csvs.py in save(self)
    169                 self.writer = UnicodeWriter(f, **writer_kwargs)
    170 
--> 171             self._save()
    172 
    173         finally:

/app/dataiku/DSS_DATA_DIR/code-envs/python/Python36_Default/lib/python3.6/site-packages/pandas/io/formats/csvs.py in _save(self)
    270     def _save(self):
    271 
--> 272         self._save_header()
    273 
    274         nrows = len(self.data_index)

/app/dataiku/DSS_DATA_DIR/code-envs/python/Python36_Default/lib/python3.6/site-packages/pandas/io/formats/csvs.py in _save_header(self)
    238         if not has_mi_columns or has_aliases:
    239             encoded_labels += list(write_cols)
--> 240             writer.writerow(encoded_labels)
    241         else:
    242             # write out the mi

/app/dataiku/dataiku-dss-8.0.2/python/dataiku/core/managed_folder.py in write(self, b)
     44 
     45     def write(self, b):
---> 46         self.piping_thread.write(b)
     47 
     48     def close(self):

/app/dataiku/dataiku-dss-8.0.2/python/dataiku/core/dkuio.py in write(self, data)
    200         # logging.info("Pipe to generator thread writes: %s" % data)
    201         self._check_health()
--> 202         self.buffer.write(data)
    203         if self.buffer.tell() > self.chunk_size:
    204             self.flush()

TypeError: a bytes-like object is required, not 'str'

Is there a specific method in Dataiku to convert the DataFrame to bytes? I tried using the pickle library to dump it and export it to the managed folder (S3), but the resulting file was not in a proper format.

Thanks,

Vinothkumar M

Nicolas_Servel
Dataiker

Hello Vinothkumar,

The code previously provided only works for Python 2, and from the error you are experiencing, it seems that you are working with Python 3.

Python 2 and 3 take very different approaches to handling strings and bytes.

For Python 3, modifying your code to:

handle = dataiku.Folder("Foldername")
paths = handle.list_paths_in_partition()
path="/additional"
with handle.get_writer(path) as writer:
  writer.write(Final_df1.to_csv().encode("utf-8"))

 

should do the trick.

 

Hope this helps,

Best regards
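
To illustrate the Python 3 fix above: the traceback shows the folder writer handing data to a buffer that accepts only bytes, so the CSV text has to be encoded before it is written. The snippet below is a minimal sketch of that idea, not part of the Dataiku API. `write_csv_as_bytes` is a hypothetical helper name, and it assumes only that the DataFrame's `to_csv()` returns a string when no path is given and that the writer's `write()` expects bytes.

```python
def write_csv_as_bytes(df, writer, encoding="utf-8", **to_csv_kwargs):
    """Write df as CSV to a binary writer (hypothetical helper).

    df     : any object with a pandas-style to_csv() that returns a str
             when called without a path
    writer : any object whose write() expects bytes, such as the writer
             obtained from Folder.get_writer() under Python 3
    Returns the number of bytes handed to the writer.
    """
    csv_text = df.to_csv(**to_csv_kwargs)  # no path given, so a str comes back
    payload = csv_text.encode(encoding)    # str -> bytes, which the writer requires
    writer.write(payload)
    return len(payload)


# Inside DSS, reusing the folder name and path from the post above:
#     with dataiku.Folder("Foldername").get_writer("/additional") as writer:
#         write_csv_as_bytes(Final_df1, writer, index=False)
```

Outside DSS, an `io.BytesIO()` object can stand in for the folder writer if you want to inspect the bytes that would be uploaded.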

Vinothkumar
Level 2

@Nicolas_Servel Awesome 🙂 You saved my day!!

pratikgujral-sf
Level 2

Hi @Nicolas_Servel

Can I use this method to directly write my Pandas DataFrame to Dataiku managed folder in append mode?

My Python recipe processes data in chunks, and I need to save the resulting DataFrame into a Dataiku-managed folder in CSV format. Processing each chunk produces a DataFrame; however, I need only one CSV file in the output folder (the outputs of all chunks appended together).

 

In a regular Python script I could do:

 

import os

for chunk in get_dataframe_chunk():
    ...  # Process the chunk. Result is in df
    df.to_csv('filename.csv', mode='a', header=not os.path.isfile('filename.csv'))

 

To accomplish the same in a Python recipe on Dataiku, can I pass mode='a' like this?

 

for chunk in get_dataframe_chunk():
    ...  # Process chunk and store resulting DataFrame in df
    handle = dataiku.Folder("Foldername")
    paths = handle.list_paths_in_partition()
    path = "/additional"
    with handle.get_writer(path) as writer:
        writer.write(df.to_csv(mode='a').encode("utf-8"))
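
A possible alternative to the loop above, offered as a sketch rather than a confirmed answer: `mode='a'` only matters when pandas opens a file path itself, so with no path it most likely has no effect, and opening a new writer for the same path on every iteration would presumably replace the file each time (an assumption about the folder API that this thread does not confirm). Since the writer is described earlier in the thread as allowing incremental writes, one option is to open it once and write each chunk through it. `write_chunks_as_one_csv` is a hypothetical helper name.

```python
def write_chunks_as_one_csv(chunks, writer, encoding="utf-8", **to_csv_kwargs):
    """Write an iterable of DataFrames to one CSV through a single writer.

    The header is emitted for the first chunk only, which mirrors
    header=not os.path.isfile(...) in the plain-file version.
    Do not pass header= in to_csv_kwargs; it is set here.
    Returns the number of chunks written.
    """
    count = 0
    for count, df in enumerate(chunks, start=1):
        csv_text = df.to_csv(header=(count == 1), **to_csv_kwargs)
        writer.write(csv_text.encode(encoding))  # the writer expects bytes
    return count


# Inside DSS, assuming get_dataframe_chunk() from the post above and a
# hypothetical process() function that returns the resulting DataFrame:
#     handle = dataiku.Folder("Foldername")
#     with handle.get_writer("/additional") as writer:  # opened once, outside the loop
#         write_chunks_as_one_csv(
#             (process(chunk) for chunk in get_dataframe_chunk()),
#             writer,
#             index=False,
#         )
```

The design choice here is to keep a single upload stream open for the whole loop, so nothing has to be read back from the folder and re-uploaded in order to append.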

 

 

sagar_dubey
Level 1

@Nicolas_Servel We have a very similar use case, but instead of pandas DataFrames we have Spark datasets, since the volume of data is large. Could you please help us achieve this in a PySpark recipe?

shreyass
Level 1

@sagar_dubey This will work if your Python env is 2.x:

with dataiku.Folder("your dataiku folder id in s3").get_writer("filename_for_CSV") as w:
    w.write(your_dataframe.toPandas().to_csv(sep="\t",header=False,index=False))

 

For 3.x, use this:

with dataiku.Folder("your dataiku folder id in s3").get_writer("filename_for_CSV") as w:
    w.write(your_dataframe.toPandas().to_csv(sep="\t",header=False,index=False).encode())
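
One caveat with the snippets above: `toPandas()` collects the whole Spark DataFrame into driver memory, which may not be practical for large volumes. The following is a rough sketch of a streaming alternative that uses only the standard library `csv` module and writes encoded batches to the folder writer. `stream_rows_to_csv` and `batch_size` are hypothetical names, and unlike the snippets above the sketch writes a comma-separated file with a header line. It assumes only that the writer's `write()` accepts bytes.

```python
import csv
import io


def _flush(buffer, writer, encoding):
    """Send the buffered text to the writer as bytes, then empty the buffer."""
    text = buffer.getvalue()
    if text:
        writer.write(text.encode(encoding))
    buffer.seek(0)
    buffer.truncate(0)


def stream_rows_to_csv(rows, columns, writer, batch_size=10000, encoding="utf-8"):
    """Write an iterable of row tuples as CSV bytes, batch by batch.

    rows    : iterable of tuples or lists (PySpark Row objects are tuples)
    columns : list of column names, written as the header line
    writer  : any object whose write() expects bytes
    Returns the number of data rows written.
    """
    buffer = io.StringIO()
    csv_writer = csv.writer(buffer, lineterminator="\n")
    csv_writer.writerow(columns)
    total = 0
    for total, row in enumerate(rows, start=1):
        csv_writer.writerow(row)
        if total % batch_size == 0:
            _flush(buffer, writer, encoding)  # keep memory use bounded
    _flush(buffer, writer, encoding)          # header-only case and leftovers
    return total


# In a PySpark recipe, assuming your_dataframe is a Spark DataFrame:
#     with dataiku.Folder("your dataiku folder id in s3").get_writer("filename_for_CSV") as w:
#         stream_rows_to_csv(your_dataframe.toLocalIterator(), your_dataframe.columns, w)
```

`toLocalIterator()` still pulls every row through the driver, one partition at a time, so this trades speed for a smaller memory footprint and is not a distributed write.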

Carl
Level 3

Hi, I've tried the code with Python 3 and it's not working.

Error : None: b'"AMEX/train_amex_all_prep_for_score.json" is not a valid file/directory name (forbidden characters or too long) 

 Can you please provide accurate info? 
