import os
import streamlit as st
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI
from dotenv import load_dotenv, find_dotenv
from tempfile import NamedTemporaryFile
# loading environment variables
load_dotenv(find_dotenv(), override=True)
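# Note: OPENAI_API_KEY can come from a local .env file (loaded here) or be
# entered in the sidebar at runtime (see main() below); the sidebar value
# overwrites the environment variable for the current process.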
# loading PDF, DOCX and TXT files as LangChain Documents
def load_document(uploaded_file):
    # Persist the uploaded bytes to a temporary file so the loaders can read a path
    with NamedTemporaryFile(delete=False, suffix=os.path.splitext(uploaded_file.name)[1]) as tmp_file:
        tmp_file.write(uploaded_file.getvalue())
        tmp_file_path = tmp_file.name

    # Use the appropriate loader based on the file extension
    extension = os.path.splitext(uploaded_file.name)[1]
    if extension == '.pdf':
        from langchain.document_loaders import PyPDFLoader
        loader = PyPDFLoader(tmp_file_path)
    elif extension == '.docx':
        from langchain.document_loaders import Docx2txtLoader
        loader = Docx2txtLoader(tmp_file_path)
    elif extension == '.txt':
        from langchain.document_loaders import TextLoader
        loader = TextLoader(tmp_file_path)
    else:
        st.error('Document format is not supported!')
        return None

    data = loader.load()
    os.remove(tmp_file_path)  # clean up the temporary file once loaded
    return data
# splitting data in chunks
def chunk_data(data, chunk_size=256, chunk_overlap=20):
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    chunks = text_splitter.split_documents(data)
    return chunks
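# Note on the parameters above: with chunk_size=256 and chunk_overlap=20,
# consecutive chunks share ~20 characters of context, so a document of N
# characters yields roughly N / (chunk_size - chunk_overlap) chunks. Larger
# chunks give the LLM more context per retrieved passage; smaller chunks make
# similarity search more precise.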
# create embeddings using OpenAIEmbeddings() and save them in a Chroma vector store
def create_embeddings(chunks):
    embeddings = OpenAIEmbeddings()
    vector_store = Chroma.from_documents(chunks, embeddings)
    return vector_store
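# Hypothetical variant (not used by this app): persist the Chroma index to disk
# so embeddings survive Streamlit reruns and app restarts. The directory name
# './chroma_db' is an arbitrary choice for illustration.
def create_embeddings_persistent(chunks, persist_directory='./chroma_db'):
    embeddings = OpenAIEmbeddings()
    vector_store = Chroma.from_documents(chunks, embeddings, persist_directory=persist_directory)
    return vector_store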
def ask_and_get_answer(vector_store, q, k=3):
    llm = ChatOpenAI(model='gpt-3.5-turbo', temperature=1)
    retriever = vector_store.as_retriever(search_type='similarity', search_kwargs={'k': k})
    chain = RetrievalQA.from_chain_type(llm=llm, chain_type="stuff", retriever=retriever)
    answer = chain.run(q)
    return answer
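# Hypothetical variant (not wired into the UI): return the retrieved source
# chunks alongside the answer, e.g. to show citations. Uses the same legacy
# RetrievalQA API as above; the function name is an assumption.
def ask_and_get_answer_with_sources(vector_store, q, k=3):
    llm = ChatOpenAI(model='gpt-3.5-turbo', temperature=1)
    retriever = vector_store.as_retriever(search_type='similarity', search_kwargs={'k': k})
    chain = RetrievalQA.from_chain_type(llm=llm, chain_type="stuff", retriever=retriever,
                                        return_source_documents=True)
    result = chain({'query': q})
    return result['result'], result['source_documents']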
# calculate embedding cost using tiktoken
def calculate_embedding_cost(texts):
    import tiktoken
    enc = tiktoken.encoding_for_model('text-embedding-ada-002')
    total_tokens = sum([len(enc.encode(page.page_content)) for page in texts])
    # print(f'Total Tokens: {total_tokens}')
    # print(f'Embedding Cost in USD: {total_tokens / 1000 * 0.0004:.6f}')
    return total_tokens, total_tokens / 1000 * 0.0004
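# The rate above ($0.0004 per 1K tokens) is the 2023 list price for
# text-embedding-ada-002; adjust the constant if OpenAI's pricing changes.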
# clear the chat history from streamlit session state
def clear_history():
    if 'history' in st.session_state:
        del st.session_state['history']

# Track uploaded file names per tab across reruns
if 'uploaded_files_per_tab' not in st.session_state:
    st.session_state['uploaded_files_per_tab'] = {}
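# Hypothetical wiring (not present in the original UI): clear_history could be
# attached to a sidebar button via Streamlit's on_click callback, e.g.:
# st.sidebar.button('Clear Chat History', on_click=clear_history)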
def main():
    st.image('logo.png')
    st.subheader('LLM Question-Answering Application 🤖')

    # API Key input
    with st.sidebar:
        api_key = st.text_input('OpenAI API Key:', type='password')
        if api_key:
            os.environ['OPENAI_API_KEY'] = api_key

    # Tab configuration
    num_tabs = st.sidebar.number_input("Number of Tabs", min_value=1, max_value=10, value=1)

    # Initialize session state for each tab
    for i in range(num_tabs):
        if f'vector_store_{i}' not in st.session_state:
            st.session_state[f'vector_store_{i}'] = None
        if f'chat_history_{i}' not in st.session_state:
            st.session_state[f'chat_history_{i}'] = []

    # Tab interface
    tabs = st.tabs([f"Chat {i+1}" for i in range(num_tabs)])
    for i, tab in enumerate(tabs):
        with tab:
            # Document upload and processing
            uploaded_file = st.file_uploader(f'Upload a file for Chat {i+1}:', type=['pdf', 'docx', 'txt'], key=f'file_uploader_{i}')
            chunk_size = st.number_input('Chunk size:', min_value=100, max_value=2048, value=512, key=f'chunk_size_{i}')
            process_file = st.button('Process File', key=f'process_file_{i}')

            if process_file and uploaded_file:
                data = load_document(uploaded_file)
                chunks = chunk_data(data, chunk_size)
                tokens, embedding_cost = calculate_embedding_cost(chunks)
                st.write(f'Total tokens: {tokens}, embedding cost: ${embedding_cost:.4f}')
                vector_store = create_embeddings(chunks)
                st.session_state[f'vector_store_{i}'] = vector_store
                st.success(f'File for Chat {i+1} uploaded, chunked, and embedded successfully.')
                if f'tab_{i}' not in st.session_state['uploaded_files_per_tab']:
                    st.session_state['uploaded_files_per_tab'][f'tab_{i}'] = []
                st.session_state['uploaded_files_per_tab'][f'tab_{i}'].append(uploaded_file.name)

            # Chat interaction
            user_input = st.text_area("Your question:", key=f'input_{i}', height=100)
            submit_button = st.button("Send", key=f'send_{i}')

            st.write(f"Uploaded Files for Chat {i+1}:")
            for file_name in st.session_state['uploaded_files_per_tab'].get(f'tab_{i}', []):
                st.write(file_name)

            if submit_button and st.session_state[f'vector_store_{i}']:
                # Constrain the model to the retrieved context only
                modified_input = f"{user_input} Answer only based on the text you received as input. Don't search external sources. If you can't answer then return `I DONT KNOW`."
                response = ask_and_get_answer(st.session_state[f'vector_store_{i}'], modified_input, 3)  # 'k' value is set to 3
                st.session_state[f'chat_history_{i}'].append((user_input, response))

            # Display chat history
            chat_history = "\n\n".join([f"Q: {q}\nA: {a}" for q, a in st.session_state[f'chat_history_{i}']])
            st.text_area("Chat History", value=chat_history, key=f'history_{i}', height=300)

# Run the main function
if __name__ == "__main__":
    main()
# Additional Resources
st.sidebar.markdown("### Additional Resources")
st.sidebar.markdown("[Checkout Avinash's Hugging Face Profile](https://huggingface.co/AvinashPolineni)")
st.sidebar.markdown("[Checkout Avinash's GitHub Profile](https://github.com/polineniavinash)")
st.sidebar.markdown("[Contact Me on LinkedIn](https://linkedin.com/in/avinash-polineni/)")
# Footer
st.markdown("---")
st.caption("© 2023 Avinash Polineni.")