Compare commits

...

2 commits

16 changed files with 406 additions and 45 deletions

View file

@ -13,6 +13,7 @@ urlpatterns = [
path("accounts/", include("accounts.urls")), path("accounts/", include("accounts.urls")),
path("documents/", include("documents.urls")), path("documents/", include("documents.urls")),
path("requests/", include("document_requests.urls")), path("requests/", include("document_requests.urls")),
path("authorization_requests/", include("authorization_requests.urls")),
path("questionnaires/", include("questionnaires.urls")), path("questionnaires/", include("questionnaires.urls")),
path("admin/", admin.site.urls), path("admin/", admin.site.urls),
path("schema/", SpectacularAPIView.as_view(), name="schema"), path("schema/", SpectacularAPIView.as_view(), name="schema"),

View file

@ -0,0 +1,11 @@
from django.contrib import admin
from unfold.admin import ModelAdmin
from .models import AuthorizationRequest
# Register your models here.
@admin.register(AuthorizationRequest)
class AuthorizationRequestAdmin(ModelAdmin):
    """Admin configuration for ``AuthorizationRequest`` records."""

    # Columns rendered in the admin changelist.
    list_display = ["id", "date_requested", "status", "college"]
    # Fields matched by the changelist search box.
    search_fields = ["id"]

View file

@ -0,0 +1,6 @@
from django.apps import AppConfig
class AuthorizationRequestsConfig(AppConfig):
    """Application configuration for the ``authorization_requests`` app."""

    name = "authorization_requests"
    # Primary key type for models that do not declare one explicitly.
    default_auto_field = "django.db.models.BigAutoField"

View file

@ -0,0 +1,61 @@
# Generated by Django 5.1.3 on 2025-01-08 16:56
import django.db.models.deletion
import django.utils.timezone
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
    """Initial migration: creates the ``AuthorizationRequest`` model."""

    initial = True

    # The ``requester`` foreign key targets the configured user model.
    dependencies = [migrations.swappable_dependency(settings.AUTH_USER_MODEL)]

    operations = [
        migrations.CreateModel(
            name="AuthorizationRequest",
            fields=[
                (
                    "id",
                    models.BigAutoField(
                        auto_created=True,
                        primary_key=True,
                        serialize=False,
                        verbose_name="ID",
                    ),
                ),
                ("documents", models.TextField(max_length=2048)),
                (
                    "date_requested",
                    models.DateTimeField(
                        default=django.utils.timezone.now,
                        editable=False,
                    ),
                ),
                ("college", models.CharField(max_length=64)),
                ("purpose", models.TextField(max_length=512)),
                (
                    "remarks",
                    models.TextField(blank=True, max_length=512, null=True),
                ),
                (
                    "status",
                    models.CharField(
                        choices=[
                            ("pending", "Pending"),
                            ("approved", "Approved"),
                            ("denied", "Denied"),
                        ],
                        default="pending",
                        max_length=32,
                    ),
                ),
                (
                    "requester",
                    models.ForeignKey(
                        on_delete=django.db.models.deletion.CASCADE,
                        to=settings.AUTH_USER_MODEL,
                    ),
                ),
            ],
        ),
    ]

View file

@ -0,0 +1,22 @@
from django.db import models
from django.utils.timezone import now
class AuthorizationRequest(models.Model):
    """Authorization request submitted by a user and tracked by a status.

    Stores the requested documents as free text together with the
    requester's college and purpose. ``status`` starts as ``"pending"``;
    ``remarks`` is optional.
    """

    # Allowed values for ``status`` as (stored value, display label) pairs.
    STATUS_CHOICES = (
        ("pending", "Pending"),
        ("approved", "Approved"),
        ("denied", "Denied"),
    )

    # Deleting the user also deletes their requests (CASCADE).
    requester = models.ForeignKey("accounts.CustomUser", on_delete=models.CASCADE)
    documents = models.TextField(max_length=2048)
    # Defaults to the creation time and is not editable through forms.
    date_requested = models.DateTimeField(default=now, editable=False)
    college = models.CharField(max_length=64)
    purpose = models.TextField(max_length=512)
    remarks = models.TextField(max_length=512, null=True, blank=True)
    status = models.CharField(
        choices=STATUS_CHOICES,
        default="pending",
        max_length=32,
    )

View file

@ -0,0 +1,109 @@
from rest_framework import serializers
from accounts.models import CustomUser
from emails.templates import RequestUpdateEmail
from .models import AuthorizationRequest
class AuthorizationRequestCreationSerializer(serializers.ModelSerializer):
    """Validates input for, and creates, a new ``AuthorizationRequest``.

    ``requester`` is optional in the payload and is always replaced by the
    user attached to the HTTP request in ``create``.
    """

    requester = serializers.SlugRelatedField(
        slug_field="id",
        queryset=CustomUser.objects.all(),
        many=False,
        required=False,
    )
    documents = serializers.CharField(required=True, max_length=2048)
    college = serializers.CharField(max_length=64)
    purpose = serializers.CharField(max_length=512)

    class Meta:
        model = AuthorizationRequest
        fields = ["requester", "college", "purpose", "documents"]

    def create(self, validated_data):
        """Create the request on behalf of the user who sent it."""
        # Set requester to user who sent HTTP request to prevent spoofing
        validated_data["requester"] = self.context["request"].user
        return AuthorizationRequest.objects.create(**validated_data)
class AuthorizationRequestSerializer(serializers.ModelSerializer):
    """Read-only representation of an ``AuthorizationRequest``.

    ``requester`` is rendered as the user's ``full_name`` and
    ``date_requested`` is formatted as ``MM-DD-YYYY HH:MM AM/PM``.
    """

    # Explicitly declared fields are not governed by Meta.read_only_fields,
    # so read-only has to be set on the field itself. A read-only relational
    # field must not be given a queryset.
    requester = serializers.SlugRelatedField(
        many=False,
        slug_field="full_name",
        read_only=True,
    )
    date_requested = serializers.DateTimeField(
        format="%m-%d-%Y %I:%M %p", read_only=True
    )

    class Meta:
        model = AuthorizationRequest
        fields = [
            "id",
            "requester",
            "college",
            "purpose",
            "date_requested",
            "documents",
            "remarks",
            "status",
        ]
        # Every exposed field is read-only. Reusing ``fields`` keeps the two
        # lists in sync; the previous hand-written copy contained
        # ``"remarks," "status"``, which Python concatenated into the single
        # bogus name "remarks,status", leaving both fields writable.
        read_only_fields = fields
class AuthorizationRequestUpdateSerializer(serializers.ModelSerializer):
    """Applies a status change (and optional remarks) to a pending request.

    After a successful update a ``RequestUpdateEmail`` is sent to the
    requester. Email delivery is best-effort: failures are logged and do not
    undo or fail the update.
    """

    status = serializers.ChoiceField(
        choices=AuthorizationRequest.STATUS_CHOICES, required=True
    )

    class Meta:
        model = AuthorizationRequest
        fields = ["id", "status", "remarks"]
        read_only_fields = ["id"]

    def update(self, instance, validated_data):
        """Validate the status transition, save it, and notify the requester.

        Raises:
            serializers.ValidationError: if the request was already
                approved/denied, no status was supplied, the status is
                unchanged, or a denial comes without remarks.
        """
        # Local import: this block is the only user of logging.
        import logging

        if instance.status in ("approved", "denied"):
            raise serializers.ValidationError(
                {
                    "error": "Already approved/denied requests cannot be updated. You should instead create a new request and approve it from there"
                }
            )
        # Reachable on partial (PATCH) updates, where required fields are
        # not enforced by the serializer.
        if "status" not in validated_data:
            raise serializers.ValidationError(
                {"error": "No status value update provided"}
            )

        new_status = validated_data["status"]
        if new_status == instance.status:
            raise serializers.ValidationError(
                {"error": "Request form status provided is the same as current status"}
            )
        # ``remarks`` may be present but null/blank; treat that the same as
        # missing, since a denial must carry an explanation.
        if new_status == "denied" and not validated_data.get("remarks"):
            raise serializers.ValidationError(
                {"error": "Request denial requires remarks"}
            )

        updated = super().update(instance, validated_data)

        # Send an email on request status update
        try:
            email = RequestUpdateEmail()
            # Build the context in one assignment; assigning it twice
            # replaced the dict and dropped "request_status".
            email.context = {
                "request_status": new_status,
                "remarks": (
                    validated_data["remarks"] if new_status == "denied" else "N/A"
                ),
            }
            email.send(to=[instance.requester.email])
        except Exception:
            # Email failure must not fail the update, but it should be visible.
            logging.getLogger(__name__).exception(
                "Failed to send status update email for authorization request %s",
                instance.pk,
            )
        return updated

View file

@ -0,0 +1,12 @@
from django.urls import path, include
from .views import (
AuthorizationRequestCreateView,
AuthorizationRequestUpdateView,
AuthorizationRequestListView,
)
# Routes exposed by the authorization_requests app.
urlpatterns = [
    # Create a new authorization request.
    path("create/", AuthorizationRequestCreateView.as_view()),
    # List authorization requests.
    path("list/", AuthorizationRequestListView.as_view()),
    # Update the request identified by its integer primary key.
    path("update/<int:pk>/", AuthorizationRequestUpdateView.as_view()),
]

View file

@ -0,0 +1,53 @@
from rest_framework import generics
from rest_framework.permissions import IsAuthenticated
from rest_framework.pagination import PageNumberPagination
from accounts.permissions import IsHead, IsStaff
from rest_framework.pagination import PageNumberPagination
from .serializers import (
AuthorizationRequestCreationSerializer,
AuthorizationRequestSerializer,
AuthorizationRequestUpdateSerializer
)
from .models import AuthorizationRequest
class AuthorizationRequestCreateView(generics.CreateAPIView):
    """
    Used by clients to create authorization requests. Requires passing in
    the request information in addition to the documents themselves.
    """

    # Any authenticated user may create a request; only POST is accepted.
    permission_classes = [IsAuthenticated]
    serializer_class = AuthorizationRequestCreationSerializer
    http_method_names = ["post"]
class AuthorizationRequestListView(generics.ListAPIView):
    """
    Returns authorization requests, newest first.
    Clients are only able to view their own requests. All other roles
    (e.g. Staff/Head) are able to view all authorization requests here.
    """

    http_method_names = ["get"]
    serializer_class = AuthorizationRequestSerializer
    pagination_class = PageNumberPagination
    permission_classes = [IsAuthenticated]

    def get_queryset(self):
        """Return the requests visible to the current user, in a stable order."""
        # Pagination needs a deterministic order; an unordered queryset can
        # repeat or skip rows between pages. ``id`` breaks timestamp ties.
        requests = AuthorizationRequest.objects.order_by("-date_requested", "-id")
        user = self.request.user
        if user.role == "client":
            return requests.filter(requester=user)
        return requests
class AuthorizationRequestUpdateView(generics.UpdateAPIView):
    """
    Used by the head to approve or deny authorization requests.
    """

    # Only PATCH is accepted; the caller must be authenticated and pass IsHead.
    queryset = AuthorizationRequest.objects.all()
    permission_classes = [IsAuthenticated, IsHead]
    serializer_class = AuthorizationRequestUpdateSerializer
    http_method_names = ["patch"]

View file

@ -18,8 +18,9 @@ import logging
import time import time
from ollama import Client from ollama import Client
from pydantic import BaseModel from pydantic import BaseModel
from datetime import date, datetime
from typing import Optional from typing import Optional
import json import calendar
class PDFHandler(FileSystemEventHandler): class PDFHandler(FileSystemEventHandler):
@ -87,11 +88,6 @@ class PDFHandler(FileSystemEventHandler):
# Try to pass image to the Ollama image recognition API first # Try to pass image to the Ollama image recognition API first
try: try:
class DocumentCategory(BaseModel):
category: str = "other"
sent_from: str = "N/A"
explanation: Optional[str] = None
client = Client( client = Client(
host=get_secret("OLLAMA_URL"), host=get_secret("OLLAMA_URL"),
auth=httpx.BasicAuth( auth=httpx.BasicAuth(
@ -101,15 +97,54 @@ class PDFHandler(FileSystemEventHandler):
encoded_image = base64.b64encode( encoded_image = base64.b64encode(
img_buffer.getvalue()).decode() img_buffer.getvalue()).decode()
# First LLM API call to determine category
class DocumentSchema(BaseModel):
category: str = "other"
explanation: Optional[str] = None
possible_categories = set((Document.objects.all().values_list( possible_categories = set((Document.objects.all().values_list(
"document_type", flat=True), "Documented Procedures Manual", "Form", "Special Order", "Memorandum")) "document_type", flat=True), "Documented Procedures Manual", "Form", "Special Order", "Memorandum"))
prompt = f""" prompt = f"""
Read the text from the image and provide a category. Return as JSON. Read the text from the image and provide a document_type.
Possible categories are: {possible_categories}. You are free to create a new one if none are suitable. Possible document types are: {possible_categories}. You are free to create a new one if none are suitable.
If the document is of type Special Order or Memorandum, provide the sender of the document. Possible senders are Vice President, President, Chancellor. If the document_type is Special Order or Memorandum, provide the sender of the document under sent_from.
provide N/A.
Do all of this and return your output in JSON.
"""
response = client.chat(
model=get_secret("OLLAMA_MODEL"),
messages=[
{"role": "user",
"content": prompt,
"images": [encoded_image]},
],
format=DocumentSchema.model_json_schema(),
options={
"temperature": 0
},
)
result = DocumentSchema.model_validate_json(
response.message.content)
document_type = result.category
# Second LLM API call to determine other details
class DocumentSchema(BaseModel):
sent_from: str = "N/A"
subject: str = "N/A"
document_date: Optional[date]
explanation: Optional[str] = None
prompt = f"""
Determine who sent the document. Otherwise, return N/A.
Identify the subject or possible title of the document.
Return the date of the document if it exists.
Do all of this and return your output in JSON.
""" """
response = client.chat( response = client.chat(
model=get_secret("OLLAMA_MODEL"), model=get_secret("OLLAMA_MODEL"),
@ -118,55 +153,62 @@ class PDFHandler(FileSystemEventHandler):
"content": prompt, "content": prompt,
"images": [encoded_image]}, "images": [encoded_image]},
], ],
format=DocumentCategory.model_json_schema(), format=DocumentSchema.model_json_schema(),
options={ options={
"temperature": 0 "temperature": 0
}, },
) )
result = DocumentSchema.model_validate_json(
DocumentCategory.model_validate_json(
response.message.content) response.message.content)
result = json.loads(response.message.content)
document_type = result.get("category") sent_from = result.sent_from
sent_from = result.get("sent_from") document_date = result.document_date
if document_date:
document_month = document_date.strftime("%B")
document_year = result.document_date.year
# Set as none for invalid dates
if document_year < 1980:
document_month = "no_month"
document_year = "no_year"
else:
document_month = "no_month"
document_year = "no_year"
# If that fails, just use regular OCR read the title as a dirty fix/fallback # If that fails, just use regular OCR read the title as a dirty fix/fallback
except Exception as e: except Exception as e:
document_type = "other"
sent_from = "N/A"
document_month = "no_month"
document_year = "no_year"
self.logger.warning(f"Error! {e}") self.logger.warning(f"Error! {e}")
self.logger.warning( self.logger.warning(
"Ollama OCR offload failed. Falling back to default OCR") "Ollama OCR offload failed. Using defaults for missing values")
lines = text.split("\n")
for line in lines:
if line.strip():
document_type = line.strip().lower()
break
if not document_type:
document_type = "other"
metadata += text metadata += text
# Open the file for instance creation # Open the file for instance creation
DOCUMENT, created = Document.objects.get_or_create( DOCUMENT = Document.objects.filter(
name=filename.replace(".pdf", ""), name=filename.replace(".pdf", "")).first()
defaults={ if not DOCUMENT:
"number_pages": num_pages, DOCUMENT = Document.objects.create(
"ocr_metadata": metadata, name=filename.replace(".pdf", ""),
"document_type": document_type, number_pages=num_pages,
}, ocr_metadata=metadata,
) document_type=document_type,
sent_from=sent_from,
document_month=document_month,
document_year=document_year
)
if created:
DOCUMENT.file.save( DOCUMENT.file.save(
name=filename, content=File(open(file_path, "rb"))) name=filename, content=File(open(file_path, "rb")))
self.logger.info( self.logger.info(
f"Document '{filename}' created successfully with type '{ f"Document '{filename}' created successfully with type '{
document_type}'. sent_from: {sent_from}" document_type}'. sent_from: {sent_from}, document_month: {document_month}, document_year: {document_year}"
) )
DOCUMENT.sent_from = sent_from
DOCUMENT.save()
else: else:
self.logger.info(f"Document '{filename}' already exists.") self.logger.info(f"Document '{filename}' already exists.")

