Source code for ccdc.io

#
# This code is Copyright (C) 2015 The Cambridge Crystallographic Data Centre
# (CCDC) of 12 Union Road, Cambridge CB2 1EZ, UK and a proprietary work of CCDC.
# This code may not be used, reproduced, translated, modified, disassembled or
# copied, except in accordance with a valid licence agreement with CCDC and may
# not be disclosed or redistributed in any form, either in whole or in part, to
# any third party. All copies of this code made in accordance with a valid
# licence agreement as referred to above must contain this copyright notice.
#
# No representations, warranties, or liabilities are expressed or implied in the
# supply of this code by CCDC, its servants or agents, except where such
# exclusion or limitation is prohibited, void or unenforceable under governing
# law.
#
'''
Module for reading and writing of molecules, crystals and database entries.

There are three types of readers: :class:`MoleculeReader`,
:class:`CrystalReader` and :class:`EntryReader`. The latter is used to read in
database entries. It can also be used to read sdf files with the entry's attributes dictionary formatted as SD
tags.

Retrieving database entries from the CSD::

    # Creating a CSD entry reader, including any updates which may be present
    csd_entry_reader = EntryReader('CSD')

    # Similarly a set of in-house databases may be adjoined to the CSD by constructing readers over
    # a list of files.

    # Retrieve an entry based upon its index
    first_csd_entry = csd_entry_reader[0]

    # Access an entry/crystal/molecule based upon its identifier
    abebuf_entry = csd_entry_reader.entry('ABEBUF')
    abebuf_crystal = csd_entry_reader.crystal('ABEBUF')
    abebuf_molecule = csd_entry_reader.molecule('ABEBUF')

    # Loop over all CSD entries
    for entry in csd_entry_reader:
        print(entry.identifier)

    # Loop over all the molecules
    for mol in csd_entry_reader.molecules():
        print(mol.smiles)

Accessing molecules from a file::

    # Creating a molecule reader
    mol_reader = MoleculeReader('my_molecules.mol2')

    # Retrieve a molecule based upon its index
    first_molecule = mol_reader[0]

    # Loop over all molecules
    for mol in mol_reader:
        print(mol.smiles)

There are three types of writers: :class:`MoleculeWriter`,
:class:`CrystalWriter` and :class:`EntryWriter`. The latter can be used to
write out sdf files with the entry's attributes dictionary formatted as SD
tags.  The writers inherit functionality from the private base class
:class:`_DatabaseWriter`.

Using a :class:`MoleculeWriter` to write out a molecule::

    with MoleculeWriter('abebuf.mol2') as mol_writer:
        mol_writer.write(abebuf_molecule)

'''
##########################################################################

import os
import glob
import types
import warnings
warnings.simplefilter('always', DeprecationWarning)

import collections
import tempfile
import gzip

from ccdc.entry import Entry
from ccdc.crystal import Crystal
from ccdc.molecule import Molecule, _CifFileWithBonds
from ccdc.utilities import nested_class, Logger, CSDNotFoundException

from ccdc.utilities import _private_importer
with _private_importer():
    import UtilitiesLib
    import ChemistryLib
    import DatabaseEntryLib
    import FileFormatsLib
    import CSDSQLDatabaseLib
    import MotifPharmacophoreLib

##########################################################################

class _CSDDatabaseLocator(object):
    '''Locates CSD databases in a platform independent way.

    All lookups are static methods.  Every lookup that depends on installed
    CSD data returns ``None`` (or ``0`` for the version) when the
    ``CCDC_IGNORE_CSD_DATA`` environment variable is set to a non-empty value.
    '''

    @staticmethod
    def ignore_csd_data():
        '''Return True when ``CCDC_IGNORE_CSD_DATA`` is set and non-empty.'''
        return len(os.environ.get('CCDC_IGNORE_CSD_DATA', '')) >= 1

    @staticmethod
    def get_csd_location():
        '''Finds the CSD from registry, environment variables &c.

        The ``CSD_DATA_DIRECTORY`` environment variable wins when it names an
        existing directory; otherwise the location reported by
        ``UtilitiesLib.CSDLocator`` is used (or its parent directory when the
        reported path itself does not exist).

        :returns: the location as a str, or None (after issuing a warning)
            when nothing usable is found.
        '''
        if _CSDDatabaseLocator.ignore_csd_data():
            return None
        csd_loc = os.environ.get('CSD_DATA_DIRECTORY')
        if csd_loc and os.path.isdir(csd_loc):
            return csd_loc
        csd_loc = UtilitiesLib.CSDLocator().get_csd_location()
        if csd_loc and os.path.exists(csd_loc):
            return csd_loc
        if csd_loc and os.path.exists(os.path.dirname(csd_loc)):
            return os.path.dirname(csd_loc)
        warnings.warn(_CSDDatabaseLocator.get_location_warning())
        return None

    @staticmethod
    def get_csd_version():
        '''The version number of the current CSD.

        :returns: the digits of ``CSD_VERSION`` (or of the version reported by
            ``UtilitiesLib.CSDLocator``) joined into a str; the int ``0`` when
            CSD data is being ignored.
        '''
        if _CSDDatabaseLocator.ignore_csd_data():
            return 0
        v = os.environ.get('CSD_VERSION')
        if not v:
            v = UtilitiesLib.CSDLocator().get_csd_version()
        version = ''.join(s for s in v if s.isdigit())
        return version

    @staticmethod
    def get_binary_csd_location():
        '''Locate the binary CSD.

        ``CCDC_TOOLKIT_SQLITE_DATABASE`` wins when it names an existing file;
        otherwise ``as<version>be_ASER.sqlite`` is looked for in the parent
        directory of the CSD location.

        :returns: the database path, or None (after issuing a warning).
        '''
        if _CSDDatabaseLocator.ignore_csd_data():
            return None
        path = os.environ.get('CCDC_TOOLKIT_SQLITE_DATABASE', '')
        if os.path.isfile(path):
            return path
        csd_loc = _CSDDatabaseLocator.get_csd_location() or ''
        if os.path.isdir(os.path.dirname(csd_loc)):
            version = _CSDDatabaseLocator.get_csd_version()
            bin_loc = os.path.join(os.path.dirname(csd_loc), 'as%sbe_ASER.sqlite' % version)
            if os.path.exists(bin_loc):
                return bin_loc
        warnings.warn(_CSDDatabaseLocator.get_location_warning())
        return None

    @staticmethod
    def get_interaction_library_directory():
        '''Return the ``isostar_files`` directory, or None if it cannot be found.

        Looks next to the CSD location first, then falls back to the
        ``CCDC_ISOSTAR_DATA_DIRECTORY`` environment variable.
        '''
        if _CSDDatabaseLocator.ignore_csd_data():
            return None
        csd_loc = _CSDDatabaseLocator.get_csd_location()
        if csd_loc:
            d = os.path.join(os.path.dirname(csd_loc), 'isostar_files')
            if os.path.exists(d):
                return d
        d = os.getenv('CCDC_ISOSTAR_DATA_DIRECTORY')
        if d and os.path.exists(d):
            return d
        return None

    @staticmethod
    def get_interaction_library_data_files_location():
        '''Return the ``istr`` sub-directory of the interaction library.

        :returns: the path, or None (after issuing a warning).
        '''
        if _CSDDatabaseLocator.ignore_csd_data():
            return None
        data_dir = _CSDDatabaseLocator.get_interaction_library_directory()
        if data_dir:
            loc = os.path.join(data_dir, 'istr')
            if os.path.exists(loc):
                return loc
        warnings.warn('Cannot find interaction library data files')
        return None

    @staticmethod
    def get_interaction_library_query_files_location(which):
        '''Return the ``<which>_group_ini`` location of the interaction library.

        :param which: should be 'contact' or 'central'
        :returns: the path, or None (after issuing a warning).
        '''
        if _CSDDatabaseLocator.ignore_csd_data():
            return None
        data_dir = _CSDDatabaseLocator.get_interaction_library_directory()
        if data_dir:
            loc = os.path.join(data_dir, which + '_group_ini')
            if os.path.exists(loc):
                return loc
        # Internal use from a build space
        path = os.getenv('ISOSTAR_FILES')
        # The variable is usually unset outside a build space; guard against
        # None so the warning below is reached instead of a TypeError.
        if path and os.path.exists(path):
            return os.path.join(path, which + '_group_ini')
        warnings.warn('Cannot find interaction library files')
        return None

    @staticmethod
    def get_interaction_library_definition_location(which):
        '''Return the ``csd_<which>_group_table.xml`` file of the interaction library.

        :param which: should be 'central' or 'contact'
        :returns: the path, or None (after issuing a warning).
        '''
        if _CSDDatabaseLocator.ignore_csd_data():
            return None
        data_dir = _CSDDatabaseLocator.get_interaction_library_directory()
        if data_dir:
            loc = os.path.join(data_dir, 'group_xml_files', 'csd_%s_group_table.xml' % which)
            if os.path.exists(loc):
                return loc
        # Internal use from a build space
        path = os.getenv('ISOSTAR_FILES')
        if path and os.path.exists(path):
            path = os.path.join(path, 'group_xml_files', 'csd_%s_group_table.xml' % which)
            if os.path.exists(path):
                return path
        warnings.warn('Cannot find interaction library %s group file' % which)
        return None

    @staticmethod
    def get_optimisation_parameter_file_location():
        '''Return the ``parameter_files`` directory shipped beside this module.

        :returns: the path, or None (after issuing a warning).
        '''
        # Note this location has nothing to do with the CSD data location
        path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'parameter_files')
        if os.path.exists(path):
            return path
        warnings.warn('Cannot find parameter files')
        return None
    get_conformer_parameter_file_location = get_optimisation_parameter_file_location

    @staticmethod
    def get_cavity_dir_location():
        '''Return ``CCDC_CAVITY_DIRECTORY`` ('' when unset), or None when ignoring CSD data.'''
        if _CSDDatabaseLocator.ignore_csd_data():
            return None
        return os.environ.get('CCDC_CAVITY_DIRECTORY', '')

    @staticmethod
    def get_crossminer_database_location():
        '''Return ``CCDC_CROSSMINER_DATABASE``, or None when unset or ignoring CSD data.'''
        if _CSDDatabaseLocator.ignore_csd_data():
            return None
        return os.environ.get('CCDC_CROSSMINER_DATABASE')

    @staticmethod
    def get_crossminer_feature_definition_directory():
        '''Return ``CCDC_CROSSMINER_FEATURE_DIRECTORY``, or None when unset or ignoring CSD data.'''
        if _CSDDatabaseLocator.ignore_csd_data():
            return None
        return os.environ.get('CCDC_CROSSMINER_FEATURE_DIRECTORY')

    @staticmethod
    def get_location_warning():
        '''Return a standard warning message.'''
        return ('Cannot locate the CSD database.\n'
                'For further help with installing and configuring data please visit\n'
                'the support page at https://www.ccdc.cam.ac.uk/csds_install_help')

