mirror of
https://github.com/QuivrHQ/quivr.git
synced 2024-12-15 09:32:22 +03:00
🚑 use multithreading instead of multiprocessing for container in ECS (#525)
This commit is contained in:
parent
22e8189057
commit
0edc4f628c
@ -1,4 +1,4 @@
|
||||
import multiprocessing as mp
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from typing import List
|
||||
|
||||
from langchain.embeddings.openai import OpenAIEmbeddings
|
||||
@ -100,25 +100,18 @@ def get_unique_files_from_vector_ids(vectors_ids: List[int]):
|
||||
"""
|
||||
print("vectors_ids", vectors_ids)
|
||||
|
||||
manager = mp.Manager()
|
||||
vectors_responses = manager.list()
|
||||
|
||||
# constants
|
||||
BATCH_SIZE = 5
|
||||
|
||||
# if __name__ == '__main__':
|
||||
# multiprocessing pool initialization
|
||||
with ThreadPoolExecutor() as executor:
|
||||
futures = []
|
||||
for i in range(0, len(vectors_ids), BATCH_SIZE):
|
||||
batch_ids = vectors_ids[i:i + BATCH_SIZE]
|
||||
future = executor.submit(process_batch, batch_ids)
|
||||
futures.append(future)
|
||||
|
||||
pool = mp.Pool()
|
||||
results = []
|
||||
for i in range(0, len(vectors_ids), BATCH_SIZE):
|
||||
batch_ids = vectors_ids[i:i + BATCH_SIZE]
|
||||
result = pool.apply_async(process_batch, args=(batch_ids,), error_callback=error_callback)
|
||||
results.append(result)
|
||||
# Retrieve the results
|
||||
vectors_responses = [result.get() for result in results]
|
||||
pool.close()
|
||||
pool.join()
|
||||
# Retrieve the results
|
||||
vectors_responses = [future.result() for future in futures]
|
||||
|
||||
documents = [item for sublist in vectors_responses for item in sublist]
|
||||
print('document', documents)
|
||||
|
Loading…
Reference in New Issue
Block a user