Caching in a pipeline. Fetch data in constant time

Solved!
erbinlim
Level 2

We have a data processing pipeline that takes an input and queries a URL to get a response. We obviously want to go out to the network as little as possible and to cache previous queries in a database. From what I have seen, the datasets in DSS are all relational, table-based datasets, so fetching from the database is not constant time. The way I would normally implement this is to generate a hash of the input. If the hash is in a key-value database (such as Redis), fetch the response from the key-value database instead of fetching it from the network. If it isn't in the key-value database, fetch it from the network and save the response in the key-value database. Fetching from a key-value database when you know the key is generally achievable in constant time.

"Fetching from the URL" is a fairly generalised example, we could also have the situation where the parsing of input involves a long running process to get the output. In order to avoid running the long-running process over and over again for the same input, it is faster to simply cache the output in a key-value database.

How do I do something like this in DSS?

Thank you.

6 Replies
Ignacio_Toledo

Hi @erbinlim

I had a similar problem, where I needed to fetch the results of a URL query, and this is what I implemented (not sure if it is an elegant or bulletproof solution... but it has worked for a year and a half now).

  1. The input, in our case, is a dataset with "unique ids" that keeps growing over time.
  2. We created a Python script recipe and an associated output dataset, let's call it "data_updated", in the filesystem of the Dataiku node. The dataset with the unique ids is included as an input to the recipe.
  3. The Python script takes each unique id, builds a URL string for the query, and fetches the results, all using the urllib2 library.
  4. On this first run, all unique ids are processed, and the results are stored in a dataframe, which is later written into the "data_updated" dataset (a short write-back sketch follows this list). For each unique id (UID) there is one row, and then several columns with the parsed data from the URL.
  5. Once the recipe has run for the first time, we go back to the flow, examine the "data_updated" dataset, and open its "Settings" tab. Take note of the Path: (in this example I'm using a screenshot of the project where I did this, so the dataset name is not "data_updated")
    [Screenshot dataout.png: the dataset's Settings tab showing the Path]
  6. Now we go back to the Flow and create a new filesystem dataset called "data_history", configured to point to the same path as "data_updated":
    [Screenshot Selection_002.png: the "data_history" dataset pointing to the same path]
  7. Back in the flow, open the Python recipe created before, and add "data_history" as an input.
  8. Modify the Python script accordingly, so that when new unique ids are added to the original table, the script first looks into the "data_history" dataset:
    1. If the unique id is in data_history, retrieve the information from this "cache".
    2. If the unique id is not in data_history, make the URL query, fetch and parse the results, and add them to the "data_updated" dataset.
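To make the write-back in step 4 concrete, here is a simplified sketch (illustrative only, not the actual recipe; the column names and sample values are made up):

import dataiku
import pandas as pd

# `results` maps each unique id to a dict of parsed columns, built in steps 3 and 8
results = {'uid-001': {'status': 'ok', 'value': 42}}  # illustrative placeholder

df_out = pd.DataFrame.from_dict(results, orient='index')
df_out.index.name = 'UID'

# write everything (cached rows plus newly fetched ones) into "data_updated";
# since "data_history" points to the same path, the next run sees it as the cache
dataset_updated = dataiku.Dataset("data_updated")
dataset_updated.write_with_schema(df_out.reset_index())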

I hope the steps are kind of clear. I didn't want to share my code as it was too specific to the particular query that we have to do. Some limitations:

  • If for some reason the result of the URL query for the same unique id changes, we are not going to catch that change and update the locally cached dataset.
  • If the cached dataset becomes huge and no longer fits in memory, we could run into problems when updating it. But in our case this is not a concern, as the data is lightweight.

Cheers!

Ignacio

 


Just for completeness, this is how this solution looks in the flow:

[Screenshot Selection_003.png: the caching solution as it appears in the Flow]

erbinlim
Level 2
Author

Thank you so much for your reply. I appreciate the elaborate response.

Can you elaborate on this point:

> If the unique id is in data_history, retrieve the information from this "cache"

How is this done in constant time? Querying an indexed relational database by id still takes O(log n) time.

Ignacio_Toledo

Hi @erbinlim.

No problem! I hope it helped a little.

About your question: I realize I completely skipped over that part of your post and focused mainly on how to create a flow that lets you avoid fetching results from the URL query again. Once the local cached dataset is created, I have no idea whether the query time against a dataset stored as a CSV file (which is what happens when you create a dataset on the filesystem) is constant, or whether it has a linear or non-linear dependency on the number of rows.

In the particular example I gave, all of these operations happen in a Python environment, where the datasets have been loaded into pandas dataframes stored in memory (I found this link with information on query times), and I use an algorithm like this:

 

import dataiku
import urllib2

dataset_unique_ids = dataiku.Dataset("unique_ids")
dataset_history = dataiku.Dataset("data_history")

df_uids = dataset_unique_ids.get_dataframe()
df_history = dataset_history.get_dataframe()
# index the cached history by UID so .loc lookups don't scan the whole table
df_history = df_history.set_index('UID', drop=False)

# a set gives constant-time membership checks
cached_uids = set(df_history.UID.unique())

# getting the information for all unique ids, thus the iteration
results = {}
for _, row in df_uids.iterrows():
    uid = row.UID
    if uid in cached_uids:
        results[uid] = df_history.loc[uid]  # here the cache lookup happens
    else:
        url = 'http://api.anywhere.aw/uid-check?uid=%s' % uid
        page = urllib2.urlopen(url, timeout=0.0001)
        data = page.readlines()
        # the code continues here to parse the data

 

But if you were going to use a visual recipe in Dataiku or another method, I wouldn't know how the query times would vary with the number of rows.

@CoreyS, do you know a Dataiker who could answer this part?

All the best,

Ignacio

Andrey
Dataiker Alumni

Hi @erbinlim ,

Could you please share more details about your project? Do you already have a DSS flow that works without caching? If yes, could you share it?

What do you use for fetching data, a code recipe? What's the volume of data that needs to be cached (an order of magnitude for the number of records and the size of each value)?

Do you have an estimate of how frequently cache misses will happen?

 

Regards,

Andrey Avtomonov
R&D Engineer @ Dataiku
erbinlim
Level 2
Author

Thank you for the reply @Andrey . We do already have a DSS flow that works without caching, but unfortunately I cannot share it, as it sits on an internal DSS instance protected within a VPC.

The way we fetch data is through a python code recipe, yes.

The size of the data is not huge in that sense. @Ignacio_Toledo 's solution is constant time and would work in our use case, under the assumption that the entire dataset can fit in memory. We might have to scale vertically as time goes on, but that's manageable.
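As a quick sanity check on that assumption, pandas can report the in-memory footprint of the cached dataframe. A generic check (not DSS-specific; dataset name as in @Ignacio_Toledo 's snippet):

import dataiku

df_history = dataiku.Dataset("data_history").get_dataframe()
# rough in-memory footprint of the cached history, in megabytes
print(df_history.memory_usage(deep=True).sum() / 1024.0 / 1024.0)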

I don't have the exact numbers now but I anticipate cache misses to be about 10-15% of the time.

--------------

In general, given what DSS is for (data processing pipelines), I had originally expected this to be a fairly trivial addition to the workflow; hence my question. The proposed solution is fine for the use case I have now, but I might look into other solutions as we scale up.