Molecular processing examples

Standardising, converting and filtering molecules

#!/usr/bin/env python
#
# This script can be used for any purpose without limitation subject to the
# conditions at http://www.ccdc.cam.ac.uk/Community/Pages/Licences/v2.aspx
#
# This permission notice and the following statement of attribution must be
# included in all copies or substantial portions of this script.
#
# 2015-06-17: created by the Cambridge Crystallographic Data Centre
#
"""
Filter and transform molecules.

Input specification should precede all other arguments with the exception of
an optional -v.  If input is 'CSD' the current CSD will be
filtered/transformed.  Otherwise a gcd or mol2 file will be used.

Other filters and transformers will be applied in the order given, though a
single output option will be applied last.

Any filter or transformer may be preceded by -v to enable logging of that
processor.

For more help use the -h argument.

"""

import sys
import os
import glob
import argparse

import logging

import ccdc.io
import ccdc.molecule
import ccdc.search

from ccdc.utilities import Logger
from functools import reduce

##############################################################################

class MolProcessor(object):
    """Base class for readers, writers, filters and transformers."""

    def __init__(self, source, **kw):
        """Updates dictionary with keyword arguments."""
        self.source = source
        self.__dict__.update(kw)
        self.successes = 0
        self.failures = 0
        self.logger = Logger()
        self.logger.ignore_line_numbers()
        self.logger.set_log_level(logging.INFO)

    def __str__(self):
        """Defaults to class name."""
        return self.__class__.__name__
    __repr__ = __str__

    def molecules(self):
        """Generator for molecules. Yields a molecule."""
        for m in self.source.molecules():
            m = self.process(m)
            if m:
                yield m

    def process(self, m):
        """Default to do nothing."""
        return m

    def message(self, *args):
        """Unconditionally print a message."""
        self.logger.info(' '.join(map(str, args)))

    def log(self, *args):
        """Log a message if verbose."""
        if self.verbose:
            self.logger.info(' '.join(map(str, args)))

    def succeed(self, *args):
        """Register a success, and optionally log it."""
        self.successes += 1
        if self.verbose:
            self.logger.info(
                '%s: succeeded %s' % (self, ' '.join(map(str, args))) )

    def fail(self, *args):
        """Register a failure and optionally log it."""
        self.failures += 1
        if self.verbose:
            self.logger.info(
                '%s: failed %s' % (self, ' '.join(map(str, args))) )

    def print_stats(self):
        """Print statistics on success and failure."""
        if self.source is not None:
            self.source.print_stats()
        self.logger.info('%6d total, %6d successes, %6d failures, %s' % (
                self.successes+self.failures,
                self.successes,
                self.failures,
                self
            ))

    @classmethod
    def long_option(klass):
        """Construct a sensible long option name."""
        s = '-'
        for k in klass.__name__:
            if k in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ':
                s += '-' + k.lower()
            else:
                s += k
        return s

##############################################################################

class MolSource(MolProcessor):
    """Sources of molecules."""
    def process(self, m):
        raise NotImplementedError('MolSource does not implement process')

    class ParseArgs(argparse.Action):
        def __call__(self, parser, namespace, values, option_string):
            verbose = namespace.verbose or namespace.Verbose
            namespace.verbose = False
            fname = values[0]
            if '*' in fname:
                s = GlobSource(fname, verbose=verbose)
            else:
                s = ccdc.io.MoleculeReader(fname)
            if namespace.source is None:
                namespace.source = s
            else:
                namespace.source = CompoundSource(namespace.source,
                                                  s,
                                                  verbose=verbose)

    def __str__(self):
        return 'MoleculeReader(%s)' % self.file_name

class CompoundSource(MolSource):
    """All molecules from two sources."""
    def __init__(self, first, second, verbose=False):
        MolProcessor.__init__(self, None)
        self.first = first
        self.second = second
        self.verbose = verbose

    def molecules(self):
        """Iterates over first, then second."""
        for m in self.first.molecules():
            yield m
        for m in self.second.molecules():
            yield m

    def __str__(self):
        return 'Compound(%s, %s)' % (self.first, self.second)

