Added on October 29, 2021 7:37AM
Hi,
Let me explain my problem.
The data I use comes from a MySQL database to which I have read-only access.
For my work, I use a Vertica database. The first step is to copy the data from MySQL to Vertica, and for that I simply use the DSS Sync recipe.
The problem is that the database holds several hundred million records, and this synchronization is extremely slow...
Do you have any ideas for speeding up this copy, given that it is difficult to use an in-database engine because the source and destination are on two different connections?
Thank you in advance for your answer.
Have a nice day
Hi @Romain_L,
Connecting directly would of course be ideal. That is the way to go if you can make it work.
To be clear, our solution didn't require permission on the source database to write to a text file. Instead, we used a SQL Query recipe or a Sync recipe to write the output to a text file. This happens in Dataiku, so no write permissions are needed on the source database.
Good luck!
Marlan
Hi,
Perhaps you should explore partitioning, which lets you process specific chunks of data:
- Rather than syncing the entire table every time, just sync the latest data.
- You still have to process the large initial volume once, but after that you only process the incremental volume.
See more at https://doc.dataiku.com/dss/latest/partitions/sql_datasets.html
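The incremental idea above can be sketched in plain Python as follows. This is only an illustration of the principle, not the Dataiku partitioning API: the table name `orders`, the column `created_at`, and both helper functions are hypothetical.

```python
from datetime import date, timedelta


def partitions_to_sync(last_synced, today):
    """List the daily partitions after last_synced, up to and including today."""
    days = (today - last_synced).days
    return [last_synced + timedelta(days=i) for i in range(1, days + 1)]


def partition_query(table, date_column, day):
    """Build a query that selects only the rows of one daily partition."""
    next_day = day + timedelta(days=1)
    return (
        f"SELECT * FROM {table} "
        f"WHERE {date_column} >= '{day.isoformat()}' "
        f"AND {date_column} < '{next_day.isoformat()}'"
    )


# Hypothetical example: the last full sync covered 2021-10-26.
pending = partitions_to_sync(date(2021, 10, 26), date(2021, 10, 29))
queries = [partition_query("orders", "created_at", day) for day in pending]
```

Each query touches one day of data, so a daily run copies only the newest partition instead of the whole table. Rows that change in older partitions are not picked up by this scheme unless those partitions are rebuilt.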
I hope this helps.
Best regards
Hi,
Thanks for your answer.
That is one possibility, but I have some old data that is updated regularly, so its values change...
Hi @Romain_L,
We have had success using bulk load functionality of the destination database when moving data between databases.
This is not a specific solution but a direction to explore as the details depend on the databases involved.
What we did was first use a recipe to write the source data to a text file (e.g., comma delimited or something similar) in a managed folder. We used a SQL Query recipe to do this but I'd think you can use a visual recipe as well.
Then use the bulk load functionality of your destination database to read in that text file. I did a quick Google search and it looks like Vertica has the functionality needed, but I can't tell for sure. This of course is where all the tricky details are. I included an example Python recipe below that builds a Netezza query that reads a text file and loads it into a database table.
We haven't done extensive testing but found something like a 2X speedup with large records and 10X speedup with small records.
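The first step described above, writing the source rows to a delimited text file, might look roughly like the sketch below outside of a recipe. It uses only the Python standard library, with an in-memory SQLite database standing in for the MySQL source. The function name, table, and file name are all hypothetical, and inside DSS the SQL Query or Sync recipe does this work for you.

```python
import csv
import os
import sqlite3
import tempfile


def export_query_to_file(connection, query, path, delimiter="\t", chunk_size=50000):
    """Stream query results to a delimited text file and return the row count."""
    cursor = connection.cursor()
    cursor.execute(query)
    rows_written = 0
    with open(path, "w", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle, delimiter=delimiter)
        while True:
            # fetchmany keeps memory bounded even for very large tables
            chunk = cursor.fetchmany(chunk_size)
            if not chunk:
                break
            writer.writerows(chunk)
            rows_written += len(chunk)
    return rows_written


# Stand-in source database (assumption: the real source would be MySQL).
source = sqlite3.connect(":memory:")
source.execute("CREATE TABLE events (id INTEGER, label TEXT)")
source.executemany("INSERT INTO events VALUES (?, ?)", [(1, "a"), (2, "b"), (3, "c")])

out_path = os.path.join(tempfile.mkdtemp(), "events.tsv")
rows_written = export_query_to_file(
    source, "SELECT id, label FROM events ORDER BY id", out_path, chunk_size=2
)

# Read the file back to confirm what was written.
with open(out_path, newline="", encoding="utf-8") as handle:
    exported_rows = list(csv.reader(handle, delimiter="\t"))
```

Returning the row count makes it easy to compare against the number of rows that end up in the destination table after the bulk load.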
Good luck!
Marlan
from __future__ import absolute_import, division, print_function
import os  # needed for os.path.join below
import numpy as np
import pandas as pd
import dataiku
from dataiku.core.sql import SQLExecutor2

##############################################
# Input folder/file details & prediction info
##############################################
input_folder = dataiku.Folder('Prediction_Files')
input_folder_path = input_folder.get_path()
input_path_and_name = os.path.join(input_folder_path, 'prediction.csv')
prediction_info = input_folder.read_json('prediction_info.json')

##############################################
# Folder for Netezza load log files
##############################################
log_folder = dataiku.Folder('Log_Files')
log_folder_path = log_folder.get_path()

##############################################
# Load input text file into a Netezza table
##############################################

#
# Step 1 - Specify SQL statements that will execute data load
#

# Get SQL table name of output dataset
output_dataset = dataiku.Dataset('PREDICTION')
sql_table_name = output_dataset.get_location_info()['info']['table']  # projectKey already resolved

# Load SQL statements including dropping and recreating table
pre_queries = """
DROP TABLE {sql_table_name} IF EXISTS;
COMMIT;

CREATE TABLE {sql_table_name} AS
SELECT
    COMM_MBR_ID,
    -- Need to specify date types for columns with no time for external table load to work
    -- yet want the columns to be timestamp as this works better with Dataiku
    CAST(INCUR_MTH_DT AS TIMESTAMP) AS INCUR_MTH_DT,
    CAST(AS_OF_DT AS TIMESTAMP) AS AS_OF_DT,
    LOB_KEY,
    GRP_HIER_KEY,
    ACT_ALLOW_AMT,
    PRED_ALLOW_AMT,
    CGZ_PROBA,
    HCS_PROBA,
    VHCS_PROBA,
    PRED_TS
FROM EXTERNAL '{input_path_and_name}'
    (COMM_MBR_ID INTEGER,
     INCUR_MTH_DT DATE,
     AS_OF_DT DATE,
     LOB_KEY INTEGER,
     GRP_HIER_KEY BIGINT,
     ACT_ALLOW_AMT INTEGER,
     PRED_ALLOW_AMT INTEGER,
     CGZ_PROBA REAL,
     HCS_PROBA REAL,
     VHCS_PROBA REAL,
     PRED_TS TIMESTAMP)
USING (
    DELIM 9 -- tab character
    QUOTEDVALUE DOUBLE
    TRUNCSTRING TRUE
    LOGDIR '{log_folder_path}'
    REMOTESOURCE 'JDBC');
COMMIT -- required in Netezza so above statements are executed and not rolled back
"""

# Replace variables
pre_queries = pre_queries.format(sql_table_name=sql_table_name,
                                 input_path_and_name=input_path_and_name,
                                 log_folder_path=log_folder_path)
print(pre_queries)

# Pre queries must be a list (convert to one statement per list item)
pre_queries = pre_queries.split(';')

#
# Step 2 - Specify final query (use this to check that all records got loaded)
#
query = "SELECT COUNT(*) AS REC_CNT FROM {sql_table_name}".format(sql_table_name=sql_table_name)
print(query)

#
# Step 3 - Execute load and final queries
#
executor = SQLExecutor2(dataset='PREDICTION')
result_df = executor.query_to_df(query, pre_queries=pre_queries)  # pre queries must be a list
assert (result_df.at[0, 'REC_CNT'] == prediction_info['record_count']), "Number of records loaded into output table different from number of input records."
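For Vertica, the rough counterpart of the Netezza external table load above is the `COPY` statement. The sketch below only builds such a statement as a string; it does not run it. The table name and file paths are hypothetical, and the exact options (delimiter, quoting, rejected-row handling) are assumptions that should be checked against the Vertica `COPY` documentation for your version.

```python
def sql_literal(value):
    """Quote a string as a SQL literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_vertica_copy(table, file_path, rejected_path, delimiter="\t"):
    """Return a Vertica COPY ... FROM LOCAL statement as a string (not executed)."""
    # A tab needs Vertica's extended string syntax; other delimiters are plain literals.
    delimiter_literal = "E'\\t'" if delimiter == "\t" else sql_literal(delimiter)
    return (
        "COPY {table} FROM LOCAL {path} "
        "DELIMITER {delim} ENCLOSED BY '\"' "
        "REJECTED DATA {rejected}"
    ).format(
        table=table,
        path=sql_literal(file_path),
        delim=delimiter_literal,
        rejected=sql_literal(rejected_path),
    )


# Hypothetical table and paths, for illustration only.
copy_statement = build_vertica_copy(
    "analytics.events",
    "/data/export/events.tsv",
    "/data/export/events.rejected",
)
print(copy_statement)
```

As in the Netezza recipe, the resulting statement could be passed as a pre-query, followed by a `COUNT(*)` query to confirm that every exported row was loaded.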
Thank you very much for your answer!
I will try it and get back to you soon.
Hello,
I tried your solution and I think it works with Vertica too.
The problem is that I don't have the rights on my source database to write to a text file.
I did some searching on my side and saw that Vertica can connect directly to a MySQL database:
https://github.com/vertica/Vertica-Extension-Packages/tree/master/odbc_loader_package
I will explore this possibility
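Going by the README of the linked package, a load through the ODBC loader is written as a `COPY` statement with an `ODBCSource()` source and an `ODBCLoader` parser. The sketch below only builds such a statement as a string. The DSN `mysql_source`, the table names, and the helper functions are hypothetical, and the syntax should be verified against the package README for the version you install.

```python
def sql_literal(value):
    """Quote a string as a SQL literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_odbc_loader_copy(target_table, dsn, remote_query):
    """Return a COPY statement that pulls rows from a remote database over ODBC."""
    return (
        "COPY {table} WITH SOURCE ODBCSource() "
        "PARSER ODBCLoader(connect={connect}, query={query})"
    ).format(
        table=target_table,
        connect=sql_literal("DSN=" + dsn),
        query=sql_literal(remote_query),
    )


# Hypothetical DSN and tables, for illustration only.
odbc_statement = build_odbc_loader_copy(
    "analytics.events",
    "mysql_source",
    "SELECT * FROM events WHERE status = 'open'",
)
print(odbc_statement)
```

With this approach Vertica reads from MySQL itself, so no intermediate text file and no write access on the source are needed; it does require the extension and an ODBC DSN to be set up on the Vertica side.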
Thanks again for your valuable help.