Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update eval_quality.py #454

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
373 changes: 196 additions & 177 deletions chatdev/eval_quality.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,196 +4,215 @@
import subprocess
import time
import numpy as np
from typing import List, Tuple, Dict
import logging
from openai import OpenAI
import yaml
import unittest

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Load configuration
with open('config.yaml', 'r') as config_file:
config = yaml.safe_load(config_file)

client = OpenAI(
api_key='',
base_url="",
api_key=config['openai_api_key'],
base_url=config['openai_base_url'],
)

def getFilesFromType(sourceDir, filetype):
    """Recursively collect paths under ``sourceDir`` whose names end with ``filetype``.

    Args:
        sourceDir: Root directory to walk.
        filetype: Suffix matched with ``str.endswith`` (for example ``".py"``).

    Returns:
        A list of matching paths (walk root joined with the file name).
        Directories are visited in sorted order and file names are sorted
        within each directory, so the result does not depend on the order
        in which the filesystem happens to list entries.
    """
    files = []
    for root, directories, filenames in os.walk(sourceDir):
        # Sorting in place steers os.walk's top-down descent into a stable order.
        directories.sort()
        files.extend(
            os.path.join(root, filename)
            for filename in sorted(filenames)
            if filename.endswith(filetype)
        )
    return files

def get_code(directory):
    """Concatenate every ``.py`` file under ``directory`` into one Markdown-style string.

    Each file is rendered as its base name followed by a fenced ``Python``
    block holding the file's content with blank lines removed.

    Args:
        directory: Root directory searched recursively via ``getFilesFromType``.

    Returns:
        The concatenated listing with surrounding whitespace stripped, or
        ``"# None"`` when no ``.py`` file was found.
    """
    def _format_code(code):
        # Drop empty and whitespace-only lines; everything else is kept verbatim.
        return "\n".join(line for line in code.split("\n") if len(line.strip()) > 0)

    codebooks = {}
    for filepath in getFilesFromType(directory, ".py"):
        # Keyed by base name: files sharing a name in different
        # sub-directories overwrite each other (the last one read wins).
        with open(filepath, "r", encoding="utf-8") as source_file:
            codebooks[os.path.basename(filepath)] = _format_code(source_file.read())

    code = "".join(
        "{}\n```Python\n{}\n```\n\n".format(filename, content)
        for filename, content in codebooks.items()
    )

    if len(code) == 0:
        code = "# None"

    return code.strip()

def get_completeness(directory):
    """Score whether the code under ``directory`` is free of placeholder markers.

    Every line returned by ``get_code`` is lower-cased. Lines containing
    "password", "passenger", "passed" or "passes" are ignored; any remaining
    line containing "pass" or "todo" marks the project as incomplete.

    Returns:
        ``0.0`` if at least one placeholder line is found, otherwise ``1.0``.
    """
    assert os.path.isdir(directory)
    benign_words = ("password", "passenger", "passed", "passes")
    for raw_line in get_code(directory).split("\n"):
        lowered = raw_line.lower()
        if any(word in lowered for word in benign_words):
            # Contains an ordinary word that merely embeds "pass".
            continue
        if "pass" in lowered or "todo" in lowered:
            return 0.0
    return 1.0

def get_executability(directory):
assert os.path.isdir(directory)
def findFile(directory, target):
main_py_path = None
for subroot, _, filenames in os.walk(directory):
class CodeAnalyzer:
@staticmethod
def get_files_from_type(source_dir: str, file_type: str) -> List[str]:
"""Get all files of a specific type from a directory."""
files = []
for root, _, filenames in os.walk(source_dir):
for filename in filenames:
if target in filename:
main_py_path = os.path.join(subroot, filename)
return main_py_path

