Beginner Help: Deploying an API Service with Pickle Model from Jupyter Notebook in Dataiku

Velichka
Velichka Registered Posts: 4 ✭✭

Hello Dear Community,

I am a complete beginner in Dataiku and have created a Jupyter Notebook as a mini test model. I used Pickle to save the model and vectorizer into a managed folder named "Models". My goal is to make this model available as an API service, but I’m struggling with the process and would greatly appreciate step-by-step instructions to achieve this.

Below is the code I have written in my notebook:

API Designer

Settings: Function name → predict_cluster, code env → inherit project default (DSS builtin env)

Code:

import pickle
import dataiku
import dataikuapi


client = dataiku.api_client()
project_key = "EMPF_SYS"
project = client.get_project(project_key)
folder_name = "Models"

managed_folder = dataiku.Folder(folder_name)


with managed_folder.get_download_stream("vectorizer.pkl") as stream:
    vectorizer, kmeans = pickle.load(stream)

# Load the model and vectorizer from the managed folder
#folder = Folder("Models")
#with folder.get_download_stream("vectorizer.pkl") as f:
#    vectorizer, kmeans = pickle.load(f)

# Endpoint handler
def predict_cluster(request):
    try:
        data = request.json
        input_text = data.get("text", "")

        if not input_text:
            return {"error": "No text provided"}, 400

        vectorized_text = vectorizer.transform([input_text])

        cluster = kmeans.predict(vectorized_text)[0]

        # Return the result
        return {"cluster": int(cluster)}, 200
    except Exception as e:
        return {"error": str(e)}, 500

Test query:

{
    "text": "Test"
}

Security: Authorization method: Public

When I test the API query, I encounter the following error:

Dev server deployment 

FAILED

Failed to initiate function server : <class 'Exception'> : Default project key is not specified (no DKU_CURRENT_PROJECT_KEY in env)

I would be incredibly grateful for any hints, guidance, or resources that could help me resolve this issue, especially regarding deploying the model stored in the managed folder and exposing it as an API service.

Thank you in advance for your support! 🙏

Best regards,
Veli

Operating system used: Windows 11

Answers

  • Turribeach
    Turribeach Dataiku DSS Core Designer, Neuron, Dataiku DSS Adv Designer, Registered, Neuron 2023 Posts: 2,577 Neuron
    edited January 22

    An API service runs in the API node, which doesn't have a project context. In fact, the API node doesn't even have access to the project data. I am guessing that you are only testing your API service in the Designer node, which comes with the API node embedded into it, which is why you haven't seen the URL error you would see with the above code if you tried it from a deployed API service in the API node.

    So there are two ways to do what you want:

    You can use a Python prediction endpoint (rather than a Python function):

    Exposing a Python prediction model — Dataiku DSS 13 documentation

    When you package your service, the contents of the folder are bundled with the package, and your custom code receives the path to the managed folder content.
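
    As a rough, untested sketch of what that could look like for your model (assuming your vectorizer.pkl holds a (vectorizer, kmeans) tuple and that queries carry a "text" feature; the class name is mine):

    import os.path
    import pickle

    import pandas as pd
    from dataiku.apinode.predict.predictor import ClassificationPredictor

    class ClusterPredictor(ClassificationPredictor):
        # DSS finds this class automatically; its name does not matter

        def __init__(self, data_folder=None):
            # data_folder is the local path to the bundled managed folder
            with open(os.path.join(data_folder, "vectorizer.pkl"), "rb") as f:
                self.vectorizer, self.kmeans = pickle.load(f)

        def predict(self, features_df):
            # features_df is a pandas DataFrame with one row per query record
            vectors = self.vectorizer.transform(features_df["text"])
            decisions = pd.Series(self.kmeans.predict(vectors).astype(str),
                                  index=features_df.index)
            # Classification predictors return (decisions, probas); probas may be None
            return decisions, None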

    Or you can continue using a Python function:

    https://doc.dataiku.com/dss/latest/apinode/endpoint-python-function.html

    When you package your service, the contents of the folders are bundled with the package, and your custom code receives the paths to the managed folder contents.
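
    Again as a rough sketch only (adapting your code to that doc page; the folders variable is provided by the API node and lists the local paths of the bundled folders):

    import os.path
    import pickle

    # folders: list of local paths, one per managed folder attached to
    # the endpoint in its settings
    folder_path = folders[0]

    # Load once at server startup; pickle files must be read in binary mode
    with open(os.path.join(folder_path, "vectorizer.pkl"), "rb") as f:
        vectorizer, kmeans = pickle.load(f)

    def predict_cluster(text):
        # Function parameters map to the keys of the JSON request body,
        # e.g. {"text": "Test"}
        vectorized = vectorizer.transform([text])
        return {"cluster": int(kmeans.predict(vectorized)[0])}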

    Full API node documentation here:

    API Node & API Deployer: Real-time APIs — Dataiku DSS 13 documentation

  • Velichka
    Velichka Registered Posts: 4 ✭✭

    Hello Turribeach,

    Thank you for your quick response. I’ve already gone through the documentation and the pages you referenced, but I’m still struggling to make progress.

    You mentioned that I should package my service. Could you provide more details on how to do that? I understand that I would need to use a Python function, since the Recommender plugin doesn't work with Oracle. However, before I dive into that, I need to get the test model up and running.

    Do you, by any chance, have any code examples or instructions from a project where you’ve successfully implemented the second option (Python Function) in the Designer Node? That would be incredibly helpful and would save me a lot of trial and error.

    Looking forward to your guidance! :)

  • Turribeach
    Turribeach Dataiku DSS Core Designer, Neuron, Dataiku DSS Adv Designer, Registered, Neuron 2023 Posts: 2,577 Neuron
    edited January 23

    Hi, the Exposing a Python function link I provided gives you sample code showing how to import a pickle model from a Dataiku managed folder. You need to use this code, not what you currently have. Furthermore, you cannot test this code in a Jupyter notebook; it will only work on the API node. So you must deploy this API service to an API node to test it. The API node documentation provides examples of how to test API services; please review these.
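
    For example, once the service is deployed, a function endpoint can be queried over HTTP with something along these lines (placeholders for your host, service and endpoint IDs; check the documentation above for the exact URL scheme of your setup):

    curl -X POST \
      http://<api-node-host>:<port>/public/api/v1/<service_id>/<endpoint_id>/run \
      --header 'Content-Type: application/json' \
      --data '{"text": "Test"}'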

    The one thing the documentation seems to be missing is how you add the managed folder to the API service so that it's packaged with the service as it is deployed to the API node. This can be done in the Settings section of the API endpoint tab:

    image.png

    So make these changes, deploy your API service to the API node and then test it. If it is failing, please post both the API function code and the logs showing the error in the API node.

  • Velichka
    Velichka Registered Posts: 4 ✭✭

    Thank you for your response. I tried to follow everything exactly with a new mini-model. Here is my Jupyter notebook code, which saves the model as a .pkl file in the "Models" folder.

    import pickle

    # Simple model class
    class SimpleModel:
        def predict(self, input_value):
            # Check whether the input is a number
            if not isinstance(input_value, (int, float)):
                return {"error": "Input must be a number"}

            # Example prediction: multiply the input value by 2
            result = input_value * 2
            return {"prediction": result}

    # Instantiate the model
    model = SimpleModel()

    # Dataiku-specific storage
    import dataiku
    folder_name = "Models"  # name of the managed folder in Dataiku
    managed_folder = dataiku.Folder(folder_name)

    # Save the model as a pickle
    with managed_folder.get_writer("simple_model.pkl") as writer:
        pickle.dump(model, writer)

    Here is my folder "Models" with the .pkl file:

    image.png

    Here is my code in the API Designer:

    image.png

    Here is the code, with the folder added under the Settings section:

    import pickle
    import dataiku
    import os.path

    folder_path = folders[0]
    file_path = os.path.join(folder_path, "simple_model.pkl")

    with open(file_path) as f:
        data = pickle.load(f)

    def predict(myparam):
        return data.predict(myparam)

    Here is my test query:

    {
        "myparam": 10
    }

    When I run the test query, I get this error:

    Dev server deployment 

    FAILED

    Failed to initiate function server : <class 'UnicodeDecodeError'> : utf-8

    [2025/01/23-15:49:43.889] [qtp284427775-40] [ERROR] [dku.lambda.api] - API call '/admin/api/services/notebookSkript/actions/switchToNewest' failedcom.dataiku.dip.io.SocketBlockLinkKernelException: Failed to initiate function server : <class 'UnicodeDecodeError'> : utf-8 at com.dataiku.dip.io.SocketBlockLinkInteraction.throwExceptionFromPython(SocketBlockLinkInteraction.java:302) at com.dataiku.dip.io.SocketBlockLinkInteraction$AsyncResult.checkException(SocketBlockLinkInteraction.java:215) at com.dataiku.dip.io.SocketBlockLinkInteraction$AsyncResult.get(SocketBlockLinkInteraction.java:190) at com.dataiku.dip.io.ResponderKernelLink$1.call(ResponderKernelLink.java:120) at com.dataiku.dip.io.ResponderKernelLink.execute(ResponderKernelLink.java:83) at com.dataiku.lambda.endpoints.pyfunction.PyFunctionPipeline.<init>(PyFunctionPipeline.java:51) at com.dataiku.lambda.endpoints.pyfunction.PyFunctionEndpointHandler.instantiatePipeline(PyFunctionEndpointHandler.java:55) at com.dataiku.lambda.endpoints.pyfunction.PyFunctionEndpointHandler.instantiatePipeline(PyFunctionEndpointHandler.java:25) at com.dataiku.lambda.endpoints.pool.PipelinePool.acquire(PipelinePool.java:168) at com.dataiku.lambda.endpoints.pool.PipelinePool.init(PipelinePool.java:67) at com.dataiku.lambda.endpoints.pyfunction.PyFunctionEndpointHandler.init(PyFunctionEndpointHandler.java:50) at com.dataiku.lambda.services.ServiceManager.mount(ServiceManager.java:350) at com.dataiku.lambda.services.ServiceManager.mountIfNeeded(ServiceManager.java:397) at com.dataiku.lambda.services.ServicesService.setMapping(ServicesService.java:286) at com.dataiku.lambda.services.ServicesService.switchToSingleGeneration(ServicesService.java:332) at com.dataiku.lambda.services.ServicesService.switchToNewestGeneration(ServicesService.java:314) at com.dataiku.lambda.admin.ServicesAdminController.switchToNewest(ServicesAdminController.java:211) at com.dataiku.lambda.admin.ServicesAdminController$$FastClassBySpringCGLIB$$3c7950d9.invoke(<generated>) at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:792) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:762) at org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:89) at com.dataiku.lambda.LambdaCallTracingAspect.doCall(LambdaCallTracingAspect.java:88) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:569) at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:634) at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:624) at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:72) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:762) at 
org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:97) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:762) at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:707) at com.dataiku.lambda.admin.ServicesAdminController$$EnhancerBySpringCGLIB$$2e7ab96f.switchToNewest(<generated>) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:569) at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205) at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150) at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117) at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:903) at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:809) at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1072) at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:965) at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909) at javax.servlet.http.HttpServlet.service(HttpServlet.java:523) at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) at javax.servlet.http.HttpServlet.service(HttpServlet.java:590) at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:764) at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:529) at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:221) at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1384) at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:176) at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:484) at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:174) at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1306) at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:129) at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:122) at org.eclipse.jetty.server.Server.handle(Server.java:563) at org.eclipse.jetty.server.HttpChannel$RequestDispatchable.dispatch(HttpChannel.java:1598) at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:753) at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:501) at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:287) at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:314) at 
org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:100) at org.eclipse.jetty.io.SelectableChannelEndPoint$1.run(SelectableChannelEndPoint.java:53) at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.runTask(AdaptiveExecutionStrategy.java:421) at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.consumeTask(AdaptiveExecutionStrategy.java:390) at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.tryProduce(AdaptiveExecutionStrategy.java:277) at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.run(AdaptiveExecutionStrategy.java:199) at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:411) at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:969) at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.doRunJob(QueuedThreadPool.java:1194) at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1149) at java.base/java.lang.Thread.run(Thread.java:840)
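
    Could the problem be that I open the pickle file in text mode? The pickle documentation says the file must be opened in binary mode, so the load would become:

    with open(file_path, 'rb') as f:
        data = pickle.load(f)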

    Sorry to ask you again, but I'm really starting to feel desperate 😣

  • EvanPrianto
    EvanPrianto Registered Posts: 7 ✭✭

    I also have the same issue.
    I already created the folder:

    image.png

    and set the folder in the settings:

    image.png

    and I got an error on these lines:

    with open(file_path, 'rb') as f:
        self.model = pickle.load(f)

    This is the error:

    Can't get attribute 'ImpulseDetector' on <module 'dataiku.apinode.predict.customserver' from '/dss_data/dataiku-dss-13.1.2/python/dataiku/apinode/predict/customserver.py'>

    but if I just read the CSV, like these lines, it works:

    stat_path = os.path.join(data_folder, "stat.csv")
    stat_df = pd.read_csv(stat_path)

    Can anyone give a working example for this case?

    My detailed training and pickle-dump code is like this:

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
    # -*- coding: utf-8 -*-
    import os
    import dataiku
    import pandas as pd, numpy as np
    from dataiku import pandasutils as pdu
    from datetime import datetime, timedelta
    import pickle

    # Read recipe inputs
    v_infa_dynamic_monitoring = dataiku.Dataset("v_infa_dynamic_monitoring")
    v_infa_dynamic_monitoring_df = v_infa_dynamic_monitoring.get_dataframe()[['ds','end_time','table_target','source_row','target_row','status','mode','ds_job']]
    v_infa_dynamic_monitoring_df

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
    # 1. Your input DataFrame
    df = v_infa_dynamic_monitoring_df

    # 2. Define date filter
    ninety_days_ago = datetime.now() - timedelta(days=90)

    # 3. Convert `end_time` column to datetime if it’s not already
    df['end_time'] = pd.to_datetime(df['end_time'])

    # 4. Filter the data
    filtered_df = df[
        (df['status'].isin(['TRUE', 'WARNING'])) &
        (df['end_time'] >= ninety_days_ago)
    ].copy()
    # filtered_df = filtered_df[filtered_df['table_target'] == table_target_test]

    # 5. Add `ds_valid` column
    filtered_df['ds_valid'] = filtered_df.apply(
        lambda row: row['ds'] if 'overwrite' in str(row['mode']).lower()
        else row['ds_job'] if 'append' in str(row['mode']).lower()
        else row['ds'],
        axis=1
    )

    # 6. Select and reorder columns as in SQL
    df_pd_recent_90_days = filtered_df[[
        'ds', 'end_time', 'table_target', 'source_row', 'target_row',
        'status', 'mode', 'ds_job', 'ds_valid'
    ]].sort_values(by='end_time', ascending=False).reset_index(drop=True)

    # df_pd_recent_90_days[df_pd_recent_90_days['ds_valid']=='anomaly']
    df_pd_recent_90_days['ds_valid'] = pd.to_numeric(df_pd_recent_90_days['ds_valid'], errors='coerce')
    df_pd_recent_90_days = df_pd_recent_90_days.dropna(subset=['ds_valid'])
    df_pd_recent_90_days['ds_valid'] = df_pd_recent_90_days['ds_valid'].astype(int)
    df_pd_recent_90_days

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
    # 1. Keep only the latest row for each ds_valid (based on end_time)
    df_sorted = df_pd_recent_90_days.sort_values(['table_target','ds_valid', 'end_time'], ascending=[True,True, False])
    df_ds_valid_last = df_sorted.drop_duplicates(subset=['table_target','ds_valid'], keep='first')
    df_ds_valid_last

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
    # Step 1: Filter table_target groups with at least 20 rows
    table_counts = df_ds_valid_last["table_target"].value_counts()
    valid_tables = table_counts[table_counts >= 20].index

    df_ds_valid_last_ = df_ds_valid_last[df_ds_valid_last["table_target"].isin(valid_tables)]
    df_ds_valid_last_

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
    # Ensure ds_valid is treated as a comparable type (e.g., integer or datetime)
    df_ds_valid_last_["ds_valid"] = pd.to_numeric(df_ds_valid_last_["ds_valid"], errors="coerce")

    # Get the latest rows for each table_target based on ds_valid
    df_latest_train = (
        df_ds_valid_last_
        .sort_values("ds_valid", ascending=False)
        .groupby("table_target", group_keys=False)
        .head(39)
    )
    df_latest_train = df_latest_train.sort_values(['table_target', 'ds_valid'], ascending=[True, False])
    df_latest_train

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
    class ImpulseDetector:
        def __init__(self):
            self.result_df_2d = None  # Will store mean/std per table_target

        def fit(self, df: pd.DataFrame):
            def stat_value(group):
                mean_40 = group['target_row'].mean()
                std_40 = group['target_row'].std()
                group['mean_long'] = mean_40
                group['std_long'] = std_40
                return group

            result_df = df.groupby('table_target', group_keys=False).apply(stat_value)
            self.result_df_2d = (
                result_df
                .groupby('table_target')
                .head(1)
                .reset_index(drop=True)[['table_target', 'mean_long', 'std_long']]
            )

        def fitted(self, stat_df: pd.DataFrame):
            # Validate required columns exist
            required_cols = {'table_target', 'mean_long', 'std_long'}
            if not required_cols.issubset(stat_df.columns):
                raise ValueError(f"Input DataFrame must contain columns: {required_cols}")

            self.result_df_2d = stat_df.copy()

        def compute_pulse_flags(self, group, mean_col='mean_long', std_col='std_long',
                                flag_col='flag_sudden_impulse', ratio_col='impulse_ratio', ratio_val='impulse_value'):
            mean = group[mean_col]
            std = group[std_col]

            with np.errstate(invalid='ignore', divide='ignore'):
                upper = mean + 2.2 * std
                lower = mean - 2.2 * std
                upper_percent = 0.53 * mean * 2
                lower_percent = 0.47 * mean * 2

                pulse_ratio = ((abs(group['target_row'] - mean) - 2 * std) / std).clip(lower=0)
                pulse_ratio = pulse_ratio.replace([np.inf, -np.inf], -1).fillna(0)
                pulse_value = abs(group['target_row'] - mean)

            group[ratio_col] = pulse_ratio
            group[ratio_val] = pulse_value

            group[flag_col] = (
                ((group['target_row'] > upper) & (group['target_row'] > upper_percent)) |
                ((group['target_row'] < lower) & (group['target_row'] < lower_percent))
            )

            return group

        def predict(self, df: pd.DataFrame) -> pd.DataFrame:
            """
            Merges df with precomputed mean/std and computes impulse flags
            """
            if self.result_df_2d is None:
                raise ValueError("Model must be fit first with .fit() before prediction.")

            merged = df.merge(self.result_df_2d, on='table_target', how='left')
            return self.compute_pulse_flags(merged)

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
    detector = ImpulseDetector()
    detector.fit(df_latest_train)

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
    # Write recipe outputs
    new_pickle = dataiku.Folder("HKWgLYbz")
    folder_path = new_pickle.get_path()
    pickle_file_path = os.path.join(folder_path, "impulse_detector.pkl")


    # Save model to pickle
    with open(pickle_file_path, "wb") as f:
        pickle.dump(detector, f)
  • Alexandru
    Alexandru Dataiker, Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer, Registered Posts: 1,329 Dataiker

    Hi @EvanPrianto,
    Based on your screenshots and code, your model is not compatible with a custom prediction endpoint as-is; you must follow the guidelines:
    https://doc.dataiku.com/dss/latest/apinode/endpoint-python-prediction.html#structure-of-the-code

    Thus you need to write a single Python class. This class must extend dataiku.apinode.predict.predictor.ClassificationPredictor or dataiku.apinode.predict.predictor.RegressionPredictor.

    What you seem to be doing may work in a custom Python endpoint instead.
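
    Regarding the "Can't get attribute 'ImpulseDetector'" error itself: pickle stores only the attribute data, not the class definition, and it records the class as living in the notebook's __main__ module, which does not exist on the API node. A rough, untested sketch of one workaround (file_path is assumed to point at your bundled impulse_detector.pkl):

    import pickle

    # Paste the full ImpulseDetector definition from your training code here,
    # so the class exists on the API node at load time
    class ImpulseDetector:
        ...

    class DetectorUnpickler(pickle.Unpickler):
        # Redirect the class reference recorded at dump time to the local definition
        def find_class(self, module, name):
            if name == "ImpulseDetector":
                return ImpulseDetector
            return super().find_class(module, name)

    with open(file_path, "rb") as f:
        detector = DetectorUnpickler(f).load()

    Alternatively, since reading stat.csv already works for you, you could skip pickling entirely and rebuild the detector with ImpulseDetector().fitted(stat_df).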


    I would suggest you raise a support ticket and attach the export of the API endpoint along with the error and the API node/deployment diagnostics.

    Thanks

  • EvanPrianto
    EvanPrianto Registered Posts: 7 ✭✭

    Okay then, how do I do this?

    To create a custom model, you need to write a single Python class. This class must extend dataiku.apinode.predict.predictor.ClassificationPredictor or dataiku.apinode.predict.predictor.RegressionPredictor. The name of the class does not matter. DSS will automatically find your class.

    The constructor of the class receives the path to the managed folder, if any.


    Is there a step-by-step guide for doing this?

  • Alexandru
    Alexandru Dataiker, Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer, Registered Posts: 1,329 Dataiker

    @EvanPrianto

    We can continue this in the new thread you've created:

    https://community.dataiku.com/discussion/45183/use-pickle-to-create-api-service
