Source code for asclepias_broker.graph.api

# -*- coding: utf-8 -*-
# Copyright (C) 2018 CERN.
# Asclepias Broker is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Graph functions."""

import uuid
from typing import Optional, Tuple, Union

from invenio_db import db
from sqlalchemy.orm import aliased

from ..core.models import Identifier, Relation, Relationship
from ..metadata.models import GroupMetadata, GroupRelationshipMetadata
from .models import Group, GroupM2M, GroupRelationship, GroupRelationshipM2M, \
    GroupType, Identifier2Group, Relationship2GroupRelationship

def merge_group_relationships(
    group_a: Group,
    group_b: Group,
    merged_group: Group
):
    """Merge the relationships of merged groups A and B to avoid collisions.

    Groups ``group_a`` and ``group_b`` are about to be merged into
    ``merged_group``. This function takes care of squashing any group
    relationships that would become duplicates, e.g. if we have 4 relations:

    - A Cites X
    - B Cites X
    - Y Cites A
    - Y Cites B

    and we merge groups A and B, we also need to squash the first two and
    the last two relations together:

    - {AB} Cites X
    - Y Cites {AB}

    before we can perform the actual merging of A and B. Otherwise we would
    violate the unique constraint. For every duplicate pair a single new
    relationship pointing at ``merged_group`` is created, the M2M rows of
    the pair are re-pointed to it and the pair itself is deleted, so that
    the remaining (non-duplicated) relationships can be moved with a plain
    UPDATE.

    :param group_a: first of the two groups being merged.
    :param group_b: second of the two groups being merged.
    :param merged_group: the group that replaces ``group_a`` and
        ``group_b``.

    NOTE(review): the published listing this function was recovered from
    had lost every expression of the form ``<name>.id`` / ``<name>.data...``
    (e.g. ``merge_groups_ids = [,]``). They were restored from context;
    the spots that could not be derived from the surrounding code alone
    carry their own NOTE(review) comment below.
    """
    # Determine if this is an Identity-type group merge
    identity_groups = group_a.type == GroupType.Identity

    # Remove all GroupRelationship objects between groups A and B.
    # Corresponding GroupRelationshipM2M objects will cascade
    (
        GroupRelationship.query.filter(
            ((GroupRelationship.source_id == group_a.id) &
             (GroupRelationship.target_id == group_b.id)) |
            ((GroupRelationship.source_id == group_b.id) &
             (GroupRelationship.target_id == group_a.id)))
        .delete(synchronize_session='fetch')
    )

    # We need to execute the same group relation merging twice, first for the
    # 'outgoing' relations ('A Cites X' + 'B Cites X' = 'AB Cites X'), and then
    # for the 'incoming' edges ('Y Cites A' + 'Y Cites B' = 'Y Cites AB').
    # Instead of repeating the code twice, we parametrize it as seen below
    merge_groups_ids = [group_a.id, group_b.id]
    for queried_fk, grouping_fk in [('source_id', 'target_id'),
                                    ('target_id', 'source_id'), ]:
        left_gr = aliased(GroupRelationship, name='left_gr')
        right_gr = aliased(GroupRelationship, name='right_gr')
        left_queried_fk = getattr(left_gr, queried_fk)
        right_queried_fk = getattr(right_gr, queried_fk)
        left_grouping_fk = getattr(left_gr, grouping_fk)
        right_grouping_fk = getattr(right_gr, grouping_fk)

        # 'duplicate_relations' holds GroupRelations, which should be
        # "squashed" after group merging. If we didn't do this, we would
        # violate the UNIQUE constraint
        # Generate 'duplicate_relations' by joining the table with itself
        # by the "grouping_fk" (target_id/source_id)
        duplicate_relations = (
            db.session.query(left_gr, right_gr)
            .filter(
                # Don't repeat the same pairs
                # NOTE(review): both operands were missing in the recovered
                # text; an ordering on the primary keys is assumed - confirm.
                left_gr.id < right_gr.id,
                left_queried_fk.in_(merge_groups_ids),
                right_queried_fk.in_(merge_groups_ids),
                right_queried_fk != left_queried_fk,
                right_gr.relation == left_gr.relation)
            .join(
                right_gr,
                left_grouping_fk == right_grouping_fk)
        )
        del_rel = set()
        for rel_a, rel_b in duplicate_relations:
            # The squashed relationship keeps the "other" end of the pair
            # and points its merged end at the new group.
            kwargs = {
                queried_fk: merged_group.id,
                grouping_fk: getattr(rel_a, grouping_fk),
                'relation': rel_a.relation,
                'id': uuid.uuid4(),
                'type': rel_a.type
            }
            new_grp_rel = GroupRelationship(**kwargs)
            db.session.add(new_grp_rel)
            if identity_groups:
                # NOTE(review): the constructor argument, the two JSON
                # sources and the comparison below were missing in the
                # recovered text. The keyword name, the 'data.json' /
                # 'data.updated' attributes and the direction of the
                # comparison (apply the more recently updated metadata
                # last) are assumptions - confirm against the metadata
                # models.
                group_rel_meta = GroupRelationshipMetadata(
                    group_relationship_id=new_grp_rel.id)
                db.session.add(group_rel_meta)
                json1, json2 = rel_a.data.json, rel_b.data.json
                if rel_b.data.updated < rel_a.data.updated:
                    json1, json2 = json2, json1
                group_rel_meta.json = json1
                group_rel_meta.update(json2, validate=False, multi=True)

            # Delete the duplicate pairs of relationship M2Ms before updating
            delete_duplicate_relationship_m2m(rel_a, rel_b)
            rel_ids = [rel_a.id, rel_b.id]
            (
                GroupRelationshipM2M.query
                .filter(GroupRelationshipM2M.relationship_id.in_(rel_ids))
                .update({GroupRelationshipM2M.relationship_id: new_grp_rel.id},
                        synchronize_session='fetch')
            )
            (
                GroupRelationshipM2M.query
                .filter(GroupRelationshipM2M.subrelationship_id.in_(rel_ids))
                .update(
                    {GroupRelationshipM2M.subrelationship_id: new_grp_rel.id},
                    synchronize_session='fetch')
            )
            if identity_groups:
                cls = Relationship2GroupRelationship
                delete_duplicate_relationship_m2m(rel_a, rel_b, cls=cls)
                (
                    cls.query
                    .filter(cls.group_relationship_id.in_(rel_ids))
                    .update({cls.group_relationship_id: new_grp_rel.id},
                            synchronize_session='fetch')
                )
            del_rel.add(rel_a.id)
            del_rel.add(rel_b.id)

        # Delete the duplicate relations
        (
            GroupRelationship.query
            .filter(GroupRelationship.id.in_(del_rel))
            .delete(synchronize_session='fetch')
        )

        queried_fk_inst = getattr(GroupRelationship, queried_fk)

        # Update the other non-duplicated relations
        (
            GroupRelationship.query
            .filter(queried_fk_inst.in_(merge_groups_ids))
            .update({queried_fk_inst: merged_group.id},
                    synchronize_session='fetch')
        )


