mirror of
https://github.com/SamurAIGPT/llm-wiki-agent.git
synced 2026-07-03 02:47:03 +00:00
refactor: extract shared utilities into tools/_utils.py (closes #57)
Consolidate 7 duplicated utility functions into tools/_utils.py. Net -224 lines, single source of truth.
This commit is contained in:
parent
8464f0320c
commit
11f66f1166
9 changed files with 188 additions and 272 deletions
0
tools/__init__.py
Normal file
0
tools/__init__.py
Normal file
141
tools/_utils.py
Normal file
141
tools/_utils.py
Normal file
|
|
@ -0,0 +1,141 @@
|
|||
"""
|
||||
Shared utilities for LLM Wiki tools.
|
||||
|
||||
Centralizes functions that were previously copy-pasted across tool files:
|
||||
read_file, write_file, call_llm, sha256, extract_wikilinks, all_wiki_pages, append_log.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# ── Paths ──────────────────────────────────────────────────────────────
|
||||
|
||||
# Repository root: this module lives in <repo>/tools/, so go two levels up.
REPO_ROOT = Path(__file__).parent.parent

# Top-level directories of the repository.
WIKI_DIR = REPO_ROOT / "wiki"
RAW_DIR = REPO_ROOT / "raw"
GRAPH_DIR = REPO_ROOT / "graph"

# Well-known files.
SCHEMA_FILE = REPO_ROOT / "CLAUDE.md"
INDEX_FILE = WIKI_DIR / "index.md"
LOG_FILE = WIKI_DIR / "log.md"
OVERVIEW_FILE = WIKI_DIR / "overview.md"

# Bookkeeping files that all_wiki_pages() leaves out of its listing by default.
_META_EXCLUDE = {"index.md", "log.md", "lint-report.md"}
|
||||
|
||||
|
||||
# ── File I/O ───────────────────────────────────────────────────────────
|
||||
|
||||
def read_file(path: Path) -> str:
    """Return the UTF-8 text of *path*, or "" if the file does not exist.

    Args:
        path: File to read.

    Returns:
        The decoded file contents, or an empty string when *path* is missing.

    Raises:
        UnicodeDecodeError: If the file is not valid UTF-8.
        OSError: For failures other than a missing path (e.g. *path* is a
            directory or is not readable).
    """
    # EAFP instead of exists()-then-read: the file can disappear between the
    # check and the read, which would turn "missing" into an unexpected crash.
    try:
        return path.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        # NotADirectoryError covers a path whose parent component is a regular
        # file; Path.exists() reported False for that case, so keep returning "".
        return ""
|
||||
|
||||
|
||||
def write_file(path: Path, content: str):
    """Write *content* to *path* as UTF-8 and report the written location.

    Missing parent directories are created first. A confirmation line is
    printed, showing the path relative to REPO_ROOT when possible.

    Args:
        path: Destination file.
        content: Text to write.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")

    # relative_to() raises ValueError for paths outside the repository (or
    # relative paths). The write already succeeded at this point, so fall back
    # to the path as given instead of failing on a cosmetic message.
    try:
        shown = path.relative_to(REPO_ROOT)
    except ValueError:
        shown = path
    print(f" wrote: {shown}")
|
||||
|
||||
|
||||
# ── LLM ────────────────────────────────────────────────────────────────
|
||||
|
||||
def call_llm(
    prompt: str,
    model_env: str = "LLM_MODEL",
    default_model: str = "claude-3-5-sonnet-latest",
    max_tokens: int | None = 4096,
) -> str:
    """Send *prompt* as a single user message via litellm and return the reply.

    Args:
        prompt: The user prompt.
        model_env: Environment variable name for model selection.
        default_model: Fallback model if the env var is unset or empty.
        max_tokens: Maximum response tokens. 0 or None to omit the limit.

    Returns:
        The text content of the first choice, or "" if the response carries
        no text content.

    Exits the process with status 1 if litellm is not installed.
    """
    # Imported lazily so tools that never call the LLM work without litellm.
    try:
        from litellm import completion
    except ImportError:
        print("Error: litellm not installed. Run: pip install litellm")
        sys.exit(1)

    # `or` rather than a getenv default: a variable that is set but empty
    # (e.g. `LLM_MODEL=` in a .env file) must not override the fallback with "".
    model = os.getenv(model_env) or default_model

    kwargs: dict = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
    }
    if max_tokens:
        kwargs["max_tokens"] = max_tokens

    response = completion(**kwargs)
    # Message content may be None; honour the declared `-> str` contract.
    return response.choices[0].message.content or ""
