Survey banner
Switching to Dataiku - a new area to help users who are transitioning from other tools and diving into Dataiku! CHECK IT OUT

Querying Underlying LLM-as-a-Service through DSS LLM Mesh

Solved!
NathanCrock
Level 2
Querying Underlying LLM-as-a-Service through DSS LLM Mesh

I have created an Azure_OpenAI connection in our DSS instance. I can connect to the instance from within Dataiku and externally via the dataiku-api-client module. For example:

import os

import dataikuapi

# Connection details are taken from the environment.
dss_instance = os.getenv("DSS_INSTANCE")
dss_api_key = os.getenv("DSS_API_KEY")
dss_project_key = os.getenv("DSS_PROJECT_KEY")
dss_llm_name = os.getenv("DSS_LLM_NAME")

# Walk from the DSS client down to a completion query on the chosen LLM.
client = dataikuapi.DSSClient(dss_instance, dss_api_key)
project_obj = client.get_project(dss_project_key)
llm_obj = project_obj.get_llm(dss_llm_name)
completion_obj = llm_obj.new_completion()

# Add the system instruction first, then the user request.
completion_obj.with_message(role="system", message="Speak only in iambic pentameter!")
query_obj = completion_obj.with_message(role="user", message="Count to 5")

This is helpful. However, I would like to query the underlying AzureOpenAI instance directly. For example, I would like to access its chat_with_tools function. Is there a way to access the underlying LLM object? Or perhaps there is a function that allows us to query a subset of the built-in functions and pass the custom args via some kwargs parameter?

Essentially, we would like to leverage the built-in logging, and toxicity/PII checks of the LLM Mesh while preserving the underlying API functionality of the LLM-as-a-Service. Is this something we can accomplish?


Operating system used: RHEL 7.9

0 Kudos
1 Solution
NathanCrock
Level 2
Author

A member from our team looked into the source code of the dataikuapi module in our /usr/lib/python3.x/site-packages/ folder. We were able to extract the core functionality of the dataikuapi call to the LLM Mesh with just the requests package. As a result we created our own DSSOpenAI class that allows us to preserve the SDK functionality of the OpenAI library while still routing the request through the LLM Mesh. We currently use llama_index, but you can replace the import with whichever gateway/session object you use.

We wanted to share our solution here for posterity. Perhaps it may be helpful to some of your other clients, @ClemenceB

 

from llama_index.llms.openai import OpenAI
from typing import (
    List,
    Any,
    Callable,
    Dict,
    List,
    Optional,
    Sequence,
)
from requests.auth import HTTPBasicAuth
from requests import Session
import json
import httpx
from llama_index.core.base.llms.types import (
    ChatMessage,
    ChatResponse,
    CompletionResponse,
    MessageRole,
)
from llama_index.core.callbacks import CallbackManager
from llama_index.core.types import BaseOutputParser, PydanticProgramMode
from llama_index.llms.openai.base import llm_retry_decorator


