# -*- coding: utf-8 -*-
#
# Copyright (C) 2018 CERN.
#
# Asclepias Broker is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""Relationships search API."""
from itertools import groupby
from invenio_db import db
from ..core.models import Identifier, Relation
from ..graph.api import get_group_from_id
from ..graph.models import Group, GroupRelationship, GroupType, \
Identifier2Group
from ..schemas.loaders import from_datacite_relation
[docs]class RelationshipAPI:
"""Relationship API."""
[docs] @classmethod
def print_citations(self, pid_value):
"""Print citations of an identifier."""
id_A = Identifier.query.filter_by(scheme='DOI', value=pid_value).one()
full_c = self.get_citations(
id_A, with_parents=True, with_siblings=True, expand_target=True)
from pprint import pprint
pprint(full_c)
[docs] @classmethod
def get_citations(self, identifier, with_parents=False,
with_siblings=False, expand_target=False):
"""Get citations of an identfier from the database."""
# At the beginning, frontier is just identities
frontier = identifier.get_identities()
frontier_rel = set()
# Expand with parents
if with_parents or with_siblings:
parents_rel = set(
sum([iden.get_parents(Relation.HasVersion, as_relation=True)
for iden in frontier], []))
iden_parents = [item.source for item in parents_rel]
iden_parents = set(
sum([p.get_identities() for p in iden_parents], []))
if with_parents:
frontier_rel |= parents_rel
frontier += iden_parents
# Expand with siblings
if with_siblings:
children_rel = set(
sum([p.get_children(Relation.HasVersion, as_relation=True)
for p in iden_parents], []))
frontier_rel |= children_rel
par_children = [item.target for item in children_rel]
par_children = set(
sum([c.get_identities() for c in par_children], []))
frontier += par_children
frontier = set(frontier)
# frontier contains all identifiers which directly cite the resource
citations = set(
sum([iden.get_parents(Relation.Cites, as_relation=True)
for iden in frontier], []))
# Expand it to identical identifiers and group them if they repeat
expanded_sources = [c.source.get_identities() for c in citations]
zipped = sorted(zip(expanded_sources, citations),
key=lambda x: [xi.value for xi in x[0]])
aggregated_citations = [
(k, list(vi for _, vi in v))
for k, v in groupby(zipped, key=lambda x: x[0])]
frontier_rel = list(frontier_rel) + list(set(
sum([item._get_identities(as_relation=True)
for item in frontier], [])))
if expand_target:
aggregated_citations += [(list(frontier), frontier_rel)]
return aggregated_citations
[docs] @classmethod
def get_citations2(self, identifier, relation: str,
grouping_type=GroupType.Identity):
"""Get citations of an identfier from the database."""
grp = get_group_from_id(identifier.value, identifier.scheme,
group_type=grouping_type)
relation, inverse = from_datacite_relation(relation)
object_fk = GroupRelationship.source_id
target_fk = GroupRelationship.target_id
if inverse:
object_fk, target_fk = target_fk, object_fk
res = (
# TODO: +join by metadatas
db.session.query(GroupRelationship, Group, Identifier)
.filter(object_fk == grp.id,
GroupRelationship.relation == relation)
.join(Group, target_fk == Group.id)
.join(Identifier2Group, target_fk == Identifier2Group.group_id)
.join(Identifier, Identifier2Group.identifier_id == Identifier.id)
.order_by(Group.id)
.all()
)
from itertools import groupby
result = [(k, list(v)) for k, v in groupby(res, key=lambda x: x[1])]
return result