import os
from tempfile import NamedTemporaryFile

import streamlit as st
from dotenv import load_dotenv, find_dotenv
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Chroma

# Load environment variables (e.g. OPENAI_API_KEY) from a .env file if present.
load_dotenv(find_dotenv(), override=True)
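

# Write the uploaded file's bytes to a temporary file so the path-based
# LangChain loaders can read them, then dispatch on the file extension.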
def load_document(uploaded_file):
    with NamedTemporaryFile(delete=False, suffix=os.path.splitext(uploaded_file.name)[1]) as tmp_file:
        tmp_file.write(uploaded_file.getvalue())
        tmp_file_path = tmp_file.name

    extension = os.path.splitext(uploaded_file.name)[1].lower()
    if extension == '.pdf':
        from langchain.document_loaders import PyPDFLoader
        loader = PyPDFLoader(tmp_file_path)
    elif extension == '.docx':
        from langchain.document_loaders import Docx2txtLoader
        loader = Docx2txtLoader(tmp_file_path)
    elif extension == '.txt':
        from langchain.document_loaders import TextLoader
        loader = TextLoader(tmp_file_path)
    else:
        st.error('Document format is not supported!')
        os.unlink(tmp_file_path)
        return None

    data = loader.load()
    os.unlink(tmp_file_path)  # the loader has read the file; remove the temp copy
    return data
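

# Split the loaded documents into overlapping chunks small enough to embed.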
def chunk_data(data, chunk_size=256, chunk_overlap=20):
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    chunks = text_splitter.split_documents(data)
    return chunks
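

# Embed the chunks with OpenAI and index them in an in-memory Chroma store.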
def create_embeddings(chunks):
    embeddings = OpenAIEmbeddings()
    vector_store = Chroma.from_documents(chunks, embeddings)
    return vector_store
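

# Answer a question by retrieving the k most similar chunks and "stuffing"
# them into a single prompt for the chat model.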
def ask_and_get_answer(vector_store, q, k=3):
    llm = ChatOpenAI(model='gpt-3.5-turbo', temperature=1)
    retriever = vector_store.as_retriever(search_type='similarity', search_kwargs={'k': k})
    chain = RetrievalQA.from_chain_type(llm=llm, chain_type="stuff", retriever=retriever)
    answer = chain.run(q)
    return answer
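

# Count tokens with tiktoken and estimate the cost at the
# text-embedding-ada-002 rate of $0.0004 per 1,000 tokens.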
def calculate_embedding_cost(texts):
    import tiktoken
    enc = tiktoken.encoding_for_model('text-embedding-ada-002')
    total_tokens = sum(len(enc.encode(page.page_content)) for page in texts)
    return total_tokens, total_tokens / 1000 * 0.0004
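

# Drop any saved chat history; suitable as an on_click callback for a widget.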
def clear_history():
    if 'history' in st.session_state:
        del st.session_state['history']
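

# Track which files have been uploaded in each chat tab across Streamlit reruns.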
if 'uploaded_files_per_tab' not in st.session_state:
    st.session_state['uploaded_files_per_tab'] = {}
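

# Build the UI: a sidebar for the API key and tab count, plus one chat tab per
# document, each with its own vector store and chat history in session state.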
def main():
    st.image('logo.png')
    st.subheader('LLM Question-Answering Application 🤖')
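
    # Export the entered API key so OpenAIEmbeddings and ChatOpenAI can use it.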
    with st.sidebar:
        api_key = st.text_input('OpenAI API Key:', type='password')
        if api_key:
            os.environ['OPENAI_API_KEY'] = api_key

    num_tabs = st.sidebar.number_input("Number of Tabs", min_value=1, max_value=10, value=1)
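
    # Give every tab its own vector store and chat history slots.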
    for i in range(num_tabs):
        if f'vector_store_{i}' not in st.session_state:
            st.session_state[f'vector_store_{i}'] = None
        if f'chat_history_{i}' not in st.session_state:
            st.session_state[f'chat_history_{i}'] = []

    tabs = st.tabs([f"Chat {i+1}" for i in range(num_tabs)])
    for i, tab in enumerate(tabs):
        with tab:
            uploaded_file = st.file_uploader(f'Upload a file for Chat {i+1}:', type=['pdf', 'docx', 'txt'], key=f'file_uploader_{i}')
            chunk_size = st.number_input('Chunk size:', min_value=100, max_value=2048, value=512, key=f'chunk_size_{i}')
            process_file = st.button('Process File', key=f'process_file_{i}')
            if process_file and uploaded_file:
                data = load_document(uploaded_file)
                if data is not None:
                    chunks = chunk_data(data, chunk_size)
                    tokens, embedding_cost = calculate_embedding_cost(chunks)
                    st.write(f'Total tokens: {tokens}, estimated embedding cost: ${embedding_cost:.4f}')
                    vector_store = create_embeddings(chunks)
                    st.session_state[f'vector_store_{i}'] = vector_store
                    st.success(f'File for Chat {i+1} uploaded, chunked, and embedded successfully.')
                    if f'tab_{i}' not in st.session_state['uploaded_files_per_tab']:
                        st.session_state['uploaded_files_per_tab'][f'tab_{i}'] = []
                    st.session_state['uploaded_files_per_tab'][f'tab_{i}'].append(uploaded_file.name)
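
            # Question box, the files already processed in this tab, and the answer flow.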
            user_input = st.text_area("Your question:", key=f'input_{i}', height=100)
            submit_button = st.button("Send", key=f'send_{i}')
            st.write(f"Uploaded Files for Chat {i+1}:")
            for file_name in st.session_state['uploaded_files_per_tab'].get(f'tab_{i}', []):
                st.write(file_name)
            if submit_button and user_input and st.session_state[f'vector_store_{i}']:
                modified_input = f"{user_input} Answer only based on the text you received as input. Don't search external sources. If you can't answer then return `I DONT KNOW`."
                response = ask_and_get_answer(st.session_state[f'vector_store_{i}'], modified_input, 3)
                st.session_state[f'chat_history_{i}'].append((user_input, response))

            chat_history = "\n\n".join([f"Q: {q}\nA: {a}" for q, a in st.session_state[f'chat_history_{i}']])
            st.text_area("Chat History", value=chat_history, key=f'history_{i}', height=300)

if __name__ == "__main__":
    main()
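

# Streamlit runs the whole script top to bottom, so these sidebar links and
# the footer render after main() on every rerun.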
st.sidebar.markdown("### Additional Resources") |
|
st.sidebar.markdown("[Checkout Avinash's Hugging Face Profile](https://huggingface.co/AvinashPolineni)") |
|
st.sidebar.markdown("[Checkout Avinash's GitHub Profile](https://github.com/polineniavinash)") |
|
st.sidebar.markdown("[Contact Me on LinkedIn](https://linkedin.com/in/avinash-polineni/)") |
|
|
|
|
|
st.markdown("---") |
|
st.caption("© 2023 Avinash Polineni.") |