class DSSOpenAI(OpenAI):
    """llama_index ``OpenAI`` LLM whose requests are routed through the DSS LLM Mesh.

    ``complete`` and ``chat`` do not use the OpenAI SDK client. They POST the
    messages to the DSS public API completions endpoint with ``requests`` and
    wrap the answer in the usual llama_index response objects.

    Configuration read by this class:

    * ``api_base``: base URL of the DSS instance.
    * ``api_key``: DSS API key, sent as the HTTP basic-auth user name.
    * ``additional_kwargs["dss_project"]``: DSS project key used in the URL.
    * ``additional_kwargs["dss_model_deployment"]``: value sent as ``llmId``.
    """

    # TODO Add async and streaming logic as needed

    # DSS project key, used to build the completions URL.
    dss_project: str = ""

    # Identifier sent as "llmId" in every completion payload.
    dss_model_deployment: str = ""

    def __init__(
        self,
        model: Optional[str] = None,
        temperature: float = 1.0,
        max_tokens: Optional[int] = None,
        additional_kwargs: Optional[Dict[str, Any]] = None,
        max_retries: int = 3,
        timeout: float = 60.0,
        reuse_client: bool = True,
        api_key: Optional[str] = None,
        api_base: Optional[str] = None,
        api_version: Optional[str] = None,
        callback_manager: Optional[CallbackManager] = None,
        default_headers: Optional[Dict[str, str]] = None,
        http_client: Optional[httpx.Client] = None,
        # base class
        system_prompt: Optional[str] = None,
        messages_to_prompt: Optional[Callable[[Sequence[ChatMessage]], str]] = None,
        completion_to_prompt: Optional[Callable[[str], str]] = None,
        pydantic_program_mode: PydanticProgramMode = PydanticProgramMode.DEFAULT,
        output_parser: Optional[BaseOutputParser] = None,
        **kwargs: Any,
    ) -> None:
        """Build the LLM and record the DSS project and model deployment.

        All arguments are forwarded to ``OpenAI.__init__`` except
        ``http_client``, which is accepted but not used by this class.

        Raises:
            ValueError: if ``additional_kwargs`` is missing ``"dss_project"``
                or ``"dss_model_deployment"``.
        """
        # Validate up front so a missing setting yields a clear message
        # instead of a TypeError/KeyError from subscripting.
        dss_settings = additional_kwargs or {}
        missing = [
            key
            for key in ("dss_project", "dss_model_deployment")
            if key not in dss_settings
        ]
        if missing:
            raise ValueError(
                "additional_kwargs must provide: " + ", ".join(missing)
            )

        super().__init__(
            model=model,
            temperature=temperature,
            max_tokens=max_tokens,
            additional_kwargs=additional_kwargs,
            max_retries=max_retries,
            callback_manager=callback_manager,
            api_key=api_key,
            api_version=api_version,
            api_base=api_base,
            timeout=timeout,
            reuse_client=reuse_client,
            default_headers=default_headers,
            system_prompt=system_prompt,
            messages_to_prompt=messages_to_prompt,
            completion_to_prompt=completion_to_prompt,
            pydantic_program_mode=pydantic_program_mode,
            output_parser=output_parser,
            **kwargs,
        )

        self.dss_project = dss_settings["dss_project"]
        self.dss_model_deployment = dss_settings["dss_model_deployment"]

    @classmethod
    def class_name(cls) -> str:
        """Return the identifier string for this LLM class."""
        return "dss_openai_llm"

    def _get_client(self):
        """Always raise: requests go through DSS, not through an OpenAI client."""
        raise NotImplementedError(
            "DSS OpenAI interface does not support this functionality."
        )


    # DSS SPECIFIC FUNCTIONS 

    def _format_dss_payload(self, messages: list, settings: dict) -> str:
        """Serialize a single-query completion request to a JSON string.

        Args:
            messages: message dicts as produced by ``_parse_messages_to_dss``.
            settings: completion settings, sent under ``"settings"`` as given.

        Returns:
            The JSON-encoded request body.
        """
        return json.dumps(
            {
                "queries": [{"messages": messages}],
                "settings": settings,
                "llmId": self.dss_model_deployment,
            }
        )

    def _auth_dss_session(self, session: Session) -> Session:
        """Attach basic auth (API key as user name, empty password) to ``session``."""
        # Use the session's auth hook instead of calling the auth object on the
        # session as though it were a request.
        session.auth = HTTPBasicAuth(self.api_key, "")

        return session

    def _define_dss_session(self, verify: bool = False) -> Session:
        """Create an authenticated ``requests`` session for the DSS API.

        Args:
            verify: value assigned to ``session.verify``.
        """
        # NOTE(review): TLS certificate verification is off by default. The
        # default is kept for backward compatibility; pass verify=True (or a
        # CA bundle path) whenever the DSS certificate can be validated.
        session = Session()
        session.verify = verify

        return self._auth_dss_session(session=session)

    def _get_dss_url(self) -> str:
        """Return the completions endpoint URL for ``dss_project``."""
        # Drop trailing slashes so the joined URL never contains "//".
        base = (self.api_base or "").rstrip("/")

        return f"{base}/dip/publicapi/projects/{self.dss_project}/llms/completions"

    def _parse_messages_to_dss(
        self, messages: Sequence[ChatMessage]
    ) -> List[Dict[str, Any]]:
        """
        Converts llama index ChatMessages to a DSS understandable type:
        one ``{"content": ..., "role": ...}`` dict per message, in order.
        """
        return [
            {"content": chat_message.content, "role": chat_message.role.value}
            for chat_message in messages
        ]

    def _dss_call_llm(
        self, messages: Sequence[ChatMessage], settings: Optional[dict] = None
    ) -> dict:
        """POST ``messages`` to the DSS completions endpoint and return the JSON reply.

        Args:
            messages: chat messages to send.
            settings: optional completion settings; an empty dict is sent
                when omitted.

        Returns:
            The decoded JSON response body.

        Raises:
            requests.HTTPError: if DSS answers with a 4xx/5xx status.
        """
        payload = self._format_dss_payload(
            messages=self._parse_messages_to_dss(messages=messages),
            settings={} if settings is None else settings,
        )
        url = self._get_dss_url()

        # Close the session (and its pooled connections) once the call is
        # done, and bound the wait with the timeout given to the constructor.
        with self._define_dss_session() as session:
            response = session.post(url, data=payload, timeout=self.timeout)

        # Surface HTTP failures here rather than as a KeyError further down.
        response.raise_for_status()

        return response.json()


    # END OF DSS FUNCTIONS, NOW OVERRIDING LLAMA FUNCTIONS TO INTEGRATE

    def complete(self, prompt, **kwargs: Any) -> CompletionResponse:
        """Complete ``prompt`` through DSS; see ``_complete``."""
        # Retry lives on `_complete` only; decorating this method as well
        # would wrap one retry loop inside another.
        return self._complete(prompt=prompt, **kwargs)

    @llm_retry_decorator
    def _complete(self, prompt: str, **kwargs: Any) -> CompletionResponse:
        """Send ``prompt`` to DSS and wrap the first response's text.

        A string prompt is sent as a single user message; any other value is
        passed on unchanged as the message sequence. ``kwargs`` are accepted
        but not used.
        """
        if isinstance(prompt, str):
            messages = [ChatMessage(content=prompt, role=MessageRole.USER)]
        else:
            messages = prompt

        response = self._dss_call_llm(messages=messages)

        return CompletionResponse(
            text=response["responses"][0]["text"],
            raw=response,
            logprobs=None,
            additional_kwargs=self._get_response_token_counts(response),
        )

    def chat(self, messages: Sequence[ChatMessage], **kwargs: Any) -> ChatResponse:
        """Chat through DSS; see ``_chat``."""
        # Retry lives on `_chat` only, for the same reason as in `complete`.
        return self._chat(messages=messages, **kwargs)

    @llm_retry_decorator
    def _chat(self, messages: Sequence[ChatMessage], **kwargs: Any) -> ChatResponse:
        """Send ``messages`` to DSS and wrap the first response's text as an
        assistant message. ``kwargs`` are accepted but not used.
        """
        response = self._dss_call_llm(messages=messages)

        return ChatResponse(
            message=ChatMessage(
                content=response["responses"][0]["text"],
                role=MessageRole.ASSISTANT,
            ),
            raw=response,
            logprobs=None,
            additional_kwargs=self._get_response_token_counts(response),
        )

 

 