def delete_duplicate_relationship_m2m(
    group_a: Group,
    group_b: Group,
    cls: Union[
        GroupRelationshipM2M, Relationship2GroupRelationship
    ] = GroupRelationshipM2M,
):
    """Delete any duplicate relationship M2M objects.

    Deletes any duplicate (unique-constraint violating) M2M objects between
    relationships and group relationships. This step is required before
    merging of two groups.

    :param group_a: first object whose ``id`` is matched against the M2M
        foreign keys.
    :param group_b: second object whose ``id`` is matched against the M2M
        foreign keys.
    :param cls: the M2M model class to clean up, either
        ``GroupRelationshipM2M`` (default) or
        ``Relationship2GroupRelationship``.
    :raises ValueError: if ``cls`` is neither of the two supported classes.

    NOTE(review): despite the ``Group`` annotations, the caller visible in
    this module (``merge_group_relationships``) passes the two
    ``GroupRelationship`` rows of a duplicate pair. Likewise ``cls``
    receives the model class itself, not an instance.

    NOTE(review): the contents of ``merge_groups_ids`` were missing in the
    published listing this function was recovered from (``[,]``); the ids
    of the two arguments were restored from context.
    """
    if cls == GroupRelationshipM2M:
        queried_fk = 'subrelationship_id'
        grouping_fk = 'relationship_id'
    elif cls == Relationship2GroupRelationship:
        queried_fk = 'group_relationship_id'
        grouping_fk = 'relationship_id'
    else:
        raise ValueError(
            "Parameter 'cls' must be either 'GroupRelationshipM2M' or "
            "'Relationship2GroupRelationship'.")

    # Loop-invariant: the ids whose M2M rows may collide after the merge
    merge_groups_ids = [group_a.id, group_b.id]

    # Run the same clean-up once per direction of the M2M table
    fk_pairs = [(queried_fk, grouping_fk), (grouping_fk, queried_fk), ]
    for queried_fk, grouping_fk in fk_pairs:
        left_gr = aliased(cls, name='left_gr')
        right_gr = aliased(cls, name='right_gr')
        left_queried_fk = getattr(left_gr, queried_fk)
        right_queried_fk = getattr(right_gr, queried_fk)
        left_grouping_fk = getattr(left_gr, grouping_fk)
        right_grouping_fk = getattr(right_gr, grouping_fk)
        duplicate_relations = (
            db.session.query(left_gr, right_gr)
            .filter(
                # Because we join in the same table by grouping_fk, we will
                # have pairs [(A,B), (B,A)] on the list. We can impose an
                # inequality condition on one FK to reduce this to just one
                # pair [(A,B)]
                left_queried_fk < right_queried_fk,
                left_queried_fk.in_(merge_groups_ids),
                right_queried_fk.in_(merge_groups_ids),
                right_queried_fk != left_queried_fk,
            )
            .join(
                right_gr,
                left_grouping_fk == right_grouping_fk)
        )
        # TODO: Delete in a query
        # Only one row of each duplicate pair is removed; the survivor is
        # left in place for the caller to update.
        for rel_a, _ in duplicate_relations:
            db.session.delete(rel_a)


