About get_writer
Hello,
I was trying to develop a plugin that reads the content of a CSV file and splits it into as many files as there are distinct values of a specific field (a number we don't know in advance). The problem I'm facing is that only a single writer seems to be usable at a time for a given folder, so it doesn't seem possible to keep as many writers open as there are files to save. On the other hand, if I close the previous writer so that the current one can save data, it doesn't seem possible to reopen the previous one when needed (I cannot sort the file because it's too big). Below is a snippet of my code. I'm going to follow a different approach now (i.e. creating a subfolder for each value, with a single writer for each), but I would like to know how to deal with such situations. Thanks in advance.
Best Regards.
Giuseppe
```python
import os
import dataiku

# project, nytf_Train, list_of_csv and metrics_data are defined earlier in the plugin
for current_file in list_of_csv:
    # os.path.splitext drops the ".csv" suffix; str.strip('.csv') would instead
    # strip any of the characters '.', 'c', 's', 'v' from both ends of the name
    name_of_folder = os.path.splitext(current_file)[0]
    new_handle = project.create_managed_folder(name_of_folder, folder_type='HDFS', connection_name='hdfs_managed')
    newFolder = dataiku.Folder(new_handle.get_definition()['id'])
    #print(newFolder.get_info())
    csv_writers = {}
    with nytf_Train.get_download_stream(current_file) as current_stream:
        global_header = current_stream.readline()
        header_array = global_header.strip().split(',')
        if "key" in header_array:
            column_index = header_array.index("key")
        else:
            continue
        #print("First line of myfile is: {}".format(global_header))
        for current_line in current_stream:
            line_array = current_line.strip().split(',')
            current_key = line_array[column_index][0:7]
            #print(current_key)
            if current_key not in csv_writers:
                with newFolder.get_writer(current_key) as new_file_handler:
                    csv_writers[current_key] = new_file_handler
                    #print(new_file_handler.__dict__)
                    #print(dir(new_file_handler))
                    # Warning: with "with", the writer is closed when the block exits,
                    # but we need to write a lot and avoid open/close overhead
                    #with csv_writers[current_key] as tmp_file_handler:
                    new_file_handler.write(global_header)
                    new_file_handler.write(current_line)
            else:
                # pass
                # same as above for the with
                with csv_writers[current_key] as current_file_handler:
                    # Trying this... but I am rather sure it is wrong...
                    f = open(current_file_handler, "a+")
                    f.write(current_line)
                    #current_file_handler = csv_writers[current_key]
                    # Unfortunately the ManagedFolderWriter object does not have the closed attribute
                    # Alternative way could be to check if any thread is keeping it
                    # or except IOError
                    #if not current_file_handler.closed:
                    #try:
                    #    #print('NOK')
                    #    current_file_handler.write(current_line)
                    #except IOError:
                    #    current_file_handler.close()
        for k, v in csv_writers.items():
            v.close()
        metrics_data[current_file] = len(csv_writers)
        csv_writers.clear()
        current_stream.close()
```
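One possible workaround, if you want to stay with a plugin, is to avoid holding many folder writers at once: split into ordinary local files first (a process can keep many local file handles open, up to the OS limit), then copy each finished file into the managed folder one after the other. This is a minimal sketch, assuming the plugin has enough local scratch disk and that the key prefixes are valid file names; `split_by_key`, `transfer_all` and the `upload` callback are hypothetical names. Like the original snippet, it splits on plain commas and does not handle quoted fields. In DSS, the callback is where something like the folder's `upload_file(path, file_path)` method could go, if your DSS version provides it (please check the API reference).

```python
import os
import tempfile
from typing import IO, Callable, Dict, Iterable


def split_by_key(lines: Iterable[str], key_column: str, out_dir: str,
                 prefix_len: int = 7) -> Dict[str, str]:
    """Split CSV lines into one local file per key prefix.

    The header is repeated at the top of every output file.
    Returns a mapping key -> local file path ({} if key_column is absent).
    """
    it = iter(lines)
    header = next(it)
    columns = header.rstrip("\r\n").split(",")
    if key_column not in columns:
        return {}
    idx = columns.index(key_column)
    handles: Dict[str, IO[str]] = {}
    paths: Dict[str, str] = {}
    try:
        for line in it:
            if not line.strip():
                continue  # skip blank lines
            key = line.rstrip("\r\n").split(",")[idx][:prefix_len]
            if key not in handles:
                path = os.path.join(out_dir, key + ".csv")
                handles[key] = open(path, "w", newline="")
                handles[key].write(header)
                paths[key] = path
            handles[key].write(line)
    finally:
        # Local handles have no one-writer-at-a-time restriction
        for fh in handles.values():
            fh.close()
    return paths


def transfer_all(paths: Dict[str, str],
                 upload: Callable[[str, str], None]) -> None:
    """Push finished files sequentially, so only one upload is active at a time."""
    for key in sorted(paths):
        upload(key + ".csv", paths[key])


if __name__ == "__main__":
    sample = ["id,key\n", "1,2015-01-03\n", "2,2015-02-01\n", "3,2015-01-20\n"]
    with tempfile.TemporaryDirectory() as scratch:
        files = split_by_key(sample, "key", scratch)
        transfer_all(files, lambda dst, src: print(dst, "<-", src))
```

With a very large number of distinct keys, the local open-file limit becomes the constraint instead; in that case the keys would have to be processed in batches over several passes.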
Best Answer
-
Hi @gnaldi62,
You can actually leverage partitioning to split the big original file.
The input of the Sync recipe, used with the "redispatch partitioning" mode enabled, would be a single non-partitioned HDFS dataset pointing to your CSV file. The output would be a file-based partitioned HDFS dataset, where each partition refers to a separate CSV file.
Here is an example made with the titanic dataset stored in HDFS, partitioned according to the values of the "Pclass" column.
When I open the output dataset, I can see that there are as many partitions as there are values in the "Pclass" column (3), with a separate CSV file stored for each of them.
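Conceptually, redispatching by a discrete column routes each row to the partition named after its value in that column, so the number of partitions equals the number of distinct values. The tiny pure-Python illustration below uses made-up rows (not the real titanic data) and is not how DSS implements the recipe; `redispatch` is a hypothetical name.

```python
import csv
import io
from collections import defaultdict
from typing import Dict, List


def redispatch(csv_text: str, column: str) -> Dict[str, List[dict]]:
    """Group rows by the value of `column`: one group (partition) per distinct value."""
    partitions: Dict[str, List[dict]] = defaultdict(list)
    for row in csv.DictReader(io.StringIO(csv_text)):
        partitions[row[column]].append(row)
    return dict(partitions)


if __name__ == "__main__":
    parts = redispatch("Name,Pclass\nA,1\nB,3\nC,2\nD,3\n", "Pclass")
    for value, rows in sorted(parts.items()):
        print(value, [r["Name"] for r in rows])
```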
You can refer to the tutorial about Repartitioning a non-partitioned dataset for more details.
Hope this helps.
Answers
-
Hi @gnaldi62,
Such situations are generally solved by using our native partitioning capabilities.
In your case, you can create an HDFS dataset to import the input CSV file. Then create a Sync recipe and configure the output dataset as partitioned by a discrete dimension (the "key" column according to which you want to break down your original file). Finally, enable the "redispatch partitioning according to input columns" option in the Sync recipe.
Here's a tutorial with more details: Repartitioning a non-partitioned dataset.
Is there a particular reason for writing a plugin instead of using the existing partitioning capabilities?
How big is your input file? What is the order of magnitude of the number of distinct keys you can get?
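Both numbers can be estimated in a single streaming pass, without loading the file into memory: memory grows with the number of distinct keys, not with the file size. A minimal sketch, assuming the same 7-character key prefix used in the question's snippet; `key_cardinality` is a hypothetical helper name, and it raises `ValueError` if the key column is missing.

```python
import csv
from collections import Counter
from typing import Iterable


def key_cardinality(lines: Iterable[str], key_column: str,
                    prefix_len: int = 7) -> Counter:
    """Count rows per key prefix in one pass over the CSV lines."""
    reader = csv.reader(lines)
    header = next(reader)
    idx = header.index(key_column)  # ValueError if the column is absent
    counts: Counter = Counter()
    for row in reader:
        if row:  # csv.reader yields [] for blank lines
            counts[row[idx][:prefix_len]] += 1
    return counts


if __name__ == "__main__":
    sample = ["id,key\n", "1,2015-01-03\n", "2,2015-02-01\n", "3,2015-01-20\n"]
    counts = key_cardinality(sample, "key")
    print(len(counts), "distinct keys;", counts.most_common(3))
```

If the source yields bytes (as binary download streams do under Python 3), wrap it first, e.g. `io.TextIOWrapper(stream, encoding="utf-8")`, assuming the file is UTF-8.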
Hope it helps.
-
gnaldi62
Hi Dimitri,
The reason is HDFS: as far as I know, only file-based partitioning works in this case. But because the starting file is a single, huge CSV file (and not a set of folders), I first need to split its content into chunks inside folders. Both input and output should be HDFS-managed (a choice dictated by the file size, almost 10 GB).
Txs. Rgds.
Giuseppe
-
gnaldi62
Hi,
thank you, it worked. The problem was with my Hadoop credentials.
Giuseppe