DocManagerBackend/docmanager_backend/config/management/commands/start_watcher.py

205 lines
7 KiB
Python
Raw Normal View History

2024-12-07 02:44:45 +08:00
import base64
import httpx
from django.core.management.base import BaseCommand
2024-11-27 00:32:28 +08:00
from io import BytesIO
from documents.models import Document
from PIL import Image
import pytesseract
import fitz
import os
from config.settings import MEDIA_ROOT
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from documents.models import Document
2024-12-07 02:44:45 +08:00
from config.settings import get_secret
from django.core.files import File
2024-11-27 00:32:28 +08:00
import logging
import time
2024-12-07 02:44:45 +08:00
from ollama import Client
from pydantic import BaseModel
from typing import Optional
import json
2024-11-27 00:32:28 +08:00
class PDFHandler(FileSystemEventHandler):
def __init__(self):
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
2024-11-27 00:32:28 +08:00
self.logger = logging.getLogger(__name__)
self.logger.info("Starting Document Watcher...")
def on_created(self, event):
if event.is_directory:
return None
if event.src_path.endswith(".pdf"):
2024-11-27 00:32:28 +08:00
self.logger.info(f"New PDF file detected: {event.src_path}")
self.process_pdf(event.src_path)
def process_pdf(self, file_path):
try:
2024-12-07 02:44:45 +08:00
# Get the original filename and directory
original_filename = os.path.basename(file_path)
original_dir = os.path.dirname(file_path)
# Check if the filename contains spaces
if " " in original_filename:
# Create the new filename by replacing spaces
new_filename = original_filename.replace(" ", "_")
# Construct the new full file path
new_file_path = os.path.join(original_dir, new_filename)
# Rename the file
os.rename(file_path, new_file_path)
# Update the filename and file_path variables
filename = new_filename
file_path = new_file_path
else:
filename = original_filename
2024-11-27 00:32:28 +08:00
metadata = ""
document_type = ""
with fitz.open(file_path) as doc:
num_pages = len(doc)
# Perform OCR only on the first page
page = doc[0]
pix = page.get_pixmap(matrix=(1.2, 1.2))
# Convert pixmap to bytes
img_bytes = pix.tobytes()
# Create a BytesIO object
img_buffer = BytesIO(img_bytes)
# Create a PIL Image object from the bytes
img = Image.open(img_buffer)
# Perform OCR
text = pytesseract.image_to_string(img).strip()
# Try to pass image to the Ollama image recognition API first
try:
class DocumentCategory(BaseModel):
category: str = "other"
sent_from: str = "N/A"
explanation: Optional[str] = None
client = Client(
host=get_secret("OLLAMA_URL"),
auth=httpx.BasicAuth(
username=get_secret("OLLAMA_USERNAME"), password=get_secret("OLLAMA_PASSWORD")) if get_secret("OLLAMA_USE_AUTH") else None,
)
encoded_image = base64.b64encode(
img_buffer.getvalue()).decode()
possible_categories = set((Document.objects.all().values_list(
"document_type", flat=True), "Documented Procedures Manual", "Form", "Special Order", "Memorandum"))
prompt = f"""
Read the text from the image and provide a category. Return as JSON.
Possible categories are: {possible_categories}. You are free to create a new one if none are suitable.
If the document is of type Special Order or Memorandum, provide the sender of the document. Possible senders are Vice President, President, Chancellor.
provide N/A.
"""
response = client.chat(
model=get_secret("OLLAMA_MODEL"),
messages=[
{"role": "user",
"content": prompt,
"images": [encoded_image]},
],
format=DocumentCategory.model_json_schema(),
options={
"temperature": 0
},
)
DocumentCategory.model_validate_json(
response.message.content)
result = json.loads(response.message.content)
document_type = result.get("category")
sent_from = result.get("sent_from")
# If that fails, just use regular OCR read the title as a dirty fix/fallback
except Exception as e:
self.logger.warning(f"Error! {e}")
self.logger.warning(
"Ollama OCR offload failed. Falling back to default OCR")
lines = text.split("\n")
for line in lines:
if line.strip():
document_type = line.strip().lower()
break
if not document_type:
document_type = "other"
metadata += text
2024-11-27 00:32:28 +08:00
# Open the file for instance creation
DOCUMENT, created = Document.objects.get_or_create(
2024-12-16 14:58:50 +08:00
name=filename.replace(".pdf", ""),
2024-11-27 00:32:28 +08:00
defaults={
"number_pages": num_pages,
"ocr_metadata": metadata,
"document_type": document_type,
},
2024-11-27 00:32:28 +08:00
)
if created:
DOCUMENT.file.save(
name=filename, content=File(open(file_path, "rb")))
self.logger.info(
f"Document '{filename}' created successfully with type '{
document_type}'. sent_from: {sent_from}"
)
DOCUMENT.sent_from = sent_from
DOCUMENT.save()
2024-11-27 00:32:28 +08:00
else:
self.logger.info(f"Document '{filename}' already exists.")
os.remove(file_path)
except Exception as e:
self.logger.error(f"Error processing PDF: {str(e)}")
class PDFWatcher:
def __init__(self):
self.observer = Observer()
def run(self):
event_handler = PDFHandler()
2024-11-27 00:49:20 +08:00
watch_directory = os.path.join(MEDIA_ROOT, "uploads")
2024-11-27 00:32:28 +08:00
self.observer.schedule(event_handler, watch_directory, recursive=True)
2024-11-27 00:32:28 +08:00
self.observer.start()
try:
while True:
time.sleep(5)
except:
self.observer.stop()
self.observer.join()
class Command(BaseCommand):
2024-11-27 00:35:59 +08:00
help = "Runs a dedicated file watcher service"
2024-11-27 00:32:28 +08:00
def handle(self, *args, **options):
watcher = PDFWatcher()
watcher.run()