|
||||
|
||||
|
||||
# ── Hashing ────────────────────────────────────────────────────────────
|
||||
|
||||
def sha256(text: str, truncate: int = 0) -> str:
    """Return the SHA-256 hex digest of *text*.

    Args:
        text: String to hash (encoded with the default str.encode() codec).
        truncate: When non-zero, return only the first *truncate* characters
            of the digest. The default 0 returns all 64 characters;
            ingest.py and refresh.py pass truncate=16 for their short form.
    """
    digest = hashlib.sha256(text.encode()).hexdigest()
    if not truncate:
        return digest
    return digest[:truncate]
|
||||
|
||||
|
||||
# ── Wiki helpers ───────────────────────────────────────────────────────
|
||||
|
||||
def extract_wikilinks(content: str, unique: bool = False) -> list[str]:
    """Extract all [[WikiLink]] targets from page content.

    Args:
        content: Page text to scan.
        unique: Deduplicate results (used by build_graph.py). Duplicates are
            dropped while keeping first-occurrence order.

    Returns:
        The text between each ``[[`` and ``]]`` pair, in document order.
    """
    links = re.findall(r"\[\[([^\]]+)\]\]", content)
    if not unique:
        return links
    # dict.fromkeys() dedups while preserving insertion order; list(set(...))
    # would return a different order from run to run under hash randomization.
    return list(dict.fromkeys(links))
|
||||
|
||||
|
||||
def all_wiki_pages(extra_exclude: set[str] | None = None) -> list[Path]:
    """Return all .md files under wiki/ (recursively), excluding metadata files.

    Args:
        extra_exclude: Additional filenames to skip (e.g. {"health-report.md"}).
            Any iterable of filenames is accepted.

    Returns:
        Matching paths in sorted order, so results do not depend on the
        filesystem's directory-listing order.
    """
    # .union() accepts any iterable, unlike `set | other`, which raises
    # TypeError when a caller passes a list or tuple.
    exclude = _META_EXCLUDE.union(extra_exclude or ())
    return sorted(p for p in WIKI_DIR.rglob("*.md") if p.name not in exclude)
|
||||
|
||||
|
||||
def append_log(entry: str):
    """Append a log entry to the end of wiki/log.md.

    The entry is stripped of surrounding whitespace and written after the
    existing content, separated by a blank line. If the log file is missing
    or empty, it is started with the standard "# Wiki Log" header.

    NOTE(review): the per-tool helpers this replaces in ingest.py, query.py
    and lint.py wrote the new entry *before* the existing content
    (newest-first). This implementation appends instead, matching the former
    build_graph.py helper and the header's "append-only" wording. Confirm
    that is the intended order for those tools.

    Args:
        entry: Log text to record.
    """
    header = (
        "# Wiki Log\n\n"
        "> Records important additions, revisions, and clarifications in the "
        "project knowledge layer. Maintained in append-only mode for agent and "
        "human traceability."
    )
    entry_text = entry.strip()

    # read_file() returns "" for a missing file, so "missing" and "empty"
    # both fall back to the standard header.
    existing = read_file(LOG_FILE).rstrip() or header

    # The wiki directory may not exist yet on a fresh checkout.
    LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
    LOG_FILE.write_text(existing + "\n\n" + entry_text + "\n", encoding="utf-8")
