Only rerun if the issue text has been updated #16
|
@ -15,8 +15,8 @@ import subprocess
|
|||
import os
|
||||
|
||||
import re
|
||||
|
||||
from . import secrets
|
||||
from .seen_issues_db import SeenIssuesDB
|
||||
|
||||
def generate_branch_name(issue_number: str, issue_title: str) -> str:
|
||||
"""
|
||||
|
@ -134,6 +134,7 @@ def main():
|
|||
logging.basicConfig(level='INFO')
|
||||
args = parse_args()
|
||||
|
||||
seen_issues_db = SeenIssuesDB()
|
||||
client = GiteaClient(args.gitea_url, secrets.gitea_token())
|
||||
|
||||
try:
|
||||
|
@ -150,7 +151,13 @@ def main():
|
|||
issue_number = issue.get("number")
|
||||
issue_description = issue.get("body", "")
|
||||
title = issue.get("title", f"Issue {issue_number}")
|
||||
issue_text = f"{title}\n{issue_description}"
|
||||
if seen_issues_db.has_seen(issue_text):
|
||||
logger.info(f"Skipping already processed issue #{issue_number}: {title}")
|
||||
continue
|
||||
|
||||
branch_name = generate_branch_name(issue_number, title)
|
||||
seen_issues_db.mark_as_seen(issue_text)
|
||||
try:
|
||||
with tempfile.TemporaryDirectory() as tmpdirname:
|
||||
process_issue(args, Path(tmpdirname), branch_name, title, issue_description, issue_number)
|
||||
|
|
28
aider_gitea/seen_issues_db.py
Normal file
28
aider_gitea/seen_issues_db.py
Normal file
|
@ -0,0 +1,28 @@
|
|||
import sqlite3
|
||||
from hashlib import sha256
|
||||
|
||||
class SeenIssuesDB:
|
||||
def __init__(self, db_path='seen_issues.db'):
|
||||
self.conn = sqlite3.connect(db_path)
|
||||
self._create_table()
|
||||
|
||||
def _create_table(self):
|
||||
with self.conn:
|
||||
self.conn.execute('''
|
||||
CREATE TABLE IF NOT EXISTS seen_issues (
|
||||
issue_hash TEXT PRIMARY KEY
|
||||
)
|
||||
''')
|
||||
|
||||
def mark_as_seen(self, issue_text: str):
|
||||
issue_hash = self._compute_hash(issue_text)
|
||||
with self.conn:
|
||||
self.conn.execute('INSERT OR IGNORE INTO seen_issues (issue_hash) VALUES (?)', (issue_hash,))
|
||||
|
||||
def has_seen(self, issue_text: str) -> bool:
|
||||
issue_hash = self._compute_hash(issue_text)
|
||||
cursor = self.conn.execute('SELECT 1 FROM seen_issues WHERE issue_hash = ?', (issue_hash,))
|
||||
return cursor.fetchone() is not None
|
||||
|
||||
def _compute_hash(self, text: str) -> str:
|
||||
return sha256(text.encode('utf-8')).hexdigest()
|
25
test/test_seen_issues_db.py
Normal file
25
test/test_seen_issues_db.py
Normal file
|
@ -0,0 +1,25 @@
|
|||
import sys
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
# Add the aider_gitea module to the Python path
|
||||
sys.path.append(str(Path(__file__).resolve().parent.parent))
|
||||
from aider_gitea.seen_issues_db import SeenIssuesDB
|
||||
|
||||
class TestSeenIssuesDB(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.db = SeenIssuesDB(':memory:')
|
||||
|
||||
def test_mark_and_check_seen_issue(self):
|
||||
issue_text = "Test issue"
|
||||
self.assertFalse(self.db.has_seen(issue_text))
|
||||
self.db.mark_as_seen(issue_text)
|
||||
self.assertTrue(self.db.has_seen(issue_text))
|
||||
|
||||
def test_unseen_issue(self):
|
||||
issue_text = "Unseen issue"
|
||||
self.assertFalse(self.db.has_seen(issue_text))
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in New Issue
Block a user