##########################################################################


def csd_directory():
    '''Return the directory containing the CSD.

    :returns: the CSD location itself when it is a directory, otherwise its
        parent directory; None when the CSD cannot be located (the locator
        has already issued a warning in that case, unless CSD data is being
        deliberately ignored).
    '''
    csd_loc = _CSDDatabaseLocator.get_csd_location()
    if csd_loc is None:
        # os.path.isdir(None) would raise TypeError; report "not found" instead.
        return None
    if os.path.isdir(csd_loc):
        return csd_loc
    return os.path.dirname(csd_loc)
def csd_version():
    '''Report which version of the CSD is currently in use.

    This simply forwards the value determined by the database locator.
    '''
    version = _CSDDatabaseLocator.get_csd_version()
    return version
##########################################################################

class _LazyCSDLoader:
    '''Lazy loading of the CSD for testsuite

    This avoids repeat loading.  It is a good way to access the CSD if using
    the _csd_required annotation on tests, as that way the CSD isn't loaded
    until the test has been checked to ensure it is necessary

    :param creation_method: callable invoked once as ``creation_method('csd')``
        the first time :meth:`csd` is called.
    '''

    def __init__(self, creation_method):
        self._csd = None
        self.creation_method = creation_method

    def csd(self):
        '''Return the reader, creating it on first use.'''
        if self._csd is None:
            self._csd = self.creation_method('csd')
        return self._csd


class _LazyCSDEntryReader():
    '''Lazy loading of an EntryReader on the CSD for testsuite'''

    def __init__(self):
        self._csd = _LazyCSDLoader(EntryReader)

    def csd(self):
        '''Return the shared reader, creating it on first use.'''
        return self._csd.csd()


class _LazyCSDCrystalReader():
    '''Lazy loading of a CrystalReader on the CSD for testsuite'''

    def __init__(self):
        self._csd = _LazyCSDLoader(CrystalReader)

    def csd(self):
        '''Return the shared reader, creating it on first use.'''
        return self._csd.csd()


class _LazyCSDMoleculeReader():
    '''Lazy loading of a MoleculeReader on the CSD for testsuite'''

    def __init__(self):
        self._csd = _LazyCSDLoader(MoleculeReader)

    def csd(self):
        '''Return the shared reader, creating it on first use.'''
        return self._csd.csd()

##########################################################################

class _Writer(object):
    '''Base class for Writer - do not instantiate directly.'''

    def __init__(self, fname='', append=False):
        '''Sets the file name and opens an output stream.

        :param fname: output file name; '' or 'stdout' selects ``UtilitiesLib.cout``.
        :param append: open the file in append mode rather than truncating it.
        :raises IOError: if the file stream cannot be opened.
        '''
        self.file_name = fname
        self.append = append
        if fname == '' or fname == 'stdout':
            self.stream = UtilitiesLib.cout
        else:
            self.stream = UtilitiesLib.ofstream()
            mode = UtilitiesLib.ofstream.app if append else UtilitiesLib.ofstream.out
            self.stream.open(str(fname), mode)
            if not self.stream.good():
                raise IOError('Could not open %s for writing.' % fname)

    def __str__(self):
        '''Human readable representation.'''
        return "%s('%s')" % (self.__class__.__name__, self.file_name)
    __repr__ = __str__

    def write_entry(self, e):
        '''Not implemented'''
        raise NotImplementedError('_Writer is an abstract class')

    def write_crystal(self, e):
        '''Not implemented.'''
        raise NotImplementedError('_Writer is an abstract class')

    def write_molecule(self, m):
        '''Not implemented.'''
        raise NotImplementedError('_Writer is an abstract class')

    def close(self):
        '''Close the stream, falling back to a flush if it cannot be closed.'''
        try:
            self.stream.close()
        except Exception:
            # Presumably the shared standard-output stream, which cannot be
            # closed; make sure pending output is at least flushed.
            # Only ordinary errors are handled so that KeyboardInterrupt and
            # SystemExit still propagate.
            self.stream.flush()

    def __enter__(self):
        '''Entry point for a context manager.'''
        return self

    def __exit__(self, type_val, value, traceback):
        '''Exit point for a context manager.

        The stream is always closed.  If the block raised, the (incomplete)
        output file is removed, except after a keyboard interrupt where the
        partial file is kept and a message is printed instead.
        '''
        self.close()
        if traceback is None:
            return
        # str(KeyboardInterrupt()) is '', so the exception type has to be
        # inspected; the string comparison is retained for compatibility.
        interrupted = (
            (type_val is not None and issubclass(type_val, KeyboardInterrupt))
            or str(value) == 'KeyboardInterrupt'
        )
        if interrupted:
            print('Interrupt during write of %s. File may not be complete.' % self)
        elif self.file_name not in ('', 'stdout') and os.path.isfile(self.file_name):
            # Nothing to remove when writing to standard output or when the
            # file was never created; unlinking blindly would raise and mask
            # the exception that is already propagating.
            os.unlink(self.file_name)


