Source code for asclepias_broker.metadata.api
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018 CERN.
#
# Asclepias Broker is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""Metadata functions."""
from datetime import datetime
from typing import List
import idutils
from flask import current_app
from invenio_db import db
from ..core.models import Identifier, Relation, Relationship
from ..graph.api import get_group_from_id, get_or_create_groups
from ..graph.models import GroupRelationship, GroupType
from ..utils import chunks
from .models import GroupMetadata, GroupRelationshipMetadata
from ..monitoring.models import ErrorMonitoring
# TODO: When merging/splitting groups there is some merging/duplicating of
# metadata as well
[docs]def update_metadata_from_event(relationship: Relationship, payload: dict):
"""Updates the metadata of the source, target and relationship groups."""
# Get identity groups for source and targer
# TODO: Do something for this case?
if relationship.relation == Relation.IsIdenticalTo:
return
src_group = next((id2g.group for id2g in relationship.source.id2groups
if id2g.group.type == GroupType.Identity), None)
trg_group = next((id2g.group for id2g in relationship.target.id2groups
if id2g.group.type == GroupType.Identity), None)
rel_group = GroupRelationship.query.filter_by(
source=src_group, target=trg_group, relation=relationship.relation,
type=GroupType.Identity).one_or_none()
if src_group:
src_metadata = src_group.data or GroupMetadata(group_id=src_group.id)
src_metadata.update(payload['Source'])
if trg_group:
trg_metadata = trg_group.data or GroupMetadata(group_id=trg_group.id)
trg_metadata.update(payload['Target'])
if rel_group:
rel_metadata = rel_group.data or \
GroupRelationshipMetadata(group_relationship_id=rel_group.id)
rel_metadata.update(
{k: v for k, v in payload.items()
if k in ('LinkPublicationDate', 'LinkProvider')})
[docs]def update_metadata(id_value: str, scheme: str, data: dict,
create_identity_events: bool = True,
create_missing_groups: bool = True,
providers: List[str] = None,
link_publication_date: str = None):
"""."""
from ..events.api import EventAPI
scheme = scheme.lower()
id_value = idutils.normalize_pid(id_value, scheme)
target_identifiers = set()
for i in data.get('Identifier', []):
value, target_scheme = i['ID'], i['IDScheme'].lower()
value = idutils.normalize_pid(value, target_scheme)
target_identifiers.add((value, target_scheme))
# Check if there are identity links that can be created:
if create_identity_events and len(target_identifiers) > 0:
events = []
providers = providers or ['unknown']
providers = [{'Name': provider} for provider in providers]
link_publication_date = link_publication_date or \
datetime.now().isoformat()
source_id_obj = {'ID': id_value, 'IDScheme': scheme}
for target_value, target_scheme in target_identifiers:
if not ((id_value, scheme) == (target_value, target_scheme)):
target_id_obj = {'ID': target_value, 'IDScheme': target_scheme}
payload = {
'RelationshipType': {
'Name': 'IsRelatedTo',
'SubTypeSchema': 'DataCite',
'SubType': 'IsIdenticalTo'
},
'Target': {
'Identifier': target_id_obj,
'Type': {'Name': 'unknown'}
},
'LinkProvider': providers,
'Source': {
'Identifier': source_id_obj,
'Type': {'Name': 'unknown'}
},
'LinkPublicationDate': link_publication_date,
}
events.append(payload)
for event_chunk in chunks(events, 100):
try:
EventAPI.handle_event(
list(event_chunk), no_index=True, eager=True)
except ValueError as exc:
error_obj = ErrorMonitoring(origin="update_metadata", error=repr(exc), n_retries = 99, payload=event_chunk)
db.session.add(error_obj)
db.session.commit()
current_app.logger.exception(
'Error while processing identity event')
id_group = get_group_from_id(id_value, scheme)
if not id_group and create_missing_groups:
identifier = Identifier(
value=id_value, scheme=scheme).fetch_or_create_id()
db.session.commit()
id_group, _ = get_or_create_groups(identifier)
db.session.commit()
id_group.data.update(data)
db.session.commit()