View file

@ -96,6 +96,7 @@ INSTALLED_APPS = [
"accounts", "accounts",
"documents", "documents",
"document_requests", "document_requests",
"authorization_requests",
"questionnaires", "questionnaires",
"django_cleanup.apps.CleanupConfig", "django_cleanup.apps.CleanupConfig",
] ]

View file

@ -35,6 +35,10 @@ class DocumentRequestCreationSerializer(serializers.ModelSerializer):
def create(self, validated_data): def create(self, validated_data):
user = self.context["request"].user user = self.context["request"].user
documents_data = validated_data.pop("documents") documents_data = validated_data.pop("documents")
if not documents_data:
raise serializers.ValidationError(
{"error": "No documents provided"}
)
# Set requester to user who sent HTTP request to prevent spoofing # Set requester to user who sent HTTP request to prevent spoofing
validated_data["requester"] = user validated_data["requester"] = user
@ -200,9 +204,9 @@ class DocumentRequestUpdateSerializer(serializers.ModelSerializer):
# Send an email on request status update # Send an email on request status update
try: try:
email = RequestUpdateEmail() email = RequestUpdateEmail()
email.context = {"request_status": instance.status} email.context = {"request_status": validated_data["status"]}
if instance.status == "denied": if validated_data["status"] == "denied":
email.context = {"remarks": instance.remarks} email.context = {"remarks": validated_data["remarks"]}
else: else:
email.context = {"remarks": "N/A"} email.context = {"remarks": "N/A"}
email.send(to=[instance.requester.email]) email.send(to=[instance.requester.email])

