DocManagerBackend/docmanager_backend/config/management/commands/start_watcher.py

295 lines
11 KiB
Python
Raw Normal View History

2024-12-07 02:44:45 +08:00
import base64
import httpx
from django.core.management.base import BaseCommand
2024-11-27 00:32:28 +08:00
from io import BytesIO
from documents.models import Document
from PIL import Image
import pytesseract
import fitz
import os
from config.settings import MEDIA_ROOT
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from documents.models import Document
2024-12-07 02:44:45 +08:00
from config.settings import get_secret
from django.core.files import File
2024-11-27 00:32:28 +08:00
import logging
import time
2024-12-07 02:44:45 +08:00
from ollama import Client
from pydantic import BaseModel
from datetime import date, datetime
from typing import Optional
import calendar
2024-11-27 00:32:28 +08:00
class PDFHandler(FileSystemEventHandler):
def __init__(self):
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
2024-11-27 00:32:28 +08:00
self.logger = logging.getLogger(__name__)
self.logger.info("Starting Document Watcher...")
def on_created(self, event):
if event.is_directory:
return None
if event.src_path.endswith(".pdf"):
2024-11-27 00:32:28 +08:00
self.logger.info(f"New PDF file detected: {event.src_path}")
self.process_pdf(event.src_path)
def process_pdf(self, file_path):
try:
2024-12-07 02:44:45 +08:00
# Get the original filename and directory
original_filename = os.path.basename(file_path)
original_dir = os.path.dirname(file_path)
# Check if the filename contains spaces
if " " in original_filename:
# Create the new filename by replacing spaces
new_filename = original_filename.replace(" ", "_")
# Construct the new full file path
new_file_path = os.path.join(original_dir, new_filename)
# Rename the file
os.rename(file_path, new_file_path)
# Update the filename and file_path variables
filename = new_filename
file_path = new_file_path
else:
filename = original_filename
2024-11-27 00:32:28 +08:00
metadata = ""
document_type = ""
with fitz.open(file_path) as doc:
num_pages = len(doc)
# Perform OCR only on the first page
page = doc[0]
pix = page.get_pixmap(matrix=(1.2, 1.2))
# Convert pixmap to bytes
img_bytes = pix.tobytes()
# Create a BytesIO object
img_buffer = BytesIO(img_bytes)
# Create a PIL Image object from the bytes
img = Image.open(img_buffer)
# Perform OCR
text = pytesseract.image_to_string(img).strip()
# Try to pass image to the Ollama image recognition API first
try:
client = Client(
host=get_secret("OLLAMA_URL"),
auth=httpx.BasicAuth(
username=get_secret("OLLAMA_USERNAME"), password=get_secret("OLLAMA_PASSWORD")) if get_secret("OLLAMA_USE_AUTH") else None,
)
encoded_image = base64.b64encode(
img_buffer.getvalue()).decode()
2025-01-09 14:53:40 +08:00
# Determine category
class DocumentSchema(BaseModel):
category: str = "other"
explanation: Optional[str] = None
possible_categories = set((Document.objects.all().values_list(
"document_type", flat=True), "Documented Procedures Manual", "Form", "Special Order", "Memorandum"))
prompt = f"""
Read the text from the image and provide a document_type.
Possible document types are: {possible_categories}. You are free to create a new one if none are suitable.
Do all of this and return your output in JSON.
"""
response = client.chat(
model=get_secret("OLLAMA_MODEL"),
messages=[
{"role": "user",
"content": prompt,
"images": [encoded_image]},
],
format=DocumentSchema.model_json_schema(),
options={
"temperature": 0
},
)
result = DocumentSchema.model_validate_json(
response.message.content)
document_type = result.category
2025-01-09 14:53:40 +08:00
# Determine sender
class DocumentSchema(BaseModel):
sent_from: str = "N/A"
explanation: Optional[str] = None
prompt = f"""
Determine who sent the document. Otherwise, return N/A.
2025-01-09 14:53:40 +08:00
Do all of this and return your output in JSON.
"""
response = client.chat(
model=get_secret("OLLAMA_MODEL"),
messages=[
{"role": "user",
"content": prompt,
"images": [encoded_image]},
],
format=DocumentSchema.model_json_schema(),
options={
"temperature": 0
},
)
result = DocumentSchema.model_validate_json(
response.message.content)
sent_from = result.sent_from
# Determine subject
class DocumentSchema(BaseModel):
subject: str = "N/A"
explanation: Optional[str] = None
prompt = f"""
Identify the subject of the document if it exists.
Do all of this and return your output in JSON.
"""
response = client.chat(
model=get_secret("OLLAMA_MODEL"),
messages=[
{"role": "user",
"content": prompt,
"images": [encoded_image]},
],
format=DocumentSchema.model_json_schema(),
options={
"temperature": 0
},
)
result = DocumentSchema.model_validate_json(
response.message.content)
2025-01-09 14:53:40 +08:00
document_subject = result.subject
# Determine date
class DocumentSchema(BaseModel):
document_date: Optional[date]
explanation: Optional[str] = None
prompt = f"""
Identify the date of the document if it exists.
If you are unable to determine the date, return nothing.
Do all of this and return your output in JSON.
"""
response = client.chat(
model=get_secret("OLLAMA_MODEL"),
messages=[
{"role": "user",
"content": prompt,
"images": [encoded_image]},
],
format=DocumentSchema.model_json_schema(),
options={
"temperature": 0
},
)
result = DocumentSchema.model_validate_json(
response.message.content)
document_date = result.document_date
if document_date:
document_month = document_date.strftime("%B")
document_year = result.document_date.year
# Set as none for invalid dates
if document_year < 1980:
document_month = "no_month"
document_year = "no_year"
else:
document_month = "no_month"
document_year = "no_year"
# If that fails, just use regular OCR read the title as a dirty fix/fallback
except Exception as e:
document_type = "other"
sent_from = "N/A"
document_month = "no_month"
document_year = "no_year"
self.logger.warning(f"Error! {e}")
self.logger.warning(
"Ollama OCR offload failed. Using defaults for missing values")
metadata += text
2024-11-27 00:32:28 +08:00
# Open the file for instance creation
DOCUMENT = Document.objects.filter(
name=filename.replace(".pdf", "")).first()
if not DOCUMENT:
DOCUMENT = Document.objects.create(
name=filename.replace(".pdf", ""),
number_pages=num_pages,
ocr_metadata=metadata,
document_type=document_type,
sent_from=sent_from,
document_month=document_month,
2025-01-09 14:53:40 +08:00
document_year=document_year,
subject=document_subject
)
DOCUMENT.file.save(
name=filename, content=File(open(file_path, "rb")))
self.logger.info(
f"Document '{filename}' created successfully with type '{
document_type}'. sent_from: {sent_from}, document_month: {document_month}, document_year: {document_year}"
)
2024-11-27 00:32:28 +08:00
else:
self.logger.info(f"Document '{filename}' already exists.")
os.remove(file_path)
except Exception as e:
self.logger.error(f"Error processing PDF: {str(e)}")
class PDFWatcher:
def __init__(self):
self.observer = Observer()
def run(self):
event_handler = PDFHandler()
2024-11-27 00:49:20 +08:00
watch_directory = os.path.join(MEDIA_ROOT, "uploads")
2024-11-27 00:32:28 +08:00
self.observer.schedule(event_handler, watch_directory, recursive=True)
2024-11-27 00:32:28 +08:00
self.observer.start()
try:
while True:
time.sleep(5)
except:
self.observer.stop()
self.observer.join()
class Command(BaseCommand):
2024-11-27 00:35:59 +08:00
help = "Runs a dedicated file watcher service"
2024-11-27 00:32:28 +08:00
def handle(self, *args, **options):
watcher = PDFWatcher()
watcher.run()