# -*- coding: utf-8 -*-
"""Manager for Bio2BEL InterPro."""
import logging
import time
from itertools import groupby
from operator import itemgetter
from typing import List, Mapping, Optional
from tqdm import tqdm
from bio2bel.manager.bel_manager import BELManagerMixin
from bio2bel.manager.flask_manager import FlaskMixin
from bio2bel.manager.namespace_manager import BELNamespaceManagerMixin
from compath_utils import CompathManager
from pybel import BELGraph
from pybel.manager.models import Namespace, NamespaceEntry
from .constants import CHUNKSIZE, MODULE_NAME
from .models import Annotation, Base, Entry, GoTerm, Protein, Type, entry_go
from .parser.entries import get_entries_df
from .parser.interpro_to_go import get_interpro_go_mappings
from .parser.proteins import get_proteins_chunks
from .parser.tree import get_interpro_tree
__all__ = ['Manager']
log = logging.getLogger(__name__)
[docs]class Manager(CompathManager, BELNamespaceManagerMixin, BELManagerMixin, FlaskMixin):
"""Protein-family and protein-domain memberships."""
_base = Base
module_name = MODULE_NAME
flask_admin_models = [Entry, Protein, Type, Annotation, GoTerm]
edge_model = [entry_go, Annotation]
pathway_model = Entry
pathway_model_identifier_column = Entry.interpro_id
protein_model = Protein
namespace_model = Entry
identifiers_recommended = 'InterPro'
identifiers_pattern = r'^IPR\d{6}$'
identifiers_miriam = 'MIR:00000011'
identifiers_namespace = 'interpro'
identifiers_url = 'http://identifiers.org/interpro/'
def __init__(self, *args, **kwargs): # noqa: D105, D107
super().__init__(*args, **kwargs)
self.types = {}
self.interpros = {}
self.go_terms = {}
[docs] def is_populated(self) -> bool:
"""Check if the database is already populated."""
return 0 < self.count_interpros()
[docs] def count_interpros(self) -> int:
"""Count the number of InterPro entries in the database."""
return self._count_model(Entry)
[docs] def list_interpros(self) -> List[Entry]:
"""List the InterPro entries in the database."""
return self._list_model(Entry)
[docs] def count_annotations(self) -> int:
"""Count the number of protein-interpro associations."""
return self._count_model(Annotation)
[docs] def count_proteins(self) -> int:
"""Count the number of protein entries in the database."""
return self._count_model(Protein)
[docs] def list_proteins(self) -> List[Protein]:
"""List the proteins in the database."""
return self._list_model(Protein)
[docs] def count_go_terms(self) -> int:
"""Count the GO terms in the database."""
return self._count_model(GoTerm)
[docs] def summarize(self) -> Mapping[str, int]:
"""Summarize the database."""
return dict(
interpros=self.count_interpros(),
annotations=self.count_annotations(),
proteins=self.count_proteins(),
go_terms=self.count_go_terms(),
)
[docs] def get_type_by_name(self, name: str) -> Optional[Type]:
"""Get an InterPro entry type by its name if it exists."""
return self.session.query(Type).filter(Type.name == name).one_or_none()
[docs] def get_interpro_by_interpro_id(self, interpro_id: str) -> Optional[Entry]:
"""Get a InterPro entry by its identifier if it exists."""
return self.session.query(Entry).filter(Entry.interpro_id == interpro_id).one_or_none()
[docs] def get_go_by_go_identifier(self, go_id: str) -> Optional[GoTerm]:
"""Get a GO term by its identifier if it exists."""
return self.session.query(GoTerm).filter(GoTerm.go_id == go_id).one_or_none()
[docs] def get_or_create_interpro(self, interpro_id: str, **kwargs) -> Entry:
"""Get an InterPro entry by its identifier if it exists, or create one."""
interpro = self.interpros.get(interpro_id)
if interpro is not None:
return interpro
interpro = self.get_interpro_by_interpro_id(interpro_id)
if interpro is not None:
self.interpros[interpro_id] = interpro
return interpro
interpro = self.interpros[interpro_id] = Entry(interpro_id=interpro_id, **kwargs)
self.session.add(interpro)
return interpro
[docs] def get_or_create_go_term(self, go_id: str, name=None) -> GoTerm:
"""Get a GO term by its identifier if it exists, or create one."""
go = self.go_terms.get(go_id)
if go is not None:
return go
go = self.get_go_by_go_identifier(go_id)
if go is not None:
self.go_terms[go_id] = go
return go
go = self.go_terms[go_id] = GoTerm(go_id=go_id, name=name)
self.session.add(go)
return go
[docs] def populate(
self,
entries_url: Optional[str] = None,
tree_url: Optional[str] = None,
go_mapping_path: Optional[str] = None,
populate_proteins: bool = False,
proteins_url: Optional[str] = None
) -> None:
"""Populate the database.
:param Optional[str] entries_url:
:param Optional[str] tree_url:
:param Optional[str] go_mapping_path:
:param Optional[str] proteins_url:
"""
self._populate_entries(entry_url=entries_url, tree_url=tree_url)
self._populate_go(path=go_mapping_path)
if populate_proteins:
self._populate_proteins(url=proteins_url)
def _populate_entries(self, entry_url: Optional[str] = None, tree_url: Optional[str] = None,
force_download: bool = False) -> None:
"""Populate the database."""
df = get_entries_df(url=entry_url, force_download=force_download)
for _, interpro_id, entry_type, name in tqdm(df.itertuples(), desc='Entries', total=len(df.index)):
family_type = self.types.get(entry_type)
if family_type is None:
family_type = self.types[entry_type] = Type(name=entry_type)
self.session.add(family_type)
self.get_or_create_interpro(
interpro_id=interpro_id,
type=family_type,
name=name,
)
t = time.time()
log.info('committing entries')
self.session.commit()
log.info('committed entries in %.2f seconds', time.time() - t)
graph = get_interpro_tree(path=tree_url, force_download=force_download)
for parent_name, child_name in tqdm(graph.edges(), desc='Building Tree', total=graph.number_of_edges()):
child_id = graph.nodes[child_name]['interpro_id']
parent_id = graph.nodes[parent_name]['interpro_id']
child = self.interpros.get(child_id)
parent = self.interpros.get(parent_id)
if child is None:
log.warning('missing %s/%s', child_id, child_name)
continue
if parent is None:
log.warning('missing %s/%s', parent_id, parent_name)
continue
child.parent = parent
t = time.time()
log.info('committing tree')
self.session.commit()
log.info('committed tree in %.2f seconds', time.time() - t)
def _populate_go(self, path: Optional[str] = None):
"""Populate the InterPro-GO mappings.
Assumes entries are populated.
"""
go_count = self.count_go_terms()
if go_count > 0:
log.info('GO terms (%d) already populated', go_count)
return
go_mappings = get_interpro_go_mappings(path=path)
for interpro_id, go_id, go_name in tqdm(go_mappings, desc='Mappings to GO'):
interpro = self.interpros.get(interpro_id)
if interpro is None:
log.warning('could not find %s', interpro_id)
continue
interpro.go_terms.append(self.get_or_create_go_term(go_id=go_id, name=go_name))
t = time.time()
log.info('committing go terms')
self.session.commit()
log.info('committed go terms in %.2f seconds', time.time() - t)
def _populate_proteins(self, url: Optional[str] = None, chunksize: Optional[int] = None) -> None:
"""Populate the InterPro-protein mappings."""
chunksize = chunksize or CHUNKSIZE
log.info('precaching interpros')
interpros = {
interpro.interpro_id: interpro
for interpro in self.list_interpros()
}
log.info('cached %d interpros', len(interpros))
log.info('building protein models')
# Assumes ordered by uniprot_id
missing = set()
chunks = get_proteins_chunks(url=url, chunksize=chunksize)
for chunk in tqdm(chunks, desc=f'Protein mapping chunks of {chunksize}'):
it = (x for _, x in chunk.iterrows())
grouped = groupby(it, key=itemgetter(0))
for uniprot_id, lines in tqdm(grouped):
protein = Protein(uniprot_id=uniprot_id)
for (_, interpro_id, xref, start, end) in lines:
interpro = interpros.get(interpro_id)
if interpro is None:
missing.add(interpro_id)
continue
annotation = Annotation(
entry=interpro,
protein=protein,
xref=xref,
start=start,
end=end,
)
self.session.add(annotation)
t = time.time()
log.info('committing proteins from chunk')
self.session.commit()
log.info('committed proteins from chunk in %.2f seconds', time.time() - t)
for m in missing:
log.warning('missing %s', m)
[docs] def get_interpro_by_name(self, name: str) -> Optional[Entry]:
"""Get an InterPro family by name, if exists."""
return self.session.query(Entry).filter(Entry.name == name).one_or_none()
[docs] def enrich_proteins(self, graph: BELGraph):
"""Find UniProt entries and annotates their InterPro entries."""
raise NotImplementedError
[docs] def enrich_interpros(self, graph: BELGraph):
"""Find InterPro entries and annotates their proteins."""
raise NotImplementedError
@staticmethod
def _get_identifier(entry: Entry) -> str:
return entry.interpro_id
def _create_namespace_entry_from_model(self, entry: Entry, namespace: Namespace) -> NamespaceEntry:
return NamespaceEntry(
encoding='P',
name=entry.name,
identifier=entry.interpro_id,
namespace=namespace,
)
[docs] def to_bel(self) -> BELGraph:
"""Get the InterPro hierarchy and annotations as BEL."""
graph = BELGraph()
interpro_namespace = self.upload_bel_namespace()
graph.namespace_url[interpro_namespace.keyword] = interpro_namespace.url
for entry in self.list_interpros():
entry_bel = entry.as_bel()
for child in entry.children:
graph.add_is_a(child.as_bel(), entry_bel)
for annotation in entry.annotations:
graph.add_is_a(annotation.protein.as_bel(), entry_bel)
# for go_term in entry.go_terms:
# graph.add_qualified_edge(entry_bel, go_term.as_bel())
return graph