class _MoleculeFileWriter(_Writer):
    '''Base class for mol2 and sdf formats.

    Subclasses provide ``klass``, the file format class to instantiate.
    '''

    def __init__(self, fname='', append=False):
        '''Instantiates the file format, and opens an output stream.'''
        _Writer.__init__(self, fname, append)
        self.mf = self.klass()  # pylint: disable=E1101

    def write_entry(self, e):
        '''Writes an entry to the file format class.

        Molecules and crystals are converted to entries first.
        '''
        if isinstance(e, Molecule):
            e = Entry.from_molecule(e)
        elif isinstance(e, Crystal):
            e = Entry.from_molecule(e.molecule)
        self.mf.clear()
        self.mf.set(e._entry)
        self.mf.write(self.stream)

    def write_crystal(self, c):
        '''Writes a crystal to the file format class.

        Entries and molecules are converted to crystals first.
        '''
        if isinstance(c, Entry):
            c = c.crystal
        elif isinstance(c, Molecule):
            c = Entry.from_molecule(c).crystal
        e = DatabaseEntryLib.CrystalStructureImmediateDatabaseEntry()
        e.set_crystal_structure(c._crystal)
        e.set_identifier(UtilitiesLib.DatabaseEntryIdentifier(c.identifier))
        # The chemical and crystal information are optional: not every
        # crystal object carries these attributes.
        try:
            e.set_chemical_info(c._chemical_info)
        except AttributeError:
            pass
        try:
            e.set_crystal_info(c._crystal_info)
        except AttributeError:
            pass
        self.mf.clear()
        self.mf.set(e)
        self.mf.write(self.stream)

    def write_molecule(self, m):
        '''Writes a molecule to the file format class.

        When the molecule carries a ``_cell`` it is written as a crystal
        structure with that cell, keeping its orthogonal coordinates.
        '''
        self.mf.clear()
        if hasattr(m, '_cell'):
            c = ChemistryLib.ConcreteCrystalStructure()
            c.set_editable_molecule(m._molecule)
            cell = m._cell
            c.set_cell(
                cell, ChemistryLib.CrystalStructure.KEEP_ORTHOGONAL_COORDINATES
            )
            self.mf.set(c, UtilitiesLib.DatabaseEntryIdentifier(m.identifier))
        else:
            self.mf.set(m._molecule, UtilitiesLib.DatabaseEntryIdentifier(m.identifier))
        self.mf.write(self.stream)


class _Mol2Writer(_MoleculeFileWriter):
    '''Writer in mol2 format.'''
    klass = FileFormatsLib.Mol2File

    def write_entry(self, e):
        '''Writes an entry, with its attributes as SD-style comment lines
        and its ``atom_sets`` (when present) as a mol2 set.
        '''
        if isinstance(e, Molecule):
            e = Entry.from_molecule(e)
        elif isinstance(e, Crystal):
            e = Entry.from_molecule(e.molecule)
        x = FileFormatsLib.DatabaseEntryToSDfileDatabaseEntry(e._entry)
        if hasattr(e, 'attributes'):
            attrs = {
                k: str(v) for k, v in e.attributes.items()
            }
        else:
            attrs = dict()
        if not x:
            # Not already an SD-file entry: build one from the entry's parts.
            x = FileFormatsLib.SDfileDatabaseEntry(
                e._entry.identifier(), e._entry.crystal_structure(), attrs
            )
        self.mf.set(x)
        mol2comment = FileFormatsLib.Mol2Comment()
        self.mf.add_comment(mol2comment)
        if attrs:
            for k, v in attrs.items():
                mol2comment.add_comment_line('> <%s>' % k)
                mol2comment.add_comment_line(str(v))
                mol2comment.add_comment_line('')
        else:
            mol2comment.add_comment_line('')
        if hasattr(e, 'atom_sets'):
            m2set = FileFormatsLib.Mol2Set()
            for k, v in e.atom_sets.items():
                m2set.add_atom_set(k, v)
            self.mf.set_set(m2set)
        self.mf.write(self.stream)


class _SDFWriter(_MoleculeFileWriter):
    '''Writer in sdf (MACCS) format.'''
    klass = FileFormatsLib.MolFile

    def write_entry(self, e):
        '''Writes an entry, with its attributes formatted as SD tags.'''
        if isinstance(e, Molecule):
            e = Entry.from_molecule(e)
        elif isinstance(e, Crystal):
            e = Entry.from_molecule(e.molecule)
        x = FileFormatsLib.DatabaseEntryToSDfileDatabaseEntry(e._entry)
        if hasattr(e, 'attributes'):
            attrs = {
                k: str(v) for k, v in e.attributes.items()
            }
        else:
            attrs = dict()
        if not x:
            # Not already an SD-file entry: build one from the entry's parts.
            x = FileFormatsLib.SDfileDatabaseEntry(
                e._entry.identifier(), e._entry.crystal_structure(), attrs
            )
        self.mf.set(x)
        for k, v in attrs.items():
            self.mf.add_sd_tag(
                '> <%s>' % k, ['%s' % v]
            )
        self.mf.write(self.stream)


class _GCDWriter(_Writer):
    '''Writer in GCD (i.e., refcode) format.

    Each write emits the object's identifier followed by a newline.
    '''

    def write_entry(self, e):
        '''Writes the refcode'''
        self.stream.write(str(e.identifier + '\n'), len(e.identifier) + 1)

    def write_crystal(self, c):
        '''Writes the refcode.'''
        self.stream.write(str(c.identifier + '\n'), len(c.identifier) + 1)

    def write_molecule(self, m):
        '''Writes the refcode.'''
        self.stream.write(str(m.identifier + '\n'), len(m.identifier) + 1)

########################################################################
class _DatabaseWriter(_Writer):
    '''Base class for database formats.

    :param fname: The filename of the database to create or open.
    :param append: Append to the database when True, rather than replace it.

    Writers are context managers, supporting the syntax::

        with MoleculeWriter('output.mol2', append=True) as filehandle:
            filehandle.write(mol)

    Subclasses provide ``klass``, the database class to instantiate.
    '''

    def __init__(self, fname, append=False):
        # Defined up front so that close() (also reached through __del__)
        # is safe even if construction fails part-way through.
        self._db = None
        self.file_name = os.path.abspath(fname)
        if not append:
            self.remove()
        _Writer.__init__(self, self.file_name, append=append)
        self._db = self.klass(  # pylint: disable=E1101
            fname,
            UtilitiesLib.OpenMode(
                UtilitiesLib.OpenMode.CREATE | UtilitiesLib.OpenMode.WRITE
            )
        )
        if self._db.has_expiry_date():
            if self._db.expiry_date() < UtilitiesLib.Date.today():
                raise RuntimeError('This database has expired.')

    def remove(self):
        '''Remove the file if it exists.'''
        if os.path.exists(self.file_name):
            os.unlink(self.file_name)

    def write_entry(self, e):
        '''Appends an entry to the database to be written out.

        :param e: :class:`ccdc.entry.Entry`
        '''
        self._db.append(e._entry)

    def write_crystal(self, c):
        '''Appends an entry to the database to be written out.

        :param c: :class:`ccdc.crystal.Crystal`
        '''
        e = DatabaseEntryLib.CrystalStructureImmediateDatabaseEntry()
        e.set_crystal_structure(c._crystal)
        e.set_identifier(UtilitiesLib.DatabaseEntryIdentifier(c.identifier))
        e.set_chemical_info(c._chemical_info)
        e.set_crystal_info(c._crystal_info)
        self._db.append(e)

    def write_molecule(self, m):
        '''Appends a molecule to the database to be written out.

        The molecule is wrapped in a crystal structure, using the molecule's
        own ``_cell`` when it has one and a default cell otherwise.

        :param m: :class:`ccdc.molecule.Molecule`
        '''
        c = ChemistryLib.ConcreteCrystalStructure()
        c.set_editable_molecule(m._molecule)
        if hasattr(m, '_cell'):
            cell = m._cell
        else:
            cell = ChemistryLib.Cell()
        c.set_cell(
            cell, ChemistryLib.CrystalStructure.KEEP_ORTHOGONAL_COORDINATES
        )
        e = DatabaseEntryLib.CrystalStructureImmediateDatabaseEntry(
            UtilitiesLib.DatabaseEntryIdentifier(m.identifier)
        )
        e.set_crystal_structure(c)
        self._db.append(e)

    def __del__(self):
        '''Ensure the database is closed on deletion.'''
        self.close()

    def close(self):
        '''Close the database.

        Safe to call more than once, and on an instance whose construction
        did not complete (``__del__`` calls it unconditionally).
        '''
        db = getattr(self, '_db', None)
        if db is not None:
            db.flush()
            self._db = None
        stream = getattr(self, 'stream', None)
        if stream is not None:
            stream.flush()
            stream.close()
            # Forget the stream so a repeated close() does not touch it again.
            self.stream = None
