Fix RCE vulnerability and harden security
- Replace eval() with ast.literal_eval() in pdf_utils.py to fix unauthenticated remote code execution via crafted PDF uploads (reported by OX Security)
- Sanitize HTML output with DOMPurify to prevent XSS
- Restrict CORS origins (configurable via CORS_ORIGINS env var)
- Suppress raw exception details in API error responses
- Cap Image.MAX_IMAGE_PIXELS to prevent decompression bomb DoS
- Add security regression test suite

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -11,6 +11,9 @@ FRONTEND_PORT=3000
|
|||||||
MODEL_NAME=deepseek-ai/DeepSeek-OCR
|
MODEL_NAME=deepseek-ai/DeepSeek-OCR
|
||||||
HF_HOME=/models
|
HF_HOME=/models
|
||||||
|
|
||||||
|
# CORS Configuration (comma-separated origins, defaults to http://localhost:3000)
|
||||||
|
CORS_ORIGINS=http://localhost:3000
|
||||||
|
|
||||||
# Upload Configuration
|
# Upload Configuration
|
||||||
MAX_UPLOAD_SIZE_MB=100
|
MAX_UPLOAD_SIZE_MB=100
|
||||||
|
|
||||||
|
|||||||
@@ -86,11 +86,14 @@ app = FastAPI(
|
|||||||
)
|
)
|
||||||
|
|
||||||
# CORS middleware for React frontend
|
# CORS middleware for React frontend
|
||||||
|
CORS_ORIGINS = env_config("CORS_ORIGINS", default="").split(",")
|
||||||
|
CORS_ORIGINS = [o.strip() for o in CORS_ORIGINS if o.strip()]
|
||||||
|
|
||||||
app.add_middleware(
|
app.add_middleware(
|
||||||
CORSMiddleware,
|
CORSMiddleware,
|
||||||
allow_origins=["*"],
|
allow_origins=CORS_ORIGINS if CORS_ORIGINS else ["http://localhost:3000"],
|
||||||
allow_credentials=True,
|
allow_credentials=True,
|
||||||
allow_methods=["*"],
|
allow_methods=["GET", "POST"],
|
||||||
allow_headers=["*"],
|
allow_headers=["*"],
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -373,7 +376,8 @@ async def ocr_inference(
|
|||||||
})
|
})
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise HTTPException(status_code=500, detail=f"{type(e).__name__}: {str(e)}")
|
print(f"OCR inference error: {type(e).__name__}: {str(e)}")
|
||||||
|
raise HTTPException(status_code=500, detail="An internal error occurred during OCR processing.")
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if tmp_img:
|
if tmp_img:
|
||||||
@@ -573,9 +577,9 @@ async def process_pdf(
|
|||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
import traceback
|
import traceback
|
||||||
print(f"❌ Error processing PDF: {e}")
|
print(f"Error processing PDF: {e}")
|
||||||
print(traceback.format_exc())
|
print(traceback.format_exc())
|
||||||
raise HTTPException(status_code=500, detail=f"{type(e).__name__}: {str(e)}")
|
raise HTTPException(status_code=500, detail="An internal error occurred during PDF processing.")
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
host = env_config("API_HOST", default="0.0.0.0")
|
host = env_config("API_HOST", default="0.0.0.0")
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ PDF Processing Utilities for DeepSeek OCR
|
|||||||
Handles PDF to image conversion and batch processing
|
Handles PDF to image conversion and batch processing
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import ast
|
||||||
import io
|
import io
|
||||||
import re
|
import re
|
||||||
from typing import List, Tuple, Dict, Any
|
from typing import List, Tuple, Dict, Any
|
||||||
@@ -39,8 +40,8 @@ def pdf_to_images_high_quality(pdf_bytes: bytes, dpi: int = 144) -> List[Image.I
|
|||||||
# Render page to pixmap
|
# Render page to pixmap
|
||||||
pixmap = page.get_pixmap(matrix=matrix, alpha=False)
|
pixmap = page.get_pixmap(matrix=matrix, alpha=False)
|
||||||
|
|
||||||
# Allow large images
|
# Allow reasonably large images (200 megapixels) but not decompression bombs
|
||||||
Image.MAX_IMAGE_PIXELS = None
|
Image.MAX_IMAGE_PIXELS = 200_000_000
|
||||||
|
|
||||||
# Convert to PIL Image
|
# Convert to PIL Image
|
||||||
img_data = pixmap.tobytes("png")
|
img_data = pixmap.tobytes("png")
|
||||||
@@ -130,7 +131,7 @@ def parse_coordinates(ref_text: Tuple, image_width: int, image_height: int) -> D
|
|||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
label_type = ref_text[1]
|
label_type = ref_text[1]
|
||||||
cor_list = eval(ref_text[2])
|
cor_list = ast.literal_eval(ref_text[2])
|
||||||
|
|
||||||
# Scale coordinates from 0-999 to actual pixels
|
# Scale coordinates from 0-999 to actual pixels
|
||||||
scaled_boxes = []
|
scaled_boxes = []
|
||||||
|
|||||||
150
backend/test_security.py
Normal file
150
backend/test_security.py
Normal file
@@ -0,0 +1,150 @@
|
|||||||
|
"""
|
||||||
|
Security regression tests for the eval() RCE vulnerability (OX Security disclosure).
|
||||||
|
|
||||||
|
The vulnerability allowed arbitrary code execution via crafted OCR output
|
||||||
|
that was passed to eval() in parse_coordinates(). The fix uses ast.literal_eval()
|
||||||
|
which only allows literal data structures.
|
||||||
|
|
||||||
|
This test is self-contained and does not require backend dependencies.
|
||||||
|
|
||||||
|
Run: python test_security.py
|
||||||
|
"""
|
||||||
|
|
||||||
|
import ast
|
||||||
|
|
||||||
|
|
||||||
|
def parse_coordinates(ref_text, image_width, image_height):
    """
    Stand-in for pdf_utils.parse_coordinates built on ast.literal_eval().

    Args:
        ref_text: Indexable match record; index 1 is used as the label and
            index 2 is the coordinate text to parse.
        image_width: Width that x values are scaled to.
        image_height: Height that y values are scaled to.

    Returns:
        A dict with 'label' and 'boxes' (one [x1, y1, x2, y2] list of ints
        per parsed box, scaled from the 0-999 range), or None if anything
        about the input could not be parsed.
    """
    try:
        label = ref_text[1]
        # literal_eval only accepts Python literals, so call expressions,
        # lambdas and comprehensions raise here instead of executing.
        raw_boxes = ast.literal_eval(ref_text[2])

        boxes = [
            [
                int(left / 999 * image_width),
                int(top / 999 * image_height),
                int(right / 999 * image_width),
                int(bottom / 999 * image_height),
            ]
            for left, top, right, bottom in raw_boxes
        ]
    except Exception as exc:
        # Any malformed or hostile input is reported and rejected.
        print(f" [Blocked] {type(exc).__name__}: {exc}")
        return None

    return {'label': label, 'boxes': boxes}
|
||||||
|
|
||||||
|
|
||||||
|
def test_legitimate_coordinates():
    """A single well-formed bounding box must still be accepted."""
    parsed = parse_coordinates(
        ("full_match", "text", "[[312, 339, 480, 681]]"),
        1000,
        1000,
    )

    assert parsed is not None, "Legitimate coordinates should parse successfully"
    assert parsed['label'] == 'text'
    assert len(parsed['boxes']) == 1
    print("PASS: Legitimate coordinates parse correctly")
|
||||||
|
|
||||||
|
|
||||||
|
def test_multiple_boxes():
    """A coordinate list holding two boxes must yield two parsed boxes."""
    coordinate_text = "[[100, 200, 300, 400], [500, 600, 700, 800]]"
    parsed = parse_coordinates(("full_match", "image", coordinate_text), 1000, 1000)

    assert parsed is not None, "Multiple boxes should parse successfully"
    assert len(parsed['boxes']) == 2
    print("PASS: Multiple bounding boxes parse correctly")
|
||||||
|
|
||||||
|
|
||||||
|
def _assert_payload_not_executed(payload, rejection_message):
    """
    Feed a hostile coordinate string to parse_coordinates and verify that it
    was neither executed nor accepted.

    Checking only that the result is None is not enough: if the parser ever
    went back to eval(), every payload in this file would run, return a
    non-box value (an exit status, None, a string) and then fail during
    unpacking, which also produces None. To catch that regression, os.system
    and os.popen are temporarily replaced with a tripwire that records any
    call, and the test fails if the tripwire fired.

    Args:
        payload: The malicious text placed in the coordinate position.
        rejection_message: Assertion message used when the payload was
            accepted instead of rejected.
    """
    import os

    fired = []

    def _tripwire(*args, **kwargs):
        fired.append(args)
        return 0

    real_system, real_popen = os.system, os.popen
    os.system = os.popen = _tripwire
    try:
        result = parse_coordinates(("full_match", "exploit", payload), 1000, 1000)
    finally:
        # Always restore the real functions, even if the parser raised.
        os.system, os.popen = real_system, real_popen

    assert not fired, f"Payload was executed instead of rejected: {payload!r}"
    assert result is None, rejection_message


def test_rce_blocked_import_os():
    """The original exploit: __import__('os').system('...') must be blocked."""
    _assert_payload_not_executed(
        "__import__('os').system('echo HACKED')",
        "Code execution payload should be rejected",
    )
    print("PASS: __import__('os').system() payload is blocked")


def test_rce_blocked_exec():
    """exec() based payloads must be blocked."""
    _assert_payload_not_executed(
        "exec('import os; os.system(\"echo HACKED\")')",
        "exec() payload should be rejected",
    )
    print("PASS: exec() payload is blocked")


def test_rce_blocked_eval():
    """Nested eval() payloads must be blocked."""
    _assert_payload_not_executed(
        "eval('__import__(\"os\").popen(\"id\").read()')",
        "Nested eval() payload should be rejected",
    )
    print("PASS: Nested eval() payload is blocked")


def test_rce_blocked_lambda():
    """Lambda-based payloads must be blocked."""
    _assert_payload_not_executed(
        "(lambda: __import__('os').system('echo HACKED'))()",
        "Lambda payload should be rejected",
    )
    print("PASS: Lambda payload is blocked")


def test_rce_blocked_comprehension():
    """List comprehension code execution must be blocked."""
    _assert_payload_not_executed(
        "[__import__('os').system('echo HACKED') for x in [1]]",
        "List comprehension payload should be rejected",
    )
    print("PASS: List comprehension payload is blocked")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Script entry point: run every security test, print a summary, and exit
    # with a non-zero status if any test failed so that CI and shell callers
    # can detect a regression.
    import sys

    print("=" * 60)
    print("Security Regression Tests (OX Security RCE disclosure)")
    print("=" * 60)
    print()

    tests = [
        test_legitimate_coordinates,
        test_multiple_boxes,
        test_rce_blocked_import_os,
        test_rce_blocked_exec,
        test_rce_blocked_eval,
        test_rce_blocked_lambda,
        test_rce_blocked_comprehension,
    ]

    passed = 0
    failed = 0
    for test in tests:
        try:
            test()
        except AssertionError as e:
            # An expectation about the parser's behavior did not hold.
            print(f"FAIL: {test.__name__}: {e}")
            failed += 1
        except Exception as e:
            # The test itself crashed; count it as a failure rather than
            # aborting the remaining tests.
            print(f"ERROR: {test.__name__}: {e}")
            failed += 1
        else:
            passed += 1

    print()
    print(f"Results: {passed} passed, {failed} failed out of {len(tests)} tests")
    if failed == 0:
        print("All security tests passed - RCE vulnerability is patched.")
    else:
        print("WARNING: Some tests failed!")
        # Printing a warning alone would still leave the exit status at 0.
        sys.exit(1)
|
||||||
@@ -10,6 +10,7 @@
|
|||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"axios": "^1.6.5",
|
"axios": "^1.6.5",
|
||||||
|
"dompurify": "^3.3.3",
|
||||||
"framer-motion": "^11.0.0",
|
"framer-motion": "^11.0.0",
|
||||||
"lucide-react": "^0.344.0",
|
"lucide-react": "^0.344.0",
|
||||||
"react": "^18.3.1",
|
"react": "^18.3.1",
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ import { useEffect, useRef, useState, useCallback } from 'react'
|
|||||||
import { motion, AnimatePresence } from 'framer-motion'
|
import { motion, AnimatePresence } from 'framer-motion'
|
||||||
import { Copy, Download, Sparkles, Loader2, CheckCircle2, ChevronDown } from 'lucide-react'
|
import { Copy, Download, Sparkles, Loader2, CheckCircle2, ChevronDown } from 'lucide-react'
|
||||||
import ReactMarkdown from 'react-markdown'
|
import ReactMarkdown from 'react-markdown'
|
||||||
|
import DOMPurify from 'dompurify'
|
||||||
|
|
||||||
export default function ResultPanel({ result, loading, imagePreview, onCopy, onDownload }) {
|
export default function ResultPanel({ result, loading, imagePreview, onCopy, onDownload }) {
|
||||||
const canvasRef = useRef(null)
|
const canvasRef = useRef(null)
|
||||||
@@ -230,7 +231,7 @@ export default function ResultPanel({ result, loading, imagePreview, onCopy, onD
|
|||||||
{isHTML ? (
|
{isHTML ? (
|
||||||
<div
|
<div
|
||||||
className="prose prose-invert prose-sm max-w-none"
|
className="prose prose-invert prose-sm max-w-none"
|
||||||
dangerouslySetInnerHTML={{ __html: result.text }}
|
dangerouslySetInnerHTML={{ __html: DOMPurify.sanitize(result.text) }}
|
||||||
style={{
|
style={{
|
||||||
color: '#e5e7eb',
|
color: '#e5e7eb',
|
||||||
}}
|
}}
|
||||||
|
|||||||
Reference in New Issue
Block a user