Writing to partitions | Partitions
Hello everyone,
I'm working with partitioned datasets. I found a post showing how to read a single partition from a dataset, but I have not found a way to write that partition to another dataset under the same partition name.
    for p in dataset.list_partitions():
        dataset.read_partitions = [p]
        df = dataset.get_dataframe()
        print(p, df.shape)
        # transformations on df, works down to here
        """How can I write down this to the corresponding partition of another dataset"""
        with dataset_2.get_writer() as writer:
            dataset_2.writePartition = [p]
            writer.write_dataframe(df)
Best Answer
-
RoyE Dataiker, Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer, Registered Posts: 31 Dataiker
Hello
Thank you for the clarification!
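To write to a specific partition, set the target partition on the output dataset first:
    <dataset>.set_write_partition("<partition name>")
Please note that set_write_partition applies to writes made through a DatasetWriter, and the partition has to be set before the writer is created. A writer writes to a single partition, so open a new writer for each partition.
    for p in mydataset.list_partitions():
        # Read only the current partition
        mydataset.read_partitions = [p]
        df = mydataset.get_dataframe()
        # Select the output partition before opening the writer
        myoutputdataset.set_write_partition(str(p))
        with myoutputdataset.get_writer() as writer:
            # The with block closes the writer, so no explicit writer.close() is needed
            writer.write_dataframe(df)
This code iterates over each partition of your old dataset and writes it to the matching partition of your new one!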
Roy
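The loop described in this answer can also be wrapped in a small helper, which leaves room for the per-partition transformations mentioned in the question. This is a minimal sketch, not an official Dataiku utility: the function name `copy_partitions` and the `transform` hook are hypothetical, the dataset names in the usage comment are placeholders, and the code assumes that it runs outside a recipe (for example in a notebook), that the output dataset already has a schema, and that the write partition is picked up when the writer is created.

```python
def copy_partitions(source, target, transform=None):
    """Copy every partition of `source` into the same-named partition of `target`.

    `source` and `target` are assumed to behave like dataiku.Dataset handles.
    `transform` is an optional function applied to each partition's DataFrame.
    Returns a dict mapping partition id -> number of rows written.
    """
    rows_written = {}
    for partition in source.list_partitions():
        # Restrict the next read to this single partition.
        source.read_partitions = [partition]
        df = source.get_dataframe()
        if transform is not None:
            df = transform(df)

        # Select the output partition BEFORE opening the writer
        # (assumption: the writer captures the partition at creation).
        target.set_write_partition(str(partition))
        with target.get_writer() as writer:
            writer.write_dataframe(df)

        rows_written[str(partition)] = len(df)
    return rows_written


# Hypothetical usage in a notebook (dataset names are placeholders):
#   import dataiku
#   copy_partitions(dataiku.Dataset("input_ds"), dataiku.Dataset("output_ds"))
```

The returned dictionary gives a quick check that every input partition was written, similar to the `print(p, df.shape)` line in the question.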
Answers
-
RoyE Dataiker, Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer, Registered Posts: 31 Dataiker
Hello!
Just to be clear, are you trying to save a particular single partition to a new dataset?
If so, in your for loop, while you are iterating through the different partitions, you could insert an if statement that would save the specific partition that you are looking for.
    for p in mydataset.list_partitions():
        mydataset.read_partitions = [p]
        if (p == '2021'):  # In this case, my dataset is partitioned by year
            df = mydataset.get_dataframe()
            myoutputdataset = dataiku.Dataset("New_partitioned")
            myoutputdataset.write_with_schema(df)  # as a new dataset will require the schema to be written as well.
Are you executing this code through a Python Recipe?
If so, another alternative is to use the partitioning filters in the Inputs/Outputs tab of the Recipe.
Please let me know if I understood or misunderstood the intent and I can provide further assistance!
Roy
-
Thanks RoyE
Basically I'm trying to move partitions from one partitioned dataset to another partitioned dataset.
For example, I read the dataset:
    dataset = dataiku.Dataset('table_name')
    dataframe = dataset.get_dataframe()
I am trying to save it to a different place. I've tried
- write_dataframe
- write_with_schema
but I get this exception:
    Exception: An error occurred during dataset write (FpKjfFG9nB): RuntimeException: A partition ID must be provided, because the dataset DEEP_SUPPLY_CHAIN_PRODUCCION.Teorico_Real_M_ABX is partitioned
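The exception above says that the output dataset is partitioned, so a write needs to know which partition it targets. A minimal sketch of one way to do that with write_with_schema follows. The helper name `write_to_partition` is hypothetical, and the sketch assumes that the code runs outside a recipe (in a recipe the Flow decides the output partition) and that write_with_schema honours a partition chosen beforehand with set_write_partition.

```python
def write_to_partition(target, df, partition_id):
    """Write `df` into one partition of a partitioned dataset handle.

    `target` is assumed to behave like dataiku.Dataset.
    """
    if not partition_id:
        raise ValueError("a non-empty partition id is required")
    # Choose the partition first, then write (assumption: write_with_schema
    # uses the partition selected on the dataset handle).
    target.set_write_partition(str(partition_id))
    target.write_with_schema(df)


# Hypothetical usage (dataset name and partition id are placeholders):
#   import dataiku
#   write_to_partition(dataiku.Dataset("output_ds"), dataframe, "2021")
```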
-
Hi Roy E,
Thank you very much for your explanation. I was working on something different over the last few days, but your explanation will help me a lot in future projects, as this is something I have come across a few times and now I know how to fix it.
-
Tanguy Dataiku DSS Core Designer, Dataiku DSS & SQL, Dataiku DSS ML Practitioner, Dataiku DSS Core Concepts, Neuron, Dataiku DSS Adv Designer, Registered, Dataiku DSS Developer, Neuron 2023 Posts: 114 Neuron
I have tried the proposed solution using the set_write_partition inside the DatasetWriter handler but this solution did not work for me.
I have 2 datasets:
- the input dataset is not partitioned
- the output dataset is partitioned and is meant to save an archive of the input dataset every month
So the purpose is to log a version of the input dataset every month and to keep its history through partitions. The current month is controlled through dataiku's custom variables.
Here is a screenshot of my code (note that the output_dataset is new when I ran this code - it does not hold any data yet - hence the .write_schema() at the beginning of the last cell):
As the [*] in the last cell suggests, this code runs for a very long time, and I eventually get the following error:
---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-6-94c52c06091e> in <module>()
      4     output_dataset.set_write_partition(archive_partition)
----> 5     writer.write_dataframe(df)
      6     writer.close()

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in write_dataframe(self, df)
    396                 index=None, header=False, sep=',',
--> 397                 quoting=csv.QUOTE_ALL,).save()
    398

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dku_pandas_csv.py in save(self)
    196             self.writer = Python2DKUUTF8Writer(f, **writer_kwargs)
--> 197             self._save()
    198         finally:

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dku_pandas_csv.py in _save(self)
    295
--> 296             self._save_chunk(start_i, end_i)
    297

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dku_pandas_csv.py in _save_chunk(self, start_i, end_i)
    327
--> 328         libwriters.write_csv_rows(self.data, ix, self.nlevels, self.cols, self.writer)

pandas/_libs/writers.pyx in pandas._libs.writers.write_csv_rows()

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dku_pandas_csv.py in writerows(self, rows)
     59     def writerows(self, rows):
---> 60         self.writer.writerows(rows)
     61

/usr/lib64/python3.6/codecs.py in write(self, object)
    376         data, consumed = self.encode(object, self.errors)
--> 377         self.stream.write(data)
    378

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in write(self, data)
    111         if self.buffer.tell() > self.chunk_size:
--> 112             self.flush()
    113

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in flush(self)
    104         if self.error_message:
--> 105             raise Exception(self.error_message)
    106

Exception: Error : [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

Exception                                 Traceback (most recent call last)
<ipython-input-6-94c52c06091e> in <module>()
      4     output_dataset.set_write_partition(archive_partition)
      5     writer.write_dataframe(df)
----> 6     writer.close()

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in __exit__(self, type, value, traceback)
    411
    412     def __exit__(self, type, value, traceback):
--> 413         self.close()

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in close(self)
    402             del DatasetWriter.active_writers[self.dataset.full_name]
    403
--> 404         self._start_once()
    405         self.remote_writer.flush()
    406         self.remote_writer.close()

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in _start_once(self, data_schema)
    308     def _start_once(self,data_schema=None):
    309         if self.waiter:
--> 310             self.waiter.raise_on_failure()
    311
    312         if not self.remote_writer:

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in raise_on_failure(self)
    201         if self.exception_type is not None:
    202             if (sys.version_info > (3, 0)):
--> 203                 raise self.exception
    204             else:
    205                 exec("raise self.exception_type, self.exception, self.traceback")

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in run(self)
    216             if self.session_id == MISSING_ID_MARKER and self.session_init_message is not None:
    217                 raise Exception(u'An error occurred while starting the writing to the dataset : %s' % self.session_init_message)
--> 218             self.streaming_api.wait_write_session(self.session_id)
    219         except Exception as e:
    220             logger.exception("Exception caught while writing")

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in wait_write_session(self, id)
    179                 print ("%s rows successfully written (%s)" % (writtenRows,id))
    180             else:
--> 181                 raise Exception(u'An error occurred during dataset write (%s): %s' % (id, decoded_resp["message"]))
    182
    183     def push_data(self,id,generator):

Exception: An error occurred during dataset write (YKZFP7NFyl): RuntimeException: A partition ID must be provided, because the dataset EFFECTIFS.DMC_entreprise_archives_partitions_bis is partitioned
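For the monthly archive case described in this post, one thing worth trying is to select the partition before the writer is opened rather than inside the `with` block, since the traceback shows set_write_partition being called after the writer already exists. The sketch below is an assumption-laden illustration, not a confirmed fix: the helper names, the variable key `archive_month`, the dataset names in the usage comment, and the `YYYY-MM` partition format are all hypothetical, and it assumes the output dataset already has a schema and that the code runs in a notebook rather than a recipe.

```python
import datetime


def month_partition_id(variables, key="archive_month", today=None):
    """Return a 'YYYY-MM' partition id.

    Uses variables[key] when it is set (for example a project variable),
    otherwise falls back to the current calendar month.
    """
    value = variables.get(key)
    if value:
        return str(value)
    today = today or datetime.date.today()
    return today.strftime("%Y-%m")


def archive_snapshot(source, target, partition_id):
    """Write a full copy of a non-partitioned `source` into one partition of `target`.

    Both arguments are assumed to behave like dataiku.Dataset handles.
    Returns the number of rows written.
    """
    df = source.get_dataframe()
    # Set the partition before the writer exists, then open the writer.
    target.set_write_partition(partition_id)
    with target.get_writer() as writer:
        writer.write_dataframe(df)
    return len(df)


# Hypothetical usage in a notebook (names are placeholders):
#   import dataiku
#   partition = month_partition_id(dataiku.get_custom_variables())
#   archive_snapshot(dataiku.Dataset("input_ds"),
#                    dataiku.Dataset("archive_ds"), partition)
```

The `with` block closes the writer on exit, so a separate `writer.close()` call is not needed.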