Copy data from a MySQL database to Vertica

Romain_L
Romain_L Dataiku DSS Core Designer, Dataiku DSS & SQL, Dataiku DSS ML Practitioner, Dataiku DSS Core Concepts, Dataiku DSS Adv Designer, Registered, Dataiku DSS Developer Posts: 13 ✭✭✭✭

Hi,

Let me explain my problem a little.

The data I use comes from a MySQL database where I have read-only access.

For my work, I use a Vertica database. The first operation is to copy the data from MySQL to Vertica, and I simply use the DSS Sync recipe.

The problem is that the database holds several hundred million rows, so this synchronization is extremely slow...

Do you have any ideas for improving this data copy, knowing that it is complicated to use an in-database engine since the two databases are on different connections?

Thank you in advance for your answer

Have a nice day

Best Answer

  • Marlan
    Marlan Neuron 2020, Neuron, Registered, Dataiku Frontrunner Awards 2021 Finalist, Neuron 2021, Neuron 2022, Dataiku Frontrunner Awards 2021 Participant, Neuron 2023 Posts: 319 Neuron
    Answer ✓

    Hi @Romain_L,

    Connecting directly of course would be ideal. That is the way to go if you can make it work.

    Note for clarity that our solution didn't require permissions on the source database to write to a text file. We instead used a SQL Query recipe or a Sync recipe to write the output to a text file. This happens in Dataiku, so no write permissions are needed on the source database.
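
    For illustration only, here is a minimal Python-recipe sketch of that export step, in case you prefer code over a visual recipe (dataset and folder names are hypothetical): it streams the MySQL-backed dataset into a CSV file in a managed folder, using only read access on the source.

    import dataiku
    
    src = dataiku.Dataset("MYSQL_SOURCE")      # hypothetical read-only MySQL dataset
    folder = dataiku.Folder("Export_Files")    # hypothetical managed folder
    
    with folder.get_writer("export.csv") as writer:
        for i, chunk in enumerate(src.iter_dataframes(chunksize=200000)):
            # Write the header only for the first chunk, then append data chunks
            writer.write(chunk.to_csv(index=False, header=(i == 0)).encode("utf-8"))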

    Good luck!

    Marlan

Answers

  • Manuel
    Manuel Alpha Tester, Dataiker Alumni, Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Core Concepts, Dataiku DSS Adv Designer, Registered Posts: 193 ✭✭✭✭✭✭✭

    Hi,

    Perhaps you should explore the use of partitioning, which lets you process specific chunks of data:

    - Rather than syncing the entire table every time, just sync the latest data

    - You won't avoid processing the large initial volume once, but after that you only process the incremental volume.

    See more at https://doc.dataiku.com/dss/latest/partitions/sql_datasets.html
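
    As a rough, untested sketch of the incremental approach (dataset name, dimension, and date are hypothetical): once the synced dataset is partitioned, e.g. by a DAY time dimension, a Python step can rebuild only the latest partition instead of re-syncing the whole table.

    import dataiku
    
    client = dataiku.api_client()
    project = client.get_default_project()
    
    # Build a single day's partition of the partitioned output dataset
    # ("mysql_copy_in_vertica" is a hypothetical dataset name)
    job = project.new_job("RECURSIVE_BUILD")
    job.with_output("mysql_copy_in_vertica", partition="2024-01-15")
    job.start_and_wait()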

    I hope this helps.

    Best regards

  • Romain_L
    Romain_L Dataiku DSS Core Designer, Dataiku DSS & SQL, Dataiku DSS ML Practitioner, Dataiku DSS Core Concepts, Dataiku DSS Adv Designer, Registered, Dataiku DSS Developer Posts: 13 ✭✭✭✭

    Hi,

    Thanks for your answer.

    It's one possibility, but I have some old data that is updated regularly and whose values change...

  • Marlan
    Marlan Neuron 2020, Neuron, Registered, Dataiku Frontrunner Awards 2021 Finalist, Neuron 2021, Neuron 2022, Dataiku Frontrunner Awards 2021 Participant, Neuron 2023 Posts: 319 Neuron
    edited July 17

    Hi @Romain_L,

    We have had success using bulk load functionality of the destination database when moving data between databases.

    This is not a specific solution but a direction to explore as the details depend on the databases involved.

    What we did was first use a recipe to write the source data to a text file (e.g., comma delimited or something similar) in a managed folder. We used a SQL Query recipe to do this but I'd think you can use a visual recipe as well.

    Then use the bulk load functionality of your destination database to read in that text file. I did a quick Google search and it looks like Vertica has the functionality needed, but I can't tell for sure. This of course is where all the tricky details are. I included an example Python recipe below that builds a Netezza query that reads a text file and loads it into a database table.

    We haven't done extensive testing but found something like a 2X speedup with large records and 10X speedup with small records.

    Good luck!

    Marlan

    from __future__ import absolute_import, division, print_function
    import os
    import numpy as np
    import pandas as pd
    import dataiku
    from dataiku.core.sql import SQLExecutor2
    
    
    ##############################################
    # Input folder/file details & prediction info
    ##############################################
    input_folder = dataiku.Folder('Prediction_Files')
    input_folder_path = input_folder.get_path()
    input_path_and_name = os.path.join(input_folder_path, 'prediction.csv')
    prediction_info = input_folder.read_json('prediction_info.json')
    
    
    ##############################################
    # Folder for Netezza load log files
    ##############################################
    log_folder = dataiku.Folder('Log_Files')
    log_folder_path = log_folder.get_path()
    
    
    ##############################################
    # Load input text file into a Netezza table 
    ##############################################
    
    
    #
    # Step 1 - Specify SQL statements that will execute data load
    # 
    
    # Get SQL table name of output dataset 
    output_dataset = dataiku.Dataset('PREDICTION')
    sql_table_name = output_dataset.get_location_info()['info']['table'] # projectKey already resolved
    
    # Load SQL statements including dropping and recreating table
    pre_queries = """
    DROP TABLE {sql_table_name} IF EXISTS;
    COMMIT;
    
    CREATE TABLE {sql_table_name} AS
    SELECT 
        COMM_MBR_ID,
        -- Need to specify date types for columns with no time for external table load to work
        -- yet want the columns to be timestamp as this works better with Dataiku
        CAST(INCUR_MTH_DT AS TIMESTAMP) AS INCUR_MTH_DT,
        CAST(AS_OF_DT AS TIMESTAMP) AS AS_OF_DT,
        LOB_KEY,
        GRP_HIER_KEY,
        ACT_ALLOW_AMT,
        PRED_ALLOW_AMT,
        CGZ_PROBA,
        HCS_PROBA,
        VHCS_PROBA,
        PRED_TS
    FROM EXTERNAL '{input_path_and_name}'
        (COMM_MBR_ID INTEGER,
        INCUR_MTH_DT DATE,
        AS_OF_DT DATE,
        LOB_KEY INTEGER,
        GRP_HIER_KEY BIGINT,
        ACT_ALLOW_AMT INTEGER,
        PRED_ALLOW_AMT INTEGER,
        CGZ_PROBA REAL,
        HCS_PROBA REAL, 
        VHCS_PROBA REAL,
        PRED_TS TIMESTAMP)
    USING (
        DELIM 9 -- tab character
        QUOTEDVALUE DOUBLE
        TRUNCSTRING TRUE
        LOGDIR '{log_folder_path}'
        REMOTESOURCE 'JDBC');
    COMMIT -- required in Netezza so above statements are executed and not rolled back
    """
    
    # Replace variables
    pre_queries = pre_queries.format(sql_table_name=sql_table_name, input_path_and_name=input_path_and_name, log_folder_path=log_folder_path)
    print(pre_queries)
    
    # Pre queries must be a list (convert to one statement per list item) 
    pre_queries = pre_queries.split(';')
    
    
    #
    # Step 2 - Specify final query (use this to check that all records got loaded)
    # 
    
    query = "SELECT COUNT(*) AS REC_CNT FROM {sql_table_name}".format(sql_table_name=sql_table_name)
    print(query)
    
    
    #
    # Step 3 - Execute load and final queries
    # 
    
    executor = SQLExecutor2(dataset='PREDICTION')
    result_df = executor.query_to_df(query, pre_queries=pre_queries) # pre_queries must be a list
    assert (result_df.at[0, 'REC_CNT'] == prediction_info['record_count']), "Number of records loaded into output table different from number of input records."
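
    For Vertica specifically, a rough equivalent might be its COPY ... FROM LOCAL statement. The sketch below is untested and makes several assumptions: the target table already exists, the exported file sits in a managed folder on the DSS server, and the Vertica connection accepts COPY FROM LOCAL; folder, file, and dataset names are hypothetical.

    import os
    import dataiku
    from dataiku.core.sql import SQLExecutor2
    
    # CSV file previously written to a managed folder by an upstream recipe
    input_folder = dataiku.Folder('Export_Files')
    csv_path = os.path.join(input_folder.get_path(), 'export.csv')
    
    # SQL table behind the Vertica output dataset
    output_dataset = dataiku.Dataset('VERTICA_COPY')
    sql_table_name = output_dataset.get_location_info()['info']['table']
    
    # COPY ... FROM LOCAL streams the file through the client connection,
    # so the file only needs to be readable from the DSS server.
    # As in the Netezza example, end with COMMIT so the load isn't rolled back.
    pre_queries = """
    TRUNCATE TABLE {table};
    COPY {table}
    FROM LOCAL '{path}'
    DELIMITER ','
    ENCLOSED BY '"'
    SKIP 1
    DIRECT;
    COMMIT
    """.format(table=sql_table_name, path=csv_path).split(';')
    
    executor = SQLExecutor2(dataset='VERTICA_COPY')
    query = "SELECT COUNT(*) AS REC_CNT FROM {table}".format(table=sql_table_name)
    result_df = executor.query_to_df(query, pre_queries=pre_queries)
    print(result_df.at[0, 'REC_CNT'])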

  • Romain_L
    Romain_L Dataiku DSS Core Designer, Dataiku DSS & SQL, Dataiku DSS ML Practitioner, Dataiku DSS Core Concepts, Dataiku DSS Adv Designer, Registered, Dataiku DSS Developer Posts: 13 ✭✭✭✭

    Thank you very much for your answer!

    I will try it and get back to you soon.

  • Romain_L
    Romain_L Dataiku DSS Core Designer, Dataiku DSS & SQL, Dataiku DSS ML Practitioner, Dataiku DSS Core Concepts, Dataiku DSS Adv Designer, Registered, Dataiku DSS Developer Posts: 13 ✭✭✭✭

    Hello,
    I tried your solution and I think it works with Vertica too.
    The problem is that I don't have the rights on my source database to write to a text file.
    I searched on my side and saw that Vertica offers the possibility of connecting directly to a MySQL database:
    https://github.com/vertica/Vertica-Extension-Packages/tree/master/odbc_loader_package
    I will explore this possibility.

    Thanks again for your precious help
