Update DSS Datasets in Bokeh WebApps

rreff
Level 2

Is it possible not only to use a dataset for visualization but also to persist changes to it?

Within the visualization, the user of the Bokeh WebApp can make some data updates. These changes should not be lost.

 

 

import dataiku
from bokeh.models import ColumnDataSource

# from this dataset/dataframe we read the data
master = dataiku.Dataset("typed_Stammdaten")
master_df = master.get_dataframe()

# we created a second dataset reference that we want to write to
master_edit = dataiku.Dataset("typed_Stammdaten")
master_ds_w = master_edit.get_writer()

tblsource = ColumnDataSource(master_df)

# callback from the select widget to update the record
def updateCurveSelection(issuerid):
    # get the index of the selected row
    selectionRowIndex = tblsource.selected.indices[0]
    # update the Bokeh source
    tblsource.patch({"Issuer_ID": [(selectionRowIndex, issuerid)]})
    # ? how to write back without getting an error
    master_ds_w.write_row_array(tblsource.to_df().iloc[selectionRowIndex].tolist())

 

 

CoreyS
Dataiker Alumni

Hi, @rreff! Can you provide any further details in the thread to help users find a solution (for example, your DSS version)? Also, can you let us know if you've tried any fixes already? This should lead to a quicker response from the community.

rreff
Level 2
Author

Hi Corey.

 

Sure.

Version 8.0.5. Free Edition.

We have tried the standard solution to solve this task.

As shown in the source code, this part:

master_ds_w.write_row_array(tblsource.to_df().iloc[selectionRowIndex].tolist())

should store the row update.

 

Regards

Roberto

rreff
Level 2
Author

Is there any further information you need from us to answer this request?

SarinaS
Dataiker

Hi @rreff ,

I tested out your setup, and I think writing in this way should work as you expect.  One thing I did notice is that it looks like you aren't closing the writer in the snippet you provided:

master_ds_w

 

The writer does need to be closed after use, with master_ds_w.close(). Once the writer is closed, you should see the rows written:

2021-03-30 17:14:49,660 INFO Initializing write data stream (KJg5kExH6X)
2021-03-30 17:14:49,662 INFO Remote Stream Writer: start generate
2021-03-30 17:14:49,662 INFO Waiting for data to send ...
2021-03-30 17:14:49,662 INFO Sending data (19)
2021-03-30 17:14:49,663 INFO Waiting for data to send ...
2021-03-30 17:14:49,664 INFO Remote Stream Writer closed
2021-03-30 17:14:49,666 INFO Got end mark, ending send
1 rows successfully written (KJg5kExH6X)
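To make sure the writer is always released, even when a write raises an error, the open-write-close sequence can be wrapped in `try`/`finally`. Below is a minimal sketch; `write_rows_and_close`, `FakeDataset`, and `FakeWriter` are hypothetical names, and the two fake classes are local stand-ins for `dataiku.Dataset` and its writer that only mimic two behaviors discussed in this thread: a single active writer per dataset, and a new writer replacing the previous contents.

```python
def write_rows_and_close(dataset, rows):
    """Open a writer, write every row, and always close the writer.

    `dataset` is any object with a get_writer() method (e.g. a dataiku.Dataset);
    each row is a list of values in the dataset's schema order.
    """
    writer = dataset.get_writer()
    try:
        for row in rows:
            writer.write_row_array(row)
    finally:
        # Runs even if a write fails, so the dataset is not left with an
        # active writer that blocks the next get_writer() call.
        writer.close()


class FakeDataset:
    """Hypothetical stand-in for dataiku.Dataset, for local illustration only."""

    def __init__(self):
        self.rows = []
        self.writer_open = False

    def get_writer(self):
        # Mimic the "only one active writer per dataset" rule from the thread.
        if self.writer_open:
            raise RuntimeError("another writer is already active for this dataset")
        self.writer_open = True
        # Mimic non-append behavior: a new writer replaces the previous contents.
        self.rows = []
        return FakeWriter(self)


class FakeWriter:
    """Hypothetical stand-in for the writer returned by get_writer()."""

    def __init__(self, dataset):
        self.dataset = dataset

    def write_row_array(self, row):
        self.dataset.rows.append(list(row))

    def close(self):
        self.dataset.writer_open = False


ds = FakeDataset()
write_rows_and_close(ds, [["DE0001", "A"]])
# Succeeds because the first writer was closed; the contents are replaced.
write_rows_and_close(ds, [["DE0002", "B"]])
print(ds.rows)  # [['DE0002', 'B']]
```

With a real dataset, the Bokeh callback would call something like `write_rows_and_close(master_edit, [row])` instead of keeping a module-level writer open.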

Can you try that out?  If it still does not work for you, can you attach any errors that you are getting when you try to write the data?  To do so, you can attach the contents of your webapp Log after clicking "refresh" on the log.   

Thanks,
Sarina

rreff
Level 2
Author

Hello SarinaS.

 

Sounds good and makes sense, but it does not work yet.

Please find the latest log and source code below.

 

Regards

Roberto

 

The log:

/appl/dataikudata/test1/pyenv/lib/python3.6/site-packages/tornado/concurrent.py:521: DeprecationWarning: @return_future is deprecated, use coroutines instead
  DeprecationWarning)
2021-03-31 10:48:49,827 INFO use old-style cookie sniffing handler
2021-03-31 10:48:49,830 INFO Starting Bokeh server version 0.12.16 (running on Tornado 5.1.1)
2021-03-31 10:48:49,831 WARNING Host wildcard '*' will allow connections originating from multiple (or possibly all) hostnames or IPs. Use non-wildcard values to restrict access explicitly
2021-03-31 10:48:49,833 INFO Bokeh app running at: http://localhost:21900/backend
2021-03-31 10:48:49,847 INFO 200 GET /static/js/bokeh.min.js (127.0.0.1) 7.26ms
2021-03-31 10:48:50,057 INFO 200 GET /static/js/bokeh.min.js (127.0.0.1) 2.16ms
2021-03-31 10:48:50,540 INFO Initializing dataset writer for dataset RENTENVALIDIERUNG.typed_Stammdaten
2021-03-31 10:48:50,702 INFO 200 GET /backend (127.0.0.1) 260.30ms
2021-03-31 10:48:50,749 INFO Initializing dataset writer for dataset RENTENVALIDIERUNG.typed_Stammdaten
2021-03-31 10:48:50,749 ERROR Error running application handler <bokeh.application.handlers.directory.DirectoryHandler object at 0x7f9f71ef94e0>: Unable to instanciate a new dataset writer. There is already another active writer for this dataset (RENTENVALIDIERUNG.typed_Stammdaten).
File "dataset_write.py", line 276, in __init__:
raise Exception('Unable to instanciate a new dataset writer. There is already another active writer for this dataset (%s).'%dataset.full_name) Traceback (most recent call last):
  File "/appl/dataikuapps/dataiku-dss-9.0.0/python36.packages/bokeh/application/handlers/code_runner.py", line 163, in run
    exec(self._code, module.__dict__)
  File "/appl/dataikudata/test1/tmp/RENTENVALIDIERUNG/web_apps/xky7LBc/backend/main.py", line 54, in <module>
    master_df_w = master_edit.get_writer()
  File "/appl/dataikuapps/dataiku/python/dataiku/core/dataset.py", line 830, in get_writer
    return dataset_write.DatasetWriter(self,)
  File "/appl/dataikuapps/dataiku/python/dataiku/core/dataset_write.py", line 276, in __init__
    raise Exception('Unable to instanciate a new dataset writer. There is already another active writer for this dataset (%s).'%dataset.full_name)
Exception: Unable to instanciate a new dataset writer. There is already another active writer for this dataset (RENTENVALIDIERUNG.typed_Stammdaten).

2021-03-31 10:48:50,752 INFO 200 GET /backend (127.0.0.1) 49.22ms
2021-03-31 10:48:50,753 ERROR Error running application handler <bokeh.application.handlers.directory.DirectoryHandler object at 0x7f9f71ef94e0>: Unable to instanciate a new dataset writer. There is already another active writer for this dataset (RENTENVALIDIERUNG.typed_Stammdaten).
File "dataset_write.py", line 276, in __init__:
raise Exception('Unable to instanciate a new dataset writer. There is already another active writer for this dataset (%s).'%dataset.full_name) Traceback (most recent call last):
  File "/appl/dataikuapps/dataiku-dss-9.0.0/python36.packages/bokeh/application/handlers/code_runner.py", line 163, in run
    exec(self._code, module.__dict__)
  File "/appl/dataikudata/test1/tmp/RENTENVALIDIERUNG/web_apps/xky7LBc/backend/main.py", line 54, in <module>
    master_df_w = master_edit.get_writer()
  File "/appl/dataikuapps/dataiku/python/dataiku/core/dataset.py", line 830, in get_writer
    return dataset_write.DatasetWriter(self,)
  File "/appl/dataikuapps/dataiku/python/dataiku/core/dataset_write.py", line 276, in __init__
    raise Exception('Unable to instanciate a new dataset writer. There is already another active writer for this dataset (%s).'%dataset.full_name)
Exception: Unable to instanciate a new dataset writer. There is already another active writer for this dataset (RENTENVALIDIERUNG.typed_Stammdaten).

2021-03-31 10:48:50,756 INFO 200 GET /backend (127.0.0.1) 3.13ms
2021-03-31 10:48:51,070 INFO 101 GET /backend/ws?bokeh-protocol-version=1.0&bokeh-session-id=m3V0qpty9ANnMLrOfLgU7TaOeH0vd0Pz1lV7D79MjDJh (127.0.0.1) 0.56ms
2021-03-31 10:48:51,070 INFO WebSocket connection opened
2021-03-31 10:48:51,070 INFO ServerConnection created

 

And the source code:

 

import dataiku
import numpy as np
import pandas as pd

from dataiku import pandasutils as pdu
from datetime import date, datetime

from bokeh.models.widgets import (
    Panel, 
    Tabs, 
    Select, 
    Slider, 
    TextInput, 
    Paragraph,
    DataTable, 
    PreText, 
    Button, 
    DateFormatter, 
    TableColumn, 
    Dropdown, 
    DatePicker,
)
from bokeh.models import (
    ColumnDataSource,
    Button, 
    DataTable, 
    TableColumn, 
    CustomJS, 
    TextInput, 
    Paragraph, 
    Div,
    Label,
    CheckboxGroup,
    Panel,
    Tabs,
    DatetimeTickFormatter,
    LinearAxis,
    LogAxis,
)
from bokeh.layouts import widgetbox, row, column, layout
from bokeh.models.callbacks import CustomJS
from bokeh.models.ranges import FactorRange
from bokeh.io import curdoc
from bokeh.events import ButtonClick
from bokeh.plotting import show, figure

# link some tbl
master = dataiku.Dataset("typed_Stammdaten")
master_edit = dataiku.Dataset("typed_Stammdaten")
issuercurves = dataiku.Dataset("prepared_IssuerCurves")
cashFlows = dataiku.Dataset("Calculated_Cashflows")

master_df = master.get_dataframe()
master_df_w = master_edit.get_writer()
issuercurves_df = issuercurves.get_dataframe()
cashFlows_df = cashFlows.get_dataframe()

# org sample base source: https://docs.bokeh.org/en/latest/docs/user_guide/server.html
# docu: https://docs.bokeh.org/en/latest/docs/reference/document.html

# doc
doc = curdoc()
doc.title = "FI Validation"

# setup tbl views, including formats
colDefStammdaten = [
        TableColumn(field="ISIN", title="ISIN", width=100),
        TableColumn(field="Issuer", title="Issuer", width=130),
        TableColumn(field="StartDate", title="Start", width=75, formatter=DateFormatter(format = '%d.%m.%Y')),
        TableColumn(field="EndDate", title="End", width=75, formatter=DateFormatter(format = '%d.%m.%Y')),
        TableColumn(field="WP_Art", title="WP Art", width=140),
        TableColumn(field="Kommentar", title="Kommentar"),
    ]

colDefCurves = [
    TableColumn(field="tenor", title="Tenor", width=250),
    TableColumn(field="value", title="Value", width=250),
]

colDefCashFlows = [
    TableColumn(field="Cashflow", title="Cashflow", width=250),
    TableColumn(field="date", title="Date", width=250),
]

# create default tbl data links
tblSourceStammdaten = ColumnDataSource(master_df)
#tblSourceCurves = ColumnDataSource(pd.DataFrame(columns = ["tenor", "value"]))
tblSourceCurves = ColumnDataSource({'tenor': np.array([]), 'value': np.array([])})
#tblSourceCashFlows = ColumnDataSource(pd.DataFrame(columns = ["Cashflow", "ISIN", "date"]))
tblSourceCashFlows = ColumnDataSource({'Cashflow': np.array([]), 'date': np.array([])})

# stammdaten list view
tblViewStammdaten = DataTable(source=tblSourceStammdaten, columns=colDefStammdaten, width=880, height=800, fit_columns=False, index_position=None, reorderable=False, name="Master")

# stammdaten details view items and add to unique tab
txtISIN = TextInput(title="ISIN",value="", width=400, height=30, disabled=True)
txtWPArt = TextInput(title="WP Art",value="", width=400, height=30, disabled=True)
txtIssuer = TextInput(title="Issuer",value="", width=400, height=30, disabled=True)
txtNotionalCurrency = TextInput(title="Currency Notional",value="", width=400, height=30, disabled=True)
txtNotional = TextInput(title="Notional",value="", width=400, height=30, disabled=True)
txtStart = TextInput(title="Startdate",value="", width=400, height=30, disabled=True)
txtEnd = TextInput(title="Enddate",value="", width=400, height=30, disabled=True)

tab_Details = Panel(child = column(txtISIN, txtWPArt, txtIssuer, txtNotionalCurrency, \
                                   txtNotional, txtStart, txtEnd), title = "Details")

# curves details view items and add to unique tab
curveSelect = Select(title='Issuer Curve', value="", options= issuercurves_df["Curve_ID"].unique().tolist() + [""])
tblViewCurves = DataTable(source = tblSourceCurves, columns=colDefCurves, width=500, height=600, fit_columns=False, index_position=None, editable=True)

pCurve = figure(plot_width=500, plot_height=400, x_range=["3M"], toolbar_location="below", x_axis_label='tenor', y_axis_label='value') # tools=["box_select", "hover", "reset"], toolbar_location=None, 
pCurve.yaxis.visible = False
pCurve.min_border_left = 30
yaxis = LinearAxis()
pCurve.add_layout(yaxis, 'right')
pCurve.line(x='tenor', y='value', source=tblSourceCurves, line_alpha=1, color="firebrick", line_width=2)
pCurve.circle(x='tenor', y='value', source=tblSourceCurves, size=5, fill_color="white")

tab_Curves = Panel(child=column(curveSelect, pCurve, tblViewCurves), title="Curves")

# create tab with curve detail items and add to unique tab
tblViewCashFlows = DataTable(source = tblSourceCashFlows, columns=colDefCashFlows, width=500, height=600, fit_columns=False, index_position=None, editable=False)
tab_CashFlows = Panel(child=row(tblViewCashFlows), title="Cash Flows")

# create tab view and add all tabs
tabs = Tabs(tabs=[ tab_Details, tab_Curves, tab_CashFlows], width = 400)

# add list and tabs to doc root
doc.add_root(row(tblViewStammdaten, tabs))

# callback implementations
def cbRowSelected(attr, old, new):
    selectionRowIndex=tblSourceStammdaten.selected.indices[0]
    txtISIN.value= tblSourceStammdaten.data["ISIN"][selectionRowIndex]
    txtWPArt.value= tblSourceStammdaten.data["WP_Art"][selectionRowIndex]
    txtIssuer.value= tblSourceStammdaten.data["Issuer"][selectionRowIndex]
    txtNotionalCurrency.value= tblSourceStammdaten.data["Notional_Currency"][selectionRowIndex]
    txtNotional.value= tblSourceStammdaten.data["Nominal_in_Curr1"][selectionRowIndex].astype("str")
    txtStart.value= tblSourceStammdaten.data["StartDate"][selectionRowIndex].astype("str")
    txtEnd.value= tblSourceStammdaten.data["EndDate"][selectionRowIndex].astype("str")
    
    # change value of detail curve selection will also raise callback=cbSelectCurve
    curveSelect.value = tblSourceStammdaten.data["Issuer_ID"][selectionRowIndex]
    selectCashFlowByIsin(tblSourceStammdaten.data["ISIN"][selectionRowIndex])

def cbSelectCurve(attr, old, new):
    data = issuercurves_df[issuercurves_df["Curve_ID"] == new]
    
    updateCurveSelection(new)
    #l = data.to_dict('list')
    #l = ColumnDataSource({'tenor': np.array(data['tenor']), 'value': np.array(data['value'])})
    l = ColumnDataSource(data.to_dict('list')).data
    #tblSourceCurves.data = l.data
    #print ("type", len(l))
    #print ("new", l)
    
    # update underlying datasets
    #tblSourceCurves.data = ColumnDataSource(data.to_dict('list')).data
    try:
        tblSourceCurves.data = l
    except ValueError as error:
        pass
    
    pCurve.x_range.factors = data["tenor"].unique().tolist()

# update implementations
def updateCurveSelection(issuerid):
    #print ("type", type(issuerid)) #df['column'].describe()
    #print ("type", issuerid)
    
    try:
        selectionRowIndex=tblSourceStammdaten.selected.indices[0]
        #tblSourceStammdaten.patch({"Issuer_ID": [(selectionRowIndex, issuerid)]})
    except ValueError as error:
        pass
    
    # todo: Update DataIku Dataset with changed data
    #master_df_w.write_row_array(tblSourceStammdaten.to_df().iloc[selectionRowIndex].tolist())
    #master_df_w.close()
    #pass

def selectCashFlowByIsin(identifier):
    data = cashFlows_df[cashFlows_df["ISIN"] == identifier]
    tblSourceCashFlows.data = ColumnDataSource(data.to_dict('list')).data

# add callbacks
tblSourceStammdaten.on_change('selected', cbRowSelected)
curveSelect.on_change('value', cbSelectCurve)

 

rreff
Level 2
Author

Hello.

 

As a supplement, please note that the underlying storage is based on filesystem files, since the client is currently evaluating Dataiku's capabilities.

 

Regards

SarinaS
Dataiker

Hi @rreff ,

Thank you for attaching your logs and full code!  

Seeing the current error log, I think that this confirms that the issue is still with not closing the writer:

 

Exception: Unable to instanciate a new dataset writer. There is already another active writer for this dataset (RENTENVALIDIERUNG.typed_Stammdaten).

 

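This error is expected if your code runs twice without a master_df_w.close() statement: the previous writer is still open, so you can't create a new one until it is closed. Your log shows this: the writer for typed_Stammdaten is initialized once, and when main.py runs again for a subsequent GET /backend request, the module-level get_writer() call fails because the first writer was never closed.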

To resolve this, I would suggest: 

  • restarting your webapp backend (this should be a one-time step to let you proceed; going forward, closing the writer within the code needs to be the solution) 
  • move both the opening of the writer and the closing of the writer into the updateCurveSelection function.  You'll want to close the writer immediately after writing, and ensure that your flow always hits writer.close().  Moving both the open and close into the same function seems like the most straightforward way to do that.  

So for example, your updated updateCurveSelection function:

 

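def updateCurveSelection(issuerid):
    # nothing to write if no row is selected (indices[0] would raise IndexError)
    if not tblSourceStammdaten.selected.indices:
        return
    selectionRowIndex = tblSourceStammdaten.selected.indices[0]
    #tblSourceStammdaten.patch({"Issuer_ID": [(selectionRowIndex, issuerid)]})

    # open the writer only for this write, and close it even if writing fails
    master_df_w = master_edit.get_writer()
    try:
        master_df_w.write_row_array(tblSourceStammdaten.to_df().iloc[selectionRowIndex].tolist())
    finally:
        master_df_w.close()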

 

Let me know how this goes,
Sarina

  

rreff
Level 2
Author

Hello Sarina.

Yes, the error is no longer thrown.
But "write_row_array" behaves differently than expected.
On the one hand, after the call only this one row is left in the dataset. On the other hand, columns with date values, for example, are empty.

Regards

Roberto

SarinaS
Dataiker

Hi @rreff ,

Ah I see, glad to hear that the error is no longer thrown.  

I wonder if it makes sense to switch to the append method outlined here, for example: 

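    import dataiku
    import pandas as pd

    mydataset = dataiku.Dataset("sample_data")
    df = mydataset.get_dataframe()
    append_df = pd.DataFrame(data=[{'device_id': 1, 'event_date': '02/22/2021', 'interesting_value': 11, 'country': 'USA'}])
    # DataFrame.append was removed in pandas 2.0; pd.concat works across versions
    df = pd.concat([df, append_df], ignore_index=True)
    # rewrites the whole dataset (schema and data)
    mydataset.write_with_schema(df)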

 

For the issue with some date values showing up empty, I would suggest printing out the dataframe values prior to writing them to the dataset in order to troubleshoot this.  
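When printing the row, two things may be worth checking. A `ColumnDataSource` built from a DataFrame stores the DataFrame index in an extra column named `index`, which may come back from `to_df()`, and `write_row_array` is expected to receive the values in the dataset's schema order. Either could explain values landing in the wrong columns. A minimal sketch, assuming the schema column names are available as a list (in DSS, for example, via `[c["name"] for c in master.read_schema()]`); the helper name `row_in_schema_order` and the sample data are hypothetical:

```python
import pandas as pd


def row_in_schema_order(source_df, schema_columns, row_index):
    """Return one row's values ordered like the dataset schema.

    source_df      -- DataFrame from ColumnDataSource.to_df(); may carry an extra "index" column
    schema_columns -- column names in schema order
    row_index      -- positional index of the selected row
    """
    missing = [c for c in schema_columns if c not in source_df.columns]
    if missing:
        raise KeyError("columns missing from the source: %s" % missing)
    # Label-based selection keeps only the schema columns, in schema order,
    # so an extra "index" column (or a different column order) is dropped.
    return source_df.iloc[row_index].loc[schema_columns].tolist()


# Hypothetical sample mimicking a to_df() result with an extra "index" column
source_df = pd.DataFrame({
    "index": [0, 1],
    "StartDate": pd.to_datetime(["2020-01-15", "2021-03-01"]),
    "ISIN": ["DE0001", "DE0002"],
})
values = row_in_schema_order(source_df, ["ISIN", "StartDate"], 1)
print(values)  # inspect the values before passing them to write_row_array()
```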

Thanks,
Sarina

rreff
Level 2
Author

Sorry it took me so long to reply.

About your proposal: I would assume that by using

write_with_schema(...)

the whole dataset gets written back. So if I have a lot of data stored, it wouldn't be the best solution, would it? I was hoping to find a solution that only persists that single-row change.

SarinaS
Dataiker

Hi @rreff ,

You are correct. You can use the following approach to append instead: 

    import dataiku
    import pandas as pd

    mydataset = dataiku.Dataset("name_age")
    # set append mode before calling get_writer() so rows are appended instead of overwriting
    mydataset.spec_item["appendMode"] = True
    writer = mydataset.get_writer()
    append_df = pd.DataFrame(data=[{'name': 'jill', 'age': 11}])
    # use write_dataframe() instead of write_with_schema() here:
    writer.write_dataframe(append_df)
    writer.close()


Let me know if you have any questions about this. 
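Keep in mind that append mode only adds rows: it does not modify the row that was edited, so after an edit the dataset holds both the old and the new version of that record. One way to handle this, sketched below under assumptions, is to append each edited row with a timestamp and keep only the newest version per record when reading the dataset back. This assumes `ISIN` uniquely identifies a record and that the dataset schema has an extra timestamp column; the `edited_at` column and the `latest_version_per_key` helper are hypothetical.

```python
import pandas as pd


def latest_version_per_key(df, key="ISIN", ts_col="edited_at"):
    """Keep only the most recent version of each record.

    Rows without a timestamp (original, never-edited rows) are treated as the
    oldest versions; appended edits carry a timestamp in `ts_col`.
    """
    # mergesort is stable, so rows with equal timestamps keep their order
    ordered = df.sort_values(ts_col, kind="mergesort", na_position="first")
    return ordered.drop_duplicates(subset=key, keep="last").reset_index(drop=True)


# Hypothetical data: DE0001 was edited once and its new version was appended
history = pd.DataFrame({
    "ISIN": ["DE0001", "DE0002", "DE0001"],
    "Issuer_ID": ["A", "B", "C"],
    "edited_at": [pd.NaT, pd.NaT, pd.Timestamp("2021-04-01 10:00")],
})
current = latest_version_per_key(history)
print(current[["ISIN", "Issuer_ID"]])
```

The trade-off is that the stored dataset grows with every edit; if that becomes a problem, it could be compacted now and then by rewriting it once with only the latest versions.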

Thanks,
Sarina

rreff
Level 2
Author

Hi Sarina,

Oh, that's a nice hack. I will test it soon.

Regards Roberto