Is it possible to write to multiple datasources at a time using Python?

Jason
Jason Registered Posts: 33 ✭✭✭✭✭
edited July 16 in Using Dataiku

I have a data processing task that requires Python. Specifically, I'm reading data from a proprietary file format and writing the extracted data to a database. I want to split this data into multiple datasources: one for training data, one for holdouts, and one for bad data (so I can analyze corrupted data from the file). This is a LOT of data, and the usual Dataiku approach of making copy after copy of the data through a processing pipeline doesn't really work here (the bulk of the data needs to be stored in an array, and Dataiku recipes see the array as a JSON-encoded string). So I think I'm backed into a corner: the read, cleaning, and splitting all need to happen just once, in Python. To that end, I've used vectorized pandas functions wherever I can, plus Python's multiprocessing, to get the processing time down.

When I analyze the run logs, it looks as though the data write is partially blocking my script. If I print a timestamp to standard out, I can see a pause of anywhere from a few seconds to a few minutes before my script resumes, but then I can see (what I suppose is the Java side) continuing to write rows after my script has control back. I'm doing enough writes that these pauses add up to about an hour and a half of waiting.
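
One way to put a number on that blocking time is to accumulate the wall-clock time spent inside each write call instead of reading timestamps off the log. This is only a minimal sketch: `timed`, `elapsed`, and `fake_write` are hypothetical names, and `fake_write` stands in for the real dataset write.

```python
import time
from collections import defaultdict
from contextlib import contextmanager

# Total seconds spent per label, e.g. "write" versus "process".
elapsed = defaultdict(float)

@contextmanager
def timed(label):
    """Add the wall-clock time of the wrapped block to elapsed[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed[label] += time.perf_counter() - start

def fake_write(chunk):
    # Hypothetical stand-in for a blocking dataset write.
    time.sleep(0.01)

for chunk in range(3):
    with timed("write"):
        fake_write(chunk)

# Report how long the script was blocked in each labelled section.
print({label: round(seconds, 3) for label, seconds in elapsed.items()})
```

Wrapping only the write call this way separates time blocked on writes from time spent in the processing itself.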

I tried to solve this by adding a Queue from multiprocessing so that the call to write_dataframe() can block in a separate process while the main script moves on without waiting. This all worked fine, and my main script became so fast that it finished the processing work well before all the writes were completed... but because of the ongoing writes, I was still waiting!
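
A minimal sketch of this producer/consumer pattern is shown here for reference. It is not the code from the recipe: it uses a thread and `queue.Queue` rather than a separate process to stay short and self-contained, and `start_writer` and `write_fn` are hypothetical names, with `write_fn` standing in for the real write call. With `multiprocessing.Process` and `multiprocessing.Queue` the structure would be much the same.

```python
import queue
import threading

def start_writer(write_fn, max_pending=4):
    """Start a background thread that calls write_fn on each queued chunk."""
    pending = queue.Queue(maxsize=max_pending)  # bounded, so memory use stays capped
    errors = []

    def worker():
        while True:
            chunk = pending.get()
            if chunk is None:  # sentinel: no more chunks are coming
                break
            if errors:
                continue  # after a failure, keep draining so the producer never blocks
            try:
                write_fn(chunk)
            except Exception as exc:
                errors.append(exc)

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()

    def close():
        """Wait for all pending writes, then re-raise the first write error."""
        pending.put(None)
        thread.join()
        if errors:
            raise errors[0]

    return pending.put, close

# Usage: the main loop hands chunks over and keeps processing.
written = []
submit, close = start_writer(written.append)
for chunk in (["a", "b"], ["c"], ["d", "e"]):
    submit(chunk)
close()  # the script still has to wait here for the remaining writes
```

The final `close()` is where the waiting described above ends up: the main loop finishes early, but the job is not done until the queue drains.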

So... I got greedy and wanted more. I figured if I set up three processes, with three queues, each one working a specific data source, I could make things go even faster. When I try this configuration, I get a thrown exception that I really can't make sense of:

ERROR:dataiku.core.dataset_write:Exception caught while writing
Traceback (most recent call last):
  File "/data/dataiku/dataiku-dss-12.4.0/python/dataiku/core/dataset_write.py", line 353, in run
    self.streaming_api.wait_write_session(self.session_id)
  File "/data/dataiku/dataiku-dss-12.4.0/python/dataiku/core/dataset_write.py", line 296, in wait_write_session
    raise Exception(u'An error occurred during dataset write (%s): %s' % (id, decoded_resp["message"]))
Exception: An error occurred during dataset write (7TGwc1jgs7): EofException: Early EOF
ERROR:dataiku.core.dataset_write:RemoteStreamWriter thread failed
Traceback (most recent call last):
  File "/data/dataiku/dataiku_data/code-envs/python/Spectrum_Analysis_39/lib/python3.9/site-packages/urllib3/connectionpool.py", line 715, in urlopen
    httplib_response = self._make_request(
  File "/data/dataiku/dataiku_data/code-envs/python/Spectrum_Analysis_39/lib/python3.9/site-packages/urllib3/connectionpool.py", line 467, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/data/dataiku/dataiku_data/code-envs/python/Spectrum_Analysis_39/lib/python3.9/site-packages/urllib3/connectionpool.py", line 462, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/lib64/python3.9/http/client.py", line 1377, in getresponse
    response.begin()
  File "/usr/lib64/python3.9/http/client.py", line 320, in begin
    version, status, reason = self._read_status()
  File "/usr/lib64/python3.9/http/client.py", line 289, in _read_status
    raise RemoteDisconnected("Remote end closed connection without"
http.client.RemoteDisconnected: Remote end closed connection without response

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/data/dataiku/dataiku_data/code-envs/python/Spectrum_Analysis_39/lib/python3.9/site-packages/requests/adapters.py", line 486, in send
    resp = conn.urlopen(
  File "/data/dataiku/dataiku_data/code-envs/python/Spectrum_Analysis_39/lib/python3.9/site-packages/urllib3/connectionpool.py", line 799, in urlopen
    retries = retries.increment(
  File "/data/dataiku/dataiku_data/code-envs/python/Spectrum_Analysis_39/lib/python3.9/site-packages/urllib3/util/retry.py", line 550, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/data/dataiku/dataiku_data/code-envs/python/Spectrum_Analysis_39/lib/python3.9/site-packages/urllib3/packages/six.py", line 769, in reraise
    raise value.with_traceback(tb)
  File "/data/dataiku/dataiku_data/code-envs/python/Spectrum_Analysis_39/lib/python3.9/site-packages/urllib3/connectionpool.py", line 715, in urlopen
    httplib_response = self._make_request(
  File "/data/dataiku/dataiku_data/code-envs/python/Spectrum_Analysis_39/lib/python3.9/site-packages/urllib3/connectionpool.py", line 467, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/data/dataiku/dataiku_data/code-envs/python/Spectrum_Analysis_39/lib/python3.9/site-packages/urllib3/connectionpool.py", line 462, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/lib64/python3.9/http/client.py", line 1377, in getresponse
    response.begin()
  File "/usr/lib64/python3.9/http/client.py", line 320, in begin
    version, status, reason = self._read_status()
  File "/usr/lib64/python3.9/http/client.py", line 289, in _read_status
    raise RemoteDisconnected("Remote end closed connection without"
urllib3.exceptions.ProtocolError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/data/dataiku/dataiku-dss-12.4.0/python/dataiku/core/dataset_write.py", line 220, in run
    self.streaming_api.push_data(self.id,self._generate())
  File "/data/dataiku/dataiku-dss-12.4.0/python/dataiku/core/dataset_write.py", line 304, in push_data
    jek_or_backend_void_call("datasets/push-data/", params={"id": id}, data=generator, err_msg="Streaming: push-data call failed")
  File "/data/dataiku/dataiku-dss-12.4.0/python/dataiku/core/intercom.py", line 511, in jek_or_backend_void_call
    return backend_void_call(path, data, err_msg, **kwargs)
  File "/data/dataiku/dataiku-dss-12.4.0/python/dataiku/core/intercom.py", line 502, in backend_void_call
    return _handle_void_resp(backend_api_post_call(path, data, **kwargs), err_msg = err_msg)
  File "/data/dataiku/dataiku-dss-12.4.0/python/dataiku/core/intercom.py", line 394, in backend_api_post_call
    return _cached_session.post("%s/dip/api/tintercom/%s" % (get_backend_url(), path),
  File "/data/dataiku/dataiku_data/code-envs/python/Spectrum_Analysis_39/lib/python3.9/site-packages/requests/sessions.py", line 637, in post
    return self.request("POST", url, data=data, json=json, **kwargs)
  File "/data/dataiku/dataiku_data/code-envs/python/Spectrum_Analysis_39/lib/python3.9/site-packages/requests/sessions.py", line 589, in request
    resp = self.send(prep, **send_kwargs)
  File "/data/dataiku/dataiku_data/code-envs/python/Spectrum_Analysis_39/lib/python3.9/site-packages/requests/sessions.py", line 703, in send
    r = adapter.send(request, **kwargs)
  File "/data/dataiku/dataiku_data/code-envs/python/Spectrum_Analysis_39/lib/python3.9/site-packages/requests/adapters.py", line 501, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response')) 

If I use this exact code but only attempt to write one datasource, dropping the data for the other two, I get no error messages. So it has something to do with multiple datasources being open and written to at the same time. My hope was that because the writers were created and running in separate processes (technically, with multiprocessing, sub-processes), it would act as if three separate Python recipes were writing to separate datasources.
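
For reference, the layout described above (one dedicated writer per output) can be sketched roughly as follows. This is an illustration under assumptions, not the code from the recipe and not a reproduction of the error: it uses single-worker thread pools instead of sub-processes, and `OUTPUTS`, `route`, `run`, and the `write_fns` callables are hypothetical stand-ins for the three real dataset writes.

```python
from concurrent.futures import ThreadPoolExecutor

OUTPUTS = ("training", "holdout", "bad")  # hypothetical output names

def route(record):
    """Hypothetical split rule: corrupt rows are 'bad', every 5th id is 'holdout'."""
    if record["value"] is None:
        return "bad"
    return "holdout" if record["id"] % 5 == 0 else "training"

def run(chunks, write_fns):
    # One single-worker executor per output: writes to the same output stay
    # in order, while writes to different outputs can overlap.
    executors = {name: ThreadPoolExecutor(max_workers=1) for name in OUTPUTS}
    futures = []
    for chunk in chunks:
        groups = {name: [] for name in OUTPUTS}
        for record in chunk:
            groups[route(record)].append(record)
        for name, rows in groups.items():
            if rows:
                futures.append(executors[name].submit(write_fns[name], rows))
    for executor in executors.values():
        executor.shutdown(wait=True)  # wait for every pending write
    for future in futures:
        future.result()  # re-raise the first write error, if any

# Usage with in-memory lists standing in for the three outputs.
sinks = {name: [] for name in OUTPUTS}
write_fns = {name: sinks[name].extend for name in OUTPUTS}
chunks = [
    [{"id": 1, "value": 0.5}, {"id": 5, "value": 0.7}],
    [{"id": 2, "value": None}, {"id": 3, "value": 0.1}],
]
run(chunks, write_fns)
```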

Am I wanting too much? Is this possible to do?

Thanks,
Jason


Operating system used: Red Hat

Answers

  • Turribeach
    Turribeach Dataiku DSS Core Designer, Neuron, Dataiku DSS Adv Designer, Registered, Neuron 2023 Posts: 2,160 Neuron

    I had a similar requirement to process millions of XML files that were embedded in a proprietary text file format and resulted in different file types. I found that writing to Dataiku datasets added too much overhead, so I moved away from using them until later in the flow, once I could aggregate the data and process it in a set-based way. My design was basically a Python recipe that read the proprietary file format and split it into 3 file types, which went to 3 different managed Dataiku folders. Then there were 3 Python recipes using those folders as input, converting and validating the files into CSV files. This had the side effect of adding parallel processing, as I now had 3 separate flow branches. Finally, another recipe merged all the CSV files and wrote them to a Dataiku dataset. This design was very efficient because it was file based: provided you have fast local storage (i.e. SSDs), you get much more IO performance than with a database, especially when you are doing lots of “small” writes. It also allowed us to use an “event”-driven scenario, as we could put “dataset changed” scenario triggers on the corresponding folders. All we had to do for the flow to run was push some source files to the landing folder, and the flow would process them end to end.
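
The three stages of this file-based design can be sketched as below. This is a rough illustration under assumptions, not the actual recipes: plain local directories stand in for managed Dataiku folders, each function would be its own recipe in a real flow, and `FILE_TYPES`, the function names, and the empty-payload validation rule are all hypothetical.

```python
import csv
import tempfile
from pathlib import Path

FILE_TYPES = ("type_a", "type_b", "type_c")  # hypothetical file types

def split_to_folders(records, root):
    """Stage 1: write each record to a file in the folder for its type."""
    for name in FILE_TYPES:
        (root / name).mkdir(parents=True, exist_ok=True)
    for index, (file_type, payload) in enumerate(records):
        (root / file_type / f"{index}.txt").write_text(payload)

def folder_to_csv(folder, csv_path):
    """Stage 2 (one per branch): validate the files and convert them to one CSV."""
    with csv_path.open("w", newline="") as handle:
        writer = csv.writer(handle)
        for path in sorted(folder.glob("*.txt")):
            text = path.read_text().strip()
            if text:  # hypothetical validation: skip empty payloads
                writer.writerow([folder.name, text])

def merge_csvs(csv_paths):
    """Stage 3: merge the per-type CSVs into one list of rows."""
    rows = []
    for csv_path in csv_paths:
        with csv_path.open(newline="") as handle:
            rows.extend(csv.reader(handle))
    return rows

# Usage: a temporary directory plays the role of the landing area.
root = Path(tempfile.mkdtemp())
records = [("type_a", "x"), ("type_b", "y"), ("type_a", "z"), ("type_c", "")]
split_to_folders(records, root)
csv_paths = []
for name in FILE_TYPES:
    csv_path = root / f"{name}.csv"
    folder_to_csv(root / name, csv_path)
    csv_paths.append(csv_path)
merged = merge_csvs(csv_paths)
```

Only the merged rows would then be written to a dataset, so the expensive dataset write happens once, at the end.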

  • Alexandru
    Alexandru Dataiker, Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer, Registered Posts: 1,226 Dataiker

    The write_dataframe method is not thread-safe when writing to the same dataset. DSS handles this condition by checking active_writers:

    -> Thread raised an exception: Unable to instantiate a new dataset writer. There is already another active writer for this dataset (API_DATASET_READ_CACHING.test_snowflake).

    https://developer.dataiku.com/latest/api-reference/python/datasets.html#dataiku.core.dataset_write.DatasetWriter.active_writers

    I didn't have any issues with parallel writes to multiple outputs when each output is a different dataset, which I understand is what you are doing now. I tested this with S3 file-based datasets and with different Snowflake tables written concurrently.

    Do your datasets point to the same SQL table? This may cause issues if the table is locked by a writer from another thread in your code. You may need to write to different temp tables and consolidate them with a stack recipe later in your flow.

    I hope this helps. If not, feel free to raise a support ticket with the job diagnostics so we can check further.

    Thanks,


  • Jason
    Jason Registered Posts: 33 ✭✭✭✭✭

    Yes, this is a SQL database, and I suppose it's possible that the ODBC driver is getting confused by multiple writes trying to go to various (not the same) databases. I've got a few things to check and verify that I really am making three clean processes writing to three separate datasets, and if that all fails, then I'll shoot it over to support and see if they can help. At present, I've reduced the run time from 24 hours to 2, so I'm in a place where I can tolerate it if this is as good as we are going to get it.

    Thanks,
    Jason
