
About get_writer

Level 3

Hello,

  I am trying to develop a plugin that reads a CSV file and splits it into as many files as there are distinct values of a specific field (the number is not known in advance). The problem is that a given folder seems to allow only a single open writer at a time, so it does not seem possible to keep one writer open per output file. On the other hand, if I close the previous writer so that the current one can save its data, it does not seem possible to reopen the previous one later if needed (I cannot sort the file first because it is too big). A snippet of my code is below. For now I am going to take a different approach (creating a subfolder for each value, with a single writer each), but I would like to know how to deal with such situations. Thanks in advance.

Best Regards.

Giuseppe

 

 

for current_file in list_of_csv:
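    # str.strip('.csv') removes any of the characters '.', 'c', 's', 'v' from both ends,
    # so drop the extension explicitly instead
    name_of_folder = current_file[:-4] if current_file.endswith('.csv') else current_file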
    new_handle = project.create_managed_folder(name_of_folder, folder_type='HDFS', connection_name='hdfs_managed')
    newFolder  = dataiku.Folder(new_handle.get_definition()['id'])
    #print(newFolder.get_info())
    csv_writers = {}
    with nytf_Train.get_download_stream(current_file) as current_stream:
        global_header = current_stream.readline()
        header_array  = global_header.strip().split(',')
        if( "key" in header_array):
            column_index  = header_array.index("key")
        else:
            continue
        #print("First line of myfile is: {}".format(global_header))
        for current_line in current_stream:
            line_array  = current_line.strip().split(',')
            current_key = line_array[column_index][0:7]
            #print(current_key)
            if current_key not in csv_writers:
                with newFolder.get_writer(current_key) as new_file_handler:
                    csv_writers[current_key] = new_file_handler
                    #print(new_file_handler.__dict__)
                    #print(dir(new_file_handler))
                    # Warning: with `with`, the file is closed once the block exits,
                    # but we need to write a lot and avoid open/close overhead
                    #with csv_writers[current_key] as tmp_file_handler:
                    new_file_handler.write(global_header)
                    new_file_handler.write(current_line)
            else:
                # pass
                # same as above for the with
                with csv_writers[current_key] as current_file_handler:
                    # Trying this... but I am rather sure it is wrong...
                    f=open(current_file_handler,"a+")
                    f.write(current_line)
                    #current_file_handler = csv_writers[current_key]
                    # Unfortunately the ManagedFolderWriter object does not have the closed attribute
                    # Alternative way could be to check if any thread is keeping it
                    # or except IOError
                    #if not current_file_handler.closed:
                    #try:
                    #    #print('NOK')
                    #    current_file_handler.write(current_line)
                    #except IOError:
                    #    current_file_handler.close()

    for k, v in csv_writers.items():
        v.close()
    metrics_data[current_file] = len(csv_writers)
    csv_writers.clear()
    current_stream.close()

 

 

4 Replies
Dataiker

Hi @gnaldi62 ,

Such situations are generally solved by using our native partitioning capabilities.
In your case, you can create an HDFS dataset to import the input CSV file. Then create a Sync recipe and configure the output dataset as partitioned by a discrete dimension (the "key" column by which you want to break down your original file). Finally, enable the "redispatch partitioning according to input columns" option in the Sync recipe.
Here's a tutorial for more details: Repartitioning a non-partitioned dataset.
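
If a code-based split is still needed, one possible workaround for the single-writer limit is to spool each key's rows to a local temporary file and only push the finished files to the folder afterwards, one at a time. The following is a minimal sketch under assumptions, not a tested DSS recipe: `split_lines_by_key` and its `upload` callback are hypothetical names, the input is assumed to be an iterable of already-decoded text lines, the keys are assumed to be safe to use as file names, and local disk is assumed to have room for the split output.

```python
import os
import tempfile
from contextlib import ExitStack


def split_lines_by_key(lines, key_column="key", prefix_len=7, upload=None):
    """Split CSV lines into one local temp file per key, then upload each file.

    `lines` is an iterable of text lines whose first item is the header.
    `upload(name, local_path)` is a hypothetical callback (an assumption of
    this sketch) that copies one finished file to its final destination.
    Returns the sorted list of keys that were found.
    """
    lines = iter(lines)
    header = next(lines)
    column_index = header.strip().split(",").index(key_column)

    with tempfile.TemporaryDirectory() as tmp_dir, ExitStack() as stack:
        handles = {}
        for line in lines:
            # Same naive comma split and key prefix as in the original snippet
            key = line.strip().split(",")[column_index][:prefix_len]
            if key not in handles:
                path = os.path.join(tmp_dir, key + ".csv")
                # The ExitStack keeps every local file open until we are done
                handles[key] = stack.enter_context(open(path, "w", newline=""))
                handles[key].write(header)
            handles[key].write(line)

        # Close all local files before handing them over, one at a time
        stack.close()
        for key in sorted(handles):
            if upload is not None:
                upload(key + ".csv", os.path.join(tmp_dir, key + ".csv"))

    return sorted(handles)
```

In a DSS recipe, `upload` could be a thin wrapper around the managed folder's own upload call, so that only one remote write is in progress at any time; please check the managed folder API documentation for the exact method. Note that one local handle stays open per key, so a very large number of distinct keys may hit the operating system's open-file limit.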

Is there a particular reason for writing a plugin instead of using the existing partitioning capabilities?

How big is your input file? What is the order of magnitude of the number of distinct keys that you can get?

Hope it helps.

Level 3
Author

Hi Dimitri,

  The reason is HDFS: as far as I know, only file-based partitioning works in this case. But because the starting point is a single, huge CSV file (and not a set of folders), I first need to split its content into chunks inside folders. Both input and output should be HDFS managed (a choice dictated by the size, almost 10 GB).

Txs. Rgds.

Giuseppe

Dataiker

Hi @gnaldi62 ,

You can actually leverage partitioning to proceed with the split of the big original file.

The input of the Sync recipe, used with the "redispatch partitioning" mode enabled, would be a single non-partitioned HDFS dataset pointing to your CSV file. The output would be a file-based partitioned HDFS dataset, where each partition refers to a separate CSV file.

Here is an example made with the titanic dataset stored in HDFS, partitioned according to the values of the "Pclass" column.

Screenshot 2020-07-02 at 16.51.43.png

And if I open the output dataset, I can see that I have as many partitions as I have values in the "Pclass" column (3), and I have a separate CSV file stored for each of them.

Screenshot 2020-07-02 at 16.52.02.png

You can refer to the tutorial about Repartitioning a non-partitioned dataset for more details.

Hope this helps.

Level 3
Author

Hi,

  Thank you, it worked. The problem was with my Hadoop credentials.

Giuseppe