Sign up to take part
Registered users can ask their own questions, contribute to discussions, and be part of the Community!
Registered users can ask their own questions, contribute to discussions, and be part of the Community!
I have created an Azure_OpenAI connection in our DSS instance. I can connect to the instance from within Dataiku and externally via the dataiku-api-client module. For example:
import dataikuapi
import os
dss_instance = os.getenv('DSS_INSTANCE')
dss_api_key = os.getenv('DSS_API_KEY')
dss_project_key = os.getenv('DSS_PROJECT_KEY')
dss_llm_name = os.getenv('DSS_LLM_NAME')
client = dataikuapi.DSSClient(dss_instance, dss_api_key)
project_obj = client.get_project(dss_project_key)
llm_obj = project_obj.get_llm(dss_llm_name)
completion_obj = llm_obj.new_completion()
completion_obj.with_message(message='Speak only in iambic pentameter!', role='system')
query_obj = completion_obj.with_message(message="Count to 5", role='user')
This is helpful. However, I would like to query the underlying AzureOpenAI instance directly. For example, I would like to access its chat_with_tools function. Is there a way to access the underlying LLM object? Or perhaps there is a function that allows us to query a subset of the built-in functions and pass the custom args via some kwargs parameter?
Essentially, we would like to leverage the built-in logging, and toxicity/PII checks of the LLM Mesh while preserving the underlying API functionality of the LLM-as-a-Service. Is this something we can accomplish?
Operating system used: RHEL 7.9
A member from our team looked into the source code of the dataikuapi module in our /usr/lib/python3.x/site-packages/ folder. We were able to extract the core functionality of the dataikuapi call to the LLM Mesh with just the requests package. As a result we created our own DSSOpenAI class that allows us to preserve the SDK functionality of the OpenAI library while still routing the request through the LLM Mesh. We currently use llama_index, but replace the import with whichever gateway/session object you use.
We wanted to share our solution here for posterity. Perhaps it may be helpful to some of your other clients, @ClemenceB.
from llama_index.llms.openai import OpenAI
from typing import (
List,
Any,
Callable,
Dict,
List,
Optional,
Sequence,
)
from requests.auth import HTTPBasicAuth
from requests import Session
import json
import httpx
from llama_index.core.base.llms.types import (
ChatMessage,
ChatResponse,
CompletionResponse,
MessageRole,
)
from llama_index.core.callbacks import CallbackManager
from llama_index.core.types import BaseOutputParser, PydanticProgramMode
from llama_index.llms.openai.base import llm_retry_decorator
class DSSOpenAI(OpenAI):
    """llama_index-compatible LLM that routes requests through the Dataiku DSS
    LLM Mesh completions endpoint instead of calling OpenAI/Azure directly.

    ``api_base`` is the DSS instance URL and ``api_key`` the DSS API key.
    ``additional_kwargs`` must carry two DSS-specific entries:

    - ``dss_project``:          the DSS project key
    - ``dss_model_deployment``: the DSS LLM id (sent as ``llmId``)
    """

    # TODO Add async and streaming logic as needed
    dss_project: str = ""
    dss_model_deployment: str = ""

    def __init__(
        self,
        model: str = None,
        temperature: float = 1.0,
        max_tokens: Optional[int] = None,
        additional_kwargs: Optional[Dict[str, Any]] = None,
        max_retries: int = 3,
        timeout: float = 60.0,
        reuse_client: bool = True,
        api_key: Optional[str] = None,
        api_base: Optional[str] = None,
        api_version: Optional[str] = None,
        callback_manager: Optional[CallbackManager] = None,
        default_headers: Optional[Dict[str, str]] = None,
        http_client: Optional[httpx.Client] = None,
        # base class
        system_prompt: Optional[str] = None,
        messages_to_prompt: Optional[Callable[[Sequence[ChatMessage]], str]] = None,
        completion_to_prompt: Optional[Callable[[str], str]] = None,
        pydantic_program_mode: PydanticProgramMode = PydanticProgramMode.DEFAULT,
        output_parser: Optional[BaseOutputParser] = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(
            model=model,
            temperature=temperature,
            max_tokens=max_tokens,
            additional_kwargs=additional_kwargs,
            max_retries=max_retries,
            callback_manager=callback_manager,
            api_key=api_key,
            api_version=api_version,
            api_base=api_base,
            timeout=timeout,
            reuse_client=reuse_client,
            default_headers=default_headers,
            system_prompt=system_prompt,
            messages_to_prompt=messages_to_prompt,
            completion_to_prompt=completion_to_prompt,
            pydantic_program_mode=pydantic_program_mode,
            output_parser=output_parser,
            **kwargs,
        )
        # Fail fast with a clear message if the DSS routing info is missing;
        # the bare subscript used to raise an opaque TypeError when
        # additional_kwargs was None.
        if additional_kwargs is None:
            raise ValueError(
                "additional_kwargs must contain 'dss_project' and "
                "'dss_model_deployment'"
            )
        self.dss_project = additional_kwargs["dss_project"]
        self.dss_model_deployment = additional_kwargs["dss_model_deployment"]

    @classmethod
    def class_name(cls) -> str:
        # llama_index declares class_name as a classmethod; the original
        # override omitted the decorator.
        return "dss_openai_llm"

    def _get_client(self):
        # There is no underlying OpenAI SDK client: every call goes over the
        # DSS REST endpoint instead.
        raise NotImplementedError(
            "DSS OpenAI interface does not support this functionality."
        )

    # DSS SPECIFIC FUNCTIONS
    def _format_dss_payload(self, messages: list, settings: dict) -> str:
        """Serialize a DSS LLM-completions request body to a JSON string."""
        return json.dumps(
            {
                "queries": [{"messages": messages}],
                "settings": settings,
                "llmId": self.dss_model_deployment,
            }
        )

    def _auth_dss_session(self, session: Session) -> Session:
        """Attach DSS API-key authentication (basic auth, empty password)."""
        # Set auth on the session so it is applied to every request; the
        # original called HTTPBasicAuth on the Session object, which only
        # works by duck-typing accident.
        session.auth = HTTPBasicAuth(self.api_key, "")
        return session

    def _define_dss_session(self, verify: bool = False) -> Session:
        """Build an authenticated requests Session for the DSS API.

        WARNING: verify=False disables TLS certificate validation. The
        default is kept for backward compatibility, but pass verify=True
        (or a CA bundle path) in production.
        """
        session = Session()
        session.verify = verify
        return self._auth_dss_session(session=session)

    def _get_dss_url(self) -> str:
        """Return the DSS public-API completions URL for this project."""
        return (
            f"{self.api_base}/dip/publicapi/projects/"
            f"{self.dss_project}/llms/completions"
        )

    def _parse_messages_to_dss(self, messages: List[ChatMessage]) -> list:
        """Convert llama_index ChatMessages to DSS message dicts."""
        return [
            {"content": message.content, "role": message.role.value}
            for message in messages
        ]

    def _dss_call_llm(
        self, messages: List[ChatMessage], settings: Optional[dict] = None
    ) -> dict:
        """POST the messages to the DSS LLM Mesh and return the parsed JSON."""
        # None default instead of the original mutable-default `settings={}`.
        settings = settings if settings is not None else {}
        payload = self._format_dss_payload(
            messages=self._parse_messages_to_dss(messages=messages),
            settings=settings,
        )
        session = self._define_dss_session()
        # make the request
        response = session.request(
            method="POST",
            url=self._get_dss_url(),
            params=None,
            data=payload,
            files=None,
            stream=False,
        )
        # Surface HTTP errors explicitly rather than failing later on a
        # confusing JSON decode error.
        response.raise_for_status()
        return response.json()

    # END OF DSS FUNCTIONS, NOW OVERRIDING LLAMA FUNCTIONS TO INTEGRATE
    def complete(self, prompt, **kwargs: Any) -> CompletionResponse:
        # Retry handling lives on _complete only; decorating both levels
        # multiplied the retries (up to max_retries**2 attempts).
        return self._complete(prompt=prompt, **kwargs)

    @llm_retry_decorator
    def _complete(self, prompt: str, **kwargs: Any) -> CompletionResponse:
        """Run a single-turn completion through the DSS LLM Mesh."""
        if isinstance(prompt, str):
            prompt = [ChatMessage(content=prompt, role=MessageRole.USER)]
        response = self._dss_call_llm(messages=prompt)
        return CompletionResponse(
            text=response["responses"][0]["text"],
            raw=response,
            logprobs=None,
            additional_kwargs=self._get_response_token_counts(response),
        )

    def chat(self, messages: Sequence[ChatMessage], **kwargs: Any) -> ChatResponse:
        # See complete(): retries are handled once, in _chat.
        return self._chat(messages=messages, **kwargs)

    @llm_retry_decorator
    def _chat(self, messages: Sequence[ChatMessage], **kwargs: Any) -> ChatResponse:
        """Run a chat exchange through the DSS LLM Mesh."""
        response = self._dss_call_llm(messages=messages)
        return ChatResponse(
            message=ChatMessage(
                content=response["responses"][0]["text"],
                role=MessageRole.ASSISTANT,
            ),
            raw=response,
            logprobs=None,
            additional_kwargs=self._get_response_token_counts(response),
        )
Hi Nathan,
Thank you for your request! I'm assuming that by chat_with_tools function, you're referring to function calls. Is that right?
Adding kwargs parameters, although an interesting request, brings many challenges because some arguments may change the query and response format, making it impossible to automatically apply guardrails (PII detection, toxicity, ...).
However, adding the ability to use function calls among the list of arguments supported in our completion endpoint is something we're investigating.
I hope it answers your question.
Best,
Clémence
Thank you for the follow-up, Clémence!
I think I understand. Just to ensure I've got the right message for our team, let me try to summarize what I'm planning to say based on your response.
When querying LMs through Dataiku's LLM Mesh we can access methods, such as...
import dataikuapi
client = dataikuapi.DSSClient(dss_instance, dss_api_key)
project_obj = client.get_project(dss_project_key)
llm_obj = project_obj.get_llm(dss_model_name)
completion_obj = llm_obj.new_completion()
dir(completion_obj)
>> ['__class__', ..., 'cq', 'execute', 'execute_streamed', 'llm', 'settings', 'with_message']
So currently, we can access the methods/objects: cq, execute, execute_streamed, llm, settings, with_message
Our wish of accessing the underlying LLM object, or all the methods of the underlying LLM-as-a-Service object (e.g. those available in the AzureOpenAI module, such as chat_with_tools, just to name one) is not something currently available or on the planned roadmap? However, you are currently investigating ways to expose tooling functionality for LLMs that support it.
Is that a fair summary of the current state of things?
Thanks again!
Hi Nathan,
Yes, this is right.
Best,
Clémence
A member from our team looked into the source code of the dataikuapi module in our /usr/lib/python3.x/site-packages/ folder. We were able to extract the core functionality of the dataikuapi call to the LLM Mesh with just the requests package. As a result we created our own DSSOpenAI class that allows us to preserve the SDK functionality of the OpenAI library while still routing the request through the LLM Mesh. We currently use llama_index, but replace the import with whichever gateway/session object you use.
We wanted to share our solution here for posterity. Perhaps it may be helpful to some of your other clients, @ClemenceB.
from llama_index.llms.openai import OpenAI
from typing import (
List,
Any,
Callable,
Dict,
List,
Optional,
Sequence,
)
from requests.auth import HTTPBasicAuth
from requests import Session
import json
import httpx
from llama_index.core.base.llms.types import (
ChatMessage,
ChatResponse,
CompletionResponse,
MessageRole,
)
from llama_index.core.callbacks import CallbackManager
from llama_index.core.types import BaseOutputParser, PydanticProgramMode
from llama_index.llms.openai.base import llm_retry_decorator
class DSSOpenAI(OpenAI):
    """llama_index-compatible LLM that routes requests through the Dataiku DSS
    LLM Mesh completions endpoint instead of calling OpenAI/Azure directly.

    ``api_base`` is the DSS instance URL and ``api_key`` the DSS API key.
    ``additional_kwargs`` must carry two DSS-specific entries:

    - ``dss_project``:          the DSS project key
    - ``dss_model_deployment``: the DSS LLM id (sent as ``llmId``)
    """

    # TODO Add async and streaming logic as needed
    dss_project: str = ""
    dss_model_deployment: str = ""

    def __init__(
        self,
        model: str = None,
        temperature: float = 1.0,
        max_tokens: Optional[int] = None,
        additional_kwargs: Optional[Dict[str, Any]] = None,
        max_retries: int = 3,
        timeout: float = 60.0,
        reuse_client: bool = True,
        api_key: Optional[str] = None,
        api_base: Optional[str] = None,
        api_version: Optional[str] = None,
        callback_manager: Optional[CallbackManager] = None,
        default_headers: Optional[Dict[str, str]] = None,
        http_client: Optional[httpx.Client] = None,
        # base class
        system_prompt: Optional[str] = None,
        messages_to_prompt: Optional[Callable[[Sequence[ChatMessage]], str]] = None,
        completion_to_prompt: Optional[Callable[[str], str]] = None,
        pydantic_program_mode: PydanticProgramMode = PydanticProgramMode.DEFAULT,
        output_parser: Optional[BaseOutputParser] = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(
            model=model,
            temperature=temperature,
            max_tokens=max_tokens,
            additional_kwargs=additional_kwargs,
            max_retries=max_retries,
            callback_manager=callback_manager,
            api_key=api_key,
            api_version=api_version,
            api_base=api_base,
            timeout=timeout,
            reuse_client=reuse_client,
            default_headers=default_headers,
            system_prompt=system_prompt,
            messages_to_prompt=messages_to_prompt,
            completion_to_prompt=completion_to_prompt,
            pydantic_program_mode=pydantic_program_mode,
            output_parser=output_parser,
            **kwargs,
        )
        # Fail fast with a clear message if the DSS routing info is missing;
        # the bare subscript used to raise an opaque TypeError when
        # additional_kwargs was None.
        if additional_kwargs is None:
            raise ValueError(
                "additional_kwargs must contain 'dss_project' and "
                "'dss_model_deployment'"
            )
        self.dss_project = additional_kwargs["dss_project"]
        self.dss_model_deployment = additional_kwargs["dss_model_deployment"]

    @classmethod
    def class_name(cls) -> str:
        # llama_index declares class_name as a classmethod; the original
        # override omitted the decorator.
        return "dss_openai_llm"

    def _get_client(self):
        # There is no underlying OpenAI SDK client: every call goes over the
        # DSS REST endpoint instead.
        raise NotImplementedError(
            "DSS OpenAI interface does not support this functionality."
        )

    # DSS SPECIFIC FUNCTIONS
    def _format_dss_payload(self, messages: list, settings: dict) -> str:
        """Serialize a DSS LLM-completions request body to a JSON string."""
        return json.dumps(
            {
                "queries": [{"messages": messages}],
                "settings": settings,
                "llmId": self.dss_model_deployment,
            }
        )

    def _auth_dss_session(self, session: Session) -> Session:
        """Attach DSS API-key authentication (basic auth, empty password)."""
        # Set auth on the session so it is applied to every request; the
        # original called HTTPBasicAuth on the Session object, which only
        # works by duck-typing accident.
        session.auth = HTTPBasicAuth(self.api_key, "")
        return session

    def _define_dss_session(self, verify: bool = False) -> Session:
        """Build an authenticated requests Session for the DSS API.

        WARNING: verify=False disables TLS certificate validation. The
        default is kept for backward compatibility, but pass verify=True
        (or a CA bundle path) in production.
        """
        session = Session()
        session.verify = verify
        return self._auth_dss_session(session=session)

    def _get_dss_url(self) -> str:
        """Return the DSS public-API completions URL for this project."""
        return (
            f"{self.api_base}/dip/publicapi/projects/"
            f"{self.dss_project}/llms/completions"
        )

    def _parse_messages_to_dss(self, messages: List[ChatMessage]) -> list:
        """Convert llama_index ChatMessages to DSS message dicts."""
        return [
            {"content": message.content, "role": message.role.value}
            for message in messages
        ]

    def _dss_call_llm(
        self, messages: List[ChatMessage], settings: Optional[dict] = None
    ) -> dict:
        """POST the messages to the DSS LLM Mesh and return the parsed JSON."""
        # None default instead of the original mutable-default `settings={}`.
        settings = settings if settings is not None else {}
        payload = self._format_dss_payload(
            messages=self._parse_messages_to_dss(messages=messages),
            settings=settings,
        )
        session = self._define_dss_session()
        # make the request
        response = session.request(
            method="POST",
            url=self._get_dss_url(),
            params=None,
            data=payload,
            files=None,
            stream=False,
        )
        # Surface HTTP errors explicitly rather than failing later on a
        # confusing JSON decode error.
        response.raise_for_status()
        return response.json()

    # END OF DSS FUNCTIONS, NOW OVERRIDING LLAMA FUNCTIONS TO INTEGRATE
    def complete(self, prompt, **kwargs: Any) -> CompletionResponse:
        # Retry handling lives on _complete only; decorating both levels
        # multiplied the retries (up to max_retries**2 attempts).
        return self._complete(prompt=prompt, **kwargs)

    @llm_retry_decorator
    def _complete(self, prompt: str, **kwargs: Any) -> CompletionResponse:
        """Run a single-turn completion through the DSS LLM Mesh."""
        if isinstance(prompt, str):
            prompt = [ChatMessage(content=prompt, role=MessageRole.USER)]
        response = self._dss_call_llm(messages=prompt)
        return CompletionResponse(
            text=response["responses"][0]["text"],
            raw=response,
            logprobs=None,
            additional_kwargs=self._get_response_token_counts(response),
        )

    def chat(self, messages: Sequence[ChatMessage], **kwargs: Any) -> ChatResponse:
        # See complete(): retries are handled once, in _chat.
        return self._chat(messages=messages, **kwargs)

    @llm_retry_decorator
    def _chat(self, messages: Sequence[ChatMessage], **kwargs: Any) -> ChatResponse:
        """Run a chat exchange through the DSS LLM Mesh."""
        response = self._dss_call_llm(messages=messages)
        return ChatResponse(
            message=ChatMessage(
                content=response["responses"][0]["text"],
                role=MessageRole.ASSISTANT,
            ),
            raw=response,
            logprobs=None,
            additional_kwargs=self._get_response_token_counts(response),
        )