class _AserWriter(_DatabaseWriter):
    '''Not implemented any more (2.0).'''

    def __init__(self, *args):
        raise RuntimeError(
            'ASER format databases have been removed in version 2.0. '
            'Please write your your data to another format of database such as csdsql.'
        )


class _CifWriter(_MoleculeFileWriter):
    '''Write CifFiles from molecules or entries.'''
    klass = _CifFileWithBonds

    def write_entry(self, e):
        '''Writes the entry, including any new attributes.

        Molecules and crystals are delegated to the base class.  For an
        entry, its global data block (when present) and its own data block
        are printed directly; an entry without a data block is written
        through the file format class instead.
        '''
        if isinstance(e, (Molecule, Crystal)):
            # Name the class explicitly: super(self.__class__, self) recurses
            # forever when called on a subclass, and the writer factory
            # always instantiates a subclass.
            super(_CifWriter, self).write_entry(e)
        else:
            if hasattr(e, 'global_attributes') and hasattr(e.global_attributes, '_global_data_block') and e.global_attributes._global_data_block is not None:
                e.global_attributes._global_data_block._print(self.stream, FileFormatsLib.CifWriteSettings())
            if hasattr(e, 'attributes') and hasattr(e.attributes, '_data_block'):
                e.attributes._data_block._print(self.stream, FileFormatsLib.CifWriteSettings())
            else:
                self.mf.clear()
                self.mf.set(e._entry)
                self.mf.write(self.stream)


class _ResWriter(_MoleculeFileWriter):
    '''Write res (shellx) files.'''
    klass = FileFormatsLib.ResFile


class _PDBWriter(_MoleculeFileWriter):
    '''Write PDB files.'''
    klass = FileFormatsLib.PdbFile


class _CSDSQLDatabaseWriter(_DatabaseWriter):
    '''Writes entries in CSDSQLite format.'''

    def __init__(self, file_name, append):
        '''Create or open the database.

        :param file_name: path of the database file.
        :param append: when False an existing file is removed first.
        '''
        self.file_name = file_name
        self.append = append
        if os.path.exists(file_name) and not append:
            os.unlink(file_name)
        self._db = CSDSQLDatabaseLib.make_database(
            file_name,
            UtilitiesLib.OpenMode.CREATE | UtilitiesLib.OpenMode.WRITE
        )

    def close(self):
        '''Closes the database.'''
        if hasattr(self, '_db'):
            del self._db

    def write_molecule(self, molecule):
        '''Writes a molecule to the database.'''
        self.write_entry(Entry.from_molecule(molecule))

    def write_crystal(self, crystal):
        '''Writes a crystal to the database, via its molecule.'''
        self.write_entry(Entry.from_molecule(crystal.molecule))

    def write_entry(self, entry):
        '''Writes an entry to the database.

        :raises RuntimeError: if the database rejects the entry.
        '''
        try:
            self._db.append(entry._entry, 0)
        except RuntimeError as exc:
            # Re-raised as a fresh RuntimeError carrying only the message.
            raise RuntimeError(str(exc))
##########################################################################

class _WriterFactory(object):
    '''Factory to return a _Writer based on file suffix.'''

    # Writers selectable through the explicit ``format`` argument.
    known_formats = {
        'sdf': _SDFWriter,
        'mol': _SDFWriter,
        'mol2': _Mol2Writer,
        'identifiers': _GCDWriter,
        'cif': _CifWriter,
        'res': _ResWriter,
        'pdb': _PDBWriter,
        'csdsql': _CSDSQLDatabaseWriter,
    }

    # Writers selected from the file name's suffix.
    known_suffixes = {
        'sdf': _SDFWriter,
        'mol': _SDFWriter,
        'mol2': _Mol2Writer,
        'gcd': _GCDWriter,
        #'inf': _AserWriter, - is this used by CSDSQLDatabase?
        'cif': _CifWriter,
        'res': _ResWriter,
        'pdb': _PDBWriter,
        'ent': _PDBWriter,
        'csdsql': _CSDSQLDatabaseWriter,
    }

    def __new__(klass, fname, format='', append=False):
        '''Build a writer for ``fname``.

        A throw-away subclass of the concrete writer is created, named after
        ``klass``, instantiated, and then given the functions defined on
        ``klass`` (such as ``write``).

        :param fname: output file name, or 'stdout' / 'stderr'.
        :param format: explicit format name; overrides the file suffix.
        :param append: passed through to the concrete writer.
        :raises NotImplementedError: for an unknown format or suffix.
        '''
        # Step 1: decide which concrete writer to derive from.
        if format:
            wanted = format.lower()
            if wanted not in _WriterFactory.known_formats:
                raise NotImplementedError('Unknown format %s' % format)
            base = _WriterFactory.known_formats[wanted]
        elif fname in ('stdout', 'stderr'):
            base = _Mol2Writer
        else:
            suff = os.path.splitext(fname)[1][1:]
            if suff.lower() not in _WriterFactory.known_suffixes:
                raise NotImplementedError('Unknown file suffix %s' % suff)
            base = _WriterFactory.known_suffixes[suff.lower()]

        # Step 2: derive, rename and instantiate.
        class k(base):
            pass

        k.__name__ = klass.__name__
        ret = k(fname, append=append)

        # Step 3: splice in the functions defined directly on ``klass``.
        callable_kinds = (types.MethodType, types.FunctionType)
        for attr_name, attr_value in klass.__dict__.items():
            if isinstance(getattr(klass, attr_name), callable_kinds):
                setattr(k, attr_name, attr_value)
        return ret
class CrystalWriter(_WriterFactory):
    '''Writer whose :meth:`write` emits crystals.'''

    def write(self, c):
        '''Write a single crystal to the output.

        :param c: :class:`ccdc.crystal.Crystal`
        '''
        emit = self.write_crystal  # pylint: disable=E1101
        emit(c)
class MoleculeWriter(_WriterFactory):
    '''Writer whose :meth:`write` emits molecules.'''

    def write(self, m):
        '''Write a single molecule to the output.

        :param m: :class:`ccdc.molecule.Molecule`
        '''
        emit = self.write_molecule  # pylint: disable=E1101
        emit(m)
class EntryWriter(_WriterFactory):
    '''Writer whose :meth:`write` emits database entries.'''

    def write(self, e):
        '''Write a single entry to the output.

        :param e: :class:`ccdc.entry.Entry`
        '''
        emit = self.write_entry  # pylint: disable=E1101
        emit(e)
