Python - SQL - partitioned loading

muthu11
Level 2

I am trying to load a Snowflake table via a Python dataframe.

test_data is my Snowflake table.

Python code:

import dataiku

test_data = dataiku.Dataset("test_data")
test_data.write_with_schema(test_df)  # test_df is a pandas DataFrame

I am able to load the table without a partition.

**********************************************************************

I then enabled partitioning by activating the partition on the table definition (in the table explorer).

When I enable the partition and execute the Python recipe, I get the error below:

ERROR:dataiku.core.dataset_write:Exception caught while writing
Traceback (most recent call last):
  File "/data/dataiku/dataiku-dss-10.0.3/python/dataiku/core/dataset_write.py", line 218, in run
    self.streaming_api.wait_write_session(self.session_id)
  File "/data/dataiku/dataiku-dss-10.0.3/python/dataiku/core/dataset_write.py", line 181, in wait_write_session
    raise Exception(u'An error occurred during dataset write (%s): %s' % (id, decoded_resp["message"]))
Exception: An error occurred during dataset write (a2YqezyvBf): RuntimeException: A partition ID must be provided, because the dataset TESTEXPLORER.test_explorer_data is partitioned
ERROR:dataiku.core.dataset_write:RemoteStreamWriter thread failed
Traceback (most recent call last):
  File "/data/dataiku/data_dir/code-envs/python/python-36-dev-mk/lib/python3.6/site-packages/requests/adapters.py", line 474, in send
    low_conn.send(b'\r\n')
  File "/usr/lib64/python3.6/http/client.py", line 1000, in send
    self.sock.sendall(data)
BrokenPipeError: [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/data/dataiku/dataiku-dss-10.0.3/python/dataiku/core/dataset_write.py", line 147, in run
    self.streaming_api.push_data(self.id,self._generate())
  File "/data/dataiku/dataiku-dss-10.0.3/python/dataiku/core/dataset_write.py", line 184, in push_data
    jek_or_backend_void_call("datasets/push-data/", params={"id": id}, data=generator, err_msg="Streaming: push-data call failed")
  File "/data/dataiku/dataiku-dss-10.0.3/python/dataiku/core/intercom.py", line 440, in jek_or_backend_void_call
    return backend_void_call(path, data, err_msg, **kwargs)
  File "/data/dataiku/dataiku-dss-10.0.3/python/dataiku/core/intercom.py", line 431, in backend_void_call
    return _handle_void_resp(backend_api_post_call(path, data, **kwargs), err_msg = err_msg)
  File "/data/dataiku/dataiku-dss-10.0.3/python/dataiku/core/intercom.py", line 361, in backend_api_post_call
    **kwargs)
  File "/data/dataiku/data_dir/code-envs/python/python-36-dev-mk/lib/python3.6/site-packages/requests/sessions.py", line 577, in post
    return self.request('POST', url, data=data, json=json, **kwargs)
  File "/data/dataiku/data_dir/code-envs/python/python-36-dev-mk/lib/python3.6/site-packages/requests/sessions.py", line 529, in request
    resp = self.send(prep, **send_kwargs)
  File "/data/dataiku/data_dir/code-envs/python/python-36-dev-mk/lib/python3.6/site-packages/requests/sessions.py", line 645, in send
    r = adapter.send(request, **kwargs)
  File "/data/dataiku/data_dir/code-envs/python/python-36-dev-mk/lib/python3.6/site-packages/requests/adapters.py", line 501, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: [Errno 32] Broken pipe

MikeG
Dataiker

Hi @muthu11

Thanks for posting.

It looks like the error of interest in this case is:

Exception: An error occurred during dataset write (a2YqezyvBf): RuntimeException: A partition ID must be provided, because the dataset TESTEXPLORER.test_explorer_data is partitioned

It's curious that you're encountering the `A partition ID must be provided` error when running your code from a Python recipe (https://doc.dataiku.com/dss/latest/code_recipes/python.html), because I would expect a Python recipe to handle partition dependencies automatically.

For example, the following code runs successfully when executed from a Python recipe in my lab environment:

[...]
partitioned_output = dataiku.Dataset("partitioned_output2")
partitioned_output.write_with_schema(partitioned_output_df)
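For completeness, here is a minimal sketch of what the full recipe body might look like. The input dataset name `partitioned_input` and the pass-through transformation are assumptions for illustration, not part of your project:

import dataiku

# Read the recipe input (dataset name is hypothetical)
partitioned_input = dataiku.Dataset("partitioned_input")
partitioned_output_df = partitioned_input.get_dataframe()

# ...any transformation of the dataframe would go here...

# In a recipe, DSS targets the partition selected next to the Run
# button, so no explicit partition spec is needed in the code
partitioned_output = dataiku.Dataset("partitioned_output2")
partitioned_output.write_with_schema(partitioned_output_df)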

 

Note: the partition to write to is specified next to the run button in the Python recipe (the UI will not allow you to run the recipe unless a partition is specified):

[Screenshot: target partition selector next to the Run button in the Python recipe]

Note: `partitioned_output2` is a DSS partitioned dataset being written to Snowflake
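If your recipe logic needs to know which partition it is building, the target partition is exposed through flow variables. A minimal sketch, assuming a single partition dimension named `date` (substitute your own dimension name):

import dataiku

# DKU_DST_<dimension> holds the target partition value for each
# partition dimension of the output ("date" is an assumed name)
target_partition = dataiku.dku_flow_variables["DKU_DST_date"]
print("Building partition: %s" % target_partition)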

The same code, when run from a Python notebook, fails with the following error:

ERROR:dataiku.core.dataset_write:Exception caught while writing
Traceback (most recent call last):
  File "/Users/mgallegos/Library/DataScienceStudio/kits/dataiku-dss-10.0.5-osx/python/dataiku/core/dataset_write.py", line 229, in run
    self.streaming_api.wait_write_session(self.session_id)
  File "/Users/mgallegos/Library/DataScienceStudio/kits/dataiku-dss-10.0.5-osx/python/dataiku/core/dataset_write.py", line 192, in wait_write_session
    raise Exception(u'An error occurred during dataset write (%s): %s' % (id, decoded_resp["message"]))
Exception: An error occurred during dataset write (Yvssn6hyqi): RuntimeException: A partition ID must be provided, because the dataset COMMUNITY_23249.partitioned_output2 is partitioned

To allow the code to run in a Python notebook, I must specify a partition to write to, for example using `set_write_partition`:

[...]
partitioned_output = dataiku.Dataset("partitioned_output")
partitioned_output.set_write_partition(spec='2014') # specify a partition to write to when inside a Python _notebook_
partitioned_output.write_with_schema(partitioned_output_df)
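And if you need to write several partitions from a single notebook run, one approach is to call `set_write_partition` before each write. A sketch only; the `year` column used to split the dataframe is a hypothetical example:

import dataiku

partitioned_output = dataiku.Dataset("partitioned_output")

# Write each group of rows to its own partition ("year" column assumed)
for year, year_df in partitioned_output_df.groupby("year"):
    partitioned_output.set_write_partition(spec=str(year))
    partitioned_output.write_with_schema(year_df)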

---

Can you provide the following:

  • A screenshot of the Python code in your Python recipe (including the partition specified next to the RUN button)
  • A screenshot of the error you receive when running the Python recipe

Thank you,
Mike
