mirror of
https://github.com/OpenBMB/ChatDev.git
synced 2024-11-07 18:40:13 +03:00
221 lines
6.9 KiB
Python
221 lines
6.9 KiB
Python
# =========== Copyright 2023 @ CAMEL-AI.org. All Rights Reserved. ===========
|
|
# Licensed under the Apache License, Version 2.0 (the “License”);
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an “AS IS” BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
# =========== Copyright 2023 @ CAMEL-AI.org. All Rights Reserved. ===========
|
|
import os
|
|
import re
|
|
import zipfile
|
|
from functools import wraps
|
|
from typing import Any, Callable, List, Optional, Set, TypeVar
|
|
|
|
import requests
|
|
import tiktoken
|
|
|
|
from camel.messages import OpenAIMessage
|
|
from camel.typing import ModelType, TaskType
|
|
|
|
# Type variable bound to any callable. Decorators in this module annotate
# both their argument and their return value with it, declaring that the
# decorated function keeps the type of the function passed in.
F = TypeVar('F', bound=Callable[..., Any])
|
|
|
|
import time
|
|
|
|
|
|
def count_tokens_openai_chat_models(
    messages: List[OpenAIMessage],
    encoding: Any,
) -> int:
    r"""Estimate how many tokens a list of chat messages will consume.

    Each message contributes a fixed framing overhead plus the encoded
    length of every value it carries. A constant is added once for the
    reply primer.

    Args:
        messages (List[OpenAIMessage]): The conversation to measure.
        encoding (Any): Tokenizer object. Only its ``encode`` method is
            called, and the length of what it returns is counted.

    Returns:
        int: The estimated number of tokens for the whole conversation.
    """
    # Every reply is primed with <im_start>assistant.
    total = 2
    for entry in messages:
        # Framing overhead: <im_start>{role/name}\n{content}<im_end>\n
        total += 4
        for field, text in entry.items():
            # When a name is present the role is omitted, and the role is
            # always worth exactly one token.
            adjustment = -1 if field == "name" else 0
            total += len(encoding.encode(text)) + adjustment
    return total
|
|
|
|
|
|
def num_tokens_from_messages(
    messages: List[OpenAIMessage],
    model: ModelType,
) -> int:
    r"""Returns the number of tokens used by a list of messages.

    Args:
        messages (List[OpenAIMessage]): The list of messages to count the
            number of tokens for.
        model (ModelType): The OpenAI model used to encode the messages.

    Returns:
        int: The total number of tokens used by the messages.

    Raises:
        NotImplementedError: If the specified `model` is not implemented.

    References:
        - https://github.com/openai/openai-python/blob/main/chatml.md
        - https://platform.openai.com/docs/models/gpt-4
        - https://platform.openai.com/docs/models/gpt-3-5
    """
    try:
        value_for_tiktoken = model.value_for_tiktoken
        encoding = tiktoken.encoding_for_model(value_for_tiktoken)
    except KeyError:
        # tiktoken has no mapping for this model name; fall back to the
        # cl100k_base encoding.
        encoding = tiktoken.get_encoding("cl100k_base")

    if model in {
            ModelType.GPT_3_5_TURBO, ModelType.GPT_4, ModelType.GPT_4_32k,
            ModelType.STUB
    }:
        return count_tokens_openai_chat_models(messages, encoding)

    # The adjacent literals are concatenated, so each fragment must end with
    # a space to keep the URLs separated from the surrounding words.
    raise NotImplementedError(
        f"`num_tokens_from_messages` is not presently implemented "
        f"for model {model}. "
        "See https://github.com/openai/openai-python/blob/main/chatml.md "
        "for information on how messages are converted to tokens. "
        "See https://platform.openai.com/docs/models/gpt-4 "
        "or https://platform.openai.com/docs/models/gpt-3-5 "
        "for information about openai chat models.")
|
|
|
|
|
|
def get_model_token_limit(model: ModelType) -> int:
    r"""Look up the maximum number of tokens a model accepts.

    Args:
        model (ModelType): The model whose limit is requested.

    Returns:
        int: The token limit registered for ``model``.

    Raises:
        ValueError: If ``model`` is not one of the known model types.
    """
    # Ordered (model, limit) pairs, compared with ``==`` one after another.
    known_limits = (
        (ModelType.GPT_3_5_TURBO, 16384),
        (ModelType.GPT_4, 8192),
        (ModelType.GPT_4_32k, 32768),
        (ModelType.STUB, 4096),
    )
    for candidate, limit in known_limits:
        if model == candidate:
            return limit
    raise ValueError("Unknown model type")
