Writing into Managed Folder

sagar_dubey
sagar_dubey Partner, Registered Posts: 17 Partner

Hi,

I have a spark dataframe which I'm converting into Pandas dataframe and then writing into Managed Folder. We were able to do successful previously but now we are getting below mentioned error.

Error : Exception: None: b'Early EOF'

Any help would be appreciated.

ERROR:root:Pipe to generator thread failed
Traceback (most recent call last):
File "/mnt/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dkuio.py", line 244, in run
self.consumer(self._generate())
File "/mnt/dataiku/dataiku-dss-9.0.4/python/dataiku/core/managed_folder.py", line 42, in upload_call
}, data=g)
File "/mnt/dataiku/dataiku-dss-9.0.4/python/dataiku/core/intercom.py", line 411, in jek_or_backend_void_call
return backend_void_call(path, data, err_msg, **kwargs)
File "/mnt/dataiku/dataiku-dss-9.0.4/python/dataiku/core/intercom.py", line 402, in backend_void_call
return _handle_void_resp(backend_api_post_call(path, data, **kwargs), err_msg = err_msg)
File "/mnt/dataiku/dataiku-dss-9.0.4/python/dataiku/core/intercom.py", line 460, in _handle_void_resp
raise Exception("%s: %s" % (err_msg, _get_error_message(err_data).encode("utf8")))
Exception: None: b'Early EOF'

Answers

  • Alexandru
    Alexandru Dataiker, Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer, Registered Posts: 1,226 Dataiker

    Hi,

    "Early EOF" can be due to a network issue.

    Are you able to reproduce the issue every time? If so can you please share a snippet of your code?

    Thanks,

  • sagar_dubey
    sagar_dubey Partner, Registered Posts: 17 Partner

    Hi @AlexT

    I'm facing this issue while converting ~7 million records from Spark to Pandas dataframe and then uploading the same into Managed Folder. We have different datasets as well which are working as expected.

  • sagar_dubey
    sagar_dubey Partner, Registered Posts: 17 Partner

    Hi @AlexT
    ,

    Is there any API so that we can directly push the spark data frame into Managed Folder rather than converting Spark data frame and then pushing it to managed folder.

  • Tomas
    Tomas Registered, Neuron 2022 Posts: 121 ✭✭✭✭✭
    edited July 17

    Hi guys, I bumped into the same issue. A simple data frame read and then convert to csv and upload to managed folder failed with:

    ERROR:root:Pipe to generator thread failed
    Traceback (most recent call last):
      File "/home/dssuser/dataiku-dss-9.0.3/python/dataiku/core/dkuio.py", line 244, in run
        self.consumer(self._generate())
      File "/home/dssuser/dataiku-dss-9.0.3/python/dataiku/core/managed_folder.py", line 42, in upload_call
        }, data=g)
      File "/home/dssuser/dataiku-dss-9.0.3/python/dataiku/core/intercom.py", line 411, in jek_or_backend_void_call
        return backend_void_call(path, data, err_msg, **kwargs)
      File "/home/dssuser/dataiku-dss-9.0.3/python/dataiku/core/intercom.py", line 402, in backend_void_call
        return _handle_void_resp(backend_api_post_call(path, data, **kwargs), err_msg = err_msg)
      File "/home/dssuser/dataiku-dss-9.0.3/python/dataiku/core/intercom.py", line 460, in _handle_void_resp
        raise Exception("%s: %s" % (err_msg, _get_error_message(err_data).encode("utf8")))
    Exception: None: b'Early EOF'

    After isolating the problem I figured out that the issue is with the upload to the managed folder. The amount of rows is ~10 million and size of CSV is 3.8GB.

    The issue is reproducible and it fails always.

    with open('/tmp/tomas_full.csv', 'rb') as ff:
        with folder.get_writer(filepath) as w:
            w.write(ff.read())

  • Tomas
    Tomas Registered, Neuron 2022 Posts: 121 ✭✭✭✭✭
    edited July 17

    Looks like a bug in DSS9.0.3 related to the amount of the data. The possible solution or workaround is to write the data in smaller chunks.

    # Solution 1 - store the CSV locally and read it chunk-by-chunk  when writing to the folder
    def read_in_chunks(file_object, chunk_size=1024):
        while True:
            data = file_object.read(chunk_size)
            if not data:
                break
            yield data
            
    with open('/tmp/tomas_full.csv', 'rb') as ff:
        with folder.get_writer(filepath) as w:
            for piece in read_in_chunks(ff, 10240):
                w.write(piece)
    
    # Solution 2 - store the CSV in a byte array and read it chunk-by-chunk when writing to the folder
    import math
    byte_array = csv_data.encode('UTF-8')
    chunk_size = 10240
    pieces = math.ceil(len(byte_array) / chunk_size)
    with folder.get_writer(filepath) as w:
        for i in range(0, pieces):
            w.write(byte_array[i*chunk_size : (i+1)*chunk_size])
Setup Info
    Tags
      Help me…