Save pandas dataframe to .csv in managed S3 folder

osk
osk Registered Posts: 9 ✭✭✭✭
edited July 18 in Using Dataiku


Hi Dataiku-Team,

I have a quick question related to managed S3 folders. I have a dataframe which I want to save as a .csv file in a managed S3 folder.

Reading the documentation, it sounds to me like I have to store the .csv file in a local folder on the DSS server and then upload it like this:


import dataiku

handle = dataiku.Folder("FolderName")
# path_upload_file: destination path inside the managed folder
handle.upload_file(file_path="local_path_to_file", path=path_upload_file)

It works like this, however, I feel that there must be a better way of doing it.

So my question is, is there a way to write a dataframe directly to a managed S3 folder?

Thanks a lot for your help!

Best,

Oliver

Answers

  • Nicolas_Servel
    Nicolas_Servel Dataiker Posts: 37 Dataiker
    edited July 18

    Hello Oliver,

The Folder API also allows you to retrieve a writer directly, which enables you to write incrementally to a specific path in the managed folder.

    This writer can then be passed directly to pandas to save the dataframe.

It will then write the dataframe directly to S3 if your managed folder is S3-based.

    In your case, the code would look like:


    import dataiku

    handle = dataiku.Folder("FolderName")
    path_upload_file = "path/in/folder/s3"
    with handle.get_writer(path_upload_file) as writer:
        your_df.to_csv(writer, ...)
        # where ... is replaced by the other params you want for "to_csv"

    Regards,

    Nicolas Servel

  • osk
    osk Registered Posts: 9 ✭✭✭✭
    Hi Nicolas,

    Thanks a lot for your help!

    Best,
    Oliver
  • Vinothkumar
    Vinothkumar Registered Posts: 17 ✭✭✭✭
    edited July 17

    Hi @Nicolas_Servel,

    I tried to replicate the same in my project, but I am getting an error saying that a bytes-like object is expected instead of str.

    Code:

    import dataiku

    handle = dataiku.Folder("Foldername")
    paths = handle.list_paths_in_partition()
    path = "/additional"
    with handle.get_writer(path) as writer:
        Final_df1.to_csv(writer)

    Error:

    ---------------------------------------------------------------------------
    TypeError                                 Traceback (most recent call last)
    <ipython-input-142-7b82b94d5811> in <module>
          1 path="/additionalsites"
          2 with handle.get_writer(path) as writer:
    ----> 3     Final_df1.to_csv(writer)
         
    
    
    /app/dataiku/DSS_DATA_DIR/code-envs/python/Python36_Default/lib/python3.6/site-packages/pandas/core/frame.py in to_csv(self, path_or_buf, sep, na_rep, float_format, columns, header, index, index_label, mode, encoding, compression, quoting, quotechar, line_terminator, chunksize, tupleize_cols, date_format, doublequote, escapechar, decimal)
       1743                                  doublequote=doublequote,
       1744                                  escapechar=escapechar, decimal=decimal)
    -> 1745         formatter.save()
       1746 
       1747         if path_or_buf is None:
    
    /app/dataiku/DSS_DATA_DIR/code-envs/python/Python36_Default/lib/python3.6/site-packages/pandas/io/formats/csvs.py in save(self)
        169                 self.writer = UnicodeWriter(f, **writer_kwargs)
        170 
    --> 171             self._save()
        172 
        173         finally:
    
    /app/dataiku/DSS_DATA_DIR/code-envs/python/Python36_Default/lib/python3.6/site-packages/pandas/io/formats/csvs.py in _save(self)
        270     def _save(self):
        271 
    --> 272         self._save_header()
        273 
        274         nrows = len(self.data_index)
    
    /app/dataiku/DSS_DATA_DIR/code-envs/python/Python36_Default/lib/python3.6/site-packages/pandas/io/formats/csvs.py in _save_header(self)
        238         if not has_mi_columns or has_aliases:
        239             encoded_labels += list(write_cols)
    --> 240             writer.writerow(encoded_labels)
        241         else:
        242             # write out the mi
    
    /app/dataiku/dataiku-dss-8.0.2/python/dataiku/core/managed_folder.py in write(self, b)
         44 
         45     def write(self, b):
    ---> 46         self.piping_thread.write(b)
         47 
         48     def close(self):
    
    /app/dataiku/dataiku-dss-8.0.2/python/dataiku/core/dkuio.py in write(self, data)
        200         # logging.info("Pipe to generator thread writes: %s" % data)
        201         self._check_health()
    --> 202         self.buffer.write(data)
        203         if self.buffer.tell() > self.chunk_size:
        204             self.flush()
    
    TypeError: a bytes-like object is required, not 'str'

    In Dataiku, is there a specific method to convert the DataFrame to bytes? I tried the pickle library to dump the dataframe and export it to the managed folder (S3), but the file that came out was not in the proper format.

    Thanks,

    Vinothkumar M

  • Nicolas_Servel
    Nicolas_Servel Dataiker Posts: 37 Dataiker
    edited July 17

    Hello Vinothkumar,

    The code provided previously only works with Python 2, and from the error you are experiencing, it seems that you are working with Python 3.

    Python 2 and 3 take very different approaches to handling strings and bytes.

    For Python 3, modifying your code to:

    import dataiku

    handle = dataiku.Folder("Foldername")
    paths = handle.list_paths_in_partition()
    path = "/additional"
    with handle.get_writer(path) as writer:
        writer.write(Final_df1.to_csv().encode("utf-8"))

    should do the trick.
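
    Note that calling to_csv() without a path builds the entire CSV as a single string in memory before it is written. For large dataframes, one option (a sketch, untested, assuming the folder writer accepts bytes through its write() method) is to wrap the writer so that pandas can stream text into it:

    import codecs

    import dataiku

    handle = dataiku.Folder("Foldername")
    path = "/additional"
    with handle.get_writer(path) as writer:
        # codecs.getwriter wraps the bytes-oriented folder writer so that
        # pandas can write str chunks, which are utf-8 encoded on the fly
        Final_df1.to_csv(codecs.getwriter("utf-8")(writer))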

    Hope this helps,

    Best regards

  • Vinothkumar
    Vinothkumar Registered Posts: 17 ✭✭✭✭

    @Nicolas_Servel Awesome, you saved my day!!

  • sagar_dubey
    sagar_dubey Partner, Registered Posts: 17 Partner

    @Nicolas_Servel
    We also have a very similar use case, but instead of a Pandas dataframe we have Spark datasets, since the volume of data is large. Could you please help us achieve this via a PySpark recipe?

  • shreyass
    shreyass Dataiku DSS Core Designer, Registered Posts: 4 ✭✭✭

    @sagar_dubey this will work if your Python env is 2.x:

    with dataiku.Folder("your dataiku folder id in s3").get_writer("filename_for_CSV") as w:
        w.write(your_dataframe.toPandas().to_csv(sep="\t", header=False, index=False))

    for 3.x use this:

    with dataiku.Folder("your dataiku folder id in s3").get_writer("filename_for_CSV") as w:
        w.write(your_dataframe.toPandas().to_csv(sep="\t", header=False, index=False).encode())
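
    Note that toPandas() pulls the entire dataset onto the driver, which may defeat the purpose of Spark for large volumes. One alternative (a sketch, untested, reusing the folder id and dataframe name from above) is to stream rows to the writer with toLocalIterator():

    import dataiku

    folder = dataiku.Folder("your dataiku folder id in s3")
    with folder.get_writer("filename_for_CSV") as w:
        # toLocalIterator() brings one partition at a time to the driver,
        # instead of collecting the whole dataframe like toPandas() does
        for row in your_dataframe.toLocalIterator():
            line = "\t".join("" if v is None else str(v) for v in row) + "\n"
            w.write(line.encode("utf-8"))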

  • Carl
    Carl Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer, Registered Posts: 24 ✭✭✭✭
    edited July 17

    Hi, I've tried the code with Python 3 and it's not working.

    Error : None: b'"AMEX/train_amex_all_prep_for_score.json" is not a valid file/directory name (forbidden characters or too long) 

    Can you please provide accurate info?

  • pratikgujral-sf
    pratikgujral-sf Registered Posts: 8
    edited July 17

    Hi @Nicolas_Servel,

    Can I use this method to directly write my Pandas DataFrame to a Dataiku managed folder in append mode?

    My Python recipe processes data in chunks, and I need to save the results into a Dataiku-managed folder in CSV format. Processing each chunk produces a DataFrame; however, the output folder should contain only one CSV file (the outputs of all chunks appended together).

    In a regular Python script I could do:

    import os

    for chunk in get_dataframe_chunk():
        ...  # process the chunk; result is in df
        df.to_csv('filename.csv', mode='a', header=not os.path.isfile('filename.csv'))

    To accomplish the same in a Python recipe on Dataiku, can I pass mode='a' like this?

    for chunk in get_dataframe_chunk():
        ...  # process chunk and store resulting DataFrame in df
        handle = dataiku.Folder("Foldername")
        paths = handle.list_paths_in_partition()
        path = "/additional"
        with handle.get_writer(path) as writer:
            writer.write(df.to_csv(mode='a').encode("utf-8"))
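
    Or, if each get_writer call starts a fresh file at that path, would the right approach be to keep a single writer open across all chunks and emit the header only once? A rough sketch of what I have in mind (untested; get_dataframe_chunk is my own helper):

    import dataiku

    handle = dataiku.Folder("Foldername")
    path = "/additional"
    with handle.get_writer(path) as writer:
        first = True
        for chunk in get_dataframe_chunk():
            ...  # process chunk and store resulting DataFrame in df
            # write the CSV header only for the first chunk
            writer.write(df.to_csv(header=first, index=False).encode("utf-8"))
            first = False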
