Copying data from an HDFS dataset to Teradata over JDBC using a PySpark recipe

bilal707786 Dataiku DSS Core Designer, Dataiku DSS & SQL, Registered Posts: 1 ✭✭✭
edited July 16 in General Discussion


# -*- coding: utf-8 -*-
import dataiku
import traceback
from dataiku import spark as dkuspark
from pyspark import SparkContext
from pyspark.sql import SQLContext, HiveContext, SparkSession
from pyspark.conf import SparkConf
import concurrent.futures
from dataiku import pandasutils as pdu
from pyspark.sql.functions import *
from pyspark.sql import Window
from pyspark.sql.types import *
#from pyspark.broadcast import _broadcastRegistry
from pyspark.storagelevel import StorageLevel
from pyspark.sql.functions import broadcast,substring,length,col,expr
import logging
import logging.handlers
import smtplib
import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from collections import OrderedDict
import pprint as pp
import pytz
import sys
from pyspark.sql.functions import col, trim
import pandas as pd
from pyspark.sql.functions import unix_timestamp, from_unixtime
#import psycopg2
from pyspark.sql import DataFrame
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from functools import reduce
from pyspark.sql import functions as F
import math
#from asyncactions.utils import patch_all
from pyspark.sql.dataframe import DataFrameWriter
#import asyncactions
import array
import numpy as np
import teradata
import teradatasql
import os

os.environ['PYSPARK_PYTHON'] = '/usr/bin/python36'


conf = (SparkConf().set("spark.sql.parquet.writeLegacyFormat", "true"))
#conf = (SparkConf().set("spark.sql.parquet.enableVectorizedReader", "true"))
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
sqlContext = SQLContext(spark.sparkContext)
sqlContext.sql('set hive.exec.dynamic.partition=true')
sqlContext.sql('set hive.exec.max.dynamic.partitions=10000')
sqlContext.sql('set hive.exec.max.dynamic.partitions.pernode=10000')
sqlContext.sql('set hive.exec.parallel=true')
sqlContext.sql('set hive.exec.parallel.thread.number=60')
sqlContext.sql('set hive.exec.dynamic.partition.mode=nonstrict')
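# NOTE: the Spark resource settings below (executor/driver cores, memory,
# instances, parallelism, memoryOverhead) are fixed when the application
# launches; issuing them as runtime SET statements against an already-running
# session has no effect, so they would normally go in the recipe's Spark config.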
sqlContext.sql("set spark.executor.cores= 5")
sqlContext.sql("set spark.driver.cores=5")
sqlContext.sql("set spark.num-executors=5")
sqlContext.sql("set spark.driver.memory=19G")
sqlContext.sql("set spark.executor.memory=19G")
sqlContext.sql("set spark.executor.instances=104")
sqlContext.sql("set spark.default.parallelism=1040")
sqlContext.sql("set spark.driver.memoryOverhead=4096")
sqlContext.sql("set spark.executor.memoryOverhead=4096")
#sqlContext.sql("set spark.sql.autoBroadcastJoinThreshold=256")
#sqlContext.sql("set spark.sql.inMemoryColumnarStorage.compressed=true")
#sqlContext.sql("set spark.sql.inMemoryColumnarStorage.batchSize=10000")
sqlContext.sql("set spark.sql.shuffle.partitions=300")
sqlContext.sql("set spark.yarn.executor.memoryOverhead=2048M")
sqlContext.sql("set hive.exec.reducers.bytes.per.reducer=268435456")


# Teradata connection details
user = dataiku.get_custom_variables()['DB_TERADATA_BA_USER']
password = dataiku.get_custom_variables()['DB_TERADATA_BA_USER_PWD']
teradata_server = dataiku.get_custom_variables()['Teradata_server']

# Connect to Teradata
tera_con = teradatasql.connect(host=teradata_server, user=user, password=password)
tera_cur = tera_con.cursor()
print("connection to teradata successful")

driver = dataiku.get_custom_variables()['DB_TERADATA_DRIVER']
auditdburl = "jdbc:teradata://"+teradata_server+"/Database=DL_DNA_BLUE_AXIS"

#LOGMECH=TD2"
# Read recipe inputs
PVS_OP_10052020_1 = dataiku.Dataset("310_PVS_OP_10052020_1")
PVS_OP_10052020_1_df = dkuspark.get_dataframe(sqlContext, PVS_OP_10052020_1)

# Compute recipe outputs from inputs
# TODO: Replace this part by your actual code that computes the output, as a SparkSQL dataframe
bac_NCCT_310_PVS_OP_Test_POC_test1_df = PVS_OP_10052020_1_df # For this sample code, simply copy input to output

# Write recipe outputs
#bac_NCCT_310_PVS_OP_Test_POC = dataiku.Dataset("BAC_NCCT_310_PVS_OP_Test_POC")
#dkuspark.write_with_schema(bac_NCCT_310_PVS_OP_Test_POC, bac_NCCT_310_PVS_OP_Test_POC_df)
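# Write the output dataframe straight to a Teradata table over JDBC; Spark
# passes options it does not recognise (such as TYPE) through to the
# Teradata JDBC driver as connection properties.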

bac_NCCT_310_PVS_OP_Test_POC_test1_df.write.format("jdbc")\
.option("driver",driver)\
.option("url",auditdburl)\
.option("dbtable",'BAC_NCCT_310_PVS_OP_Test_POC_test3')\
.option("user",user)\
.option("password",password)\
.option('TYPE','FASTEXPORT')\
.mode('append')\
.save()

I'm getting the error below:

Job failed: Pyspark code failed: At line 148: <class 'py4j.protocol.Py4JJavaError'>: An error occurred while calling o600.save.

Line 148 is the .mode('append')\ line of the write call above.
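The Py4JJavaError text only shows the wrapper ("An error occurred while calling o600.save"); the actual Teradata/JDBC cause is nested inside the Java exception. For reference, a minimal sketch (using the same dataframe and options as in my recipe above; py4j's java_exception attribute and the Java getCause()/toString() methods are standard) that wraps the write and walks the Java exception chain so the underlying error message gets printed:

from py4j.protocol import Py4JJavaError

try:
    bac_NCCT_310_PVS_OP_Test_POC_test1_df.write.format("jdbc")\
        .option("driver", driver)\
        .option("url", auditdburl)\
        .option("dbtable", 'BAC_NCCT_310_PVS_OP_Test_POC_test3')\
        .option("user", user)\
        .option("password", password)\
        .option('TYPE', 'FASTEXPORT')\
        .mode('append')\
        .save()
except Py4JJavaError as e:
    # e.java_exception is the Java-side Throwable; walk getCause() to reach
    # the real Teradata/JDBC error hidden behind the py4j wrapper.
    cause = e.java_exception
    while cause is not None:
        print(cause.toString())
        cause = cause.getCause()
    raise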

Also, looking at the logs, I found this:

[2020/10/20-16:50:34.810] [null-err-116] [INFO] [dku.utils]  - java.io.FileNotFoundException: /var/log/spark/user/u111437/stderr (No such file or directory)
[2020/10/20-16:50:34.810] [null-err-116] [INFO] [dku.utils]  -     at java.io.FileOutputStream.open0(Native Method)
[2020/10/20-16:50:34.810] [null-err-116] [INFO] [dku.utils]  -     at java.io.FileOutputStream.open(FileOutputStream.java:270)
[2020/10/20-16:50:34.810] [null-err-116] [INFO] [dku.utils]  -     at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
[2020/10/20-16:50:34.810] [null-err-116] [INFO] [dku.utils]  -     at java.io.FileOutputStream.<init>(FileOutputStream.java:133)
[2020/10/20-16:50:34.811] [null-err-116] [INFO] [dku.utils]  -     at org.apache.log4j.FileAppender.setFile(FileAppender.java:294)
[2020/10/20-16:50:34.811] [null-err-116] [INFO] [dku.utils]  -     at org.apache.log4j.FileAppender.activateOptions(FileAppender.java:165)
[2020/10/20-16:50:34.811] [null-err-116] [INFO] [dku.utils]  -     at org.apache.log4j.DailyRollingFileAppender.activateOptions(DailyRollingFileAppender.java:223)
[2020/10/20-16:50:34.811] [null-err-116] [INFO] [dku.utils]  -     at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
[2020/10/20-16:50:34.811] [null-err-116] [INFO] [dku.utils]  -     at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)
[2020/10/20-16:50:34.811] [null-err-116] [INFO] [dku.utils]  -     at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)
[2020/10/20-16:50:34.811] [null-err-116] [INFO] [dku.utils]  -     at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)
[2020/10/20-16:50:34.811] [null-err-116] [INFO] [dku.utils]  -     at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
[2020/10/20-16:50:34.811] [null-err-116] [INFO] [dku.utils]  -     at  