Querying Underlying LLM-as-a-Service through DSS LLM Mesh

NathanCrock Registered Posts: 3
edited July 16 in Using Dataiku

I have created an Azure_OpenAI connection in our DSS instance. I can connect to the instance from within Dataiku and externally via the dataiku-api-client module. For example:

import dataikuapi
import os

dss_instance = os.getenv('DSS_INSTANCE')
dss_api_key = os.getenv('DSS_API_KEY')
dss_project_key = os.getenv('DSS_PROJECT_KEY')
dss_llm_name = os.getenv('DSS_LLM_NAME')

client = dataikuapi.DSSClient(dss_instance, dss_api_key)
project_obj = client.get_project(dss_project_key)
llm_obj = project_obj.get_llm(dss_llm_name)
completion_obj = llm_obj.new_completion()
completion_obj.with_message(message='Speak only in iambic pentameter!', role='system')
query_obj = completion_obj.with_message(message="Count to 5", role='user')
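
Executing the query and reading back the generated text then looks roughly like this (assuming the standard execute() / success / text accessors of the public API client):

response_obj = query_obj.execute()
if response_obj.success:
    print(response_obj.text)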

This is helpful. However, I would like to query the underlying AzureOpenAI instance directly. For example, I would like to access its chat_with_tools function. Is there a way to access the underlying LLM object? Or perhaps there is a function that allows us to query a subset of the built-in functions and pass the custom args via some kwargs parameter?

Essentially, we would like to leverage the built-in logging and toxicity/PII checks of the LLM Mesh while preserving the underlying API functionality of the LLM-as-a-Service. Is this something we can accomplish?


Operating system used: RHEL 7.9

Best Answer

  • NathanCrock Registered Posts: 3
    edited July 17 Answer ✓

    A member from our team looked into the source code of the dataikuapi module in our /usr/lib/python3.x/site-packages/ folder. We were able to reproduce the core functionality of the dataikuapi call to the LLM Mesh with just the requests package. As a result, we created our own DSSOpenAI class that preserves the SDK functionality of the OpenAI library while still routing requests through the LLM Mesh. We currently use llama_index, but you can replace the import with whichever gateway/session object you use.

    We wanted to share our solution here for posterity. Perhaps it may be helpful to some of your other clients, @ClemenceB.

    from llama_index.llms.openai import OpenAI
    from typing import (
        Any,
        Callable,
        Dict,
        List,
        Optional,
        Sequence,
    )
    from requests.auth import HTTPBasicAuth
    from requests import Session
    import json
    import httpx
    from llama_index.core.base.llms.types import (
        ChatMessage,
        ChatResponse,
        CompletionResponse,
        MessageRole,
    )
    from llama_index.core.callbacks import CallbackManager
    from llama_index.core.types import BaseOutputParser, PydanticProgramMode
    from llama_index.llms.openai.base import llm_retry_decorator
    
    
    class DSSOpenAI(OpenAI):
        # TODO Add async and streaming logic as needed
    
        dss_project: str = ""
    
        dss_model_deployment: str = ""
    
        def __init__(
            self,
            model: Optional[str] = None,
            temperature: float = 1.0,
            max_tokens: Optional[int] = None,
            additional_kwargs: Optional[Dict[str, Any]] = None,
            max_retries: int = 3,
            timeout: float = 60.0,
            reuse_client: bool = True,
            api_key: Optional[str] = None,
            api_base: Optional[str] = None,
            api_version: Optional[str] = None,
            callback_manager: Optional[CallbackManager] = None,
            default_headers: Optional[Dict[str, str]] = None,
            http_client: Optional[httpx.Client] = None,
            # base class
            system_prompt: Optional[str] = None,
            messages_to_prompt: Optional[Callable[[Sequence[ChatMessage]], str]] = None,
            completion_to_prompt: Optional[Callable[[str], str]] = None,
            pydantic_program_mode: PydanticProgramMode = PydanticProgramMode.DEFAULT,
            output_parser: Optional[BaseOutputParser] = None,
            **kwargs: Any,
        ) -> None:
            super().__init__(
                model=model,
                temperature=temperature,
                max_tokens=max_tokens,
                additional_kwargs=additional_kwargs,
                max_retries=max_retries,
                callback_manager=callback_manager,
                api_key=api_key,
                api_version=api_version,
                api_base=api_base,
                timeout=timeout,
                reuse_client=reuse_client,
                default_headers=default_headers,
                system_prompt=system_prompt,
                messages_to_prompt=messages_to_prompt,
                completion_to_prompt=completion_to_prompt,
                pydantic_program_mode=pydantic_program_mode,
                output_parser=output_parser,
                **kwargs,
            )
    
            self.dss_project = additional_kwargs["dss_project"]
            self.dss_model_deployment = additional_kwargs["dss_model_deployment"]
    
        @classmethod
        def class_name(cls) -> str:
            return "dss_openai_llm"
    
        def _get_client(self):
            raise NotImplementedError(
                "DSS OpenAI interface does not support this functionality."
            )
    
    
        # DSS SPECIFIC FUNCTIONS 
    
        def _format_dss_payload(self, messages: list, settings: dict) -> str:
            """Build the JSON body expected by the DSS completions endpoint."""
    
            data = {
                "queries": [{"messages": messages}],
                "settings": settings,
                "llmId": self.dss_model_deployment,
            }
    
            data = json.dumps(data)
    
            return data
    
        def _auth_dss_session(self, session: Session) -> Session:
            """Attach the DSS API key to the session as HTTP basic auth (key as username, empty password)."""

            session.auth = HTTPBasicAuth(self.api_key, "")
    
            return session
    
        def _define_dss_session(self, verify: bool = False) -> Session:
            """ """
    
            session = Session()
            session.verify = verify
    
            session = self._auth_dss_session(session=session)
    
            return session
    
        def _get_dss_url(self) -> str:
            """ """
    
            url = f"{self.api_base}/dip/publicapi/projects/{self.dss_project}/llms/completions"
    
            return url
    
        def _parse_messages_to_dss(self, messages: List[ChatMessage]) -> list:
            """
            Converts llama index ChatMessages to a DSS understandable type
            """
    
            return_list = []
    
            for chat_message in messages:
                message_dict = {
                    "content": chat_message.content,
                    "role": chat_message.role.value,
                }
    
                return_list.append(message_dict)
    
            return return_list
    
        def _dss_call_llm(self, messages: List[ChatMessage], settings: dict = {}) -> dict:
            """ """
            # define variables for DSS request
            messages = self._parse_messages_to_dss(messages=messages)
    
            data = self._format_dss_payload(messages=messages, settings=settings)
    
            url = self._get_dss_url()
    
            session = self._define_dss_session()
    
            # make the request
            response = session.request(
                method="POST", url=url, params=None, data=data, files=None, stream=False
            )
    
            # parse to json
            response = response.json()
    
            return response
    
    
        # END OF DSS FUNCTIONS, NOW OVERRIDING LLAMA FUNCTIONS TO INTEGRATE
    
        @llm_retry_decorator
        def complete(self, prompt, **kwargs: Any) -> CompletionResponse:
            return self._complete(prompt=prompt, **kwargs)
    
        @llm_retry_decorator
        def _complete(self, prompt: str, **kwargs: Any) -> CompletionResponse:
            """ """
    
            if isinstance(prompt, str):
                prompt = [ChatMessage(content=prompt, role=MessageRole.USER)]
    
            response = self._dss_call_llm(messages=prompt)
    
            text = response["responses"][0]["text"]
    
            return CompletionResponse(
                text=text,
                raw=response,
                logprobs=None,
                additional_kwargs=self._get_response_token_counts(response),
            )
    
        @llm_retry_decorator
        def chat(self, messages: Sequence[ChatMessage], **kwargs: Any) -> ChatResponse:
            return self._chat(messages=messages, **kwargs)
    
        @llm_retry_decorator
        def _chat(self, messages: Sequence[ChatMessage], **kwargs: Any) -> ChatResponse:
            """ """
    
            response = self._dss_call_llm(messages=messages)
            message = response["responses"][0]["text"]
            logprobs = None
            return ChatResponse(
                message=ChatMessage(content=message, role=MessageRole.ASSISTANT),
                raw=response,
                logprobs=logprobs,
                additional_kwargs=self._get_response_token_counts(response),
            )
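
    For reference, a minimal usage sketch (the model name below is a placeholder forwarded to the llama_index base class; api_key is a DSS API key, api_base is the DSS base URL, and dss_model_deployment is the LLM Mesh llmId, matching what the class above reads from additional_kwargs):

    import os

    llm = DSSOpenAI(
        model="gpt-4o",  # placeholder; only used by the llama_index base class
        api_key=os.getenv("DSS_API_KEY"),    # DSS API key, sent as HTTP basic auth
        api_base=os.getenv("DSS_INSTANCE"),  # DSS base URL (same value passed to DSSClient)
        additional_kwargs={
            "dss_project": os.getenv("DSS_PROJECT_KEY"),
            "dss_model_deployment": os.getenv("DSS_LLM_NAME"),  # LLM Mesh llmId
        },
    )

    print(llm.complete("Count to 5").text)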

Answers

  • ClemenceB Dataiker, Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Product Ideas Manager Posts: 18

    Hi Nathan,
    Thank you for your request! I'm assuming that by the chat_with_tools function, you're referring to function calls. Is that right?
    Adding kwargs parameters, although an interesting request, brings many challenges because some arguments may change the query and response format, making it impossible to automatically apply guardrails (PII detection, toxicity, ...).
    However, adding function calls to the list of arguments supported by our completion endpoint is something we're investigating.
    I hope it answers your question.
    Best,
    Clémence

  • NathanCrock Registered Posts: 3
    edited July 17

    Thank you for the follow up, Clémence!

    I think I understand. Just to ensure I've got the right message for our team, let me try to summarize what I'm planning to say based on your response.

    When querying LLMs through Dataiku's LLM Mesh, we can access methods such as...

    import dataikuapi
    client = dataikuapi.DSSClient(dss_instance, dss_api_key)
    project_obj = client.get_project(dss_project_key)
    llm_obj = project_obj.get_llm(dss_llm_name)
    completion_obj = llm_obj.new_completion()
    dir(completion_obj)


    >> ['__class__', ..., 'cq', 'execute', 'execute_streamed', 'llm', 'settings', 'with_message']

    So currently, we can access the following methods/objects: cq, execute, execute_streamed, llm, settings, with_message.
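
    For example, generation settings can be tweaked on the mesh query through those members before executing it (a quick sketch; the setting keys follow whatever the completions endpoint accepts):

    completion_obj.settings["temperature"] = 0.2  # adjust generation settings on the mesh query
    completion_obj.with_message("Count to 5")     # role defaults to "user"
    print(completion_obj.execute().text)          # executes through the LLM Mesh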

    So accessing the underlying LLM object, or the full set of methods of the underlying LLM-as-a-Service object (e.g. those available in the AzureOpenAI module, such as chat_with_tools, to name one), is not something currently available or on the planned roadmap. However, you are currently investigating ways to expose function-calling (tooling) support for LLMs that provide it.

    Is that a fair summary of the current state of things?

    Thanks again!

  • ClemenceB Dataiker, Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Product Ideas Manager Posts: 18

    Hi Nathan,

    Yes, this is right.

    Best,

    Clémence
