Source code for asclepias_broker.monitoring.models
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018 CERN.
#
# Asclepias Broker is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""Monitoring database models."""
import uuid
import enum
import datetime
from sqlalchemy.sql.sqltypes import Boolean
from sqlalchemy import func
from invenio_db import db
from sqlalchemy.dialects import postgresql
from sqlalchemy_utils.models import Timestamp
from sqlalchemy_utils.types import JSONType, UUIDType
class HarvestStatus(enum.Enum):
    """Possible states of a harvest record.

    Used as the enum type of the ``HarvestMonitoring.status`` column.
    """

    New = 1
    Processing = 2
    Error = 3
    Done = 4
class ErrorMonitoring(db.Model, Timestamp):
    """Error monitoring model.

    Each row stores one recorded error: the originating component, the
    error text, the payload saved with it and a retry counter. The
    ``created`` and ``updated`` columns are provided by the ``Timestamp``
    mixin.
    """

    __tablename__ = 'error_monitoring'

    id = db.Column(UUIDType, default=uuid.uuid4, primary_key=True)
    event_id = db.Column(UUIDType)
    origin = db.Column(db.String, nullable=False)
    error = db.Column(db.String)
    n_retries = db.Column(db.Integer)
    # JSONB on PostgreSQL, sqlalchemy-utils JSONType on SQLite, generic JSON
    # on any other dialect.
    payload = db.Column(
        db.JSON()
        .with_variant(postgresql.JSONB(none_as_null=True), 'postgresql')
        .with_variant(JSONType(), 'sqlite'),
        default=dict,
    )

    @classmethod
    def getLastWeeksErrors(cls, **kwargs):
        """Return recent errors that have already been retried repeatedly.

        Selects rows created within the last 8 days whose ``n_retries`` is
        strictly greater than 2.

        :param kwargs: Accepted but unused.
        :returns: List of matching ``ErrorMonitoring`` rows.
        """
        # NOTE(review): the original docstring said "last week" and "at
        # least 2 times"; the 8-day window and the strict ``> 2`` are kept
        # as coded -- confirm which one is intended.
        #
        # The Timestamp mixin fills ``created`` with naive UTC datetimes, so
        # the cutoff must be naive UTC as well; a local-time ``now()`` would
        # shift the window by the server's UTC offset. The datetime is passed
        # as-is (not ``str()``) so every dialect, SQLite included, can bind it.
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=8)
        return cls.query.filter(
            cls.created > cutoff,
            cls.n_retries > 2,
        ).all()

    @classmethod
    def getFromEvent(cls, event_id):
        """Return the error recorded for ``event_id``, or ``None``.

        :param event_id: Value matched against the ``event_id`` column.
        :returns: The single matching row, or ``None`` when there is none.
        :raises sqlalchemy.orm.exc.MultipleResultsFound: If more than one
            row matches.
        """
        return cls.query.filter_by(event_id=event_id).one_or_none()

    def to_dict(self):
        """Dictionary representation of the error.

        Contains ``created``, ``updated``, ``id``, ``origin``, ``error`` and
        ``payload``; ``event_id`` and ``n_retries`` are not included.
        """
        return dict(
            created=self.created,
            updated=self.updated,
            id=self.id,
            origin=self.origin,
            error=self.error,
            payload=self.payload,
        )

    def __repr__(self):
        """String representation of the error."""
        return str(self.to_dict())
class HarvestMonitoring(db.Model, Timestamp):
    """Harvesting monitoring model.

    Each row tracks one harvest of an identifier by a given harvester,
    together with its current ``HarvestStatus``. The ``created`` and
    ``updated`` columns are provided by the ``Timestamp`` mixin.
    """

    __tablename__ = 'harvest_monitoring'

    id = db.Column(UUIDType, default=uuid.uuid4, primary_key=True)
    identifier = db.Column(db.String, nullable=False)
    scheme = db.Column(db.String)
    harvester = db.Column(db.String)
    status = db.Column(db.Enum(HarvestStatus), nullable=False)

    @classmethod
    def get(cls, id: str = None, **kwargs):
        """Get the event from the database.

        :param id: Primary key of the row to look up.
        :param kwargs: Accepted but unused.
        :returns: The matching row, or ``None`` when there is none.
        """
        return cls.query.filter_by(id=id).one_or_none()

    @classmethod
    def isRecentlyAdded(cls, identifier: str, scheme: str, harvester: str,
                        **kwargs) -> bool:
        """Check whether the same identifier was handled in the last 7 days.

        Used to avoid harvesting duplicates.

        :param identifier: Identifier value to look for.
        :param scheme: Scheme of the identifier.
        :param harvester: Name of the harvester.
        :param kwargs: Accepted but unused.
        :returns: ``True`` if a row with the same identifier, scheme and
            harvester was updated within the last 7 days.
        """
        # The Timestamp mixin fills ``updated`` with naive UTC datetimes, so
        # the cutoff must be naive UTC as well. The datetime is passed as-is
        # (not ``str()``) so every dialect, SQLite included, can bind it.
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=7)
        match = cls.query.filter(
            cls.identifier == identifier,
            cls.scheme == scheme,
            cls.harvester == harvester,
            cls.updated > cutoff,
        ).first()
        return match is not None

    @classmethod
    def getStatsFromLastWeek(cls):
        """Get the number of harvests per status over the last 7 days.

        :returns: List of ``(status, count)`` rows, one per status that has
            at least one row updated within the last 7 days.
        """
        # Naive UTC cutoff, matching what the Timestamp mixin writes.
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=7)
        return (
            db.session.query(cls.status, func.count('*'))
            .filter(cls.updated > cutoff)
            .group_by(cls.status)
            .all()
        )

    def __repr__(self):
        """String representation of the event."""
        return f"<{self.id}: {self.created} : {self.identifier}>"