Schema errors while using the built-in chunked writing functionality

PapaA Registered Posts: 20 ✭✭✭✭
edited July 16 in Using Dataiku

I want to split a dataframe of 4,000 columns into 5 different tables so that I can write it back to SQL.

However, our instance is struggling to write the schema of a table with 1,000 columns, so I had to obtain a writer, as described in the DSS docs.

The writer works for the first round, but it crashes on the second one, raising a schema error that we cannot figure out.

Could you please help us understand what goes wrong? Why is the schema of the empty table not updated based on the input that is about to be written?

Thanks in advance,
A

import dataiku

sql_datasets = [
    "DTO_Risk_Factors_unshifted_1",
    "DTO_Risk_Factors_unshifted_2",
]

inp = dataiku.Dataset("DLS_TEST.DTO_Risk_Factors")

start = 0
end = 950
step = 950


for out in sql_datasets:

    print('starting location', start)
    print('ending location', end)
    print(out)

    out = dataiku.Dataset(out)

    with out.get_writer() as writer:
        for df in inp.iter_dataframes(chunksize=10500):
            print(len(df))

            # preprocess: keep only this table's slice of columns
            df_temp = df.iloc[:, start:end]
            print(df_temp.shape)

            # write the processed dataframe
            writer.write_dataframe(df_temp)

    start += step
    end += step
    

Answers

  • Sarina Dataiker, Dataiku DSS Core Designer, Dataiku DSS Adv Designer Posts: 315 Dataiker
    edited July 17

    Hi @PapaA,

    Given the example that you outline, I think it should work to use the following two lines to write the schema and then the data:

    # set the schema from the dataframe, then write the data
    out.write_schema_from_dataframe(df_temp)
    out.write_dataframe(df_temp)
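
    write_schema_from_dataframe sets the output dataset's schema from the dataframe's columns and types, so the empty SQL table can be created to match before any rows are written.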

    And here's an example of what your for loop might look like with this setup. The schema is written once, from the first chunk, before the writer is opened, since the writer adds rows against the schema the dataset already has:

    # iterate over output datasets
    for out in sql_datasets:
        out = dataiku.Dataset(out)

        # set the output schema from the first chunk before writing any rows
        chunks = inp.iter_dataframes(chunksize=10500)
        first = next(chunks).iloc[:, start:end]
        out.write_schema_from_dataframe(first)

        with out.get_writer() as writer:
            writer.write_dataframe(first)
            for df in chunks:
                writer.write_dataframe(df.iloc[:, start:end])

        # advance the column slice for the next output table
        start += step
        end += step
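
    For the loop to run on its own, it assumes the setup from the top of your script, restated here:

    import dataiku

    sql_datasets = [
        "DTO_Risk_Factors_unshifted_1",
        "DTO_Risk_Factors_unshifted_2",
    ]
    inp = dataiku.Dataset("DLS_TEST.DTO_Risk_Factors")

    # column-slice bounds, advanced by step after each output table
    start, end, step = 0, 950, 950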

    There's a little more information on this in the Note in this section.

    Hope that helps,

    Sarina