class GlobSource(MolSource):
    """Read all molecules from all files matched by the globbing pattern."""
    def __init__(self, pattern, verbose=False):
        """Get all files"""
        MolProcessor.__init__(self, None, verbose=verbose)
        self.file_name = pattern
        self.files = glob.glob(pattern)
        self.files.sort()

    def molecules(self):
        """Iterate over all molecules of all files."""
        for f in self.files:
            with ccdc.io.MoleculeReader(f) as source:
                for i, m in enumerate(source):
                    if i > 0:
                        m.identifier = '%s_%d' % (
                            os.path.splitext(os.path.basename(f))[0], i )
                    else:
                        m.identifier = os.path.splitext(os.path.basename(f))[0]
                    self.succeed(m.identifier)
                    yield m

##############################################################################

class MolSink(MolProcessor):
    """Output files."""
    def __init__(self, source, fname=None, verbose=False, format=''):
        """Opens a stream as well as storing the file name."""
        MolProcessor.__init__(self, source, fname=fname, verbose=verbose)
        self.sink = ccdc.io.MoleculeWriter(fname, format=format)

    def __str__(self):
        """Report file name if any."""
        return 'MoleculeWriter(%s)' % self.sink.file_name

    def molecules(self):
        """Doesn't produce any molecules."""
        raise StopIteration

    def close(self):
        """Close the sink."""
        self.sink.close()

    def run(self, limit=sys.maxsize):
        """This pulls the chain of processors."""
        ct = 0
        for m in self.source.molecules():
            if ct >= limit:
                break
            self.sink.write(m)
            ct += 1

class GlobSink(MolSink):
    "Write molecules to separate files based on their identifier and the format."
    def __init__(self, source, format='', verbose=False):
        """Save the format"""
        MolProcessor.__init__(self, source, verbose=verbose)
        self.format = format

    def __str__(self):
        """Show the name."""
        return 'MoleculeWriter(%s)' % self.format

    def close(self):
        """nothing to close"""
        pass

    def run(self, limit=sys.maxsize):
        """Writes a molecule to a file based on pattern and filename.
        Will create all necessary directories.
        """
        ct = 0
        for m in self.source.molecules():
            if ct >= limit:
                break
            fname = self.format.replace('*', m.identifier)
            d = os.path.dirname(fname)
            if d and not os.path.isdir(d):
                os.makedirs(d)
            with ccdc.io.MoleculeWriter(fname) as sink:
                sink.write(m)
                self.succeed(m.identifier)
            ct += 1

class CanonicalSmiles(MolSink):
    """Write canonical SMILES."""
    def __init__(self, source, fname=None, verbose=False):
        """Sets up generators for the calculation"""
        MolProcessor.__init__(self, source, verbose=verbose)
        if fname:
            self.stream = open(fname, 'w')
        else:
            self.stream = sys.stdout

    def generate(self, m):
        """Generate the SMILES."""
        return m.smiles

    def run(self, limit=sys.maxsize):
        ct = 0
        for m in self.source.molecules():
            if ct >= limit:
                break
            s = self.generate(m)
            if s is None:
                self.stream.write(s + '\n')
                self.succeed(m.identifier)
                ct += 1
            else:
                self.fail(m.identifier, 'Cannot calculate SMILES')

class UniqueSmiles(CanonicalSmiles):
    """Write canonical smiles without duplication."""
    def __init__(self, source, fname=None, verbose=False):
        CanonicalSmiles.__init__(self, source, fname=fname, verbose=verbose)
        self.seen = collections.defaultdict(list)

    def process(self, m):
        s = m.smiles
        if s is None:
            self.fail('Cannot calculate SMILES')
        else:
            if s in self.seen:
                self.fail(m.identifier, 'Duplicate')
            else:
                self.succeed(m.identifier, s)
                self.stream.write(s + '\n')
            self.seen[s].append(m.identifier)

    def print_stats(self):
        MolProcessor.print_stats(self)
        self.logger.info('%6d Unique out of %6d structures' % (
            len(self.seen),
            len(reduce(list.__add__, self.seen.values(), []))
        ))


##############################################################################

