Update DSS Datasets in Bokeh WebApps

rreff

Is it possible not only to use a dataset for visualization, but also to persist changes back to that same dataset?

Within the visualization, the user of the Bokeh webapp can edit some of the data. These changes should not be lost.

import dataiku
from bokeh.models import ColumnDataSource

# from this dataset/dataframe we read the data
master = dataiku.Dataset("typed_Stammdaten")
master_df = master.get_dataframe()

# a second reference to the same dataset, which we want to write to
master_edit = dataiku.Dataset("typed_Stammdaten")
master_ds_w = master_edit.get_writer()

tblsource = ColumnDataSource(master_df)

# callback from the select widget to update the record
def updateCurveSelection(issuerid):
    # get the index of the selected record
    selectionRowIndex = tblsource.selected.indices[0]
    # update the source
    tblsource.patch({"Issuer_ID": [(selectionRowIndex, issuerid)]})
    # ? how to write back without getting an error
    master_ds_w.write_row_array(tblsource.to_df().iloc[selectionRowIndex].tolist())

Answers

  • CoreyS

    Hi @rreff! Can you provide any further details on the thread to assist users in helping you find a solution (for example, your DSS version)? Also, can you let us know if you have tried any fixes already? This should lead to a quicker response from the community.

  • rreff

    Hi Corey.

    Sure.

    Version 8.0.5. Free Edition.

    We have tried the standard approach for this task.

    As shown in the source code, this line:

    master_ds_w.write_row_array(tblsource.to_df().iloc[selectionRowIndex].tolist())

    should store the row update.

    Regards

    Roberto

  • rreff

    Is there anything still missing from us that you need in order to answer our request?

  • Sarina

    Hi @rreff,

    I tested out your setup, and I think writing in this way should work as you expect. One thing I did notice is that it looks like you aren't closing the writer in the snippet you provided:

    master_ds_w

    The writer does need to be closed after use, with master_ds_w.close(). Once the writer is closed, you should see the rows written:

    2021-03-30 17:14:49,660 INFO Initializing write data stream (KJg5kExH6X)
    2021-03-30 17:14:49,662 INFO Remote Stream Writer: start generate
    2021-03-30 17:14:49,662 INFO Waiting for data to send ...
    2021-03-30 17:14:49,662 INFO Sending data (19)
    2021-03-30 17:14:49,663 INFO Waiting for data to send ...
    2021-03-30 17:14:49,664 INFO Remote Stream Writer closed
    2021-03-30 17:14:49,666 INFO Got end mark, ending send
    1 rows successfully written (KJg5kExH6X)
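
    For reference, here is a minimal sketch of the whole pattern using your variable names (a sketch, not tested against your data; the try/finally just guarantees the close even if the write raises):

    master_edit = dataiku.Dataset("typed_Stammdaten")
    master_ds_w = master_edit.get_writer()
    try:
        # write the updated row back to the dataset
        master_ds_w.write_row_array(tblsource.to_df().iloc[selectionRowIndex].tolist())
    finally:
        # always release the writer, even when the write fails
        master_ds_w.close()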
    

    Can you try that out? If it still does not work for you, can you attach any errors that you are getting when you try to write the data? To do so, you can attach the contents of your webapp Log after clicking "refresh" on the log.

    Thanks,
    Sarina

  • rreff

    Hello Sarina.

    Sounds good and makes sense, but it does not work yet.

    Please find the latest log and source code below.

    Regards

    Roberto

    the log:

    /appl/dataikudata/test1/pyenv/lib/python3.6/site-packages/tornado/concurrent.py:521: DeprecationWarning: @return_future is deprecated, use coroutines instead
      DeprecationWarning)
    2021-03-31 10:48:49,827 INFO use old-style cookie sniffing handler
    2021-03-31 10:48:49,830 INFO Starting Bokeh server version 0.12.16 (running on Tornado 5.1.1)
    2021-03-31 10:48:49,831 WARNING Host wildcard '*' will allow connections originating from multiple (or possibly all) hostnames or IPs. Use non-wildcard values to restrict access explicitly
    2021-03-31 10:48:49,833 INFO Bokeh app running at: http://localhost:21900/backend
    2021-03-31 10:48:49,847 INFO 200 GET /static/js/bokeh.min.js (127.0.0.1) 7.26ms
    2021-03-31 10:48:50,057 INFO 200 GET /static/js/bokeh.min.js (127.0.0.1) 2.16ms
    2021-03-31 10:48:50,540 INFO Initializing dataset writer for dataset RENTENVALIDIERUNG.typed_Stammdaten
    2021-03-31 10:48:50,702 INFO 200 GET /backend (127.0.0.1) 260.30ms
    2021-03-31 10:48:50,749 INFO Initializing dataset writer for dataset RENTENVALIDIERUNG.typed_Stammdaten
    2021-03-31 10:48:50,749 ERROR Error running application handler <bokeh.application.handlers.directory.DirectoryHandler object at 0x7f9f71ef94e0>: Unable to instanciate a new dataset writer. There is already another active writer for this dataset (RENTENVALIDIERUNG.typed_Stammdaten).
    File "dataset_write.py", line 276, in __init__:
    raise Exception('Unable to instanciate a new dataset writer. There is already another active writer for this dataset (%s).'%dataset.full_name) Traceback (most recent call last):
      File "/appl/dataikuapps/dataiku-dss-9.0.0/python36.packages/bokeh/application/handlers/code_runner.py", line 163, in run
        exec(self._code, module.__dict__)
      File "/appl/dataikudata/test1/tmp/RENTENVALIDIERUNG/web_apps/xky7LBc/backend/main.py", line 54, in <module>
        master_df_w = master_edit.get_writer()
      File "/appl/dataikuapps/dataiku/python/dataiku/core/dataset.py", line 830, in get_writer
        return dataset_write.DatasetWriter(self,)
      File "/appl/dataikuapps/dataiku/python/dataiku/core/dataset_write.py", line 276, in __init__
        raise Exception('Unable to instanciate a new dataset writer. There is already another active writer for this dataset (%s).'%dataset.full_name)
    Exception: Unable to instanciate a new dataset writer. There is already another active writer for this dataset (RENTENVALIDIERUNG.typed_Stammdaten).
    
    2021-03-31 10:48:50,752 INFO 200 GET /backend (127.0.0.1) 49.22ms
    2021-03-31 10:48:50,753 ERROR Error running application handler <bokeh.application.handlers.directory.DirectoryHandler object at 0x7f9f71ef94e0>: Unable to instanciate a new dataset writer. There is already another active writer for this dataset (RENTENVALIDIERUNG.typed_Stammdaten).
    File "dataset_write.py", line 276, in __init__:
    raise Exception('Unable to instanciate a new dataset writer. There is already another active writer for this dataset (%s).'%dataset.full_name) Traceback (most recent call last):
      File "/appl/dataikuapps/dataiku-dss-9.0.0/python36.packages/bokeh/application/handlers/code_runner.py", line 163, in run
        exec(self._code, module.__dict__)
      File "/appl/dataikudata/test1/tmp/RENTENVALIDIERUNG/web_apps/xky7LBc/backend/main.py", line 54, in <module>
        master_df_w = master_edit.get_writer()
      File "/appl/dataikuapps/dataiku/python/dataiku/core/dataset.py", line 830, in get_writer
        return dataset_write.DatasetWriter(self,)
      File "/appl/dataikuapps/dataiku/python/dataiku/core/dataset_write.py", line 276, in __init__
        raise Exception('Unable to instanciate a new dataset writer. There is already another active writer for this dataset (%s).'%dataset.full_name)
    Exception: Unable to instanciate a new dataset writer. There is already another active writer for this dataset (RENTENVALIDIERUNG.typed_Stammdaten).
    
    2021-03-31 10:48:50,756 INFO 200 GET /backend (127.0.0.1) 3.13ms
    2021-03-31 10:48:51,070 INFO 101 GET /backend/ws?bokeh-protocol-version=1.0&bokeh-session-id=m3V0qpty9ANnMLrOfLgU7TaOeH0vd0Pz1lV7D79MjDJh (127.0.0.1) 0.56ms
    2021-03-31 10:48:51,070 INFO WebSocket connection opened
    2021-03-31 10:48:51,070 INFO ServerConnection created

    and source code:

    import dataiku
    import numpy as np
    import pandas as pd
    
    from dataiku import pandasutils as pdu
    from datetime import date, datetime
    
    from bokeh.io import curdoc
    from bokeh.layouts import row, column
    from bokeh.models import (
        ColumnDataSource,
        DataTable,
        TableColumn,
        DateFormatter,
        TextInput,
        Select,
        Panel,
        Tabs,
        LinearAxis,
    )
    from bokeh.plotting import figure
    
    # link some tbl
    master = dataiku.Dataset("typed_Stammdaten")
    master_edit = dataiku.Dataset("typed_Stammdaten")
    issuercurves = dataiku.Dataset("prepared_IssuerCurves")
    cashFlows = dataiku.Dataset("Calculated_Cashflows")
    
    master_df = master.get_dataframe()
    master_df_w = master_edit.get_writer()
    issuercurves_df = issuercurves.get_dataframe()
    cashFlows_df = cashFlows.get_dataframe()
    
    # org sample base source: https://docs.bokeh.org/en/latest/docs/user_guide/server.html
    # docu: https://docs.bokeh.org/en/latest/docs/reference/document.html
    
    # doc
    doc = curdoc()
    doc.title = "FI Validation"
    
    # setup tbl views, including formats
    colDefStammdaten = [
            TableColumn(field="ISIN", title="ISIN", width=100),
            TableColumn(field="Issuer", title="Issuer", width=130),
            TableColumn(field="StartDate", title="Start", width=75, formatter=DateFormatter(format = '%d.%m.%Y')),
            TableColumn(field="EndDate", title="End", width=75, formatter=DateFormatter(format = '%d.%m.%Y')),
            TableColumn(field="WP_Art", title="WP Art", width=140),
            TableColumn(field="Kommentar", title="Kommentar"),
        ]
    
    colDefCurves = [
        TableColumn(field="tenor", title="Tenor", width=250),
        TableColumn(field="value", title="Value", width=250),
    ]
    
    colDefCashFlows = [
        TableColumn(field="Cashflow", title="Cashflow", width=250),
        TableColumn(field="date", title="Date", width=250),
    ]
    
    # create default tbl data links
    tblSourceStammdaten = ColumnDataSource(master_df)
    tblSourceCurves = ColumnDataSource({'tenor': np.array([]), 'value': np.array([])})
    tblSourceCashFlows = ColumnDataSource({'Cashflow': np.array([]), 'date': np.array([])})
    
    # stammdaten list view
    tblViewStammdaten = DataTable(source=tblSourceStammdaten, columns=colDefStammdaten, width=880, height=800, fit_columns=False, index_position=None, reorderable=False, name="Master")
    
    # stammdaten details view items and add to unique tab
    txtISIN = TextInput(title="ISIN",value="", width=400, height=30, disabled=True)
    txtWPArt = TextInput(title="WP Art",value="", width=400, height=30, disabled=True)
    txtIssuer = TextInput(title="Issuer",value="", width=400, height=30, disabled=True)
    txtNotionalCurrency = TextInput(title="Currency Notional",value="", width=400, height=30, disabled=True)
    txtNotional = TextInput(title="Notional",value="", width=400, height=30, disabled=True)
    txtStart = TextInput(title="Startdate",value="", width=400, height=30, disabled=True)
    txtEnd = TextInput(title="Enddate",value="", width=400, height=30, disabled=True)
    
    tab_Details = Panel(child = column(txtISIN, txtWPArt, txtIssuer, txtNotionalCurrency, \
                                       txtNotional, txtStart, txtEnd), title = "Details")
    
    # curves details view items and add to unique tab
    curveSelect = Select(title='Issuer Curve', value="", options= issuercurves_df["Curve_ID"].unique().tolist() + [""])
    tblViewCurves = DataTable(source = tblSourceCurves, columns=colDefCurves, width=500, height=600, fit_columns=False, index_position=None, editable=True)
    
    pCurve = figure(plot_width=500, plot_height=400, x_range=["3M"], toolbar_location="below", x_axis_label='tenor', y_axis_label='value') # tools=["box_select", "hover", "reset"], toolbar_location=None, 
    pCurve.yaxis.visible = False
    pCurve.min_border_left = 30
    yaxis = LinearAxis()
    pCurve.add_layout(yaxis, 'right')
    pCurve.line(x='tenor', y='value', source=tblSourceCurves, line_alpha=1, color="firebrick", line_width=2)
    pCurve.circle(x='tenor', y='value', source=tblSourceCurves, size=5, fill_color="white")
    
    tab_Curves = Panel(child=column(curveSelect, pCurve, tblViewCurves), title="Curves")
    
    # create tab with curve detail items and add to unique tab
    tblViewCashFlows = DataTable(source = tblSourceCashFlows, columns=colDefCashFlows, width=500, height=600, fit_columns=False, index_position=None, editable=False)
    tab_CashFlows = Panel(child=row(tblViewCashFlows), title="Cash Flows")
    
    # create tab view and add all tabs
    tabs = Tabs(tabs=[ tab_Details, tab_Curves, tab_CashFlows], width = 400)
    
    # add list and tabs to doc root
    doc.add_root(row(tblViewStammdaten, tabs))
    
    # callback implementations
    def cbRowSelected(attr, old, new):
        selectionRowIndex=tblSourceStammdaten.selected.indices[0]
        txtISIN.value= tblSourceStammdaten.data["ISIN"][selectionRowIndex]
        txtWPArt.value= tblSourceStammdaten.data["WP_Art"][selectionRowIndex]
        txtIssuer.value= tblSourceStammdaten.data["Issuer"][selectionRowIndex]
        txtNotionalCurrency.value= tblSourceStammdaten.data["Notional_Currency"][selectionRowIndex]
        txtNotional.value= tblSourceStammdaten.data["Nominal_in_Curr1"][selectionRowIndex].astype("str")
        txtStart.value= tblSourceStammdaten.data["StartDate"][selectionRowIndex].astype("str")
        txtEnd.value= tblSourceStammdaten.data["EndDate"][selectionRowIndex].astype("str")
        
        # change value of detail curve selection will also raise callback=cbSelectCurve
        curveSelect.value = tblSourceStammdaten.data["Issuer_ID"][selectionRowIndex]
        selectCashFlowByIsin(tblSourceStammdaten.data["ISIN"][selectionRowIndex])
    
    def cbSelectCurve(attr, old, new):
        data = issuercurves_df[issuercurves_df["Curve_ID"] == new]

        updateCurveSelection(new)

        # update the underlying data source for the curve table and plot
        try:
            tblSourceCurves.data = ColumnDataSource(data.to_dict('list')).data
        except ValueError:
            pass

        pCurve.x_range.factors = data["tenor"].unique().tolist()
    
    # update implementations
    def updateCurveSelection(issuerid):
        try:
            selectionRowIndex = tblSourceStammdaten.selected.indices[0]
            #tblSourceStammdaten.patch({"Issuer_ID": [(selectionRowIndex, issuerid)]})
        except IndexError:
            # indices is empty when no row is selected
            return

        # todo: update the Dataiku dataset with the changed data
        #master_df_w.write_row_array(tblSourceStammdaten.to_df().iloc[selectionRowIndex].tolist())
        #master_df_w.close()
    
    def selectCashFlowByIsin(identifier):
        data = cashFlows_df[cashFlows_df["ISIN"] == identifier]
        tblSourceCashFlows.data = ColumnDataSource(data.to_dict('list')).data
    
    # add callbacks
    tblSourceStammdaten.on_change('selected', cbRowSelected)
    curveSelect.on_change('value', cbSelectCurve)

  • rreff

    Hello.

    As a supplement, please note that the underlying storage is based on filesystem files.

    This is because the client is currently testing Dataiku's capabilities.

    Regards

  • Sarina

    Hi @rreff,

    Thank you for attaching your logs and full code!

    Looking at the current error log, I think this confirms that the issue is still the writer not being closed:

    Exception: Unable to instanciate a new dataset writer. There is already another active writer for this dataset (RENTENVALIDIERUNG.typed_Stammdaten).
    

    For instance, if you run your code twice without a master_df_w.close() statement, then this is actually expected. That's because the previous writer is still open, so you can't create a new one until the previous one is closed.

    To resolve this, I would suggest:

    • Restart your webapp backend (this should be a one-time step to let you proceed; going forward, closing the writer within the code needs to be the solution).
    • Move both the opening and the closing of the writer into the updateCurveSelection function. You'll want to close the writer immediately after writing and ensure that your flow always reaches writer.close(); moving both into the same function seems like the most straightforward way to do that.

    So for example, your updated updateCurveSelection function:

    def updateCurveSelection(issuerid):
        try:
            selectionRowIndex = tblSourceStammdaten.selected.indices[0]
        except IndexError:
            # indices is empty when no row is selected
            return

        master_df_w = master_edit.get_writer()
        master_df_w.write_row_array(tblSourceStammdaten.to_df().iloc[selectionRowIndex].tolist())
        master_df_w.close()
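
    As a side note, if your version of the dataiku package supports it, the writer returned by get_writer() can also be used as a context manager, which guarantees the close even when the write raises. A sketch of the same function in that style (same names as above, untested):

    def updateCurveSelection(issuerid):
        try:
            selectionRowIndex = tblSourceStammdaten.selected.indices[0]
        except IndexError:
            return  # nothing selected, nothing to write

        # the with block closes the writer automatically
        with master_edit.get_writer() as master_df_w:
            master_df_w.write_row_array(tblSourceStammdaten.to_df().iloc[selectionRowIndex].tolist())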

    Let me know how this goes,
    Sarina

  • rreff

    Hello Sarina.

    Yes, the error is no longer thrown, but write_row_array behaves differently than expected: on the one hand, after the call only this one row is left in the dataset; on the other hand, some columns, for example those with date values, are empty.

    Regards

    Roberto

  • Sarina

    Hi @rreff,

    Ah I see, glad to hear that the error is no longer thrown.

    I wonder if it makes sense to switch to the append method outlined here, for example:

        mydataset = dataiku.Dataset("sample_data")
        df = mydataset.get_dataframe()
        append_df = pd.DataFrame(data=[{'device_id': 1, 'event_date': '02/22/2021', 'interesting_value':11, 'country':'USA'}])
        df = df.append(append_df)
        mydataset.write_with_schema(df)
    

    For the issue with some date values showing up empty, I would suggest printing out the dataframe values prior to writing them to the dataset in order to troubleshoot this.
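
    For example, a quick check along these lines (a sketch reusing the names from your code) will show whether the date columns are already empty or mis-typed before the write:

    row_df = tblSourceStammdaten.to_df().iloc[[selectionRowIndex]]
    # the dtypes reveal whether StartDate/EndDate are real datetimes or plain strings
    print(row_df.dtypes)
    print(row_df[["StartDate", "EndDate"]])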

    Thanks,
    Sarina

  • rreff

    Sorry it took me so long to reply.

    About your proposal: I would assume that by using

    write_with_schema(...)

    the whole dataset gets written back. So in case I have a lot of data stored, it wouldn't be the best solution, would it? I was hoping to find a solution that persists only that single row change.

  • Sarina

    Hi @rreff,

    You are correct, you can use the following approach to append instead:

        mydataset = dataiku.Dataset("name_age")
        # set appendMode before calling get_writer() so rows are appended instead of overwritten
        mydataset.spec_item["appendMode"] = True
        writer = mydataset.get_writer()
        append_df = pd.DataFrame(data=[{'name': 'jill', 'age': 11}])
        # use write_dataframe() instead of write_with_schema here:
        writer.write_dataframe(append_df)
        writer.close()


    Let me know if you have any questions about this.

    Thanks,
    Sarina

  • rreff

    Hi Sarina,

    Oh, that's a nice hack. I will test it soon.

    Regards, Roberto
