Source code for ccdc.utilities

#
# This code is Copyright (C) 2015-2019 The Cambridge Crystallographic Data Centre
# (CCDC) of 12 Union Road, Cambridge CB2 1EZ, UK and a proprietary work of CCDC.
# This code may not be used, reproduced, translated, modified, disassembled or
# copied, except in accordance with a valid licence agreement with CCDC and may
# not be disclosed or redistributed in any form, either in whole or in part, to
# any third party. All copies of this code made in accordance with a valid
# licence agreement as referred to above must contain this copyright notice.
#
# No representations, warranties, or liabilities are expressed or implied in the
# supply of this code by CCDC, its servants or agents, except where such
# exclusion or limitation is prohibited, void or unenforceable under governing
# law.
#
'''
The :mod:`ccdc.utilities` module contains general purpose classes.

The main classes of the :mod:`ccdc.utilities` module are:

- :class:`ccdc.utilities.Logger`
- :class:`ccdc.utilities.FileLogger`
- :class:`ccdc.utilities.Histogram`
- :class:`ccdc.utilities.Grid`
- :class:`ccdc.utilities.Licence`
- :class:`ccdc.utilities.ApplicationInterface`
- :class:`ccdc.utilities.Resources`

'''
###########################################################################

import argparse
import base64
import collections
import csv
import glob
import inspect
import json
import logging
import os
from pathlib import Path
import re
import shutil
import subprocess
import sys
import tempfile
import time
import unicodedata
import unittest
import zlib
import ccdc

# Just until pytest fixes its deprecation warnings
import warnings
with warnings.catch_warnings():
    warnings.simplefilter('ignore')
    try:
        import pytest
        _have_pytest = True
    except ImportError:
        _have_pytest = False

def _get_ccdc_module_name(basename):
    return f'{basename}{sys.version_info.major}{sys.version_info.minor}'

class _private_importer(object):
    def __enter__(self):
        location = os.path.dirname(__file__)
        sys.path.insert(0, os.path.join(location, '_lib'))
        return self

    def import_ccdc_module(self, library, alias=None):
        import builtins
        import importlib
        exec(f'{library} = importlib.import_module("{_get_ccdc_module_name(library)}")')
        if alias is None:
            exec(f'builtins.{library} = {library}')
        else:
            exec(f'builtins.{alias} = {library}')

    def __exit__(self, type, value, traceback):
        sys.path.pop(0)

#############################################################################

with _private_importer() as pi:
    pi.import_ccdc_module('UtilitiesLib')
    pi.import_ccdc_module('MathsLib')

###########################################################################

def processEvents():
    UtilitiesLib.processEvents()

###########################################################################

class CSDNotFoundException(RuntimeError):
    def __init__(self, message):
        super().__init__("CSD Data is not available in this installation. " + message)

###########################################################################

def print_set(s):
    '''Python2/3 compatible print for use in doctests.'''
    if s:
        l = [str(x) for x in s]
        l.sort()
        return '{%s}' % ', '.join(l)
    else:
        return 'set()'

###########################################################################

def natural_sort_key(key):
    '''Natural sort key function for use with list.sort() or sorted().

    Natural sort order is an ordering of strings in alphabetical order, except that
    multi-digit numbers are treated atomically, i.e., as if they were a single
    character. Natural sort order has been promoted as being more human-friendly
    ("natural") than the machine-oriented pure alphabetical order.

    For example, in alphabetical sorting "z11" would be sorted before "z2" because
    "1" is sorted as smaller than "2", while in natural sorting "z2" is sorted
    before "z11" because "2" is sorted as smaller than "11".
    e.g.  Alphabetical sorting: z11, z2. Natural sorting: z2, z11.
    (taken from https://en.wikipedia.org/wiki/Natural_sort_order)

    Implementation from https://stackoverflow.com/questions/4836710/is-there-a-built-in-function-for-string-natural-sort

    Added in response to bug PYAPI-2349 where amino acid residues are sorted on residue sequence number but that
    number may include an additional insertion code e.g. 60B.
    '''
    return [int(text) if text.isdecimal() else text.lower() for text in re.split(r'(\d+)', key)]

###########################################################################

def _fix_multiprocessing_on_macos():
    ''' Workaround for https://bugs.python.org/issue33725
    Python 3.7 and below use the fork multiprocessing start method by default on macos, causing random crashes and stuck subprocesses.
    The default has been changed from python 3.8 onwards to be 'spawn'. This method does the same.
    Use in the if __name__ == '__main__' portion of your multiprocessing-using scripts
    '''
    import sys
    if sys.platform == 'darwin' and sys.version_info < (3, 8):
        import multiprocessing as mp
        mp.set_start_method('spawn')

###########################################################################
#   Logging
###########################################################################

