Writing into Managed Folder

sagar_dubey
Level 1

Hi,

I have a Spark dataframe which I'm converting into a Pandas dataframe and then writing into a Managed Folder. This worked successfully before, but now we are getting the error below.

 

Error: Exception: None: b'Early EOF'

Any help would be appreciated.

 

ERROR:root:Pipe to generator thread failed
Traceback (most recent call last):
  File "/mnt/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dkuio.py", line 244, in run
    self.consumer(self._generate())
  File "/mnt/dataiku/dataiku-dss-9.0.4/python/dataiku/core/managed_folder.py", line 42, in upload_call
    }, data=g)
  File "/mnt/dataiku/dataiku-dss-9.0.4/python/dataiku/core/intercom.py", line 411, in jek_or_backend_void_call
    return backend_void_call(path, data, err_msg, **kwargs)
  File "/mnt/dataiku/dataiku-dss-9.0.4/python/dataiku/core/intercom.py", line 402, in backend_void_call
    return _handle_void_resp(backend_api_post_call(path, data, **kwargs), err_msg = err_msg)
  File "/mnt/dataiku/dataiku-dss-9.0.4/python/dataiku/core/intercom.py", line 460, in _handle_void_resp
    raise Exception("%s: %s" % (err_msg, _get_error_message(err_data).encode("utf8")))
Exception: None: b'Early EOF'

 

5 Replies
AlexT
Dataiker

Hi,

"Early EOF" can be due to a network issue. 

Are you able to reproduce the issue every time? If so, can you please share a snippet of your code?

Thanks,

sagar_dubey
Level 1
Author

Hi @AlexT 

I'm facing this issue while converting ~7 million records from a Spark dataframe to a Pandas dataframe and then uploading the result into a Managed Folder. Other datasets going through the same flow are working as expected.
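
Roughly, the flow looks like the sketch below (simplified; the dataframe, folder, and file names are placeholders rather than the actual ones):

# Simplified sketch of the flow (names are placeholders)
import dataiku

pandas_df = spark_df.toPandas()                      # spark_df holds the ~7 million records
folder = dataiku.Folder("my_managed_folder")
with folder.get_writer("output.csv") as w:
    # the full CSV is built in memory and written in a single call
    w.write(pandas_df.to_csv(index=False).encode("utf-8"))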

sagar_dubey
Level 1
Author

Hi @AlexT,

 

Is there any API we can use to push the Spark dataframe directly into the Managed Folder, rather than converting the Spark dataframe first and then pushing it to the managed folder?
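
In the meantime, the kind of workaround we are considering is to stream the rows instead of doing one big toPandas() call, along the lines of the sketch below (just an idea, not an official API; the folder handle and file name are placeholders):

# Sketch of a streaming workaround (names are placeholders)
import csv, io

with folder.get_writer("output.csv") as w:
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(spark_df.columns)           # header row
    for row in spark_df.toLocalIterator():      # pulls rows to the driver partition by partition
        writer.writerow(row)
        if buf.tell() > 1024 * 1024:            # flush roughly every 1 MB
            w.write(buf.getvalue().encode("utf-8"))
            buf.seek(0)
            buf.truncate(0)
    w.write(buf.getvalue().encode("utf-8"))     # flush the remainder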

tomas
Level 5

Hi guys, I bumped into the same issue. A simple dataframe read, followed by converting it to CSV and uploading to a managed folder, failed with:

 

ERROR:root:Pipe to generator thread failed
Traceback (most recent call last):
  File "/home/dssuser/dataiku-dss-9.0.3/python/dataiku/core/dkuio.py", line 244, in run
    self.consumer(self._generate())
  File "/home/dssuser/dataiku-dss-9.0.3/python/dataiku/core/managed_folder.py", line 42, in upload_call
    }, data=g)
  File "/home/dssuser/dataiku-dss-9.0.3/python/dataiku/core/intercom.py", line 411, in jek_or_backend_void_call
    return backend_void_call(path, data, err_msg, **kwargs)
  File "/home/dssuser/dataiku-dss-9.0.3/python/dataiku/core/intercom.py", line 402, in backend_void_call
    return _handle_void_resp(backend_api_post_call(path, data, **kwargs), err_msg = err_msg)
  File "/home/dssuser/dataiku-dss-9.0.3/python/dataiku/core/intercom.py", line 460, in _handle_void_resp
    raise Exception("%s: %s" % (err_msg, _get_error_message(err_data).encode("utf8")))
Exception: None: b'Early EOF'

 

After isolating the problem, I figured out that the issue is with the upload to the managed folder. The number of rows is ~10 million and the size of the CSV is 3.8 GB.

The issue is reproducible and fails every time.

 

# Read the whole 3.8 GB file into memory and write it in a single call
with open('/tmp/tomas_full.csv', 'rb') as ff:
    with folder.get_writer(filepath) as w:
        w.write(ff.read())

 

 

tomas
Level 5

Looks like a bug in DSS 9.0.3 related to the amount of data. A possible solution or workaround is to write the data in smaller chunks.

 

# Solution 1 - store the CSV locally and read it chunk-by-chunk when writing to the folder
def read_in_chunks(file_object, chunk_size=1024):
    # Yield successive chunk_size-byte pieces from an open file object
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data

with open('/tmp/tomas_full.csv', 'rb') as ff:
    with folder.get_writer(filepath) as w:
        for piece in read_in_chunks(ff, 10240):
            w.write(piece)

# Solution 2 - store the CSV in a byte array and read it chunk-by-chunk when writing to the folder
import math
byte_array = csv_data.encode('UTF-8')
chunk_size = 10240
pieces = math.ceil(len(byte_array) / chunk_size)
with folder.get_writer(filepath) as w:
    for i in range(0, pieces):
        w.write(byte_array[i*chunk_size : (i+1)*chunk_size])
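
If the data is still in a pandas dataframe, a variant of Solution 2 is to chunk the dataframe itself and encode each chunk separately, so the full CSV string never has to be built in memory (a sketch, assuming the dataframe is called pdf and folder/filepath are the same as above):

# Solution 3 (sketch) - write a pandas dataframe in row chunks, so the full CSV string is never built in memory
rows_per_chunk = 100000
with folder.get_writer(filepath) as w:
    for start in range(0, len(pdf), rows_per_chunk):
        chunk = pdf.iloc[start:start + rows_per_chunk]
        # write the header only with the first chunk
        w.write(chunk.to_csv(index=False, header=(start == 0)).encode('utf-8'))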