########################################################################## # Readers ##########################################################################
class _DatabaseReader(object):
    '''Base class for database readers.

    Readers are context managers, supporting the syntax::

        with MoleculeReader(filename) as filehandle:
            for mol in filehandle:
                print(mol.smiles)

    Subclasses provide ``klass``, the database class used for ordinary files.
    '''

    def __init__(self, fname, db=''):
        '''Open a database.

        Distinguished file name of 'CSD' to use the (internal or installed)
        CSD database.  Also, updates, such as Nov12.

        :param fname: a file name, the name 'csd' (any case), or an iterable
            of file names to be pooled into a single database.
        :param db: not used by this base class.
        :raises IOError: if ``fname`` names a file that does not exist.
        :raises CSDNotFoundException: if the CSD is requested but cannot be loaded.
        '''
        self.file_name = fname
        self._db = None
        self._lazy_enumerator = None
        self._tempfile = None
        if hasattr(fname, '__iter__') and not isinstance(fname, str):
            # Materialise once so any iterable works, not only indexable ones.
            db_names = list(fname)
            dbs = tuple(self.__class__(f)._db for f in db_names)
            self._db = FileFormatsLib.CrystalStructureDatabasePool()
            for component in dbs:
                self._db.append(component)
            self._component_dbs = dict(zip(db_names, dbs))
        elif fname.lower() == 'csd':
            self._load_installed_csd()
        elif os.path.exists(fname) and os.path.splitext(fname)[1] == '.sqlite':
            self._db = CSDSQLDatabaseLib.make_database(fname, UtilitiesLib.OpenMode.READ)
            self.file_name = fname
        elif os.path.exists(fname) and os.path.splitext(fname)[1] == '.gz':
            self._decompress_to_tempfile(fname)
            self._db = self.klass(os.path.abspath(self._tempfile))
            self.file_name = fname
        elif os.path.exists(fname):
            self._db = self.klass(os.path.abspath(fname))
            self.file_name = fname
        else:
            raise IOError('File not found %s' % fname)

    def _load_installed_csd(self):
        '''PRIVATE: load the main CSD and its updates as a pool.

        Sets ``self._db`` and ``self.file_name`` (a single path, or a list of
        paths with the main database first and updates in date order).
        '''
        csd_dir = _CSDDatabaseLocator.get_binary_csd_location()
        if csd_dir is None:
            raise CSDNotFoundException("Cannot load CSD data from %s" % csd_dir)
        if 'CCDC_LOCAL_CSD_SQLITE_ASER_FILES' in os.environ:
            csd_paths = os.environ['CCDC_LOCAL_CSD_SQLITE_ASER_FILES'].split(os.pathsep)
        else:
            csd_paths = glob.glob(os.path.join(os.path.dirname(csd_dir), '*ASER.sqlite'))
        ok = bool(csd_paths)
        try:
            self._db = CSDSQLDatabaseLib.load_main_csd_and_updates_as_pool()
        except RuntimeError:
            ok = False
        if not ok:
            raise CSDNotFoundException("Cannot load CSD data from %s" % csd_dir)
        if len(csd_paths) == 1:
            self.file_name = csd_paths[0]
        else:
            self.file_name = sorted(csd_paths, key=self._csd_path_sort_key)

    @staticmethod
    def _csd_path_sort_key(path):
        '''PRIVATE: order the main database first, then updates by year and month.

        File names that follow neither the ``as...`` nor the ``<Mon><year>``
        pattern sort last instead of raising.
        '''
        months = ('JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN',
                  'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC')
        base = os.path.basename(path)
        if base.startswith('as'):
            return (0, '')
        prefix = base[:3].upper()
        if prefix in months:
            return (1, '%s_%02d' % (base[3:7], months.index(prefix)))
        return (2, base)

    def _decompress_to_tempfile(self, fname):
        '''PRIVATE: unpack a gzipped file into the temporary directory.

        The result is named after ``fname`` without its ``.gz`` suffix and
        recorded in ``self._tempfile`` so that :meth:`close` can remove it.
        '''
        import shutil
        self._tempfile = os.path.join(
            tempfile.gettempdir(),
            os.path.splitext(os.path.basename(fname))[0]
        )
        with gzip.open(fname, "rb") as compressed, open(self._tempfile, "wb") as decompressed:
            # Copy in chunks rather than reading the whole archive into memory.
            shutil.copyfileobj(compressed, decompressed)

    @property
    def _enumerator(self):
        """
        Creating an enumerator can take time proportional to the size of the entire
        database. Therefore only do it once it's needed.
        """
        if self._lazy_enumerator is None:
            self._lazy_enumerator = self._db.enumerator()
        return self._lazy_enumerator

    @_enumerator.setter
    def _enumerator(self, value):
        self._lazy_enumerator = value

    def __str__(self):
        '''Human readable representation.'''
        return "%s('%s')" % (self.__class__.__name__, self.file_name)
    __repr__ = __str__

    def _real_database(self, identifier):
        '''Private: deconvolve a pool if necessary.'''
        if hasattr(self, '_component_dbs'):
            return self._component_dbs[self._db.source_database_name(UtilitiesLib.DatabaseEntryIdentifier(identifier))]
        else:
            return self._db

    @staticmethod
    def _make_crystal(e):
        '''PRIVATE: make a crystal from a DatabaseEntry'''
        return Entry._make_crystal(e)

    def _make_molecule(self, e):
        '''PRIVATE: make a molecule from an entry.'''
        return self._make_entry(e).molecule

    def entries(self):
        '''Generator for entries in the database.'''
        for i in range(self._db.size()):
            yield self._make_entry(self._enumerator.entry(i))

    def crystals(self):
        '''Generator for crystals in the database.'''
        for e in self.entries():
            yield e.crystal

    def molecules(self):
        '''Generator for molecules of the database.'''
        for e in self.entries():
            m = e.molecule
            yield m

    def _make_entry(self, e):
        '''PRIVATE: wrap a database entry in an :class:`ccdc.entry.Entry`.'''
        return Entry(e)

    def entry(self, id):
        '''Random access to entries.

        :param id: :attr:`ccdc.entry.Entry.identifier`
        :returns: :class:`ccdc.entry.Entry`
        '''
        e = self._db.entry(UtilitiesLib.DatabaseEntryIdentifier(id))
        return self._make_entry(e)

    def crystal(self, id):
        '''Random access to crystals.

        :param id: :attr:`ccdc.crystal.Crystal.identifier`
        :returns: :class:`ccdc.crystal.Crystal`
        '''
        e = self.entry(id)
        return self._make_crystal(e._entry)

    def molecule(self, id):
        '''Random access to molecules

        :param id: :attr:`ccdc.molecule.Molecule.identifier`
        :returns: :class:`ccdc.molecule.Molecule`
        '''
        return self.entry(id).molecule

    def identifier(self, i):
        '''Random access to identifiers.

        :param i: int index
        :returns: str identifier
        '''
        return self[i].identifier

    def __del__(self):
        '''Delete the database.'''
        self.close()

    def close(self):
        '''Close the database.

        Releases the database handles and removes the temporary file created
        for a gzipped input.  Safe to call repeatedly and on a partially
        constructed instance (``__del__`` calls it unconditionally).
        '''
        if hasattr(self, "_component_dbs") and self._component_dbs:
            self._component_dbs = []
        if hasattr(self, "_text_numeric_searcher"):
            self._text_numeric_searcher = None
        if hasattr(self, '_db') and self._db is not None:
            self._db = None
        if hasattr(self, '_lazy_enumerator') and self._lazy_enumerator is not None:
            self._lazy_enumerator = None
        if hasattr(self, '_tempfile') and self._tempfile is not None:
            try:
                os.remove(self._tempfile)
            except OSError:
                # The temporary file has already gone (or cannot be removed);
                # closing must not fail because of that.
                pass
            self._tempfile = None

    def __len__(self):
        '''The size of the database.

        This will be the number of entries, not necessarily the number of accessible molecules.'''
        return self._db.size()

    def __enter__(self):
        '''Make the database a context manager.'''
        return self

    def __exit__(self, type, value, traceback):
        '''Termination of the context manager'''
        self.close()

    def _extended_refcode_dict(self):
        '''PRIVATE: map identifier to (has_3d, reliability_score), built on first use.

        :raises RuntimeError: if the database has no extended refcode list.
        '''
        if not hasattr(self, '_ext_ref_dict'):
            if hasattr(self._db, 'extended_refcode_list'):
                ExtendedInfo = collections.namedtuple('ExtendedInfo', [
                    'has_3d', 'reliability_score'
                ])
                self._ext_ref_dict = dict(
                    (x.identifier().str(), ExtendedInfo(x.has_3d(), x.reliability_score()))
                    for x in self._db.extended_refcode_list()
                )
            else:
                raise RuntimeError('The database %s does not support extended refcodes.' % self.file_name)
        return self._ext_ref_dict

    @property
    def journals(self):
        '''The list of journals held in a database.

        :returns: a dict mapping journal name to CCDC coden, or None when the
            database exposes no journal information.
        '''
        if hasattr(self._db, 'journal_list_info'):
            try:
                return dict(
                    (j.name(), j.ccdc_coden()) for j in self._db.journal_list_info().journal_list()
                )
            except TypeError:
                # Fall through to the alternative accessor below.
                pass
        if hasattr(self._db, 'journal_info'):
            return dict(
                (j.name(), j.ccdc_coden()) for j in self._db.journal_info().journal_list()
            )
        return None