def delete_duplicate_group_m2m(group_a: Group, group_b: Group):
    """Delete any duplicate GroupM2M objects.

    Removes one of each pair of GroupM2M objects for groups A and B, i.e.
    rows that share the same counterpart (``subgroup_id`` or ``group_id``)
    and differ only in pointing at ``group_a`` versus ``group_b``.

    :param group_a: first of the two groups being merged.
    :param group_b: second of the two groups being merged.

    NOTE(review): the contents of ``merge_groups_ids`` were missing in the
    published listing this function was recovered from (``[,]``); the ids
    of the two groups were restored from context.
    """
    cls = GroupM2M

    # Loop-invariant: the ids whose M2M rows may collide after the merge
    merge_groups_ids = [group_a.id, group_b.id]

    # Run the same clean-up for both sides of the M2M table: first with the
    # merged groups as parents ('group_id'), then as children ('subgroup_id')
    fk_pairs = [('group_id', 'subgroup_id'), ('subgroup_id', 'group_id'), ]
    for queried_fk, grouping_fk in fk_pairs:
        left_gr = aliased(cls, name='left_gr')
        right_gr = aliased(cls, name='right_gr')
        left_queried_fk = getattr(left_gr, queried_fk)
        right_queried_fk = getattr(right_gr, queried_fk)
        left_grouping_fk = getattr(left_gr, grouping_fk)
        right_grouping_fk = getattr(right_gr, grouping_fk)
        duplicate_relations = (
            db.session.query(left_gr, right_gr)
            .filter(
                # Because we join in the same table by grouping_fk, we will
                # have pairs [(A,B), (B,A)] on the list. We impose an
                # inequality condition on one FK to reduce this to just one
                # pair [(A,B)]
                left_queried_fk < right_queried_fk,
                left_queried_fk.in_(merge_groups_ids),
                right_queried_fk.in_(merge_groups_ids),
                right_queried_fk != left_queried_fk,
            )
            .join(
                right_gr,
                left_grouping_fk == right_grouping_fk)
        )
        # TODO: Delete in a query
        # Only one row of each duplicate pair is removed; the survivor is
        # left in place for the caller to update.
        for rel_a, _ in duplicate_relations:
            db.session.delete(rel_a)


