Pre-write dataset statement not working outside of a Python recipe
Hi,
I have a Streamlit Code Studio web-app running that generates dataframes. At the end of the web-app UI, a SUBMIT button is clicked and a function is run (a rough sketch of the call site follows the list below) to do the following:
1. Duplicate an existing project template (works)
2. Set variables for the duplicated project (works)
3. Create a schema to write the dataframes' data to tables (doesn't work)
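For context, the function is invoked from the Streamlit submit handler roughly as in this sketch (the widget variable names are illustrative, not the actual app code):

import streamlit as st

# Hypothetical call site; the real app collects these values from its own widgets
if st.button("SUBMIT"):
    new_project_key = _create_yrr_instance(
        user=get_current_user,               # callable returning the login (the function calls user())
        tmp_psf=selected_psf,
        start_date=selected_start_date,      # datetime.date values, e.g. from st.date_input
        end_date=selected_end_date,
        tmp_tx_included_lots=included_lots,  # dict of lot id -> DataFrame (the function calls .values())
        tmp_tx_excluded_lots=excluded_lots_df,
        stage='DEV',
        overwrite=False,
    )
    if new_project_key:
        st.write(f"Created YRR instance {new_project_key}")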
The code attempts to write the dataframes but raises the error below (the exact same error occurs whether I use write_dataframe or write_with_schema):
Exception: Error : [Errno 32] Broken pipe
Traceback:
File "/home/dataiku/workspace/code_studio-versioned/streamlit/admin.py", line 146, in _create_yrr_instance
    app_input_selected_tmp_lots.write_dataframe(combined_selected_df, infer_schema=True)
File "/opt/dataiku/python/dataiku/core/dataset.py", line 1071, in write_dataframe
    writer.write_dataframe(df)
File "/opt/dataiku/python/dataiku/core/dataset_write.py", line 600, in __exit__
    self.close()
File "/opt/dataiku/python/dataiku/core/dataset_write.py", line 592, in close
    self.remote_writer.flush()
File "/opt/dataiku/python/dataiku/core/dataset_write.py", line 159, in flush
    raise Exception(self.error_message)
When I check the duplicated project, the dataset is created but is empty. The schema is correct and the pre-write statement is set correctly, there is just no data. Code is below, thanks.
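For reference, this is roughly how the saved pre-write statement can be checked through the public API (a minimal sketch, assuming the project key returned by _create_yrr_instance below):

import dataiku

client = dataiku.api_client()
project = client.get_project(new_project_key)  # key returned by _create_yrr_instance
ds = project.get_dataset("app_input_selected_tmp_lots")

# Expected value: "create schema if not exists ${snowflake_db}.${snowflake_schema};"
print(ds.get_settings().get_raw()["params"].get("customPreWriteStatements"))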
# Imports used by this function (at module level in admin.py)
import hashlib
from datetime import datetime

import dataiku
import pandas as pd
import streamlit as st


def _create_yrr_instance(user, tmp_psf, start_date, end_date, tmp_tx_included_lots,
                         tmp_tx_excluded_lots=None, stage='DEV', overwrite=False):
    start_date = start_date.isoformat().replace('-', '_')
    end_date = end_date.isoformat().replace('-', '_')
    now = datetime.now().isoformat(' ', 'seconds')
    client = dataiku.api_client()

    yrr_template_project = None
    try:
        yrr_template_project = client.get_project("YRR_TEMPLATE")
    except:
        alert_user(
            msg="Could not find the YRR_TEMPLATE project, this is a bug, report it to the developers!",
            level='error'
        )
        return

    project_keys = client.list_project_keys()
    excluded_lots = None if tmp_tx_excluded_lots is None else tmp_tx_excluded_lots.LOT_ID.tolist()
    excluded_lot_id = '' if excluded_lots is None else ','.join(excluded_lots)
    excluded_lot_hash_id = excluded_lot_id if not excluded_lot_id else f"_{hashlib.md5(excluded_lot_id.encode('utf-8', 'ignore')).hexdigest()}"
    id = f"YRR_{stage}_{tmp_psf}_{start_date}_{end_date}{excluded_lot_hash_id}"

    if id in project_keys:
        if overwrite:
            # Delete an existing project with the same name
            project = client.get_project(id)
            project.delete()
        else:
            alert_user(
                msg=f"Cannot create YRR instance, a project with the same ID '{id}' exists, set the 'overwrite' argument to True",
                level='error'
            )
            return

    # 1. Duplicate the template project
    copy_result = None
    try:
        copy_result = yrr_template_project.duplicate(target_project_key=id, target_project_name=id)
    except Exception as e:
        alert_user(
            msg="Could not duplicate the YRR_TEMPLATE project, this is a bug, report it to the developers!",
            level='error'
        )
        return

    copied_project = None
    try:
        copied_project = client.get_project(copy_result['targetProjectKey'])
    except:
        alert_user(
            msg="Could not get the project handle for the duplicated YRR project, this is a bug, report it to the developers!",
            level='error'
        )
        return

    # 2. Set the variables of the duplicated project
    variables = None
    try:
        variables = copied_project.get_variables()
    except:
        alert_user(
            msg="Could not get variables for the duplicated YRR project, this is a bug, report it to the developers!",
            level='error'
        )
        return

    schema_name = id.replace(f"YRR_{stage}_", '')
    user_role = f"YRR_USER_{stage}"
    variables["standard"]["snowflake_db"] = f"YRR_{stage}"
    variables["standard"]["snowflake_role"] = user_role
    variables["standard"]["snowflake_wh"] = "XSMALL_USER_WH"
    variables["standard"]["snowflake_schema"] = schema_name
    variables["standard"]["snowflake_schema_path"] = f"YRR_{stage}.{schema_name}"
    variables["standard"]["tmp_psf"] = tmp_psf
    variables["standard"]["created_on"] = now
    variables["standard"]["initiator"] = user()
    variables["standard"]["tmp_tx_start_date"] = start_date
    variables["standard"]["tmp_tx_end_date"] = end_date

    try:
        copied_project.set_variables(variables)
    except Exception as e:
        alert_user(
            msg="Could not set variables for the duplicated YRR project, this is a bug, report it to the developers!",
            level='error'
        )
        return

    # 3. Create the datasets for the selected and excluded TMP lots
    params = {'connection': 'GOQ_SNOWFLAKE_VAR',
              'mode': 'table',
              'table': 'APP_INPUT_SELECTED_TMP_LOTS',
              'catalog': '${snowflake_db}',
              'schema': '${snowflake_schema}',
              'tableCreationMode': 'auto'}
    dataset_handle = None
    try:
        dataset_handle = copied_project.create_dataset('app_input_selected_tmp_lots', type='Snowflake', params=params)
    except:
        st.write('create dataset failed')
        return

    ds_def = None
    try:
        ds_def = dataset_handle.get_definition()
    except:
        st.write('get dataset def failed')
        return

    ds_def['managed'] = True
    try:
        dataset_handle.set_definition(ds_def)
    except:
        st.write('set dataset def failed')
        return

    # Setup the pre-write SQL statements
    settings = None
    try:
        settings = dataset_handle.get_settings()
    except Exception as e:
        st.write(e)
        st.write('getting dataset settings failed')
        return

    pre_write_statements = "\n".join([
        f"create schema if not exists ${{snowflake_db}}.${{snowflake_schema}};"
    ])
    settings.get_raw()["params"]["customPreWriteStatements"] = pre_write_statements
    try:
        settings.save()
    except Exception as e:
        st.write(e)
        st.write('saving dataset settings failed')
        return

    app_input_selected_tmp_lots = None
    try:
        app_input_selected_tmp_lots = dataiku.Dataset("app_input_selected_tmp_lots", project_key=id, ignore_flow=True)
    except Exception as e:
        st.write(e)
        st.write('get dataset write handle failed')
        return

    # Combine the selected lots into a single dataframe
    dataframes = tmp_tx_included_lots.values()
    combined_selected_df = pd.concat(dataframes, axis=0, ignore_index=True)
    try:
        #app_input_selected_tmp_lots.write_dataframe(combined_selected_df, infer_schema=True)
        app_input_selected_tmp_lots.write_with_schema(combined_selected_df)
    except Exception as e:
        st.write(e)
        st.write('write to dataset failed')
        return

    return id
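As an aside, write_with_schema is roughly equivalent to setting the schema from the dataframe and then streaming it through a writer; an explicit form of that final write step would be something like this sketch (same dataset handle and dataframe as in the function above):

# Sketch: explicit-writer equivalent of write_with_schema
app_input_selected_tmp_lots = dataiku.Dataset("app_input_selected_tmp_lots", project_key=id, ignore_flow=True)
app_input_selected_tmp_lots.write_schema_from_dataframe(combined_selected_df)

with app_input_selected_tmp_lots.get_writer() as writer:
    # The traceback above shows the error surfacing when the writer is flushed/closed
    writer.write_dataframe(combined_selected_df)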
Operating system used: Windows 10
Answers
I have verified that the pre-write statement is not being executed to create the schema before the dataframe is written to the Snowflake table. I altered my code to create the schema manually using an internal library, and the dataset was then created successfully.
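For anyone hitting the same issue, the fix amounts to creating the target Snowflake schema before the dataframe write. A rough public-API equivalent of that manual step using SQLExecutor2 might look like the sketch below (the connection name is taken from the params dict above; the pre_queries trick and the surrounding names are my assumptions, not the internal library actually used):

from dataiku import SQLExecutor2

# Sketch: create the schema up front so the table write no longer depends on
# the dataset's pre-write statement. query_to_df expects a statement that
# returns rows, so the DDL is passed as a pre-query.
executor = SQLExecutor2(connection="GOQ_SNOWFLAKE_VAR")
schema_path = f"YRR_{stage}.{schema_name}"  # same values as in _create_yrr_instance
executor.query_to_df(
    "SELECT 1",
    pre_queries=[f"create schema if not exists {schema_path};"],
)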