import asyncio
import logging
import queue
from concurrent.futures import ThreadPoolExecutor
from threading import Thread
from django.db import transaction
from logmancer.conf import get_bool
from logmancer.models import LogEntry
from logmancer.notifications.manager import notification_manager
logger = logging.getLogger("logmancer.utils")
[docs]
class LogEvent:
_notification_executor = ThreadPoolExecutor(max_workers=3, thread_name_prefix="logmancer_notif")
_notification_queue = queue.Queue(maxsize=1000)
_worker_started = False
@classmethod
def _log(cls, level, message, **kwargs):
"""Internal log method with notification support"""
def create_log():
try:
log_entry = LogEntry.objects.create(
message=message,
level=level,
source=kwargs.get("source", "manual"),
path=kwargs.get("path"),
method=kwargs.get("method"),
status_code=kwargs.get("status_code"),
meta=kwargs.get("meta", {}),
user=kwargs.get("user"),
actor_type=kwargs.get("actor_type", "user"),
)
notify = kwargs.pop("notify", False)
enabled = get_bool("ENABLE_NOTIFICATIONS")
if notify and enabled:
cls._queue_notification(log_entry, kwargs)
return log_entry
except Exception as e:
logger.error(f"LogEvent _log error: {e}")
transaction.on_commit(create_log)
@classmethod
def _queue_notification(cls, log_entry, context):
"""Queue notification for async processing"""
try:
cls._notification_queue.put_nowait((log_entry, context))
if not cls._worker_started:
cls._start_notification_worker()
cls._worker_started = True
except queue.Full:
logger.error("Notification queue is full, dropping notification")
@classmethod
def _start_notification_worker(cls):
"""Start background worker for processing notifications"""
def notification_worker():
while True:
try:
log_entry, context = cls._notification_queue.get(timeout=30)
cls._notification_executor.submit(
cls._send_notification_async, log_entry, context
)
cls._notification_queue.task_done()
except queue.Empty:
continue
except Exception as e:
logger.error(f"Notification worker error: {e}")
worker_thread = Thread(target=notification_worker, daemon=True, name="logmancer_worker")
worker_thread.start()
@classmethod
def _send_notification_async(cls, log_entry, context):
"""Run async notifications inside this worker thread"""
try:
asyncio.run(notification_manager.send_notifications(log_entry, context))
except Exception as e:
logger.error(f"Sending notification failed: {e}")
[docs]
@classmethod
def info(cls, message, **kwargs):
cls._log("INFO", message, **kwargs)
[docs]
@classmethod
def warning(cls, message, **kwargs):
cls._log("WARNING", message, **kwargs)
[docs]
@classmethod
def error(cls, message, **kwargs):
cls._log("ERROR", message, **kwargs)
[docs]
@classmethod
def debug(cls, message, **kwargs):
cls._log("DEBUG", message, **kwargs)
[docs]
@classmethod
def critical(cls, message, **kwargs):
cls._log("CRITICAL", message, **kwargs)
[docs]
@classmethod
def fatal(cls, message, **kwargs):
cls._log("FATAL", message, **kwargs)
[docs]
@classmethod
def notset(cls, message, **kwargs):
cls._log("NOTSET", message, **kwargs)