def merge_identity_groups(
    group_a: Group,
    group_b: Group
) -> Tuple[Optional[Group], Optional[Group]]:
    """Merge two groups of type "Identity".

    Merges the groups together into one group, taking care of migrating all
    group relationships and M2M objects. The Version groups that contain
    the two Identity groups are merged as well.

    :param group_a: first Identity group.
    :param group_b: second Identity group.
    :returns: a ``(merged_identity_group, merged_version_group)`` tuple, or
        ``(None, None)`` when both arguments are already the same group.
    :raises ValueError: if either group is not of type
        ``GroupType.Identity``.

    NOTE(review): the published listing this function was recovered from
    had lost every expression of the form ``<name>.id`` / ``<name>.data...``.
    They were restored from context; the metadata handling carries its own
    NOTE(review) comment below.
    """
    # Nothing to do if groups are already merged
    if group_a == group_b:
        return None, None

    if not (group_a.type == group_b.type == GroupType.Identity):
        raise ValueError("Can only merge Identity groups.")

    # The Version groups are merged first, before the Identity groups
    # themselves are touched.
    # TODO: Should join with Group and filter by Group.type=GroupType.Version
    version_group_a = GroupM2M.query.filter_by(
        subgroup=group_a).one().group
    version_group_b = GroupM2M.query.filter_by(
        subgroup=group_b).one().group

    merged_version_group = merge_version_groups(
        version_group_a, version_group_b)

    merged_group = Group(type=GroupType.Identity, id=uuid.uuid4())
    db.session.add(merged_group)

    # NOTE(review): the constructor argument, the two JSON sources and the
    # comparison below were missing in the recovered text. The keyword
    # name, the 'data.json' / 'data.updated' attributes and the direction
    # of the comparison (apply the more recently updated metadata last)
    # are assumptions - confirm against the metadata models.
    merged_group_meta = GroupMetadata(group_id=merged_group.id)
    db.session.add(merged_group_meta)
    json1, json2 = group_a.data.json, group_b.data.json
    if group_b.data.updated < group_a.data.updated:
        json1, json2 = json2, json1
    merged_group_meta.json = json1
    merged_group_meta.update(json2)

    merge_group_relationships(group_a, group_b, merged_group)

    merge_groups_ids = [group_a.id, group_b.id]

    # Re-point the identifiers of both groups at the merged group
    (Identifier2Group.query
     .filter(Identifier2Group.group_id.in_(merge_groups_ids))
     .update({Identifier2Group.group_id: merged_group.id},
             synchronize_session='fetch'))

    # Delete the duplicate GroupM2M entries and update the remaining with
    # the new Group
    delete_duplicate_group_m2m(group_a, group_b)
    (GroupM2M.query
     .filter(GroupM2M.subgroup_id.in_(merge_groups_ids))
     .update({GroupM2M.subgroup_id: merged_group.id},
             synchronize_session='fetch'))

    # Finally drop the two original groups, now fully replaced
    Group.query.filter(Group.id.in_(merge_groups_ids)).delete(
        synchronize_session='fetch')

    return merged_group, merged_version_group


def merge_version_groups(group_a: Group, group_b: Group) -> Optional[Group]:
    """Merge two Version groups into one.

    Creates a new group of the same type, squashes and migrates the group
    relationships of both groups onto it, re-points all GroupM2M rows and
    deletes the two original groups.

    :param group_a: first group to merge.
    :param group_b: second group to merge.
    :returns: the newly created merged group, or ``None`` when both
        arguments are already the same group.
    :raises ValueError: if the groups are of different types, or if they
        are of type ``GroupType.Identity`` (those are merged by
        ``merge_identity_groups``).

    NOTE(review): the published listing this function was recovered from
    had lost every expression of the form ``<name>.id`` (e.g.
    ``.in_([,])``); they were restored from context.
    """
    # Nothing to do if groups are already merged
    if group_a == group_b:
        return None

    if group_a.type != group_b.type:
        raise ValueError("Cannot merge groups of different type.")

    if group_a.type == GroupType.Identity:
        # Merging Identity groups is done separately
        raise ValueError("Cannot merge groups of type 'Identity'.")

    merged_group = Group(type=group_a.type, id=uuid.uuid4())
    db.session.add(merged_group)

    merge_group_relationships(group_a, group_b, merged_group)

    merge_groups_ids = [group_a.id, group_b.id]

    # Delete the duplicate GroupM2M entries and update the remaining with
    # the new Group
    delete_duplicate_group_m2m(group_a, group_b)
    (GroupM2M.query
     .filter(GroupM2M.group_id.in_(merge_groups_ids))
     .update({GroupM2M.group_id: merged_group.id},
             synchronize_session='fetch'))
    (GroupM2M.query
     .filter(GroupM2M.subgroup_id.in_(merge_groups_ids))
     .update({GroupM2M.subgroup_id: merged_group.id},
             synchronize_session='fetch'))

    # Finally drop the two original groups, now fully replaced
    Group.query.filter(Group.id.in_(merge_groups_ids)).delete(
        synchronize_session='fetch')
    return merged_group


