| | import os, shutil, tempfile, re |
| | from pathlib import Path |
| | import gradio as gr |
| | from git import Repo |
| | import requests |
| |
|
| | |
| | OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY") |
| | OPENROUTER_MODEL = "nvidia/nemotron-nano-12b-v2-vl:free" |
| | OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions" |
| | HEADERS = { |
| | "Authorization": f"Bearer {OPENROUTER_API_KEY}", |
| | "Content-Type": "application/json", |
| | } |
| |
|
| | ALLOWED_EXT = { |
| | ".py", ".ipynb", ".md", ".txt", ".js", ".ts", ".tsx", ".jsx", ".java", |
| | ".kt", ".c", ".cpp", ".cs", ".go", ".rs", ".rb", ".php", ".sql", ".html", |
| | ".css", ".yml", ".yaml", ".toml", ".ini", ".json" |
| | } |
| | SKIP_DIRS = { |
| | "node_modules", ".git", "dist", "build", "out", "venv", ".venv", |
| | "__pycache__", ".next", ".cache", "target", "bin", "obj", ".idea", ".vscode" |
| | } |
| | MAX_FILE_BYTES = 800_000 |
| |
|
| | |
| | def clone_repo(url: str) -> Path: |
| | d = Path(tempfile.mkdtemp(prefix=".tmp_repo_")).resolve() |
| | Repo.clone_from(url, d, depth=1) |
| | return d |
| |
|
| | def read_repo_text(repo_dir: Path) -> str: |
| | buf = [] |
| | for root, dirs, files in os.walk(repo_dir): |
| | dirs[:] = [x for x in dirs if x not in SKIP_DIRS] |
| | for f in files: |
| | p = Path(root) / f |
| | if p.suffix.lower() in ALLOWED_EXT and p.stat().st_size <= MAX_FILE_BYTES: |
| | try: |
| | txt = p.read_text(encoding="utf-8", errors="ignore") |
| | if txt.strip(): |
| | rel = str(p.relative_to(repo_dir)) |
| | buf.append(f"\n=== FILE: {rel} ===\n{txt}") |
| | except Exception: |
| | pass |
| | return "\n".join(buf) |
| |
|
| | def analyze_repo(url: str): |
| | if not url or not re.match(r"^https?://", url.strip()): |
| | return None, "β Invalid URL" |
| | repo_dir = None |
| | try: |
| | repo_dir = clone_repo(url.strip()) |
| | text = read_repo_text(repo_dir) |
| | if not text.strip(): |
| | return None, "β οΈ No readable text files found" |
| | kb_size = len(text) // 1000 |
| | return text, f"β
Repo loaded successfully ({kb_size} KB of text)" |
| | except Exception as e: |
| | return None, f"β Error: {e}" |
| | finally: |
| | if repo_dir and Path(repo_dir).exists(): |
| | shutil.rmtree(repo_dir, ignore_errors=True) |
| |
|
| | |
| | def openrouter_chat(system_prompt, user_prompt, context=""): |
| | messages = [{"role": "system", "content": system_prompt}] |
| | if context: |
| | messages.append({"role": "system", "content": f"Repository context:\n{context}"}) |
| | messages.append({"role": "user", "content": user_prompt}) |
| |
|
| | payload = {"model": OPENROUTER_MODEL, "messages": messages} |
| | try: |
| | r = requests.post(OPENROUTER_URL, headers=HEADERS, json=payload, timeout=120) |
| | r.raise_for_status() |
| | obj = r.json() |
| | if "choices" in obj and obj["choices"]: |
| | msg = obj["choices"][0]["message"]["content"] |
| | return msg.strip() |
| | return "[OpenRouter] Unexpected response format." |
| | except Exception as e: |
| | return f"[OpenRouter error] {e}" |
| |
|
| | |
| | SYSTEM_PROMPT = ( |
| | "You are an expert developer assistant. You help users explore and understand " |
| | "a GitHub repository. Base every response strictly on the repo's content and structure. " |
| | "If unsure, say so. Explain clearly and concisely. Avoid hallucinating." |
| | ) |
| |
|
| | def chat_repo(user_msg, chat_history, repo_text): |
| | if not repo_text: |
| | chat_history.append({"role": "assistant", "content": "β Please analyze a repository first."}) |
| | return chat_history, "" |
| | |
| | context = repo_text[:120000] |
| | response = openrouter_chat(SYSTEM_PROMPT, user_msg, context) |
| | chat_history.append({"role": "user", "content": user_msg}) |
| | chat_history.append({"role": "assistant", "content": response}) |
| | return chat_history, "" |
| |
|
| | |
| | with gr.Blocks(title="Repo Chatbot Β· OpenRouter") as demo: |
| | gr.Markdown( |
| | """ |
| | # π€ Repo Chatbot β powered by OpenRouter |
| | Chat with your GitHub repository! |
| | Upload a repo URL and ask anything about its **code, structure, or design**. |
| | _(No embeddings, just pure context reasoning.)_ |
| | """ |
| | ) |
| |
|
| | repo_state = gr.State() |
| | chat_history = gr.State([]) |
| |
|
| | with gr.Row(): |
| | repo_url = gr.Textbox( |
| | label="GitHub repo URL", |
| | placeholder="https://github.com/owner/repo", |
| | scale=4 |
| | ) |
| | analyze_btn = gr.Button("π Analyze Repo", scale=1) |
| |
|
| | status_box = gr.Markdown() |
| |
|
| | |
| | chatbot = gr.Chatbot(label="Repo Chatbot", height=500, type="messages") |
| | user_box = gr.Textbox(label="Ask something about the repo...") |
| |
|
| | clear_btn = gr.Button("π§Ή Clear Chat") |
| |
|
| | |
| | def analyze_repo_cb(url): |
| | text, status = analyze_repo(url) |
| | return text, status |
| |
|
| | analyze_btn.click(analyze_repo_cb, inputs=[repo_url], outputs=[repo_state, status_box]) |
| | user_box.submit(chat_repo, inputs=[user_box, chat_history, repo_state], |
| | outputs=[chatbot, user_box]) |
| | clear_btn.click(lambda: ([], ""), None, [chatbot, user_box]) |
| |
|
| | demo.queue() |
| | demo.launch(server_name="0.0.0.0", server_port=7860,mcp_server=True) |
| |
|