View solution in original post

0 Kudos
4 Replies
ClemenceB
Dataiker

Hi Nathan,
Thank you for your request! I'm assuming that by chat_with_tools function, you're referring to function calls. Is that right?
Adding kwargs parameters, although an interesting request, brings many challenges because some arguments may change the query and response format, making it impossible to automatically apply guardrails (PII detection, toxicity, ...).
However, adding the ability to use function calls among the list of arguments supported in our completion endpoint is something we're investigating.
I hope it answers your question.
Best,
Clémence

NathanCrock
Level 2
Author

Thank you for the follow up, Clémence!

I think I understand. Just to ensure I've got the right message for our team, let me try to summarize what I'm planning to say based on your response.

When querying LMs through Dataiku's LLM Mesh we can access methods, such as...

# Assumes dss_instance, dss_api_key, dss_project_key and dss_model_name are
# already defined; they are not set anywhere in this snippet.
import dataikuapi
client = dataikuapi.DSSClient(dss_instance, dss_api_key)
project_obj = client.get_project(dss_project_key)
llm_obj = project_obj.get_llm(dss_model_name)
completion_obj = llm_obj.new_completion()
# List the attributes and methods exposed by the completion query object.
dir(completion_obj)


>> ['__class__', ..., 'cq', 'execute', 'execute_streamed', 'llm', 'settings', 'with_message']

So currently, we can access the methods/objects: cq, execute, execute_streamed, llm, settings, with_message

Our wish of accessing the underlying LLM object, or all the methods of the underlying LLM-as-a-Service object (e.g. those available in the AzureOpenAI module, such as chat_with_tools, just to name one) is not something currently available or on the planned roadmap? However, you are currently investigating ways to expose tooling functionality for LLMs that support it.

Is that a fair summary of the current state of things?

Thanks again!

Hi Nathan,

Yes, this is right.

Best,

Clémence

NathanCrock
Level 2
Author

A member from our team looked into the source code of the dataikuapi module in our /usr/lib/python3.x/site-packages/ folder. We were able to extract the core functionality of the dataikuapi call to the LLM Mesh with just the requests package. As a result we created our own DSSOpenAI class that allows us to preserve the SDK functionality of the OpenAI library while still routing the request through the LLM Mesh. We currently use llama_index, but you can replace the import with whichever gateway/session object you use.