class NullaryMolProcessor(MolProcessor):
    """Processors requiring no arguments."""
    class Callback(argparse.Action):
        def __call__(self, parser, namespace, values, option_string=None):
            """Set the verbose flag and construct the class."""
            verbose = namespace.verbose or namespace.Verbose
            namespace.source = self.klass(namespace.source, verbose=verbose)
            namespace.verbose = False

    @classmethod
    def add_option(klass, group):
        group.add_argument(
            klass.long_option(),
            help=klass.__doc__,
            nargs=0,
            action=NullaryMolProcessor.Callback
        ).klass = klass

class UnaryMolProcessor(MolProcessor):
    class Callback(argparse.Action):
        def __call__(self, parser, namespace, values, option_string=None):
            verbose = namespace.verbose or namespace.Verbose
            namespace.verbose = False
            if self.klass.validate_arg(values[0]):
                namespace.source = self.klass(namespace.source,
                                              arg=values[0],
                                              verbose=verbose)
            else:
                raise argparse.ArgumentError(self,
                    '%s %s invalid argument %s' % (
                        self.klass.__name__,
                        self.klass.__doc__,
                        ' '.join(values)
                    )
                )

    @classmethod
    def add_option(klass, group):
        group.add_argument(
            klass.long_option(), help=klass.__doc__,
            action=klass.Callback, nargs=1
        ).klass = klass

    @classmethod
    def validate_arg(klass, value):
        return True

class IntRangeMolProcessor(UnaryMolProcessor):
    """Processors requiring a pair of integers."""
    class Callback(argparse.Action):
        def __call__(self, parser, namespace, values, option_string):
            """Extract arguments, set verbosity and construct the class."""
            value = values[0]
            verbose = namespace.verbose or namespace.Verbose
            namespace.verbose = False
            try:
                l, h = [int(x) for x in value.split(',')]
            except (TypeError, ValueError):
                try:
                    l, h = (int(x) for x in value.split(':'))
                except (TypeError, ValueError):
                    raise argparse.ArgumentError(
                        self,
                        '%s requires an int,int pair as argument' % option_string
                    )
            namespace.source = self.klass(
                namespace.source, low=l, high=h, verbose=verbose
            )

    def __str__(self):
        """Report the range."""
        return '%s (%d, %d)' % (self.__class__.__name__, self.low, self.high)

class IntListMolProcessor(UnaryMolProcessor):
    """Processors requiring a list of integers."""
    class Callback(argparse.Action):
        def __call__(self, parser, namespace, values, option_string):
            """Extract arguments, set verbosity and construct the class."""
            value = values[0]
            verbose = namespace.verbose or namespace.Verbose
            namespace.verbose = False
            try:
                l = [int(x) for x in value.split(',')]
            except (TypeError, ValueError):
                try:
                    l = (int(x) for x in value.split(':'))
                except (TypeError, ValueError):
                    raise argparse.ArgumentError(
                        self,
                        '%s requires a a list of comma-separated integers (e.g. 1,2,3,4) as argument' % option_string
                    )
            namespace.source = self.klass(
                namespace.source, values=l, verbose=verbose
            )

    def __str__(self):
        """Report the range."""
        return '%s (%s)' % (self.__class__.__name__, str(self.values))

class FloatRangeMolProcessor(UnaryMolProcessor):
    """Processors requiring two real values."""
    class Callback(argparse.Action):
        def __call__(self, parser, namespace, values, option_string):
            """Extract arguments, set verbosity and construct the class."""
            value = values[0]
            try:
                l, h = (float(x) for x in value.split(','))
            except (TypeError, ValueError):
                try:
                    l, h = (float(x) for x in value.split(':'))
                except (TypeError, ValueError):
                    raise argparse.ArgumentError(
                        self,
                        '%s requires a float,float pair as argument' % option_string
                    )
            verbose = namespace.verbose or namespace.Verbose
            namespace.verbose = False
            namespace.source = self.klass(namespace.source,
                                          low=l,
                                          high=h,
                                          verbose=verbose )

    def __str__(self):
        """Report the range."""
        return '%s (%.2f, %.2f)' % (self.__class__.__name__,
                                    self.low, self.high )