|
|
|
|
|
|
def openai_api_key_required(func: F) -> F:
    r"""Decorator guarding a ``ChatAgent`` method behind an API key check.

    The wrapped method runs when the agent's model is the stub model or when
    the ``OPENAI_API_KEY`` environment variable is set.

    Args:
        func (callable): The method to be wrapped. Its first positional
            argument must be a ``ChatAgent`` instance.

    Returns:
        callable: The decorated method.

    Raises:
        ValueError: If the first argument is not a ``ChatAgent``, or if the
            OpenAI API key is not found in the environment variables.
    """

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        # Imported at call time rather than at module import time
        # (presumably to avoid a circular import -- confirm).
        from camel.agents.chat_agent import ChatAgent

        if not isinstance(self, ChatAgent):
            raise ValueError("Expected ChatAgent")

        # The stub model never needs a key; real models need it present.
        if self.model == ModelType.STUB or 'OPENAI_API_KEY' in os.environ:
            return func(self, *args, **kwargs)

        raise ValueError('OpenAI API key not found.')

    return wrapper
|
|
|
|
|
|
def print_text_animated(text, delay: float = 0.005, end: str = ""):
    r"""Print ``text`` one character at a time, pausing between characters.

    Args:
        text (str): The text to print.
        delay (float, optional): Seconds to sleep after each character.
            (default: :obj:`0.005`)
        end (str, optional): String emitted after every individual
            character. (default: :obj:`""`)
    """
    for symbol in text:
        # Flush on every character so the output appears progressively.
        print(symbol, flush=True, end=end)
        time.sleep(delay)
    # Emits a newline character followed by print's own line terminator.
    print('\n')
|
|
|
|
|
|
def get_prompt_template_key_words(template: str) -> Set[str]:
    r"""Collect the distinct placeholders written between curly braces.

    Args:
        template (str): A string containing curly braces.

    Returns:
        Set[str]: The set of texts found between ``{`` and the next ``}``.

    Example:
        >>> get_prompt_template_key_words('Hi, {name}! How are you {status}?')
        {'name', 'status'}
    """
    placeholder = re.compile(r'{([^}]*)}')
    return {hit.group(1) for hit in placeholder.finditer(template)}
|
|
|
|
|
|
def get_first_int(string: str) -> Optional[int]:
    r"""Extract the first run of digits in a string as an integer.

    Args:
        string (str): The input string.

    Returns:
        int or None: The first integer number found in the string, or None
            if the string contains no digits.
    """
    digits = re.search(r'\d+', string)
    if digits is None:
        return None
    return int(digits.group())
|
|
|
|
|
|
def download_tasks(task: TaskType, folder_path: str) -> None:
    r"""Downloads the task archive for ``task`` and extracts it.

    The archive is fetched from the ``camel-ai/metadata`` dataset on
    Hugging Face, saved as ``tasks.zip`` inside ``folder_path``, extracted
    into ``folder_path`` and then deleted.

    Args:
        task (TaskType): The task whose ``value`` selects the archive to
            download.
        folder_path (str): Directory that receives the temporary zip file
            and the extracted contents.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not respond in time.
        zipfile.BadZipFile: If the downloaded payload is not a valid zip.
    """
    # Define the path to save the zip file
    zip_file_path = os.path.join(folder_path, "tasks.zip")

    # Download the zip file from the Hugging Face dataset repository. The
    # timeout keeps an unresponsive server from blocking the caller forever.
    response = requests.get(
        "https://huggingface.co/datasets/camel-ai/"
        f"metadata/resolve/main/{task.value}_tasks.zip", timeout=60)
    # Fail fast on HTTP errors instead of saving an error page as a zip.
    response.raise_for_status()

    try:
        # Save the zip file
        with open(zip_file_path, "wb") as f:
            f.write(response.content)

        with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
            zip_ref.extractall(folder_path)
    finally:
        # Delete the zip file even when writing or extraction failed. The
        # existence check avoids masking the original error when the file
        # was never created.
        if os.path.exists(zip_file_path):
            os.remove(zip_file_path)
|