We wanted to share our solution here for posterity. Perhaps it may be helpful to some of your other clients, @ClemenceB

 

from llama_index.llms.openai import OpenAI
from typing import (
    List,
    Any,
    Callable,
    Dict,
    List,
    Optional,
    Sequence,
)
from requests.auth import HTTPBasicAuth
from requests import Session
import json
import httpx
from llama_index.core.base.llms.types import (
    ChatMessage,
    ChatResponse,
    CompletionResponse,
    MessageRole,
)
from llama_index.core.callbacks import CallbackManager
from llama_index.core.types import BaseOutputParser, PydanticProgramMode
from llama_index.llms.openai.base import llm_retry_decorator


class DSSOpenAI(OpenAI):
    """llama_index ``OpenAI`` LLM whose requests are routed through the DSS LLM Mesh.

    ``complete`` and ``chat`` do not use the OpenAI SDK client. They POST the
    messages to the DSS public API completions endpoint with ``requests`` and
    wrap the answer in the usual llama_index response objects.

    Configuration read by this class:

    * ``api_base``: base URL of the DSS instance.
    * ``api_key``: DSS API key, sent as the HTTP basic-auth user name.
    * ``additional_kwargs["dss_project"]``: DSS project key used in the URL.
    * ``additional_kwargs["dss_model_deployment"]``: value sent as ``llmId``.
    """

    # TODO Add async and streaming logic as needed

    # DSS project key, used to build the completions URL.
    dss_project: str = ""

    # Identifier sent as "llmId" in every completion payload.
    dss_model_deployment: str = ""

    def __init__(
        self,
        model: Optional[str] = None,
        temperature: float = 1.0,
        max_tokens: Optional[int] = None,
        additional_kwargs: Optional[Dict[str, Any]] = None,
        max_retries: int = 3,
        timeout: float = 60.0,
        reuse_client: bool = True,
        api_key: Optional[str] = None,
        api_base: Optional[str] = None,
        api_version: Optional[str] = None,
        callback_manager: Optional[CallbackManager] = None,
        default_headers: Optional[Dict[str, str]] = None,
        http_client: Optional[httpx.Client] = None,
        # base class
        system_prompt: Optional[str] = None,
        messages_to_prompt: Optional[Callable[[Sequence[ChatMessage]], str]] = None,
        completion_to_prompt: Optional[Callable[[str], str]] = None,
        pydantic_program_mode: PydanticProgramMode = PydanticProgramMode.DEFAULT,
        output_parser: Optional[BaseOutputParser] = None,
        **kwargs: Any,
    ) -> None:
        """Build the LLM and record the DSS project and model deployment.

        All arguments are forwarded to ``OpenAI.__init__`` except
        ``http_client``, which is accepted but not used by this class.

        Raises:
            ValueError: if ``additional_kwargs`` is missing ``"dss_project"``
                or ``"dss_model_deployment"``.
        """
        # Validate up front so a missing setting yields a clear message
        # instead of a TypeError/KeyError from subscripting.
        dss_settings = additional_kwargs or {}
        missing = [
            key
            for key in ("dss_project", "dss_model_deployment")
            if key not in dss_settings
        ]
        if missing:
            raise ValueError(
                "additional_kwargs must provide: " + ", ".join(missing)
            )

        super().__init__(
            model=model,
            temperature=temperature,
            max_tokens=max_tokens,
            additional_kwargs=additional_kwargs,
            max_retries=max_retries,
            callback_manager=callback_manager,
            api_key=api_key,
            api_version=api_version,
            api_base=api_base,
            timeout=timeout,
            reuse_client=reuse_client,
            default_headers=default_headers,
            system_prompt=system_prompt,
            messages_to_prompt=messages_to_prompt,
            completion_to_prompt=completion_to_prompt,
            pydantic_program_mode=pydantic_program_mode,
            output_parser=output_parser,
            **kwargs,
        )

        self.dss_project = dss_settings["dss_project"]
        self.dss_model_deployment = dss_settings["dss_model_deployment"]

    @classmethod
    def class_name(cls) -> str:
        """Return the identifier string for this LLM class."""
        return "dss_openai_llm"

    def _get_client(self):
        """Always raise: requests go through DSS, not through an OpenAI client."""
        raise NotImplementedError(
            "DSS OpenAI interface does not support this functionality."
        )


    # DSS SPECIFIC FUNCTIONS 

    def _format_dss_payload(self, messages: list, settings: dict) -> str:
        """Serialize a single-query completion request to a JSON string.

        Args:
            messages: message dicts as produced by ``_parse_messages_to_dss``.
            settings: completion settings, sent under ``"settings"`` as given.

        Returns:
            The JSON-encoded request body.
        """
        return json.dumps(
            {
                "queries": [{"messages": messages}],
                "settings": settings,
                "llmId": self.dss_model_deployment,
            }
        )

    def _auth_dss_session(self, session: Session) -> Session:
        """Attach basic auth (API key as user name, empty password) to ``session``."""
        # Use the session's auth hook instead of calling the auth object on the
        # session as though it were a request.
        session.auth = HTTPBasicAuth(self.api_key, "")

        return session

    def _define_dss_session(self, verify: bool = False) -> Session:
        """Create an authenticated ``requests`` session for the DSS API.

        Args:
            verify: value assigned to ``session.verify``.
        """
        # NOTE(review): TLS certificate verification is off by default. The
        # default is kept for backward compatibility; pass verify=True (or a
        # CA bundle path) whenever the DSS certificate can be validated.
        session = Session()
        session.verify = verify

        return self._auth_dss_session(session=session)

    def _get_dss_url(self) -> str:
        """Return the completions endpoint URL for ``dss_project``."""
        # Drop trailing slashes so the joined URL never contains "//".
        base = (self.api_base or "").rstrip("/")

        return f"{base}/dip/publicapi/projects/{self.dss_project}/llms/completions"

    def _parse_messages_to_dss(
        self, messages: Sequence[ChatMessage]
    ) -> List[Dict[str, Any]]:
        """
        Converts llama index ChatMessages to a DSS understandable type:
        one ``{"content": ..., "role": ...}`` dict per message, in order.
        """
        return [
            {"content": chat_message.content, "role": chat_message.role.value}
            for chat_message in messages
        ]

    def _dss_call_llm(
        self, messages: Sequence[ChatMessage], settings: Optional[dict] = None
    ) -> dict:
        """POST ``messages`` to the DSS completions endpoint and return the JSON reply.

        Args:
            messages: chat messages to send.
            settings: optional completion settings; an empty dict is sent
                when omitted.

        Returns:
            The decoded JSON response body.

        Raises:
            requests.HTTPError: if DSS answers with a 4xx/5xx status.
        """
        payload = self._format_dss_payload(
            messages=self._parse_messages_to_dss(messages=messages),
            settings={} if settings is None else settings,
        )
        url = self._get_dss_url()

        # Close the session (and its pooled connections) once the call is
        # done, and bound the wait with the timeout given to the constructor.
        with self._define_dss_session() as session:
            response = session.post(url, data=payload, timeout=self.timeout)

        # Surface HTTP failures here rather than as a KeyError further down.
        response.raise_for_status()

        return response.json()


    # END OF DSS FUNCTIONS, NOW OVERRIDING LLAMA FUNCTIONS TO INTEGRATE

    def complete(self, prompt, **kwargs: Any) -> CompletionResponse:
        """Complete ``prompt`` through DSS; see ``_complete``."""
        # Retry lives on `_complete` only; decorating this method as well
        # would wrap one retry loop inside another.
        return self._complete(prompt=prompt, **kwargs)

    @llm_retry_decorator
    def _complete(self, prompt: str, **kwargs: Any) -> CompletionResponse:
        """Send ``prompt`` to DSS and wrap the first response's text.

        A string prompt is sent as a single user message; any other value is
        passed on unchanged as the message sequence. ``kwargs`` are accepted
        but not used.
        """
        if isinstance(prompt, str):
            messages = [ChatMessage(content=prompt, role=MessageRole.USER)]
        else:
            messages = prompt

        response = self._dss_call_llm(messages=messages)

        return CompletionResponse(
            text=response["responses"][0]["text"],
            raw=response,
            logprobs=None,
            additional_kwargs=self._get_response_token_counts(response),
        )

    def chat(self, messages: Sequence[ChatMessage], **kwargs: Any) -> ChatResponse:
        """Chat through DSS; see ``_chat``."""
        # Retry lives on `_chat` only, for the same reason as in `complete`.
        return self._chat(messages=messages, **kwargs)

    @llm_retry_decorator
    def _chat(self, messages: Sequence[ChatMessage], **kwargs: Any) -> ChatResponse:
        """Send ``messages`` to DSS and wrap the first response's text as an
        assistant message. ``kwargs`` are accepted but not used.
        """
        response = self._dss_call_llm(messages=messages)

        return ChatResponse(
            message=ChatMessage(
                content=response["responses"][0]["text"],
                role=MessageRole.ASSISTANT,
            ),
            raw=response,
            logprobs=None,
            additional_kwargs=self._get_response_token_counts(response),
        )

 

 

0 Kudos