class _DatabasePoolReader(_DatabaseReader):
    '''Reader over several databases combined into a single pool.

    Each name in ``file_names`` is opened through :class:`_ReaderFactory`.
    If every resulting reader is a mol2 reader a ``GoldMol2DatabasePool`` is
    built; if every one is an SDF reader a ``GoldSDFDatabasePool`` is built;
    otherwise a generic ``CrystalStructureDatabasePool`` is built and wrapped
    as a crystal structure database.
    '''

    def __init__(self, file_names, **kw):
        # ``**kw`` is accepted and ignored; the factory always passes ``db=``.
        self.file_name = file_names
        self._lazy_enumerator = None
        _dbs = [_ReaderFactory(fname) for fname in file_names]
        if all(isinstance(_db, _Mol2Reader) for _db in _dbs):
            self._db = FileFormatsLib.GoldMol2DatabasePool()
            for db in _dbs:
                self._db.append(FileFormatsLib.CrystalStructureDatabaseAsMol2Database(db._db))
        elif all(isinstance(_db, _SDFReader) for _db in _dbs):
            self._db = FileFormatsLib.GoldSDFDatabasePool()
            for db in _dbs:
                self._db.append(FileFormatsLib.CrystalStructureDatabaseAsSDFDatabase(db._db))
        else:
            self._db = FileFormatsLib.CrystalStructureDatabasePool()
            for db in _dbs:
                self._db.append(db._db)
            # Only the generic pool is wrapped; _make_entry relies on the
            # mol2/SDF pools keeping their own type for its isinstance checks.
            self._db = FileFormatsLib.CrystalStructureDatabasePoolAsCrystalStructureDatabase(self._db)

    def _make_entry(self, e):
        '''Wrap a database entry, copying SD tags (and mol2 atom sets) for mol2/SDF pools.'''
        entry = Entry(e)
        if isinstance(self._db, FileFormatsLib.GoldMol2DatabasePool):
            f = FileFormatsLib.DatabaseEntryToSDfileDatabaseEntry(e)
            tags = f.tags()
            entry.attributes = tags
            zzz = f.mol2_set()
            if not FileFormatsLib.Mol2Set_is_NULL(zzz):
                entry.atom_sets = {
                    name: zzz.indices_in_set(name) for name in zzz.set_names()
                }
        elif isinstance(self._db, FileFormatsLib.GoldSDFDatabasePool):
            f = FileFormatsLib.DatabaseEntryToSDfileDatabaseEntry(e)
            tags = f.tags()
            entry.attributes = tags
        return entry


class _StringDatabaseReader(_DatabaseReader):
    '''Base class for readers over structures already parsed from a string.

    Subclasses must assign ``self._structures`` before calling ``__init__``,
    because ``__init__`` reads it.
    '''

    def __init__(self):
        class EnumeratorMaker(object):
            # Minimal stand-in offering ``enumerator()`` and ``entry(i)``
            # over the in-memory list of structures.
            def __init__(self, _structures):
                self._structures = _structures

            def enumerator(self):
                return self

            def entry(self, i):
                return self._structures[i]

        self.file_name = 'string'
        # The reader acts as its own database object.
        self._db = self
        self._lazy_enumerator = EnumeratorMaker(self._structures)

    def __iter__(self):
        return self._structures.__iter__()

    # The stored structures are already entries, so the _make_* hooks only
    # select the relevant view of the entry.
    def _make_entry(self, e):
        return e

    def _make_crystal(self, e):
        return e.crystal

    def _make_molecule(self, e):
        return e.molecule

    def entries(self):
        '''Iterate over the stored entries.'''
        for e in self._structures:
            yield e

    def entry(self, id):
        '''Return the first entry whose identifier equals ``id``.

        Falls off the end (returning ``None``) when nothing matches.
        '''
        for e in self.entries():
            if e.identifier == id:
                return e

    def __len__(self):
        return len(self._structures)


class _MultiMol2StringDatabase(_StringDatabaseReader):
    '''Reader over a string holding one or more mol2 records.'''

    def __init__(self, text, **kw):
        self.text = text
        # Split on the record header and put the header back on each part;
        # anything before the first header is discarded by the [1:] slice.
        self.parts = ['%s%s' % ('@<TRIPOS>MOLECULE', p) for p in text.split('@<TRIPOS>MOLECULE')[1:]]
        self._structures = [Entry.from_string(p, format='mol2') for p in self.parts]
        _StringDatabaseReader.__init__(self)


class _MultiSDFStringDatabase(_StringDatabaseReader):
    '''Reader over a string holding one or more SDF records.'''

    def __init__(self, text, **kw):
        self.text = text
        self.parts = [p for p in text.split('$$$$\n')]
        try:
            self._structures = [Entry.from_string(p, format='sdf') for p in self.parts]
        except RuntimeError:
            # Retry without the last part - presumably the empty remainder
            # after a trailing '$$$$' terminator; TODO confirm. Any other
            # RuntimeError will be raised again by this second attempt.
            self._structures = [Entry.from_string(p, format='sdf') for p in self.parts[:-1]]
        _StringDatabaseReader.__init__(self)


class _AserReader(_DatabaseReader):
    '''Not implemented any more (2.0).'''

    def __init__(self, *args, **kw):
        # Always raises: constructing this reader is an error.
        # NOTE(review): the message reads "have been version 2.0" - words
        # appear to be missing (e.g. "unsupported since"); confirm and fix.
        raise RuntimeError(
            '''ASER format databases have been version 2.0. Please convert your ASER format databases using ccdc_babel to another format such as csdsql.'''
        )


class _CSDSQLDatabaseReader(_DatabaseReader):
    '''CSD SQLite databases.'''
    # Underlying database class; presumably instantiated by
    # _DatabaseReader.__init__, which is not visible here - TODO confirm.
    klass = CSDSQLDatabaseLib.CSDSQLDatabase


class _SQLMol2Reader(_DatabaseReader):
    '''Mol2 SQLite databases.'''
    klass = MotifPharmacophoreLib.Mol2FileSqliteDatabase


class _CifDatabaseWithBonds(FileFormatsLib.CifDatabase):
    '''CifDatabase variant that is configured for reading and writing bonds.'''

    def __init__(self, *args, **kwargs):
        super(_CifDatabaseWithBonds, self).__init__(*args, **kwargs)
        options = _CifFileWithBonds.cif_bond_options()
        self.set_read_write_options(options)


class _CifReader(_DatabaseReader):
    '''Database of cif molecules.'''
    klass = FileFormatsLib.CifDatabase

    def __init__(self, fname, db=None):
        # ``db`` is accepted because the factory always passes it; unused here.
        _DatabaseReader.__init__(self, fname)
        # Switch on Z-value calculation before wrapping the database.
        options = self._db.read_write_options()
        options.calculate_z_value_ = True
        self._db.set_read_write_options(options)
        self._db = FileFormatsLib.CifDatabaseAsCrystalStructureDatabase(self._db)

    def _make_entry(self, e):
        '''Wrap a database entry and attach its CIF data block attributes.

        ``global_attributes`` is set to ``None`` when the CIF database
        reports no global data block for the entry.
        '''
        entry = Entry(e)
        i = self._enumerator.index(e.identifier())
        cif_db = FileFormatsLib.CrystalStructureDatabaseAsCifDatabase(self._db)
        entry.attributes = Entry._CifAttributes(cif_db.data_block(i))
        if cif_db.global_data_block(i) is not None:
            entry.global_attributes = Entry._CifAttributes(cif_db.global_data_block(i))
            entry.global_attributes._global_data_block = cif_db.global_data_block(i)
        else:
            entry.global_attributes = None
        return entry


class _GCDReader(_DatabaseReader):
    '''GCD (refcode lists) databases.

    Will take entries from the supplied database.
    Will use the internal or installed CSD by default.
    '''

    def __init__(self, fname, db=''):
        # ``fname`` is either a list/tuple of identifiers or the path of a
        # file holding one identifier per line.
        self.file_name = fname
        self._lazy_enumerator = None
        xxx = EntryReader(db)
        self._underlying_file_name = xxx.file_name
        _db = xxx._db
        self._db = FileFormatsLib.CrystalStructureDatabaseSubset(fname, _db)
        self._db = FileFormatsLib.CrystalStructureDatabaseSubsetAsCrystalStructureDatabase(self._db)
        # Warn (do not fail) when some identifier is unknown to the underlying
        # database; all() stops at the first missing identifier.
        if isinstance(fname, (list, tuple)):
            if not all(_db.identifier_exists(UtilitiesLib.DatabaseEntryIdentifier(name)) for name in fname):
                logger = Logger()
                logger.warning('Not all the identifiers exist in the database')
        else:
            with open(fname) as f:
                if not all(
                    _db.identifier_exists(UtilitiesLib.DatabaseEntryIdentifier(name.strip())) for name in f
                ):
                    logger = Logger()
                    logger.warning('Not all the identifiers exist in the database')