|
||||
|
|
@ -20,15 +20,20 @@ Edge types:
|
|||
"""
|
||||
|
||||
import re
|
||||
import sys
|
||||
import json
|
||||
import hashlib
|
||||
import argparse
|
||||
import statistics
|
||||
import webbrowser
|
||||
from pathlib import Path
|
||||
from datetime import date
|
||||
|
||||
import os
|
||||
# Bootstrap shared utilities
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
from tools._utils import (
|
||||
REPO_ROOT, WIKI_DIR, GRAPH_DIR, LOG_FILE, SCHEMA_FILE,
|
||||
read_file, call_llm, sha256, all_wiki_pages, extract_wikilinks, append_log,
|
||||
)
|
||||
|
||||
try:
|
||||
import networkx as nx
|
||||
|
|
@ -38,15 +43,10 @@ except ImportError:
|
|||
HAS_NETWORKX = False
|
||||
print("Warning: networkx not installed. Community detection disabled. Run: pip install networkx")
|
||||
|
||||
REPO_ROOT = Path(__file__).parent.parent
|
||||
WIKI_DIR = REPO_ROOT / "wiki"
|
||||
GRAPH_DIR = REPO_ROOT / "graph"
|
||||
GRAPH_JSON = GRAPH_DIR / "graph.json"
|
||||
GRAPH_HTML = GRAPH_DIR / "graph.html"
|
||||
CACHE_FILE = GRAPH_DIR / ".cache.json"
|
||||
INFERRED_EDGES_FILE = GRAPH_DIR / ".inferred_edges.jsonl"
|
||||
LOG_FILE = WIKI_DIR / "log.md"
|
||||
SCHEMA_FILE = REPO_ROOT / "CLAUDE.md"
|
||||
|
||||
# Node type → color mapping
|
||||
TYPE_COLORS = {
|
||||
|
|
@ -64,45 +64,6 @@ EDGE_COLORS = {
|
|||
}
|
||||
|
||||
|
||||
def read_file(path: Path) -> str:
|
||||
return path.read_text(encoding="utf-8") if path.exists() else ""
|
||||
|
||||
|
||||
def call_llm(prompt: str, model_env: str, default_model: str, max_tokens: int = 4096) -> str:
|
||||
try:
|
||||
from litellm import completion
|
||||
except ImportError:
|
||||
print("Error: litellm not installed. Run: pip install litellm")
|
||||
import sys
|
||||
sys.exit(1)
|
||||
|
||||
model = os.getenv(model_env, default_model)
|
||||
|
||||
kwargs = {
|
||||
"model": model,
|
||||
"messages": [{"role": "user", "content": prompt}]
|
||||
}
|
||||
|
||||
if max_tokens:
|
||||
kwargs["max_tokens"] = max_tokens
|
||||
|
||||
response = completion(**kwargs)
|
||||
return response.choices[0].message.content
|
||||
|
||||
|
||||
def sha256(text: str) -> str:
|
||||
return hashlib.sha256(text.encode()).hexdigest()
|
||||
|
||||
|
||||
def all_wiki_pages() -> list[Path]:
|
||||
return [p for p in WIKI_DIR.rglob("*.md")
|
||||
if p.name not in ("index.md", "log.md", "lint-report.md")]
|
||||
|
||||
|
||||
def extract_wikilinks(content: str) -> list[str]:
|
||||
return list(set(re.findall(r'\[\[([^\]]+)\]\]', content)))
|
||||
|
||||
|
||||
def extract_frontmatter_type(content: str) -> str:
|
||||
match = re.search(r'^type:\s*(\S+)', content, re.MULTILINE)
|
||||
return match.group(1).strip('"\'') if match else "unknown"
|
||||
|
|
@ -161,7 +122,7 @@ def build_extracted_edges(pages: list[Path]) -> list[dict]:
|
|||
for p in pages:
|
||||
content = read_file(p)
|
||||
src = page_id(p)
|
||||
for link in extract_wikilinks(content):
|
||||
for link in extract_wikilinks(content, unique=True):
|
||||
target = stem_map.get(link.lower())
|
||||
if target and target != src:
|
||||
key = (src, target)
|
||||
|
|
@ -1180,27 +1141,6 @@ applyFilters();
|
|||
</html>"""
|
||||
|
||||
|
||||
def append_log(entry: str):
|
||||
log_path = WIKI_DIR / "log.md"
|
||||
entry_text = entry.strip()
|
||||
if not log_path.exists():
|
||||
log_path.write_text(
|
||||
"# Wiki Log\n\n"
|
||||
"> Records important additions, revisions, and clarifications in the project knowledge layer. Maintained in append-only mode for agent and human traceability.\n\n"
|
||||
f"{entry_text}\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
return
|
||||
|
||||
existing = read_file(log_path).rstrip()
|
||||
if not existing:
|
||||
existing = (
|
||||
"# Wiki Log\n\n"
|
||||
"> Records important additions, revisions, and clarifications in the project knowledge layer. Maintained in append-only mode for agent and human traceability."
|
||||
)
|
||||
log_path.write_text(existing + "\n\n" + entry_text + "\n", encoding="utf-8")
|
||||
|
||||
|
||||
def build_graph(infer: bool = True, open_browser: bool = False, clean: bool = False,
|
||||
report: bool = False, save: bool = False):
|
||||
pages = all_wiki_pages()
|
||||
|
|
|
|||
|
|
@ -2,35 +2,27 @@
|
|||
"""
|
||||
Graph Self-Healing Tool
|
||||
|
||||
Automatically retrieves "Missing Entity Pages" from the wiki and generates
|
||||
comprehensive definition pages for them using the LLM.
|
||||
Automatically retrieves "Missing Entity Pages" from the wiki and generates
|
||||
comprehensive definition pages for them using the LLM.
|
||||
It resolves broken entity links by scanning existing contexts where the entity is referenced.
|
||||
|
||||
Usage:
|
||||
python tools/heal.py
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
try:
|
||||
from litellm import completion
|
||||
except ImportError:
|
||||
print("Error: litellm not installed. Run: pip install litellm")
|
||||
sys.exit(1)
|
||||
|
||||
# Ensure tools can be imported
|
||||
# Bootstrap shared utilities
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
from tools._utils import REPO_ROOT, WIKI_DIR, call_llm, all_wiki_pages
|
||||
from tools.lint import find_missing_entities
|
||||
|
||||
from tools.lint import find_missing_entities, all_wiki_pages
|
||||
|
||||
REPO_ROOT = Path(__file__).parent.parent
|
||||
WIKI_DIR = REPO_ROOT / "wiki"
|
||||
ENTITIES_DIR = WIKI_DIR / "entities"
|
||||
|
||||
|
||||
|
||||
def sanitize_filename(name: str) -> str:
|
||||
"""Strip characters that are unsafe in filenames.
|
||||
|
||||
|
|
@ -45,17 +37,6 @@ def sanitize_filename(name: str) -> str:
|
|||
raise ValueError(f"Entity name became empty after sanitization: {original!r}")
|
||||
return name
|
||||
|
||||
def call_llm(prompt: str, max_tokens: int = 1500) -> str:
|
||||
# Use litellm standard environment variables
|
||||
# e.g., GEMINI_API_KEY, ANTHROPIC_API_KEY, OPENAI_API_KEY
|
||||
model = os.getenv("LLM_MODEL", "claude-3-5-haiku-latest") # default to fast model
|
||||
|
||||
response = completion(
|
||||
model=model,
|
||||
messages=[{"role": "user", "content": prompt}],
|
||||
max_tokens=max_tokens
|
||||
)
|
||||
return response.choices[0].message.content
|
||||
|
||||
def search_sources(entity: str, pages: list[Path]) -> list[Path]:
|
||||
"""Find up to 15 pages where this entity is mentioned natively."""
|
||||
|
|
@ -105,7 +86,7 @@ sources: {[s.name for s in sources]}
|
|||
Write a comprehensive paragraph defining what `{entity}` means in the context of this wiki, its main significance, and any actions or associations related to it.
|
||||
"""
|
||||
try:
|
||||
result = call_llm(prompt)
|
||||
result = call_llm(prompt, default_model="claude-3-5-haiku-latest", max_tokens=1500)
|
||||
safe_name = sanitize_filename(entity)
|
||||
out_path = ENTITIES_DIR / f"{safe_name}.md"
|
||||
# Safety: ensure resolved path stays within entities directory
|
||||
|
|
|
|||
|
|
@ -30,25 +30,14 @@ import argparse
|
|||
from pathlib import Path
|
||||
from datetime import date
|
||||
|
||||
REPO_ROOT = Path(__file__).parent.parent
|
||||
WIKI_DIR = REPO_ROOT / "wiki"
|
||||
INDEX_FILE = WIKI_DIR / "index.md"
|
||||
LOG_FILE = WIKI_DIR / "log.md"
|
||||
# Bootstrap shared utilities
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
from tools._utils import REPO_ROOT, WIKI_DIR, INDEX_FILE, LOG_FILE, read_file, all_wiki_pages
|
||||
|
||||
# Minimum content length (excluding frontmatter) to not be considered a stub
|
||||
STUB_THRESHOLD_CHARS = 100
|
||||
|
||||
|
||||
def read_file(path: Path) -> str:
|
||||
return path.read_text(encoding="utf-8") if path.exists() else ""
|
||||
|
||||
|
||||
def all_wiki_pages() -> list[Path]:
|
||||
"""All .md files in wiki/, excluding meta files."""
|
||||
exclude = {"index.md", "log.md", "lint-report.md", "health-report.md"}
|
||||
return [p for p in WIKI_DIR.rglob("*.md") if p.name not in exclude]
|
||||
|
||||
|
||||
def strip_frontmatter(content: str) -> str:
|
||||
"""Remove YAML frontmatter (--- ... ---) from content."""
|
||||
if content.startswith("---"):
|
||||
|
|
@ -198,7 +187,7 @@ def check_log_coverage(pages: list[Path]) -> list[dict]:
|
|||
|
||||
def run_health() -> dict:
|
||||
"""Run all health checks, return structured results."""
|
||||
pages = all_wiki_pages()
|
||||
pages = all_wiki_pages(extra_exclude={"health-report.md"})
|
||||
|
||||
return {
|
||||
"date": date.today().isoformat(),
|
||||
|
|
|
|||
|
|
@ -24,10 +24,8 @@ The LLM reads the source, extracts knowledge, and updates the wiki:
|
|||
- Runs post-ingest validation (broken links, index coverage)
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import hashlib
|
||||
import re
|
||||
import shutil
|
||||
import tempfile
|
||||
|
|
@ -35,11 +33,12 @@ from pathlib import Path
|
|||
from collections import defaultdict
|
||||
from datetime import date
|
||||
|
||||
REPO_ROOT = Path(__file__).parent.parent
|
||||
WIKI_DIR = REPO_ROOT / "wiki"
|
||||
LOG_FILE = WIKI_DIR / "log.md"
|
||||
INDEX_FILE = WIKI_DIR / "index.md"
|
||||
OVERVIEW_FILE = WIKI_DIR / "overview.md"
|
||||
# Bootstrap shared utilities
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
from tools._utils import (
|
||||
REPO_ROOT, WIKI_DIR, INDEX_FILE, OVERVIEW_FILE, LOG_FILE, SCHEMA_FILE,
|
||||
read_file, write_file, call_llm, sha256, extract_wikilinks, all_wiki_pages, append_log,
|
||||
)
|
||||
|
||||
# File extensions that can be auto-converted to markdown via markitdown.
|
||||
# .md files are ingested directly without conversion.
|
||||
|
|
@ -51,11 +50,6 @@ CONVERTIBLE_EXTENSIONS = {
|
|||
".wav", ".mp3", # audio transcription via markitdown
|
||||
}
|
||||
ALL_SUPPORTED_EXTENSIONS = {".md"} | CONVERTIBLE_EXTENSIONS
|
||||
SCHEMA_FILE = REPO_ROOT / "CLAUDE.md"
|
||||
|
||||
|
||||
def sha256(text: str) -> str:
|
||||
return hashlib.sha256(text.encode()).hexdigest()[:16]
|
||||
|
||||
|
||||
def clip(text: str, limit: int = 260) -> str:
|
||||
|
|
@ -66,37 +60,6 @@ def clip(text: str, limit: int = 260) -> str:
|
|||
return clipped + "..."
|
||||
|
||||
|
||||
def read_file(path: Path) -> str:
|
||||
return path.read_text(encoding="utf-8") if path.exists() else ""
|
||||
|
||||
|
||||
def call_llm(prompt: str, max_tokens: int = 8192) -> str:
|
||||
try:
|
||||
from litellm import completion
|
||||
except ImportError:
|
||||
print("Error: litellm not installed. Run: pip install litellm")
|
||||
sys.exit(1)
|
||||
|
||||
model = os.getenv("LLM_MODEL", "claude-3-5-sonnet-latest")
|
||||
|
||||
kwargs = {
|
||||
"model": model,
|
||||
"messages": [{"role": "user", "content": prompt}]
|
||||
}
|
||||
|
||||
if max_tokens:
|
||||
kwargs["max_tokens"] = max_tokens
|
||||
|
||||
response = completion(**kwargs)
|
||||
return response.choices[0].message.content
|
||||
|
||||
|
||||
def write_file(path: Path, content: str):
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text(content, encoding="utf-8")
|
||||
print(f" wrote: {path.relative_to(REPO_ROOT)}")
|
||||
|
||||
|
||||
def build_wiki_context() -> str:
|
||||
parts = []
|
||||
if INDEX_FILE.exists():
|
||||
|
|
@ -135,25 +98,6 @@ def update_index(new_entry: str, section: str = "Sources"):
|
|||
write_file(INDEX_FILE, content)
|
||||
|
||||
|
||||
def append_log(entry: str):
|
||||
existing = read_file(LOG_FILE)
|
||||
write_file(LOG_FILE, entry.strip() + "\n\n" + existing)
|
||||
|
||||
|
||||
def extract_wikilinks(content: str) -> list[str]:
|
||||
"""Extract all [[WikiLink]] targets from page content."""
|
||||
return re.findall(r'\[\[([^\]]+)\]\]', content)
|
||||
|
||||
|
||||
def all_wiki_pages() -> set[str]:
|
||||
"""Return set of all wiki page stems (case-insensitive)."""
|
||||
pages = set()
|
||||
for p in WIKI_DIR.rglob("*.md"):
|
||||
if p.name not in ("index.md", "log.md", "lint-report.md"):
|
||||
pages.add(p.stem.lower())
|
||||
return pages
|
||||
|
||||
|
||||
def validate_ingest(changed_pages: list[str] | None = None) -> dict:
|
||||
"""Validate wiki integrity after an ingest.
|
||||
|
||||
|
|
@ -163,7 +107,7 @@ def validate_ingest(changed_pages: list[str] | None = None) -> dict:
|
|||
|
||||
Returns dict with 'broken_links' and 'unindexed' lists.
|
||||
"""
|
||||
existing_pages = all_wiki_pages()
|
||||
existing_pages = {p.stem.lower() for p in all_wiki_pages()}
|
||||
index_content = read_file(INDEX_FILE).lower()
|
||||
|
||||
# Determine which pages to scan for broken links
|
||||
|
|
@ -252,7 +196,7 @@ def ingest(source_path: str, auto_convert: bool = True):
|
|||
source = converted_path
|
||||
|
||||
source_content = source.read_text(encoding="utf-8")
|
||||
source_hash = sha256(source_content)
|
||||
source_hash = sha256(source_content, truncate=16)
|
||||
today = date.today().isoformat()
|
||||
|
||||
print(f"\nIngesting: {source.name} (hash: {source_hash})")
|
||||
|
|
@ -386,7 +330,6 @@ if __name__ == "__main__":
|
|||
else:
|
||||
print("No broken wikilinks found.")
|
||||
print()
|
||||
pages = all_wiki_pages()
|
||||
index_content = read_file(INDEX_FILE).lower()
|
||||
unindexed_all = []
|
||||
for p in WIKI_DIR.rglob("*.md"):
|
||||
|
|
|
|||
|
|
@ -25,43 +25,14 @@ from pathlib import Path
|
|||
from collections import defaultdict
|
||||
from datetime import date
|
||||
|
||||
import os
|
||||
# Bootstrap shared utilities
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
from tools._utils import (
|
||||
REPO_ROOT, WIKI_DIR, GRAPH_DIR, LOG_FILE, SCHEMA_FILE,
|
||||
read_file, call_llm, all_wiki_pages, extract_wikilinks, append_log,
|
||||
)
|
||||
|
||||
REPO_ROOT = Path(__file__).parent.parent
|
||||
WIKI_DIR = REPO_ROOT / "wiki"
|
||||
GRAPH_DIR = REPO_ROOT / "graph"
|
||||
GRAPH_JSON = GRAPH_DIR / "graph.json"
|
||||
LOG_FILE = WIKI_DIR / "log.md"
|
||||
SCHEMA_FILE = REPO_ROOT / "CLAUDE.md"
|
||||
|
||||
|
||||
def read_file(path: Path) -> str:
|
||||
return path.read_text(encoding="utf-8") if path.exists() else ""
|
||||
|
||||
|
||||
def call_llm(prompt: str, model_env: str, default_model: str, max_tokens: int = 4096) -> str:
|
||||
try:
|
||||
from litellm import completion
|
||||
except ImportError:
|
||||
print("Error: litellm not installed. Run: pip install litellm")
|
||||
sys.exit(1)
|
||||
|
||||
model = os.getenv(model_env, default_model)
|
||||
response = completion(
|
||||
model=model,
|
||||
messages=[{"role": "user", "content": prompt}],
|
||||
max_tokens=max_tokens
|
||||
)
|
||||
return response.choices[0].message.content
|
||||
|
||||
|
||||
def all_wiki_pages() -> list[Path]:
|
||||
return [p for p in WIKI_DIR.rglob("*.md")
|
||||
if p.name not in ("index.md", "log.md", "lint-report.md")]
|
||||
|
||||
|
||||
def extract_wikilinks(content: str) -> list[str]:
|
||||
return re.findall(r'\[\[([^\]]+)\]\]', content)
|
||||
|
||||
|
||||
def page_name_to_path(name: str) -> list[Path]:
|
||||
|
|
@ -432,11 +403,6 @@ Be specific — name the exact pages and claims involved.
|
|||
return report
|
||||
|
||||
|
||||
def append_log(entry: str):
|
||||
existing = read_file(LOG_FILE)
|
||||
LOG_FILE.write_text(entry.strip() + "\n\n" + existing, encoding="utf-8")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Lint the LLM Wiki")
|
||||
parser.add_argument("--save", action="store_true", help="Save lint report to wiki/lint-report.md")
|
||||
|
|
|
|||
|
|
@ -19,39 +19,12 @@ import argparse
|
|||
from pathlib import Path
|
||||
from datetime import date
|
||||
|
||||
import os
|
||||
|
||||
REPO_ROOT = Path(__file__).parent.parent
|
||||
WIKI_DIR = REPO_ROOT / "wiki"
|
||||
INDEX_FILE = WIKI_DIR / "index.md"
|
||||
LOG_FILE = WIKI_DIR / "log.md"
|
||||
SCHEMA_FILE = REPO_ROOT / "CLAUDE.md"
|
||||
|
||||
|
||||
def read_file(path: Path) -> str:
|
||||
return path.read_text(encoding="utf-8") if path.exists() else ""
|
||||
|
||||
|
||||
def write_file(path: Path, content: str):
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text(content, encoding="utf-8")
|
||||
print(f" saved: {path.relative_to(REPO_ROOT)}")
|
||||
|
||||
|
||||
def call_llm(prompt: str, model_env: str, default_model: str, max_tokens: int = 4096) -> str:
|
||||
try:
|
||||
from litellm import completion
|
||||
except ImportError:
|
||||
print("Error: litellm not installed. Run: pip install litellm")
|
||||
sys.exit(1)
|
||||
|
||||
model = os.getenv(model_env, default_model)
|
||||
response = completion(
|
||||
model=model,
|
||||
messages=[{"role": "user", "content": prompt}],
|
||||
max_tokens=max_tokens
|
||||
)
|
||||
return response.choices[0].message.content
|
||||
# Bootstrap shared utilities
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
from tools._utils import (
|
||||
REPO_ROOT, WIKI_DIR, INDEX_FILE, LOG_FILE, SCHEMA_FILE,
|
||||
read_file, write_file, call_llm, append_log,
|
||||
)
|
||||
|
||||
|
||||
def find_relevant_pages(question: str, index_content: str) -> list[Path]:
|
||||
|
|
@ -108,11 +81,6 @@ def find_relevant_pages(question: str, index_content: str) -> list[Path]:
|
|||
return relevant[:15] # cap to avoid context overflow
|
||||
|
||||
|
||||
def append_log(entry: str):
|
||||
existing = read_file(LOG_FILE)
|
||||
LOG_FILE.write_text(entry.strip() + "\n\n" + existing, encoding="utf-8")
|
||||
|
||||
|
||||
def query(question: str, save_path: str | None = None):
|
||||
today = date.today().isoformat()
|
||||
|
||||
|
|
|
|||
|
|
@ -11,28 +11,18 @@ Compares raw document hashes against stored hashes to detect changes.
|
|||
Re-ingests changed documents to update wiki/sources/ pages with accurate facts.
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import hashlib
|
||||
import re
|
||||
from typing import Optional
|
||||
from pathlib import Path
|
||||
from datetime import date
|
||||
|
||||
REPO_ROOT = Path(__file__).parent.parent
|
||||
WIKI_DIR = REPO_ROOT / "wiki"
|
||||
RAW_DIR = REPO_ROOT / "raw"
|
||||
# Bootstrap shared utilities
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
from tools._utils import REPO_ROOT, WIKI_DIR, RAW_DIR, GRAPH_DIR, read_file, sha256
|
||||
|
||||
SOURCES_DIR = WIKI_DIR / "sources"
|
||||
REFRESH_CACHE = REPO_ROOT / "graph" / ".refresh_cache.json"
|
||||
|
||||
|
||||
def sha256(text: str) -> str:
|
||||
return hashlib.sha256(text.encode()).hexdigest()[:16]
|
||||
|
||||
|
||||
def read_file(path: Path) -> str:
|
||||
return path.read_text(encoding="utf-8") if path.exists() else ""
|
||||
REFRESH_CACHE = GRAPH_DIR / ".refresh_cache.json"
|
||||
|
||||
|
||||
def load_refresh_cache() -> dict:
|
||||
|
|
@ -49,7 +39,7 @@ def save_refresh_cache(cache: dict):
|
|||
REFRESH_CACHE.write_text(json.dumps(cache, indent=2, ensure_ascii=False))
|
||||
|
||||
|
||||
def extract_source_file(content: str) -> Optional[str]:
|
||||
def extract_source_file(content: str) -> str | None:
|
||||
"""Extract source_file from YAML frontmatter."""
|
||||
match = re.search(r'^source_file:\s*(.+)$', content, re.MULTILINE)
|
||||
if match:
|
||||
|
|
@ -79,7 +69,7 @@ def find_stale_sources(force: bool = False) -> list[tuple[Path, Path]]:
|
|||
continue
|
||||
|
||||
raw_content = read_file(raw_path)
|
||||
current_hash = sha256(raw_content)
|
||||
current_hash = sha256(raw_content, truncate=16)
|
||||
cached_hash = cache.get(str(raw_path))
|
||||
|
||||
if force or cached_hash != current_hash:
|
||||
|
|
@ -90,10 +80,8 @@ def find_stale_sources(force: bool = False) -> list[tuple[Path, Path]]:
|
|||
|
||||
def refresh_page(wiki_page: Path, raw_path: Path) -> bool:
|
||||
"""Re-ingest a single source document."""
|
||||
# Import ingest function
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
try:
|
||||
from ingest import ingest
|
||||
from tools.ingest import ingest
|
||||
print(f"\n{'='*60}")
|
||||
print(f" Refreshing: {wiki_page.name}")
|
||||
print(f" From: {raw_path}")
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue