import os
import tempfile
import time
from io import BytesIO
from tempfile import NamedTemporaryFile

import openai
from fastapi import UploadFile
from langchain.document_loaders import TextLoader
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter

from utils import compute_sha1_from_content

# Legacy helper, kept for reference only (not executed):
#
# # Create a function to transcribe audio using Whisper
# def _transcribe_audio(api_key, audio_file, stats_db):
#     openai.api_key = api_key
#     transcript = ""
#
#     with BytesIO(audio_file.read()) as audio_bytes:
#         # Get the extension of the uploaded file
#         file_extension = os.path.splitext(audio_file.name)[-1]
#
#         # Create a temporary file with the uploaded audio data and the correct extension
#         with tempfile.NamedTemporaryFile(delete=True, suffix=file_extension) as temp_audio_file:
#             temp_audio_file.write(audio_bytes.read())
#             temp_audio_file.seek(0)  # Move the file pointer to the beginning of the file
#
#             transcript = openai.Audio.translate("whisper-1", temp_audio_file)
#
#     return transcript


async def process_audio(vector_store, upload_file: UploadFile, stats_db):
    """Transcribe an uploaded audio file and add the transcript to a vector store.

    The upload is copied to a temporary file, sent to the ``whisper-1`` model
    through ``openai.Audio.transcribe``, split into chunks and stored as
    ``Document`` objects carrying hash/size/name/chunking metadata.

    Args:
        vector_store: Object exposing ``add_documents(docs)``; receives the
            transcript chunks.
        upload_file: The uploaded audio file.
        stats_db: Currently unused; only referenced by the disabled usage
            tracking below. Kept so existing callers keep working.

    Returns:
        The same ``vector_store`` object that was passed in.

    Note:
        No API key is passed explicitly; the ``openai`` module is expected to
        be configured already (presumably via ``OPENAI_API_KEY``).
    """
    dateshort = time.strftime("%Y%m%d-%H%M%S")
    file_meta_name = f"audiotranscript_{dateshort}.txt"

    # Use only the extension of the client-supplied name as the temp-file
    # suffix: the full filename may be None or contain path separators, while
    # the extension is enough for the temp file name to carry the audio format.
    file_extension = os.path.splitext(upload_file.filename or "")[-1]

    # Read the upload first so a failed read cannot leave a temp file behind.
    await upload_file.seek(0)
    content = await upload_file.read()

    # delete=False lets the file be reopened by name after this handle is
    # closed; it is removed explicitly in the ``finally`` below.
    with tempfile.NamedTemporaryFile(delete=False, suffix=file_extension) as tmp_file:
        tmp_file.write(content)
        tmp_path = tmp_file.name

    try:
        with open(tmp_path, "rb") as audio_file:
            transcript = openai.Audio.transcribe("whisper-1", audio_file)
    finally:
        # Always clean up, including when the transcription call raises.
        os.remove(tmp_path)

    transcript_text = transcript.text
    transcript_bytes = transcript_text.encode("utf-8")
    file_sha = compute_sha1_from_content(transcript_bytes)
    file_size = len(transcript_bytes)
    print(file_size)

    # Chunking parameters are fixed here (not user-configurable).
    chunk_size = 500
    chunk_overlap = 0

    text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    print(transcript)
    print("#########")

    # Split the transcript string itself, not the API response object.
    texts = text_splitter.split_text(transcript_text)

    docs_with_metadata = [
        Document(
            page_content=text,
            metadata={
                "file_sha1": file_sha,
                "file_size": file_size,
                "file_name": file_meta_name,
                "chunk_size": chunk_size,
                "chunk_overlap": chunk_overlap,
                "date": dateshort,
            },
        )
        for text in texts
    ]

    # Usage tracking is currently disabled:
    # if st.secrets.self_hosted == "false":
    #     add_usage(stats_db, "embedding", "audio", metadata={"file_name": file_meta_name,"file_type": ".txt", "chunk_size": chunk_size, "chunk_overlap": chunk_overlap})
    vector_store.add_documents(docs_with_metadata)
    return vector_store