def get_or_create_groups(identifier: Identifier) -> Tuple[Group, Group]:
    """Given an Identifier, fetch or create its Identity and Version groups.

    :param identifier: the identifier to resolve.
    :returns: an ``(identity_group, version_group)`` tuple.

    Missing objects are added to the session; nothing is committed here.

    NOTE(review): the published listing this function was recovered from
    had lost several expressions (the ``GroupMetadata`` argument, the join
    condition, the ``subgroup`` operands and the returned values). They
    were restored from context; the ``group_id`` keyword of
    ``GroupMetadata`` in particular is an assumption - confirm against the
    metadata models.
    """
    id2g = Identifier2Group.query.filter(
        Identifier2Group.identifier == identifier).one_or_none()
    if not id2g:
        # First time this identifier is seen: create its Identity group,
        # the group's metadata record and the identifier-to-group link
        group = Group(type=GroupType.Identity, id=uuid.uuid4())
        db.session.add(group)
        gm = GroupMetadata(group_id=group.id)
        db.session.add(gm)
        id2g = Identifier2Group(identifier=identifier, group=group)
        db.session.add(id2g)

    # Look for the Version group that contains the Identity group
    g2g = (GroupM2M.query
           .join(Group, GroupM2M.group_id == Group.id)
           .filter(GroupM2M.subgroup == id2g.group,
                   Group.type == GroupType.Version)
           .one_or_none())
    if not g2g:
        group = Group(type=GroupType.Version, id=uuid.uuid4())
        db.session.add(group)
        g2g = GroupM2M(group=group, subgroup=id2g.group)
        db.session.add(g2g)
    return id2g.group, g2g.group


def get_group_from_id(
    identifier_value: str,
    id_type: str = 'doi',
    group_type: GroupType = GroupType.Identity
) -> Optional[Group]:
    """Resolve from 'A' to Identity Group of A or to a Version Group of A.

    :param identifier_value: value of the identifier to look up.
    :param id_type: scheme of the identifier, ``'doi'`` by default.
    :param group_type: ``GroupType.Identity`` to get the Identity group;
        any other value returns the group that contains the Identity group
        as a subgroup.
    :returns: the resolved group, or ``None`` when ``Identifier.get`` finds
        no identifier for the given value and type.
    """
    # TODO: Move this method to api.utils or to models?
    id_ = Identifier.get(identifier_value, id_type)
    if not id_:
        # Unknown identifier: made explicit, the original fell off the end
        # of the function and returned None implicitly.
        return None

    id_grp = id_.id2groups[0].group
    if group_type == GroupType.Identity:
        return id_grp
    return GroupM2M.query.filter_by(subgroup=id_grp).one().group