View file

@ -7,5 +7,7 @@ from .models import Document
@admin.register(Document) @admin.register(Document)
class DocumentAdmin(ModelAdmin): class DocumentAdmin(ModelAdmin):
model = Document model = Document
search_fields = ["id", "name", "document_type"] search_fields = ["id", "name", "subject", "sent_from", "document_year",
list_display = ["id", "name", "document_type", "date_uploaded"] "document_month", "document_type"]
list_display = ["id", "name", "subject", "sent_from", "document_year",
"document_month", "document_type", "date_uploaded"]

View file

@ -0,0 +1,28 @@
# Generated by Django 5.1.3 on 2025-01-08 14:41
from django.db import migrations, models
class Migration(migrations.Migration):
    """Adds ``document_month``, ``document_year`` and ``subject`` to ``Document``."""

    dependencies = [
        ("documents", "0004_rename_memorandum_from_document_sent_from"),
    ]

    # All three columns are optional strings of at most 128 characters.
    operations = [
        migrations.AddField(
            model_name="document",
            name="document_month",
            field=models.CharField(blank=True, max_length=128, null=True),
        ),
        migrations.AddField(
            model_name="document",
            name="document_year",
            field=models.CharField(blank=True, max_length=128, null=True),
        ),
        migrations.AddField(
            model_name="document",
            name="subject",
            field=models.CharField(blank=True, max_length=128, null=True),
        ),
    ]

View file

@ -12,12 +12,21 @@ class Document(models.Model):
sent_from = models.CharField( sent_from = models.CharField(
max_length=128, null=True, blank=True max_length=128, null=True, blank=True
) )
document_month = models.CharField(
max_length=128, null=True, blank=True
)
document_year = models.CharField(
max_length=128, null=True, blank=True
)
subject = models.CharField(
max_length=128, null=True, blank=True
)
number_pages = models.IntegerField(null=False, blank=False) number_pages = models.IntegerField(null=False, blank=False)
ocr_metadata = models.TextField(null=True, blank=True) ocr_metadata = models.TextField(null=True, blank=True)
def upload_to(instance, filename): def upload_to(instance, filename):
_, extension = filename.rsplit(".", 1) _, extension = filename.rsplit(".", 1)
return "documents/%s_%s.%s" % (now(), str(uuid.uuid4()), extension) return f"documents/{instance.document_type}/{instance.document_year}/{str(uuid.uuid4())}.{extension}"
file = models.FileField(upload_to=upload_to) file = models.FileField(upload_to=upload_to)