# -*- coding: utf-8 -*-
"""Defines the Bio2BEL DrugBank manager."""
import json
import logging
import os
import time
from bio2bel import AbstractManager
from bio2bel.manager.bel_manager import BELManagerMixin
from bio2bel.manager.flask_manager import FlaskMixin
from bio2bel.manager.namespace_manager import BELNamespaceManagerMixin
from bio2bel_uniprot import get_slim_mappings_df
from collections import Counter, defaultdict
from sqlalchemy import func
from tqdm import tqdm
from typing import Dict, Iterable, List, Mapping, Optional, Tuple
import bio2bel_hgnc
from pybel import BELGraph
from pybel.constants import ABUNDANCE, FUNCTION, IDENTIFIER, NAME, NAMESPACE, PROTEIN
from pybel.dsl import BaseEntity, abundance
from pybel.manager.models import Namespace, NamespaceEntry
from .constants import DATA_DIR, MODULE_NAME
from .models import (
Action, Alias, Article, AtcCode, Base, Category, Drug, DrugProteinInteraction, DrugXref, Group, Patent, Protein,
Species, Type, drug_category, drug_group,
)
from .parser import extract_drug_info, get_xml_root
# Only the manager class is part of this module's public API.
__all__ = ['Manager']
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# JSON cache file under DATA_DIR read/written by Manager.get_drug_to_hgnc_ids
# (drug name -> list of HGNC identifiers).
_dti_ids_cache_path = os.path.join(DATA_DIR, 'drug_to_gene_ids.json')
# JSON cache file under DATA_DIR read/written by Manager.get_drug_to_hgnc_symbols
# (drug name -> list of HGNC gene symbols).
_dti_symbols_cache_path = os.path.join(DATA_DIR, 'drug_to_gene_symbols.json')
class Manager(AbstractManager, FlaskMixin, BELManagerMixin, BELNamespaceManagerMixin):
    """Manager for Bio2BEL DrugBank.

    Besides the database session provided by the parent classes, each instance keeps
    in-memory dictionaries (``*_to_model``) so that repeated ``get_or_create_*`` calls
    made during :meth:`populate` return the same (possibly not yet flushed) model
    object instead of creating duplicates.
    """

    module_name = MODULE_NAME

    # Configuration consumed by the BEL namespace mixin.
    namespace_model = Drug
    identifiers_recommended = 'DrugBank'
    identifiers_pattern = r'^DB\d{5}$'
    identifiers_miriam = 'MIR:00000102'
    identifiers_namespace = 'drugbank'
    identifiers_url = 'http://identifiers.org/drugbank/'

    # Models exposed through the Flask admin mixin.
    flask_admin_models = [Drug, Alias, AtcCode, Category, Group, Type, Patent, DrugXref, Species, Protein,
                          DrugProteinInteraction, Action, Article]

    def __init__(self, *args, **kwargs):
        """Initialize the manager and its empty in-memory model caches.

        All arguments are forwarded unchanged to the parent initializer.
        """
        super().__init__(*args, **kwargs)

        self.type_to_model = {}
        self.group_to_model = {}
        self.category_to_model = {}
        self.patent_to_model = {}  # keyed by (country, patent_id)
        self.species_to_model = {}
        self.action_to_model = {}
        self.pmid_to_model = {}
        self.uniprot_id_to_protein = {}

    @property
    def _base(self):
        """Return the declarative base shared by the DrugBank models."""
        return Base

    def get_type_by_name(self, name: str) -> Optional[Type]:
        """Get a Type by name."""
        return self.session.query(Type).filter(Type.name == name).one_or_none()

    def get_or_create_type(self, name: str) -> Type:
        """Get or create a Type by name.

        Looks in the in-memory cache first, then in the database; a new model is
        created and added to the session only if both miss.
        """
        m = self.type_to_model.get(name)
        if m is not None:
            return m

        m = self.get_type_by_name(name)
        if m is not None:
            self.type_to_model[name] = m
            return m

        m = self.type_to_model[name] = Type(name=name)
        self.session.add(m)
        return m

    def list_groups(self) -> List[Group]:
        """List all chemical groups."""
        return self._list_model(Group)

    def get_group_by_name(self, name: str) -> Optional[Group]:
        """Get a Group by name."""
        return self.session.query(Group).filter(Group.name == name).one_or_none()

    def get_or_create_group(self, name: str) -> Group:
        """Get or create a Group by name (cache, then database, then new model)."""
        m = self.group_to_model.get(name)
        if m is not None:
            return m

        m = self.get_group_by_name(name)
        if m is not None:
            self.group_to_model[name] = m
            return m

        m = self.group_to_model[name] = Group(name=name)
        self.session.add(m)
        return m

    def get_species_by_name(self, name: str) -> Optional[Species]:
        """Get a Species by name."""
        return self.session.query(Species).filter(Species.name == name).one_or_none()

    def get_or_create_species(self, name: str) -> Species:
        """Get or create a Species by name (cache, then database, then new model)."""
        m = self.species_to_model.get(name)
        if m is not None:
            return m

        m = self.get_species_by_name(name)
        if m is not None:
            self.species_to_model[name] = m
            return m

        m = self.species_to_model[name] = Species(name=name)
        self.session.add(m)
        return m

    def get_category_by_name(self, name: str) -> Optional[Category]:
        """Get a Category by name."""
        return self.session.query(Category).filter(Category.name == name).one_or_none()

    def get_or_create_category(self, name: str, **kwargs) -> Category:
        """Get or create a Category by name.

        :param name: The category name, used as the lookup key
        :param kwargs: Extra fields passed to ``Category`` only when a new model is created
        """
        m = self.category_to_model.get(name)
        if m is not None:
            return m

        m = self.get_category_by_name(name)
        if m is not None:
            self.category_to_model[name] = m
            return m

        m = self.category_to_model[name] = Category(name=name, **kwargs)
        self.session.add(m)
        return m

    def get_or_create_patent(self, country: str, patent_id: str, **kwargs) -> Patent:
        """Get or create a Patent identified by its country and patent identifier.

        :param country: The patent's country
        :param patent_id: The patent's identifier
        :param kwargs: Extra fields passed to ``Patent`` only when a new model is created
        """
        key = (country, patent_id)

        m = self.patent_to_model.get(key)
        if m is not None:
            return m

        m = self.session.query(Patent).filter(Patent.filter_pk(country, patent_id)).one_or_none()
        if m is not None:
            self.patent_to_model[key] = m
            return m

        m = self.patent_to_model[key] = Patent(
            country=country,
            patent_id=patent_id,
            **kwargs
        )
        self.session.add(m)
        return m

    def is_populated(self) -> bool:
        """Check if the database is populated by counting the drugs."""
        return 0 < self.count_drugs()

    def get_protein_by_uniprot_id(self, uniprot_id: str) -> Optional[Protein]:
        """Get a Protein by its UniProt identifier."""
        return self.session.query(Protein).filter(Protein.uniprot_id == uniprot_id).one_or_none()

    def get_protein_by_hgnc_id(self, hgnc_id: str) -> Optional[Protein]:
        """Get a Protein by its HGNC identifier.

        If several proteins share the identifier, the situation is logged and the
        first result is returned.

        NOTE(review): other methods of this class strip a leading ``HGNC:`` from the
        stored ``Protein.hgnc_id``, so ``hgnc_id`` presumably has to include that
        prefix to match - confirm against callers.
        """
        res: List[Protein] = list(self.session.query(Protein).filter(Protein.hgnc_id == hgnc_id).all())

        if not res:
            return None

        if len(res) == 1:
            return res[0]

        warning_txt = '\n'.join(f' - {result.uniprot_id} {result.name}' for result in res)
        log.error('found multiple isoforms of hgnc:%s. Keeping first of:\n%s', hgnc_id, warning_txt)
        return res[0]

    def get_or_create_protein(self, uniprot_id: str, **kwargs) -> Protein:
        """Get or create a Protein by its UniProt identifier.

        :param uniprot_id: The UniProt identifier, used as the lookup key
        :param kwargs: Extra fields passed to ``Protein`` only when a new model is created
        """
        m = self.uniprot_id_to_protein.get(uniprot_id)
        if m is not None:
            return m

        m = self.get_protein_by_uniprot_id(uniprot_id)
        if m is not None:
            self.uniprot_id_to_protein[uniprot_id] = m
            return m

        m = self.uniprot_id_to_protein[uniprot_id] = Protein(
            uniprot_id=uniprot_id,
            **kwargs
        )
        self.session.add(m)
        return m

    def get_action_by_name(self, name: str) -> Optional[Action]:
        """Get an Action by name."""
        return self.session.query(Action).filter(Action.name == name).one_or_none()

    def get_or_create_action(self, name: str) -> Action:
        """Get or create an Action by name (cache, then database, then new model)."""
        m = self.action_to_model.get(name)
        if m is not None:
            return m

        m = self.get_action_by_name(name)
        if m is not None:
            self.action_to_model[name] = m
            return m

        m = self.action_to_model[name] = Action(name=name)
        self.session.add(m)
        return m

    def get_article_by_pmid(self, pubmed_id: str) -> Optional[Article]:
        """Get an Article by its PubMed identifier."""
        return self.session.query(Article).filter(Article.pubmed_id == pubmed_id).one_or_none()

    def get_or_create_article(self, pubmed_id) -> Article:
        """Get or create an Article by its PubMed identifier (cache, then database, then new model)."""
        m = self.pmid_to_model.get(pubmed_id)
        if m is not None:
            return m

        m = self.get_article_by_pmid(pubmed_id)
        if m is not None:
            self.pmid_to_model[pubmed_id] = m
            return m

        m = self.pmid_to_model[pubmed_id] = Article(pubmed_id=pubmed_id)
        self.session.add(m)
        return m

    def _create_drug_protein_interaction(self, drug_model: Drug, data: Mapping) -> DrugProteinInteraction:
        """Create a drug-protein interaction for a drug and add it to the session.

        :param drug_model: The drug taking part in the interaction
        :param data: Interaction data. The keys ``uniprot_id``, ``organism``, ``known_action`` and
         ``category`` are required; ``name``, ``hgnc_id``, ``actions`` and ``articles`` are optional.
        """
        protein = self.get_or_create_protein(
            uniprot_id=data['uniprot_id'],
            species=self.get_or_create_species(data['organism']),
            name=data.get('name'),
            hgnc_id=data.get('hgnc_id')
        )

        dpi = DrugProteinInteraction(
            drug=drug_model,
            protein=protein,
            known_action=(data['known_action'] == 'yes'),
            actions=[
                # action names are normalized before the lookup so spelling variants share one model
                self.get_or_create_action(name.strip().lower())
                for name in data.get('actions', [])
            ],
            articles=[
                self.get_or_create_article(pubmed_id)
                for pubmed_id in data.get('articles', [])
            ],
            category=data['category'],
        )
        self.session.add(dpi)
        return dpi

    def populate(self, url: Optional[str] = None) -> None:
        """Populate DrugBank.

        Every drug element of the XML is converted to a ``Drug`` model with its related
        models; everything is committed in a single transaction at the end.

        :param url: Path to the DrugBank XML
        """
        root = get_xml_root(path=url)

        for drug_xml in tqdm(root, desc='Drugs'):
            drug = extract_drug_info(drug_xml)

            drug_model = Drug(
                type=self.get_or_create_type(drug['type']),
                drugbank_id=drug['drugbank_id'],
                cas_number=drug['cas_number'],
                name=drug['name'],
                description=drug['description'],
                groups=[
                    self.get_or_create_group(name)
                    for name in drug['groups']
                ],
                atc_codes=[
                    AtcCode(name=name)
                    for name in drug['atc_codes']
                ],
                categories=[
                    self.get_or_create_category(**category)
                    for category in drug['categories']
                ],
                inchi=drug.get('inchi'),
                inchikey=drug.get('inchikey'),
                aliases=[
                    Alias(name=name)
                    for name in drug['aliases']
                ],
                patents=[
                    self.get_or_create_patent(**patent)
                    for patent in drug['patents']
                ],
                xrefs=[
                    DrugXref(**xref)
                    for xref in drug['xrefs']
                ]
            )

            # The interactions need the drug model itself, so they are attached after construction.
            drug_model.protein_interactions = [
                self._create_drug_protein_interaction(drug_model, drug_protein_interaction_data)
                for drug_protein_interaction_data in drug['protein_interactions']
            ]

            self.session.add(drug_model)

        t = time.time()
        log.info('committing models')
        self.session.commit()
        log.info('committed models in %.2f seconds', time.time() - t)

    def count_drugs(self) -> int:
        """Count the number of drugs in the database."""
        return self._count_model(Drug)

    def list_drugs(self) -> List[Drug]:
        """List all drugs in the database."""
        return self._list_model(Drug)

    def count_types(self) -> int:
        """Count the number of types in the database."""
        return self._count_model(Type)

    def count_aliases(self) -> int:
        """Count the number of aliases in the database."""
        return self._count_model(Alias)

    def count_atc_codes(self) -> int:
        """Count the number of ATC codes in the database."""
        return self._count_model(AtcCode)

    def count_groups(self) -> int:
        """Count the number of groups in the database."""
        return self._count_model(Group)

    def count_categories(self) -> int:
        """Count the number of categories in the database."""
        return self._count_model(Category)

    def count_drugs_categories(self) -> int:
        """Count the number of drug-category relations in the database."""
        return self._count_model(drug_category)

    def count_drugs_groups(self) -> int:
        """Count the number of drug-group relations in the database."""
        return self._count_model(drug_group)

    def count_patents(self) -> int:
        """Count the number of patents in the database."""
        return self._count_model(Patent)

    def list_patents(self) -> List[Patent]:
        """List the patents in the database."""
        return self._list_model(Patent)

    def count_xrefs(self) -> int:
        """Count the number of cross-references in the database."""
        return self._count_model(DrugXref)

    def get_xrefs_by_resource(self, resource) -> List[DrugXref]:
        """Get all cross-references whose resource equals the given one."""
        return self.session.query(DrugXref).filter(DrugXref.resource == resource).all()

    def summarize_xrefs(self) -> Counter:
        """Count the cross-references per resource.

        :return: A counter mapping each resource name to its number of cross-references
        """
        rows = (
            self.session
            .query(DrugXref.resource, func.count(DrugXref.resource))
            .group_by(DrugXref.resource)
            .all()
        )
        # Counter(iterable) would count each (resource, count) row once; build the
        # counter from a mapping so the database counts become the counter values.
        return Counter({resource: count for resource, count in rows})

    def count_species(self) -> int:
        """Count the number of species in the database."""
        return self._count_model(Species)

    def count_proteins(self) -> int:
        """Count the number of proteins in the database."""
        return self._count_model(Protein)

    def list_proteins(self) -> List[Protein]:
        """List the proteins in the database."""
        return self._list_model(Protein)

    def count_actions(self) -> int:
        """Count the number of actions in the database."""
        return self._count_model(Action)

    def count_drug_protein_interactions(self) -> int:
        """Count the number of drug-protein interactions in the database."""
        return self._count_model(DrugProteinInteraction)

    def list_drug_protein_interactions(self) -> List[DrugProteinInteraction]:
        """List drug-protein interactions."""
        return self._list_model(DrugProteinInteraction)

    def count_articles(self) -> int:
        """Count the number of articles in the database."""
        return self._count_model(Article)

    def summarize(self) -> Dict[str, int]:
        """Summarize the database."""
        return dict(
            drugs=self.count_drugs(),
            types=self.count_types(),
            aliases=self.count_aliases(),
            atc_codes=self.count_atc_codes(),
            groups=self.count_groups(),
            categories=self.count_categories(),
            patents=self.count_patents(),
            xrefs=self.count_xrefs(),
            proteins=self.count_proteins(),
            species=self.count_species(),
            actions=self.count_actions(),
            drug_protein_interactions=self.count_drug_protein_interactions(),
        )

    @staticmethod
    def _get_identifier(drug: Drug) -> str:
        """Return the DrugBank identifier of a drug model."""
        return drug.drugbank_id

    def _create_namespace_entry_from_model(self, model: Drug, namespace: Namespace) -> NamespaceEntry:
        """Create a namespace entry (encoding ``A``) for a drug in the given namespace."""
        return NamespaceEntry(
            encoding='A',
            name=model.name,
            identifier=model.drugbank_id,
            namespace=namespace,
        )

    def lookup_target(self, node: BaseEntity) -> Optional[Protein]:
        """Try and look up the protein model for a BEL node.

        Only protein nodes with an ``hgnc`` or ``uniprot`` namespace (case-insensitive)
        and a non-empty identifier are looked up; anything else yields ``None``.
        """
        namespace = node.get(NAMESPACE)
        if node[FUNCTION] != PROTEIN or namespace is None:
            return None

        identifier = node.get(IDENTIFIER)
        if not identifier:
            return None

        namespace = namespace.lower()
        if namespace == 'hgnc':
            return self.get_protein_by_hgnc_id(identifier)
        if namespace == 'uniprot':
            return self.get_protein_by_uniprot_id(identifier)
        return None

    def iter_targets(self, graph: BELGraph) -> Iterable[Tuple[BaseEntity, Protein]]:
        """Iterate over pairs of graph nodes and the protein models they map to."""
        for node in graph:
            protein_model = self.lookup_target(node)
            if protein_model is not None:
                yield node, protein_model

    def enrich_targets(self, graph: BELGraph) -> None:
        """Enrich the protein targets in the graph with Drug-Protein interactions from DrugBank."""
        self.add_namespace_to_graph(graph)

        c = 0
        # Materialize the matches first since the graph is modified inside the loop.
        for node_data, protein_model in list(self.iter_targets(graph)):
            for dpi in protein_model.drug_interactions:
                dpi._add_to_graph(graph, dpi.drug.as_bel(), node_data)
                c += 1

        log.info('added %d drug-protein interactions.', c)

    def get_drug_by_drugbank_id(self, drugbank_id: str) -> Optional[Drug]:
        """Get a drug by its DrugBank identifier."""
        return self.session.query(Drug).filter(Drug.drugbank_id == drugbank_id).one_or_none()

    def get_drug_by_inchi(self, inchi: str) -> Optional[Drug]:
        """Get a drug by its InChI string."""
        return self.session.query(Drug).filter(Drug.inchi == inchi).one_or_none()

    def get_drug_by_inchikey(self, inchikey: str) -> Optional[Drug]:
        """Get a drug by its InChIKey."""
        return self.session.query(Drug).filter(Drug.inchikey == inchikey).one_or_none()

    def get_drug_by_xref(self, resource: str, identifier: str) -> Optional[Drug]:
        """Get a drug by one of its cross-references.

        :param resource: The cross-referenced resource
        :param identifier: The identifier of the drug in that resource
        """
        # need to join the xref table for this one
        xref = self.session.query(DrugXref).filter(DrugXref.has_identifier(resource, identifier)).one_or_none()
        if xref:
            return xref.drug
        return None

    def lookup_drug(self, node: BaseEntity) -> Optional[Drug]:
        """Try and look up a drug.

        Only abundance nodes in the ``drugbank`` namespace (case-insensitive) are
        considered. The node's identifier is preferred; otherwise a name starting
        with ``DB`` is tried as a DrugBank identifier.
        """
        namespace = node.get(NAMESPACE)
        if node[FUNCTION] != ABUNDANCE or namespace is None:
            return None

        name, identifier = node.get(NAME), node.get(IDENTIFIER)

        if namespace.lower() == 'drugbank':
            if identifier is not None:
                return self.get_drug_by_drugbank_id(identifier)
            if name is not None and name.startswith('DB'):
                return self.get_drug_by_drugbank_id(name)
        return None

    def iter_drugs(self, graph: BELGraph) -> Iterable[Tuple[BaseEntity, Drug]]:
        """Iterate over pairs of graph nodes and the drug models they map to."""
        for node in graph:
            drug_model = self.lookup_drug(node)
            if drug_model is not None:
                yield node, drug_model

    def enrich_drug_inchi(self, graph: BELGraph) -> None:
        """Enrich drugs in the graph with their InChI equivalent nodes."""
        self.add_namespace_to_graph(graph)

        # Materialize the matches first since the graph is modified inside the loop.
        for node, drug_model in list(self.iter_drugs(graph)):
            if drug_model.inchi:
                graph.add_equivalence(node, drug_model.as_inchi_bel())

    def enrich_drug_equivalences(self, graph: BELGraph) -> None:
        """Enrich drugs in the graph with their equivalent nodes.

        Adds equivalences to the InChI and InChIKey nodes (when available) and to the
        ChEBI, ChEMBL, KEGG Compound, PubChem Substance and PubChem Compound
        cross-references of each drug.
        """
        self.add_namespace_to_graph(graph)

        # Materialize the matches first since the graph is modified inside the loop.
        for node, drug_model in list(self.iter_drugs(graph)):
            if drug_model.inchi:
                graph.add_equivalence(node, drug_model.as_inchi_bel())

            if drug_model.inchikey:
                graph.add_equivalence(node, drug_model.as_inchikey_bel())

            for xref in drug_model.xrefs:
                resource = xref.resource.lower()
                identifier = xref.identifier

                # Compare the lower-cased name so mixed-case resource names are matched too.
                if resource in {'chebi', 'chembl'}:
                    graph.add_equivalence(node, abundance(namespace=resource, identifier=identifier))

                elif xref.resource == 'KEGG Compound':
                    # https://www.ebi.ac.uk/miriam/main/datatypes/MIR:00000013
                    graph.add_equivalence(node, abundance(namespace='kegg.compound', identifier=identifier))

                elif xref.resource == 'PubChem Substance':
                    # https://www.ebi.ac.uk/miriam/main/datatypes/MIR:00000033
                    graph.add_equivalence(node, abundance(namespace='pubchem.substance', identifier=identifier))

                elif xref.resource == 'PubChem Compound':
                    # https://www.ebi.ac.uk/miriam/main/datatypes/MIR:00000034
                    graph.add_equivalence(node, abundance(namespace='pubchem.compound', identifier=identifier))

                # TODO there are plenty more. implement as other bio2bel repositories need

    def enrich_drugs(self, graph: BELGraph) -> None:
        """Enrich drugs in the graph with their targets."""
        self.add_namespace_to_graph(graph)

        # Materialize the matches first since the graph is modified inside the loop.
        for _, drug_model in list(self.iter_drugs(graph)):
            for dpi in drug_model.protein_interactions:
                dpi.add_to_graph(graph)

    def to_bel(self) -> BELGraph:
        """Export DrugBank as BEL.

        Interactions whose protein has no HGNC identifier are skipped.
        """
        graph = BELGraph(
            name='DrugBank',
            version='5.1',
        )

        self.add_namespace_to_graph(graph)

        # The HGNC manager shares this manager's engine and session.
        hgnc_manager = bio2bel_hgnc.Manager(engine=self.engine, session=self.session)
        hgnc_manager.add_namespace_to_graph(graph)

        dpis = self.list_drug_protein_interactions()
        for dpi in tqdm(dpis, total=self.count_drug_protein_interactions(),
                        desc='Mapping drug-protein interactions to BEL'):
            if dpi.protein.hgnc_id is None:
                continue
            dpi.add_to_graph(graph)

        return graph

    def get_hgnc_id_to_drugs(self) -> Dict[str, List[str]]:
        """Get a dictionary of HGNC identifiers (not prepended with HGNC:) to list of drug names."""
        rv = defaultdict(list)

        for dpi in tqdm(self.list_drug_protein_interactions(),
                        total=self.count_drug_protein_interactions(),
                        desc='getting DTIs'):
            if dpi.protein.hgnc_id is None:
                continue

            drug_name = dpi.drug.name
            # drop the first len('HGNC:') characters of the stored identifier
            hgnc_id = dpi.protein.hgnc_id[len('HGNC:'):]

            rv[hgnc_id].append(drug_name)

        return dict(rv)

    def get_drug_to_hgnc_ids(self, cache=True, recalculate=False) -> Dict[str, List[str]]:
        """Get a dictionary of drug names to lists HGNC identifiers (not prepended with HGNC:).

        :param cache: If true, load the result from the JSON cache file when it exists and
         write the result to that file after computing it
        :param recalculate: If true, ignore an existing cache file and compute the result again
        """
        if cache and not recalculate and os.path.exists(_dti_ids_cache_path):
            log.info('loading cached DTIs')
            with open(_dti_ids_cache_path) as file:
                return json.load(file)

        rv = defaultdict(list)

        for dpi in tqdm(self.list_drug_protein_interactions(),
                        total=self.count_drug_protein_interactions(),
                        desc='getting DTIs'):
            if dpi.protein.hgnc_id is None:
                continue

            drug_name = dpi.drug.name
            # drop the first len('HGNC:') characters of the stored identifier
            hgnc_id = dpi.protein.hgnc_id[len('HGNC:'):]

            rv[drug_name].append(hgnc_id)

        if cache:
            with open(_dti_ids_cache_path, 'w') as file:
                log.info('dumping cached DTIs')
                json.dump(rv, file)

        return dict(rv)

    def get_drug_to_hgnc_symbols(self, cache=True, recalculate=False) -> Dict[str, List[str]]:
        """Get a dictionary of drug names to HGNC gene symbols.

        HGNC identifiers without a known symbol are logged and left out. The HGNC
        database is populated first if it is empty.

        :param cache: If true, load the result from the JSON cache file when it exists and
         write the result to that file after computing it
        :param recalculate: If true, ignore an existing cache file and compute the result again
        """
        if cache and not recalculate and os.path.exists(_dti_symbols_cache_path):
            log.debug('loading cached DTIs with gene symbols')
            with open(_dti_symbols_cache_path) as file:
                return json.load(file)

        hgnc_manager = bio2bel_hgnc.Manager(engine=self.engine, session=self.session)
        if not hgnc_manager.is_populated():
            hgnc_manager.populate()

        hgnc_id_symbol_mapping = hgnc_manager.build_hgnc_id_symbol_mapping()
        # Forward the caching options so that a recalculation is not based on a stale
        # identifier cache and cache=False does not touch the identifier cache file.
        drug_to_hgnc_ids = self.get_drug_to_hgnc_ids(cache=cache, recalculate=recalculate)

        rv = defaultdict(list)

        for drug, hgnc_ids in drug_to_hgnc_ids.items():
            for hgnc_id in hgnc_ids:
                hgnc_symbol = hgnc_id_symbol_mapping.get(hgnc_id)

                if hgnc_symbol is None:
                    log.warning('could not map HGNC identifier: %s', hgnc_id)
                    continue

                rv[drug].append(hgnc_symbol)

        if cache:
            with open(_dti_symbols_cache_path, 'w') as file:
                log.info('dumping cached DTIs')
                json.dump(rv, file)

        return dict(rv)

    def get_interactions_by_hgnc_id(self, hgnc_id: str) -> List[DrugProteinInteraction]:
        """Get the drug targets for a given HGNC identifier.

        :return: The drug-protein interactions of the matching protein, or an empty list
         if no protein has the given HGNC identifier
        """
        protein = self.get_protein_by_hgnc_id(hgnc_id)

        if not protein:
            return []

        return list(protein.drug_interactions)