# Source code for asclepias_broker.events.api
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018 CERN.
#
# Asclepias Broker is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""Events API."""
import jsonschema
from flask import current_app
from invenio_db import db
from marshmallow.exceptions import \
ValidationError as MarshmallowValidationError
from werkzeug.local import LocalProxy
from ..graph.tasks import process_event
from ..jsonschemas import EVENT_SCHEMA, SCHOLIX_SCHEMA
from ..schemas.loaders import RelationshipSchema
from .models import Event, EventStatus
def _jsonschema_validator_func():
    """Build the JSONSchema validator used for event payloads.

    The Scholix and Event schemas are registered under URLs rooted at the
    ``JSONSCHEMAS_HOST`` application setting and handed to the reference
    resolver as its schema store.

    :returns: A ``jsonschema.Draft4Validator`` for ``EVENT_SCHEMA``.
    """
    base_uri = current_app.config['JSONSCHEMAS_HOST']

    # Keep the registration order: Scholix first, then the event schema.
    local_schemas = {}
    local_schemas[f'{base_uri}/scholix-v3.json'] = SCHOLIX_SCHEMA
    local_schemas[f'{base_uri}/event.json'] = EVENT_SCHEMA

    ref_resolver = jsonschema.RefResolver(
        base_uri, EVENT_SCHEMA, local_schemas)
    validator = jsonschema.Draft4Validator(
        EVENT_SCHEMA, resolver=ref_resolver)
    return validator
class EventAPI:
    """Event API.

    Entry points for validating incoming event payloads, persisting them as
    :class:`Event` rows and handing them over to the ``process_event`` task.
    """

    # Wrapped in a LocalProxy so the validator is built on access (the factory
    # reads ``current_app.config``) rather than when this module is imported.
    _jsonschema_validator = LocalProxy(_jsonschema_validator_func)
    """Event JSONSchema validator."""

    @classmethod
    def validate_payload(cls, event):
        """Validate the event payload.

        The payload is checked twice: first against the Event JSONSchema,
        then item by item with the marshmallow ``RelationshipSchema`` loader.

        :param event: Event payload; it is iterated, each item being
            validated as one relationship.
        :raises jsonschema.ValidationError: If the payload does not conform
            to the Event JSONSchema.
        :raises marshmallow.exceptions.ValidationError: If any item is
            rejected by ``RelationshipSchema``.
        """
        # TODO: Use invenio-jsonschemas/jsonresolver instead of this
        # Validate against Event JSONSchema
        # NOTE: raises `jsonschema.ValidationError`
        cls._jsonschema_validator.validate(event)

        # Validate using marshmallow loader
        for payload in event:
            errors = RelationshipSchema(check_existing=True).validate(payload)
            if errors:
                raise MarshmallowValidationError(
                    str(errors) + "payload" + str(payload))

    @classmethod
    def _dispatch_processing(cls, event_uuid: str, no_index: bool,
                             eager: bool) -> None:
        """Run or enqueue the ``process_event`` task for a stored event.

        Shared by :meth:`handle_event` and :meth:`rerun_event` so both build
        and dispatch the task in exactly the same way.

        :param event_uuid: String form of the event's identifier.
        :param no_index: When true, indexing is disabled for this run even if
            ``ASCLEPIAS_SEARCH_INDEXING_ENABLED`` is set.
        :param eager: When true, execute the task via ``apply(throw=True)``
            instead of sending it with ``apply_async()``.
        """
        # Indexing happens only if enabled in the config AND not suppressed
        # by the caller.
        idx_enabled = current_app.config['ASCLEPIAS_SEARCH_INDEXING_ENABLED'] \
            and (not no_index)
        task = process_event.s(
            event_uuid=event_uuid, indexing_enabled=idx_enabled)
        if eager:
            # throw=True lets task errors propagate to the caller.
            task.apply(throw=True)
        else:
            task.apply_async()

    @classmethod
    def handle_event(cls, event: dict, no_index: bool = False,
                     user_id: int = None, eager: bool = False) -> Event:
        """Handle an event payload.

        Validates the payload, stores it as a new :class:`Event` and then
        triggers its processing.

        :param event: Event payload to validate and store.
        :param no_index: Disable indexing for the processing of this event.
        :param user_id: Identifier of the submitting user, stored on the
            event; may be ``None``.
        :param eager: Process the event immediately instead of dispatching
            the task asynchronously.
        :returns: The committed :class:`Event` object.
        :raises jsonschema.ValidationError: See :meth:`validate_payload`.
        :raises marshmallow.exceptions.ValidationError: See
            :meth:`validate_payload`.
        """
        cls.validate_payload(event)

        event_obj = Event(payload=event, status=EventStatus.New,
                          user_id=user_id)
        db.session.add(event_obj)
        # Commit before dispatching: the task receives only the event's id.
        db.session.commit()

        cls._dispatch_processing(str(event_obj.id), no_index, eager)
        return event_obj

    @classmethod
    def rerun_event(cls, event: Event, no_index: bool,
                    eager: bool = False) -> Event:
        """Trigger processing again for an already stored event.

        No validation is performed and no new database row is created; the
        ``process_event`` task is dispatched for ``event.id``.

        :param event: Existing event to reprocess.
        :param no_index: Disable indexing for this run.
        :param eager: Process the event immediately instead of dispatching
            the task asynchronously.
        :returns: The same ``event`` object that was passed in.
        """
        cls._dispatch_processing(str(event.id), no_index, eager)
        return event