def add_group_relationship(
    relationship: Relationship,
    src_id_grp: Group,
    tar_id_grp: Group,
    src_ver_grp: Group,
    tar_ver_grp: Group
):
    """Add a group relationship between corresponding groups.

    Creates a new Identity-level ``GroupRelationship`` (with its metadata
    record) for ``relationship``, links the relationship to it, makes sure
    a Version-level ``GroupRelationship`` with the same relation exists
    between the Version groups, and connects the two levels with a
    ``GroupRelationshipM2M`` row.

    :param relationship: the relationship being indexed.
    :param src_id_grp: Identity group of the relationship's source.
    :param tar_id_grp: Identity group of the relationship's target.
    :param src_ver_grp: Version group of the relationship's source.
    :param tar_ver_grp: Version group of the relationship's target.

    NOTE(review): the published listing this function was recovered from
    had lost the ``GroupRelationshipMetadata`` argument and the right-hand
    sides of the ``source_id`` / ``target_id`` filters. They were restored
    from context; the ``group_relationship_id`` keyword in particular is an
    assumption - confirm against the metadata models.
    """
    # Add GroupRelationship between Identity groups
    id_grp_rel = GroupRelationship(source=src_id_grp, target=tar_id_grp,
                                   relation=relationship.relation,
                                   type=GroupType.Identity, id=uuid.uuid4())
    grm = GroupRelationshipMetadata(
        group_relationship_id=id_grp_rel.id)
    db.session.add(grm)
    db.session.add(id_grp_rel)
    rel2grp_rel = Relationship2GroupRelationship(
        relationship=relationship, group_relationship=id_grp_rel)
    db.session.add(rel2grp_rel)

    # Add GroupRelationship between Version groups if it doesn't exist
    ver_grp_rel = (
        GroupRelationship.query
        .filter(GroupRelationship.source_id == src_ver_grp.id,
                GroupRelationship.target_id == tar_ver_grp.id,
                GroupRelationship.relation == relationship.relation)
        .one_or_none()
    )
    if not ver_grp_rel:
        ver_grp_rel = GroupRelationship(source=src_ver_grp,
                                        target=tar_ver_grp,
                                        relation=relationship.relation,
                                        type=GroupType.Version)
        db.session.add(ver_grp_rel)

    # Register the Identity-level relationship under the Version-level one
    g2g_rel = GroupRelationshipM2M(relationship=ver_grp_rel,
                                   subrelationship=id_grp_rel)
    db.session.add(g2g_rel)


def update_groups(
    relationship: Relationship,
    delete: bool = False
) -> Tuple[Tuple[Group, Group, Optional[Group]],
           Tuple[Group, Group, Optional[Group]]]:
    """Update groups and related M2M objects for given relationship.

    Depending on the relation type this either merges the Identity groups
    (``IsIdenticalTo``), merges the Version groups (``HasVersion``), or
    records the relationship as a group relationship between the source and
    target groups (every other relation).

    :param relationship: the relationship to process.
    :param delete: not read by this function; kept for interface
        compatibility.
    :returns: two tuples, ``(source_identity_group, target_identity_group,
        merged_identity_group)`` and ``(source_version_group,
        target_version_group, merged_version_group)``. The merged entries
        are ``None`` when no merge of that kind took place.

    NOTE(review): the published listing this function was recovered from
    had lost the argument of the second ``get_or_create_groups`` call and
    the right-hand sides of the ``source_id`` / ``target_id`` filters;
    ``relationship.target`` and the Identity group ids were restored from
    context.
    """
    src_idg, src_vg = get_or_create_groups(relationship.source)
    tar_idg, tar_vg = get_or_create_groups(relationship.target)
    merged_group = None
    merged_version_group = None

    if relationship.relation == Relation.IsIdenticalTo:
        merged_group, merged_version_group = merge_identity_groups(
            src_idg, tar_idg)
    elif relationship.relation == Relation.HasVersion:
        merged_version_group = merge_version_groups(src_vg, tar_vg)
    else:  # Relation.Cites, Relation.IsSupplementTo, Relation.IsRelatedTo
        grp_rel = (
            GroupRelationship.query
            .filter(GroupRelationship.source_id == src_idg.id,
                    GroupRelationship.target_id == tar_idg.id,
                    GroupRelationship.relation == relationship.relation)
            .one_or_none()
        )
        # If GroupRelationship exists, simply add the Relationship M2M entry
        if grp_rel:
            obj = Relationship2GroupRelationship(
                relationship=relationship, group_relationship=grp_rel)
            db.session.add(obj)
        # Otherwise, add the group relationship with propagation
        else:
            add_group_relationship(relationship, src_idg, tar_idg,
                                   src_vg, tar_vg)
    return (
        (src_idg, tar_idg, merged_group),
        (src_vg, tar_vg, merged_version_group),
    )