[docs]class Logger(logging.Logger): '''Handles CCDC log messages and Python API messages.''' contact_support_message = 'Please contact user support: support@ccdc.cam.ac.uk' _logger = None (DEBUG, INFO, WARNING, ERROR, CRITICAL) = ( logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL )
[docs] @classmethod def reset(klass): '''Reset the logger.''' klass._logger = None
def __init__(self): if Logger._logger is None: logging.Logger.__init__(self, 'CCDC_API') Logger._logger = self self.handler = logging.StreamHandler() self.handler.name = 'stderr' self.addHandler(self.handler) self.formatter = logging.Formatter( '%(levelname)s %(filename)s:%(lineno)d %(message)s' ) self.handler.setFormatter(self.formatter) self.setLevel(logging.INFO) else: self.__dict__ = Logger._logger.__dict__ def __del__(self): if self.handler.stream is not None and not self.handler.stream.closed: self.handler.flush() self.handler.close() self.reset()
[docs] def set_output_file(self, file_name): '''Specify an output file to use, rather than the default stdout. :param file_name: str ''' UtilitiesLib.CCDCLog.set_output_file(file_name) if file_name != self.handler.name: self.removeHandler(self.handler) if file_name == 'stdout': self.handler = logging.StreamHandler(sys.stdout) elif file_name == 'stderr': self.handler = logging.StreamHandler() else: self.handler = logging.FileHandler(file_name) self.handler.name = file_name self.handler.setFormatter(self.formatter) self.addHandler(self.handler)
[docs] def set_ccdc_log_level(self, value): '''Set the log level of the CCDC logger. :param value: int ''' UtilitiesLib.CCDCLog.set_level(value)
[docs] def set_ccdc_minimum_log_level(self, value): '''Set the minimum log level of the CCDC logger. Note that a minimum log level of 1 produces an enormous amount of output. A minimum log level of 3 is recommended. :param value: int ''' UtilitiesLib.CCDCLog.set_minimum_level(value)
[docs] def set_log_level(self, value): '''Set the log level of Python log messages. :param value: Logger.LOGLEVEL ''' self.setLevel(value)
[docs] def fatal(self, msg, contact=True): '''Log a critical message and exit. :param msg: str :param contact: whether to include CCDC contact details in the message ''' if contact: self.critical( '\n'.join([msg, self.contact_support_message]) ) else: self.critical(msg) sys.exit(1)
[docs] def ignore_line_numbers(self, tf=True): '''Format line numbers or not for output lines. :param tf: bool ''' if tf: self.formatter = logging.Formatter( '%(levelname)s %(message)s' ) else: self.formatter = logging.Formatter( '%(levelname)s %(filename)s:%(lineno)d %(message)s' ) self.handler.setFormatter(self.formatter)
###########################################################################
[docs]class FileLogger(object): '''A context manager to set logger output to a file. Use it like this:: with FileLogger('/tmp/ccdc.log') as log: ... log.info('Something happened') ... The file will be closed on exit and the logger reset to stderr. ''' def __init__(self, fname): '''Save the filename.''' self.file_name = fname def __enter__(self): '''Set the logger's output to the saved filename.''' self.log = Logger() self.log.set_output_file(self.file_name) return self.log def __exit__(self, *args): '''Reset the logger on exit.''' self.log.handler.flush() self.log.handler.close() self.log.reset()
########################################################################### # Uncertainty ###########################################################################
[docs]class Uncertainty(object): '''Represents the uncertainty in a float value.''' def __init__(self, precision=0, uncertainty=0, _uncertainty=None): if _uncertainty is None: _uncertainty = MathsLib.Uncertainty(precision, uncertainty) self._uncertainty = _uncertainty def __repr__(self): return repr(self._uncertainty) @property def precision(self): '''The precision of the uncertainty.''' return self._uncertainty.precision() @property def uncertainty(self): '''The uncertainty in the measurement.''' return self._uncertainty.uncertainty()
[docs]class UncertainValue(object): '''Represents a float with uncertainty.''' def __init__(self, f=0.0, u=None): '''Initialise with a float and an uncertainty.''' if u is None: u = MathsLib.Uncertainty() self._uncertain_value = MathsLib.UncertainValue(f, u) def __str__(self): '''As a string.''' return str(self._uncertain_value) __repr__ = __str__ @property def value(self): '''The value without the uncertainty.''' return round(self._uncertain_value.value(), self.precision) @property def precision(self): '''The number of decimal places supported.''' return self._uncertain_value.uncertainty().precision() @property def uncertainty(self): '''The uncertainty in the value.''' return self._uncertain_value.uncertainty().uncertainty()
[docs] @staticmethod def from_string(s): '''Create from a string representation.''' u = UncertainValue() u._uncertain_value = MathsLib.UncertainValue(s) return u
########################################################################### class nested_class(object): '''A class decorator to make nested classes more explicitly named.''' def __init__(self, parent_name): '''Save the name of the parent class.''' self.parent_name = parent_name def __call__(self, klass): '''Modify the klass name to reflect the hierarchy. Don't do it if in a sphinx context.''' if 'sphinx' not in sys.modules: klass.__name__ = '%s.%s' % (self.parent_name, klass.__name__) setattr(sys.modules[klass.__module__], klass.__name__, klass) return klass ########################################################################### class bidirectional_dict(dict): '''Dictionary with inverse lookup.''' def inverse_lookup(self, val): '''Lookup key for a value.''' if isinstance(val, str) and all(isinstance(v, str) for v in self.values()): possibles = [(k, v) for k, v in self.items() if v.lower().startswith(val.lower())] if len(possibles) == 1: return possibles[0][0] elif len(possibles) > 1: raise KeyError('Ambiguous value %s' % val) else: raise KeyError('Non-existent value %s' % val) for k, v in self.items(): if v == val: return k raise KeyError('Non-existent value %s' % val) def prefix_lookup(self, key): '''Lookup by unique prefix.''' vals = [(k.lower(), v) for k, v in self.items() if k.lower().startswith(key.lower())] if len(vals) == 1: return vals[0][1] if len(vals): keys, vals = zip(*vals) if key.lower() in keys: # exact match return vals[keys.index(key.lower())] raise KeyError('Non-unique prefix %s' % key) raise KeyError('Non-existent prefix %s' % key) ###########################################################################
[docs]class Histogram(object): '''A histogram.''' def __init__(self, start_value, end_value, nbins): self._histogram = MathsLib.Histogram(start_value, end_value, nbins) @staticmethod def _from_histogram(_histogram): '''Private.''' h = Histogram(0, 0, 1) h._histogram = _histogram return h
[docs] def add_value(self, v): '''Add a single value to the histogram.''' self._histogram.add(v)
[docs] def add_values(self, vs): '''Add an iterable of values.''' self._histogram.add(vs)
@property def bin_width(self): '''The width of a bin.''' return self._histogram.bin_width() @property def nbins(self): '''The number of bins.''' return self._histogram.nbins() @property def start_value(self): '''The starting value.''' return self._histogram.start_value() @property def end_value(self): '''The last value.''' return self._histogram.end_value() @property def frequencies(self): '''The frequency counts of each bin.''' return tuple( self._histogram.frequency(i) for i in range(self._histogram.nbins()) ) @property def nvalues(self): '''How many observations were made.''' return self._histogram.n_observations() @property def overflow(self): '''Whether over- or underflow occured in the histogram.''' return self._histogram.ntoo_big() > 0 or self._histogram.ntoo_small() > 0 @property def nunderflow(self): '''Number of underflowing values.''' return self._histogram.ntoo_small() @property def noverflow(self): '''Number of overflowing values.''' return self._histogram.ntoo_big()
[docs] def compare(self, other, method='jensen_shannon'): '''Compare two histograms. :param other: :class:`ccdc.utilities.Histogram` instance :param method: either 'jensen_shannon' or 'kullback_leibler' or a prefix of either :returns: a disimilarity measure ''' if method.lower().startswith('k'): return MathsLib.kullback_leibler_divergence(self._histogram, other._histogram) else: return MathsLib.jensen_shannon_divergence(self._histogram, other._histogram)
###########################################################################
[docs]class Grid(object): '''An orthonormal grid of values.''' def __init__(self, origin=None, far_corner=None, spacing=None, default=0.0, _grid=None): if _grid is not None: self._grid = _grid org = _grid.origin() far = _grid.far_corner() try: self._spacing = (far.x() - org.x())/(_grid.x_steps() - 1) except ZeroDivisionError: print(far.x(), org.x(), _grid.x_steps()) self._spacing = far.x() - org.x() else: if spacing is None: spacing = 0.2 self._spacing = spacing x_axis = MathsLib.vector_3d(far_corner[0] - origin[0], 0, 0) y_axis = MathsLib.vector_3d(0, far_corner[1] - origin[1], 0) z_axis = MathsLib.vector_3d(0, 0, far_corner[2] - origin[2]) self._grid = MathsLib.NormalGrid( int((far_corner[0] - origin[0]) / spacing) + 1, int((far_corner[1] - origin[1]) / spacing) + 1, int((far_corner[2] - origin[2]) / spacing) + 1, origin, x_axis, y_axis, z_axis, default ) def _combine(self, other=None, operator=None, inplace=False): if operator is None: raise RuntimeError('No operator provided') if inplace: gc = MathsLib.GridCombiner(MathsLib.GridCombiner.UPDATE_IN_PLACE) else: gc = MathsLib.GridCombiner() gc.add_grid(self._grid) if other is not None: gc.add_grid(other._grid) g = gc.combined_grid(operator) return Grid(_grid=g) def __mul__(self, other): '''Multiplies two grids.''' if isinstance(other, Grid): return self._combine(other, MathsLib.MultiplyFunction()) else: return self._combine(operator=MathsLib.ScalarMultiplyFunction(other)) def __imul__(self, other): if isinstance(other, Grid): return self._combine(other, MathsLib.MultiplyFunction(), inplace=True) else: return self._combine(operator=MathsLib.ScalarMultiplyFunction(other), inplace=True) def __truediv__(self, other): '''Divides a grid by another grid or a scalar. :raises: ZeroDivisionError if any element of the divisor is 0.0 ''' if isinstance(other, Grid): inxs = other.indices_at_value(0.0) if inxs: raise ZeroDivisionError('The divisor has 0.0 values') else: return self._combine(other, MathsLib.DivideFunction()) else: if other == 0.0: raise ZeroDivisionError('The dividend is 0.0') else: return self._combine(operator=MathsLib.ScalarDivideFunction(other)) def __itruediv__(self, other): '''In-place divides a grid by another or a scalar. :raises: ZeroDivisionError if any element of the dividend is 0.0. ''' if isinstance(other, Grid): inxs = other.indices_at_value(0.0) if inxs: raise ZeroDivisionError('The divisor has 0.0 values') else: return self._combine(other, MathsLib.DivideFunction(), inplace=True) else: if other == 0.0: raise ZeroDivisionError('The dividend is 0.0') else: return self._combine(operator=MathsLib.ScalarDivideFunction(other), inplace=True) def __add__(self, other): if isinstance(other, Grid): return self._combine(other, MathsLib.AddFunction()) else: return self._combine(operator=MathsLib.ScalarAddFunction(other)) def __iadd__(self, other): if isinstance(other, Grid): return self._combine(other, MathsLib.AddFunction(), inplace=True) else: return self._combine(operator=MathsLib.ScalarAddFunction(other), inplace=True) def __sub__(self, other): if isinstance(other, Grid): return self._combine(other, MathsLib.SubtractFunction()) else: return self._combine(operator=MathsLib.ScalarSubtractFunction(other)) def __isub__(self, other): if isinstance(other, Grid): return self._combine(other, MathsLib.SubtractFunction(), inplace=True) else: return self._combine(operator=MathsLib.ScalarSubtractFunction(other), inplace=True) def __and__(self, other): '''The grid containing 1.0 where both this grid and the other have a non-zero value, otherwise 0.0.''' return self._combine(other, MathsLib.AndFunction()) def __iand__(self, other): '''Inplace logical and.''' return self._combine(other, MathsLib.AndFunction(), inplace=True) def __or__(self, other): '''The grid containing 1.0 where either this grid or the other has a non-zero value otherwise 0.0.''' return self._combine(other, MathsLib.OrFunction()) def __ior__(self, other): '''Inplace logical or.''' return self._combine(other, MathsLib.OrFunction(), inplace=True) def __neg__(self): '''The grid containing 1.0 where this grid has a non-zero value, otherwise 0.0.''' return self._combine(operator=MathsLib.NotFunction()) def __lt__(self, other): '''The grid containing 1.0 where this grid has value less than the other grid value or scalar.''' if isinstance(other, Grid): return self._combine(other, MathsLib.LessThanFunction()) else: return self._combine(operator=MathsLib.ScalarLessThanFunction(other)) def __gt__(self, other): '''The grid containing 1.0 where this grid has value greater than the other grid value or scalar.''' if isinstance(other, Grid): return self._combine(other, MathsLib.GreaterThanFunction()) else: return self._combine(operator=MathsLib.ScalarGreaterThanFunction(other))
[docs] def masked_set(self, mask, value): '''A grid containing this grid's values where the mask is 0.0, otherwise value.''' return self._combine(mask, MathsLib.MaskedSetFunction(value))
@property def bounding_box(self): '''The origin and farthest corner of the grid.''' from ccdc import molecule org = self._grid.origin() far = self._grid.far_corner() return molecule.Coordinates(org.x(), org.y(), org.z()), molecule.Coordinates(far.x(), far.y(), far.z()) @property def nsteps(self): '''A triple of nx, ny, nz for the grid.''' return (self._grid.x_steps(), self._grid.y_steps(), self._grid.z_steps()) @property def spacing(self): '''The distance between successive grid points in one dimension.''' return self._spacing
[docs] def value(self, i, j, k): '''The value at a point.''' nx, ny, nz = self.nsteps if i >= nx or j >= ny or k >= nz: raise IndexError return self._grid.value(i, j, k)
[docs] def value_at_point(self, coords): '''The value at a point. If the point is outside the grid a default value of 0.0 will be returned. The value will be linearly interpolated from nearby grid point values. ''' if ( coords[0] < self._grid.origin().x() or coords[0] >= self._grid.far_corner().x() or coords[1] < self._grid.origin().y() or coords[1] >= self._grid.far_corner().y() or coords[2] < self._grid.origin().z() or coords[2] >= self._grid.far_corner().z() ): return 0.0 return self._grid.value(coords)
[docs] def set_value(self, i, j, k, val): '''Set the value at a grid point.''' self._grid.set_value(i, j, k, val)
[docs] def to_vector(self): '''Return the tuple of values in the grid. This makes it easy to create numpy arrays from a grid. ''' return self._grid.to_vector()
[docs] def from_vector(self, vector): '''Sets all points of the grid from the vector.''' self._grid.from_vector(vector)
[docs] def to_dict(self): '''A dictionary keyed by grid value with values the indices giving that value.''' return self._grid.to_map()
[docs] def from_dict(self, dic): '''Use a dictionary to set the values of the grid.''' self._grid.from_map(dic)
@property def extrema(self): '''The minimum and maximum value of the grid.''' mm = MathsLib.extrema(self._grid) return (mm.first, mm.second)
[docs] def copy(self): '''Make a copy of the grid.''' return Grid(_grid=MathsLib.copy_grid(self._grid))
[docs] def indices_at_value(self, value): '''The tuple of indices where the grid has the given value.''' return MathsLib.indices_at_value(self._grid, value)
[docs] def count_grid(self): '''The number of non-zero points in the grid.''' return MathsLib.count_grid(self._grid)
[docs] def sub_grid(self, region): '''Extract the region of the grid given by the six indices in region.''' return Grid(_grid=MathsLib.sub_grid(self._grid, region))
[docs] def dilate(self): '''Expands a grid to include points with a non-zero neighbour.''' return Grid(_grid=MathsLib.dilate(self._grid))
[docs] def contract(self): '''Contracts a grid by eliminating points with a zero-valued neighbour.''' return Grid(_grid=MathsLib.contract(self._grid))
[docs] def mean_value_of_neighbours(self): '''Sets the value at a grid point to the mean of the neighbouring grid points.''' return Grid(_grid=MathsLib.value_by_neighbours(self._grid, MathsLib.MEAN))
[docs] def min_value_of_neighbours(self): '''Sets the value at a grid point to the minimum value of its neighbours.''' return Grid(_grid=MathsLib.value_by_neighbours(self._grid, MathsLib.MINIMUM))
[docs] def max_value_of_neighbours(self): '''Sets the value at a grid point to the maximum value of its neighbours.''' return Grid(_grid=MathsLib.value_by_neighbours(self._grid, MathsLib.MAXIMUM))
[docs] def count_neighbours(self): '''Sets the value at a grid point to the number of neighbours with a non-zero value.''' return Grid(_grid=MathsLib.value_by_neighbours(self._grid, MathsLib.COUNT))
[docs] def flood_fill(self, other: object, i: int, j: int, k: int, threshold: float, value: float, x_periodic: bool = False, y_periodic: bool = False, z_periodic: bool = False): '''Sets all connected points of the other grid with values at or above threshold to a given value. x|y|z_periodic flags allow the flood fill to be carried out using periodic boundary conditions in the given axis. :param other: a :class:`ccdc.utilities.Grid` object containing connected points :param i: x grid index to start flood fill :param j: y grid index to start flood fill :param k: z grid index to start flood fill :param threshold: at or above threshold value used for selecting connected points :param value: fill value :param x_periodic: flag for periodicity along x axis :param y_periodic: flag for periodicity along y axis :param z_periodic: flag for periodicity along z axis :returns: a subgrid containing the connected region. ''' if isinstance(x_periodic, bool) and isinstance(y_periodic, bool) and isinstance(z_periodic, bool): pass else: raise TypeError("Please supply boolean for periodicity flags") region = MathsLib.flood_fill(other._grid, self._grid, value, threshold, i, j, k, x_periodic, y_periodic, z_periodic) if len(region): return Grid(_grid=MathsLib.sub_grid(other._grid, region))
[docs] def islands(self, threshold): '''The connected regions of the grid where all values are above the threshold.''' return tuple(Grid(_grid=x) for x in MathsLib.islands(self._grid, threshold))
[docs] @staticmethod def super_grid(padding, *grids): '''Inject the grids into the smallest grid containing them.''' if not grids: raise RuntimeError('No grids from which to make a super-grid') if len(set(round(g.spacing, 3) for g in grids)) > 1: raise RuntimeError('Incompatible grids from which to make a super-grid') boxes = [g.bounding_box for g in grids] bounds = [ (min(box[0][j] for box in boxes), max(box[1][j] for box in boxes)) for j in range(3) ] org = [bounds[j][0] - padding for j in range(3)] far = [bounds[j][1] + padding for j in range(3)] grid = Grid(org, far, grids[0].spacing) for g in grids: MathsLib.inject_grid(grid._grid, g._grid) return grid
[docs] def set_sphere(self, point, radius, value, scaling='linear', mode='add'): '''Set a sphere of values in the grid. :parameter point: the centre of the sphere to be set. :parameter radius: the radius of the sphere to be set. :parameter value: the value to set. :parameter scaling: either 'linear' for linear scaling of the value, or anything else for no scaling. :parameter mode: one of 'add', 'replace', 'min' or 'max' to control the method of overwriting the grid's current value. ''' if scaling.lower() == 'linear': scale = MathsLib.LINEAR_SCALE else: scale = MathsLib.NO_SCALE sphere_mode = dict( add=MathsLib.OVERWRITE_ADD, replace=MathsLib.OVERWRITE_REPLACE, min=MathsLib.OVERWRITE_MIN, max=MathsLib.OVERWRITE_MAX )[mode] p = [point[0], point[1], point[2]] MathsLib.set_sphere(self._grid, p, radius, value, scale, sphere_mode)
[docs] def score_molecule(self, molecule): '''The dictionary of values for atoms' coordinates on the grid.''' return { a : self.value_at_point(a.coordinates) for a in molecule.atoms }
[docs] def write(self, file_name, title='', format=None): '''Write the grid. :param file_name: to which the grid will be written. :param title: will be written to a Sybyl format file as the title. :param format: either 'acnt' for a Sybyl format grid file, 'grd' for an InsightII format grid file or 'ccp4' for a CCP4Map format. ''' if format is None: format = os.path.splitext(file_name)[1][1:] if format.lower() == 'acnt': acnt_file = MathsLib.AcntFile(title, self._grid) acnt_file.write(file_name) elif format.lower() == 'ccp4': ccp4map = MathsLib.CCP4MapFile() ccp4map.set(self._grid) ccp4map.write(file_name) else: grd_file = MathsLib.GRDFile() grd_file.set(self._grid) grd_file.write(file_name)
[docs] @staticmethod def from_file(file_name, format=None): '''Reads a grid from file. :param file_name: from which the grid will be written. :param format: either 'acnt' or 'grd' indicating a Sybyl or an InsightII formatted file. ''' if format is None: format = os.path.splitext(file_name)[1][1:] if format.lower() == 'acnt': acnt_file = MathsLib.AcntFile(file_name) return Grid(_grid=MathsLib.make_grid_from_acnt(acnt_file)); elif format.lower() == 'ccp4': ccp4_file = MathsLib.CCP4MapFile(file_name) return Grid(_grid=ccp4_file.grid()) else: grd_file = MathsLib.GRDFile(file_name) return Grid(_grid=grd_file.grid())
[docs] def slice(self, plane): '''Slices a grid with a plane. :param plane: a :class:`ccdc.descriptors.GeometricDescriptors.Plane` instance. :returns: a pair of :class:`ccdc.utilities.Grid`, the first with zeros further away from the origin than the plane distance, the second with zeros nearer to the origin than the plane distance. ''' plane_dist = plane.distance norm_x, norm_y, norm_z = plane.normal near = self.copy() far = self.copy() nx, ny, nz = self.nsteps (ox, oy, oz), _ = self.bounding_box for x in range(nx): for y in range(ny): for z in range(nz): p = [ox + x*self.spacing, oy + y*self.spacing, oz + z*self.spacing] d = norm_x*p[0] + norm_y*p[1] + norm_z*p[2] if d < plane_dist: far.set_value(x, y, z, 0.0) elif d > plane_dist: near.set_value(x, y, z, 0.0) return near, far
[docs]class GridEnsemble(object): '''A collection of homogeneous :class:`ccdc.utilities.Grid` instances. All grids will be on a common volume and spacing. Provides a lazy dict-like interface: grids will not be loaded until asked for, and may be accessed by base name as a key. Grids may be added via __setitem__. The grid will automatically be written to the directory. ''' def __init__(self, directory): '''Reads the names of grids in the directory. .grd format will be preferred. ''' self.directory = directory self._grid_files = glob.glob(os.path.join(directory, '*.acnt')) + \ glob.glob(os.path.join(directory, '*.ccp4')) + \ glob.glob(os.path.join(directory, '*.grd')) self._grid_dict = { os.path.splitext(os.path.basename(f))[0] : f for f in self._grid_files } self._grids = {} def __getitem__(self, key): key = key.replace(' ', '_') if key not in self._grid_dict: raise KeyError('No grid for %s' % key) if key not in self._grids: g = Grid.from_file(self._grid_dict[key]) self._grids[key] = g return self._grids[key] def __setitem__(self, key, val): self._grid_dict[key] = os.path.join(self.directory, key + '.grd') self._grids[key] = val val.write(self._grid_dict[key]) def keys(self): return self._grid_dict.keys()
########################################################################### def _argb2rgba(data): '''Private: transform image data to PIL format.''' l = [] for x in range(0, len(data), 4): l.extend( [data[x+2], data[x+1], data[x+0], data[x+3]] ) return bytes(l) ############################################################################# def _find_ccdc_datapacks_exe(): '''Locate the ccdc_datapacks executable Only use in tests with @_datapack_required decorator''' if sys.platform == 'win32': ccdc_datapacks_exe = 'ccdc_datapacks.exe' else: ccdc_datapacks_exe = 'ccdc_datapacks.x' ccdc_datapacks = os.path.join(os.environ["BT_BUILDSPACE_BINARY_PATH"], ccdc_datapacks_exe) if not os.path.exists(ccdc_datapacks): raise Exception(f"This test requires ccdc_datapacks to be built and available in {ccdc_datapacks}") return ccdc_datapacks def _datapack_required(datapack_full_reference): '''Decorator to skip tests if datapacks are not available This will ensure that a datapack is present :param datapack_full_reference: str a full reference to a datapack ''' def datapack_required_decorator_lambda(f): if 'BT_BUILD_SPACE' not in os.environ: return pytest.mark.skip("Test can only run under the CCDC build infrastructure")(f) if 'ARTIFACTORY_API_KEY' not in os.environ: return pytest.mark.skip("Test requires CCDC build infrastructure credentials for data")(f) _obtain_datapack(datapack_full_reference) return f return datapack_required_decorator_lambda def _locate_ccdc_datapack_in_buildspace(datapack_full_reference): """Ask ccdc_datapacks where a datapack is on disk""" location=subprocess.check_output([_find_ccdc_datapacks_exe(), '-p', datapack_full_reference]).decode().strip() if not location.startswith(datapack_full_reference): raise Exception(f'Invalid output from ccdc_datapacks: {location}') # the output is in the form datapack/full/reference=Path on disk location = location[len(datapack_full_reference)+1:] return location def _obtain_datapack(datapack_full_reference): subprocess.check_call([_find_ccdc_datapacks_exe(), '-f', datapack_full_reference]) def _locate_file_in_datapack(datapack_full_reference, file_name): return os.path.join(_locate_ccdc_datapack_in_buildspace(datapack_full_reference), file_name) def _obtain_datapack_or_skip_for_doctests(datapack_full_reference): if 'BT_BUILD_SPACE' not in os.environ: pytest.skip("Test can only run under the CCDC build infrastructure") if 'ARTIFACTORY_API_KEY' not in os.environ: pytest.skip("Test requires CCDC build infrastructure credentials for data") _obtain_datapack(datapack_full_reference) def _locate_file_in_datapack_or_skip_for_doctests(datapack_full_reference, file_name): return os.path.join(_locate_ccdc_datapack_in_buildspace(datapack_full_reference), file_name) def _find_test_file_for_doctests(fname): '''Used in doctests''' thisdir = os.path.dirname(os.path.abspath(__file__)) wrappingdir = os.path.dirname(os.path.dirname(thisdir)) for path, dirs, files in os.walk(wrappingdir): if ('example_files' in path or 'testdata' in path) and fname in files: return os.path.join(path, fname) raise IOError('File not found %s' % fname) def _find_test_file(scriptfilename, basename): '''Find a test file in a location relative to a test script. We can't use a location relative to this file as it may have been installed to a location that is entirely separate from the test files. ''' this = os.path.abspath(scriptfilename) thisdir = os.path.dirname(this) testdir = os.path.join(os.path.dirname(thisdir), 'tests') docdir = os.path.join(os.path.dirname(thisdir), 'doc') datadir = os.path.join(testdir, 'testdata') doc_data = os.path.join(docdir, 'example_files') file_name = os.path.join(datadir, basename) if os.path.exists(file_name): return file_name if os.path.exists(file_name + '.inf'): return file_name file_name = os.path.join(doc_data, basename) if os.path.exists(file_name): return file_name if os.path.exists(file_name + '.inf'): return file_name if 'ccdc_internal' in scriptfilename: return _find_test_file(scriptfilename.replace('_internal', '_rp'), basename) elif 'ccdc_rp' in scriptfilename: return _find_test_file(scriptfilename.replace('_rp', ''), basename) raise IOError('Test file %s does not exist in %s or %s' % (basename, datadir, doc_data)) def _test_output_dir(remove=True, relative_to_cwd=False): import tempfile import atexit import shutil if relative_to_cwd: dtmp = tempfile.mkdtemp(dir=os.path.abspath(os.getcwd())) else: dtmp = tempfile.mkdtemp() if remove: def f(d): try: shutil.rmtree(d) except: pass atexit.register(f, dtmp) else: print('*'*20, 'TEMP', dtmp, '*'*20) return dtmp #############################################################################
[docs]class Timer(object): '''Collects timing statistics for the run. Two methods of instrumenting code are available: a decorator for functions and methods, and a context manager for blocks of code. The former is used as | timer = Timer() | @timer.decorate('Some tag') | def method(self): | body_of_method The latter is used as | with timer('Some tag'): | code_to_time... At the end of the run, call | timer.report() which will print to stdout (or another file) a sorted list of times accumulated by the given tags. There is a staticmethod, :meth:`ccdc.utilities.Timer.progress` which may be used to provide a progress meter for long running tasks. | if count % 1000 == 0: | Timer.progress(start_time, count, expected_total, message, file=sys.stderr) to get periodic reports including percentage complete and expected length to wait for completion. An optional argument can also be provided to :meth:`ccdc.utilities.Timer.progress` to provide in place output that overwrites the previous output. | Timer.progress(start_time, count, expected_total, message, file=sys.stderr, in_place=True) ''' def __init__(self): self.times = collections.defaultdict(float) self.calls = collections.defaultdict(int)
[docs] def decorate(self, tag): '''Decorates a function or method to provide timing information.''' return lambda fn: self._decorator(fn, tag=tag)
def _decorator(self, fn, tag=''): def decorated(*args, **kw): with self(tag): return fn(*args, **kw) return decorated def __call__(self, tag): '''Returns a context manager for recording timing information.''' return Timer.Manager(self, tag)
[docs] class Manager(object): '''The context manager.''' def __init__(self, timer, tag): self.timer = timer self.tag = tag self.time = time.time() def __enter__(self): self.timer.calls[self.tag] += 1 def __exit__(self, *args): self.timer.times[self.tag] += time.time() - self.time
[docs] @staticmethod def format_time(t): '''Formats a time into hours:minutes:seconds.''' t = int(t) hours, mins, secs = t/3600, (t%3600)/60, t % 60 if hours > 24: return '%d:%d:%02d:%02d' % (hours/24, hours%24, mins, secs) return '%d:%02d:%02d' % (hours, mins, secs)
[docs] @staticmethod def progress(start, count, total, message, file=sys.stdout, in_place=False): '''Reports on progress of a long-running task. :param start: the timestamp of the start of the task. :param count: how many times the code has been executed. :param total: how many times it is expected to call the code. :param message: a message to be printed. ''' curr_time = time.time() - start to_go = (total * curr_time/count) - curr_time expected = ' (expected %s) ' % Timer.format_time(to_go) percent = count*100./total if in_place: file.write('\r%6d (%3d%%)... %s%s%s' % (count, percent, Timer.format_time(curr_time), expected, message)) else: print('%6d (%3d%%)... %s%s%s' % (count, percent, Timer.format_time(curr_time), expected, message), file=file) file.flush()
[docs] def report(self, file=sys.stdout): '''Reports the time spent per tag.''' longest = max(len(t) for t in self.times) for t, tag in sorted(((v, k) for k, v in self.times.items()), reverse=True): print('%*s: %3d: %s' % (longest, tag, self.calls[tag], self.format_time(self.times[tag])), file=file)
############################################################################# class PushDir(object): '''Context manager for directory changes.''' def __init__(self, dir): '''Save the directory.''' self.dir = dir def __enter__(self): '''Cd to it.''' self.old = os.getcwd() os.chdir(self.dir) def __exit__(self, type, value, traceback): '''And return to the old directory.''' os.chdir(self.old) ############################################################################# def _detect_format(s, format='mol2'): if '@<TRIPOS>MOLECULE' in s: format = 'mol2' elif 'V2000' in s or 'M END' in s or '$$$$' in s: format = 'sdf' elif 'loop_' in s: format = 'cif' elif s.count('\n') < 2: format = 'smiles' return format ############################################################################# class _temporary_copy(object): def __init__(self, original_path): self.original_path = original_path def __enter__(self): temp_dir = tempfile.mkdtemp() base_path = os.path.basename(self.original_path) self.path = os.path.join(temp_dir, base_path) shutil.copy2(self.original_path, self.path) if self.path.endswith('.gz'): new_path = self.path[:-3] with open(self.path, 'rb') as f_in: data = zlib.decompress(f_in.read(), zlib.MAX_WBITS | 16) with open(new_path, 'wb') as unpacked: unpacked.write(data) os.remove(self.path) self.path = new_path return self.path def __exit__(self, exc_type, exc_val, exc_tb): os.remove(self.path) def _get_csd_dir_content(): '''Return the contents of a valid CSD database directory.''' from ccdc.io import _CSDDatabaseLocator loc = _CSDDatabaseLocator.get_csd_location() if loc is None: raise CSDNotFoundException("CSD not found") dir = os.path.dirname(loc) dir_contents = os.listdir(dir) db_name = os.path.basename(loc) if not any(filename.startswith(db_name) for filename in dir_contents): raise CSDNotFoundException("Database not found") return dir_contents def _other_files_required(*args): def _decorator(f): if all(os.path.exists(a) for a in args): return f return unittest.skip('Some required files do not exist') return _decorator if _have_pytest: class _test_attr(object): '''Emulate nosetests attr decorator. If we decide to commit to pytest, replace the @attr decorators with the more native syntax @pytest.mark.slow. ''' def __init__(self, mark): self.mark = mark def __call__(self, obj): return getattr(pytest.mark, self.mark)(obj) def _other_files_required(*args): def _decorator(f): if all(os.path.exists(a) for a in args): return f return pytest.mark.skip('Some required files do not exist')(f) return _decorator def _csd_required(f): '''Decorator to skip tests if the CSD is not present.''' try: _get_csd_dir_content() except RuntimeError as e: return pytest.mark.skip("Test requires the CSD. " + str(e))(f) else: return f ###########################################################################
[docs]class Licence(object): '''Information about the current licensing.''' def __init__(self): self._days_remaining = 0 self._modules = [] if UtilitiesLib.is_csd_system_licensed(): self._modules.append('CSD-Core') if UtilitiesLib.is_csd_materials_licensed(): self._modules.append('CSD-Materials') if UtilitiesLib.is_csd_discovery_licensed(): self._modules.append('CSD-Discovery') if UtilitiesLib.is_research_partner_licensed(): self._modules.append('Research Partner') days = UtilitiesLib.days_remaining() if (self._days_remaining == 0) or (days > 0 and days < self._days_remaining): self._days_remaining = days self._features = UtilitiesLib.licensed_features() @property def files(self): '''Deprecated - The licence files in use.''' warnings.warn('Licensing no longer uses a file-based implementation', DeprecationWarning) return '' @property def days_remaining(self): '''The number of days remaining before any part of the licence expires.''' return self._days_remaining @property def modules(self): '''The licensed modules.''' return self._modules @property def features(self): '''The licensed features.''' return self._features.split(', ') def __str__(self): '''As a string.''' return 'Licensing\n%d days remaining\nModules: %s' % (self._days_remaining, ', '.join(self._modules)) __repr__ = __str__
######################################################################################################### Colour = collections.namedtuple('Colour', ['r', 'g', 'b', 'a']) def get_ccdc_icon(output_path): """:obj:`str`: The filename of a CCDC icon image in .png format.""" return _copy_asset('ccdc_logo_48x48.png', os.path.abspath(output_path)) def get_ccdc_logo(output_path): """:obj:`str`: The filename of a CCDC logo image in .png format.""" return _copy_asset('ccdc_logo_180x180_with_text.png', os.path.abspath(output_path)) def to_utf8(data): """Wrapper to encode unicode strings as UTF-8 if needed for Python 2+3 compatibility. :param data: (:obj:`str`) The string to process :returns: (:obj:`str`) The string encoded as UTF-8 if needed """ if isinstance(data, bytes): # Web requests that return UTF-8 will return this as a byte string on Python 3. # We need to decode this to a unicode string so it can be written to a file opened # in text mode via ccdc.utilities.output_file(). return data.decode('utf-8') else: return data def output_file(file_name): """Get a file object for writing output to with appropriate modes for writing CSV with unicode characters on both Python 2 and 3. :param file_name: (:obj:`str`) An absolute path to the file name or the extension to use :returns: (:obj:`file`) An open :obj:`file` object with correct modes and parameters to allow Unicode to be written properly to CSV on both Python 2.x and 3.x. """ # newline='' is needed to avoid csv.writer writing empty lines on Windows on Python 3 return open(file_name, 'a', encoding='utf-8', newline='') def _copy_asset(file_name, output_path): """Copy the given logo file from the CSD Python API assets path to the output directory. :param file_name: (:obj:`str`) The file name of the asset to copy :param output_path: (:obj:`str`) The directory to copy the asset to. :returns: :obj:`str` The filename of the asset in the output directory. :raises: RuntimeError if the given asset could not be found. """ destination = os.path.join(output_path, file_name) files_to_try = [os.path.abspath(os.path.join(os.path.dirname(__file__), 'assets', file_name)), os.path.abspath(os.path.join(os.environ.get('MAINDIR', ''), 'cppbuilds_shared', 'installer', 'images', file_name))] for candidate in files_to_try: if os.path.isfile(candidate): shutil.copyfile(candidate, destination) return destination raise RuntimeError('Cannot find requested asset at any of %s!' % files_to_try) def html_table(data, header=None, table_id=None, caption=None): """Generate a well formatted HTML table for use in reports from a list of rows of data. :param data: (:obj:`list` of :obj:`list`) A list of lists; each list is one row in the table. :param header: (:obj:`list` of :obj:`str`) The header row for the table, if any. :param table_id: (:obj:`str`) The unique identifier to use for the table's HTML element. :param caption: (:obj:`str`) The caption to use for the table. :returns: (:obj:`str`) A string of HTML containing the table data. """ if table_id is None and caption is not None: table_id = slug(caption) table = ['<table>' if table_id is None else '<table id="%s">' % table_id] if caption is not None: table.append(' <caption>%s</caption>' % caption) if header is not None: table.append(' <tr><th>%s</th></tr>' % ('</th><th>'.join([str(item) for item in header]))) for row in data: table.append(' <tr><td>%s</td></tr>' % ('</td><td>'.join([str(item) for item in row]))) table.append('</table>%s' % os.linesep) return os.linesep.join(table) def slug(text): """Turn the given text into a "slug", i.e. strip out any non- alphanumeric characters. :param text: (:obj:`str`) The text to process. :returns: (:obj:`str`) The processed text. """ # Convert the text to all-ASCII by normalising Unicode and dropping any that can't be normalised text = unicodedata.normalize('NFKD', text).encode('ascii', 'ignore').decode('ascii') # Remove any non-alphanumeric and non-whitespace characters and leading or trailing spaces text = re.sub(r'[^\w\s-]', '', text).strip() # Replace any remaining white space with underscores and convert to lower case return re.sub(r'[-\s]+', '_', text).lower() def b64encode(file_name): """Read the given file and convert it to a base64 string, e.g. for embedding images in HTML. :param file_name: (:obj:`str`) The absolute path to the file to encode. :returns: (:obj:`str`) The base64 string representing the file's contents. """ return base64.b64encode(open(file_name, 'rb').read()).decode()
[docs]class HTMLReport(object): """Utility class for writing HTML reports for API scripts. Can be used as a context manager, which will automatically write the closing HTML tags and close the output file when closed: with ccdc.utilities.HTMLReport(file_name='output.html', title='Example Report', ccdc_header=True, append=False, embed_images=False) as report: report.write_section_header('Headline') report.write('Test content') """ def __init__(self, file_name, report_title='Report', ccdc_header=True, append=False, embed_images=False): """Create and open the output file and write the base HTML header to it. :param file_name: (:obj:`str`) The name of the file to which to write the report. :param report_title: (:obj:`str`) The title for the report to be used in the header. :param ccdc_header: (:obj:`bool`) Whether to write the CCDC logo in the page header. :param append: (:obj:`bool`) Whether to write any page headers or just append to the file as is. :param embed_images: (:obj:`bool`) Whether to embed images in the HTML as base64 data URLs. """ self.file_name = file_name self.output_path = os.path.dirname(self.file_name) self.embed_images = embed_images self.append = append self.report_title = report_title self.ccdc_header = ccdc_header # If we don't want to append and the file already exists, raise an exception if os.path.isfile(self.file_name) and not self.append: raise IOError('Output file %s already exists!' % self.file_name) self.output_file = output_file(self.file_name) # If we do want to append and the file does not exist, write the template and header if os.path.getsize(self.file_name) == 0: # Write the report template to the output file report_template = os.path.join(os.path.dirname(__file__), 'assets', 'report_template.html') with open(report_template, 'r', encoding='utf-8') as template_file: html_header = template_file.read() self.write(html_header.replace('{title}', self.report_title)) # Write the in-page report header self.write_report_header() self.output_file.flush() def __enter__(self): """Support function so HTMLReport can be used as a context manager.""" return self def __exit__(self, type, value, traceback): """Close the output file when exiting the context handler.""" self.close()
[docs] def close(self): """Write the HTML footer and close the output file.""" if not self.append: self.write_footer() self.output_file.close()
def _get_img_src(self, file_name): """Get an image 'src' attribute either as an embedded base64 string or a file name. :param file_name: (:obj:`str`) The file name to process. :returns: (:obj:`str`) The correct 'src' attribute for the image. If self.embed_images is True, this will be a data: URL with the image encoded to base64 for direct embedding; otherwise the image will be copied adjacent to the HTML report and its base filename returned. """ if self.embed_images: return 'data:image/png;base64,{}'.format(b64encode(file_name)) else: if not os.path.exists(os.path.join(self.output_path, os.path.basename(file_name))): shutil.copy(file_name, self.output_path) return os.path.basename(file_name)
[docs] def write(self, content): """Write the given content to the file. :param content: (:obj:`str` or :obj:`list`) The HTML content to write to the file. If this is a list, each item in it will be written to a separate HTML paragraph. Otherwise, the entire content will be written to one paragraph. """ if isinstance(content, list): paragraph_sep = '</p>%s<p>' % os.linesep content = '<p>%s</p>' % paragraph_sep.join([str(item) for item in content]) self.output_file.write(content)
[docs] def write_paragraph(self, content): """Write the given content to the file enclosed in <p></p> tags. :param content: (:obj:`str` or :obj:`list`) The HTML content to write to the file. If this is a list, each item in it will be written to a separate HTML paragraph. Otherwise, the entire content will be written to one paragraph. """ self.write('<p>%s</p>%s' % (content, os.linesep))
[docs] def write_report(self, content): """Shorthand for writing a complete HTML report in one swoop. :param content: (:obj:`str`) The HTML content to write to the file. """ self.write_report_header() self.write(content)
[docs] def write_report_header(self): """Write the header for the report.""" if self.ccdc_header: self.write('<img src="%s" id="ccdc_logo" alt="CCDC" />%s' % (self._get_img_src(get_ccdc_logo(self.output_path)), os.linesep)) self.write('<h1 id="report_header">%s</h1>%s' % (self.report_title, os.linesep))
[docs] def write_section_header(self, headline, section_id=None, level=2): """Write a section header. :param headline: (:obj:`str`) The text to use for the section header. :param section_id: (:obj:`str`) The unique identifier to use for the HTML element. :param level: (:obj:`int`) The level of heading to write (as <h#>). """ if section_id is None: section_id = slug(headline) self.write('<h%s id="%s">%s</h%s>%s' % (level, section_id, headline, level, os.linesep))
[docs] def write_figure(self, file_name, alt_text='Figure', caption=None, figure_id=None): """Add a figure, copying the given file name adjacent to the HTML report. :param file_name: (:obj:`str`) The path to the image file for the figure. :param alt_text: (:obj:`str`) The HTML alt text for the figure. :param caption: (:obj:`str`) The caption for the figure, if any. :param figure_id: (:obj:`str`) The unique identifier to use for the HTML element. """ if figure_id is None and caption is not None: figure_id = slug(caption) data = '%s<img src="%s" alt="%s" />%s' % (os.linesep, self._get_img_src(file_name), alt_text, os.linesep) self.write_figure_data(figure_data=data, caption=caption, figure_id=figure_id)
[docs] def write_figure_data(self, figure_data, caption=None, figure_id=None): """Add a figure, simply inserting the given figure data inside the <figure> tags. This is useful for writing figures from SVG code. :param figure_data: (:obj:`str`) The data to insert in the figure. :param caption: (:obj:`str`) The caption for the figure, if any. :param figure_id: (:obj:`str`) The unique identifier to use for the HTML element. """ # Generate a figure ID from the caption if it isn't given if figure_id is None and caption is None: figure_tag = '<figure>' elif figure_id is not None: figure_tag = '<figure id="%s">' % figure_id elif caption is not None: figure_tag = '<figure id="%s">' % slug(caption) self.write('%s%s%s' % (figure_tag, os.linesep, figure_data)) if caption is not None: self.write(' <figcaption>%s</figcaption>%s' % (caption, os.linesep)) self.write('</figure>%s' % os.linesep)
class CSVWriter(object): """Utility for properly handling writing CSV with Unicode characters across Python 2 + 3. By default, this outputs both to the output file and stdout to allow for data to be directly viewed when running scripts from the command line. Can be used as a context manager, which will ensure the header is only written once and automatically close the output file when closed: with ccdc.utilities.CSVWriter(file_name='output.tsv', header=['Column A', 'Column B'], delimiter='\t', stdout=True) as tsv_file: tsv_file.write_row(['A1', 'B1']) """ def __init__(self, file_name=None, header=None, delimiter=',', stdout=True): """Set up the writer. :param file_name: (:obj:`str`) The name of the file to write to, if any. :param header: (:obj:`list` of :obj:`str`) A list of column headers to write to the file. :param delimiter: (:obj:`str`) The delimiter to use for columns in the output file. :param stdout: (:obj:`bool`) Whether to also output the CSV to stdout. """ self.header = header """:obj:`list` of :obj:`str`: The header to use for the CSV file.""" self._header_written = False """:obj:`bool`: Whether the header has been written to the file, to avoid writing multiple header lines by accident.""" self.delimiter = delimiter """:obj:`str`: The delimiter to use for the CSV file (default: ,).""" self.to_stdout = stdout """:obj:`bool`: Whether to output the CSV to stdout as well.""" self.file_name = os.path.abspath(file_name) if file_name else None """:obj:`str`: The absolute path of the file to write to.""" self.file_object = output_file(self.file_name) if file_name else None """:obj:`file`: The file object for the CSV file, opened with appropriate modes for writing Unicode regardless of Python version.""" self.file_writer = csv.writer(self.file_object, delimiter=self.delimiter) if file_name \ else None """:obj:`csv.writer`: A CSV writer object writing to file_name.""" self.stdout = None """:obj:`csv.writer`: A CSV writer object writing to stdout.""" if self.to_stdout: try: sys.stdout.reconfigure(encoding='utf-8', errors='replace') except AttributeError: print('Failed to reconfigure stdout to Unicode - some characters may not output correctly!') self.stdout = csv.writer(sys.stdout, delimiter=self.delimiter) # Write the file header if we aren't appending to an existing file. if self.header is not None: self.write_header(self.header) def __enter__(self): """Support function so CSVWriter can be used as a context manager.""" return self def __exit__(self, type, value, traceback): """Close the output file when exiting the context handler.""" self.close() def close(self): """Close the output file.""" if self.file_object is not None: self.file_object.close() @staticmethod def row_to_utf8(row): """Wrapper to encode any unicode strings in a list as UTF-8 for Python 2+3 compatibility. This function takes a list of strings to allow easy handling of rows of CSV data. :param row: (:obj:`list` of :obj:`str`) The row to process :returns: (:obj:`list` of :obj:`str`) The row with each string encoded as UTF-8 as necessary """ return [to_utf8(item) for item in row] def write_header(self, header): """Write the given list of rows to file. :param header: (:obj:`list` of :obj:`str`) The header to write. """ if not self._header_written: self.header = header self.write_row(self.header) self._header_written = True def write_rows(self, data): """Write the given list of rows to file. :param data: (:obj:`list` of :obj:`list`) A list of the rows to write to the CSV file. """ for row in data: self.write_row(row) def write_row(self, row): """Write the given row to file. :param row: (:obj:`list`) The row of data to write to the CSV file. """ row_encoded = self.row_to_utf8(row) if self.file_writer is not None: self.file_writer.writerow(row_encoded) if self.to_stdout: self.stdout.writerow(row)
[docs]class ApplicationInterface(object): """Utility class to make interfacing the CSD Python API with external applications easier."""
[docs] class CmdParser(argparse.ArgumentParser): """Use a thin wrapper class to ArgumentParser. So we can, if needed, redefine ArgumentParser's methods. """ pass
def __init__(self, description='', parse_commandline=True, show_progress=True): """Set up the application interface. :param description: (:obj:`str`) The description output when running the script from the command line with the --help parameter. :param parse_commandline: (:obj:`bool`) Whether to immediately parse the command line arguments when instantiating the ApplicationInterface. If you want to add custom command line arguments to the script, set this to `False`, add your custom parameters using `ApplicationInterface.commandline_parser.add_argument()` and then call `ApplicationInterface.parse_commandline()` to parse the command line. :param show_progress: (:obj:`bool`) Whether to not to update the script progress in the output progress file. Default True. """ self.show_progress = show_progress self.commandline_parser = self.CmdParser(description=description, formatter_class=argparse.ArgumentDefaultsHelpFormatter) """:obj:`argparse.ArgumentParser`: An argparse ArgumentParser to handle command line arguments passed to the script.""" self.interface_file = None """:obj:`str`: The absolute path of a JSON file specifying options.""" self.options = {} """:obj:`dict`: A dictionary of command-line arguments passed in. Filled by calling `parse_commandline()`.""" self.script_name = None """:obj:`str`: The file name of the script that has imported the ApplicationInterface. In general, this should be the script that's been run by the external application.""" # inspect the stack to find the file name of the script # that's imported the ApplicationInterface if __name__ != '__main__': for frame in inspect.stack()[1:]: if frame.filename[0] != '<': self.script_name = os.path.abspath(frame.filename) break else: self.script_name = os.path.abspath(__file__) self.identifier = None """:obj:`str`: The identifier of the structure currently selected in the application. This should be the structures you want the script to run on (if any).""" self.output_directory_path = os.path.abspath('.') """:obj:`str`: The absolute path of the output directory for the script.""" self.working_directory_path = os.path.abspath('.') """:obj:`str`: The absolute path of the working directory.""" self.output_base = None """:obj:`str`: A prefix for output files, as an absolute path but without extension. When running via Mercury and Hermes, this is passed in via the interface file.""" self.database_path = 'csd' """:obj:`str`: The file name of the database to use.""" self.program_path = None """:obj:`str`: The full path to the executable of the application calling the script.""" self.conquest_path = None """:obj:`str`: The full path to a CCDC ConQuest executable.""" self._entry_reader = None """:obj:`ccdc.io.EntryReader`: A CCDC database reader.""" self.log_file_name = None """:obj:`str`: The full path to a log file for use by scripts.""" self.output_html_file = None """:obj:`str`: The full path to a HTML output file for use by scripts.""" self.output_csv_file = None """:obj:`str`: The full path to a CSV output file for use by scripts.""" self.output_tsv_file = None """:obj:`str`: The full path to a TSV output file for use by scripts.""" self.output_c2m_file = None """:obj:`str`: The full path to a Conquest2Mercury file for use by scripts.""" self.output_gcd_file = None """:obj:`str`: The full path to a GCD output file for use by scripts.""" self.output_morphology_file = None """:obj:`str`: The full path to a morphology CIF output file for use by scripts.""" self.output_sdf_file = None """:obj:`str`: The full path to an SDF file for use by scripts.""" self.output_progress_file = None """:obj:`str`: The full path to a file for writing interactive progress information to.""" self.output_gold_conf_file = None """:obj:`str`: The full path to a .conf file for GOLD docking settings.""" self.output_mol2_file = None """:obj:`str`: The full path to a MOL2 output file for use by scripts.""" self.output_pdb_file = None """:obj:`str`: The full path to a PDB output file for use by scripts.""" self.output_cif_file = None """:obj:`str`: The full path to a CIF/mmCIF output file for use by scripts.""" self.input_mol2_file = None """:obj:`str`: The full path to a MOL2 file of the current structure for use by scripts. This MOL2 file contains the current state of the crystal in the applicaiton visualiser. This is derived from the output_base property for compatibility with Hermes and Mercury.""" self.input_pdb_file = None """:obj:`str`: The full path to a PDB file of the current protein for use by scripts. This is derived from the output_base property for compatibility with Hermes.""" self.input_cif_file = None """:obj:`str`: The full path to a CIF file of the current protein for use by scripts. This is derived from the output_base property for compatibility with Hermes.""" # add default arguments passed in by Hermes and Mercury to the parser self.commandline_parser.add_argument( 'identifier', type=str, nargs='?', help='Identifier of the currently selected in the application.') self.commandline_parser.add_argument( 'output_html_file', type=str, nargs='?', default=os.path.abspath('output.html'), help='Path to an output HTML file.') self.commandline_parser.add_argument( 'interface_file', type=str, nargs='?', help='Input JSON file as generated by e.g. Mercury or Hermes.') if parse_commandline: self.parse_commandline() if self.show_progress and self.output_progress_file is not None: self.update_progress('%s started.' % self.script_name) @property def current_entry(self): """:obj:`ccdc.entry.Entry`: An `Entry` object for the currently selected structure.""" if self._entry_reader is None: self._entry_reader = ccdc.io.EntryReader(self.database_path) if self.database_path == 'csd': self.database_path = str(self._entry_reader.file_name) try: # First, try to find the currently-selected entry in the main database return self._entry_reader.entry(self.identifier) except: # Otherwise, see if it's in either of the input structure files for structure_file in [self.input_mol2_file, self.input_cif_file, self.input_pdb_file]: if os.path.isfile(structure_file): reader = ccdc.io.EntryReader(structure_file) try: return reader.entry(self.identifier) except: pass # If we haven't found the named entry in the CSD or any of the input files, # return None instead. return None @property def selected_atoms(self): """:obj:`list` of :obj:`ccdc.molecule.Atom`: The currently selected atoms in the structure.""" return [atom for atom in self.current_entry.molecule.atoms if atom.index in self.options['selected_atoms']] @property def selected_indices(self): """:obj:`list` of :obj:`int`: The indices of the selected atoms in the structure.""" return self.options['selected_atoms'] @property def distances(self): """:obj:`list` of :obj:`int`: The pairs of atom indices defining requested distances.""" return [self.options['distances'][i:i+2] for i in range(0, len(self.options['distances']), 2)] @property def angles(self): """:obj:`list` of :obj:`int`: The triplets of atom indices defining requested angles.""" return [self.options['angles'][i:i+3] for i in range(0, len(self.options['angles']), 3)] @property def torsion_angles(self): """:obj:`list` of :obj:`int`: The 4-tuples of atom indices defining requested torsion angles.""" return [self.options['torsions'][i:i+4] for i in range(0, len(self.options['torsions']), 4)] @property def ccdc_logo(self): """:obj:`str`: The filename of a CCDC logo image in .png format.""" return get_ccdc_logo(self.output_directory_path)
[docs] def get_diagnostic_info(self, verbose=False): """Retrieve some diagnostic information about the API, Python version, etc. :param verbose: (:obj:`bool`) Whether to include all environment variables or just the default set :returns: (:obj:`list`) A list containing various diagnostic information. """ info = [ ['Python version', str(sys.version_info)], ['CSD Python API version', str(ccdc.__version__)], ['CSD Python API location', os.path.abspath(os.path.dirname(ccdc.__file__))], ['CSD data location', str(ccdc.io.EntryReader('CSD').file_name)], ['Current working directory', os.getcwd()], ['Script location', self.script_name], ['Script output path', self.output_directory_path], ['Base output file name', self.output_base], ['Script run by', self.program_path], ['Current database file', self.database_path], ['Current identifier', self.identifier], ['ApplicationInterface options', str(self.options)], ] # add environment variables to the list as needed if verbose: info += [[name, value] for name, value in os.environ.items()] else: info += [['PYTHONHOME', os.environ.get('PYTHONHOME', 'Not set')], ['PYTHONPATH', os.environ.get('PYTHONPATH', 'Not set')], ['LD_LIBRARY_PATH', os.environ.get('LD_LIBRARY_PATH', 'Not set')], ['DYLD_LIBRARY_PATH', os.environ.get('DYLD_LIBRARY_PATH', 'Not set')]] return info
[docs] def html_report(self, title='Report'): """Return a HTMLReport object writing to the current output html file. :param title: (:obj:`str`) The title for the HTML report. :returns: (:obj:`ccdc.utilities.HTMLReport`) The HTMLReport object.. """ return HTMLReport(file_name=self.output_html_file, report_title=title)
[docs] def write_report(self, title='Report', content=None, file_name=None): """Shorthand for writing a complete HTML report in one swoop. :param title: (:obj:`str`) The HTML content to write to the file. :param content: (:obj:`str`) The HTML content to write to the file. :param file_name: (:obj:`str`) The name of the file to write the report to. """ if file_name is None: file_name = self.output_html_file with HTMLReport(file_name=file_name, report_title=title) as report: report.write(content)
[docs] def show_script_error(self, message): """Write a user-friendly error message to the output HTML file. :param message: (:obj:`str`) The text of the error message to display. """ content = ['<p id="error_message">%s</p>' % message, html_table(data=self.get_diagnostic_info(), table_id='diagnostics')] self.write_report(title='Script Error', content=content) print('Script Error: %s' % message)
[docs] def exit_with_error(self, message, exit_code=1): """Write a user-friendly error message to the output HTML file and exit the script. :param message: (:obj:`str`) The text of the error message to display. :param exit_code: (:obj:`int`) The exit code to return from the script. """ self.show_script_error(message) exit(exit_code)
[docs] def update_progress(self, message, progress=None): """Write an update to the .progress file. This will display the status message in the Python API script dialog in Hermes and Mercury. :param message: (:obj:`str`) The progress message to display in the dialog. :param progress: (:obj:`float`) The fractional value of progress. """ if progress is not None: percentage = '[%.1f%%] ' % (progress * 100) else: percentage = '' if not os.path.exists(os.path.dirname(self.output_progress_file)): os.makedirs(os.path.dirname(self.output_progress_file)) with open(self.output_progress_file, 'w') as progress_file: print('%s%s' % (percentage, message), file=progress_file)
[docs] def open_output_folder(self): """Open the output folder in a file browser.""" import webbrowser webbrowser.open(self.output_directory_path)
[docs] def parse_commandline(self): """Parse any command-line parameters passed into the script.""" # Parse only the known args and get a list of the unknown arguments args, unknown = self.commandline_parser.parse_known_args() add_custom_arg = ''.join(['Please set up the ApplicationInterface using the ' + 'parse_commandline=False option and use the ' + 'ApplicationInterface.commandline_parser.add_argument() ' + 'function to define this as a custom argument for the script.']) # Taken from https://stackoverflow.com/questions/37367331/: Try to handle unknown # commandline arguments to keep backwards compatibility with scripts that have custom # commandline parsers already. This *will* still break for positionals or switches using # action='store_true' or action='store_false'. for arg in unknown: if arg.startswith(('-', '--')): print('[WARNING] Found unknown command line argument {arg}! '.format(arg=arg) + add_custom_arg) self.commandline_parser.add_argument(arg) try: # Re-parse the command line after we've extended the parser with unknown arguments args = self.commandline_parser.parse_args() except argparse.ArgumentError as exc: # If there are still unknown arguments we can't handle automatically, fail out. raise RuntimeError('Failed to handle unknown argument: %s! %s\nFull command: %s' % (exc, add_custom_arg, ' '.join(sys.argv))) # Populate the options dictionary with the any command-line arguments passed in. # If there is also an interface file, its contents will override any of these if present. self.options.update(vars(args)) # output_directory_path isn't actually passed in by Hermes or Mercury's script runner, # nor is it contained in the interface file so we need to derive it. # The old classes derived this from the output_base option but that isn't passed in by # the default command line arguments either so we'll derive it from the output_html_file. self.options['output_directory_path'] = os.path.dirname( os.path.abspath(self.options['output_html_file'])) # If we're running via the command line, output_base isn't passed in so we # need to set it explicitly. If there is an interface file, output_base is defined in this # so we'll be overwriting it when we read the interface file. # If no identifier has been passed, we'll just default to "output" as a prefix. self.options['output_base'] = os.path.abspath( os.path.join(self.options['output_directory_path'], self.options['identifier'] or 'output')) # Running via an application with options set via JSON file # The content of this is defined by main/guilib/guilib_qt/ScriptRunnerRunInfo.cpp if 'interface_file' in self.options and self.options['interface_file'] is not None: with open(self.options['interface_file'], 'r') as json_reader: try: # Hermes and Mercury currently still generate interface files that set the # program_executable_path property, which would raise a DeprecationWarning # whenever running a script with a standard .m2a/.h2a file. # As such, we want to ignore any DeprecationWarnings raised while reading an # interface file. with warnings.catch_warnings(): warnings.simplefilter('ignore', category=DeprecationWarning) self.options.update(json.loads(json_reader.read())['options']) except json.JSONDecodeError: raise RuntimeError('Failed to parse interface file %s!' % args.interface_file) # Parse the options dict into attributes on the ApplicationInterface class. # The old classes manually set these for the supported options, but since we want to be # able to also accept arbitrary command line arguments we'll just parse everything. for attr, value in self.options.items(): try: setattr(self, attr, value) except AttributeError: # Some of the class properties are actually @property functions # derived on the fly from the current options. We don't want to overwrite these. pass # Finally, set the various class properties used by our existing scripts. # In case self.output_base is not an absolute path, a lot of the below starts tripping up. if not os.path.isabs(self.output_base): self.output_base = os.path.abspath(self.output_base) # The output HTML file can be passed in directly by Hermes or Mercury. If this is the case, # the passed-in file name should always take precedence over the generated file name. self.output_html_file = os.path.abspath(self.options['output_html_file']) or self.output_file_name('.html') self.log_file_name = self.output_file_name('.log') self.output_csv_file = self.output_file_name('.csv') self.output_tsv_file = self.output_file_name('.tsv') self.output_progress_file = self.output_file_name('.progress') self.input_mol2_file = self.output_file_name('_input.mol2') # Mercury specific files self.output_c2m_file = self.output_file_name('.c2m') self.output_gcd_file = self.output_file_name('.gcd') self.output_morphology_file = self.output_file_name('_morphology.cif') # Hermes specific files self.input_pdb_file = self.output_file_name('_input.pdb') self.input_cif_file = self.output_file_name('_input.cif') self.output_mol2_file = self.output_file_name('.mol2') self.output_pdb_file = self.output_file_name('.pdb') self.output_cif_file = self.output_file_name('.cif') self.output_sdf_file = self.output_file_name('.sdf') self.output_gold_conf_file = self.output_file_name('.conf') if self.show_progress: self.update_progress('Parsed parameters for %s.' % self.script_name)
[docs] def output_file_name(self, suffix): """Return a file name appended with the given suffix. :param suffix: (:obj:`str`) The suffix to append to the base output file name. :returns: (:obj:`str`) The full output file name with the given suffix. """ return '%s%s' % (self.output_base, suffix)
# # The remaining segment of the ApplicationInterface class is backwards compatibility # handling for scripts making use of features, functions and properties that have been # moved or renamed versus HermesInterface and MercuryInterface. # def _deprecation_warning(self, used, replacement): """Raise a DeprecationWarning for backwards compatibility features in ApplicationInterface. :param used: (:obj:`str`) The function or property that was used :param replacement: (:obj:`str`) The function or property that should be used instead """ warnings.showwarning('%s is for backwards compatibility only! Please switch to using %s instead.' % (used, replacement), DeprecationWarning, filename=self.script_name, lineno='', file=sys.stdout) @property def file_path(self): """:obj:`str`: The file name of the database to use.""" self._deprecation_warning(used='ApplicationInterface.file_path', replacement='ApplicationInterface.database_path') return self.database_path @file_path.setter def file_path(self, path): """:obj:`str`: The file name of the database to use.""" self._deprecation_warning(used='ApplicationInterface.file_path', replacement='ApplicationInterface.database_path') self.database_path = path @property def program_executable_path(self): """:obj:`str`: The full path to the executable of the application calling the script.""" self._deprecation_warning(used='ApplicationInterface.program_executable_path', replacement='ApplicationInterface.program_path') return self.program_path @program_executable_path.setter def program_executable_path(self, value): """:obj:`str`: The full path to the executable of the application calling the script.""" self._deprecation_warning(used='ApplicationInterface.program_executable_path', replacement='ApplicationInterface.program_path') self.program_path = value @property def input_m2a_file(self): """:obj:`str`: The full path to the JSON file of options passed in by the application.""" self._deprecation_warning(used='ApplicationInterface.input_m2a_file', replacement='ApplicationInterface.interface_file') return self.interface_file @property def input_h2a_file(self): """:obj:`str`: The full path to the JSON file of options passed in by the application.""" self._deprecation_warning(used='ApplicationInterface.input_h2a_file', replacement='ApplicationInterface.interface_file') return self.interface_file
class Resources(): '''Utility class to provide location of resources''' def __init__(self): self._resources_dir = Path(__file__).resolve().parent / 'resources' maindir = os.environ.get('MAINDIR') if maindir: # dev/build env where a copy of the source is available self._mercury_distrib_dir = Path(maindir) / 'mercury' / 'distrib' self._superstar_dist_dir = Path(maindir) / 'superstar' / 'superstar_dist' else: # Env where relevant package is installed self._mercury_distrib_dir = ( Path(__file__).resolve().parent.parent.parent.parent / 'mercury' / 'distrib') self._superstar_dist_dir = ( Path(__file__).resolve().parent.parent.parent.parent / 'superstar') def _get_dir(self, dirs, name): for possible_dir in dirs: if possible_dir.is_dir(): return possible_dir raise RuntimeError(f'Could not find {name}.') def get_ccdc_solvents_dir(self): '''Used in solvate analysis''' ccdc_solvents_dirs = [ self._resources_dir / 'mercury' / 'ccdc_solvents', self._mercury_distrib_dir / 'molecular_libraries' / 'ccdc_solvents', Path(UtilitiesLib.CSDLocator().get_app_resources_path( 'mercury', 'molecular_libraries/ccdc_solvents')) ] return self._get_dir(ccdc_solvents_dirs, 'CCDC solvents directory') def get_ccdc_coformers_dir(self): '''Used in a hydrogen bond propensity report example script''' ccdc_coformers_dirs = [ self._resources_dir / 'mercury' / 'ccdc_coformers', self._mercury_distrib_dir / 'molecular_libraries' / 'ccdc_coformers', Path(UtilitiesLib.CSDLocator().get_app_resources_path( 'mercury', 'molecular_libraries/ccdc_coformers')) ] return self._get_dir(ccdc_coformers_dirs, 'CCDC coformers library') def get_functional_groups_dir(self): '''Used in hydrogen bond propensity calculations''' functional_groups_dirs = [ self._resources_dir / 'mercury' / 'functional_groups', self._mercury_distrib_dir / 'functional_groups', Path(UtilitiesLib.CSDLocator().get_app_resources_path( 'mercury', 'functional_groups')) ] return self._get_dir(functional_groups_dirs, 'functional groups directory') def get_aromatic_ring_model_dir(self): '''Used in aromatics analysis''' aromatic_ring_model_dirs = [ self._resources_dir / 'mercury' / 'aromatic_ring_model', self._mercury_distrib_dir / 'aromatic_ring_model', Path(UtilitiesLib.CSDLocator().get_app_resources_path( 'mercury', 'aromatic_ring_model')) ] return self._get_dir(aromatic_ring_model_dirs, 'aromatic ring model directory') def get_r_path(self): '''Used in hydrogen bond propensity calculations''' if sys.platform == 'win32': return self._resources_dir / 'rstatistics' / 'bin' / 'R.exe' else: return self._resources_dir / 'rstatistics' / 'bin' / 'R' def get_superstar_data_dir(self): '''Used in interaction maps analysis''' superstar_data_dirs = [ self._resources_dir / 'superstar', self._superstar_dist_dir ] return self._get_dir(superstar_data_dirs, 'superstar data root directory')