Pre-write dataset statement not working outside of a Python recipe
Hi,
I have a Streamlit Code Studio web-app running that generates dataframes. At the end of the web-app UI, a SUBMIT button is clicked and a function is run to do the following:
1. Duplicate an existing project template (works)
2. Set variables for the duplicated project (works)
3. Create a schema to write the dataframes' data to tables (doesn't work)
The code attempts to write the dataframes but returns the error below (the exact same error occurs whether I use the 'write_dataframe' or the 'write_with_schema' function):
Exception: Error : [Errno 32] Broken pipe
Traceback:
File "/home/dataiku/workspace/code_studio-versioned/streamlit/admin.py", line 146, in _create_yrr_instance
app_input_selected_tmp_lots.write_dataframe(combined_selected_df, infer_schema=True)File "/opt/dataiku/python/dataiku/core/dataset.py", line 1071, in write_dataframe
writer.write_dataframe(df)File "/opt/dataiku/python/dataiku/core/dataset_write.py", line 600, in __exit__
self.close()File "/opt/dataiku/python/dataiku/core/dataset_write.py", line 592, in close
self.remote_writer.flush()File "/opt/dataiku/python/dataiku/core/dataset_write.py", line 159, in flush
raise Exception(self.error_message)
When I check the duplicated project, the dataset is created but is empty. The schema is correct and the pre-write statement is correct; there is just no data. The code is below, thanks much.
# Imports used by this function (the rest of admin.py is omitted);
# alert_user is a helper defined elsewhere in the app.
import hashlib
from datetime import datetime

import dataiku
import pandas as pd
import streamlit as st


def _create_yrr_instance(user,
                         tmp_psf,
                         start_date,
                         end_date,
                         tmp_tx_included_lots,
                         tmp_tx_excluded_lots=None,
                         stage='DEV',
                         overwrite=False):
    start_date = start_date.isoformat().replace('-', '_')
    end_date = end_date.isoformat().replace('-', '_')
    now = datetime.now().isoformat(' ', 'seconds')
    client = dataiku.api_client()

    yrr_template_project = None
    try:
        yrr_template_project = client.get_project("YRR_TEMPLATE")
    except Exception:
        alert_user(
            msg="Could not find the YRR_TEMPLATE project, this is a bug, report it to the developers!",
            level='error'
        )
        return

    project_keys = client.list_project_keys()
    excluded_lots = None if tmp_tx_excluded_lots is None else tmp_tx_excluded_lots.LOT_ID.tolist()
    excluded_lot_id = '' if excluded_lots is None else ','.join(excluded_lots)
    excluded_lot_hash_id = excluded_lot_id if not excluded_lot_id else f"_{hashlib.md5(excluded_lot_id.encode('utf-8', 'ignore')).hexdigest()}"
    id = f"YRR_{stage}_{tmp_psf}_{start_date}_{end_date}{excluded_lot_hash_id}"

    if id in project_keys:
        if overwrite:
            # Delete an existing project with the same name
            project = client.get_project(id)
            project.delete()
        else:
            alert_user(
                msg=f"Cannot create YRR instance, a project with the same ID '{id}' exists, set the 'overwrite' argument to True",
                level='error'
            )
            return

    copy_result = None
    try:
        copy_result = yrr_template_project.duplicate(target_project_key=id,
                                                     target_project_name=id)
    except Exception as e:
        alert_user(
            msg="Could not duplicate the YRR_TEMPLATE project, this is a bug, report it to the developers!",
            level='error'
        )
        return

    copied_project = None
    try:
        copied_project = client.get_project(copy_result['targetProjectKey'])
    except Exception:
        alert_user(
            msg="Could not get the project handle for the duplicated YRR project, this is a bug, report it to the developers!",
            level='error'
        )
        return

    variables = None
    try:
        variables = copied_project.get_variables()
    except Exception:
        alert_user(
            msg="Could not get variables for the duplicated YRR project, this is a bug, report it to the developers!",
            level='error'
        )
        return

    schema_name = id.replace(f"YRR_{stage}_", '')
    user_role = f"YRR_USER_{stage}"
    variables["standard"]["snowflake_db"] = f"YRR_{stage}"
    variables["standard"]["snowflake_role"] = user_role
    variables["standard"]["snowflake_wh"] = "XSMALL_USER_WH"
    variables["standard"]["snowflake_schema"] = schema_name
    variables["standard"]["snowflake_schema_path"] = f"YRR_{stage}.{schema_name}"
    variables["standard"]["tmp_psf"] = tmp_psf
    variables["standard"]["created_on"] = now
    variables["standard"]["initiator"] = user()
    variables["standard"]["tmp_tx_start_date"] = start_date
    variables["standard"]["tmp_tx_end_date"] = end_date

    try:
        copied_project.set_variables(variables)
    except Exception as e:
        alert_user(
            msg="Could not set variables for the duplicated YRR project, this is a bug, report it to the developers!",
            level='error'
        )
        return

    # Create the datasets for the selected and excluded TMP lots
    params = {'connection': 'GOQ_SNOWFLAKE_VAR',
              'mode': 'table',
              'table': 'APP_INPUT_SELECTED_TMP_LOTS',
              'catalog': '${snowflake_db}',
              'schema': '${snowflake_schema}',
              'tableCreationMode': 'auto'}
    dataset_handle = None
    try:
        dataset_handle = copied_project.create_dataset('app_input_selected_tmp_lots', type='Snowflake', params=params)
    except Exception:
        st.write('create dataset failed')
        return

    ds_def = None
    try:
        ds_def = dataset_handle.get_definition()
    except Exception:
        st.write('get dataset def failed')
        return

    ds_def['managed'] = True
    try:
        dataset_handle.set_definition(ds_def)
    except Exception:
        st.write('set dataset def failed')
        return

    # Set up the pre-write SQL statements
    settings = None
    try:
        settings = dataset_handle.get_settings()
    except Exception as e:
        st.write(e)
        st.write('getting dataset settings failed')
        return

    pre_write_statements = "\n".join([
        f"create schema if not exists ${{snowflake_db}}.${{snowflake_schema}};"
    ])
    settings.get_raw()["params"]["customPreWriteStatements"] = pre_write_statements
    try:
        settings.save()
    except Exception as e:
        st.write(e)
        st.write('saving dataset settings failed')
        return

    app_input_selected_tmp_lots = None
    try:
        app_input_selected_tmp_lots = dataiku.Dataset("app_input_selected_tmp_lots", project_key=id, ignore_flow=True)
    except Exception as e:
        st.write(e)
        st.write('get dataset write handle failed')
        return

    # Combine the selected lots into a single dataframe
    dataframes = tmp_tx_included_lots.values()
    combined_selected_df = pd.concat(dataframes, axis=0, ignore_index=True)
    try:
        # app_input_selected_tmp_lots.write_dataframe(combined_selected_df, infer_schema=True)
        app_input_selected_tmp_lots.write_with_schema(combined_selected_df)
    except Exception as e:
        st.write(e)
        st.write('write to dataset failed')
        return

    return id
Operating system used: Windows 10
Answers
I have verified that the pre-write statement is not being executed to create the schema before the dataframe is written to the Snowflake table. I altered my code to use an internal library to create the schema manually, and the dataset was then created successfully.
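For anyone else hitting this: the workaround is to create the target Snowflake schema yourself before calling write_with_schema, since the dataset's pre-write statement does not appear to run when the dataset is written from outside a recipe. Below is a minimal sketch of that idea using dataiku.SQLExecutor2 rather than the internal library mentioned above; the connection name and variables are taken from the code in the question, and the helper name is only illustrative.

from dataiku import SQLExecutor2

def ensure_schema_exists(connection, db, schema):
    # Sketch only: run the CREATE SCHEMA DDL on the same Snowflake connection
    # the dataset writes to. The DDL goes in pre_queries because query_to_df
    # expects the main query to return rows; the SELECT 1 only satisfies that.
    executor = SQLExecutor2(connection=connection)
    executor.query_to_df(
        "SELECT 1",
        pre_queries=[f"CREATE SCHEMA IF NOT EXISTS {db}.{schema}"],
        post_queries=["COMMIT"],
    )

# Called right before the write in _create_yrr_instance:
# ensure_schema_exists("GOQ_SNOWFLAKE_VAR", f"YRR_{stage}", schema_name)
# app_input_selected_tmp_lots.write_with_schema(combined_selected_df)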