Pre-write dataset statement not working outside of a Python recipe

info-rchitect
Level 6
Pre-write dataset statement not working outside of a Python recipe

Hi,

I have a Streamlit Code Studio web-app running that generates dataframes.  At the end of the web-app UI, a SUBMIT button is clicked and a function is run to do the following:

1. Duplicate an existing project template (works)

2. Set variables for the duplicated project (works)

3. Create a schema to write the dataframes' data to tables (doesn't work)

The code attempts to write the dataframes but returns this error (same exact error when using the 'write_dataframes' or the 'write_with_schema' function):

Exception: Error : [Errno 32] Broken pipe

Traceback:

File "/home/dataiku/workspace/code_studio-versioned/streamlit/admin.py", line 146, in _create_yrr_instance
    app_input_selected_tmp_lots.write_dataframe(combined_selected_df, infer_schema=True)File "/opt/dataiku/python/dataiku/core/dataset.py", line 1071, in write_dataframe
    writer.write_dataframe(df)File "/opt/dataiku/python/dataiku/core/dataset_write.py", line 600, in __exit__
    self.close()File "/opt/dataiku/python/dataiku/core/dataset_write.py", line 592, in close
    self.remote_writer.flush()File "/opt/dataiku/python/dataiku/core/dataset_write.py", line 159, in flush
    raise Exception(self.error_message)

When I check the duplicated project, the dataset is created but is empty.  The schema is correct, the pre-write statement is correct, just no data.  Code is below, thx much.

 

 

def _create_yrr_instance(user,
                   tmp_psf,
                   start_date,
                   end_date,                   
                   tmp_tx_included_lots,
                   tmp_tx_excluded_lots=None,
                   stage='DEV',
                   overwrite=False):
    start_date = start_date.isoformat().replace('-', '_')
    end_date = end_date.isoformat().replace('-', '_')
    now = datetime.now().isoformat(' ', 'seconds')    
    client = dataiku.api_client() 
    yrr_template_project = None
    try:
        yrr_template_project = client.get_project("YRR_TEMPLATE")
    except:
        alert_user(
            msg="Could not find the YRR_TEMPLATE project, this is a bug, report it to the developers!",
            level='error'
        )
        return
    project_keys = client.list_project_keys()   
    excluded_lots = None if tmp_tx_excluded_lots is None else tmp_tx_excluded_lots.LOT_ID.tolist()
    excluded_lot_id = '' if excluded_lots is None else ','.join(excluded_lots)    
    excluded_lot_hash_id = excluded_lot_id if not excluded_lot_id else f"_{hashlib.md5(excluded_lot_id.encode('utf-8', 'ignore')).hexdigest()}"
    id = f"YRR_{stage}_{tmp_psf}_{start_date}_{end_date}{excluded_lot_hash_id}"
    if id in project_keys:
        if overwrite:
            # Delete an existing project with the same name
            project = client.get_project(id)
            project.delete()
        else:
            alert_user(
                msg=f"Cannot create YRR instance, a project with the same ID '{id}' exists, set the 'overwrite' argument to True",
                level='error'
            )
            return
    copy_result = None
    try:
        copy_result = yrr_template_project.duplicate(target_project_key=id,
                                                    target_project_name=id)   
    except Exception as e:
        alert_user(
            msg="Could not duplicate the YRR_TEMPLATE project, this is a bug, report it to the developers!",
            level='error'
        )
        return
    copied_project = None
    try:
        copied_project = client.get_project(copy_result['targetProjectKey'])
    except:
        alert_user(
            msg="Could not get the project handle for the duplicated YRR project, this is a bug, report it to the developers!",
            level='error'
        )
        return
    variables = None
    try:
        variables = copied_project.get_variables()
    except:
        alert_user(
            msg="Could not get variables for the duplicated YRR project, this is a bug, report it to the developers!",
            level='error'
        )
        return
    schema_name = id.replace(f"YRR_{stage}_", '')
    user_role = f"YRR_USER_{stage}"
    variables["standard"]["snowflake_db"] = f"YRR_{stage}"
    variables["standard"]["snowflake_role"] = user_role
    variables["standard"]["snowflake_wh"] = "XSMALL_USER_WH"
    variables["standard"]["snowflake_schema"] = schema_name
    variables["standard"]["snowflake_schema_path"] = f"YRR_{stage}.{schema_name}"
    variables["standard"]["tmp_psf"] = tmp_psf
    variables["standard"]["created_on"] = now
    variables["standard"]["initiator"] = user()
    variables["standard"]["tmp_tx_start_date"] = start_date
    variables["standard"]["tmp_tx_end_date"] = end_date
    try:
        copied_project.set_variables(variables)
    except Exception as e:
        alert_user(
            msg="Could not set variables for the duplicated YRR project, this is a bug, report it to the developers!",
            level='error'
        )
        return
    # Create the datasets for the selected and excluded TMP lots
    params = {'connection': 'GOQ_SNOWFLAKE_VAR', 'mode': 'table', 'table': 'APP_INPUT_SELECTED_TMP_LOTS', 'catalog': '${snowflake_db}', 'schema': '${snowflake_schema}', 'tableCreationMode': 'auto'}
    dataset_handle = None
    try:
        dataset_handle = copied_project.create_dataset('app_input_selected_tmp_lots', type='Snowflake', params=params)
    except:
        st.write('create dataset failed')
        return
    ds_def = None
    try:
        ds_def = dataset_handle.get_definition()
    except:
        st.write('get dataset def failed')
        return
    ds_def['managed'] = True
    try:
        dataset_handle.set_definition(ds_def)
    except:
        st.write('set dataset def failed')
        return
    # Setup the pre-write SQL statements
    settings = None
    try:
        settings = dataset_handle.get_settings()
    except Exception as e:
        st.write(e)
        st.write('getting dataset settings failed')
        return
    pre_write_statements = "\n".join([
        f"create schema if not exists ${{snowflake_db}}.${{snowflake_schema}};"
    ])
    settings.get_raw()["params"]["customPreWriteStatements"] = pre_write_statements
    try:
        settings.save()
    except Exception as e:
        st.write(e)
        st.write('saving dataset settings failed')
        return
    app_input_selected_tmp_lots = None
    try:
        app_input_selected_tmp_lots = dataiku.Dataset("app_input_selected_tmp_lots", project_key=id, ignore_flow=True)
    except exception as e:
        st.write(e)
        st.write('get dataset write handle failed')
        return
    # Combine the selected lots into a single dataframe
    dataframes = tmp_tx_included_lots.values()
    combined_selected_df = pd.concat(dataframes, axis=0, ignore_index=True)
    try:
        #app_input_selected_tmp_lots.write_dataframe(combined_selected_df, infer_schema=True)
        app_input_selected_tmp_lots.write_with_schema(combined_selected_df)
    except Exception as e:
        st.write(e)
        st.write('write to dataset failed')
        return
    return id

 

 


Operating system used: Windows 10


Operating system used: Windows 10

0 Kudos
1 Reply
info-rchitect
Level 6
Author

I have verified that the pre-write statement is not working to create the schema before trying to write the dataframe to the Snowflake table.  I altered my code to use an internal library to write the schema manually and then the dataset was created successfully.

0 Kudos