Writing to partitions

Solved!
GuimoAAGG
Level 1

Hello everyone,

I'm working with partitioned datasets. I found a post showing how to read a partition from a dataset, but I have not found a way to write that partition to another dataset under the same partition name.

for p in dataset.list_partitions():
    dataset.read_partitions = [p]
    df = dataset.get_dataframe()
    print(p, df.shape)  # transformations on df; everything works down to here
    # How can I write df to the corresponding partition of another dataset?
    # This was my attempt, but it does not work:
    with dataset_2.get_writer() as writer:
        dataset_2.writePartition = [p]
        writer.write_dataframe(df)

5 Replies
RoyE
Dataiker

Hello!

Just to be clear, are you trying to save one particular partition to a new dataset?

If so, while iterating through the partitions in your for loop, you could insert an if statement that saves only the specific partition you are looking for.

import dataiku

for p in mydataset.list_partitions():
    if p == '2021':  # in this case, my dataset is partitioned by year
        mydataset.read_partitions = [p]
        df = mydataset.get_dataframe()

myoutputdataset = dataiku.Dataset("New_partitioned")
myoutputdataset.write_with_schema(df)  # a new dataset requires its schema to be written as well

Are you executing this code through a Python Recipe? 

If so, another alternative is to use the partitioning filters in the Inputs/Outputs tab of the Recipe.

[Screenshot: partition filters in the recipe's Input/Output tab]
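
For reference, a rough sketch of what that recipe-based setup looks like in code; the dataset names are placeholders, and in a Python recipe the Flow decides which partitions are read and written, so no partition-setting calls are needed:

import dataiku

# in a Python recipe, partition routing is handled by the Flow: the recipe
# runs for a given target partition, and the input partitions are mapped
# through the recipe's partition dependencies
input_ds = dataiku.Dataset("mydataset")         # placeholder name
output_ds = dataiku.Dataset("myoutputdataset")  # placeholder name

df = input_ds.get_dataframe()    # reads the partition(s) mapped to this run
output_ds.write_with_schema(df)  # writes to the partition being built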

Please let me know if I have understood your intent correctly, and I can provide further assistance!

Roy

GuimoAAGG
Level 1
Author

Thanks, RoyE.

Basically, I'm trying to move partitions from one partitioned dataset to another partitioned dataset.

For example, I read the dataset:

import dataiku

dataset = dataiku.Dataset('table_name')
dataframe = dataset.get_dataframe()

I am trying to save it to a different place. I've tried

  • write_dataframe
  • write_with_schema

but I get this exception:

Exception: An error occurred during dataset write (FpKjfFG9nB): RuntimeException: A partition ID must be provided, because the dataset DEEP_SUPPLY_CHAIN_PRODUCCION.Teorico_Real_M_ABX is partitioned

RoyE
Dataiker

Hello

Thank you for the clarification! 

To write to a partition, you must first set the target partition with:

<dataset>.set_write_partition("<partition name>")

https://doc.dataiku.com/dss/latest/python-api/datasets-reference.html#dataiku.Dataset.set_write_part...

Please note that set_write_partition requires the use of DatasetWriter.

https://doc.dataiku.com/dss/latest/python-api/datasets-reference.html#dataiku.core.dataset_write.Dat...

with myoutputdataset.get_writer() as writer:
    for p in mydataset.list_partitions():
        mydataset.read_partitions = [p]
        myoutputdataset.set_write_partition(str(p))
        df = mydataset.get_dataframe()
        writer.write_dataframe(df)
# no explicit writer.close() needed: the "with" block closes the writer on exit

This code should copy each partition of your old dataset into the corresponding partition of your new one!
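
Note that, per the Dataiku documentation, set_write_partition is not allowed inside Python recipes, where the write partition is controlled by the Flow, so this pattern is meant for notebooks and other standalone code. If the single shared writer gives you trouble, a variant worth trying (a sketch, not a verified fix) is to set the partition and open a fresh writer for each iteration, so every write session is bound to exactly one partition:

import dataiku

for p in mydataset.list_partitions():
    mydataset.read_partitions = [p]
    df = mydataset.get_dataframe()
    # bind the write session to this partition before opening the writer
    myoutputdataset.set_write_partition(str(p))
    with myoutputdataset.get_writer() as writer:
        writer.write_dataframe(df)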

Roy

GuimoAAGG
Level 1
Author

Hi RoyE,

Thank you very much for your explanation. I was working on something different these last few days, but your explanation will help me a lot in future projects. This is something I had come across a few times, and now I know how to fix it.

tanguy

I have tried the proposed solution, using set_write_partition inside the DatasetWriter handler, but it did not work for me.

I have 2 datasets:

  • the input dataset is not partitioned
  • the output dataset is partitioned and is meant to save an archive of the input dataset every month

So the purpose is to log a version of the input dataset every month and to keep its history through partitions. The current month is controlled through Dataiku's custom variables.

Here is a screenshot of my code (note that the output_dataset was new when I ran this code and did not hold any data yet, hence the .write_schema() at the beginning of the last cell):

[Screenshot: Jupyter notebook cells reading the input dataset and writing the archive partition]
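
For reference, the failing pattern can be sketched from the traceback below (output_dataset, archive_partition, df, and writer appear in the traceback; the dataset names and the variable name are placeholders):

import dataiku

input_dataset = dataiku.Dataset("input_dataset")    # placeholder name (not partitioned)
output_dataset = dataiku.Dataset("output_dataset")  # placeholder name (partitioned by month)

# the current month, e.g. "2021-07", comes from a custom variable
archive_partition = dataiku.get_custom_variables()["archive_month"]  # placeholder variable name

df = input_dataset.get_dataframe()
output_dataset.write_schema_from_dataframe(df)  # the output dataset was new, with no schema yet

with output_dataset.get_writer() as writer:
    output_dataset.set_write_partition(archive_partition)  # set *after* get_writer()
    writer.write_dataframe(df)
    writer.close()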

As the [*] next to the last cell suggests, this code runs for a very long time, but I eventually get the following error:

---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-6-94c52c06091e> in <module>()
      4     output_dataset.set_write_partition(archive_partition)
----> 5     writer.write_dataframe(df)
      6     writer.close()

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in write_dataframe(self, df)
    396                                        index=None, header=False, sep=',',
--> 397                                        quoting=csv.QUOTE_ALL,).save()
    398

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dku_pandas_csv.py in save(self)
    196                 self.writer = Python2DKUUTF8Writer(f, **writer_kwargs)
--> 197             self._save()
    198         finally:

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dku_pandas_csv.py in _save(self)
    295 
--> 296             self._save_chunk(start_i, end_i)
    297 

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dku_pandas_csv.py in _save_chunk(self, start_i, end_i)
    327 
--> 328         libwriters.write_csv_rows(self.data, ix, self.nlevels, self.cols, self.writer)

pandas/_libs/writers.pyx in pandas._libs.writers.write_csv_rows()

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dku_pandas_csv.py in writerows(self, rows)
     59     def writerows(self, rows):
---> 60         self.writer.writerows(rows)
     61 

/usr/lib64/python3.6/codecs.py in write(self, object)
    376         data, consumed = self.encode(object, self.errors)
--> 377         self.stream.write(data)
    378 

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in write(self, data)
    111         if self.buffer.tell() > self.chunk_size:
--> 112             self.flush()
    113 

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in flush(self)
    104         if self.error_message:
--> 105             raise Exception(self.error_message)
    106 

Exception: Error : [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

Exception                                 Traceback (most recent call last)
<ipython-input-6-94c52c06091e> in <module>()
      4     output_dataset.set_write_partition(archive_partition)
      5     writer.write_dataframe(df)
----> 6     writer.close()

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in __exit__(self, type, value, traceback)
    411 
    412     def __exit__(self, type, value, traceback):
--> 413         self.close()

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in close(self)
    402             del DatasetWriter.active_writers[self.dataset.full_name]
    403 
--> 404         self._start_once()
    405         self.remote_writer.flush()
    406         self.remote_writer.close()

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in _start_once(self, data_schema)
    308     def _start_once(self,data_schema=None):
    309         if self.waiter:
--> 310             self.waiter.raise_on_failure()
    311 
    312         if not self.remote_writer:

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in raise_on_failure(self)
    201         if self.exception_type is not None:
    202             if (sys.version_info > (3, 0)):
--> 203                 raise self.exception
    204             else:
    205                 exec("raise self.exception_type, self.exception, self.traceback")

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in run(self)
    216             if self.session_id == MISSING_ID_MARKER and self.session_init_message is not None:
    217                 raise Exception(u'An error occurred while starting the writing to the dataset : %s' % self.session_init_message)
--> 218             self.streaming_api.wait_write_session(self.session_id)
    219         except Exception as e:
    220             logger.exception("Exception caught while writing")

/home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in wait_write_session(self, id)
    179             print ("%s rows successfully written (%s)" % (writtenRows,id))
    180         else:
--> 181             raise Exception(u'An error occurred during dataset write (%s): %s' % (id, decoded_resp["message"]))
    182 
    183     def push_data(self,id,generator):

Exception: An error occurred during dataset write (YKZFP7NFyl): RuntimeException: A partition ID must be provided, because the dataset EFFECTIFS.DMC_entreprise_archives_partitions_bis is partitioned
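
One detail that may explain the error: set_write_partition is called after get_writer(), so the write session may already have been opened without a partition ID. A sketch of the reordered version, as an assumption to test rather than a confirmed fix (same placeholder names as above):

import dataiku

input_dataset = dataiku.Dataset("input_dataset")    # placeholder name
output_dataset = dataiku.Dataset("output_dataset")  # placeholder name
archive_partition = dataiku.get_custom_variables()["archive_month"]  # placeholder variable name

df = input_dataset.get_dataframe()

# bind the write session to the target partition *before* opening the writer
output_dataset.set_write_partition(archive_partition)
output_dataset.write_schema_from_dataframe(df)

with output_dataset.get_writer() as writer:
    writer.write_dataframe(df)
# no explicit writer.close() needed: the "with" block closes the writer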
