#!/usr/bin/env python
#
# This script can be used for any purpose without limitation subject to the
# conditions at https://www.ccdc.cam.ac.uk/Community/Pages/Licences/v2.aspx
#
# This permission notice and the following statement of attribution must be
# included in all copies or substantial portions of this script.
#
# 2015-06-17: created by the Cambridge Crystallographic Data Centre
#
"""
Filter and transform molecules.
Input specification should precede all other arguments with the exception of
an optional -v. If input is 'CSD' the current CSD will be
filtered/transformed. Otherwise a GCD or mol2 file will be used.
Other filters and transformers will be applied in the order given, though a
single output option will be applied last.
Any filter or transformer may be preceded by -v to enable logging of that
processor.
For more help use the -h argument.
"""
import sys
import os
import glob
import argparse
import logging
import ccdc.io
import ccdc.molecule
import ccdc.search
from ccdc.utilities import Logger
from functools import reduce
##############################################################################
class MolProcessor(object):
"""Base class for readers, writers, filters and transformers."""
def __init__(self, source, **kw):
"""Updates dictionary with keyword arguments."""
self.source = source
self.__dict__.update(kw)
self.successes = 0
self.failures = 0
self.logger = Logger()
self.logger.ignore_line_numbers()
self.logger.set_log_level(logging.INFO)
def __str__(self):
"""Defaults to class name."""
return self.__class__.__name__
__repr__ = __str__
def molecules(self):
"""Generator for molecules. Yields a molecule."""
for m in self.source.molecules():
m = self.process(m)
if m:
yield m
def process(self, m):
"""Default to do nothing."""
return m
def message(self, *args):
"""Unconditionally print a message."""
self.logger.info(' '.join(map(str, args)))
def log(self, *args):
"""Log a message if verbose."""
if self.verbose:
self.logger.info(' '.join(map(str, args)))
def succeed(self, *args):
"""Register a success, and optionally log it."""
self.successes += 1
if self.verbose:
self.logger.info(
'%s: succeeded %s' % (self, ' '.join(map(str, args))) )
def fail(self, *args):
"""Register a failure and optionally log it."""
self.failures += 1
if self.verbose:
self.logger.info(
'%s: failed %s' % (self, ' '.join(map(str, args))) )
def print_stats(self):
"""Print statistics on success and failure."""
if self.source is not None:
self.source.print_stats()
self.logger.info('%6d total, %6d successes, %6d failures, %s' % (
self.successes+self.failures,
self.successes,
self.failures,
self
))
@classmethod
def long_option(klass):
"""Construct a sensible long option name."""
s = '-'
for k in klass.__name__:
if k in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ':
s += '-' + k.lower()
else:
s += k
return s
##############################################################################
class MolSource(MolProcessor):
"""Sources of molecules."""
def process(self, m):
raise NotImplementedError('MolSource does not implement process')
class ParseArgs(argparse.Action):
def __call__(self, parser, namespace, values, option_string):
verbose = namespace.verbose or namespace.Verbose
namespace.verbose = False
fname = values[0]
if '*' in fname:
s = GlobSource(fname, verbose=verbose)
else:
s = ccdc.io.MoleculeReader(fname)
if namespace.source is None:
namespace.source = s
else:
namespace.source = CompoundSource(namespace.source,
s,
verbose=verbose)
def __str__(self):
return 'MoleculeReader(%s)' % self.file_name
class CompoundSource(MolSource):
"""All molecules from two sources."""
def __init__(self, first, second, verbose=False):
MolProcessor.__init__(self, None)
self.first = first
self.second = second
self.verbose = verbose
def molecules(self):
"""Iterates over first, then second."""
for m in self.first.molecules():
yield m
for m in self.second.molecules():
yield m
def __str__(self):
return 'Compound(%s, %s)' % (self.first, self.second)
class GlobSource(MolSource):
"""Read all molecules from all files matched by the globbing pattern."""
def __init__(self, pattern, verbose=False):
"""Get all files"""
MolProcessor.__init__(self, None, verbose=verbose)
self.file_name = pattern
self.files = glob.glob(pattern)
self.files.sort()
def molecules(self):
"""Iterate over all molecules of all files."""
for f in self.files:
with ccdc.io.MoleculeReader(f) as source:
for i, m in enumerate(source):
if i > 0:
m.identifier = '%s_%d' % (
os.path.splitext(os.path.basename(f))[0], i )
else:
m.identifier = os.path.splitext(os.path.basename(f))[0]
self.succeed(m.identifier)
yield m
##############################################################################
class MolSink(MolProcessor):
"""Output files."""
def __init__(self, source, fname=None, verbose=False, format=''):
"""Opens a stream as well as storing the file name."""
MolProcessor.__init__(self, source, fname=fname, verbose=verbose)
self.sink = ccdc.io.MoleculeWriter(fname, format=format)
def __str__(self):
"""Report file name if any."""
return 'MoleculeWriter(%s)' % self.sink.file_name
def molecules(self):
"""Doesn't produce any molecules."""
raise StopIteration
def close(self):
"""Close the sink."""
self.sink.close()
def run(self, limit=sys.maxsize):
"""This pulls the chain of processors."""
ct = 0
for m in self.source.molecules():
if ct >= limit:
break
self.sink.write(m)
ct += 1
class GlobSink(MolSink):
"Write molecules to separate files based on their identifier and the format."
def __init__(self, source, format='', verbose=False):
"""Save the format"""
MolProcessor.__init__(self, source, verbose=verbose)
self.format = format
def __str__(self):
"""Show the name."""
return 'MoleculeWriter(%s)' % self.format
def close(self):
"""nothing to close"""
pass
def run(self, limit=sys.maxsize):
"""Writes a molecule to a file based on pattern and filename.
Will create all necessary directories.
"""
ct = 0
for m in self.source.molecules():
if ct >= limit:
break
fname = self.format.replace('*', m.identifier)
d = os.path.dirname(fname)
if d and not os.path.isdir(d):
os.makedirs(d)
with ccdc.io.MoleculeWriter(fname) as sink:
sink.write(m)
self.succeed(m.identifier)
ct += 1
class CanonicalSmiles(MolSink):
"""Write canonical SMILES."""
def __init__(self, source, fname=None, verbose=False):
"""Sets up generators for the calculation"""
MolProcessor.__init__(self, source, verbose=verbose)
if fname:
self.stream = open(fname, 'w')
else:
self.stream = sys.stdout
def generate(self, m):
"""Generate the SMILES."""
return m.smiles
def run(self, limit=sys.maxsize):
ct = 0
for m in self.source.molecules():
if ct >= limit:
break
s = self.generate(m)
if s is None:
self.stream.write(s + '\n')
self.succeed(m.identifier)
ct += 1
else:
self.fail(m.identifier, 'Cannot calculate SMILES')
class UniqueSmiles(CanonicalSmiles):
"""Write canonical smiles without duplication."""
def __init__(self, source, fname=None, verbose=False):
CanonicalSmiles.__init__(self, source, fname=fname, verbose=verbose)
self.seen = collections.defaultdict(list)
def process(self, m):
s = m.smiles
if s is None:
self.fail('Cannot calculate SMILES')
else:
if s in self.seen:
self.fail(m.identifier, 'Duplicate')
else:
self.succeed(m.identifier, s)
self.stream.write(s + '\n')
self.seen[s].append(m.identifier)
def print_stats(self):
MolProcessor.print_stats(self)
self.logger.info('%6d Unique out of %6d structures' % (
len(self.seen),
len(reduce(list.__add__, self.seen.values(), []))
))
##############################################################################
class NullaryMolProcessor(MolProcessor):
"""Processors requiring no arguments."""
class Callback(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
"""Set the verbose flag and construct the class."""
verbose = namespace.verbose or namespace.Verbose
namespace.source = self.klass(namespace.source, verbose=verbose)
namespace.verbose = False
@classmethod
def add_option(klass, group):
group.add_argument(
klass.long_option(),
help=klass.__doc__,
nargs=0,
action=NullaryMolProcessor.Callback
).klass = klass
class UnaryMolProcessor(MolProcessor):
class Callback(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
verbose = namespace.verbose or namespace.Verbose
namespace.verbose = False
if self.klass.validate_arg(values[0]):
namespace.source = self.klass(namespace.source,
arg=values[0],
verbose=verbose)
else:
raise argparse.ArgumentError(self,
'%s %s invalid argument %s' % (
self.klass.__name__,
self.klass.__doc__,
' '.join(values)
)
)
@classmethod
def add_option(klass, group):
group.add_argument(
klass.long_option(), help=klass.__doc__,
action=klass.Callback, nargs=1
).klass = klass
@classmethod
def validate_arg(klass, value):
return True
class IntRangeMolProcessor(UnaryMolProcessor):
"""Processors requiring a pair of integers."""
class Callback(argparse.Action):
def __call__(self, parser, namespace, values, option_string):
"""Extract arguments, set verbosity and construct the class."""
value = values[0]
verbose = namespace.verbose or namespace.Verbose
namespace.verbose = False
try:
l, h = [int(x) for x in value.split(',')]
except (TypeError, ValueError):
try:
l, h = (int(x) for x in value.split(':'))
except (TypeError, ValueError):
raise argparse.ArgumentError(
self,
'%s requires an int,int pair as argument' % option_string
)
namespace.source = self.klass(
namespace.source, low=l, high=h, verbose=verbose
)
def __str__(self):
"""Report the range."""
return '%s (%d, %d)' % (self.__class__.__name__, self.low, self.high)
class IntListMolProcessor(UnaryMolProcessor):
"""Processors requiring a list of integers."""
class Callback(argparse.Action):
def __call__(self, parser, namespace, values, option_string):
"""Extract arguments, set verbosity and construct the class."""
value = values[0]
verbose = namespace.verbose or namespace.Verbose
namespace.verbose = False
try:
l = [int(x) for x in value.split(',')]
except (TypeError, ValueError):
try:
l = (int(x) for x in value.split(':'))
except (TypeError, ValueError):
raise argparse.ArgumentError(
self,
'%s requires a a list of comma-separated integers (e.g. 1,2,3,4) as argument' % option_string
)
namespace.source = self.klass(
namespace.source, values=l, verbose=verbose
)
def __str__(self):
"""Report the range."""
return '%s (%s)' % (self.__class__.__name__, str(self.values))
class FloatRangeMolProcessor(UnaryMolProcessor):
"""Processors requiring two real values."""
class Callback(argparse.Action):
def __call__(self, parser, namespace, values, option_string):
"""Extract arguments, set verbosity and construct the class."""
value = values[0]
try:
l, h = (float(x) for x in value.split(','))
except (TypeError, ValueError):
try:
l, h = (float(x) for x in value.split(':'))
except (TypeError, ValueError):
raise argparse.ArgumentError(
self,
'%s requires a float,float pair as argument' % option_string
)
verbose = namespace.verbose or namespace.Verbose
namespace.verbose = False
namespace.source = self.klass(namespace.source,
low=l,
high=h,
verbose=verbose )
def __str__(self):
"""Report the range."""
return '%s (%.2f, %.2f)' % (self.__class__.__name__,
self.low, self.high )
class FloatMolProcessor(UnaryMolProcessor):
class Callback(argparse.Action):
def __call__(self, parser, namespace, values, option_string):
"Extract single argument, set verbosity and construct the class."
value = values[0]
try:
v = float(value)
except TypeError:
raise argparse.ArgumentError(
self,
'%s requires a float value as argument' % option_string
)
namespace.source = self.klass(
namespace.source, verbose=verbose, **{klass.variable: v}
)
def __str__(self):
"""Report the value."""
return '%s (%s=%.2f)' % (self.__class__.__name__,
self.variable,
getattr(self, self.variable))
##############################################################################
# Transformers
##############################################################################
class LargestComponent(NullaryMolProcessor):
"""Extract the heaviest component from a multi-component structure."""
def process(self, m):
l = sorted([(x.molecular_weight, x) for x in m.components])
self.succeed(m.identifier,
'%d components, heaviest %.2f' % (len(l), l[-1][0]) )
return l[-1][1]
class AddHydrogens(NullaryMolProcessor):
"""Add hydrogen atoms to a molecule.
If we want sites, we need a crystal structure or a cell.
"""
def process(self, m):
nats = len(m.atoms)
m.add_hydrogens()
self.succeed(m.identifier, 'before', nats, 'now', len(m.atoms))
return m
class RemoveHydrogens(NullaryMolProcessor):
"""Remove hydrogen atoms from a molecule."""
def process(self, m):
nats = len(m.atoms)
m.remove_hydrogens()
self.succeed(m.identifier, 'before', nats, 'now', len(m.atoms))
return m
class NormaliseHydrogens(NullaryMolProcessor):
"""Normalise the positions of hydrogen atoms in a molecule."""
def process(self, m):
m.normalise_hydrogens()
self.succeed(m.identifier, 'normalised')
return m
class RemoveWaters(NullaryMolProcessor):
"""Remove water molecules from entries."""
def process(self, m):
keep = []
waters = 0
for s in m.components:
ats = [at.atomic_symbol for at in s.atoms]
if len(ats) == 3:
ats.sort()
if ats[0] == 'H' and ats[1] == 'H' and ats[2] == 'O':
waters += 1
else:
keep.append(s)
else:
keep.append(s)
new = ccdc.molecule.Molecule(m.identifier)
for k in keep:
new.add_molecule(k)
self.succeed(m.identifier,
waters,
'removed',
len(keep),
'structures retained')
return new
class Neutralise(NullaryMolProcessor):
"""Remove formal charge from all atoms."""
def process(self, m):
ct = 0
for a in m.atoms:
if a.formal_charge != 0:
a.formal_charge = 0
ct += 1
self.succeed(m.identifier, '%d atoms neutralised' % ct)
return m
class StandardiseAromaticBonds(NullaryMolProcessor):
"""Standardise aromatic bonds to CSD conventions."""
def process(self, m):
bond_map = [b.bond_type for b in m.bonds]
m.standardise_aromatic_bonds()
bonds_now = [b.bond_type for b in m.bonds]
diffs = sum(a != b for a, b in zip(bond_map, bonds_now))
self.succeed(m.identifier, 'changed %d bonds' % diffs)
return m
class StandardiseDelocalisedBonds(NullaryMolProcessor):
"""Standardise delocalised bonds to CSD conventions."""
def process(self, m):
bond_map = [b.bond_type for b in m.bonds]
m.standardise_delocalised_bonds()
bonds_now = [b.bond_type for b in m.bonds]
diffs = sum(a != b for a, b in zip(bond_map, bonds_now))
self.succeed(m.identifier, 'changed %d bonds' % diffs)
return m
class AssignBondTypes(UnaryMolProcessor):
"""Assign bond types. Either 'all' or 'unknown'."""
def process(self, m):
bond_map = [b.bond_type for b in m.bonds]
m.assign_bond_types(self.arg)
bonds_now = [b.bond_type for b in m.bonds]
diffs = sum(a != b for a, b in zip(bond_map, bonds_now))
self.succeed(m.identifier, 'changed %d bonds' % diffs)
return m
@classmethod
def validate_arg(klass, value):
return value in ['all', 'unknown']
##############################################################################
# Filters
##############################################################################
class MolecularWeight(FloatRangeMolProcessor):
"""Filters by molecular weight: float,float."""
def process(self, m):
if self.low <= m.molecular_weight <= self.high:
self.succeed(m.identifier, m.molecular_weight)
return m
else:
self.fail(m.identifier, m.molecular_weight)
class Organic(NullaryMolProcessor):
"""Permit only organic structures."""
def process(self, m):
if m.is_organic:
self.succeed(m.identifier, 'organic')
return m
else:
self.fail(m.identifier, 'inorganic or organometallic')
class AtomCount(IntRangeMolProcessor):
"""Filter by all atom count: int,int."""
def process(self, m):
if self.low <= len(m.atoms) <= self.high:
self.succeed(m.identifier, '%d atoms' % len(m.atoms))
return m
else:
self.fail(m.identifier, '%d atoms' % len(m.atoms))
class PermittedAtomicNumbers(IntListMolProcessor):
"""Filter by list of permitted atoms: [int,(int,int ...)]"""
def process(self, m):
if len([x for x in m.atoms if x.atomic_number in self.values]) == len(m.atoms):
self.succeed(m.identifier, 'contains only allowed elements')
return m
else:
self.fail(m.identifier, 'contains disallowed elements')
class RotatableBondCount(IntRangeMolProcessor):
"""Filter by rotatable bond count: int,int."""
def process(self, m):
c = sum(b.is_rotatable for b in m.bonds)
if self.low <= c <= self.high:
self.succeed(m.identifier, '%d rotatable bonds' % c)
return m
else:
self.fail(m.identifier, '%d rotatable bonds' % c)
class DonorCount(IntRangeMolProcessor):
"""Filter by HBond donor count: int,int."""
def process(self, m):
c = sum(a.is_donor for a in m.atoms)
if self.low <= c <= self.high:
self.succeed(m.identifier, '%d donor atoms' % c)
return m
else:
self.fail(m.identifier, '%d donor atoms' % c)
class AcceptorCount(IntRangeMolProcessor):
"""Filters by HBond acceptor count: int,int."""
def process(self, m):
c = sum(a.is_acceptor for a in m.atoms)
if self.low <= c <= self.high:
self.succeed(m.identifier, '%d acceptors' % c)
return m
else:
self.fail(m.identifier, '%d acceptors' % c)
class FormalCharge(IntRangeMolProcessor):
"""Filter by formal charge: int,int."""
def process(self, m):
c = m.formal_charge
if self.low <= c <= self.high:
self.succeed(m.identifier, '%d formal charge' % c)
return m
else:
self.fail(m.identifier, '%d formal charge' % c)
class MetalCount(IntRangeMolProcessor):
"""Filter by number of metals: int, int."""
def process(self, m):
c = sum(a.is_metal for a in m.atoms)
if self.low <= c <= self.high:
self.succeed(m.identifier, '%d metals' % c)
return m
else:
self.fail(m.identifier, '%d metals' % c)
class SameSiteMetals(NullaryMolProcessor):
"""Filter by whether two metals share the same site."""
def process(self, m):
metals = [at for at in m.atoms if at.is_metal]
for i in range(len(metals)):
for j in range(i):
one = metals[i].coordinates
two = metals[j].coordinates
if ( one.x() == two.x()
and one.y() == two.y()
and one.z() == two.z() ):
self.fail(m.identifier,
'%s and %s share a site' % (metals[i],
metals[j]))
break
else:
self.succeed(m.identifier, 'No metals share a site')
return m
class AllAtomsHaveSites(NullaryMolProcessor):
"""Filter on whether all atoms have sites."""
def process(self, m):
a = m.all_atoms_have_sites
if a:
self.succeed(m.identifier, 'all atoms have sites')
return m
else:
self.fail(m.identifier, 'some atoms have no sites')
class NumberOfComponents(IntRangeMolProcessor):
"""Filter on count of distinct entities in a structure: int,int."""
def process(self, m):
c = len(m.components)
if self.low <= c <= self.high:
self.succeed(m.identifier, '%d components' % c)
return m
else:
self.fail(m.identifier, '%d components' % c)
class SmallestRingSize(IntRangeMolProcessor):
'''Filter on minimum and maximum size of a ring'''
def process(self, m):
if not hasattr(self, 'query'):
at = ccdc.search.QueryAtom()
at.smallest_ring = [self.low, self.high]
self.query = ccdc.search.QuerySubstructure()
self.query.add_atom(at)
if self.query.search_molecule(m, limit=1):
self.succeed(m.identifier, 'smallest ring')
return m
else:
self.fail(m.identifier, 'smallest ring')
class UnfusedUnbridgedRingSize(IntRangeMolProcessor):
'''Filter for ...'''
def process(self, m):
if not hasattr(self, 'query'):
at = ccdc.search.QueryAtom()
at.smallest_ring = [self.low, self.high]
at.unfused_unbridged_ring = True
self.query = ccdc.search.QuerySubstructure()
self.query.add_atom(at)
if self.query.search_molecule(m, limit=1):
self.succeed(m.identifier, 'unfused ring')
return m
else:
self.fail(m.identifier, 'unfused ring')
##############################################################################
class Arguments(object):
"""Holds options and is used to build the processor."""
def __init__(self):
self.source = None
self.verbose = False
self.Verbose = False
if __name__ == '__main__':
parser = argparse.ArgumentParser(
prog='molprocessor.py',
description=__doc__
)
group = parser.add_argument_group('Files')
group.add_argument(
'-i', '--input', help="""\
Input file name or CSD to process the whole of the CSD. An input should be at
the start of a chain of processors, but additional inputs may appear at any
other point to allow pre-processed files to be appended.""",
action=MolSource.ParseArgs, nargs=1
)
group.add_argument(
'-o', '--output', help='Output file'
)
group = parser.add_argument_group('Filters')
MolecularWeight.add_option(group)
AtomCount.add_option(group)
FormalCharge.add_option(group)
AllAtomsHaveSites.add_option(group)
RotatableBondCount.add_option(group)
DonorCount.add_option(group)
AcceptorCount.add_option(group)
NumberOfComponents.add_option(group)
MetalCount.add_option(group)
SameSiteMetals.add_option(group)
Organic.add_option(group)
SmallestRingSize.add_option(group)
UnfusedUnbridgedRingSize.add_option(group)
PermittedAtomicNumbers.add_option(group)
group = parser.add_argument_group('Transformers')
LargestComponent.add_option(group)
AddHydrogens.add_option(group)
RemoveHydrogens.add_option(group)
NormaliseHydrogens.add_option(group)
RemoveWaters.add_option(group)
Neutralise.add_option(group)
StandardiseAromaticBonds.add_option(group)
StandardiseDelocalisedBonds.add_option(group)
AssignBondTypes.add_option(group)
group = parser.add_argument_group('Miscellaneous')
group.add_argument(
'-v', '--verbose',
help='Sets the following processor to print as it processes',
action='store_true',
default=False
)
group.add_argument(
'-V', '--Verbose',
help='Sets all processors to verbose',
action='store_true',
default=False
)
group.add_argument(
'-s', '--silent',
help='Do not print statistics at the end',
action='store_true',
default=False
)
group.add_argument(
'-l', '--limit',
help='Limit the number of structures written',
type=int,
default=sys.maxsize
)
arguments = Arguments()
if len(sys.argv) == 1:
sys.argv.append('--help')
parser.parse_args(sys.argv[1:], namespace=arguments)
if arguments.output:
if arguments.output.endswith('.smiles'):
sink = CanonicalSmiles(arguments.source, arguments.output)
elif '*' in arguments.output:
sink = GlobSink(arguments.source, arguments.output)
else:
sink = MolSink(arguments.source, arguments.output)
else:
sink = MolSink(arguments.source, 'stdout', format='identifiers')
sink.run(limit=int(arguments.limit))
sink.close()
sys.exit()
if not arguments.silent:
sink.print_stats()