class _Mol2Reader(_DatabaseReader):
    '''Database of mol2 molecules.'''
    klass = FileFormatsLib.GoldMol2Database

    def __init__(self, fname, db=None):
        # ``db`` is accepted because the factory always passes it; unused here.
        _DatabaseReader.__init__(self, fname)
        self._db = FileFormatsLib.Mol2DatabaseAsCrystalStructureDatabase(self._db)

    def _make_entry(self, e):
        '''Wrap a database entry, copying its SD tags and any mol2 atom sets.'''
        entry = Entry(e)
        f = FileFormatsLib.DatabaseEntryToSDfileDatabaseEntry(e)
        tags = f.tags()
        entry.attributes = tags
        zzz = f.mol2_set()
        if not FileFormatsLib.Mol2Set_is_NULL(zzz):
            entry.atom_sets = {
                name: zzz.indices_in_set(name) for name in zzz.set_names()
            }
        return entry


class _SDFReader(_DatabaseReader):
    '''Database of sdf molecules.'''

    class _SDFBase(FileFormatsLib.SDFDatabase, _DatabaseReader):
        # Adapter so the SDF database can be constructed from a file name
        # alone, always passing FileFormatsLib.DONT_ENFORCE.
        def __init__(self, filename):
            FileFormatsLib.SDFDatabase.__init__(self, filename, FileFormatsLib.DONT_ENFORCE)

    klass = _SDFBase

    def __init__(self, filename, db=None):
        # ``db`` is accepted because the factory always passes it; unused here.
        _DatabaseReader.__init__(self, filename)
        self._db = FileFormatsLib.SDFDatabaseAsCrystalStructureDatabase(self._db)

    def _make_entry(self, e):
        '''Wrap a database entry, copying its SD tags onto ``attributes``.'''
        entry = Entry(e)
        f = FileFormatsLib.DatabaseEntryToSDfileDatabaseEntry(e)
        tags = f.tags()
        entry.attributes = tags
        return entry


class _PDBReader(_DatabaseReader):
    '''Database of PDB entries.'''
    klass = FileFormatsLib.PDBDatabase

    def __init__(self, filename, db=None):
        _DatabaseReader.__init__(self, filename)
        self._db = FileFormatsLib.PDBDatabaseAsCrystalStructureDatabase(self._db)


class _ResReader(_DatabaseReader):
    '''Database of res (shellx) entries.'''
    klass = FileFormatsLib.ResDatabase

    def __init__(self, file_name, db=None):
        _DatabaseReader.__init__(self, file_name)
        self._db = FileFormatsLib.ResDatabaseAsCrystalStructureDatabase(self._db)


class _GlobReader(_DatabaseReader):
    '''Reader over every file matching a glob pattern, in sorted order.

    Each file is opened with the appropriate public reader only while it is
    being iterated.  The base class initialiser is not called, so only
    ``files`` is set on the instance.
    '''

    def __init__(self, pattern):
        '''Initialise by saving the globbed pattern'''
        self.files = glob.glob(pattern)
        self.files.sort()

    def entries(self):
        '''Iterate over entries'''
        for f in self.files:
            with EntryReader(f) as reader:
                for e in reader.entries():
                    yield e

    def crystals(self):
        '''Iterate over crystals'''
        for f in self.files:
            with CrystalReader(f) as reader:
                for c in reader.crystals():
                    yield c

    def molecules(self):
        '''Iterate over molecules'''
        for f in self.files:
            with MoleculeReader(f) as reader:
                for m in reader.molecules():
                    yield m


class _ReaderFactory(object):
    '''Provide a molecule Reader by inspection of file suffix.

    If db is not given it will default to CSD.  It may be required for .gcd files.

    If format is given it will override that given in the filename.  It is required where
    filename is 'stdin'
    '''
    # File suffix (lower case, without the dot) -> reader class.
    known_suffixes = {
        'mol2': _Mol2Reader,
        'gcd': _GCDReader,
        'sdf': _SDFReader,
        'mol': _SDFReader,
        'cif': _CifReader,
        'pdb': _PDBReader,
        'ent': _PDBReader,
        'res': _ResReader,
        'sqlite': _CSDSQLDatabaseReader,
        'csdsql': _CSDSQLDatabaseReader,
        'csdsqlx': _CSDSQLDatabaseReader,
        'sqlmol2': _SQLMol2Reader,
    }
    # Explicit ``format`` argument (lower case) -> reader class.
    # NOTE(review): unlike known_suffixes there is no 'pdb'/'ent' here, and
    # refcode lists are spelled 'identifiers' rather than 'gcd'.
    known_formats = {
        'sdf': _SDFReader,
        'mol': _SDFReader,
        'mol2': _Mol2Reader,
        'identifiers': _GCDReader,
        'cif': _CifReader,
        'res': _ResReader,
        'sqlite': _CSDSQLDatabaseReader,
        'csdsql': _CSDSQLDatabaseReader,
        'csdsqlx': _CSDSQLDatabaseReader,
        'sqlmol2': _SQLMol2Reader,
    }

    def __new__(klass, filename='', db='', format='', subset=''):
        '''Construct a local class based on filename suffix, and splice in methods from the given klass.

        The checks are made in this order: mol2 text, SDF text, explicit
        ``format``, ``subset``, then inspection of ``filename`` itself.

        :raises NotImplementedError: for an unknown format or file suffix.
        :raises TypeError: if filename is 'stdin' and no format is given.
        :raises RuntimeError: for a suffix-less name other than 'csd'.
        '''
        # ``filename`` may actually be the text of a mol2 or SDF document.
        if '@<TRIPOS>MOLECULE' in filename:
            class k(_MultiMol2StringDatabase):
                pass
        elif '$$$$' in filename and ('V2000' in filename or 'V3000' in filename):
            class k(_MultiSDFStringDatabase):
                pass
        elif format:
            if format.lower() in _ReaderFactory.known_formats:
                class k(_ReaderFactory.known_formats[format.lower()]):
                    pass
            else:
                raise NotImplementedError('Unknown format %s' % format)
        elif subset:
            class k(_GCDReader):
                pass
            filename = Subsets().get_subset_file(subset)
        else:
            if hasattr(filename, '__iter__') and not isinstance(filename, str):
                # A non-string iterable: either a pool of databases or a
                # list of identifiers.
                if not filename:
                    class k(_GCDReader):
                        pass
                elif any(not isinstance(x, str) for x in filename):
                    class k(_DatabasePoolReader):
                        pass
                elif all(x.lower() == 'csd' or os.path.exists(x) for x in filename):
                    class k(_DatabasePoolReader):
                        pass
                elif all(isinstance(x, str) for x in filename):
                    class k(_GCDReader):
                        pass
                else:
                    # Looks like a bunch of MariaDB things, which we don't need to support
                    # NOTE(review): this branch looks unreachable - once the
                    # any(not isinstance(...)) test above is false, every
                    # element is a str and the previous elif is taken.
                    raise RuntimeError('Unsupported format')
            elif filename == 'stdin':
                raise TypeError('_Reader: stdin needs a format')
            else:
                suff = os.path.splitext(filename)[1][1:]
                if suff == 'gz':
                    # Look through the compression suffix to the real one.
                    # NOTE(review): this comparison is case-sensitive while
                    # the suffix lookup below is not.
                    suff = os.path.splitext(os.path.splitext(filename)[0])[1][1:]
                if not suff:
                    if filename == '':
                        filename = 'csd'
                    if filename.lower() == 'csd':
                        class k(_DatabaseReader):
                            pass
                    else:
                        raise RuntimeError('Unsupported format')
                elif suff.lower() in _ReaderFactory.known_suffixes:
                    class k(_ReaderFactory.known_suffixes[suff.lower()]):
                        pass
                else:
                    raise NotImplementedError('Unknown file suffix %s' % suff)
        ret = k(filename, db=db)
        # Make the throw-away class look like the public reader, then copy
        # the public reader's plain functions/methods onto it.  The methods
        # are set on the class, so the instance built above sees them too.
        k.__name__ = klass.__name__
        for m, v in klass.__dict__.items():
            if isinstance(getattr(klass, m), (types.MethodType, types.FunctionType)):
                setattr(k, m, v)
        return ret

