RAG with Vector Stores
Summary of RAG Workflows
Vector Store Workflow for Conversational Exchanges:
Generate a semantic embedding for each new conversation.
Add the message body to a vector store for retrieval.
Query the vector store for relevant messages to fill in the LLM context.
Modified Workflow for an Arbitrary Document:
Divide the document into chunks and process them into useful messages.
Generate a semantic embedding for each new document chunk.
Add the chunk bodies to a vector store for retrieval.
Query the vector store for relevant chunks to fill in the LLM context.
Optional: modify/synthesize results for better LLM results.
Extended Workflow for a Directory of Arbitrary Documents:
Divide each document into chunks and process them into useful messages.
Generate a semantic embedding for each new document chunk.
Add the chunk bodies to a vector store for retrieval.
Optional: exploit hierarchical or metadata structures for larger systems.
Query the vector store for relevant chunks to fill in the LLM context.
Optional: modify/synthesize results for better LLM results.
RAG for Conversation History
Getting a conversation
Consider a conversation crafted using Llama-13B between a chat agent and a blue bear named Beras.
conversation = [  ## This conversation was generated partially by an AI system, and modified to exhibit desirable properties
    ## Note: the 4th and 6th strings have no trailing comma, so Python joins each with the string after it.
    ## The list therefore holds 6 entries, which is why two retrieved documents below contain two turns each.
    "[User] Hello! My name is Beras, and I'm a big blue bear! Can you please tell me about the rocky mountains?",
    "[Agent] The Rocky Mountains are a beautiful and majestic range of mountains that stretch across North America",
    "[Beras] Wow, that sounds amazing! Ive never been to the Rocky Mountains before, but Ive heard many great things about them.",
    "[Agent] I hope you get to visit them someday, Beras! It would be a great adventure for you!"
    "[Beras] Thank you for the suggestion! Ill definitely keep it in mind for the future.",
    "[Agent] In the meantime, you can learn more about the Rocky Mountains by doing some research online or watching documentaries about them."
    "[Beras] I live in the arctic, so I'm not used to the warm climate there. I was just curious, ya know!",
    "[Agent] Absolutely! Lets continue the conversation and explore more about the Rocky Mountains and their significance!"
]
Constructing Vector Store Retriever
Vector Stores, or vector storage systems, abstract away most of the low-level details of the embedding/comparison strategies and provide a simple interface to load and compare vectors.
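To make that abstraction concrete, here is a minimal sketch of what a vector store does internally: embed each text, keep the vectors, and rank stored entries by similarity to an embedded query. The names toy_embed and ToyVectorStore are hypothetical, and the bag-of-words "embedding" is only a stand-in; real stores such as FAISS work with dense model embeddings and optimized indices.

```python
import math
from collections import Counter

def toy_embed(text):
    """Hypothetical stand-in for an embedding model: lowercase word counts."""
    words = [w.strip(".,!?'\"").lower() for w in text.split()]
    return Counter(w for w in words if w)

def cosine(a, b):
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[k] * b[k] for k in a if k in b)
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0

class ToyVectorStore:
    def __init__(self):
        self.entries = []  # list of (vector, text) pairs

    def add_texts(self, texts):
        for t in texts:
            self.entries.append((toy_embed(t), t))

    def search(self, query, k=2):
        q = toy_embed(query)
        ranked = sorted(self.entries, key=lambda e: cosine(q, e[0]), reverse=True)
        return [text for _, text in ranked[:k]]

store = ToyVectorStore()
store.add_texts([
    "The Rocky Mountains stretch across North America",
    "Beras is a big blue bear",
    "Honey is a sweet food",
])
top = store.search("Where are the Rocky Mountains?", k=1)
print(top)
```

Word overlap is a crude similarity signal; the point is only the shape of the interface (add texts, then search by query), which the FAISS store mirrors.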
We can feed our conversation into a FAISS vector store via the from_texts constructor. This will take our conversational data and the embedding model to create a searchable index over our discussion.
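%%time
from langchain_nvidia_ai_endpoints import NVIDIAEmbeddings
from langchain_community.vectorstores import FAISS
# Embedding model used to vectorize the messages (same settings as the embedder defined later in these notes)
embedder = NVIDIAEmbeddings(model="nvidia/nv-embed-v1", truncate="END")
# Streamlined from_texts FAISS vectorstore construction from text list
convstore = FAISS.from_texts(conversation, embedding=embedder)
# Reinterpret the store as a retriever so it can be used as a runnable
retriever = convstore.as_retriever()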
The retriever can now be used like any other LangChain runnable to query the vector store for some relevant documents:
# Invoke a query against the retriever:
pprint(retriever.invoke("What is your name?"))
[
Document(
page_content="[User] Hello! My name is Beras, and I'm a big blue bear! Can you please tell me about the
rocky mountains?"
),
Document(
page_content='[Agent] Absolutely! Lets continue the conversation and explore more about the Rocky Mountains
and their significance!'
),
Document(
page_content='[Agent] I hope you get to visit them someday, Beras! It would be a great adventure for
you![Beras] Thank you for the suggestion! Ill definitely keep it in mind for the future.'
),
Document(
page_content="[Agent] In the meantime, you can learn more about the Rocky Mountains by doing some research
online or watching documentaries about them.[Beras] I live in the arctic, so I'm not used to the warm climate
there. I was just curious, ya know!"
)
]
pprint(retriever.invoke("Where are the Rocky Mountains?"))
[
Document(
page_content='[Agent] The Rocky Mountains are a beautiful and majestic range of mountains that stretch
across North America'
),
Document(
page_content="[Agent] In the meantime, you can learn more about the Rocky Mountains by doing some research
online or watching documentaries about them.[Beras] I live in the arctic, so I'm not used to the warm climate
there. I was just curious, ya know!"
),
Document(
page_content="[User] Hello! My name is Beras, and I'm a big blue bear! Can you please tell me about the
rocky mountains?"
),
Document(
page_content='[Agent] Absolutely! Lets continue the conversation and explore more about the Rocky Mountains
and their significance!'
)
]
A retrieval of "Beras" for "your name" may be problematic for the chatbot if provided out of context. Anticipating the potential problems and creating synergies between your LLM components can increase the likelihood of good RAG behavior, so keep an eye out for such pitfalls and opportunities.
Incorporating Conversation Retrieval Into Chain
Now that we have our loaded retriever component as a chain, we can incorporate it into our existing chat system as before. Specifically, we can start with an always-on RAG formulation where:
A retriever is always retrieving context by default.
A generator is acting on the retrieved context.
from langchain_community.document_transformers import LongContextReorder
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda
from langchain_core.runnables.passthrough import RunnableAssign
from langchain_nvidia_ai_endpoints import ChatNVIDIA, NVIDIAEmbeddings
from functools import partial
from operator import itemgetter
def RPrint(preface=""):
    """Simple passthrough "prints, then returns" chain"""
    def print_and_return(x, preface):
        if preface: print(preface, end="")
        pprint(x)
        return x
    return RunnableLambda(partial(print_and_return, preface=preface))
# RPrint: a Runnable that prints its input, then returns it unchanged
def docs2str(docs, title="Document"):
    """Useful utility for making chunks into context string. Optional, but useful"""
    out_str = ""
    for doc in docs:
        doc_name = getattr(doc, 'metadata', {}).get('Title', title)
        if doc_name:
            out_str += f"[Quote from {doc_name}] "
        out_str += getattr(doc, 'page_content', str(doc)) + "\n"
    return out_str
# docs2str: concatenates the documents into a single string
long_reorder = RunnableLambda(LongContextReorder().transform_documents)
# Long-context reordering (places the most relevant documents at the start and end of the list)
context_prompt = ChatPromptTemplate.from_template(
    "Answer the question using only the context"
    "\n\nRetrieved Context: {context}"
    "\n\nUser Question: {question}"
    "\nAnswer the user conversationally. User is not aware of context."
)
# Takes the context and the question, but keeps the user unaware that context is being used
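chain = (
    {
        'context': convstore.as_retriever() | long_reorder | docs2str,
        # retriever | reorder (optional) | convert to string
        # Note: the question does not need to be passed to the retriever explicitly;
        # on invoke, the input flows into every branch of this dict on its own
        'question': (lambda x:x)
    }
    | context_prompt  # fills the two values above into the prompt for the LLM
    # | RPrint()  # intermediate results
    | instruct_llm  # ChatNVIDIA model, defined in the RAG chain section of these notes
    | StrOutputParser()
)
pprint(chain.invoke("Where does Beras live?"))
# convstore already holds the earlier conversation, so we can ask directly; without it, the chain could not answer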
Based on the context provided, Beras lives in the Arctic. It seems like the Rocky Mountains are a fascinating place
for Beras to learn about, considering the vastly different climate from where they currently reside!
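The long_reorder step in the chain above wraps LangChain's LongContextReorder, which aims to counter the "lost in the middle" effect by pushing the strongest matches to the edges of the context. The sketch below is a plain-Python approximation of that idea rather than the library's code; reorder_for_long_context is a hypothetical name, and it assumes the input list is sorted from most to least relevant.

```python
def reorder_for_long_context(docs):
    """Place the most relevant items at the two ends, the least relevant in the middle.

    Assumes `docs` is ordered from most relevant to least relevant.
    """
    reordered = []
    # Walk from least to most relevant, alternating between the front and the back
    for i, doc in enumerate(reversed(docs)):
        if i % 2 == 1:
            reordered.append(doc)
        else:
            reordered.insert(0, doc)
    return reordered

ranked = ["rank1", "rank2", "rank3", "rank4", "rank5"]
result = reorder_for_long_context(ranked)
print(result)  # ['rank1', 'rank3', 'rank5', 'rank4', 'rank2']
```

The two best matches end up first and last, and the weakest match sits in the middle, where an LLM is least likely to rely on it.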
Automatic Conversation Storage
We can perform one last integration so that new exchanges are added to the conversation store: a runnable that calls the add_texts method to update the store state.
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from operator import itemgetter
convstore = FAISS.from_texts(conversation, embedding=embedder)
def save_memory_and_get_output(d, vstore):
    """Accepts 'input'/'output' dictionary and saves to convstore"""
    vstore.add_texts([f"User said {d.get('input')}", f"Agent said {d.get('output')}"])
    return d.get('output')
chat_prompt = ChatPromptTemplate.from_template(
    "Answer the question using only the context"
    "\n\nRetrieved Context: {context}"
    "\n\nUser Question: {input}"
    "\nAnswer the user conversationally. Make sure the conversation flows naturally.\n"
    "[Agent]"
)
conv_chain = (
    {
        'context': convstore.as_retriever() | long_reorder | docs2str,
        'input': (lambda x:x)
    }
    | RunnableAssign({'output' : chat_prompt | instruct_llm | StrOutputParser()})
    | partial(save_memory_and_get_output, vstore=convstore)
    # partial fixes the vstore argument of save_memory_and_get_output to convstore and returns a new function,
    # which ensures the conversation keeps being updated; arguably not strictly necessary,
    # since passing in the new record on each call would also work
)
pprint(conv_chain.invoke("I'm glad you agree! I can't wait to get some ice cream there! It's such a good food!"))
print()
pprint(conv_chain.invoke("Can you guess what my favorite food is?"))
print()
pprint(conv_chain.invoke("Actually, my favorite is honey! Not sure where you got that idea?"))
print()
pprint(conv_chain.invoke("I see! Fair enough! Do you know my favorite food now?"))
This approach ensures some amount of consolidation, which can keep the context length from getting out of hand. It's still not a foolproof strategy on its own, but it's a stark improvement for unstructured conversations (and doesn't even require a strong instruction-tuned model to perform slot-filling).
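The save step in conv_chain relies on functools.partial to bind the store once, so the chain can call the function with only the 'input'/'output' dictionary. A minimal sketch of that binding, using a hypothetical ListStore class in place of the FAISS store:

```python
from functools import partial

class ListStore:
    """Hypothetical stand-in for a vector store; it only records added texts."""
    def __init__(self):
        self.texts = []

    def add_texts(self, texts):
        self.texts.extend(texts)

def save_memory_and_get_output(d, vstore):
    """Save both sides of the exchange, then pass the output through."""
    vstore.add_texts([f"User said {d.get('input')}", f"Agent said {d.get('output')}"])
    return d.get('output')

memory = ListStore()
# Bind vstore once; the resulting callable takes only the dictionary
save_step = partial(save_memory_and_get_output, vstore=memory)

reply = save_step({"input": "Hi!", "output": "Hello, Beras!"})
print(reply)
print(memory.texts)
```

Because the same store object stays bound, every call appends to the same memory, which is what lets later retrievals see earlier turns.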
RAG For Document Chunk Retrieval
Loading And Chunking Your Documents
Documents are cut off prior to the "References" section if one exists. This will keep our system from considering the citations and appendix sections, which tend to be long and distracting.
A chunk that lists the available documents is inserted to provide a high-level view of all available documents in a single chunk. If your pipeline does not provide metadata on each retrieval, this is a useful component and can even be placed among the higher-priority pieces if appropriate.
The metadata entries are also inserted to provide general information. Ideally, there would also be some synthetic chunks that merge the metadata into interesting cross-document chunks.
import json
from langchain_nvidia_ai_endpoints import ChatNVIDIA, NVIDIAEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import ArxivLoader
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000, chunk_overlap=100,
    separators=["\n\n", "\n", ".", ";", ",", " "],
)
print("Loading Documents")
docs = [
    ArxivLoader(query="1706.03762").load(),  ## Attention Is All You Need Paper
    ArxivLoader(query="1810.04805").load(),  ## BERT Paper
    ArxivLoader(query="2005.11401").load(),  ## RAG Paper
    ArxivLoader(query="2205.00445").load(),  ## MRKL Paper
    ArxivLoader(query="2310.06825").load(),  ## Mistral Paper
    ArxivLoader(query="2306.05685").load(),  ## LLM-as-a-Judge
]
## Cut each document off before its "References" section, if one exists
for doc in docs:
    content = json.dumps(doc[0].page_content)
    if "References" in content:
        doc[0].page_content = content[:content.index("References")]
print("Chunking Documents")
docs_chunks = [text_splitter.split_documents(doc) for doc in docs]
docs_chunks = [[c for c in dchunks if len(c.page_content) > 200] for dchunks in docs_chunks]
doc_string = "Available Documents:"
doc_metadata = []
for chunks in docs_chunks:
    metadata = getattr(chunks[0], 'metadata', {})
    doc_string += "\n - " + metadata.get('Title')
    doc_metadata += [str(metadata)]
extra_chunks = [doc_string] + doc_metadata
pprint(doc_string, '\n')
for i, chunks in enumerate(docs_chunks):
    print(f"Document {i}")
    print(f" - # Chunks: {len(chunks)}")
    print(f" - Metadata: ")
    pprint(chunks[0].metadata)
    print()
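The preprocessing above does two things before embedding: it truncates each paper at "References" and splits the remainder into overlapping chunks. As a rough illustration, here is a simplified sketch that uses a fixed-size sliding window instead of the separator-aware logic of RecursiveCharacterTextSplitter; truncate_at_references and sliding_chunks are hypothetical helper names, and the tiny sizes are chosen only to keep the example readable.

```python
def truncate_at_references(text, marker="References"):
    """Drop everything from the first occurrence of `marker` onward."""
    idx = text.find(marker)
    return text if idx == -1 else text[:idx]

def sliding_chunks(text, chunk_size=1000, chunk_overlap=100):
    """Split text into fixed-size chunks; neighbours share `chunk_overlap` characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last chunk already reaches the end of the text
    return chunks

paper = "A" * 25 + "References [1] ..."
body = truncate_at_references(paper)
chunks = sliding_chunks(body, chunk_size=10, chunk_overlap=2)
print([len(c) for c in chunks])  # [10, 10, 9]
```

The overlap means a sentence that straddles a chunk boundary still appears whole in at least one chunk, at the cost of storing some text twice.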
Construct Document Vector Stores
%%time
print("Constructing Vector Stores")
vecstores = [FAISS.from_texts(extra_chunks, embedder)]
vecstores += [FAISS.from_documents(doc_chunks, embedder) for doc_chunks in docs_chunks]
Combine indices into a single one using the following utility:
from faiss import IndexFlatL2
from langchain_community.docstore.in_memory import InMemoryDocstore
embed_dims = len(embedder.embed_query("test"))
def default_FAISS():
    '''Useful utility for making an empty FAISS vectorstore'''
    return FAISS(
        embedding_function=embedder,
        index=IndexFlatL2(embed_dims),
        docstore=InMemoryDocstore(),
        index_to_docstore_id={},
        normalize_L2=False
    )
def aggregate_vstores(vectorstores):
    ## Initialize an empty FAISS Index and merge others into it
    ## We'll use default_FAISS for simplicity, though it's tied to your embedder by reference
    agg_vstore = default_FAISS()
    for vstore in vectorstores:
        agg_vstore.merge_from(vstore)
    return agg_vstore
## Unintuitive optimization; merge_from seems to optimize constituent vector stores away
docstore = aggregate_vstores(vecstores)
print(f"Constructed aggregate docstore with {len(docstore.docstore._dict)} chunks")
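For intuition about the index type used in default_FAISS: a flat L2 index compares the query against every stored vector and ranks the results by squared Euclidean distance. The following is a plain-Python sketch of that brute-force search, not FAISS code; flat_l2_search is a hypothetical name and the 2-dimensional vectors are made up for illustration.

```python
def flat_l2_search(vectors, query, k=2):
    """Return (index, squared L2 distance) pairs for the k nearest stored vectors."""
    scored = []
    for i, vec in enumerate(vectors):
        dist = sum((a - b) ** 2 for a, b in zip(vec, query))
        scored.append((i, dist))
    scored.sort(key=lambda pair: pair[1])  # smallest distance first
    return scored[:k]

stored = [[0.0, 0.0], [1.0, 1.0], [3.0, 4.0]]
nearest = flat_l2_search(stored, [2.0, 0.0], k=2)
print(nearest)  # [(1, 2.0), (0, 4.0)]
```

Since every query scans all vectors, merging several small stores into one aggregate index mainly changes what is searched together, not how each comparison is made.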
Implement the RAG Chain
from langchain_community.document_transformers import LongContextReorder
from langchain_core.runnables import RunnableLambda
from langchain_core.runnables.passthrough import RunnableAssign
from langchain_nvidia_ai_endpoints import ChatNVIDIA, NVIDIAEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
import gradio as gr
from functools import partial
from operator import itemgetter
# NVIDIAEmbeddings.get_available_models()
embedder = NVIDIAEmbeddings(model="nvidia/nv-embed-v1", truncate="END")
# ChatNVIDIA.get_available_models()
instruct_llm = ChatNVIDIA(model="mistralai/mixtral-8x7b-instruct-v0.1")
# instruct_llm = ChatNVIDIA(model="meta/llama-3.1-8b-instruct")
convstore = default_FAISS()
def save_memory_and_get_output(d, vstore):
    """Accepts 'input'/'output' dictionary and saves to convstore"""
    vstore.add_texts([
        f"User previously responded with {d.get('input')}",
        f"Agent previously responded with {d.get('output')}"
    ])
    return d.get('output')
initial_msg = (
    "Hello! I am a document chat agent here to help the user!"
    f" I have access to the following documents: {doc_string}\n\nHow can I help you?"
)
chat_prompt = ChatPromptTemplate.from_messages([("system",
    "You are a document chatbot. Help the user as they ask questions about documents."
    " User just asked: {input}\n\n"
    " From this, we have retrieved the following potentially-useful info: "
    " Conversation History Retrieval:\n{history}\n\n"
    " Document Retrieval:\n{context}\n\n"
    " (Answer only from retrieval. Only cite sources that are used. Make your response conversational.)"
), ('user', '{input}')])
stream_chain = chat_prompt | RPrint() | instruct_llm | StrOutputParser()
retrieval_chain = (
    {'input' : (lambda x: x)}
    | RunnableAssign({'history' : itemgetter('input') | convstore.as_retriever() | long_reorder | docs2str})
    | RunnableAssign({'context' : itemgetter('input') | docstore.as_retriever() | long_reorder | docs2str})
    | RPrint()
)
def chat_gen(message, history=[], return_buffer=True):
    buffer = ""
    ## First perform the retrieval based on the input message
    retrieval = retrieval_chain.invoke(message)
    ## Then, stream the results of the stream_chain
    for token in stream_chain.stream(retrieval):
        buffer += token
        ## Yield the accumulated text (for a UI) or only the new token (for printing)
        yield buffer if return_buffer else token
    ## Lastly, save the chat exchange to the conversation memory buffer
    save_memory_and_get_output({'input': message, 'output': buffer}, convstore)
## Start of Agent Event Loop
test_question = "Tell me about RAG!" ## <- modify as desired
## Before you launch your gradio interface, make sure your thing works
for response in chat_gen(test_question, return_buffer=False):
    print(response, end='')
Structure of this retrieval chain:
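One way to read the chain: it starts from the raw message, wraps it in a dictionary, and each RunnableAssign step adds one key while keeping the earlier ones. The sketch below mimics that flow with plain functions; assign, fake_history_lookup, fake_doc_lookup, and retrieval_flow are hypothetical stand-ins for RunnableAssign, the two retriever branches, and retrieval_chain.

```python
def assign(state, **branches):
    """Mimic RunnableAssign: run each branch on the state and merge the results in."""
    new_state = dict(state)
    for key, fn in branches.items():
        new_state[key] = fn(state)
    return new_state

def fake_history_lookup(state):
    # Stand-in for: itemgetter('input') | convstore retriever | long_reorder | docs2str
    return f"(history for: {state['input']})"

def fake_doc_lookup(state):
    # Stand-in for: itemgetter('input') | docstore retriever | long_reorder | docs2str
    return f"(documents for: {state['input']})"

def retrieval_flow(message):
    state = {"input": message}                           # {'input': ...}
    state = assign(state, history=fake_history_lookup)   # adds 'history'
    state = assign(state, context=fake_doc_lookup)       # adds 'context'
    return state

retrieval = retrieval_flow("Tell me about RAG!")
print(retrieval)
```

The resulting dictionary carries the three keys ('input', 'history', 'context') that the chat_prompt template fills in before the LLM call.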
Interface
chatbot = gr.Chatbot(value = [[None, initial_msg]])
demo = gr.ChatInterface(chat_gen, chatbot=chatbot).queue()
try:
    demo.launch(debug=True, share=True, show_api=False)
    demo.close()
except Exception as e:
    demo.close()
    print(e)
    raise e
This will start an interface, and all data will be printed to the shell at the same time.
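The return_buffer flag in chat_gen exists because the two consumers want different things: a chat UI that re-renders the message on every yield wants the accumulated text so far, while a plain print(..., end='') loop wants only the newest token. A minimal sketch of the two modes, with a hypothetical fake_stream list standing in for the LLM's token stream:

```python
def stream_reply(tokens, return_buffer=True):
    """Yield either the running text (for UIs) or each new token (for printing)."""
    buffer = ""
    for token in tokens:
        buffer += token
        yield buffer if return_buffer else token

fake_stream = ["RAG ", "combines ", "retrieval ", "and ", "generation."]

# UI-style consumption: every frame is the full message so far
ui_frames = list(stream_reply(fake_stream, return_buffer=True))

# Console-style consumption: joining the tokens rebuilds the same final message
printed = "".join(stream_reply(fake_stream, return_buffer=False))
print(ui_frames[-1] == printed)
```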