class FloatMolProcessor(UnaryMolProcessor):
    class Callback(argparse.Action):
        def __call__(self, parser, namespace, values, option_string):
            "Extract single argument, set verbosity and construct the class."
            value = values[0]
            try:
                v = float(value)
            except TypeError:
                raise argparse.ArgumentError(
                    self,
                    '%s requires a float value as argument' % option_string
                )
            namespace.source = self.klass(
                namespace.source, verbose=verbose, **{klass.variable: v}
            )

    def __str__(self):
        """Report the value."""
        return '%s (%s=%.2f)' % (self.__class__.__name__,
                                 self.variable,
                                 getattr(self, self.variable))

##############################################################################
#   Transformers
##############################################################################

class LargestComponent(NullaryMolProcessor):
    """Extract the heaviest component from a multi-component structure."""
    def process(self, m):
        l = sorted([(x.molecular_weight, x) for x in m.components])
        self.succeed(m.identifier,
                     '%d components, heaviest %.2f' % (len(l), l[-1][0]) )
        return l[-1][1]

class AddHydrogens(NullaryMolProcessor):
    """Add hydrogen atoms to a molecule.

    If we want sites, we need a crystal structure or a cell.
    """
    def process(self, m):
        nats = len(m.atoms)
        m.add_hydrogens()
        self.succeed(m.identifier, 'before', nats, 'now', len(m.atoms))
        return m

class RemoveHydrogens(NullaryMolProcessor):
    """Remove hydrogen atoms from a molecule."""
    def process(self, m):
        nats = len(m.atoms)
        m.remove_hydrogens()
        self.succeed(m.identifier, 'before', nats, 'now', len(m.atoms))
        return m

class NormaliseHydrogens(NullaryMolProcessor):
    """Normalise the positions of hydrogen atoms in a molecule."""
    def process(self, m):
        m.normalise_hydrogens()
        self.succeed(m.identifier, 'normalised')
        return m

class RemoveWaters(NullaryMolProcessor):
    """Remove water molecules from entries."""
    def process(self, m):
        keep = []
        waters = 0
        for s in m.components:
            ats = [at.atomic_symbol for at in s.atoms]
            if len(ats) == 3:
                ats.sort()
                if ats[0] == 'H' and ats[1] == 'H' and ats[2] == 'O':
                    waters += 1
                else:
                    keep.append(s)
            else:
                keep.append(s)
        new = ccdc.molecule.Molecule(m.identifier)
        for k in keep:
            new.add_molecule(k)
        self.succeed(m.identifier,
                     waters,
                     'removed',
                     len(keep),
                     'structures retained')
        return new

class Neutralise(NullaryMolProcessor):
    """Remove formal charge from all atoms."""
    def process(self, m):
        ct = 0
        for a in m.atoms:
            if a.formal_charge != 0:
                a.formal_charge = 0
                ct += 1
        self.succeed(m.identifier, '%d atoms neutralised' % ct)
        return m

class StandardiseAromaticBonds(NullaryMolProcessor):
    """Standardise aromatic bonds to CSD conventions."""
    def process(self, m):
        bond_map = [b.bond_type for b in m.bonds]
        m.standardise_aromatic_bonds()
        bonds_now = [b.bond_type for b in m.bonds]
        diffs = sum(a != b for a, b in zip(bond_map, bonds_now))
        self.succeed(m.identifier, 'changed %d bonds' % diffs)
        return m

class StandardiseDelocalisedBonds(NullaryMolProcessor):
    """Standardise delocalised bonds to CSD conventions."""
    def process(self, m):
        bond_map = [b.bond_type for b in m.bonds]
        m.standardise_delocalised_bonds()
        bonds_now = [b.bond_type for b in m.bonds]
        diffs = sum(a != b for a, b in zip(bond_map, bonds_now))
        self.succeed(m.identifier, 'changed %d bonds' % diffs)
        return m

class AssignBondTypes(UnaryMolProcessor):
    """Assign bond types. Either 'all' or 'unknown'."""
    def process(self, m):
        bond_map = [b.bond_type for b in m.bonds]
        m.assign_bond_types(self.arg)
        bonds_now = [b.bond_type for b in m.bonds]
        diffs = sum(a != b for a, b in zip(bond_map, bonds_now))
        self.succeed(m.identifier, 'changed %d bonds' % diffs)
        return m

    @classmethod
    def validate_arg(klass, value):
        return value in ['all', 'unknown']

##############################################################################
#   Filters
##############################################################################

