Add job tracking with PostgreSQL, image storage, and review workflow

- Add PostgreSQL service to docker-compose with health check and postgres_data volume
- Mount ./ocr_images as a bind mount for persistent image storage
- Add backend/database.py with schema init and get_db() context manager
- Add 5 new API endpoints: POST /api/jobs, GET /api/jobs (search), GET /api/jobs/{id},
  GET /api/jobs/{id}/image, PUT /api/jobs/{id}/review
- Jobs are saved with author/book/chapter/page metadata, auto UUID, and submitted_at timestamp
- Jobs start as 'unreviewed'; review captures edited text, reviewer name, and reviewed_at
- Add MetadataForm.jsx (author/book/chapter/page inputs) to the New Job panel
- Add JobsPanel.jsx with search/filter, paginated list, and detail pane with review form
- Add "Commit Job" button to ResultPanel (plain_ocr mode only) with success/error feedback
- Add "New Job" / "Browse Jobs" navigation to the app header

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Aaron Roberts
2026-06-09 16:48:12 +01:00
parent 68147eb97c
commit fd747e6c23
9 changed files with 1208 additions and 212 deletions

View File

@@ -1,14 +1,17 @@
import os
import re
import uuid
import tempfile
import shutil
import base64
from typing import List, Dict, Any, Optional
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.responses import JSONResponse, StreamingResponse, FileResponse
from pydantic import BaseModel
import torch
from transformers import AutoModel, AutoTokenizer
from PIL import Image
@@ -24,6 +27,9 @@ from pdf_utils import (
clean_markdown_content
)
from format_converter import DocumentConverter
from database import init_db, get_db
# Directory where committed job images are written; looked up through
# env_config under the "OCR_IMAGES_DIR" key with "/data/ocr_images" as default.
OCR_IMAGES_DIR = env_config("OCR_IMAGES_DIR", default="/data/ocr_images")
# -----------------------------
# Lifespan context for model loading
@@ -36,6 +42,15 @@ async def lifespan(app: FastAPI):
"""Load model on startup, cleanup on shutdown"""
global model, tokenizer
# Image storage directory
os.makedirs(OCR_IMAGES_DIR, exist_ok=True)
# Database
try:
init_db()
except Exception as exc:
print(f"Warning: database initialization failed: {exc}")
# Environment setup
os.environ.pop("TRANSFORMERS_CACHE", None)
MODEL_NAME = env_config("MODEL_NAME", default="deepseek-ai/DeepSeek-OCR")
@@ -581,6 +596,238 @@ async def process_pdf(
print(traceback.format_exc())
raise HTTPException(status_code=500, detail="An internal error occurred during PDF processing.")
# -----------------------------
# Job management routes
# -----------------------------
class ReviewRequest(BaseModel):
    """JSON body accepted by the job review endpoint."""

    # Corrected text, stored in the job's reviewed_text column.
    reviewed_text: str
    # Name of the person reviewing, stored in the job's reviewer_name column.
    reviewer_name: str
def _job_row_to_dict(row) -> Dict[str, Any]:
    """Convert a DB row into a plain dict whose values can be JSON-encoded.

    ``datetime`` values become ISO-8601 strings (``datetime.isoformat()``) and
    UUID values become their string form; every other value is passed through
    unchanged.

    Args:
        row: Any object accepted by ``dict()``, e.g. a RealDictRow or a plain
            mapping of column name to value.

    Returns:
        A new dict. ``row`` itself is not modified.
    """

    def _serialise(value: Any) -> Any:
        if isinstance(value, datetime):
            return value.isoformat()
        # isinstance also covers uuid.UUID subclasses whose class name is not
        # literally "UUID". The name comparison is kept as a fallback so any
        # non-stdlib class called UUID is still stringified as before.
        if isinstance(value, uuid.UUID) or type(value).__name__ == "UUID":
            return str(value)
        return value

    return {key: _serialise(value) for key, value in dict(row).items()}
@app.post("/api/jobs")
async def commit_job(
    image: UploadFile = File(...),
    author: str = Form(""),
    book: str = Form(""),
    chapter: str = Form(""),
    page: str = Form(""),
    ocr_text: str = Form(""),
    mode: str = Form("plain_ocr"),
):
    """Commit an OCR job: save the image and insert a DB record.

    The image is written to ``OCR_IMAGES_DIR`` as ``<job_id><ext>`` and a row
    with status ``'unreviewed'`` is inserted into ``ocr_jobs``. Empty metadata
    fields and empty OCR text are stored as NULL.

    Returns:
        A 201 JSON response containing the inserted row.

    Raises:
        HTTPException: 500 if the image cannot be written or the insert fails.
            In both cases the image file is removed again (best effort).
    """
    job_id = str(uuid.uuid4())

    # The extension decides the media type later guessed when the file is
    # served back, so only known image extensions are taken from the
    # client-supplied filename. Anything else falls back to the declared
    # content type, and finally to ".png".
    allowed_exts = {".png", ".jpg", ".jpeg", ".webp", ".gif", ".bmp", ".tif", ".tiff"}
    content_type_exts = {
        "image/png": ".png", "image/jpeg": ".jpg", "image/jpg": ".jpg",
        "image/webp": ".webp", "image/gif": ".gif", "image/bmp": ".bmp",
    }
    original_filename = image.filename or "image"
    ext = os.path.splitext(original_filename)[1].lower()
    if ext not in allowed_exts:
        ct = (image.content_type or "").lower()
        ext = content_type_exts.get(ct, ".png")

    image_path = os.path.join(OCR_IMAGES_DIR, f"{job_id}{ext}")

    def _discard_image() -> None:
        # Best-effort cleanup: the file may never have been created.
        try:
            os.remove(image_path)
        except OSError:
            pass

    try:
        content = await image.read()
        with open(image_path, "wb") as f:
            f.write(content)
    except Exception as exc:
        # Do not leave a partially written file behind, and log the cause.
        _discard_image()
        print(f"Job commit image save error: {exc}")
        raise HTTPException(status_code=500, detail="Failed to save image file.") from exc

    try:
        with get_db() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO ocr_jobs
                        (id, author, book, chapter, page, image_path, original_filename,
                         ocr_text, mode, status)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, 'unreviewed')
                    RETURNING *
                    """,
                    (job_id, author or None, book or None, chapter or None,
                     page or None, image_path, original_filename,
                     ocr_text or None, mode),
                )
                row = cur.fetchone()
    except Exception as exc:
        # Clean up the saved image so a failed insert leaves no orphan file.
        _discard_image()
        print(f"Job commit DB error: {exc}")
        raise HTTPException(status_code=500, detail="Failed to save job to database.") from exc

    return JSONResponse(_job_row_to_dict(row), status_code=201)
def _ilike_contains(term: str) -> str:
    """Return an ILIKE pattern that matches values containing ``term`` literally."""
    # Backslash is PostgreSQL's default LIKE escape character, so escaping it
    # first and then % and _ stops user input from acting as wildcards.
    escaped = term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    return f"%{escaped}%"


@app.get("/api/jobs")
async def list_jobs(
    search: Optional[str] = Query(None, description="General text search across all fields"),
    author: Optional[str] = Query(None),
    book: Optional[str] = Query(None),
    chapter: Optional[str] = Query(None),
    status: Optional[str] = Query(None, description="unreviewed | reviewed"),
    limit: int = Query(20, ge=1, le=200),
    offset: int = Query(0, ge=0),
):
    """Search and list jobs. All filters are optional and combinable.

    ``search`` is a case-insensitive substring match across author, book,
    chapter, page, ocr_text and reviewer_name. ``author``, ``book`` and
    ``chapter`` are substring matches on their own column, and ``status`` is
    an exact match. Filters are ANDed together.

    Returns:
        JSON with ``total`` (rows matching the filters), the echoed ``limit``
        and ``offset``, and ``jobs`` (the requested page, newest first).

    Raises:
        HTTPException: 500 on any database error.
    """
    conditions: List[str] = []
    params: List[Any] = []

    # Column names below are fixed literals, never user input, so building
    # the SQL text from them is safe. All values go through placeholders.
    if search:
        search_columns = ("author", "book", "chapter", "page", "ocr_text", "reviewer_name")
        conditions.append(
            "(" + " OR ".join(f"{col} ILIKE %s" for col in search_columns) + ")"
        )
        params.extend([_ilike_contains(search)] * len(search_columns))

    for column, value in (("author", author), ("book", book), ("chapter", chapter)):
        if value:
            conditions.append(f"{column} ILIKE %s")
            params.append(_ilike_contains(value))

    if status:
        conditions.append("status = %s")
        params.append(status)

    where = ("WHERE " + " AND ".join(conditions)) if conditions else ""

    try:
        with get_db() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    f"SELECT COUNT(*) AS total FROM ocr_jobs {where}",
                    params,
                )
                total = cur.fetchone()["total"]

                # id is a tie-breaker: without it, rows sharing a submitted_at
                # value have no defined order and can repeat or go missing
                # between LIMIT/OFFSET pages.
                cur.execute(
                    f"""
                    SELECT id, author, book, chapter, page, submitted_at, status,
                           reviewer_name, reviewed_at, mode, original_filename
                    FROM ocr_jobs {where}
                    ORDER BY submitted_at DESC, id DESC
                    LIMIT %s OFFSET %s
                    """,
                    params + [limit, offset],
                )
                rows = [_job_row_to_dict(r) for r in cur.fetchall()]
    except Exception as exc:
        print(f"list_jobs DB error: {exc}")
        raise HTTPException(status_code=500, detail="Database error.")

    return JSONResponse({"total": total, "limit": limit, "offset": offset, "jobs": rows})
@app.get("/api/jobs/{job_id}")
async def get_job(job_id: str):
    """Retrieve the full job record (all columns of the ``ocr_jobs`` row).

    Raises:
        HTTPException: 400 if ``job_id`` is not a valid UUID, 404 if no row
            has that id, 500 on any database error.
    """
    try:
        # Query with the canonical form. uuid.UUID() also accepts spellings
        # such as "urn:uuid:..." or "{...}"; sending the raw string could make
        # the database reject it (a 500) or miss the row.
        job_key = str(uuid.UUID(job_id))
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid job ID.")

    try:
        with get_db() as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT * FROM ocr_jobs WHERE id = %s", (job_key,))
                row = cur.fetchone()
    except Exception as exc:
        print(f"get_job DB error: {exc}")
        raise HTTPException(status_code=500, detail="Database error.")

    if not row:
        raise HTTPException(status_code=404, detail="Job not found.")

    return JSONResponse(_job_row_to_dict(row))
@app.get("/api/jobs/{job_id}/image")
async def get_job_image(job_id: str):
    """Serve the stored image for a job.

    Looks up the job's ``image_path`` and returns that file.

    Raises:
        HTTPException: 400 if ``job_id`` is not a valid UUID, 404 if the job
            does not exist or its image is missing on disk, 500 on any
            database error.
    """
    try:
        # Query with the canonical form. uuid.UUID() also accepts spellings
        # such as "urn:uuid:..." or "{...}"; sending the raw string could make
        # the database reject it (a 500) or miss the row.
        job_key = str(uuid.UUID(job_id))
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid job ID.")

    try:
        with get_db() as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT image_path FROM ocr_jobs WHERE id = %s", (job_key,))
                row = cur.fetchone()
    except Exception as exc:
        print(f"get_job_image DB error: {exc}")
        raise HTTPException(status_code=500, detail="Database error.")

    if not row:
        raise HTTPException(status_code=404, detail="Job not found.")

    path = row["image_path"]
    # A NULL/empty path would make os.path.isfile raise; treat it as missing.
    if not path or not os.path.isfile(path):
        raise HTTPException(status_code=404, detail="Image file not found on disk.")

    return FileResponse(path)
@app.put("/api/jobs/{job_id}/review")
async def review_job(job_id: str, body: ReviewRequest):
    """Mark a job as reviewed with the corrected text and reviewer name.

    Sets ``status`` to ``'reviewed'``, stores ``reviewed_text`` and
    ``reviewer_name`` from the request body, and stamps ``reviewed_at`` with
    the database's ``NOW()``.

    Returns:
        JSON of the updated row.

    Raises:
        HTTPException: 400 if ``job_id`` is not a valid UUID, 404 if no row
            has that id, 500 on any database error.
    """
    try:
        # Query with the canonical form. uuid.UUID() also accepts spellings
        # such as "urn:uuid:..." or "{...}"; sending the raw string could make
        # the database reject it (a 500) or miss the row.
        job_key = str(uuid.UUID(job_id))
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid job ID.")

    try:
        with get_db() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    UPDATE ocr_jobs
                    SET status = 'reviewed',
                        reviewed_text = %s,
                        reviewer_name = %s,
                        reviewed_at = NOW()
                    WHERE id = %s
                    RETURNING *
                    """,
                    (body.reviewed_text, body.reviewer_name, job_key),
                )
                row = cur.fetchone()
    except Exception as exc:
        print(f"review_job DB error: {exc}")
        raise HTTPException(status_code=500, detail="Database error.")

    # UPDATE ... RETURNING yields no row when the id matched nothing.
    if not row:
        raise HTTPException(status_code=404, detail="Job not found.")

    return JSONResponse(_job_row_to_dict(row))
if __name__ == "__main__":
host = env_config("API_HOST", default="0.0.0.0")
port = env_config("API_PORT", default=8000, cast=int)