SimpleLearn_2 / src /documentProcessing.py
MrSimple01's picture
Update src/documentProcessing.py
b559d3b verified
Raw
History Blame Contribute Delete
3.04 kB
import docx
import PyPDF2
import os
import re
import json
import time
import tempfile
from typing import Dict, Any, List, Optional
from src.quiz_processing import analyze_document
def extract_text_from_pdf(pdf_path):
    """Return the text of every page in the PDF at *pdf_path*.

    The text extracted from each page is followed by a newline character,
    so pages appear in document order, one after another.

    Args:
        pdf_path: Path of the PDF file; it is opened in binary mode.

    Returns:
        The concatenated page texts as a single string.

    Raises:
        Exception: If the file cannot be opened or read. The message starts
            with "Error extracting text from PDF:" and the original error
            is attached as ``__cause__``.
    """
    try:
        with open(pdf_path, 'rb') as file:
            reader = PyPDF2.PdfReader(file)
            # Defensive: a falsy extract_text() result is treated as empty
            # text instead of breaking the concatenation.
            return "".join(
                f"{page.extract_text() or ''}\n" for page in reader.pages
            )
    except Exception as e:
        # Chain explicitly so the root cause stays reachable for callers.
        raise Exception(f"Error extracting text from PDF: {str(e)}") from e
def extract_text_from_docx(docx_path):
    """Return the paragraph text of the DOCX document at *docx_path*.

    Only the entries of the document's ``paragraphs`` collection are read;
    their texts are joined with single newline characters.

    Args:
        docx_path: Location of the DOCX file, passed to ``docx.Document``.

    Returns:
        The paragraph texts joined by newlines.

    Raises:
        Exception: If the document cannot be opened or read. The message
            starts with "Error extracting text from DOCX:" and the original
            error is attached as ``__cause__``.
    """
    try:
        document = docx.Document(docx_path)
        return "\n".join(paragraph.text for paragraph in document.paragraphs)
    except Exception as e:
        # Chain explicitly so the root cause stays reachable for callers.
        raise Exception(f"Error extracting text from DOCX: {str(e)}") from e
def extract_text_from_txt(txt_path):
    """Return the full contents of the UTF-8 text file at *txt_path*.

    The file is decoded with ``utf-8-sig`` so that a leading byte-order
    mark, if present, is dropped instead of ending up as a stray U+FEFF
    character at the start of the returned text. Files without a BOM are
    decoded exactly as plain UTF-8.

    Args:
        txt_path: Path of the text file to read.

    Returns:
        The decoded file contents as a single string.

    Raises:
        Exception: If the file cannot be opened or decoded. The message
            starts with "Error extracting text from TXT:" and the original
            error is attached as ``__cause__``.
    """
    try:
        with open(txt_path, 'r', encoding='utf-8-sig') as file:
            return file.read()
    except Exception as e:
        # Chain explicitly so the root cause stays reachable for callers.
        raise Exception(f"Error extracting text from TXT: {str(e)}") from e
def _discard_file(path):
    """Best-effort removal of a temporary file; failure to delete is ignored."""
    try:
        os.remove(path)
    except OSError:
        # Cleanup must never mask the outcome of the actual processing.
        pass


def _stage_upload(document_path, file_extension):
    """Copy an uploaded file object into a new temporary file; return its path."""
    # delete=False: the extractors reopen the file by path after it is closed.
    staged = tempfile.NamedTemporaryFile(suffix=file_extension, delete=False)
    try:
        with staged:
            if hasattr(document_path, 'read'):
                # File-like object exposing read().
                staged.write(document_path.read())
            else:
                # Upload wrapper exposing .file.chunks() (Django-style,
                # per the original implementation's comment).
                for chunk in document_path.file.chunks():
                    staged.write(chunk)
    except Exception:
        _discard_file(staged.name)
        raise
    return staged.name


def process_document(document_path, gemini_api_key, language, content_type):
    """Extract a document's text and pass it to ``analyze_document``.

    Args:
        document_path: Either a filesystem path (``str`` or ``os.PathLike``)
            or an uploaded file object. File objects must expose ``read()``
            or ``.file.chunks()``; their ``name`` attribute supplies the
            file extension.
        gemini_api_key: Forwarded unchanged to ``analyze_document``.
        language: Forwarded unchanged to ``analyze_document``.
        content_type: Forwarded unchanged to ``analyze_document``.

    Returns:
        A 5-tuple ``(message, text_file_path, formatted_output, txt_path,
        json_path)``. On success ``message`` is "Document processed
        successfully" and ``text_file_path`` names a temporary ``.txt`` file
        holding the extracted text; the last three items come from
        ``analyze_document``. On any failure the tuple is
        ``(error_message, None, error_message, None, None)``; no exception
        propagates to the caller.
    """
    try:
        # Decide how to handle the input before touching any attribute, so
        # plain path strings are supported rather than failing on ``.name``.
        is_local_path = isinstance(document_path, (str, os.PathLike))
        if is_local_path:
            source_name = os.fspath(document_path)
        elif hasattr(document_path, 'read') or hasattr(document_path, 'file'):
            source_name = getattr(document_path, 'name', '')
        else:
            raise Exception("Unsupported document_path type")

        # Validate the extension first so nothing is copied to disk for a
        # file type that will be rejected anyway.
        file_extension = os.path.splitext(source_name)[-1].lower()
        if file_extension not in ('.pdf', '.docx', '.txt'):
            raise Exception(f"Unsupported file type: {file_extension}")

        if is_local_path:
            source_file = source_name
        else:
            source_file = _stage_upload(document_path, file_extension)

        try:
            if file_extension == '.pdf':
                text = extract_text_from_pdf(source_file)
            elif file_extension == '.docx':
                text = extract_text_from_docx(source_file)
            else:
                text = extract_text_from_txt(source_file)
        finally:
            # Only the staged copy is ours to delete, never a caller's file.
            if not is_local_path:
                _discard_file(source_file)

        # delete=False: the path is handed back to the caller on success.
        text_file = tempfile.NamedTemporaryFile(
            mode='w', suffix='.txt', encoding='utf-8', delete=False
        )
        try:
            with text_file:
                text_file.write(text)
            # analyze_document is imported from src.quiz_processing.
            formatted_output, json_path, txt_path = analyze_document(
                text, gemini_api_key, language, content_type
            )
        except Exception:
            # The error result reports no text file, so do not leave one behind.
            _discard_file(text_file.name)
            raise

        return (
            "Document processed successfully",
            text_file.name,
            formatted_output,
            txt_path,
            json_path,
        )
    except Exception as e:
        error_message = f"Error processing document: {str(e)}"
        return error_message, None, error_message, None, None