##########################################################################
class EntryReader(_ReaderFactory):
    '''Treat the database as a source of entries.

    An :class:`EntryReader` can be instantiated using:

    - The explicit string 'CSD', which defaults to the CSD.
    - A file name with an optional ``format`` argument.  If the ``format``
      argument is empty it uses the suffix of the file name to infer the
      file format.
    - A list of connection strings, to specify a pool.

    One of the supported file formats is 'identifiers' in which case the
    file is assumed to contain a new line separated list of refcodes from
    the CSD.  The suffix of such a file may be '.gcd'.

    During initialisation a :class:`_DatabaseReader` is dynamically bound
    to the :class:`EntryReader` instance, which means that the methods of
    :class:`_DatabaseReader` are available from the :class:`EntryReader`
    instance.

    >>> csd_entry_reader = EntryReader('CSD')
    >>> type(csd_entry_reader[0])
    <class 'ccdc.entry.Entry'>
    >>> print(csd_entry_reader.identifier(0))
    AABHTZ
    >>> aabhtz_entry = csd_entry_reader.entry('AABHTZ')
    >>> print(aabhtz_entry.publication.authors)
    P.-E.Werner
    '''

    def __iter__(self):
        '''Iterate over the entries of the database.'''
        entry_iterator = self.entries()  # pylint: disable=E1101
        return entry_iterator

    def __getitem__(self, i):
        '''Return the entry at index ``i``.'''
        raw_entry = self._enumerator.entry(i)  # pylint: disable=E1101
        return self._make_entry(raw_entry)  # pylint: disable=E1101
class CrystalReader(_ReaderFactory):
    '''Treat the database as a source of crystals.

    A :class:`CrystalReader` can be instantiated using:

    - The explicit string 'CSD', which defaults to the CSD.
    - A file name with an optional ``format`` argument.  If the ``format``
      argument is empty it uses the suffix of the file name to infer the
      file format.

    One of the supported file formats is 'identifiers' in which case the
    file is assumed to contain a new line separated list of refcodes from
    the CSD.  The suffix of such a file may be '.gcd'.

    During initialisation a :class:`_DatabaseReader` is dynamically bound
    to the :class:`CrystalReader` instance, which means that the methods of
    :class:`_DatabaseReader` are available from the :class:`CrystalReader`
    instance.

    >>> csd_crystal_reader = CrystalReader('CSD')
    >>> type(csd_crystal_reader[0])
    <class 'ccdc.crystal.Crystal'>
    >>> print(csd_crystal_reader.identifier(0))
    AABHTZ
    >>> aabhtz_crystal = csd_crystal_reader.crystal('AABHTZ')
    >>> print(aabhtz_crystal.crystal_system)
    triclinic
    '''

    def __iter__(self):
        '''Support ``for crystal in CrystalReader(filename)``.'''
        crystal_iterator = self.crystals()  # pylint: disable=E1101
        return crystal_iterator

    def __getitem__(self, i):
        '''Support direct indexing, e.g. ``CrystalReader(filename)[42]``.'''
        raw_entry = self._enumerator.entry(i)  # pylint: disable=E1101
        return self._make_crystal(raw_entry)  # pylint: disable=E1101
class MoleculeReader(_ReaderFactory):
    '''Treat the database as a source of molecules.

    A :class:`MoleculeReader` can be instantiated using:

    - The explicit string 'CSD', which defaults to the CSD.
    - A file name with an optional ``format`` argument.  If the ``format``
      argument is empty it uses the suffix of the file name to infer the
      file format.

    One of the supported file formats is 'identifiers' in which case the
    file is assumed to contain a new line separated list of refcodes from
    the CSD.  The suffix of such a file may be '.gcd'.

    During initialisation a :class:`_DatabaseReader` is dynamically bound
    to the :class:`MoleculeReader` instance, which means that the methods
    of :class:`_DatabaseReader` are available from the
    :class:`MoleculeReader` instance.

    >>> csd_molecule_reader = MoleculeReader('CSD')
    >>> type(csd_molecule_reader[0])
    <class 'ccdc.molecule.Molecule'>
    >>> print(csd_molecule_reader.identifier(0))
    AABHTZ
    >>> aabhtz_molecule = csd_molecule_reader.molecule('AABHTZ')
    >>> print(aabhtz_molecule.smiles)
    CC(=O)NN1C=NN=C1N(N=Cc1c(Cl)cccc1Cl)C(C)=O
    '''

    def __iter__(self):
        '''Iterate over the molecules of the database.'''
        molecule_iterator = self.molecules()  # pylint: disable=E1101
        return molecule_iterator

    def __getitem__(self, i):
        '''Return the molecule at index ``i``, list-style.'''
        raw_entry = self._enumerator.entry(i)  # pylint: disable=E1101
        return self._make_molecule(raw_entry)  # pylint: disable=E1101
class Subsets():
    '''
    This class provides a simple way to access pre-defined CSD subsets.

    Example:

    >>> mof_reader = EntryReader(subset=Subsets.MOF)

    The returned *reader* object is the same as if the *Reader* class has
    been initialized with the associated gcd file directly.

    Subsets available:

    - ADP
    - BEST_HYDROGENS
    - BEST_LOW_TEMP
    - BEST_RFACTOR
    - BEST_ROOM_TEMP
    - COVID19
    - DRUG
    - DRUG_SINGLE_COMPONENT
    - ELECTRON
    - HIGH_PRESSURE
    - HYDRATE
    - MOF
    - MOF_NO_DISORDER
    - PESTICIDE
    - POLYMORPHIC
    - TEACHING
    '''
    # Each constant is the path of a .gcd file relative to the subsets
    # directory located by __init__.
    ADP = 'ADPs_available_subset.gcd'
    BEST_HYDROGENS = 'Best_representative_lists/best_hydrogens_list.gcd'
    BEST_LOW_TEMP = 'Best_representative_lists/best_low_temperature_list.gcd'
    BEST_RFACTOR = 'Best_representative_lists/best_R_factor_list.gcd'
    BEST_ROOM_TEMP = 'Best_representative_lists/best_room_temperature_list.gcd'
    COVID19 = 'CSD_Drug_subsets/CSD_COVID-19_subset.gcd'
    DRUG = 'CSD_Drug_subsets/CSD_Drug_subset.gcd'
    DRUG_SINGLE_COMPONENT = 'CSD_Drug_subsets/single-component_CSD_Drug_subset.gcd'
    ELECTRON = 'Electron_diffraction_subset.gcd'
    HIGH_PRESSURE = 'High_pressure_subset.gcd'
    HYDRATE = 'Hydrate_subset.gcd'
    MOF = 'CSD_MOF_subsets/MOF_subset.gcd'
    MOF_NO_DISORDER = 'CSD_MOF_subsets/Non-disordered_MOF_subset.gcd'
    PESTICIDE = 'CSD_Pesticide_subset.gcd'
    POLYMORPHIC = 'Polymorphic_subset.gcd'
    TEACHING = 'teaching_subset.gcd'

    def __init__(self):
        '''Locate the directory holding the subset files.

        The first existing directory of ``<csd>/subsets`` and
        ``<csd>/csd/subsets`` is stored as ``subsets_dir``.

        :raises CSDNotFoundException: if no CSD location is known, or if
            neither candidate directory exists.
        '''
        csd_loc = _CSDDatabaseLocator.get_csd_location()
        if csd_loc is None:
            # There is no path to report here: the location itself is unknown.
            raise CSDNotFoundException("Cannot find CSD data: no CSD location could be determined")

        possible_subsets_dirs = [
            os.path.join(csd_loc, 'subsets'),
            os.path.join(csd_loc, 'csd', 'subsets'),
        ]
        self.subsets_dir = next(
            (candidate for candidate in possible_subsets_dirs if os.path.isdir(candidate)),
            None,
        )
        if self.subsets_dir is None:
            # Report every directory that was searched, not just the first.
            raise CSDNotFoundException(
                "CSD subsets not found at %s" % ' or '.join(possible_subsets_dirs)
            )

    def get_subset_file(self, subset_file):
        '''Return the full path of a subset file.

        :param subset_file: path relative to the subsets directory, e.g. one
            of the class constants such as ``Subsets.MOF``.
        :returns: ``subset_file`` joined onto ``subsets_dir``; the file is
            not checked for existence.
        '''
        return os.path.join(self.subsets_dir, subset_file)