Build a RAG chat bot with the Azure Cosmos DB NoSQL API
This guide shows how to build a RAG (retrieval-augmented generation) pattern application using a subset of the Movie Lens dataset. The sample uses the Python SDK for Azure Cosmos DB for NoSQL to perform vector search for RAG, store and retrieve chat history, and store vectors of the chat history to use as a semantic cache. Azure OpenAI is used to generate embeddings and large language model (LLM) completions.
At the end, you create a simple Gradio UI so users can type in questions and view responses generated by Azure OpenAI or served from the cache. The responses also show the elapsed time, illustrating the performance impact of caching compared to generating a response.
Tip
You can find the full Python notebook sample here.
For more RAG samples, visit: AzureDataRetrievalAugmentedGenerationSamples
Important
This sample requires accounts for Azure Cosmos DB for NoSQL and Azure OpenAI. Set up both before you begin.
1. Install the required packages
Install the Python packages needed to interact with Azure Cosmos DB and the other services.
! pip install --user python-dotenv
! pip install --user aiohttp
! pip install --user openai
! pip install --user gradio
! pip install --user ijson
! pip install --user nest_asyncio
! pip install --user tenacity
# Note: ensure you have azure-cosmos version 4.7 or higher installed
! pip install --user azure-cosmos
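If you want to confirm that the installed azure-cosmos package meets the 4.7 minimum before continuing, a minimal check (not part of the original notebook) using Python's standard importlib.metadata looks like this:
# Optional check that azure-cosmos 4.7 or higher is installed
from importlib.metadata import version
print(version("azure-cosmos"))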
2. Initialize the client connection
Populate sample_env_file.env with the appropriate credentials for Azure Cosmos DB and Azure OpenAI.
cosmos_uri = "https://<replace with cosmos db account name>.documents.azure.com:443/"
cosmos_key = "<replace with cosmos db account key>"
cosmos_database_name = "database"
cosmos_collection_name = "vectorstore"
cosmos_vector_property_name = "vector"
cosmos_cache_database_name = "database"
cosmos_cache_collection_name = "vectorcache"
openai_endpoint = "<replace with azure openai endpoint>"
openai_key = "<replace with azure openai key>"
openai_type = "azure"
openai_api_version = "2023-05-15"
openai_embeddings_deployment = "<replace with azure openai embeddings deployment name>"
openai_embeddings_model = "<replace with azure openai embeddings model - e.g. text-embedding-3-large>"
openai_embeddings_dimensions = "1536"
openai_completions_deployment = "<replace with azure openai completions deployment name>"
openai_completions_model = "<replace with azure openai completions model - e.g. gpt-35-turbo>"
storage_file_url = "https://cosmosdbcosmicworks.blob.core.windows.net/fabcondata/movielens_dataset.json"
# Import the required libraries
import time
import json
import uuid
import urllib.request
import ijson
import zipfile
from dotenv import dotenv_values
from openai import AzureOpenAI
from azure.core.exceptions import AzureError
from azure.cosmos import PartitionKey, exceptions
from time import sleep
import gradio as gr
# Cosmos DB imports
from azure.cosmos import CosmosClient
# Load configuration
env_name = "sample_env_file.env"
config = dotenv_values(env_name)
cosmos_conn = config['cosmos_uri']
cosmos_key = config['cosmos_key']
cosmos_database = config['cosmos_database_name']
cosmos_collection = config['cosmos_collection_name']
cosmos_vector_property = config['cosmos_vector_property_name']
cosmos_cache_db = config['cosmos_cache_database_name']
cosmos_cache = config['cosmos_cache_collection_name']
# Create the Azure Cosmos DB for NoSQL client
cosmos_client = CosmosClient(url=cosmos_conn, credential=cosmos_key)
openai_endpoint = config['openai_endpoint']
openai_key = config['openai_key']
openai_api_version = config['openai_api_version']
openai_embeddings_deployment = config['openai_embeddings_deployment']
openai_embeddings_dimensions = int(config['openai_embeddings_dimensions'])
openai_completions_deployment = config['openai_completions_deployment']
# Movies file url
storage_file_url = config['storage_file_url']
# Create the OpenAI client
openai_client = AzureOpenAI(azure_endpoint=openai_endpoint, api_key=openai_key, api_version=openai_api_version)
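As an optional sanity check (not part of the original sample), you can list the databases visible to the account to confirm the Cosmos DB endpoint and key are valid before creating any resources:
# Optional: verify the Cosmos DB connection by listing existing databases
print([database['id'] for database in cosmos_client.list_databases()])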
3. Create a database and containers with vector policies
This step creates the database and containers. The vector embedding policy specifies the document property that stores the vectors, the number of vector dimensions used for the embeddings, and the distance function; the indexing policy adds a vector index on that property.
db = cosmos_client.create_database_if_not_exists(cosmos_database)
# Create the vector embedding policy to specify vector details
vector_embedding_policy = {
"vectorEmbeddings": [
{
"path":"/" + cosmos_vector_property,
"dataType":"float32",
"distanceFunction":"cosine",
"dimensions":openai_embeddings_dimensions
},
]
}
# Create the vector index policy to specify vector details
indexing_policy = {
"includedPaths": [
{
"path": "/*"
}
],
"excludedPaths": [
{
"path": "/\"_etag\"/?",
"path": "/" + cosmos_vector_property + "/*",
}
],
"vectorIndexes": [
{
"path": "/"+cosmos_vector_property,
"type": "quantizedFlat"
}
]
}
# Create the data collection with vector index (note: this creates a container with 10000 RUs to allow fast data load)
try:
movies_container = db.create_container_if_not_exists(id=cosmos_collection,
partition_key=PartitionKey(path='/id'),
indexing_policy=indexing_policy,
vector_embedding_policy=vector_embedding_policy,
offer_throughput=10000)
print('Container with id \'{0}\' created'.format(movies_container.id))
except exceptions.CosmosHttpResponseError:
raise
# Create the cache collection with vector index
try:
cache_container = db.create_container_if_not_exists(id=cosmos_cache,
partition_key=PartitionKey(path='/id'),
indexing_policy=indexing_policy,
vector_embedding_policy=vector_embedding_policy,
offer_throughput=1000)
print('Container with id \'{0}\' created'.format(cache_container.id))
except exceptions.CosmosHttpResponseError:
raise
4. Generate embeddings from Azure OpenAI
This function vectorizes the user input for vector search. Make sure the dimensionality and model used match the sample data provided, or regenerate the vectors with the model you want to use.
from tenacity import retry, stop_after_attempt, wait_random_exponential
import logging
@retry(wait=wait_random_exponential(min=2, max=300), stop=stop_after_attempt(20))
def generate_embeddings(text):
try:
response = openai_client.embeddings.create(
input=text,
model=openai_embeddings_deployment,
dimensions=openai_embeddings_dimensions
)
embeddings = response.model_dump()
return embeddings['data'][0]['embedding']
except Exception as e:
# Log the exception with traceback for easier debugging
logging.error("An error occurred while generating embeddings.", exc_info=True)
raise
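As a quick, optional test of the embeddings function (assuming your embeddings deployment is configured as above), you can vectorize a short string and check that the vector length matches openai_embeddings_dimensions:
# Optional: sanity-check the embeddings function and vector dimensionality
test_vector = generate_embeddings("A heartwarming movie about friendship")
print(f"Vector length: {len(test_vector)}")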
5. Load data from the JSON file
Extract the prevectorized MovieLens dataset from the zip file (see its location in the notebooks repository here).
# Unzip the data file
with zipfile.ZipFile("../../DataSet/Movies/MovieLens-4489-256D.zip", 'r') as zip_ref:
zip_ref.extractall("/Data")
zip_ref.close()
# Load the data file
data = []
with open('/Data/MovieLens-4489-256D.json', 'r') as d:
data = json.load(d)
# View the number of documents in the data (4489)
len(data)
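Optionally, you can inspect one record to confirm the field names and the vector dimensionality of the prevectorized data (this sketch assumes the dataset stores its embedding under the vector property, as the upsert step below expects):
# Optional: inspect the structure of the first document
print(list(data[0].keys()))
print(f"Vector length: {len(data[0]['vector'])}")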
6. Store data in Azure Cosmos DB
Upsert the data into Azure Cosmos DB for NoSQL. Records are written asynchronously.
#The following code to get raw movies data is commented out in favour of
#getting prevectorized data. If you want to vectorize the raw data from
#storage_file_url, uncomment the below, and set vectorizeFlag=True
#data = urllib.request.urlopen(storage_file_url)
#data = json.load(data)
vectorizeFlag=False
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
async def generate_vectors(items, vector_property):
# Create a thread pool executor for the synchronous generate_embeddings
loop = asyncio.get_event_loop()
# Define a function to call generate_embeddings using run_in_executor
async def generate_embedding_for_item(item):
try:
# Offload the sync generate_embeddings to a thread
vectorArray = await loop.run_in_executor(None, generate_embeddings, item['overview'])
item[vector_property] = vectorArray
except Exception as e:
# Log or handle exceptions if needed
logging.error(f"Error generating embedding for item: {item['overview'][:50]}...", exc_info=True)
# Create tasks for all the items to generate embeddings concurrently
tasks = [generate_embedding_for_item(item) for item in items]
# Run all the tasks concurrently and wait for their completion
await asyncio.gather(*tasks)
return items
async def insert_data(vectorize=False):
start_time = time.time() # Record the start time
# If vectorize flag is True, generate vectors for the data
if vectorize:
print("Vectorizing data, please wait...")
global data
data = await generate_vectors(data, "vector")
counter = 0
tasks = []
max_concurrency = 5 # Adjust this value to control the level of concurrency
semaphore = asyncio.Semaphore(max_concurrency)
print("Starting doc load, please wait...")
def upsert_item_sync(obj):
movies_container.upsert_item(body=obj)
async def upsert_object(obj):
nonlocal counter
async with semaphore:
await asyncio.get_event_loop().run_in_executor(None, upsert_item_sync, obj)
# Progress reporting
counter += 1
if counter % 100 == 0:
print(f"Sent {counter} documents for insertion into collection.")
for obj in data:
tasks.append(asyncio.create_task(upsert_object(obj)))
# Run all upsert tasks concurrently within the limits set by the semaphore
await asyncio.gather(*tasks)
end_time = time.time() # Record the end time
duration = end_time - start_time # Calculate the duration
print(f"All {counter} documents inserted!")
print(f"Time taken: {duration:.2f} seconds ({duration:.3f} milliseconds)")
# Run the async function with the vectorize flag set to True or False as needed
await insert_data(vectorizeFlag) # or await insert_data() for default
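As an optional check after the load completes, a simple count query confirms how many documents landed in the movies container; this sketch is not part of the original notebook:
# Optional: count the documents now stored in the movies container
doc_count = list(movies_container.query_items(
    query="SELECT VALUE COUNT(1) FROM c",
    enable_cross_partition_query=True
))[0]
print(f"Documents in collection: {doc_count}")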
7. Do a vector search
This function defines a vector search over the movie data and chat cache collections.
def vector_search(container, vectors, similarity_score=0.02, num_results=5):
results = container.query_items(
query='''
SELECT TOP @num_results c.overview, VectorDistance(c.vector, @embedding) as SimilarityScore
FROM c
WHERE VectorDistance(c.vector,@embedding) > @similarity_score
ORDER BY VectorDistance(c.vector,@embedding)
''',
parameters=[
{"name": "@embedding", "value": vectors},
{"name": "@num_results", "value": num_results},
{"name": "@similarity_score", "value": similarity_score}
],
enable_cross_partition_query=True,
populate_query_metrics=True
)
results = list(results)
formatted_results = [{'SimilarityScore': result.pop('SimilarityScore'), 'document': result} for result in results]
return formatted_results
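To try the function on its own (optional, and assuming the data load and generate_embeddings steps above have already run), you can embed a short query and print the top matches:
# Optional standalone test of vector search over the movies container
query_vector = generate_embeddings("space exploration adventure")
for result in vector_search(movies_container, query_vector, num_results=3):
    print(round(result['SimilarityScore'], 4), result['document']['overview'][:80])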
8. Get recent chat history
This function provides conversational context to the LLM, allowing it to have a better conversation with the user.
def get_chat_history(container, completions=3):
results = container.query_items(
query='''
SELECT TOP @completions *
FROM c
ORDER BY c._ts DESC
''',
parameters=[
{"name": "@completions", "value": completions},
],
enable_cross_partition_query=True
)
results = list(results)
return results
9. Chat completion functions
Define the functions that handle the chat completion process, including caching responses.
def generate_completion(user_prompt, vector_search_results, chat_history):
system_prompt = '''
You are an intelligent assistant for movies. You are designed to provide helpful answers to user questions about movies in your database.
You are friendly, helpful, and informative and can be lighthearted. Be concise in your responses, but still friendly.
- Only answer questions related to the information provided below. Provide at least 3 candidate movie answers in a list.
- Write two lines of whitespace between each answer in the list.
'''
messages = [{'role': 'system', 'content': system_prompt}]
for chat in chat_history:
messages.append({'role': 'user', 'content': chat['prompt'] + " " + chat['completion']})
messages.append({'role': 'user', 'content': user_prompt})
for result in vector_search_results:
messages.append({'role': 'system', 'content': json.dumps(result['document'])})
response = openai_client.chat.completions.create(
model=openai_completions_deployment,
messages=messages,
temperature=0.1
)
return response.model_dump()
def chat_completion(cache_container, movies_container, user_input):
print("starting completion")
# Generate embeddings from the user input
user_embeddings = generate_embeddings(user_input)
# Query the chat history cache first to see if this question has been asked before
cache_results = get_cache(container=cache_container, vectors=user_embeddings, similarity_score=0.99, num_results=1)
if len(cache_results) > 0:
print("Cached Result\n")
return cache_results[0]['completion'], True
else:
# Perform vector search on the movie collection
print("New result\n")
search_results = vector_search(movies_container, user_embeddings)
print("Getting Chat History\n")
# Chat history
chat_history = get_chat_history(cache_container, 3)
# Generate the completion
print("Generating completions \n")
completions_results = generate_completion(user_input, search_results, chat_history)
print("Caching response \n")
# Cache the response
cache_response(cache_container, user_input, user_embeddings, completions_results)
print("\n")
# Return the generated LLM completion
return completions_results['choices'][0]['message']['content'], False
10. Cache generated responses
Save the user prompts and generated completions to the cache so that future responses are faster.
def cache_response(container, user_prompt, prompt_vectors, response):
chat_document = {
'id': str(uuid.uuid4()),
'prompt': user_prompt,
'completion': response['choices'][0]['message']['content'],
'completionTokens': str(response['usage']['completion_tokens']),
'promptTokens': str(response['usage']['prompt_tokens']),
'totalTokens': str(response['usage']['total_tokens']),
'model': response['model'],
'vector': prompt_vectors
}
container.create_item(body=chat_document)
def get_cache(container, vectors, similarity_score=0.0, num_results=5):
# Execute the query
results = container.query_items(
query= '''
SELECT TOP @num_results *
FROM c
WHERE VectorDistance(c.vector,@embedding) > @similarity_score
ORDER BY VectorDistance(c.vector,@embedding)
''',
parameters=[
{"name": "@embedding", "value": vectors},
{"name": "@num_results", "value": num_results},
{"name": "@similarity_score", "value": similarity_score},
],
enable_cross_partition_query=True, populate_query_metrics=True)
results = list(results)
return results
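Before building the UI, you can optionally exercise the full pipeline directly in a notebook cell; this sketch simply calls chat_completion with a sample question and reports whether the answer came from the cache:
# Optional: call the chat completion pipeline directly, without the Gradio UI
response_text, was_cached = chat_completion(cache_container, movies_container, "Can you recommend some good sci-fi movies?")
print(f"Cached: {was_cached}\n{response_text}")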
11. Create a simple UX in Gradio
Build a user interface with Gradio for interacting with the AI application.
chat_history = []
with gr.Blocks() as demo:
chatbot = gr.Chatbot(label="Cosmic Movie Assistant")
msg = gr.Textbox(label="Ask me about movies in the Cosmic Movie Database!")
clear = gr.Button("Clear")
def user(user_message, chat_history):
start_time = time.time()
response_payload, cached = chat_completion(cache_container, movies_container, user_message)
end_time = time.time()
        elapsed_time = round((end_time - start_time) * 1000, 2)
details = f"\n (Time: {elapsed_time}ms)"
if cached:
details += " (Cached)"
chat_history.append([user_message, response_payload + details])
return gr.update(value=""), chat_history
msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False)
clear.click(lambda: None, None, chatbot, queue=False)
# Launch the Gradio interface
demo.launch(debug=True)
# Be sure to run this cell to close or restart the Gradio demo
demo.close()
Vector database solutions
Azure PostgreSQL Server pgvector extension
Related content
- 30-day free trial without an Azure subscription
- 90-day free trial and up to $6,000 in throughput credits with Azure AI Advantage