Write huge data to Postgres efficiently

shakti29
shakti29 Registered Posts: 4 ✭✭✭✭

Hi,

I'm a data engineer and working for walmart labs.

my team is using dataiku to create flows and for creating a data lake and analysis. In one scenario we're writing huge data from a HDFS DSS down to postgres table. The process is taking a lot of time. I have checked this is because the insertion is happening sequentially. I tried to use spark and then execute the recipe but still only one task is getting created which is writing down to database. Kindly let me know if is there any way we can parallelize this operation to improve the performance and save execution time.

Thanks for your time

Answers

  • Marlan
    Marlan Neuron 2020, Neuron, Registered, Dataiku Frontrunner Awards 2021 Finalist, Neuron 2021, Neuron 2022, Dataiku Frontrunner Awards 2021 Participant, Neuron 2023 Posts: 318 Neuron

    Hi @shakti29
    ,

    Our experience with Netezza may be helpful. Moving data into Netezza was slow because Netezza doesn't handle the row level insert statements used by DSS well. We ended up writing out data we wanted to move to a text file and then using Netezza's bulk load functionality to load it in. This approach is much faster. Let me know if you would like more detail on this.

    Marlan

  • shakti29
    shakti29 Registered Posts: 4 ✭✭✭✭

    Thanks Marlan.

    I am going to try similar approach as well. Just one question did you write the output to disk and then to Netezza. As the data that i want to write is around 50Gb and hosted on HDFS . So did you have to write the data down to disk and then use the bulk load functionality or you directly uploaded from HDFS to postgres.

  • Marlan
    Marlan Neuron 2020, Neuron, Registered, Dataiku Frontrunner Awards 2021 Finalist, Neuron 2021, Neuron 2022, Dataiku Frontrunner Awards 2021 Participant, Neuron 2023 Posts: 318 Neuron
    edited July 17

    Hi @shakti29
    ,

    Yes, we wrote the output to disk and then loaded it into Netezza from there. HDFS wasn't in the mix for us but even if it was, I think we'd have to do the same thing as I believe Netezza's bulk load (external table) functionality would require it.

    Below is an excerpt of a Python script that shows how we did this.

    Good luck!

    Marlan

    input_folder = dataiku.Folder('Prediction_Files')
    input_folder_path = input_folder.get_path()
    input_path_and_name = os.path.join(input_folder_path, 'prediction.csv')
    
    log_folder = dataiku.Folder('Log_Files')
    log_folder_path = log_folder.get_path()
    
    output_dataset = dataiku.Dataset('PREDICTION')
    sql_table_name = output_dataset.get_location_info()['info']['table']
    
    pre_queries = """
    DROP TABLE {sql_table_name} IF EXISTS;
    COMMIT;
    
    CREATE TABLE {sql_table_name} AS
    SELECT 
        ID,
        MTH_DT,
        PROBA,
        PRED_TS
    FROM EXTERNAL '{input_path_and_name}'
        (ID INTEGER,
        MTH_DT DATE,
        PROBA REAL,
        PRED_TS TIMESTAMP)
    USING (
        DELIM 9 
        QUOTEDVALUE DOUBLE
        TRUNCSTRING TRUE
        LOGDIR '{log_folder_path}'
        REMOTESOURCE 'JDBC');
    COMMIT
    """
    
    pre_queries = pre_queries.format(sql_table_name=sql_table_name, input_path_and_name=input_path_and_name, log_folder_path=log_folder_path)
    #print(pre_queries)
    
    # Pre queries must be a list (convert to one statement per list item) 
    pre_queries = pre_queries.split(';')
    
    # Use final query to check # of records loaded
    query = "SELECT COUNT(*) AS REC_CNT FROM {sql_table_name}".format(sql_table_name=sql_table_name)
    #print(query)
    
    executor = SQLExecutor2(dataset='PREDICTION')
    result_df = executor.query_to_df(query, pre_queries=pre_queries)
    print(result_df.at[0, 'REC_CNT'])

Setup Info
    Tags
      Help me…