Update DSS Datasets in Bokeh WebApps
Is it possible not only to use a dataset for visualization, but also to persist changes back to that same dataset?
Within the visualization, the user of the Bokeh webapp can make some data updates, and these changes should not be lost.
# from this dataset/dataframe we read the data
master = dataiku.Dataset("typed_Stammdaten")
master_df = master.get_dataframe()

# we created a second dataset/dataframe ref we want to write to
master_edit = dataiku.Dataset("typed_Stammdaten")
master_ds_w = master_edit.get_writer()

tblsource = ColumnDataSource(master_df)

# callback from select widget to update the record
def updateCurveSelection(issuerid):
    # get record index
    selectionRowIndex = tblsource.selected.indices[0]
    # updating source
    tblsource.patch({"Issuer_ID": [(selectionRowIndex, issuerid)]})
    # ? how to write back without getting an error
    master_ds_w.write_row_array(tblsource.to_df().iloc[selectionRowIndex].tolist())
Answers
-
CoreyS (Dataiker Alumni)
Hi @rreff! Can you provide any further details on the thread to help users find a solution (for example, your DSS version)? Also, can you let us know if you've tried any fixes already? This should lead to a quicker response from the community.
-
Hi Corey.
Sure.
Version 8.0.5. Free Edition.
We have tried the standard approach to solve this task.
As shown in the source code, this part:
master_ds_w.write_row_array(tblsource.to_df().iloc[selectionRowIndex].tolist())
should store the row update.
Regards
Roberto
-
Are you still missing any information from us to answer our request?
-
Sarina (Dataiker)
Hi @rreff, I tested out your setup, and I think writing in this way should work as you expect. One thing I did notice is that it looks like you aren't closing the writer in the snippet you provided:
master_ds_w
The writer does need to be closed after use, with master_ds_w.close(). Once the writer is closed, you should see the rows written:
2021-03-30 17:14:49,660 INFO Initializing write data stream (KJg5kExH6X)
2021-03-30 17:14:49,662 INFO Remote Stream Writer: start generate
2021-03-30 17:14:49,662 INFO Waiting for data to send ...
2021-03-30 17:14:49,662 INFO Sending data (19)
2021-03-30 17:14:49,663 INFO Waiting for data to send ...
2021-03-30 17:14:49,664 INFO Remote Stream Writer closed
2021-03-30 17:14:49,666 INFO Got end mark, ending send
1 rows successfully written (KJg5kExH6X)
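To illustrate, a minimal sketch of the write-then-close cycle (the row values here are placeholders):

import dataiku

ds = dataiku.Dataset("typed_Stammdaten")
writer = ds.get_writer()
try:
    # write one row as a list of values matching the dataset schema
    writer.write_row_array(["col1_value", "col2_value"])
finally:
    # close() flushes the stream; rows only show up once the writer is closed
    writer.close()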
Can you try that out? If it still does not work for you, can you attach any errors that you get when you try to write the data? To do so, you can attach the contents of your webapp log after clicking "refresh" on the log.
Thanks,
Sarina
-
Hello SarinaS.
Sounds good and makes sense, but it does not work yet.
Please find below the latest log and source code.
Regards
Roberto
The log:
/appl/dataikudata/test1/pyenv/lib/python3.6/site-packages/tornado/concurrent.py:521: DeprecationWarning: @return_future is deprecated, use coroutines instead
  DeprecationWarning)
2021-03-31 10:48:49,827 INFO use old-style cookie sniffing handler
2021-03-31 10:48:49,830 INFO Starting Bokeh server version 0.12.16 (running on Tornado 5.1.1)
2021-03-31 10:48:49,831 WARNING Host wildcard '*' will allow connections originating from multiple (or possibly all) hostnames or IPs. Use non-wildcard values to restrict access explicitly
2021-03-31 10:48:49,833 INFO Bokeh app running at: http://localhost:21900/backend
2021-03-31 10:48:49,847 INFO 200 GET /static/js/bokeh.min.js (127.0.0.1) 7.26ms
2021-03-31 10:48:50,057 INFO 200 GET /static/js/bokeh.min.js (127.0.0.1) 2.16ms
2021-03-31 10:48:50,540 INFO Initializing dataset writer for dataset RENTENVALIDIERUNG.typed_Stammdaten
2021-03-31 10:48:50,702 INFO 200 GET /backend (127.0.0.1) 260.30ms
2021-03-31 10:48:50,749 INFO Initializing dataset writer for dataset RENTENVALIDIERUNG.typed_Stammdaten
2021-03-31 10:48:50,749 ERROR Error running application handler <bokeh.application.handlers.directory.DirectoryHandler object at 0x7f9f71ef94e0>: Unable to instanciate a new dataset writer. There is already another active writer for this dataset (RENTENVALIDIERUNG.typed_Stammdaten).
File "dataset_write.py", line 276, in __init__:
raise Exception('Unable to instanciate a new dataset writer. There is already another active writer for this dataset (%s).'%dataset.full_name)
Traceback (most recent call last):
  File "/appl/dataikuapps/dataiku-dss-9.0.0/python36.packages/bokeh/application/handlers/code_runner.py", line 163, in run
    exec(self._code, module.__dict__)
  File "/appl/dataikudata/test1/tmp/RENTENVALIDIERUNG/web_apps/xky7LBc/backend/main.py", line 54, in <module>
    master_df_w = master_edit.get_writer()
  File "/appl/dataikuapps/dataiku/python/dataiku/core/dataset.py", line 830, in get_writer
    return dataset_write.DatasetWriter(self,)
  File "/appl/dataikuapps/dataiku/python/dataiku/core/dataset_write.py", line 276, in __init__
    raise Exception('Unable to instanciate a new dataset writer. There is already another active writer for this dataset (%s).'%dataset.full_name)
Exception: Unable to instanciate a new dataset writer. There is already another active writer for this dataset (RENTENVALIDIERUNG.typed_Stammdaten).
2021-03-31 10:48:50,752 INFO 200 GET /backend (127.0.0.1) 49.22ms
2021-03-31 10:48:50,753 ERROR Error running application handler <bokeh.application.handlers.directory.DirectoryHandler object at 0x7f9f71ef94e0>: Unable to instanciate a new dataset writer. There is already another active writer for this dataset (RENTENVALIDIERUNG.typed_Stammdaten).
(traceback identical to the one above)
2021-03-31 10:48:50,756 INFO 200 GET /backend (127.0.0.1) 3.13ms
2021-03-31 10:48:51,070 INFO 101 GET /backend/ws?bokeh-protocol-version=1.0&bokeh-session-id=m3V0qpty9ANnMLrOfLgU7TaOeH0vd0Pz1lV7D79MjDJh (127.0.0.1) 0.56ms
2021-03-31 10:48:51,070 INFO WebSocket connection opened
2021-03-31 10:48:51,070 INFO ServerConnection created
And the source code:
import dataiku
import numpy as np
import pandas as pd
from dataiku import pandasutils as pdu
from datetime import date, datetime
from bokeh.models.widgets import (
    Panel, Tabs, Select, Slider, TextInput, Paragraph, DataTable, PreText,
    Button, DateFormatter, TableColumn, Dropdown, DatePicker,
)
from bokeh.models import (
    ColumnDataSource, Button, DataTable, TableColumn, CustomJS, TextInput,
    Paragraph, Div, Label, CheckboxGroup, Panel, Tabs, DatetimeTickFormatter,
    LinearAxis, LogAxis,
)
from bokeh.layouts import widgetbox, row, column, layout
from bokeh.models.callbacks import CustomJS
from bokeh.models.ranges import FactorRange
from bokeh.io import curdoc
from bokeh.events import ButtonClick
from bokeh.plotting import show, figure

# link some tbl
master = dataiku.Dataset("typed_Stammdaten")
master_edit = dataiku.Dataset("typed_Stammdaten")
issuercurves = dataiku.Dataset("prepared_IssuerCurves")
cashFlows = dataiku.Dataset("Calculated_Cashflows")
master_df = master.get_dataframe()
master_df_w = master_edit.get_writer()
issuercurves_df = issuercurves.get_dataframe()
cashFlows_df = cashFlows.get_dataframe()

# org sample base source: https://docs.bokeh.org/en/latest/docs/user_guide/server.html
# docu: https://docs.bokeh.org/en/latest/docs/reference/document.html

# doc
doc = curdoc()
doc.title = "FI Validation"

# setup tbl views, including formats
colDefStammdaten = [
    TableColumn(field="ISIN", title="ISIN", width=100),
    TableColumn(field="Issuer", title="Issuer", width=130),
    TableColumn(field="StartDate", title="Start", width=75, formatter=DateFormatter(format='%d.%m.%Y')),
    TableColumn(field="EndDate", title="End", width=75, formatter=DateFormatter(format='%d.%m.%Y')),
    TableColumn(field="WP_Art", title="WP Art", width=140),
    TableColumn(field="Kommentar", title="Kommentar"),
]
colDefCurves = [
    TableColumn(field="tenor", title="Tenor", width=250),
    TableColumn(field="value", title="Value", width=250),
]
colDefCashFlows = [
    TableColumn(field="Cashflow", title="Cashflow", width=250),
    TableColumn(field="date", title="Date", width=250),
]

# create default tbl data links
tblSourceStammdaten = ColumnDataSource(master_df)
#tblSourceCurves = ColumnDataSource(pd.DataFrame(columns=["tenor", "value"]))
tblSourceCurves = ColumnDataSource({'tenor': np.array([]), 'value': np.array([])})
#tblSourceCashFlows = ColumnDataSource(pd.DataFrame(columns=["Cashflow", "ISIN", "date"]))
tblSourceCashFlows = ColumnDataSource({'Cashflow': np.array([]), 'date': np.array([])})

# stammdaten list view
tblViewStammdaten = DataTable(source=tblSourceStammdaten, columns=colDefStammdaten, width=880, height=800,
                              fit_columns=False, index_position=None, reorderable=False, name="Master")

# stammdaten details view items and add to unique tab
txtISIN = TextInput(title="ISIN", value="", width=400, height=30, disabled=True)
txtWPArt = TextInput(title="WP Art", value="", width=400, height=30, disabled=True)
txtIssuer = TextInput(title="Issuer", value="", width=400, height=30, disabled=True)
txtNotionalCurrency = TextInput(title="Currency Notional", value="", width=400, height=30, disabled=True)
txtNotional = TextInput(title="Notional", value="", width=400, height=30, disabled=True)
txtStart = TextInput(title="Startdate", value="", width=400, height=30, disabled=True)
txtEnd = TextInput(title="Enddate", value="", width=400, height=30, disabled=True)
tab_Details = Panel(child=column(txtISIN, txtWPArt, txtIssuer, txtNotionalCurrency,
                                 txtNotional, txtStart, txtEnd), title="Details")

# curves details view items and add to unique tab
curveSelect = Select(title='Issuer Curve', value="", options=issuercurves_df["Curve_ID"].unique().tolist() + [""])
tblViewCurves = DataTable(source=tblSourceCurves, columns=colDefCurves, width=500, height=600,
                          fit_columns=False, index_position=None, editable=True)
pCurve = figure(plot_width=500, plot_height=400, x_range=["3M"], toolbar_location="below",
                x_axis_label='tenor', y_axis_label='value')
# tools=["box_select", "hover", "reset"], toolbar_location=None,
pCurve.yaxis.visible = False
pCurve.min_border_left = 30
yaxis = LinearAxis()
pCurve.add_layout(yaxis, 'right')
pCurve.line(x='tenor', y='value', source=tblSourceCurves, line_alpha=1, color="firebrick", line_width=2)
pCurve.circle(x='tenor', y='value', source=tblSourceCurves, size=5, fill_color="white")
tab_Curves = Panel(child=column(curveSelect, pCurve, tblViewCurves), title="Curves")

# create tab with curve detail items and add to unique tab
tblViewCashFlows = DataTable(source=tblSourceCashFlows, columns=colDefCashFlows, width=500, height=600,
                             fit_columns=False, index_position=None, editable=False)
tab_CashFlows = Panel(child=row(tblViewCashFlows), title="Cash Flows")

# create tab view and add all tabs
tabs = Tabs(tabs=[tab_Details, tab_Curves, tab_CashFlows], width=400)

# add list and tabs to doc root
doc.add_root(row(tblViewStammdaten, tabs))

# callback implementations
def cbRowSelected(attr, old, new):
    selectionRowIndex = tblSourceStammdaten.selected.indices[0]
    txtISIN.value = tblSourceStammdaten.data["ISIN"][selectionRowIndex]
    txtWPArt.value = tblSourceStammdaten.data["WP_Art"][selectionRowIndex]
    txtIssuer.value = tblSourceStammdaten.data["Issuer"][selectionRowIndex]
    txtNotionalCurrency.value = tblSourceStammdaten.data["Notional_Currency"][selectionRowIndex]
    txtNotional.value = tblSourceStammdaten.data["Nominal_in_Curr1"][selectionRowIndex].astype("str")
    txtStart.value = tblSourceStammdaten.data["StartDate"][selectionRowIndex].astype("str")
    txtEnd.value = tblSourceStammdaten.data["EndDate"][selectionRowIndex].astype("str")
    # change value of detail curve selection will also raise callback=cbSelectCurve
    curveSelect.value = tblSourceStammdaten.data["Issuer_ID"][selectionRowIndex]
    selectCashFlowByIsin(tblSourceStammdaten.data["ISIN"][selectionRowIndex])

def cbSelectCurve(attr, old, new):
    data = issuercurves_df[issuercurves_df["Curve_ID"] == new]
    updateCurveSelection(new)
    #l = data.to_dict('list')
    #l = ColumnDataSource({'tenor': np.array(data['tenor']), 'value': np.array(data['value'])})
    l = ColumnDataSource(data.to_dict('list')).data
    #tblSourceCurves.data = l.data
    #print("type", len(l))
    #print("new", l)
    # update underlying datasets
    #tblSourceCurves.data = ColumnDataSource(data.to_dict('list')).data
    try:
        tblSourceCurves.data = l
    except ValueError as error:
        pass
    pCurve.x_range.factors = data["tenor"].unique().tolist()

# update implementations
def updateCurveSelection(issuerid):
    #print("type", type(issuerid))
    #df['column'].describe()
    #print("type", issuerid)
    try:
        selectionRowIndex = tblSourceStammdaten.selected.indices[0]
        #tblSourceStammdaten.patch({"Issuer_ID": [(selectionRowIndex, issuerid)]})
    except ValueError as error:
        pass
    # todo: Update DataIku Dataset with changed data
    #master_df_w.write_row_array(tblSourceStammdaten.to_df().iloc[selectionRowIndex].tolist())
    #master_df_w.close()
    #pass

def selectCashFlowByIsin(identifier):
    data = cashFlows_df[cashFlows_df["ISIN"] == identifier]
    tblSourceCashFlows.data = ColumnDataSource(data.to_dict('list')).data

# add callbacks
tblSourceStammdaten.on_change('selected', cbRowSelected)
curveSelect.on_change('value', cbSelectCurve)
-
Hello.
As a supplement, please note that the underlying storage is based on filesystem files.
This is because the client is currently testing the Dataiku possibilities.
Regards
-
Sarina (Dataiker)
Hi @rreff, thank you for attaching your logs and full code!
Seeing the current error log, I think this confirms that the issue is still the writer not being closed:
Exception: Unable to instanciate a new dataset writer. There is already another active writer for this dataset (RENTENVALIDIERUNG.typed_Stammdaten).
For instance, if you run your code twice without a master_df_w.close() statement, then this is actually expected. That's because the previous writer is still open, so you can't create a new one until the previous one is closed.
To resolve this, I would suggest:
- restart your webapp backend (please note that this should be a one-time step to allow you to proceed; closing the writer within the code does need to be the solution going forward)
- move both the opening and the closing of the writer into the updateCurveSelection function. You'll want to close the writer immediately after writing, and ensure that your flow always hits writer.close(). Moving both the open and the close into the same function seems like the most straightforward way to do that.
So for example, your updated updateCurveSelection function:
def updateCurveSelection(issuerid):
    master_df_w = master_edit.get_writer()
    try:
        selectionRowIndex = tblSourceStammdaten.selected.indices[0]
        #tblSourceStammdaten.patch({"Issuer_ID": [(selectionRowIndex, issuerid)]})
    except ValueError as error:
        pass
    master_df_w.write_row_array(tblSourceStammdaten.to_df().iloc[selectionRowIndex].tolist())
    master_df_w.close()
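As a variation (a sketch, not from the original reply): wrapping the write in try/finally guarantees the writer is closed even if the write itself raises, so no stale writer blocks the next call:

def updateCurveSelection(issuerid):
    master_df_w = master_edit.get_writer()
    try:
        # index of the row currently selected in the Bokeh table
        selectionRowIndex = tblSourceStammdaten.selected.indices[0]
        master_df_w.write_row_array(tblSourceStammdaten.to_df().iloc[selectionRowIndex].tolist())
    finally:
        # runs on success and failure alike, so the writer is never left open
        master_df_w.close()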
Let me know how this goes,
Sarina
-
Hello Sarina.
Yes, the error is no longer thrown.
But "write_row_array" acts differently than expected.
On the one hand, after the call only this one row is available in the dataset. On the other hand, columns with date values, for example, are empty.
Regards
Roberto
-
Sarina (Dataiker)
Hi @rreff, ah I see. Glad to hear that the error is no longer thrown.
I wonder if it makes sense to switch to the append method outlined here, for example:
mydataset = dataiku.Dataset("sample_data") df = mydataset.get_dataframe() append_df = pd.DataFrame(data=[{'device_id': 1, 'event_date': '02/22/2021', 'interesting_value':11, 'country':'USA'}]) df = df.append(append_df) mydataset.write_with_schema(df)
For the issue with some date values showing up empty, I would suggest printing out the dataframe values prior to writing them to the dataset in order to troubleshoot this.
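For example, a debugging sketch (variable names taken from the webapp code above) that could be added just before the write:

# inspect the column dtypes and the exact row about to be written,
# to check whether the date columns survive the ColumnDataSource round trip
row_df = tblSourceStammdaten.to_df()
print(row_df.dtypes)
print(row_df.iloc[selectionRowIndex])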
Thanks,
Sarina
-
Sorry it took me so long to reply.
About your proposal: I would assume that by using
write_with_schema(...)
the whole dataset gets written back. So in case I have a lot of data stored, it wouldn't be the best solution, would it? I was hoping to find a solution that only persists the single row change.
-
Sarina (Dataiker)
Hi @rreff, you are correct. You can use the following approach to append instead:
mydataset = dataiku.Dataset("name_age") # this allows you to append instead of overwrite, prior to calling get_writer() mydataset.spec_item["appendMode"] = True writer = mydataset.get_writer() append_df = pd.DataFrame(data=[{'name': 'jill', 'age': 11}]) # use write_dataframe() instead of write_with_schema here: writer.write_dataframe(append_df) writer.close()
Let me know if you have any questions about this. Thanks,
Sarina
-
Hi Sarina,
Oh, that's a nice hack. I will test it soon.
Regards, Roberto