class MolecularWeight(FloatRangeMolProcessor):
    """Filters by molecular weight: float,float."""
    def process(self, m):
        if self.low <= m.molecular_weight <= self.high:
            self.succeed(m.identifier, m.molecular_weight)
            return m
        else:
            self.fail(m.identifier, m.molecular_weight)

class Organic(NullaryMolProcessor):
    """Permit only organic structures."""
    def process(self, m):
        if m.is_organic:
            self.succeed(m.identifier, 'organic')
            return m
        else:
            self.fail(m.identifier, 'inorganic or organometallic')

class AtomCount(IntRangeMolProcessor):
    """Filter by all atom count: int,int."""
    def process(self, m):
        if self.low <= len(m.atoms) <= self.high:
            self.succeed(m.identifier, '%d atoms' % len(m.atoms))
            return m
        else:
            self.fail(m.identifier, '%d atoms' % len(m.atoms))

class PermittedAtomicNumbers(IntListMolProcessor):
    """Filter by list of permitted atoms: [int,(int,int ...)]"""
    def process(self, m):
        if len([x for x in m.atoms if x.atomic_number in self.values]) == len(m.atoms):
            self.succeed(m.identifier, 'contains only allowed elements')
            return m
        else:
            self.fail(m.identifier, 'contains disallowed elements')


class RotatableBondCount(IntRangeMolProcessor):
    """Filter by rotatable bond count: int,int."""
    def process(self, m):
        c = sum(b.is_rotatable for b in m.bonds)
        if self.low <= c <= self.high:
            self.succeed(m.identifier, '%d rotatable bonds' % c)
            return m
        else:
            self.fail(m.identifier, '%d rotatable bonds' % c)

class DonorCount(IntRangeMolProcessor):
    """Filter by HBond donor count: int,int."""
    def process(self, m):
        c = sum(a.is_donor for a in m.atoms)
        if self.low <= c <= self.high:
            self.succeed(m.identifier, '%d donor atoms' % c)
            return m
        else:
            self.fail(m.identifier, '%d donor atoms' % c)

class AcceptorCount(IntRangeMolProcessor):
    """Filters by HBond acceptor count: int,int."""
    def process(self, m):
        c = sum(a.is_acceptor for a in m.atoms)
        if self.low <= c <= self.high:
            self.succeed(m.identifier, '%d acceptors' % c)
            return m
        else:
            self.fail(m.identifier, '%d acceptors' % c)

class FormalCharge(IntRangeMolProcessor):
    """Filter by formal charge: int,int."""
    def process(self, m):
        c = m.formal_charge
        if self.low <= c <= self.high:
            self.succeed(m.identifier, '%d formal charge' % c)
            return m
        else:
            self.fail(m.identifier, '%d formal charge' % c)

class MetalCount(IntRangeMolProcessor):
    """Filter by number of metals: int, int."""
    def process(self, m):
        c = sum(a.is_metal for a in m.atoms)
        if self.low <= c <= self.high:
            self.succeed(m.identifier, '%d metals' % c)
            return m
        else:
            self.fail(m.identifier, '%d metals' % c)

class SameSiteMetals(NullaryMolProcessor):
    """Filter by whether two metals share the same site."""
    def process(self, m):
        metals = [at for at in m.atoms if at.is_metal]
        for i in range(len(metals)):
            for j in range(i):
                one = metals[i].coordinates
                two = metals[j].coordinates
                if ( one.x() == two.x()
                     and one.y() == two.y()
                     and one.z() == two.z() ):
                    self.fail(m.identifier,
                              '%s and %s share a site' % (metals[i],
                                                          metals[j]))
                break
        else:
            self.succeed(m.identifier, 'No metals share a site')
            return m

class AllAtomsHaveSites(NullaryMolProcessor):
    """Filter on whether all atoms have sites."""
    def process(self, m):
        a = m.all_atoms_have_sites
        if a:
            self.succeed(m.identifier, 'all atoms have sites')
            return m
        else:
            self.fail(m.identifier, 'some atoms have no sites')

class NumberOfComponents(IntRangeMolProcessor):
    """Filter on count of distinct entities in a structure: int,int."""
    def process(self, m):
        c = len(m.components)
        if self.low <= c <= self.high:
            self.succeed(m.identifier, '%d components' % c)
            return m
        else:
            self.fail(m.identifier, '%d components' % c)

