Write huge data to Postgres efficiently
Hi,
I'm a data engineer working at Walmart Labs.
My team uses Dataiku to build flows for creating a data lake and for analysis. In one scenario, we're writing a large volume of data from an HDFS dataset in DSS to a Postgres table, and the process takes a long time. I've checked, and this is because the rows are inserted sequentially. I tried running the recipe on Spark, but still only one task is created to write to the database. Is there any way to parallelize this operation to improve performance and reduce execution time?
Thanks for your time
Answers
-
Marlan
Hi @shakti29,
Our experience with Netezza may be helpful. Moving data into Netezza was slow because Netezza doesn't handle the row-level INSERT statements used by DSS well. We ended up writing the data we wanted to move to a text file and then using Netezza's bulk load functionality to load it. This approach is much faster. Let me know if you would like more detail on this.
Marlan
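Since the target in this thread is Postgres rather than Netezza, the closest equivalent of Netezza's bulk load is Postgres's COPY command. Below is a minimal sketch, assuming the psycopg2 driver (not mentioned in the thread) and a target table that already exists; the table and column names are hypothetical. With COPY ... FROM STDIN, psycopg2's copy_expert reads the file in chunks and streams it over the client connection, so the file does not have to be on the database server.

```python
from typing import IO, Sequence


def build_copy_sql(table: str, columns: Sequence[str], delimiter: str = ",") -> str:
    """Build a COPY ... FROM STDIN statement for a CSV file with a header row.

    Table and column names are inserted as-is, so they must come from a
    trusted source (e.g. the dataset's location info), not from user input.
    """
    if len(delimiter) != 1:
        raise ValueError("delimiter must be a single character")
    # Quote the delimiter as a SQL string literal (a single quote is doubled).
    delim_literal = "'" + delimiter.replace("'", "''") + "'"
    return "COPY {} ({}) FROM STDIN WITH (FORMAT csv, HEADER true, DELIMITER {})".format(
        table, ", ".join(columns), delim_literal
    )


def copy_file(cursor, table: str, columns: Sequence[str], fileobj: IO[str],
              delimiter: str = ",") -> None:
    """Stream an already-written CSV file into Postgres with a single COPY.

    cursor is assumed to be a psycopg2 cursor; copy_expert reads fileobj in
    chunks, so the whole file is never held in memory. The caller commits.
    """
    cursor.copy_expert(build_copy_sql(table, columns, delimiter), fileobj)
```

With a real connection, a call could look like with open('prediction.csv') as f: copy_file(cur, 'prediction', ['id', 'mth_dt', 'proba', 'pred_ts'], f, delimiter='\t'), followed by conn.commit(). One COPY per file replaces one INSERT per row, which avoids the per-statement overhead that makes row-level inserts slow.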
-
Thanks, Marlan.
I'm going to try a similar approach. Just one question: did you write the output to disk first and then load it into Netezza? The data I want to write is around 50 GB and is hosted on HDFS. Did you have to write the data to disk and then use the bulk load functionality, or did you load it directly from HDFS into the database?
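On the parallelization question: Spark writes a dataset on HDFS as many part files, so one option is to copy those files to local disk (for example with hdfs dfs -get) and run one COPY per file, each over its own connection. This is a sketch under stated assumptions: the part files are uncompressed CSV matching the COPY statement, and connect is a hypothetical caller-supplied function that returns a psycopg2 connection. Whether concurrent COPYs into one table are actually faster depends on the Postgres server (disk, indexes, constraints), so the worker count is something to tune.

```python
import glob
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List


def find_part_files(directory: str, pattern: str = "part-*") -> List[str]:
    """Return the Spark part files in a directory, sorted by name.

    The default pattern skips marker files such as _SUCCESS.
    """
    return sorted(glob.glob(os.path.join(directory, pattern)))


def load_part_file(connect: Callable[[], Any], sql: str, path: str) -> str:
    """Load one part file with COPY over its own connection, then commit.

    connect is assumed to return a psycopg2 connection; sql is a
    COPY ... FROM STDIN statement matching the file's format.
    """
    conn = connect()
    try:
        with conn.cursor() as cur, open(path) as f:
            cur.copy_expert(sql, f)
        conn.commit()
    finally:
        conn.close()
    return path


def run_in_parallel(load: Callable[[str], str], paths: List[str], workers: int = 4) -> List[str]:
    """Call load(path) for every path, with at most `workers` running at once.

    Results come back in input order; if any call raised, the first such
    exception (in input order) is re-raised here.
    """
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(load, paths))
```

For example, with sql = "COPY prediction (id, mth_dt, proba, pred_ts) FROM STDIN WITH (FORMAT csv)", the call would be run_in_parallel(lambda p: load_part_file(connect, sql, p), find_part_files('/local/prediction')), where the directory is hypothetical. Each file commits in its own transaction, so a failure partway through can leave the table partially loaded.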
-
Marlan
Hi @shakti29,
Yes, we wrote the output to disk and then loaded it into Netezza from there. HDFS wasn't in the mix for us, but even if it were, I think we'd have to do the same thing, since I believe Netezza's bulk load (external table) functionality requires it.
Below is an excerpt of a Python script that shows how we did this.
Good luck!
Marlan
```python
import os

import dataiku
from dataiku import SQLExecutor2

# Input file (tab-delimited, see DELIM 9 below) in a managed folder on local disk
input_folder = dataiku.Folder('Prediction_Files')
input_folder_path = input_folder.get_path()
input_path_and_name = os.path.join(input_folder_path, 'prediction.csv')

# Managed folder where Netezza writes its load log files
log_folder = dataiku.Folder('Log_Files')
log_folder_path = log_folder.get_path()

# Look up the physical table behind the output dataset
output_dataset = dataiku.Dataset('PREDICTION')
sql_table_name = output_dataset.get_location_info()['info']['table']

# Recreate the table from the file via a Netezza external table (bulk load).
# DELIM 9 = tab (ASCII 9); REMOTESOURCE 'JDBC' has Netezza read the file
# through the JDBC client connection, i.e. from the DSS machine's disk.
pre_queries = """
DROP TABLE {sql_table_name} IF EXISTS;
COMMIT;
CREATE TABLE {sql_table_name} AS
SELECT ID, MTH_DT, PROBA, PRED_TS
FROM EXTERNAL '{input_path_and_name}'
    (ID INTEGER, MTH_DT DATE, PROBA REAL, PRED_TS TIMESTAMP)
USING (
    DELIM 9
    QUOTEDVALUE DOUBLE
    TRUNCSTRING TRUE
    LOGDIR '{log_folder_path}'
    REMOTESOURCE 'JDBC'
);
COMMIT
"""
pre_queries = pre_queries.format(sql_table_name=sql_table_name,
                                 input_path_and_name=input_path_and_name,
                                 log_folder_path=log_folder_path)
#print(pre_queries)

# Pre queries must be a list (convert to one statement per list item, dropping blanks)
pre_queries = [q.strip() for q in pre_queries.split(';') if q.strip()]

# Use final query to check # of records loaded
query = "SELECT COUNT(*) AS REC_CNT FROM {sql_table_name}".format(sql_table_name=sql_table_name)
#print(query)
executor = SQLExecutor2(dataset='PREDICTION')
result_df = executor.query_to_df(query, pre_queries=pre_queries)
print(result_df.at[0, 'REC_CNT'])
```