Writing to partitions | Partitions

GuimoAAGG Partner, Registered Posts: 3 Partner
edited July 16 in Using Dataiku

Hello everyone,

I'm working with partitioned datasets. I found a post showing how to read a partition from a dataset, but I have not found a way to write that partition to another dataset under the same partition name.

for p in dataset.list_partitions():
    dataset.read_partitions = [p]
    df = dataset.get_dataframe()
    print(p, df.shape)  # transformations on df, works down to here
    # How can I write this to the corresponding partition of another dataset?
    with dataset_2.get_writer() as writer:
        dataset_2.writePartition = [p]
        writer.write_dataframe(df)

Answers

  • RoyE
    RoyE Dataiker, Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer, Registered Posts: 31 Dataiker
    edited July 17

    Hello!

    Just to be clear, are you trying to save a particular single partition to a new dataset?

    If so, in your for loop, while you are iterating through the different partitions, you could insert an if statement that would save the specific partition that you are looking for.

    import dataiku

    # mydataset is assumed to be an existing dataiku.Dataset handle on the partitioned input
    for p in mydataset.list_partitions():
        if p == '2021':  # In this case, my dataset is partitioned by year
            mydataset.read_partitions = [p]
            df = mydataset.get_dataframe()

    myoutputdataset = dataiku.Dataset("New_partitioned")
    myoutputdataset.write_with_schema(df)  # a new dataset requires the schema to be written as well

    Are you executing this code through a Python Recipe?

    If so, another alternative is to use the partitioning filters in the Inputs/Outputs tab of the Recipe.

    Screen Shot 2021-07-09 at 10.46.39.png

    Please let me know if I understood or misunderstood the intent and I can provide further assistance!

    Roy

  • GuimoAAGG
    GuimoAAGG Partner, Registered Posts: 3 Partner
    edited July 17

    Thanks RoyE

    Basically I'm trying to move partitions from one partitioned dataset to another partitioned dataset.

    For example, I read the dataset:

    dataset = dataiku.Dataset('table_name')
    dataframe = dataset.get_dataframe()

    I am trying to save it to a different place. I've tried

    • write_dataframe
    • write_with_schema

    but I get this exception:

    Exception: An error occurred during dataset write (FpKjfFG9nB): RuntimeException: A partition ID must be provided, because the dataset DEEP_SUPPLY_CHAIN_PRODUCCION.Teorico_Real_M_ABX is partitioned
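
The "A partition ID must be provided" exception quoted in this post suggests the writer was opened without a target partition. A minimal sketch of a partition-by-partition copy follows. It assumes the code runs in a notebook (the Dataiku documentation states that `set_write_partition` is not allowed inside Python recipes, where the Flow controls what gets written) and that the write partition has to be set before `get_writer()` is called. `copy_partitions` and the dataset names in the usage comment are hypothetical; the function only relies on `list_partitions`, `read_partitions`, `get_dataframe`, `set_write_partition`, and `get_writer`.

```python
def copy_partitions(source, target):
    """Copy each partition of `source` into the same-named partition of `target`.

    Both arguments are expected to behave like dataiku.Dataset handles.
    Returns the list of partition identifiers that were copied.
    """
    copied = []
    for partition in source.list_partitions():
        # Restrict the read to one partition, as done in the original post.
        source.read_partitions = [partition]
        df = source.get_dataframe()

        # Assumption: the target partition must be chosen before the writer
        # is created, so that the writer knows where the rows go.
        target.set_write_partition(partition)
        with target.get_writer() as writer:
            writer.write_dataframe(df)
        copied.append(partition)
    return copied


# Hypothetical usage inside a DSS notebook (dataset names are placeholders):
# import dataiku
# copy_partitions(dataiku.Dataset("source_dataset"), dataiku.Dataset("target_dataset"))
```

The sketch assumes the target dataset already has a schema and the same partitioning scheme as the source; a brand-new target may need its schema written first.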

  • GuimoAAGG
    GuimoAAGG Partner, Registered Posts: 3 Partner

    Hi Roy E,

    Thank you very much for your explanation. I have been working on something different over the last few days, but your explanation will help me a lot in future projects, as this is something I have come across a few times and now I know how to fix it.

  • Tanguy
    Tanguy Dataiku DSS Core Designer, Dataiku DSS & SQL, Dataiku DSS ML Practitioner, Dataiku DSS Core Concepts, Neuron, Dataiku DSS Adv Designer, Registered, Dataiku DSS Developer, Neuron 2023 Posts: 112 Neuron
    edited July 17

    I tried the proposed solution, calling set_write_partition inside the DatasetWriter handler, but it did not work for me.

    I have 2 datasets:

    • the input dataset is not partitioned
    • the output dataset is partitioned and is meant to save an archive of the input dataset every month

    So the purpose is to log a version of the input dataset every month and to keep its history through partitions. The current month is controlled through dataiku's custom variables.

    Here is a screenshot of my code (note that the output_dataset was new when I ran this code and did not hold any data yet, hence the .write_schema() at the beginning of the last cell):

    screenshot-jupyter.jpg

    As the [*] in the last cell suggests, this code runs for a very long time, and I eventually get the following error:

    ---------------------------------------------------------------------------
    Exception                                 Traceback (most recent call last)
    <ipython-input-6-94c52c06091e> in <module>()
          4     output_dataset.set_write_partition(archive_partition)
    ----> 5     writer.write_dataframe(df)
          6     writer.close()
    
    /home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in write_dataframe(self, df)
        396                                        index=None, header=False, sep=',',
    --> 397                                        quoting=csv.QUOTE_ALL,).save()
        398 
    
    /home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dku_pandas_csv.py in save(self)
        196                 self.writer = Python2DKUUTF8Writer(f, **writer_kwargs)
    --> 197             self._save()
        198         finally:
    
    /home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dku_pandas_csv.py in _save(self)
        295 
    --> 296             self._save_chunk(start_i, end_i)
        297 
    
    /home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dku_pandas_csv.py in _save_chunk(self, start_i, end_i)
        327 
    --> 328         libwriters.write_csv_rows(self.data, ix, self.nlevels, self.cols, self.writer)
    
    pandas/_libs/writers.pyx in pandas._libs.writers.write_csv_rows()
    
    /home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dku_pandas_csv.py in writerows(self, rows)
         59     def writerows(self, rows):
    ---> 60         self.writer.writerows(rows)
         61 
    
    /usr/lib64/python3.6/codecs.py in write(self, object)
        376         data, consumed = self.encode(object, self.errors)
    --> 377         self.stream.write(data)
        378 
    
    /home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in write(self, data)
        111         if self.buffer.tell() > self.chunk_size:
    --> 112             self.flush()
        113 
    
    /home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in flush(self)
        104         if self.error_message:
    --> 105             raise Exception(self.error_message)
        106 
    
    Exception: Error : [Errno 32] Broken pipe
    
    During handling of the above exception, another exception occurred:
    
    Exception                                 Traceback (most recent call last)
    <ipython-input-6-94c52c06091e> in <module>()
          4     output_dataset.set_write_partition(archive_partition)
          5     writer.write_dataframe(df)
    ----> 6     writer.close()
    
    /home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in __exit__(self, type, value, traceback)
        411 
        412     def __exit__(self, type, value, traceback):
    --> 413         self.close()
    
    /home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in close(self)
        402             del DatasetWriter.active_writers[self.dataset.full_name]
        403 
    --> 404         self._start_once()
        405         self.remote_writer.flush()
        406         self.remote_writer.close()
    
    /home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in _start_once(self, data_schema)
        308     def _start_once(self,data_schema=None):
        309         if self.waiter:
    --> 310             self.waiter.raise_on_failure()
        311 
        312         if not self.remote_writer:
    
    /home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in raise_on_failure(self)
        201         if self.exception_type is not None:
        202             if (sys.version_info > (3, 0)):
    --> 203                 raise self.exception
        204             else:
        205                 exec("raise self.exception_type, self.exception, self.traceback")
    
    /home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in run(self)
        216             if self.session_id == MISSING_ID_MARKER and self.session_init_message is not None:
        217                 raise Exception(u'An error occurred while starting the writing to the dataset : %s' % self.session_init_message)
    --> 218             self.streaming_api.wait_write_session(self.session_id)
        219         except Exception as e:
        220             logger.exception("Exception caught while writing")
    
    /home/dataiku/dataiku-dss-9.0.4/python/dataiku/core/dataset_write.py in wait_write_session(self, id)
        179             print ("%s rows successfully written (%s)" % (writtenRows,id))
        180         else:
    --> 181             raise Exception(u'An error occurred during dataset write (%s): %s' % (id, decoded_resp["message"]))
        182 
        183     def push_data(self,id,generator):
    
    Exception: An error occurred during dataset write (YKZFP7NFyl): RuntimeException: A partition ID must be provided, because the dataset EFFECTIFS.DMC_entreprise_archives_partitions_bis is partitioned
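
In the traceback above, `set_write_partition` is called on line 4 of the cell, inside the writer block, so the writer already exists when the partition is set. One thing worth trying, offered as an assumption rather than a confirmed fix, is to set the partition before calling `get_writer()`. The sketch below illustrates that ordering for a monthly archive. `month_partition_id` and `archive_snapshot` are hypothetical helpers, the date-based identifier stands in for the custom variable mentioned in the post, and the `YYYY-MM` format assumes the output dataset uses month-level time partitioning.

```python
import datetime


def month_partition_id(day):
    """Return a month identifier such as '2021-07' for a datetime.date."""
    return day.strftime("%Y-%m")


def archive_snapshot(source, target, partition_id):
    """Write the full content of `source` into one partition of `target`.

    Both arguments are expected to behave like dataiku.Dataset handles.
    """
    df = source.get_dataframe()

    # Assumption: the partition is picked up when the writer is created,
    # so it is set first, before get_writer() is called.
    target.set_write_partition(partition_id)

    # A brand-new output dataset has no schema yet, so derive one from df.
    target.write_schema_from_dataframe(df)

    with target.get_writer() as writer:
        writer.write_dataframe(df)
    return partition_id


# Hypothetical usage inside a DSS notebook (dataset names are placeholders):
# import dataiku
# archive_snapshot(dataiku.Dataset("input_dataset"),
#                  dataiku.Dataset("output_dataset"),
#                  month_partition_id(datetime.date.today()))
```

Using the `with` block alone closes the writer on exit, so no explicit `writer.close()` call is needed in this sketch.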
