# DocManagerBackend/docmanager_backend/config/management/commands/start_watcher.py
#
# Repository listing header (as published): 246 lines, 8.9 KiB, Python

import base64
import httpx
from django.core.management.base import BaseCommand
from io import BytesIO
from documents.models import Document
from PIL import Image
import pytesseract
import fitz
import os
from config.settings import MEDIA_ROOT
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from documents.models import Document
from config.settings import get_secret
from django.core.files import File
import logging
import time
from ollama import Client
from pydantic import BaseModel
from datetime import date, datetime
from typing import Optional
import calendar
class PDFHandler(FileSystemEventHandler):
    """Watchdog event handler that turns newly created PDFs into ``Document`` rows.

    For every created ``*.pdf`` file the handler:

    1. replaces spaces in the filename with underscores (renaming on disk),
    2. renders the first page, runs Tesseract OCR on it, and
    3. asks an Ollama vision model for a document type, sender and date,
       falling back to default values when that call fails,
    4. creates a ``Document`` (unless one with the same name exists) and
       removes the source file.
    """

    def __init__(self):
        super().__init__()
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
        self.logger = logging.getLogger(__name__)
        self.logger.info("Starting Document Watcher...")

    def on_created(self, event):
        """Process the created path when it is a file ending in ``.pdf``."""
        if event.is_directory:
            return None

        if event.src_path.endswith(".pdf"):
            self.logger.info(f"New PDF file detected: {event.src_path}")
            self.process_pdf(event.src_path)

    def process_pdf(self, file_path):
        """Ingest the PDF at ``file_path`` as a ``Document`` and delete the source file.

        Any exception raised along the way is logged and swallowed so that a
        single bad file does not propagate out of the handler.

        Args:
            file_path: Path of the PDF to ingest. If its basename contains
                spaces the file is renamed on disk first.
        """
        try:
            file_path = self._replace_spaces_in_filename(file_path)
            filename = os.path.basename(file_path)

            num_pages, img_bytes = self._render_first_page(file_path)

            # Plain OCR of the first page is always stored as metadata,
            # regardless of whether the LLM call below succeeds.
            text = pytesseract.image_to_string(
                Image.open(BytesIO(img_bytes))).strip()

            # Try to pass the image to the Ollama image recognition API first.
            try:
                encoded_image = base64.b64encode(img_bytes).decode()
                (document_type, sent_from,
                 document_month, document_year) = self._query_llm(encoded_image)
            # If that fails, fall back to defaults and rely on the OCR text.
            except Exception as e:
                document_type = "other"
                sent_from = "N/A"
                document_month = "no_month"
                document_year = "no_year"
                self.logger.warning(f"Error! {e}")
                self.logger.warning(
                    "Ollama OCR offload failed. Using defaults for missing values")

            metadata = text
            document_name = filename.replace(".pdf", "")

            document = Document.objects.filter(name=document_name).first()
            if not document:
                document = Document.objects.create(
                    name=document_name,
                    number_pages=num_pages,
                    ocr_metadata=metadata,
                    document_type=document_type,
                    sent_from=sent_from,
                    document_month=document_month,
                    document_year=document_year,
                )
                # Close the source handle once the storage backend has copied it.
                with open(file_path, "rb") as pdf_file:
                    document.file.save(name=filename, content=File(pdf_file))

                # Two adjacent f-strings instead of a replacement field split
                # across lines, which only parses on Python 3.12+.
                self.logger.info(
                    f"Document '{filename}' created successfully with type '{document_type}'. "
                    f"sent_from: {sent_from}, document_month: {document_month}, document_year: {document_year}"
                )
            else:
                self.logger.info(f"Document '{filename}' already exists.")

            # NOTE(review): the source file is removed in both branches above;
            # confirm this matches the intended placement of the removal.
            os.remove(file_path)
        except Exception as e:
            self.logger.error(f"Error processing PDF: {str(e)}")

    def _replace_spaces_in_filename(self, file_path):
        """Rename the file so its basename has no spaces; return the resulting path."""
        directory, original_filename = os.path.split(file_path)
        if " " not in original_filename:
            return file_path

        new_file_path = os.path.join(
            directory, original_filename.replace(" ", "_"))
        os.rename(file_path, new_file_path)
        return new_file_path

    def _render_first_page(self, file_path):
        """Return ``(page_count, image_bytes)`` for the first page of the PDF.

        Raises whatever PyMuPDF raises for unreadable files, and ``IndexError``
        style errors for documents without a first page; the caller logs these.
        """
        with fitz.open(file_path) as doc:
            num_pages = len(doc)
            # Only the first page is rendered and analysed.
            page = doc[0]
            # A bare 2-tuple is not a valid matrix; build an explicit scaling
            # matrix so the 1.2x zoom is actually applied.
            pix = page.get_pixmap(matrix=fitz.Matrix(1.2, 1.2))
            # Serialise with PyMuPDF's default output format while the
            # document is still open.
            img_bytes = pix.tobytes()
        return num_pages, img_bytes

    def _query_llm(self, encoded_image):
        """Ask the Ollama model to classify the page image.

        Args:
            encoded_image: Base64-encoded image of the first page.

        Returns:
            A ``(document_type, sent_from, document_month, document_year)``
            tuple. Month/year are ``"no_month"``/``"no_year"`` when the model
            returns no date or a year before 1980.

        Raises:
            Any exception from the client or from schema validation; the
            caller treats every failure as "use defaults".
        """
        class CategorySchema(BaseModel):
            category: str = "other"
            explanation: Optional[str] = None

        class DetailsSchema(BaseModel):
            sent_from: str = "N/A"
            subject: str = "N/A"
            document_date: Optional[date]
            explanation: Optional[str] = None

        client = Client(
            host=get_secret("OLLAMA_URL"),
            auth=(
                httpx.BasicAuth(
                    username=get_secret("OLLAMA_USERNAME"),
                    password=get_secret("OLLAMA_PASSWORD"),
                )
                if get_secret("OLLAMA_USE_AUTH")
                else None
            ),
        )
        model = get_secret("OLLAMA_MODEL")

        def ask(prompt, schema):
            # Structured output: the schema constrains the reply, which is then
            # validated back into the same pydantic model.
            response = client.chat(
                model=model,
                messages=[
                    {"role": "user",
                     "content": prompt,
                     "images": [encoded_image]},
                ],
                format=schema.model_json_schema(),
                options={
                    "temperature": 0
                },
            )
            return schema.model_validate_json(response.message.content)

        # Merge the types already stored with the built-in ones. Iterating the
        # queryset yields its values; sorting keeps the prompt stable between
        # runs, and empty stored values are skipped.
        stored_types = Document.objects.all().values_list(
            "document_type", flat=True)
        possible_categories = sorted(
            {stored for stored in stored_types if stored}
            | {"Documented Procedures Manual", "Form",
               "Special Order", "Memorandum"}
        )

        # First LLM API call to determine category
        category_prompt = f"""
        Read the text from the image and provide a document_type.
        Possible document types are: {possible_categories}. You are free to create a new one if none are suitable.
        If the document_type is Special Order or Memorandum, provide the sender of the document under sent_from.
        Do all of this and return your output in JSON.
        """
        document_type = ask(category_prompt, CategorySchema).category

        # Second LLM API call to determine other details
        details_prompt = """
        Determine who sent the document. Otherwise, return N/A.
        Identify the subject or possible title of the document.
        Return the date of the document if it exists.
        Do all of this and return your output in JSON.
        """
        details = ask(details_prompt, DetailsSchema)

        document_date = details.document_date
        # Missing dates and years before 1980 are treated as invalid.
        if document_date and document_date.year >= 1980:
            document_month = document_date.strftime("%B")
            document_year = document_date.year
        else:
            document_month = "no_month"
            document_year = "no_year"

        return document_type, details.sent_from, document_month, document_year
class PDFWatcher:
    """Owns a watchdog ``Observer`` that feeds ``PDFHandler`` from the uploads folder."""

    def __init__(self):
        self.observer = Observer()

    def run(self):
        """Watch ``<MEDIA_ROOT>/uploads`` recursively until interrupted.

        Blocks the calling thread. A ``KeyboardInterrupt`` ends the loop
        quietly; any other exception propagates. In every case the observer is
        stopped and joined before this method returns or raises.
        """
        event_handler = PDFHandler()
        watch_directory = os.path.join(MEDIA_ROOT, "uploads")

        self.observer.schedule(event_handler, watch_directory, recursive=True)
        self.observer.start()

        try:
            # The observer runs on its own thread; this loop only keeps the
            # process alive.
            while True:
                time.sleep(5)
        except KeyboardInterrupt:
            # Ctrl+C is the expected way to stop the service.
            pass
        finally:
            self.observer.stop()
            self.observer.join()
class Command(BaseCommand):
    """Management command entry point for the PDF file watcher."""

    help = "Runs a dedicated file watcher service"

    def handle(self, *args, **options):
        """Build a ``PDFWatcher`` and hand control to its ``run`` method."""
        PDFWatcher().run()