def exist_bugs(directory):
assert os.path.isdir(directory)
success_info = "The software run successfully without errors."
try:
command = "cd \"{}\"; ls -l; python3 main.py;".format(directory)
process = subprocess.Popen(command, shell=True, preexec_fn=os.setsid, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
time.sleep(3)

error_type = ""
return_code = process.returncode
if process.poll() is None:
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
if return_code == 0:
return False, success_info, error_type
else:
error_output = process.stderr.read().decode('utf-8')
try:
error_pattern = r'\w+Error:'
error_matches = re.findall(error_pattern, error_output)
error_type = error_matches[0].replace(":", "")
except:
pass
if error_output:
if filename.endswith(file_type):
files.append(os.path.join(root, filename))
return files

@staticmethod
def get_code(directory: str) -> str:
    """Render all ``.py`` files below ``directory`` as one Markdown-style listing.

    Files are located with ``CodeAnalyzer.get_files_from_type``. Each one is
    emitted as its base name followed by a fenced ``Python`` block containing
    the file content without blank lines.

    Returns:
        The stripped listing, or ``"# None"`` when the listing is empty.
    """
    def _strip_blank_lines(source: str) -> str:
        kept = [row for row in source.split("\n") if len(row.strip()) > 0]
        return "\n".join(kept)

    sections = {}
    for path in CodeAnalyzer.get_files_from_type(directory, ".py"):
        name = os.path.basename(path)
        with open(path, "r", encoding="utf-8") as handle:
            # Same base name in another sub-directory replaces the earlier entry.
            sections[name] = _strip_blank_lines(handle.read())

    rendered = "".join(
        f"{name}\n```Python\n{body}\n```\n\n" for name, body in sections.items()
    )
    stripped = rendered.strip()
    if stripped:
        return stripped
    return "# None"

@staticmethod
def get_completeness(directory: str) -> float:
    """Return 1.0 when the code has no placeholder lines, else 0.0.

    Lines from ``CodeAnalyzer.get_code`` are lower-cased. A line containing
    "password", "passenger", "passed" or "passes" is skipped; any other line
    containing "pass" or "todo" counts as a placeholder.

    Raises:
        AssertionError: if ``directory`` is not an existing directory.
    """
    assert os.path.isdir(directory), f"Directory does not exist: {directory}"
    ignored_words = ["password", "passenger", "passed", "passes"]
    for line in CodeAnalyzer.get_code(directory).split("\n"):
        text = line.lower()
        if any(word in text for word in ignored_words):
            continue
        if "pass" in text or "todo" in text:
            return 0.0
    return 1.0

@staticmethod
def get_executability(directory: str) -> float:
"""Check if the code is executable."""
assert os.path.isdir(directory), f"Directory does not exist: {directory}"

def find_file(directory: str, target: str) -> str:
    """Return the path of the first file whose name contains ``target``.

    The tree below ``directory`` is walked with ``os.walk``; the first file
    name that has ``target`` as a substring wins. Returns ``None`` when no
    file name matches.
    """
    candidates = (
        os.path.join(subroot, filename)
        for subroot, _, filenames in os.walk(directory)
        for filename in filenames
        if target in filename
    )
    return next(candidates, None)

def exist_bugs(directory: str) -> Tuple[bool, str, str]:
success_info = "The software run successfully without errors."
try:
command = f"cd \"{directory}\"; ls -l; python3 main.py;"
process = subprocess.Popen(command, shell=True, preexec_fn=os.setsid, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
time.sleep(3)

error_type = ""
return_code = process.poll()
if return_code is None:
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
return True, "Process timed out", "Timeout"
if return_code == 0:
return False, success_info, error_type
else:
error_output = process.stderr.read().decode('utf-8')
try:
error_pattern = r'\w+Error:'
error_matches = re.findall(error_pattern, error_output)
error_type = error_matches[0].replace(":", "")
except IndexError:
pass
if "Traceback".lower() in error_output.lower():
errs = error_output.replace(directory + "/", "")
errs = error_output.replace(f"{directory}/", "")
return True, errs, error_type
else:
return False, success_info, error_type
except subprocess.CalledProcessError as e:
return True, f"Error: {e}", "subprocess.CalledProcessError"
except Exception as ex:
return True, f"An error occurred: {ex}", "OtherException"

return False, success_info, error_type

main_py_path = findFile(directory, ".py")
pass_flag, error_type = True, ""
if main_py_path is not None:
main_py_path = os.path.dirname(main_py_path)
bug_flag, info, error_type = exist_bugs(main_py_path)
pass_flag = not bug_flag
else:
pass_flag, error_type = False, "NoMain"

if error_type == "":
error_type = info.replace("\n", "\\n")

if pass_flag:
return 1.0
return 0.0

def get_consistency(directory):
def remove_comments(string):
def remove_comments_by_regex(string, regex):
lines = string.split("\n")
lines = [line for line in lines if not line.strip().startswith("#")]
string = "\n".join(lines)
comments = []
matches = re.finditer(regex, string, re.DOTALL)
for match in matches:
group1 = match.group(1)
comments.append(group1)
for comment in comments + ["''''''\n"]:
string = string.replace(comment, "")
else:
return False, success_info, error_type
except subprocess.CalledProcessError as e:
return True, f"Error: {e}", "subprocess.CalledProcessError"
except Exception as ex:
return True, f"An error occurred: {ex}", "OtherException"

main_py_path = find_file(directory, ".py")
if main_py_path is not None:
main_py_path = os.path.dirname(main_py_path)
bug_flag, info, error_type = exist_bugs(main_py_path)
return 0.0 if bug_flag else 1.0
else:
return 0.0

@staticmethod
def get_consistency(directory: str) -> float:
"""Check the consistency between the task description and the code."""
assert os.path.isdir(directory), f"Directory does not exist: {directory}"

def remove_comments(string: str) -> str:
    """Strip ``#`` comment lines and the text of triple-quoted strings.

    Two passes are made, one for ``'''`` blocks and one for double-quoted
    triple blocks. Each pass first drops lines whose stripped form starts with
    ``#``, then removes every occurrence of each captured block text, and
    finally removes the literal marker of six single quotes followed by a
    newline. Inline trailing comments are left untouched.
    """
    def remove_comments_by_regex(string: str, regex: str) -> str:
        kept_lines = []
        for row in string.split("\n"):
            if row.strip().startswith("#"):
                continue
            kept_lines.append(row)
        string = "\n".join(kept_lines)
        # The captured text is removed wherever it occurs, not only between quotes.
        for captured in [*re.findall(regex, string, re.DOTALL), "''''''\n"]:
            string = string.replace(captured, "")
        return string

    for pattern in (r"'''(.*?)'''", r"\"\"\"(.*?)\"\"\""):
        string = remove_comments_by_regex(string, pattern)
    return string

string = remove_comments_by_regex(string, r"'''(.*?)'''")
string = remove_comments_by_regex(string, r"\"\"\"(.*?)\"\"\"")
return string

def get_text_embedding(text: str):
    """Return the ``text-embedding-ada-002`` embedding of ``text``.

    An empty string is replaced by the placeholder ``"None"`` before the
    request is sent through the module-level ``client``.
    """
    payload = "None" if text == "" else text
    response = client.embeddings.create(input=payload, model="text-embedding-ada-002")
    return response.model_dump()['data'][0]['embedding']

def get_code_embedding(code: str):
    """Return the ``text-embedding-ada-002`` embedding of ``code``.

    An empty string is replaced by the placeholder ``"#"`` before the request
    is sent through the module-level ``client``.
    """
    payload = "#" if code == "" else code
    response = client.embeddings.create(input=payload, model="text-embedding-ada-002")
    return response.model_dump()['data'][0]['embedding']

def get_cosine_similarity(embeddingi, embeddingj):
    """Return the cosine similarity of two embedding vectors.

    Args:
        embeddingi: First vector (any sequence of numbers).
        embeddingj: Second vector, same length as ``embeddingi``.

    Returns:
        The cosine of the angle between the vectors as a ``float``. If either
        vector has zero norm the similarity is undefined; ``0.0`` is returned
        instead of dividing by zero and propagating NaN.
    """
    embeddingi = np.asarray(embeddingi, dtype=float)
    embeddingj = np.asarray(embeddingj, dtype=float)
    norm_product = np.linalg.norm(embeddingi) * np.linalg.norm(embeddingj)
    if norm_product == 0:
        return 0.0
    return float(embeddingi.dot(embeddingj) / norm_product)

assert os.path.isdir(directory)
files = getFilesFromType(directory, ".txt")
if len(files) == 0:
print()
filepath = files[0]
task = open(filepath).read().strip()
codes = get_code(directory)
codes = remove_comments(codes)

text_embedding = get_text_embedding(task)
code_embedding = get_code_embedding(codes)
task_code_alignment = get_cosine_similarity(text_embedding, code_embedding)

return task_code_alignment

def main(warehouse_root):
def write_string(string):
writer.write(string)
print(string, end="")

directories = []
for directory in os.listdir(warehouse_root):
directories.append(os.path.join(warehouse_root, directory))
directories = sorted(directories)
directories = [directory for directory in directories if os.path.isdir(directory)]
print("len(directories):", len(directories))
def get_embedding(text: str, is_code: bool = False) -> List[float]:
    """Return the ``text-embedding-ada-002`` embedding of ``text``.

    A falsy ``text`` is replaced by a placeholder first: ``"#"`` when
    ``is_code`` is true, ``"None"`` otherwise. The request goes through the
    module-level ``client``.
    """
    if not text:
        text = "#" if is_code else "None"
    response = client.embeddings.create(input=text, model="text-embedding-ada-002")
    return response.model_dump()['data'][0]['embedding']

suffix = warehouse_root.replace("/", "__").replace("-", "_")
tsv_file = __file__.replace(".py", ".{}.tsv".format(suffix))
print("tsv_file:", tsv_file)
def get_cosine_similarity(embedding_i: List[float], embedding_j: List[float]) -> float:
    """Return the cosine similarity of two embedding vectors.

    Args:
        embedding_i: First vector.
        embedding_j: Second vector, same length as ``embedding_i``.

    Returns:
        The cosine of the angle between the vectors as a ``float``. If either
        vector has zero norm the similarity is undefined; ``0.0`` is returned
        instead of dividing by zero and propagating NaN.
    """
    vector_i = np.asarray(embedding_i, dtype=float)
    vector_j = np.asarray(embedding_j, dtype=float)
    norm_product = np.linalg.norm(vector_i) * np.linalg.norm(vector_j)
    if norm_product == 0:
        return 0.0
    return float(vector_i.dot(vector_j) / norm_product)

files = CodeAnalyzer.get_files_from_type(directory, ".txt")
if not files:
logger.warning(f"No .txt files found in {directory}")
return 0.0

counter = 0
completeness_list, executability_list, consistency_list = [], [], []
with open(tsv_file, "a", encoding="utf-8") as writer:
for i, directory in enumerate(directories):
directory_basename = os.path.basename(directory)
with open(files[0], 'r') as file:
task = file.read().strip()

completeness = get_completeness(directory)
executability = get_executability(directory)
consistency = get_consistency(directory)
codes = CodeAnalyzer.get_code(directory)
codes = remove_comments(codes)

completeness_list.append(completeness)
executability_list.append(executability)
consistency_list.append(consistency)
text_embedding = get_embedding(task)
code_embedding = get_embedding(codes, is_code=True)
return get_cosine_similarity(text_embedding, code_embedding)

counter += 1
def main(warehouse_root: str):
"""Main function to analyze code in the warehouse."""
directories = [os.path.join(warehouse_root, d) for d in os.listdir(warehouse_root) if os.path.isdir(os.path.join(warehouse_root, d))]
directories.sort()
logger.info(f"Number of directories to analyze: {len(directories)}")

main(warehouse_root = "./WareHouse")
suffix = warehouse_root.replace("/", "__").replace("-", "_")
tsv_file = f"{os.path.splitext(__file__)[0]}.{suffix}.tsv"
logger.info(f"Results will be written to: {tsv_file}")

results = []
for directory in directories:
try:
completeness = CodeAnalyzer.get_completeness(directory)
executability = CodeAnalyzer.get_executability(directory)
consistency = CodeAnalyzer.get_consistency(directory)
results.append((os.path.basename(directory), completeness, executability, consistency))
except Exception as e:
logger.error(f"Error analyzing directory {directory}: {str(e)}")

with open(tsv_file, "w", encoding="utf-8") as writer:
writer.write("Directory\tCompleteness\tExecutability\tConsistency\n")
for result in results:
writer.write(f"{result[0]}\t{result[1]}\t{result[2]}\t{result[3]}\n")

logger.info(f"Analysis complete. Results written to {tsv_file}")

class TestCodeAnalyzer(unittest.TestCase):
    """Smoke tests running each ``CodeAnalyzer`` metric on a tiny throwaway project."""

    def setUp(self):
        import tempfile

        # Use a fresh temporary directory: a fixed "./test_warehouse" path
        # would be deleted by tearDown even if it existed before the tests ran.
        self.test_dir = tempfile.mkdtemp(prefix="test_warehouse_")
        # The script is named main.py because the executability check runs
        # `python3 main.py` inside the project directory; with any other name
        # the check would never execute the fixture code.
        with open(os.path.join(self.test_dir, "main.py"), "w", encoding="utf-8") as f:
            f.write("print('Hello, World!')")
        with open(os.path.join(self.test_dir, "task.txt"), "w", encoding="utf-8") as f:
            f.write("Create a Hello World program")

    def tearDown(self):
        import shutil

        # Best-effort cleanup so a failed setUp does not mask the real error.
        shutil.rmtree(self.test_dir, ignore_errors=True)

    def test_get_completeness(self):
        # The fixture has no "pass"/"todo" placeholder lines.
        completeness = CodeAnalyzer.get_completeness(self.test_dir)
        self.assertEqual(completeness, 1.0)

    def test_get_executability(self):
        executability = CodeAnalyzer.get_executability(self.test_dir)
        self.assertEqual(executability, 1.0)

    def test_get_consistency(self):
        # NOTE: this requests embeddings through the configured OpenAI client,
        # so it needs working credentials and network access.
        consistency = CodeAnalyzer.get_consistency(self.test_dir)
        self.assertGreater(consistency, 0.5)

# Script entry point: analyse every project under ./WareHouse, then run the
# unit tests defined in this module.
# NOTE(review): unittest.main() parses the command line and exits the process
# once the tests finish, so nothing placed after it would run. The test suite
# also calls get_consistency, which requests embeddings from the configured
# OpenAI client — confirm that running it on every invocation is intended.
if __name__ == "__main__":
main(warehouse_root="./WareHouse")
unittest.main()