- Convert magic numbers and hardcoded values to module-level constants
- Add comprehensive input validation with detailed error messages
- Improve error handling for file operations and edge cases
- Add proper type checking and validation for model detection
- Create comprehensive test suite for new validation features
- Fix existing tests to match actual implementation

This addresses issue #112 by ensuring substantial code improvements accompany formatting changes, making the codebase more maintainable and robust.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
973 lines
30 KiB
Python
"""Aider Gitea.
|
|
|
|
A code automation tool that integrates Gitea with AI assistants to automatically solve issues.
|
|
|
|
This program monitors your [Gitea](https://about.gitea.com/) repository for issues with the 'aider' label.
|
|
When such an issue is found, it:
|
|
|
|
1. Creates a new branch.
|
|
2. Invokes an AI assistant (Aider or Claude Code) to solve the issue using a large language model.
|
|
3. Runs tests and code quality checks.
|
|
4. Creates a pull request with the solution.
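
As a rough illustration of step 1, the branch name is derived from the issue number and title. The sketch below mirrors the sanitizing rule of `generate_branch_name` in this module; `sketch_branch_name` is a hypothetical stand-in used only for this example.

```python
import re


def sketch_branch_name(issue_number: str, issue_title: str) -> str:
    # Hypothetical stand-in mirroring generate_branch_name in this module.
    # Drop everything except ASCII letters, digits and spaces.
    cleaned = re.sub('[^0-9a-zA-Z ]+', '', issue_title.strip())
    # Lowercase, split on whitespace, and join the pieces with dashes.
    return '-'.join(['issue', issue_number, *cleaned.lower().split()])


print(sketch_branch_name('123', 'Fix bug'))  # issue-123-fix-bug
```

Punctuation is removed rather than replaced, so a title such as `Crash: on large files!` would contribute `crash-on-large-files`.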
|
|
|
|
The tool automatically selects the appropriate AI assistant based on the specified model:
|
|
- **Aider**: Used for non-Anthropic models (e.g., GPT, Ollama, Gemini)
|
|
- **Claude Code**: Used for Anthropic models (e.g., Claude, Sonnet, Haiku, Opus)
|
|
|
|
Inspired by [the AI workflows](https://github.com/oscoreio/ai-workflows/)
|
|
project.
|
|
|
|
## Usage
|
|
|
|
An application token must be supplied for the `gitea_token` secret. This must
|
|
have the following permissions:
|
|
|
|
- `read:issue`: To be able to read issues on the specified repository.
|
|
- `write:repository`: To be able to create pull requests.
|
|
- `read:user`: To be able to iterate over all of the user's repositories.
|
|
|
|
### Command Line
|
|
|
|
```bash
|
|
# Run with default repository settings and a non-Anthropic model (uses Aider)
|
|
python -m aider_gitea --aider-model gpt-4
|
|
|
|
# Use Claude Code with Anthropic models
|
|
python -m aider_gitea --aider-model claude-3-sonnet
|
|
python -m aider_gitea --aider-model claude-3-haiku
|
|
python -m aider_gitea --aider-model anthropic/claude-3-opus
|
|
|
|
# Use Aider with various models
|
|
python -m aider_gitea --aider-model gpt-4
|
|
python -m aider_gitea --aider-model ollama/llama3
|
|
python -m aider_gitea --aider-model gemini-pro
|
|
|
|
# Specify custom repository and owner
|
|
python -m aider_gitea --owner myorg --repo myproject --aider-model claude-3-sonnet
|
|
|
|
# Use a custom Gitea URL
|
|
python -m aider_gitea --gitea-url https://gitea.example.com --aider-model gpt-4
|
|
|
|
# Specify a different base branch
|
|
python -m aider_gitea --base-branch develop --aider-model claude-3-haiku
|
|
```
|
|
|
|
### AI Assistant Selection
|
|
|
|
The tool automatically routes to the appropriate AI assistant based on the model name:
|
|
|
|
**Claude Code Integration (Anthropic Models):**
|
|
- Model names containing: `claude`, `anthropic`, `sonnet`, `haiku`, `opus`
|
|
- Examples: `claude-3-sonnet`, `claude-3-haiku`, `anthropic/claude-3-opus`
|
|
- Requires: `ANTHROPIC_API_KEY` environment variable
|
|
|
|
**Aider Integration (All Other Models):**
|
|
- Any model not matching Anthropic patterns
|
|
- Examples: `gpt-4`, `ollama/llama3`, `gemini-pro`, `mistral-7b`
|
|
- Requires: `LLM_API_KEY` environment variable
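
The routing rule above amounts to a case-insensitive substring check on the model name. A minimal sketch, where `ANTHROPIC_HINTS` and `pick_assistant` are hypothetical names used only for illustration:

```python
ANTHROPIC_HINTS = ('claude', 'anthropic', 'sonnet', 'haiku', 'opus')


def pick_assistant(model):
    # Hypothetical helper: report which assistant the rule above would select.
    # A missing model name is treated as non-Anthropic.
    name = (model or '').lower()
    if any(hint in name for hint in ANTHROPIC_HINTS):
        return 'Claude Code'
    return 'Aider'


for model in ('claude-3-sonnet', 'anthropic/claude-3-opus', 'gpt-4', 'ollama/llama3'):
    print(model, '->', pick_assistant(model))
```

Because this is a substring match, any model whose name happens to contain one of these words is routed to Claude Code, whoever the provider is.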
|
|
|
|
### Python API
|
|
|
|
```python
|
|
from aider_gitea import RepositoryConfig, create_code_solver, solve_issue_in_repository
from pathlib import Path
|
|
|
|
# Solve an issue programmatically with automatic AI assistant selection
|
|
repository_config = RepositoryConfig(
|
|
gitea_url="https://gitea.example.com",
|
|
owner="myorg",
|
|
repo="myproject",
|
|
base_branch="main"
|
|
)
|
|
|
|
# Set the model to control which AI assistant is used
|
|
import aider_gitea
|
|
aider_gitea.CODE_MODEL = "claude-3-sonnet" # Will use Claude Code
|
|
# aider_gitea.CODE_MODEL = "gpt-4" # Will use Aider
|
|
|
|
code_solver = create_code_solver() # Automatically selects based on model
|
|
|
|
solve_issue_in_repository(
|
|
repository_config,
|
|
Path("/path/to/repo"),
|
|
"issue-123-fix-bug",
|
|
"Fix critical bug",
|
|
"The application crashes when processing large files",
|
|
"123",
|
|
gitea_client,  # an existing Gitea client instance (construction not shown)
|
|
code_solver
|
|
)
|
|
```
|
|
|
|
### Environment Configuration
|
|
|
|
The tool uses environment variables for sensitive information:
|
|
|
|
**Required for all setups:**
|
|
- `GITEA_TOKEN`: Your Gitea API token
|
|
|
|
**For Aider (non-Anthropic models):**
|
|
- `LLM_API_KEY`: API key for the language model (OpenAI, Ollama, etc.)
|
|
|
|
**For Claude Code (Anthropic models):**
|
|
- `ANTHROPIC_API_KEY`: Your Anthropic API key for Claude models
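
A small pre-flight check can catch a missing variable before a run starts. The sketch below assumes the variable names listed above and the model-name routing described earlier; `required_env_vars` and `missing_env_vars` are hypothetical helpers, not part of this package.

```python
import os


def required_env_vars(model):
    # GITEA_TOKEN is always needed; the second key depends on the assistant.
    anthropic = any(
        hint in model.lower()
        for hint in ('claude', 'anthropic', 'sonnet', 'haiku', 'opus')
    )
    return ['GITEA_TOKEN', 'ANTHROPIC_API_KEY' if anthropic else 'LLM_API_KEY']


def missing_env_vars(model, environ=None):
    # Return the required variables that are unset or empty.
    environ = os.environ if environ is None else environ
    return [name for name in required_env_vars(model) if not environ.get(name)]


print(missing_env_vars('gpt-4', {'GITEA_TOKEN': 'token'}))  # ['LLM_API_KEY']
```

Passing a plain dictionary instead of `os.environ` keeps the check easy to exercise in tests.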
|
|
|
|
### Model Examples
|
|
|
|
**Anthropic Models (→ Claude Code):**
|
|
```bash
|
|
--aider-model claude-3-sonnet
|
|
--aider-model claude-3-haiku
|
|
--aider-model claude-3-opus
|
|
--aider-model anthropic/claude-3-sonnet
|
|
```
|
|
|
|
**Non-Anthropic Models (→ Aider):**
|
|
```bash
|
|
--aider-model gpt-4
|
|
--aider-model gpt-3.5-turbo
|
|
--aider-model ollama/llama3
|
|
--aider-model ollama/codellama
|
|
--aider-model gemini-pro
|
|
--aider-model mistral-7b
|
|
```
|
|
"""
|
|
|
|
import dataclasses
|
|
import logging
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
from . import secrets
|
|
from ._version import __version__ # noqa: F401
|
|
from .seen_issues_db import SeenIssuesDB as SeenIssuesDB
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Constants for common configuration values
|
|
DEFAULT_TIMEOUT_SECONDS = 10_000
|
|
DEFAULT_DAEMON_INTERVAL_SECONDS = 300
|
|
DEFAULT_BASE_BRANCH = 'main'
|
|
DEFAULT_PR_CONTEXT_LINES = 3
|
|
DEFAULT_LOG_TAIL_LINES = 100
|
|
DEFAULT_MINIMUM_PYTHON_VERSION = '3.9'
|
|
|
|
# Git and repository constants
|
|
INITIAL_RUFF_COMMIT_MESSAGE = 'Initial ruff pass'
|
|
RUFF_CLEANUP_COMMIT_MESSAGE = 'Ruff after'
|
|
PIPELINE_RESOLUTION_COMMIT_PREFIX = 'Resolve pipeline'
|
|
COMMENT_RESOLUTION_COMMIT_PREFIX = 'Resolve comment'
|
|
|
|
# API and model configuration constants
|
|
DEFAULT_ANTHROPIC_INDICATORS = [
|
|
'claude',
|
|
'anthropic',
|
|
'sonnet',
|
|
'haiku',
|
|
'opus',
|
|
]
|
|
|
|
# Error messages as constants
|
|
ERROR_GITEA_URL_EMPTY = 'gitea_url cannot be empty'
|
|
ERROR_OWNER_EMPTY = 'owner cannot be empty'
|
|
ERROR_REPO_EMPTY = 'repo cannot be empty'
|
|
ERROR_BASE_BRANCH_EMPTY = 'base_branch cannot be empty'
|
|
ERROR_GITEA_URL_API_SUFFIX = "gitea_url should not include '/api/v1' suffix"
|
|
ERROR_ISSUE_PARAMS_TYPE = 'Both issue_number and issue_title must be strings'
|
|
ERROR_ISSUE_NUMBER_EMPTY = 'Issue number cannot be empty'
|
|
ERROR_MODEL_TYPE = 'Model must be a string, got {}'
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
|
|
class RepositoryConfig:
|
|
"""Configuration for a repository to process issues from.
|
|
|
|
Attributes:
|
|
gitea_url: Base URL for the Gitea instance (without '/api/v1').
|
|
owner: Owner/organization name of the repository.
|
|
repo: Repository name.
|
|
base_branch: Base branch name to create feature branches from.
|
|
"""
|
|
|
|
gitea_url: str
|
|
owner: str
|
|
repo: str
|
|
base_branch: str
|
|
|
|
def __post_init__(self):
|
|
"""Validate repository configuration fields."""
|
|
if not self.gitea_url or not self.gitea_url.strip():
|
|
raise ValueError(ERROR_GITEA_URL_EMPTY)
|
|
if not self.owner or not self.owner.strip():
|
|
raise ValueError(ERROR_OWNER_EMPTY)
|
|
if not self.repo or not self.repo.strip():
|
|
raise ValueError(ERROR_REPO_EMPTY)
|
|
if not self.base_branch or not self.base_branch.strip():
|
|
raise ValueError(ERROR_BASE_BRANCH_EMPTY)
|
|
|
|
# Ensure gitea_url doesn't end with /api/v1 (common mistake)
|
|
if self.gitea_url.rstrip('/').endswith('/api/v1'):
|
|
raise ValueError(ERROR_GITEA_URL_API_SUFFIX)
|
|
|
|
def repo_url(self) -> str:
|
|
"""Generate the git clone URL for this repository."""
|
|
return f'{self.gitea_url}:{self.owner}/{self.repo}.git'.replace(
|
|
'https://',
|
|
'git@',
|
|
)
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
class IssueResolution:
    """Outcome of a resolution attempt, with pull request details when one was created."""

    success: bool
    pull_request_url: str | None = None
    pull_request_id: int | None = None

    def __post_init__(self):
        assert self.pull_request_id is None or isinstance(self.pull_request_id, int)
        assert self.pull_request_url is None or isinstance(self.pull_request_url, str)
|
|
|
|
|
|
def generate_branch_name(issue_number: str, issue_title: str) -> str:
|
|
"""Create a branch name by sanitizing the issue title.
|
|
|
|
Non-alphanumeric characters (except spaces) are removed,
|
|
the text is lowercased, and spaces are replaced with dashes.
|
|
|
|
Args:
|
|
issue_number: The issue number to include in the branch name.
|
|
issue_title: The issue title to sanitize and include in the branch name.
|
|
|
|
Returns:
|
|
A sanitized branch name combining the issue number and title.
|
|
|
|
Raises:
|
|
ValueError: If issue_number is empty or invalid.
|
|
TypeError: If arguments are not strings.
|
|
"""
|
|
if not isinstance(issue_number, str) or not isinstance(issue_title, str):
|
|
raise TypeError(ERROR_ISSUE_PARAMS_TYPE)
|
|
|
|
if not issue_number.strip():
|
|
raise ValueError(ERROR_ISSUE_NUMBER_EMPTY)
|
|
|
|
# Sanitize the title, handling empty titles gracefully
|
|
sanitized_title = issue_title.strip() if issue_title else 'untitled'
|
|
sanitized = re.sub(r'[^0-9a-zA-Z ]+', '', sanitized_title)
|
|
parts = ['issue', str(issue_number), *sanitized.lower().split()]
|
|
return '-'.join(parts)
|
|
|
|
|
|
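def bash_cmd(*commands: str) -> str:
    import shlex  # local import keeps this helper self-contained

    # Quote the script so double quotes inside individual commands survive.
    script = ';'.join(('set -e', *commands))
    return 'bash -c ' + shlex.quote(script)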
|
|
|
|
|
|
AIDER_TEST = bash_cmd(
|
|
'echo "Setting up virtual environment"',
|
|
'virtualenv venv',
|
|
'echo "Activating virtual environment"',
|
|
'source venv/bin/activate',
|
|
'echo "Installing package"',
|
|
'pip install -e .',
|
|
'echo "Testing package"',
|
|
'pytest test',
|
|
)
|
|
|
|
RUFF_FORMAT_AND_AUTO_FIX = bash_cmd(
|
|
'ruff format --silent',
|
|
'ruff check --fix --ignore RUF022 --ignore PGH004 --silent',
|
|
'ruff format --silent',
|
|
'ruff check --fix --ignore RUF022 --ignore PGH004 --silent',
|
|
)
|
|
|
|
AIDER_LINT = bash_cmd(
|
|
RUFF_FORMAT_AND_AUTO_FIX,
|
|
'ruff format',
|
|
'ruff check --ignore RUF022 --ignore PGH004',
|
|
)
|
|
|
|
|
|
LLM_MESSAGE_FORMAT = """{issue}
|
|
|
|
Go ahead with the changes you deem appropriate without waiting for explicit approval.
|
|
|
|
Do not draft changes beforehand; produce changes only once prompted for a specific file.
|
|
"""
|
|
|
|
CLAUDE_CODE_MESSAGE_FORMAT = """{issue}
|
|
|
|
Please fix this issue by making the necessary code changes. Follow these guidelines:
|
|
1. Run tests after making changes to ensure they pass
|
|
2. Follow existing code style and conventions
|
|
3. Make minimal, focused changes to address the issue
|
|
4. Commit your changes with a descriptive message
|
|
|
|
The test command for this project is: {test_command}
|
|
The lint command for this project is: {lint_command}
|
|
"""
|
|
|
|
CODE_MODEL = None
|
|
EVALUATOR_MODEL = None
|
|
|
|
MODEL_EDIT_MODES = {
|
|
'ollama/qwen3:32b': 'diff',
|
|
'ollama/hf.co/unsloth/Qwen3-30B-A3B-GGUF:Q4_K_M': 'diff',
|
|
}
|
|
|
|
|
|
def run_post_solver_cleanup(repository_path: Path, solver_name: str) -> None:
|
|
"""Run standard code quality fixes and commit changes after a code solver.
|
|
|
|
Args:
|
|
repository_path: Path to the repository
|
|
solver_name: Name of the solver (for commit message)
|
|
"""
|
|
# Auto-fix standard code quality stuff
|
|
run_cmd(['bash', '-c', RUFF_FORMAT_AND_AUTO_FIX], repository_path, check=False)
|
|
run_cmd(['git', 'add', '.'], repository_path)
|
|
run_cmd(
|
|
['git', 'commit', '-m', f'{RUFF_CLEANUP_COMMIT_MESSAGE} {solver_name}'],
|
|
repository_path,
|
|
check=False,
|
|
)
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
|
|
class CodeSolverStrategy:
|
|
"""Base interface for code solving strategies."""
|
|
|
|
def solve_issue_round(self, repository_path: Path, issue_content: str) -> bool:
|
|
"""Attempt to solve an issue in a single round.
|
|
|
|
Args:
|
|
repository_path: Path to the repository
|
|
issue_content: The issue description to solve
|
|
|
|
Returns:
|
|
True if the solution round completed without crashing, False otherwise
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
|
|
class AiderCodeSolver(CodeSolverStrategy):
|
|
"""Code solver that uses Aider for issue resolution."""
|
|
|
|
def _create_aider_command(self, issue: str) -> list[str]:
|
|
"""Create the Aider command with all necessary flags."""
|
|
l = [
|
|
'aider',
|
|
'--chat-language',
|
|
'english',
|
|
'--no-stream',
|
|
'--no-analytics',
|
|
'--test-cmd',
|
|
AIDER_TEST,
|
|
'--lint-cmd',
|
|
AIDER_LINT,
|
|
'--auto-test',
|
|
'--no-auto-lint',
|
|
'--yes',
|
|
'--disable-playwright',
|
|
'--timeout',
|
|
str(DEFAULT_TIMEOUT_SECONDS),
|
|
]
|
|
|
|
if edit_format := MODEL_EDIT_MODES.get(CODE_MODEL):
|
|
l.append('--edit-format')
|
|
l.append(edit_format)
|
|
del edit_format
|
|
|
|
for key in secrets.llm_api_keys():
|
|
l += ['--api-key', key]
|
|
|
|
if False:
|
|
l.append('--read')
|
|
l.append('CONVENTIONS.md')
|
|
|
|
if True:
|
|
l.append('--cache-prompts')
|
|
|
|
if False:
|
|
l.append('--architect')
|
|
|
|
if CODE_MODEL:
|
|
l.append('--model')
|
|
l.append(CODE_MODEL)
|
|
|
|
if CODE_MODEL and CODE_MODEL.startswith('ollama/') and False:
|
|
l.append('--auto-lint')
|
|
|
|
if True:
|
|
l.append('--message')
|
|
l.append(LLM_MESSAGE_FORMAT.format(issue=issue))
|
|
|
|
return l
|
|
|
|
def solve_issue_round(self, repository_path: Path, issue_content: str) -> bool:
|
|
"""Solve an issue using Aider."""
|
|
# Primary Aider command
|
|
aider_command = self._create_aider_command(issue_content)
|
|
aider_did_not_crash = run_cmd(
|
|
aider_command,
|
|
cwd=repository_path,
|
|
check=False,
|
|
)
|
|
if not aider_did_not_crash:
|
|
return aider_did_not_crash
|
|
|
|
# Run post-solver cleanup
|
|
run_post_solver_cleanup(repository_path, 'aider')
|
|
|
|
return True
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
|
|
class ClaudeCodeSolver(CodeSolverStrategy):
|
|
"""Code solver that uses Claude Code for issue resolution."""
|
|
|
|
def _create_claude_command(self, issue: str) -> list[str]:
|
|
"""Create the Claude Code command for programmatic use."""
|
|
cmd = [
|
|
'claude',
|
|
'-p',
|
|
'--output-format',
|
|
'stream-json',
|
|
#'--max-turns', '100',
|
|
'--debug',
|
|
'--verbose',
|
|
'--dangerously-skip-permissions',
|
|
]
|
|
|
|
if CODE_MODEL:
|
|
cmd.extend(['--model', CODE_MODEL])
|
|
|
|
cmd.append(issue)
|
|
return cmd
|
|
|
|
def solve_issue_round(self, repository_path: Path, issue_content: str) -> bool:
|
|
"""Solve an issue using Claude Code."""
|
|
# Prepare the issue prompt for Claude Code
|
|
enhanced_issue = CLAUDE_CODE_MESSAGE_FORMAT.format(
|
|
issue=issue_content,
|
|
test_command=AIDER_TEST,
|
|
lint_command=AIDER_LINT,
|
|
)
|
|
|
|
# Create Claude Code command
|
|
claude_command = self._create_claude_command(enhanced_issue)
|
|
|
|
# Run Claude Code
|
|
run_cmd(
|
|
claude_command,
|
|
cwd=repository_path,
|
|
check=False,
|
|
)
|
|
|
|
# Run post-solver cleanup
|
|
run_post_solver_cleanup(repository_path, 'Claude Code')
|
|
|
|
return True
|
|
|
|
|
|
def is_anthropic_model(model: str | None) -> bool:
|
|
"""Check if the model string indicates an Anthropic/Claude model.
|
|
|
|
Args:
|
|
model: The model name/identifier to check.
|
|
|
|
Returns:
|
|
True if the model appears to be an Anthropic model, False otherwise.
|
|
|
|
Raises:
|
|
TypeError: If model is neither a string nor None.
|
|
"""
|
|
if model is None:
|
|
return False
|
|
|
|
if not isinstance(model, str):
|
|
raise TypeError(ERROR_MODEL_TYPE.format(type(model)))
|
|
|
|
if not model.strip():
|
|
return False
|
|
|
|
anthropic_indicators = DEFAULT_ANTHROPIC_INDICATORS
|
|
|
|
model_lower = model.lower()
|
|
return any(indicator in model_lower for indicator in anthropic_indicators)
|
|
|
|
|
|
def create_code_solver() -> CodeSolverStrategy:
|
|
"""Create the appropriate code solver based on the configured model."""
|
|
if is_anthropic_model(CODE_MODEL):
|
|
return ClaudeCodeSolver()
|
|
else:
|
|
return AiderCodeSolver()
|
|
|
|
|
|
def get_commit_messages(cwd: Path, base_branch: str, current_branch: str) -> list[str]:
|
|
"""Get commit messages between base branch and current branch.
|
|
|
|
Args:
|
|
cwd: The current working directory (repository path).
|
|
base_branch: The name of the base branch to compare against.
|
|
current_branch: The name of the current branch to check for commits.
|
|
|
|
Returns:
|
|
A list of commit subject lines, oldest first.
|
|
"""
|
|
try:
|
|
result = subprocess.run(
|
|
['git', 'log', f'{base_branch}..{current_branch}', '--pretty=format:%s'],
|
|
check=True,
|
|
cwd=cwd,
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
return list(reversed(result.stdout.strip().splitlines()))
|
|
except subprocess.CalledProcessError:
|
|
logger.exception(f'Failed to get commit messages on branch {current_branch}')
|
|
return []
|
|
|
|
|
|
def get_diff(cwd: Path, base_branch: str, current_branch: str) -> str:
    """Return the diff between the base branch and the current branch."""
    result = subprocess.run(
        ['git', 'diff', f'{base_branch}..{current_branch}'],
        check=True,
        cwd=cwd,
        capture_output=True,
        text=True,
    )
    return result.stdout.strip()
|
|
|
|
|
|
def push_changes(
|
|
repository_config: RepositoryConfig,
|
|
cwd: Path,
|
|
branch_name: str,
|
|
issue_number: str,
|
|
issue_title: str,
|
|
gitea_client,
|
|
) -> IssueResolution:
|
|
# Check if there are any commits on the branch before pushing
|
|
if not has_commits_on_branch(cwd, repository_config.base_branch, branch_name):
|
|
logger.info('No commits made on branch %s, skipping push', branch_name)
|
|
return IssueResolution(False)
|
|
|
|
# Get commit messages for PR description
|
|
commit_messages = get_commit_messages(
|
|
cwd,
|
|
repository_config.base_branch,
|
|
branch_name,
|
|
)
|
|
description = f'This pull request resolves #{issue_number}\n\n'
|
|
|
|
if commit_messages:
|
|
description += '## Commit Messages\n\n'
|
|
for message in commit_messages:
|
|
description += f'- {message}\n'
|
|
|
|
# First push the branch without creating a PR
|
|
cmd = ['git', 'push', 'origin', branch_name, '--force']
|
|
run_cmd(cmd, cwd)
|
|
|
|
# Then create the PR with the aider label
|
|
pr_response = gitea_client.create_pull_request(
|
|
owner=repository_config.owner,
|
|
repo=repository_config.repo,
|
|
title=issue_title,
|
|
body=description,
|
|
head=branch_name,
|
|
base=repository_config.base_branch,
|
|
labels=['aider'],
|
|
)
|
|
|
|
# Extract PR number and URL if available
|
|
return IssueResolution(
|
|
True,
|
|
pr_response.get('html_url'),
|
|
int(pr_response.get('number')),
|
|
)
|
|
|
|
|
|
def has_commits_on_branch(cwd: Path, base_branch: str, current_branch: str) -> bool:
|
|
"""Check if there are any commits on the current branch that aren't in the base branch.
|
|
|
|
Args:
|
|
cwd: The current working directory (repository path).
|
|
base_branch: The name of the base branch to compare against.
|
|
current_branch: The name of the current branch to check for commits.
|
|
|
|
Returns:
|
|
True if there are commits on the current branch not in the base branch, False otherwise.
|
|
"""
|
|
try:
|
|
commit_messages = get_commit_messages(cwd, base_branch, current_branch)
|
|
return bool(list(commit_messages))
|
|
except Exception:
|
|
logger.exception('Failed to check commits on branch %s', current_branch)
|
|
return False
|
|
|
|
|
|
def run_cmd(cmd: list[str], cwd: Path | None = None, check=True) -> bool:
|
|
"""Run a shell command and return its success status.
|
|
|
|
Args:
|
|
cmd: The command to run as a list of strings.
|
|
cwd: The directory to run the command in.
|
|
check: Whether to raise an exception if the command fails.
|
|
|
|
Returns:
|
|
True if the command succeeded, False otherwise.
|
|
"""
|
|
result = subprocess.run(cmd, check=check, cwd=cwd)
|
|
return result.returncode == 0
|
|
|
|
|
|
def remove_thinking_tokens(text: str) -> str:
|
|
text = re.sub(r'^\s*<think>.*?</think>', '', text, flags=re.MULTILINE | re.DOTALL)
|
|
text = text.strip()
|
|
return text
|
|
|
|
|
|
assert remove_thinking_tokens('<think>Hello</think>\nWorld\n') == 'World'
|
|
assert remove_thinking_tokens('<think>\nHello\n</think>\nWorld\n') == 'World'
|
|
assert remove_thinking_tokens('\n<think>\nHello\n</think>\nWorld\n') == 'World'
|
|
|
|
|
|
def run_ollama(cwd: Path, texts: list[str]) -> str:
|
|
cmd = ['ollama', 'run', EVALUATOR_MODEL.removeprefix('ollama/')]
|
|
process = subprocess.Popen(
|
|
cmd,
|
|
cwd=cwd,
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
)
|
|
stdout, stderr = process.communicate('\n'.join(texts))
|
|
stdout = remove_thinking_tokens(stdout)
|
|
return stdout
|
|
|
|
|
|
def parse_yes_no_answer(text: str) -> bool | None:
    """Interpret the final word of a response as yes (True), no (False), or neither (None)."""
    interword = '\n \t.,?-'
    text = text.lower().strip(interword)
    # Split on any run of separator characters.
    words = re.split(r'[\n \t.,?-]+', text)
    if words[-1] in {'yes', 'agree'}:
        return True
    if words[-1] in {'no', 'disagree'}:
        return False
    return None
|
|
|
|
|
|
assert parse_yes_no_answer('Yes.') is True
assert parse_yes_no_answer('no') is False
|
|
|
|
|
|
def run_ollama_and_get_yes_or_no(cwd, initial_texts: list[str]) -> bool:
|
|
texts = list(initial_texts)
|
|
texts.append('Think through your answer.')
|
|
while True:
|
|
response = run_ollama(cwd, texts)
|
|
yes_or_no = parse_yes_no_answer(response)
|
|
if yes_or_no is not None:
|
|
return yes_or_no
|
|
else:
|
|
texts.append(response)
|
|
texts.append('Please answer either "yes" or "no".')
|
|
|
|
|
|
def verify_solution(repository_path: Path, issue_content: str) -> bool:
|
|
if not EVALUATOR_MODEL:
|
|
return True
|
|
|
|
summary = run_ollama(
|
|
repository_path,
|
|
[
|
|
'Concisely summarize following changeset',
|
|
get_diff(repository_path, DEFAULT_BASE_BRANCH, 'HEAD'),
|
|
],
|
|
)
|
|
|
|
return run_ollama_and_get_yes_or_no(
|
|
repository_path,
|
|
[
|
|
'Does this changeset accomplish the entire task?',
|
|
'# Change set',
|
|
summary,
|
|
'# Issue',
|
|
issue_content,
|
|
],
|
|
)
|
|
|
|
|
|
def get_head_commit_hash(repository_path: Path) -> str:
|
|
return subprocess.run(
|
|
['git', 'rev-parse', 'HEAD'],
|
|
check=True,
|
|
cwd=repository_path,
|
|
capture_output=True,
|
|
text=True,
|
|
).stdout.strip()
|
|
|
|
|
|
def solve_issue_in_repository(
|
|
repository_config: RepositoryConfig,
|
|
repository_path: Path,
|
|
branch_name: str,
|
|
issue_title: str,
|
|
issue_description: str,
|
|
issue_number: str,
|
|
gitea_client,
|
|
code_solver: CodeSolverStrategy,
|
|
) -> IssueResolution:
|
|
logger.info('### %s #####', issue_title)
|
|
|
|
# Setup repository
|
|
run_cmd(['git', 'clone', repository_config.repo_url(), repository_path])
|
|
run_cmd(['bash', '-c', AIDER_TEST], repository_path)
|
|
run_cmd(['git', 'checkout', repository_config.base_branch], repository_path)
|
|
run_cmd(['git', 'checkout', '-b', branch_name], repository_path)
|
|
|
|
# Run initial ruff pass before code solver
|
|
run_cmd(['bash', '-c', RUFF_FORMAT_AND_AUTO_FIX], repository_path, check=False)
|
|
run_cmd(['git', 'add', '.'], repository_path)
|
|
run_cmd(
|
|
['git', 'commit', '-m', INITIAL_RUFF_COMMIT_MESSAGE],
|
|
repository_path,
|
|
check=False,
|
|
)
|
|
|
|
# Run code solver
|
|
issue_content = f'# {issue_title}\n{issue_description}'
|
|
|
|
while True:
|
|
# Save the commit hash after ruff but before code solver
|
|
pre_aider_commit = get_head_commit_hash(repository_path)
|
|
|
|
# Run code solver
|
|
solver_did_not_crash = code_solver.solve_issue_round(
|
|
repository_path,
|
|
issue_content,
|
|
)
|
|
if not solver_did_not_crash:
|
|
logger.error('Code solver invocation failed for issue #%s', issue_number)
|
|
return IssueResolution(False)
|
|
|
|
# Check if solver made any changes beyond the initial ruff pass
|
|
if not has_commits_on_branch(repository_path, pre_aider_commit, 'HEAD'):
|
|
logger.error(
|
|
'Code solver did not make any changes beyond the initial ruff pass for issue #%s',
|
|
issue_number,
|
|
)
|
|
return IssueResolution(False)
|
|
|
|
# Push changes and create/update the pull request on every iteration
|
|
resolution = push_changes(
|
|
repository_config,
|
|
repository_path,
|
|
branch_name,
|
|
issue_number,
|
|
issue_title,
|
|
gitea_client,
|
|
)
|
|
if not resolution.success:
|
|
return resolution
|
|
|
|
# Verify whether this is a satisfactory solution
|
|
if verify_solution(repository_path, issue_content):
|
|
return resolution
|
|
|
|
|
|
def solve_issues_in_repository(
|
|
repository_config: RepositoryConfig,
|
|
client,
|
|
seen_issues_db,
|
|
):
|
|
"""Process all open issues with the 'aider' label.
|
|
|
|
Args:
|
|
repository_config: Configuration of the repository whose issues are processed.
|
|
client: The Gitea client instance.
|
|
seen_issues_db: Database of previously processed issues.
|
|
"""
|
|
try:
|
|
issues = client.get_issues(repository_config.owner, repository_config.repo)
|
|
except Exception:
|
|
logger.exception('Failed to retrieve issues')
|
|
sys.exit(1)
|
|
|
|
if not issues:
|
|
logger.info('No issues found for %s', repository_config.repo)
|
|
return
|
|
|
|
for issue in issues:
|
|
issue_url = issue.get('web_url')
|
|
issue_number = issue.get('number')
|
|
issue_description = issue.get('body', '')
|
|
title = issue.get('title', f'Issue {issue_number}')
|
|
if seen_issues_db.has_seen(issue_url):
|
|
logger.info('Skipping already processed issue #%s: %s', issue_number, title)
|
|
else:
|
|
branch_name = generate_branch_name(str(issue_number), title)
|
|
code_solver = create_code_solver()
|
|
with tempfile.TemporaryDirectory() as repository_path:
|
|
issue_resolution = solve_issue_in_repository(
|
|
repository_config,
|
|
Path(repository_path),
|
|
branch_name,
|
|
title,
|
|
issue_description,
|
|
issue_number,
|
|
client,
|
|
code_solver,
|
|
)
|
|
seen_issues_db.mark_as_seen(issue_url, str(issue_number))
|
|
seen_issues_db.update_pr_info(
|
|
issue_url,
|
|
issue_resolution.pull_request_id,
|
|
issue_resolution.pull_request_url,
|
|
)
|
|
logger.info(
|
|
'Stored PR #%s information for issue #%s',
|
|
issue_resolution.pull_request_id,
|
|
issue_number,
|
|
)
|
|
|
|
# TODO: PR comment handling disabled for now due to missing functionality
|
|
if False:
|
|
# Handle unresolved pull request comments
|
|
handle_pr_comments(
|
|
repository_config,
|
|
issue_resolution.pull_request_id,
|
|
branch_name,
|
|
Path(repository_path),
|
|
client,
|
|
seen_issues_db,
|
|
issue_url,
|
|
code_solver,
|
|
)
|
|
|
|
# Handle failing pipelines
|
|
handle_failing_pipelines(
|
|
repository_config,
|
|
issue_resolution.pull_request_id,
|
|
branch_name,
|
|
Path(repository_path),
|
|
client,
|
|
code_solver,
|
|
)
|
|
|
|
|
|
def handle_pr_comments(
|
|
repository_config,
|
|
pr_number: int,
|
|
branch_name,
|
|
repository_path,
|
|
client,
|
|
seen_issues_db,
|
|
issue_url,
|
|
code_solver: CodeSolverStrategy,
|
|
):
|
|
"""Fetch unresolved PR comments and resolve them via code solver."""
|
|
comments = client.get_pull_request_comments(
|
|
repository_config.owner,
|
|
repository_config.repo,
|
|
pr_number,
|
|
)
|
|
for comment in comments:
|
|
path = comment.get('path')
|
|
line = comment.get('line') or comment.get('position') or 0
|
|
|
|
if not path:
|
|
logger.warning('Comment has no path, skipping')
|
|
continue
|
|
|
|
file_path = repository_path / path
|
|
try:
|
|
lines = file_path.read_text(encoding='utf-8').splitlines()
|
|
start = max(0, line - DEFAULT_PR_CONTEXT_LINES)
|
|
end = min(len(lines), line + DEFAULT_PR_CONTEXT_LINES - 1)
|
|
context = '\n'.join(lines[start:end])
|
|
except FileNotFoundError:
|
|
logger.warning('File %s not found for comment context', path)
|
|
context = f'File {path} not found'
|
|
except UnicodeDecodeError as e:
|
|
logger.warning('Failed to decode file %s: %s', path, e)
|
|
context = f'Unable to read file {path} (encoding issue)'
|
|
except Exception as e:
|
|
logger.warning('Failed to read file %s for comment context: %s', path, e)
|
|
context = f'Error reading file {path}'
|
|
body = comment.get('body', '')
|
|
issue = (
|
|
f'Resolve the following reviewer comment:\n{body}\n\n'
|
|
f'File: {path}\n\nContext:\n{context}'
|
|
)
|
|
# invoke code solver on the comment context
|
|
code_solver.solve_issue_round(repository_path, issue)
|
|
# commit and push changes for this comment
|
|
run_cmd(['git', 'add', path], repository_path, check=False)
|
|
run_cmd(
|
|
[
|
|
'git',
|
|
'commit',
|
|
'-m',
|
|
f'{COMMENT_RESOLUTION_COMMIT_PREFIX} {comment.get("id")}',
|
|
],
|
|
repository_path,
|
|
check=False,
|
|
)
|
|
run_cmd(['git', 'push', 'origin', branch_name], repository_path, check=False)
|
|
|
|
|
|
def handle_failing_pipelines(
|
|
repository_config: RepositoryConfig,
|
|
pr_number: int,
|
|
branch_name: str,
|
|
repository_path: Path,
|
|
client,
|
|
code_solver: CodeSolverStrategy,
|
|
) -> None:
|
|
"""Fetch failing pipelines for the given PR and resolve them via code solver."""
|
|
while True:
|
|
failed_runs = client.get_failed_pipelines(
|
|
repository_config.owner,
|
|
repository_config.repo,
|
|
pr_number,
|
|
)
|
|
if not failed_runs:
|
|
break
|
|
for run_id in failed_runs:
|
|
log = client.get_pipeline_log(
|
|
repository_config.owner,
|
|
repository_config.repo,
|
|
run_id,
|
|
)
|
|
lines = log.strip().split('\n')
|
|
context = '\n'.join(lines[-DEFAULT_LOG_TAIL_LINES:])
|
|
issue = f'Resolve the following failing pipeline run {run_id}:\n\n{context}'
|
|
code_solver.solve_issue_round(repository_path, issue)
|
|
run_cmd(['git', 'add', '.'], repository_path, check=False)
|
|
run_cmd(
|
|
[
|
|
'git',
|
|
'commit',
|
|
'-m',
|
|
f'{PIPELINE_RESOLUTION_COMMIT_PREFIX} {run_id}',
|
|
],
|
|
repository_path,
|
|
check=False,
|
|
)
|
|
run_cmd(
|
|
['git', 'push', 'origin', branch_name],
|
|
repository_path,
|
|
check=False,
|
|
)
|