Write huge data to Postgres efficiently

shakti29

Hi,

I'm a data engineer working for Walmart Labs.

My team uses Dataiku to build flows for creating a data lake and for analysis. In one scenario we're writing a large amount of data from an HDFS dataset in DSS to a Postgres table. The process takes a long time, and I have checked that this is because the rows are being inserted sequentially. I tried using Spark to execute the recipe, but only one task is created, and that single task does all the writing to the database. Is there any way to parallelize this operation to improve performance and reduce execution time?

Thanks for your time
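A generic way to parallelize a database write, independent of anything DSS does, is to split the rows into chunks and have several workers each write one chunk over its own connection. The sketch that follows shows only that pattern under stated assumptions: `write_chunk` is a hypothetical function you would supply (for example, one that opens its own Postgres connection and inserts the chunk), and `chunked` and `parallel_write` are illustrative helper names, not a Dataiku or DSS feature.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from itertools import islice
from typing import Callable, Iterable, Iterator, List


def chunked(rows: Iterable, size: int) -> Iterator[List]:
    """Yield successive lists of at most `size` rows from any iterable."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def parallel_write(rows: Iterable,
                   write_chunk: Callable[[List], int],
                   chunk_size: int = 50_000,
                   workers: int = 4) -> int:
    """Write rows in chunks using a thread pool; return the total rows written.

    `write_chunk` is a hypothetical, caller-supplied function that writes one
    chunk (ideally over its own database connection) and returns its row count.
    Only about 2 * workers chunks are held in memory at any time.
    """
    total = 0
    pending = set()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for chunk in chunked(rows, chunk_size):
            if len(pending) >= 2 * workers:
                # Wait for at least one chunk to finish before reading more rows.
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                total += sum(f.result() for f in done)
            pending.add(pool.submit(write_chunk, chunk))
        # Collect the remaining chunks; result() re-raises any worker error.
        total += sum(f.result() for f in pending)
    return total
```

Each worker should use its own connection: with psycopg2, for instance, statements sent over one shared connection are executed one at a time, which would defeat the purpose. Whether parallel inserts actually help also depends on the database server, indexes, and locking on the target table.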

Marlan

Hi @shakti29,

Our experience with Netezza may be helpful. Moving data into Netezza was slow because Netezza doesn't handle the row-level insert statements used by DSS well. We ended up writing the data we wanted to move to a text file and then using Netezza's bulk load functionality to load it. This approach is much faster. Let me know if you would like more detail on this.
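For Postgres, the closest equivalent of Netezza's bulk load is the `COPY` command. A minimal sketch, assuming the psycopg2 driver (whose cursors provide `copy_expert`) and a tab-delimited, double-quoted text file with no header row; the helper names, table name, and column names are hypothetical, and this is not how DSS itself writes to Postgres.

```python
from typing import Sequence


def build_copy_sql(table: str, columns: Sequence[str]) -> str:
    """Return a COPY ... FROM STDIN statement for a tab-delimited CSV file.

    Table and column names are interpolated as-is, so they must be trusted.
    """
    cols = ", ".join(columns)
    # FORMAT csv honours double-quoted values; E'\t' sets a tab delimiter.
    return f"COPY {table} ({cols}) FROM STDIN WITH (FORMAT csv, DELIMITER E'\\t')"


def copy_file_to_table(conn, path: str, table: str, columns: Sequence[str]) -> None:
    """Bulk-load a local file into `table` using one COPY statement.

    `conn` is assumed to be a psycopg2 connection; its cursors provide
    copy_expert(sql, file), which streams the file to the server.
    """
    sql = build_copy_sql(table, columns)
    with open(path, "r", encoding="utf-8", newline="") as f:
        with conn.cursor() as cur:
            cur.copy_expert(sql, f)
    conn.commit()
```

Because `COPY ... FROM STDIN` streams the file through the client connection, the file only has to be readable by the Python process, not by the database server.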

Marlan

shakti29

Thanks, Marlan.

I am going to try a similar approach. Just one question: did you write the output to disk first and then load it into Netezza? The data I want to write is around 50 GB and is hosted on HDFS. Did you have to write the data to disk and then use the bulk load functionality, or did you upload it directly from HDFS to Postgres?

Marlan

Hi @shakti29,

Yes, we wrote the output to disk and then loaded it into Netezza from there. HDFS wasn't in the mix for us, but even if it were, I think we'd have to do the same thing, since I believe Netezza's bulk load (external table) functionality requires it.
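If the data has to be staged on local disk first, it can be written in one streaming pass so that the full 50 GB never needs to fit in memory. A minimal sketch, assuming rows arrive as an iterable of tuples (in DSS that might be something like a dataset's `iter_tuples()` iterator, which is an assumption here and not shown), and producing the tab-delimited, double-quoted layout that matches `DELIM 9` and `QUOTEDVALUE DOUBLE` in Marlan's script. `export_rows_to_tsv` is a hypothetical helper name.

```python
import csv
from typing import Iterable, Sequence


def export_rows_to_tsv(rows: Iterable[Sequence], path: str) -> int:
    """Stream rows to a tab-delimited, double-quoted text file; return row count.

    Rows are written one at a time, so memory use does not grow with data size.
    None is written as an empty field; fields containing tabs, quotes, or
    newlines are wrapped in double quotes, with embedded quotes doubled.
    """
    count = 0
    with open(path, "w", encoding="utf-8", newline="") as f:
        writer = csv.writer(f, delimiter="\t", quotechar='"',
                            quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
        for row in rows:
            writer.writerow(row)
            count += 1
    return count
```

The returned count is also handy for checking the load afterwards against the table's row count.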

Below is an excerpt of a Python script that shows how we did this.

Good luck!

Marlan

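import os

import dataiku
from dataiku import SQLExecutor2

# Managed folder holding the tab-delimited file to load
input_folder = dataiku.Folder('Prediction_Files')
input_folder_path = input_folder.get_path()
input_path_and_name = os.path.join(input_folder_path, 'prediction.csv')

# Managed folder where Netezza writes its load logs (LOGDIR)
log_folder = dataiku.Folder('Log_Files')
log_folder_path = log_folder.get_path()

# Physical table name behind the DSS output dataset
output_dataset = dataiku.Dataset('PREDICTION')
sql_table_name = output_dataset.get_location_info()['info']['table']

# Drop and recreate the table from the file through a Netezza external table.
# DELIM 9 is ASCII 9 (tab); REMOTESOURCE 'JDBC' tells Netezza the file is on
# the client machine running this script rather than on the Netezza host.
pre_queries = """
DROP TABLE {sql_table_name} IF EXISTS;
COMMIT;

CREATE TABLE {sql_table_name} AS
SELECT
    ID,
    MTH_DT,
    PROBA,
    PRED_TS
FROM EXTERNAL '{input_path_and_name}'
    (ID INTEGER,
    MTH_DT DATE,
    PROBA REAL,
    PRED_TS TIMESTAMP)
USING (
    DELIM 9
    QUOTEDVALUE DOUBLE
    TRUNCSTRING TRUE
    LOGDIR '{log_folder_path}'
    REMOTESOURCE 'JDBC');
COMMIT
"""

pre_queries = pre_queries.format(sql_table_name=sql_table_name, input_path_and_name=input_path_and_name, log_folder_path=log_folder_path)
#print(pre_queries)

# Pre queries must be a list (one statement per list item); strip whitespace
# and drop any empty fragments left over from splitting on ';'
pre_queries = [q.strip() for q in pre_queries.split(';') if q.strip()]

# Use final query to check # of records loaded
query = "SELECT COUNT(*) AS REC_CNT FROM {sql_table_name}".format(sql_table_name=sql_table_name)
#print(query)

# Run the pre-queries (the load itself), then the count query, on the dataset's connection
executor = SQLExecutor2(dataset='PREDICTION')
result_df = executor.query_to_df(query, pre_queries=pre_queries)
print(result_df.at[0, 'REC_CNT'])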
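The final `SELECT COUNT(*)` reports how many rows landed in the table, but not whether that matches the source. A small follow-up check, sketched here as an addition rather than part of Marlan's script, compares that number with the number of records in the staged file. It assumes a tab-delimited, double-quoted file with no header row, matching the `EXTERNAL` options in the script; `count_file_records` and `check_load` are hypothetical helper names.

```python
import csv


def count_file_records(path: str) -> int:
    """Count records in a tab-delimited, double-quoted file (no header row).

    csv.reader is used instead of counting lines so that a quoted value
    containing a newline is counted as a single record; blank lines are skipped.
    """
    with open(path, "r", encoding="utf-8", newline="") as f:
        return sum(1 for row in csv.reader(f, delimiter="\t", quotechar='"') if row)


def check_load(path: str, loaded_count: int) -> None:
    """Raise ValueError if the table row count differs from the file's record count."""
    expected = count_file_records(path)
    if loaded_count != expected:
        raise ValueError(f"loaded {loaded_count} rows but file has {expected} records")
```

For example, after the script runs: `check_load(input_path_and_name, int(result_df.at[0, 'REC_CNT']))`.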