class SmallestRingSize(IntRangeMolProcessor):
    '''Filter on minimum and maximum size of a ring'''
    def process(self, m):
        if not hasattr(self, 'query'):
            at = ccdc.search.QueryAtom()
            at.smallest_ring = [self.low, self.high]
            self.query = ccdc.search.QuerySubstructure()
            self.query.add_atom(at)
        if self.query.search_molecule(m, limit=1):
            self.succeed(m.identifier, 'smallest ring')
            return m
        else:
            self.fail(m.identifier, 'smallest ring')

class UnfusedUnbridgedRingSize(IntRangeMolProcessor):
    '''Filter for ...'''
    def process(self, m):
        if not hasattr(self, 'query'):
            at = ccdc.search.QueryAtom()
            at.smallest_ring = [self.low, self.high]
            at.unfused_unbridged_ring = True
            self.query = ccdc.search.QuerySubstructure()
            self.query.add_atom(at)
        if self.query.search_molecule(m, limit=1):
            self.succeed(m.identifier, 'unfused ring')
            return m
        else:
            self.fail(m.identifier, 'unfused ring')


##############################################################################

class Arguments(object):
    """Holds options and is used to build the processor."""
    def __init__(self):
        self.source = None
        self.verbose = False
        self.Verbose = False

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        prog='molprocessor.py',
        description=__doc__
    )
    group = parser.add_argument_group('Files')
    group.add_argument(
        '-i', '--input', help="""\
Input file name or CSD to process the whole of the CSD. An input should be at
the start of a chain of processors, but additional inputs may appear at any
other point to allow pre-processed files to be appended.""",
        action=MolSource.ParseArgs, nargs=1
    )

    group.add_argument(
        '-o', '--output', help='Output file'
    )

    group = parser.add_argument_group('Filters')
    MolecularWeight.add_option(group)
    AtomCount.add_option(group)
    FormalCharge.add_option(group)
    AllAtomsHaveSites.add_option(group)
    RotatableBondCount.add_option(group)
    DonorCount.add_option(group)
    AcceptorCount.add_option(group)
    NumberOfComponents.add_option(group)
    MetalCount.add_option(group)
    SameSiteMetals.add_option(group)
    Organic.add_option(group)
    SmallestRingSize.add_option(group)
    UnfusedUnbridgedRingSize.add_option(group)
    PermittedAtomicNumbers.add_option(group)

    group = parser.add_argument_group('Transformers')
    LargestComponent.add_option(group)
    AddHydrogens.add_option(group)
    RemoveHydrogens.add_option(group)
    NormaliseHydrogens.add_option(group)
    RemoveWaters.add_option(group)
    Neutralise.add_option(group)
    StandardiseAromaticBonds.add_option(group)
    StandardiseDelocalisedBonds.add_option(group)
    AssignBondTypes.add_option(group)

    group = parser.add_argument_group('Miscellaneous')
    group.add_argument(
        '-v', '--verbose',
        help='Sets the following processor to print as it processes',
        action='store_true',
        default=False
    )
    group.add_argument(
        '-V', '--Verbose',
        help='Sets all processors to verbose',
        action='store_true',
        default=False
    )
    group.add_argument(
        '-s', '--silent',
        help='Do not print statistics at the end',
        action='store_true',
        default=False
    )
    group.add_argument(
        '-l', '--limit',
        help='Limit the number of structures written',
        type=int,
        default=sys.maxsize
    )

    arguments = Arguments()
    if len(sys.argv) == 1:
        sys.argv.append('--help')

    parser.parse_args(sys.argv[1:], namespace=arguments)

    if arguments.output:
        if arguments.output.endswith('.smiles'):
            sink = CanonicalSmiles(arguments.source, arguments.output)
        elif '*' in arguments.output:
            sink = GlobSink(arguments.source, arguments.output)
        else:
            sink = MolSink(arguments.source, arguments.output)
    else:
        sink = MolSink(arguments.source, 'stdout', format='identifiers')

    sink.run(limit=int(arguments.limit))
    sink.close()
    sys.exit()
    if not arguments.silent:
        sink.print_stats()