# -*- coding: utf-8 -*-
#
# Copyright (C) 2018 CERN.
# Copyright (c) 2017 Thomas P. Robitaille.
#
# Asclepias Broker is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""Core database models."""
import enum
import uuid
from invenio_db import db
from sqlalchemy.schema import Index, UniqueConstraint
from sqlalchemy_utils.models import Timestamp
from sqlalchemy_utils.types import UUIDType
[docs]class Relation(enum.Enum):
"""Relation type."""
Cites = 1
IsSupplementTo = 2
HasVersion = 3
IsIdenticalTo = 4
IsRelatedTo = 5
[docs]class Identifier(db.Model, Timestamp):
"""Identifier model."""
__tablename__ = 'identifier'
__table_args__ = (
UniqueConstraint('value', 'scheme',
name='uq_identifier_value_scheme'),
# TODO: Check if equivalent with passing "index=True"
# Index('ix_identifier_value', 'value'),
# Index('ix_identifier_scheme', 'scheme'),
)
id = db.Column(UUIDType, default=uuid.uuid4, primary_key=True)
value = db.Column(db.String, index=True)
scheme = db.Column(db.String, index=True)
def __repr__(self):
"""String representation of the Identifier."""
return f"<{self.scheme}: {self.value}>"
[docs] @classmethod
def get(cls, value=None, scheme=None, **kwargs):
"""Get the identifier from the database."""
return cls.query.filter_by(
value=value, scheme=scheme).one_or_none()
[docs] def fetch_or_create_id(self):
"""Fetches from the database or creates an id for the identifier."""
if not self.id:
obj = self.get(self.value, self.scheme)
if obj:
self = obj
else:
self.id = uuid.uuid4()
return self
def _get_related(self, condition, relationship):
cond = condition & (Relationship.relation == relationship)
return Relationship.query.filter(cond)
def _get_identities(self, as_relation=False):
"""Get the first-layer of 'Identical' Identifies."""
cond = ((Relationship.source == self) | (Relationship.target == self))
q = self._get_related(cond, Relation.IsIdenticalTo)
if as_relation:
return q.all()
else:
siblings = set(sum([[item.source, item.target] for item in q], []))
if siblings:
return list(siblings)
else:
return [self, ]
[docs] def get_identities(self):
"""Get the fully-expanded list of 'Identical' Identifiers."""
ids = next_ids = set([self])
while next_ids:
grp = set(sum([item._get_identities() for item in next_ids], []))
next_ids = grp - ids
ids |= grp
return list(ids)
[docs] def get_parents(self, rel_type, as_relation=False):
"""Get all parents of given Identifier for given relation."""
q = self._get_related((Relationship.target == self), rel_type)
if as_relation:
return q.all()
else:
return [item.source for item in q]
[docs] def get_children(self, rel_type, as_relation=False):
"""Get all children of given Identifier for given relation."""
q = self._get_related((Relationship.source == self), rel_type)
if as_relation:
return q.all()
else:
return [item.target for item in q]
@property
def identity_group(self):
"""Get the identity group the identifier belongs to."""
# TODO: See if we can avoid this
from ..graph.models import GroupType
return next((id2g.group for id2g in self.id2groups
if id2g.group.type == GroupType.Identity), None)
@property
def data(self):
"""Get the metadata of the identity group the identifier belongs to."""
if self.identity_group and self.identity_group.data:
return self.identity_group.data.json
[docs]class Relationship(db.Model, Timestamp):
"""Relationship between two identifiers."""
__tablename__ = 'relationship'
__table_args__ = (
UniqueConstraint('source_id', 'target_id', 'relation',
name='uq_relationship_source_target_relation'),
Index('ix_relationship_source', 'source_id'),
Index('ix_relationship_target', 'target_id'),
Index('ix_relationship_relation', 'relation'),
)
id = db.Column(UUIDType, default=uuid.uuid4, primary_key=True)
source_id = db.Column(
UUIDType,
db.ForeignKey(Identifier.id,
onupdate='CASCADE', ondelete='CASCADE',
name='fk_relationship_source'),
nullable=False
)
target_id = db.Column(
UUIDType,
db.ForeignKey(Identifier.id,
onupdate='CASCADE', ondelete='CASCADE',
name='fk_relationship_target'),
nullable=False
)
relation = db.Column(db.Enum(Relation))
source = db.relationship(Identifier, foreign_keys=[source_id],
backref='sources')
target = db.relationship(Identifier, foreign_keys=[target_id],
backref='targets')
[docs] @classmethod
def get(cls, source, target, relation, **kwargs):
"""Get the relationship from the database."""
return cls.query.filter_by(
source_id=source.id, target_id=target.id,
relation=relation).one_or_none()
[docs] def fetch_or_create_id(self):
"""Fetches from the database or creates an id for the relationship."""
self.source = self.source.fetch_or_create_id()
self.target = self.target.fetch_or_create_id()
if not self.id:
obj = self.get(self.source, self.target, self.relation)
if obj:
self = obj
else:
self.id = uuid.uuid4()
return self
@property
def identity_group(self):
"""Get the relationship's identity group."""
# TODO: See if we can avoid this
from ..graph.models import GroupRelationship, GroupType
return GroupRelationship.query.filter_by(
source=self.source.identity_group,
target=self.target.identity_group,
relation=self.relation,
type=GroupType.Identity).one_or_none()
@property
def data(self):
"""Get the relationship's identity group metadata."""
if self.identity_group and self.identity_group.data:
return self.identity_group.data.json
def __repr__(self):
"""String representation of the relationship."""
return (
f'<{self.source.value} {self.relation.name} {self.target.value}>'
)