Compare commits

..

3 Commits

8 changed files with 348 additions and 616 deletions

View File

@ -70,7 +70,6 @@ The tool uses environment variables for sensitive information:
```
"""
import dataclasses
import logging
import re
import subprocess
@ -80,36 +79,10 @@ from pathlib import Path
from . import secrets
from ._version import __version__ # noqa: F401
from .seen_issues_db import SeenIssuesDB
logger = logging.getLogger(__name__)
@dataclasses.dataclass(frozen=True)
class RepositoryConfig:
gitea_url: str
owner: str
repo: str
base_branch: str
def repo_url(self) -> str:
return f'{self.gitea_url}:{self.owner}/{self.repo}.git'.replace(
'https://',
'git@',
)
@dataclasses.dataclass(frozen=True)
class IssueResolution:
success: bool
pull_request_url: str | None = None
pull_request_id: int | None = None
def __post_init__(self):
assert self.pull_request_id is None or isinstance(self.pull_request_id, int)
assert self.pull_request_url is None or isinstance(self.pull_request_url, str)
def generate_branch_name(issue_number: str, issue_title: str) -> str:
"""Create a branch name by sanitizing the issue title.
@ -154,13 +127,19 @@ AIDER_LINT = bash_cmd(
)
LLM_MESSAGE_FORMAT = (
"""{issue}\nDo not wait for explicit approval before working on code changes."""
)
LLM_MESSAGE_FORMAT = """
{issue}
# CODE_MODEL = 'ollama/gemma3:4b'
CODE_MODEL = 'o4-mini'
EVALUATOR_MODEL = 'ollama/gemma3:27b'
# Solution Details
For code tasks:
1. Create a plan for how to solve the issue.
2. Write unit tests that proves that your solution works.
3. Then, solve the issue by writing the required code.
"""
MODEL = None
def create_aider_command(issue: str) -> list[str]:
@ -170,39 +149,31 @@ def create_aider_command(issue: str) -> list[str]:
'english',
'--no-stream',
'--no-analytics',
#'--no-check-update',
'--test-cmd',
AIDER_TEST,
'--lint-cmd',
AIDER_LINT,
'--auto-test',
'--no-auto-lint',
'--read',
'CONVENTIONS.md',
'--message',
LLM_MESSAGE_FORMAT.format(issue=issue),
'--yes',
]
for key in secrets.llm_api_keys():
l += ['--api-key', key]
if False:
l.append('--read')
l.append('CONVENTIONS.md')
if True:
l.append('--cache-prompts')
if False:
l.append('--architect')
if CODE_MODEL:
if MODEL:
l.append('--model')
l.append(CODE_MODEL)
if CODE_MODEL.startswith('ollama/'):
l.append('--auto-lint')
if True:
l.append('--message')
l.append(LLM_MESSAGE_FORMAT.format(issue=issue))
l.append(MODEL)
return l
@ -226,42 +197,29 @@ def get_commit_messages(cwd: Path, base_branch: str, current_branch: str) -> lis
capture_output=True,
text=True,
)
return list(reversed(result.stdout.strip().split('\n')))
return reversed(result.stdout.strip().split('\n'))
except subprocess.CalledProcessError:
logger.exception(f'Failed to get commit messages on branch {current_branch}')
return []
def get_diff(cwd: Path, base_branch: str, current_branch: str) -> str:
result = subprocess.run(
['git', 'diff', f'{base_branch}..{current_branch}', '--pretty=format:%s'],
check=True,
cwd=cwd,
capture_output=True,
text=True,
)
return result.stdout.strip()
return ''
def push_changes(
repository_config: RepositoryConfig,
cwd: Path,
branch_name: str,
issue_number: str,
issue_title: str,
base_branch: str,
gitea_client,
) -> IssueResolution:
owner: str,
repo: str,
) -> bool:
# Check if there are any commits on the branch before pushing
if not has_commits_on_branch(cwd, repository_config.base_branch, branch_name):
if not has_commits_on_branch(cwd, base_branch, branch_name):
logger.info('No commits made on branch %s, skipping push', branch_name)
return IssueResolution(False)
return False
# Get commit messages for PR description
commit_messages = get_commit_messages(
cwd,
repository_config.base_branch,
branch_name,
)
commit_messages = get_commit_messages(cwd, base_branch, branch_name)
description = f'This pull request resolves #{issue_number}\n\n'
if commit_messages:
@ -274,22 +232,16 @@ def push_changes(
run_cmd(cmd, cwd)
# Then create the PR with the aider label
pr_response = gitea_client.create_pull_request(
owner=repository_config.owner,
repo=repository_config.repo,
gitea_client.create_pull_request(
owner=owner,
repo=repo,
title=issue_title,
body=description,
head=branch_name,
base=repository_config.base_branch,
base=base_branch,
labels=['aider'],
)
# Extract PR number and URL if available
return IssueResolution(
True,
pr_response.get('html_url'),
int(pr_response.get('number')),
)
return True
def has_commits_on_branch(cwd: Path, base_branch: str, current_branch: str) -> bool:
@ -304,9 +256,15 @@ def has_commits_on_branch(cwd: Path, base_branch: str, current_branch: str) -> b
True if there are commits on the current branch not in the base branch, False otherwise.
"""
try:
commit_messages = get_commit_messages(cwd, base_branch, current_branch)
return bool(list(commit_messages))
except Exception:
result = subprocess.run(
['git', 'log', f'{base_branch}..{current_branch}', '--oneline'],
check=True,
cwd=cwd,
capture_output=True,
text=True,
)
return bool(result.stdout.strip())
except subprocess.CalledProcessError:
logger.exception('Failed to check commits on branch %s', current_branch)
return False
@ -326,311 +284,134 @@ def run_cmd(cmd: list[str], cwd: Path | None = None, check=True) -> bool:
return result.returncode == 0
def issue_solution_round(repository_path, issue_content):
# Primary Aider command
aider_command = create_aider_command(issue_content)
print(aider_command)
aider_did_not_crash = run_cmd(
aider_command,
repository_path,
check=False,
)
if not aider_did_not_crash:
return aider_did_not_crash
# Auto-fix standard code quality stuff after aider
run_cmd(['bash', '-c', RUFF_FORMAT_AND_AUTO_FIX], repository_path, check=False)
run_cmd(['git', 'add', '.'], repository_path)
run_cmd(['git', 'commit', '-m', 'Ruff after aider'], repository_path, check=False)
return True
def run_ollama(cwd: Path, texts: list[str]) -> str:
cmd = ['ollama', 'run', EVALUATOR_MODEL.removeprefix('ollama/')]
print(cmd)
process = subprocess.Popen(
cmd,
cwd=cwd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
stdout, stderr = process.communicate('\n'.join(texts))
print(stdout)
return stdout
def parse_yes_no_answer(text: str) -> bool | None:
text = text.lower().strip()
words = text.split('\n \t.,?-')
print(words)
if words[-1] in {'yes', 'agree'}:
return True
if words[-1] in {'no', 'disagree'}:
return False
return None
def run_ollama_and_get_yes_or_no(cwd, initial_texts: list[str]) -> bool:
texts = list(initial_texts)
texts.append('Think through your answer.')
while True:
response = run_ollama(cwd, texts)
yes_or_no = parse_yes_no_answer(response)
if yes_or_no is not None:
return yes_or_no
else:
texts.append(response)
texts.append('Please answer either "yes" or "no".')
def verify_solution(repository_path: Path, issue_content: str) -> bool:
if not EVALUATOR_MODEL:
return True
summary = run_ollama(
repository_path,
[
'Concisely summarize following changeset',
get_diff(repository_path, 'main', 'HEAD'),
],
)
return run_ollama_and_get_yes_or_no(
repository_path,
[
'Does this changeset accomplish the entire task?',
'# Change set',
summary,
'# Issue',
issue_content,
],
)
def get_head_commit_hash(repository_path: Path) -> str:
return subprocess.run(
['git', 'rev-parse', 'HEAD'],
check=True,
cwd=repository_path,
capture_output=True,
text=True,
).stdout.strip()
SKIP_AIDER = False
def solve_issue_in_repository(
repository_config: RepositoryConfig,
repository_path: Path,
args,
tmpdirname: Path,
branch_name: str,
issue_title: str,
issue_description: str,
issue_number: str,
gitea_client,
) -> IssueResolution:
gitea_client=None,
) -> bool:
logger.info('### %s #####', issue_title)
repo_url = f'{args.gitea_url}:{args.owner}/{args.repo}.git'.replace(
'https://',
'git@',
)
# Setup repository
run_cmd(['git', 'clone', repository_config.repo_url(), repository_path])
run_cmd(['bash', '-c', AIDER_TEST], repository_path)
run_cmd(['git', 'checkout', repository_config.base_branch], repository_path)
run_cmd(['git', 'checkout', '-b', branch_name], repository_path)
run_cmd(['git', 'clone', repo_url, tmpdirname])
run_cmd(['bash', '-c', AIDER_TEST], tmpdirname)
run_cmd(['git', 'checkout', args.base_branch], tmpdirname)
run_cmd(['git', 'checkout', '-b', branch_name], tmpdirname)
# Run initial ruff pass before aider
run_cmd(['bash', '-c', RUFF_FORMAT_AND_AUTO_FIX], repository_path, check=False)
run_cmd(['git', 'add', '.'], repository_path)
run_cmd(['git', 'commit', '-m', 'Initial ruff pass'], repository_path, check=False)
run_cmd(['bash', '-c', RUFF_FORMAT_AND_AUTO_FIX], tmpdirname, check=False)
run_cmd(['git', 'add', '.'], tmpdirname)
run_cmd(['git', 'commit', '-m', 'Initial ruff pass'], tmpdirname, check=False)
# Save the commit hash after ruff but before aider
result = subprocess.run(
['git', 'rev-parse', 'HEAD'],
check=True,
cwd=tmpdirname,
capture_output=True,
text=True,
)
pre_aider_commit = result.stdout.strip()
# Run aider
issue_content = f'# {issue_title}\n{issue_description}'
while True:
# Save the commit hash after ruff but before aider
pre_aider_commit = get_head_commit_hash(repository_path)
# Run aider
aider_did_not_crash = issue_solution_round(repository_path, issue_content)
if not aider_did_not_crash:
logger.error('Aider invocation failed for issue #%s', issue_number)
return IssueResolution(False)
# Check if aider made any changes beyond the initial ruff pass
if not has_commits_on_branch(repository_path, pre_aider_commit, 'HEAD'):
logger.error(
'Aider did not make any changes beyond the initial ruff pass for issue #%s',
issue_number,
)
return IssueResolution(False)
# Push changes and create/update the pull request on every iteration
resolution = push_changes(
repository_config,
repository_path,
branch_name,
issue_number,
issue_title,
gitea_client,
if not SKIP_AIDER:
succeeded = run_cmd(
create_aider_command(issue_content),
tmpdirname,
check=False,
)
if not resolution.success:
return resolution
else:
logger.warning('Skipping aider command (for testing)')
succeeded = True
if not succeeded:
logger.error('Aider invocation failed for issue #%s', issue_number)
return False
# Verify whether this is a satisfactory solution
if verify_solution(repository_path, issue_content):
return resolution
# Auto-fix standard code quality stuff after aider
run_cmd(['bash', '-c', RUFF_FORMAT_AND_AUTO_FIX], tmpdirname, check=False)
run_cmd(['git', 'add', '.'], tmpdirname)
run_cmd(['git', 'commit', '-m', 'Ruff after aider'], tmpdirname, check=False)
# Check if aider made any changes beyond the initial ruff pass
result = subprocess.run(
['git', 'diff', pre_aider_commit, 'HEAD', '--name-only'],
check=True,
cwd=tmpdirname,
capture_output=True,
text=True,
)
files_changed = result.stdout.strip()
if not files_changed and not SKIP_AIDER:
logger.info(
'Aider did not make any changes beyond the initial ruff pass for issue #%s',
issue_number,
)
return False
# Push changes
return push_changes(
tmpdirname,
branch_name,
issue_number,
issue_title,
args.base_branch,
gitea_client,
args.owner,
args.repo,
)
def solve_issues_in_repository(
repository_config: RepositoryConfig,
client,
seen_issues_db,
):
def handle_issues(args, client, seen_issues_db):
"""Process all open issues with the 'aider' label.
Args:
repository_config: Command line arguments.
args: Command line arguments.
client: The Gitea client instance.
seen_issues_db: Database of previously processed issues.
"""
try:
issues = client.get_issues(repository_config.owner, repository_config.repo)
issues = client.get_issues(args.owner, args.repo)
except Exception:
logger.exception('Failed to retrieve issues')
sys.exit(1)
if not issues:
logger.info('No issues found for %s', repository_config.repo)
logger.info('No issues found for %s', args.repo)
return
for issue in issues:
issue_url = issue.get('web_url')
issue_number = issue.get('number')
issue_description = issue.get('body', '')
title = issue.get('title', f'Issue {issue_number}')
if seen_issues_db.has_seen(issue_url):
issue_text = f'{title}\n{issue_description}'
if seen_issues_db.has_seen(issue_text):
logger.info('Skipping already processed issue #%s: %s', issue_number, title)
else:
branch_name = generate_branch_name(issue_number, title)
with tempfile.TemporaryDirectory() as repository_path:
issue_resolution = solve_issue_in_repository(
repository_config,
Path(repository_path),
branch_name,
title,
issue_description,
issue_number,
client,
)
continue
# TODO: PR comment handling disabled for now due to missing functionality
if False:
# Handle unresolved pull request comments
handle_pr_comments(
repository_config,
issue_resolution.pull_request_id,
branch_name = generate_branch_name(issue_number, title)
with tempfile.TemporaryDirectory() as tmpdirname:
solved = solve_issue_in_repository(
args,
Path(tmpdirname),
branch_name,
Path(repository_path),
client,
seen_issues_db,
issue_url,
)
# Handle failing pipelines
handle_failing_pipelines(
repository_config,
issue_resolution.pull_request_id,
branch_name,
Path(repository_path),
client,
)
seen_issues_db.mark_as_seen(issue_url, str(issue_number))
seen_issues_db.update_pr_info(
issue_url,
issue_resolution.pull_request_id,
issue_resolution.pull_request_url,
)
logger.info(
'Stored PR #%s information for issue #%s',
issue_resolution.pull_request_id,
title,
issue_description,
issue_number,
client,
)
def handle_pr_comments(
repository_config,
pr_number: int,
branch_name,
repository_path,
client,
seen_issues_db,
issue_url,
):
"""Fetch unresolved PR comments and resolve them via aider."""
comments = client.get_pull_request_comments(
repository_config.owner,
repository_config.repo,
pr_number,
)
for comment in comments:
path = comment.get('path')
line = comment.get('line') or comment.get('position') or 0
file_path = repository_path / path
try:
lines = file_path.read_text().splitlines()
start = max(0, line - 3)
end = min(len(lines), line + 2)
context = '\n'.join(lines[start:end])
except Exception:
context = ''
body = comment.get('body', '')
issue = (
f'Resolve the following reviewer comment:\n{body}\n\n'
f'File: {path}\n\nContext:\n{context}'
)
# invoke aider on the comment context
issue_solution_round(repository_path, issue)
# commit and push changes for this comment
run_cmd(['git', 'add', path], repository_path, check=False)
run_cmd(
['git', 'commit', '-m', f'Resolve comment {comment.get("id")}'],
repository_path,
check=False,
)
run_cmd(['git', 'push', 'origin', branch_name], repository_path, check=False)
def handle_failing_pipelines(
repository_config: RepositoryConfig,
pr_number: str,
branch_name: str,
repository_path: Path,
client,
) -> None:
"""Fetch failing pipelines for the given PR and resolve them via aider."""
while True:
failed_runs = client.get_failed_pipelines(
repository_config.owner,
repository_config.repo,
pr_number,
)
if not failed_runs:
break
for run_id in failed_runs:
log = client.get_pipeline_log(
repository_config.owner,
repository_config.repo,
run_id,
)
lines = log.strip().split('\n')
context = '\n'.join(lines[-100:])
issue = f'Resolve the following failing pipeline run {run_id}:\n\n{context}'
issue_solution_round(repository_path, issue)
run_cmd(['git', 'add', '.'], repository_path, check=False)
run_cmd(
['git', 'commit', '-m', f'Resolve pipeline {run_id}'],
repository_path,
check=False,
)
run_cmd(
['git', 'push', 'origin', branch_name], repository_path, check=False,
)
if solved:
seen_issues_db.mark_as_seen(issue_text)

View File

@ -7,14 +7,23 @@ It assumes that the default branch (default "main") exists and that you have a v
import argparse
import logging
import time
from dataclasses import dataclass
from . import RepositoryConfig, secrets, solve_issues_in_repository
from . import handle_issues, secrets
from .gitea_client import GiteaClient
from .seen_issues_db import SeenIssuesDB
logger = logging.getLogger(__name__)
@dataclass
class AiderArgs:
gitea_url: str
owner: str
repo: str
base_branch: str
def parse_args():
parser = argparse.ArgumentParser(
description='Download issues and create pull requests for a Gitea repository.',
@ -45,16 +54,6 @@ def parse_args():
default=300,
help='Interval in seconds between checks in daemon mode (default: 300)',
)
parser.add_argument(
'--aider-model',
help='Model to use for generating code (overrides default)',
default=None,
)
parser.add_argument(
'--evaluator-model',
help='Model to use for evaluating code (overrides default)',
default=None,
)
return parser.parse_args()
@ -62,14 +61,6 @@ def main():
logging.basicConfig(level='INFO')
args = parse_args()
# Override default models if provided
import aider_gitea as core
if args.aider_model:
core.CODE_MODEL = args.aider_model
if args.evaluator_model:
core.EVALUATOR_MODEL = args.evaluator_model
seen_issues_db = SeenIssuesDB()
client = GiteaClient(args.gitea_url, secrets.gitea_token())
@ -81,13 +72,13 @@ def main():
while True:
logger.info('Checking for new issues...')
for repo in repositories:
repository_config = RepositoryConfig(
aider_args = AiderArgs(
gitea_url=args.gitea_url,
owner=args.owner,
repo=repo,
base_branch=args.base_branch,
)
solve_issues_in_repository(repository_config, client, seen_issues_db)
handle_issues(aider_args, client, seen_issues_db)
del repo
if not args.daemon:
break

View File

@ -166,56 +166,26 @@ class GiteaClient:
}
response = self.session.post(url, json=json_data)
# If a pull request for this head/base already exists, return it instead of crashing
if response.status_code == 409:
logger.warning(
'Pull request already exists for head %s and base %s',
head,
base,
)
prs = self.get_pull_requests(owner, repo)
for pr in prs:
if (
pr.get('head', {}).get('ref') == head
and pr.get('base', {}).get('ref') == base
):
return pr
# fallback to raise if we cant find it
response.raise_for_status()
response.raise_for_status()
return response.json()
def get_failed_pipelines(self, owner: str, repo: str, pr_number: str) -> list[int]:
"""Fetch pipeline runs for a PR and return IDs of failed runs."""
url = f'{self.gitea_url}/repos/{owner}/{repo}/actions/runs'
response = self.session.get(url)
response.raise_for_status()
runs = response.json().get('workflow_runs', [])
failed = []
for run in runs:
if any(
pr.get('number') == int(pr_number)
for pr in run.get('pull_requests', [])
):
if run.get('conclusion') not in ('success',):
failed.append(run.get('id'))
return failed
def get_pipeline_log(self, owner: str, repo: str, run_id: int) -> str:
"""Download the logs for a pipeline run."""
url = f'{self.gitea_url}/repos/{owner}/{repo}/actions/runs/{run_id}/logs'
response = self.session.get(url)
response.raise_for_status()
return response.text
def get_pull_requests(
self,
owner: str,
repo: str,
state: str = 'open',
def get_pull_request_comments(
self, owner: str, repo: str, pull_number: int,
) -> list[dict]:
"""Fetch pull requests for a repository."""
url = f'{self.gitea_url}/repos/{owner}/{repo}/pulls?state={state}'
"""Get comments for a specific pull request.
Args:
owner (str): Owner of the repository.
repo (str): Name of the repository.
pull_number (int): The pull request number.
Returns:
list[dict]: A list of comment dictionaries for the pull request.
Raises:
requests.HTTPError: If the API request fails.
"""
url = f'{self.gitea_url}/repos/{owner}/{repo}/pulls/{pull_number}/comments'
response = self.session.get(url)
response.raise_for_status()
return response.json()

View File

@ -1,22 +1,22 @@
"""Database module for tracking previously processed issues and pull requests.
"""Database module for tracking previously processed issues.
This module provides functionality to track which issues have already been processed
by the system to avoid duplicate processing. It uses a simple SQLite database to
store information about seen issues and their associated pull requests for efficient lookup.
store hashes of seen issues for efficient lookup.
"""
import sqlite3
from hashlib import sha256
DEFAULT_DB_PATH = 'output/seen_issues.db'
class SeenIssuesDB:
"""Database handler for tracking processed issues and pull requests.
"""Database handler for tracking processed issues.
This class manages a SQLite database that stores information about issues that have
already been processed and their associated pull requests. It provides methods to mark
issues as seen, check if an issue has been seen before, and retrieve pull request
information for an issue.
This class manages a SQLite database that stores hashes of issues that have
already been processed. It provides methods to mark issues as seen and check
if an issue has been seen before, helping to prevent duplicate processing.
Attributes:
conn: SQLite database connection
@ -34,97 +34,56 @@ class SeenIssuesDB:
def _create_table(self):
"""Create the seen_issues table if it doesn't exist.
Creates a table with columns for storing issue hashes and associated pull request information.
Creates a table with a single column for storing issue hashes.
"""
with self.conn:
self.conn.execute("""
CREATE TABLE IF NOT EXISTS seen_issues (
issue_url TEXT PRIMARY KEY,
issue_number TEXT,
pr_number TEXT,
pr_url TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
self.conn.execute("""
CREATE TABLE IF NOT EXISTS resolved_comments (
issue_url TEXT,
comment_id TEXT,
PRIMARY KEY(issue_url, comment_id)
issue_hash TEXT PRIMARY KEY
)
""")
def mark_as_seen(
self,
issue_url: str,
issue_number: str | None = None,
pr_number: str | None = None,
pr_url: str | None = None,
):
def mark_as_seen(self, issue_text: str):
"""Mark an issue as seen in the database.
Computes a hash of the issue text and stores it in the database along with pull request information.
Computes a hash of the issue text and stores it in the database.
If the issue has already been marked as seen, this operation has no effect.
Args:
issue_url: The text content of the issue to mark as seen.
issue_number: The issue number.
pr_number: The pull request number associated with this issue.
pr_url: The URL of the pull request associated with this issue.
issue_text: The text content of the issue to mark as seen.
"""
issue_hash = self._compute_hash(issue_text)
with self.conn:
self.conn.execute(
'INSERT OR IGNORE INTO seen_issues (issue_url, issue_number, pr_number, pr_url) VALUES (?, ?, ?, ?)',
(issue_url, issue_number, pr_number, pr_url),
'INSERT OR IGNORE INTO seen_issues (issue_hash) VALUES (?)',
(issue_hash,),
)
def has_seen(self, issue_url: str) -> bool:
def has_seen(self, issue_text: str) -> bool:
"""Check if an issue has been seen before.
Computes a hash of the issue text and checks if it exists in the database.
Args:
issue_url: The text content of the issue to check.
issue_text: The text content of the issue to check.
Returns:
True if the issue has been seen before, False otherwise.
"""
issue_hash = self._compute_hash(issue_text)
cursor = self.conn.execute(
'SELECT 1 FROM seen_issues WHERE issue_url = ?',
(issue_url,),
'SELECT 1 FROM seen_issues WHERE issue_hash = ?',
(issue_hash,),
)
return cursor.fetchone() is not None
def get_pr_info(self, issue_url: str) -> tuple[str, str] | None:
"""Get pull request information for an issue.
def _compute_hash(self, text: str) -> str:
"""Compute a SHA-256 hash of the given text.
Args:
issue_url: The text content of the issue to check.
text: The text to hash.
Returns:
A tuple containing (pr_number, pr_url) if found, None otherwise.
A hexadecimal string representation of the hash.
"""
cursor = self.conn.execute(
'SELECT pr_number, pr_url FROM seen_issues WHERE issue_url = ?',
(issue_url,),
)
result = cursor.fetchone()
return result if result else None
def update_pr_info(self, issue_url: str, pr_number: str, pr_url: str) -> bool:
"""Update pull request information for an existing issue.
Args:
issue_url: The text content of the issue to update.
pr_number: The pull request number.
pr_url: The URL of the pull request.
Returns:
True if the update was successful, False if the issue wasn't found.
"""
with self.conn:
cursor = self.conn.execute(
'UPDATE seen_issues SET pr_number = ?, pr_url = ? WHERE issue_url = ?',
(pr_number, pr_url, issue_url),
)
return cursor.rowcount > 0
return sha256(text.encode('utf-8')).hexdigest()

View File

@ -0,0 +1,71 @@
from unittest.mock import MagicMock, patch
from aider_gitea.gitea_client import GiteaClient
class TestGiteaClientPRComments:
def setup_method(self):
"""Set up test fixtures."""
self.client = GiteaClient('https://gitea.example.com', 'fake-token')
self.owner = 'test-owner'
self.repo = 'test-repo'
self.pull_number = 123
@patch('requests.Session.get')
def test_get_pull_request_comments(self, mock_get):
"""Test retrieving comments for a pull request."""
# Mock response data
mock_response = MagicMock()
mock_response.json.return_value = [
{
'id': 1,
'body': 'This is a test comment',
'user': {'login': 'test-user'},
'created_at': '2023-01-01T00:00:00Z',
},
{
'id': 2,
'body': 'Another test comment',
'user': {'login': 'another-user'},
'created_at': '2023-01-02T00:00:00Z',
},
]
mock_get.return_value = mock_response
# Call the method
comments = self.client.get_pull_request_comments(
self.owner, self.repo, self.pull_number,
)
# Verify the request was made correctly
mock_get.assert_called_once_with(
f'{self.client.gitea_url}/repos/{self.owner}/{self.repo}/pulls/{self.pull_number}/comments',
)
# Verify the response was processed correctly
assert len(comments) == 2
assert comments[0]['id'] == 1
assert comments[0]['body'] == 'This is a test comment'
assert comments[1]['id'] == 2
assert comments[1]['body'] == 'Another test comment'
@patch('requests.Session.get')
def test_get_pull_request_comments_empty(self, mock_get):
"""Test retrieving comments for a pull request with no comments."""
# Mock response data
mock_response = MagicMock()
mock_response.json.return_value = []
mock_get.return_value = mock_response
# Call the method
comments = self.client.get_pull_request_comments(
self.owner, self.repo, self.pull_number,
)
# Verify the request was made correctly
mock_get.assert_called_once_with(
f'{self.client.gitea_url}/repos/{self.owner}/{self.repo}/pulls/{self.pull_number}/comments',
)
# Verify the response was processed correctly
assert len(comments) == 0

View File

@ -1,58 +0,0 @@
from pathlib import Path
from unittest.mock import patch
from aider_gitea import has_commits_on_branch
class TestHasCommitsOnBranch:
def setup_method(self):
self.cwd = Path('/tmp/test-repo')
self.base_branch = 'main'
self.current_branch = 'feature-branch'
@patch('aider_gitea.get_commit_messages')
def test_has_commits_true(self, mock_get_commit_messages):
# Setup mock to return some commit messages
mock_get_commit_messages.return_value = ['Commit 1', 'Commit 2']
# Test function returns True when there are commits
assert (
has_commits_on_branch(self.cwd, self.base_branch, self.current_branch)
is True
)
# Verify get_commit_messages was called with correct arguments
mock_get_commit_messages.assert_called_once_with(
self.cwd,
self.base_branch,
self.current_branch,
)
@patch('aider_gitea.get_commit_messages')
def test_has_commits_false(self, mock_get_commit_messages):
# Setup mock to return empty list
mock_get_commit_messages.return_value = []
# Test function returns False when there are no commits
assert (
has_commits_on_branch(self.cwd, self.base_branch, self.current_branch)
is False
)
# Verify get_commit_messages was called with correct arguments
mock_get_commit_messages.assert_called_once_with(
self.cwd,
self.base_branch,
self.current_branch,
)
@patch('aider_gitea.get_commit_messages')
def test_has_commits_exception(self, mock_get_commit_messages):
# Setup mock to raise an exception
mock_get_commit_messages.side_effect = Exception('Git command failed')
# Test function returns False when an exception occurs
assert (
has_commits_on_branch(self.cwd, self.base_branch, self.current_branch)
is False
)

View File

@ -1,77 +0,0 @@
import os
import tempfile
from aider_gitea.seen_issues_db import SeenIssuesDB
class TestSeenIssuesDBPRInfo:
def setup_method(self):
# Create a temporary database file
self.db_fd, self.db_path = tempfile.mkstemp()
self.db = SeenIssuesDB(self.db_path)
# Test data
self.issue_text = 'Test issue title\nTest issue description'
self.issue_number = '123'
self.pr_number = '456'
self.pr_url = 'https://gitea.example.com/owner/repo/pulls/456'
def teardown_method(self):
# Close and remove the temporary database
self.db.conn.close()
os.close(self.db_fd)
os.unlink(self.db_path)
def test_mark_as_seen_with_pr_info(self):
# Mark an issue as seen with PR info
self.db.mark_as_seen(
self.issue_text,
issue_number=self.issue_number,
pr_number=self.pr_number,
pr_url=self.pr_url,
)
# Verify the issue is marked as seen
assert self.db.has_seen(self.issue_text)
# Verify PR info is stored correctly
pr_info = self.db.get_pr_info(self.issue_text)
assert pr_info is not None
assert pr_info[0] == self.pr_number
assert pr_info[1] == self.pr_url
def test_update_pr_info(self):
# First mark the issue as seen without PR info
self.db.mark_as_seen(self.issue_text, issue_number=self.issue_number)
# Verify no PR info is available
assert self.db.get_pr_info(self.issue_text) == (None, None)
# Update with PR info
updated = self.db.update_pr_info(self.issue_text, self.pr_number, self.pr_url)
# Verify update was successful
assert updated
# Verify PR info is now available
pr_info = self.db.get_pr_info(self.issue_text)
assert pr_info[0] == self.pr_number
assert pr_info[1] == self.pr_url
def test_update_nonexistent_issue(self):
# Try to update PR info for an issue that doesn't exist
updated = self.db.update_pr_info(
'Nonexistent issue',
self.pr_number,
self.pr_url,
)
# Verify update failed
assert not updated
def test_get_pr_info_nonexistent(self):
# Try to get PR info for an issue that doesn't exist
pr_info = self.db.get_pr_info('Nonexistent issue')
# Verify no PR info is available
assert pr_info is None

View File

@ -0,0 +1,95 @@
from pathlib import Path
from unittest.mock import MagicMock, patch
from aider_gitea import solve_issue_in_repository
class TestSolveIssueInRepository:
def setup_method(self):
self.args = MagicMock()
self.args.gitea_url = 'https://gitea.example.com'
self.args.owner = 'test-owner'
self.args.repo = 'test-repo'
self.args.base_branch = 'main'
self.gitea_client = MagicMock()
self.tmpdirname = Path('/tmp/test-repo')
self.branch_name = 'issue-123-test-branch'
self.issue_title = 'Test Issue'
self.issue_description = 'This is a test issue'
self.issue_number = '123'
@patch('aider_gitea.secrets.llm_api_keys', return_value='fake-api-key')
@patch('aider_gitea.run_cmd')
@patch('aider_gitea.push_changes')
@patch('subprocess.run')
def test_solve_issue_with_aider_changes(
self,
mock_subprocess_run,
mock_push_changes,
mock_run_cmd,
mock_llm_api_key,
):
# Setup mocks
mock_run_cmd.return_value = True
mock_push_changes.return_value = True
# Mock subprocess.run to return different commit hashes and file changes
mock_subprocess_run.side_effect = [
MagicMock(stdout='abc123\n', returncode=0), # First git rev-parse
MagicMock(
stdout='file1.py\nfile2.py\n',
returncode=0,
), # git diff with changes
]
# Call the function
result = solve_issue_in_repository(
self.args,
self.tmpdirname,
self.branch_name,
self.issue_title,
self.issue_description,
self.issue_number,
self.gitea_client,
)
# Verify results
assert result is True
assert mock_run_cmd.call_count >= 8 # Verify all expected commands were run
mock_push_changes.assert_called_once()
@patch('aider_gitea.secrets.llm_api_keys', return_value='fake-api-key')
@patch('aider_gitea.run_cmd')
@patch('aider_gitea.push_changes')
@patch('subprocess.run')
def test_solve_issue_without_aider_changes(
self,
mock_subprocess_run,
mock_push_changes,
mock_run_cmd,
mock_llm_api_key,
):
# Setup mocks
mock_run_cmd.return_value = True
# Mock subprocess.run to return same commit hash and no file changes
mock_subprocess_run.side_effect = [
MagicMock(stdout='abc123\n', returncode=0), # First git rev-parse
MagicMock(stdout='', returncode=0), # git diff with no changes
]
# Call the function
result = solve_issue_in_repository(
self.args,
self.tmpdirname,
self.branch_name,
self.issue_title,
self.issue_description,
self.issue_number,
self.gitea_client,
)
# Verify results
assert result is False
assert mock_push_changes.call_count == 0 # push_changes should not be called