Source code for bornprofiler.analysis

# BornProfiler -- analysis classes, typically used in scripts
# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding: utf-8 -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
#
# BornProfiler --- A package to calculate electrostatic free energies with APBS
# Written by Kaihsu Tai, Lennard van der Feltz, and Oliver Beckstein
# Released under the GNU Public Licence, version 3
#
# Copyright (c) 2008 Kaihsu Tai
# Copyright (c) 2011-2013 Oliver Beckstein
# Copyright (c) 2013-2015 Oliver Beckstein, Lennard van der Feltz

"""
Analysis of calculations --- :mod:`bornprofiler.analysis`
=========================================================

Functions and classes to process the output from the APBS
calculations.

"""

from __future__ import with_statement

import os.path
import glob
import math
import numpy
import bornprofiler
from bornprofiler.core import BPbase
import bornprofiler.bpio as bpio
import logging
logger = logging.getLogger('bornprofiler.analysis')

[docs]def get_files(runfile, basedir=os.path.curdir): """Read *runfile* and return dict with input files and names.""" # convenience function --- one could also just read the cfg file from bornprofiler.bpio import RunParameters try: p = RunParameters(runfile,False) samplepoints = p.get_bornprofile_kwargs('points') points = bpio.readPoints(samplepoints) numpoints = points.shape[0] oompoints = int(math.ceil(math.log10(numpoints))) if oompoints < 4: oompoints = 4 winnum = '[0-9]'*oompoints fileglob = os.path.join(basedir, p.get_bornprofile_kwargs('name'), 'w{winnum}'.format(winnum=winnum), 'job*.out') jobName = p.get_bornprofile_kwargs('name') ionName = p.get_bornprofile_kwargs('ion') pqr = p.get_bornprofile_kwargs('pqr') except: logger.fatal("Cannot obtain information about the sample points and " "directory from the run parameter file %r.", runfile) raise return {'samplepoints': samplepoints, 'datafiles': glob.glob(fileglob), # gets all individual windows! 'jobName': jobName, 'ionName': ionName, 'pqr': pqr, }
[docs]class Analyzer(BPbase): """Base class for analysis 1. read points 2. read energy for each point 3. write data file(s) 4. optional: plot Developer note: :attr:`exporters` contains the logic for exporting to various other formats besides plain column/text such as PDB. For additional exporters, add the entries in the local __init__ *after* calling ``super(name, self).__init__(*args,**kwargs)``. The Born energy W at each point (x,y,z) is stored in :attr:`data`, a (4,N) numpy array:: X,Y,Z,W = data """ def __init__(self, *args, **kwargs): self.pointsName = args[0] self.datafiles = args[1:] self.jobName = kwargs.pop('jobName', 'bornprofile') create = kwargs.pop("create", True) if create: self.readPoints() if self.points.shape[0] != len(self.datafiles): msg = "Number of sampled points (%d) does not match the number "\ "of data files (%d). Some windows are MISSING and will be skipped." % \ (self.points.shape[0], len(self.datafiles)) logger.warn(msg) missing = self.find_missing_windows() for num,x,y,z,path in missing: logger.warn("missing: %4d (%8.3f,%8.3f,%8.3f) taskid=%d", num, x, y, z, self.get_taskid(num)) self.accumulate() else: self.read() # other classes can add their exporters in __init__ self.exporters = {'pdb': {'ext': '.pdb', 'exporter': self._export_pdb}, } super(Analyzer, self).__init__(**kwargs)
[docs] def read(self, filename=None): """Read datafile *filename* (format: x,y,z,W or z,W). The data are stored in :attr:`data`, a (4,N) array. If only z coordinates are provided then the x and y columns(data[0] and data[1]) are set to 0. """ if filename is None: filename = self.datafile("welec") data = numpy.loadtxt(filename).T if data.shape[0] == 2: logger.warn("Reading old-style data file (z,W) %(filename)r.", vars()) self.data = numpy.zeros((4,data.shape[-1]), dtype=data.dtype) self.data[[-2,-1]] = data else: self.data = data logger.info("Read Born PMF from %(filename)r.", vars())
def write(self): outName = self.datafile("welec") with open(outName, "w") as outFile: outFile.write("# x/A y/A z/A W/(kJ/mol)\n") for x,y,z,E in self.data.T: outFile.write("%(x)8.3f %(y)8.3f %(z)8.3f %(E)8.3e\n" % vars()) logger.info("Wrote Born PMF to %(outName)r.", vars())
[docs] def accumulate(self): """Read the energy from each datafile and store with the coordinates. Populates :attr:`AnalyzeElec.data`, a (4,N) array where N is the number of windows. """ data = [] for num, point in enumerate(self.points): outName = self.outfilename(num) lines = "" # find file name in list of input files outPath = None for path in self.datafiles: if path.endswith(outName): outPath = path break if outPath is None: logger.warn("Sample point %d %r: Could not find file %s." % (num, point, outName)) continue with open(outPath) as outFile: for line in outFile: # maybe use regex o make this more robust... if (line[0:18] == " Local net energy"): data.append(numpy.concatenate((point, [float(line.split()[6])]))) break print "[%5.1f%%] Read point %6d/%6d %r \r" % \ (100. * float(num+1)/len(self.points), num, len(self.points)-1, outPath), print self.data = numpy.array(data).T
[docs] def find_missing_windows(self): """Return a list of job numbers that have no data corresponding to a point.""" missing = [] for num, point in enumerate(self.points): outName = self.outfilename(num) # find file name in list of input files outPath = None for path in self.datafiles: # could make self.datafiles a hash for faster search if path.endswith(outName): outPath = path break # good, found it if outPath is None: missing.append((num, point[0], point[1], point[2], outName)) return numpy.rec.fromrecords(missing, names="num,x,y,z,path")
[docs] def export(self, filename=None, format="dx", **kwargs): """Export data to different file format. The format is deduced from the filename suffix or *format*. *kwargs* are set according to the exporter. dx histogram on a grid; must provide kwarg *delta* for the grid spacing. pdb write PDB file with an ion for each sample point and the energy as the B-factor. The kwarg *ion* can be used to set the name/resName in the file (ION is the default). """ if filename is None: filename = self.datafile("welec", ext="."+format) else: format = os.path.splitext(filename)[1][1:] if not format in self.exporters: raise ValueError("%r format is unsupported, only %r work" % (format, self.exporters.keys())) return self.exporters[format]['exporter'](filename, **kwargs)
def _export_pdb(self, filename, **kwargs): """Write datapoints to pdb file with energy in the B-factor field. :Keywords: *ion* name of the ion (name/resSeq in pdb) """ # http://www.wwpdb.org/documentation/format32/sect9.html fmt = {'ATOM': "ATOM %(serial)5d %(name)-4s%(altLoc)1s%(resName)-3s %(chainID)1s%(resSeq)4d%(iCode)1s %(x)8.3f%(y)8.3f%(z)8.3f%(occupancy)6.2f%(tempFactor)6.2f\n", 'REMARK': "REMARK %s\n", 'TITLE': "TITLE %s\n", 'CRYST1': "CRYST1%9.3f%9.3f%9.3f%7.2f%7.2f%7.2f %-11s%4d\n", } name = resName = kwargs.pop('ion', 'ION') or 'ION' # None --> ION chainID = "Z" iCode = altLoc = " " occupancy = 1.0 with open(filename, 'w') as pdb: pdb.write(fmt['TITLE'] % "Poisson-Boltzmann electrostatic free energy") pdb.write(fmt['REMARK'] % "W(x,y,z) in kJ/mol") pdb.write(fmt['REMARK'] % "Written by apbs-bornprofile-analyze3D.py") for serial,(x,y,z,W) in enumerate(self.data.T): serial += 1 resSeq = serial tempFactor = W pdb.write(fmt['ATOM'] % vars()) logger.info("Wrote ion positions for %(name)s to pdb file %(filename)r", vars()) def __repr__(self): return "<%s jobName=%r points=%r, %d data files>" % \ (self.__class__.__name__, self.jobName, self.pointsName, len(self.datafiles))
[docs]class AnalyzeElec(Analyzer): "analyze APBS energy profiling results"
[docs] def plot(self, filename=None, plotter='matplotlib', **kwargs): """Plot Born profile. plot([filename[,plotter[,kwargs ...]]]) :Keywords: *filename* name of image file to save the plot in; with *plotter* = 'Gnuplot' only eps files are supported (I think...) *kwargs* other keyword arguments that are passed on to :func:`pylab.plot` """ import matplotlib matplotlib.use('AGG') import matplotlib.pyplot as plt if filename is None: plotName = self.datafile("welec",".pdf") else: plotName = filename kwargs.setdefault('color', 'black') kwargs.setdefault('linewidth', 2) z,W = self.data[-2], self.data[-1] # should work for (4,N) and (2,N) data plt.plot(z,W, **kwargs) plt.xlabel(r'$z$ in nm') plt.ylabel(r'$W$ in kJ$\cdot$mol$^{-1}$') plt.savefig(plotName) logger.info("Plotted graph W(z) %(plotName)r.", vars()) return plotName
def __repr__(self): return "<%s jobName=%r points=%r, %d data files>" % \ (self.__class__.__name__, self.jobName, self.pointsName, len(self.datafiles))
class MultiPlot1D(object): pass
[docs]class AnalyzeElec3D(Analyzer): """analyze APBS energy profiling results With create=True (default), reads position data from samplepoints file and energies from APBS output files. With create=False, reads positions and energies from a previous output file. """ def __init__(self, *args, **kwargs): super(AnalyzeElec3D, self).__init__(*args, **kwargs) self.exporters['dx']= {'ext': '.dx', 'exporter': self._export_dx}
[docs] def ranges(self, padding=0): """Returns the range of values in each dimension. :Returns: Array *r* of shape (2,3): r[0] contains the smallest and r[1] the largest values. """ def minmax(dim): return numpy.array([self.data[dim,:].min(), self.data[dim,:].max()]) ranges = numpy.array([minmax(dim) for dim in (0,1,2)]).T ranges[0] -= padding ranges[1] += padding return ranges
[docs] def histogramdd(self, delta, fillfac=5): """Histogram PMF on a regular grid. The spacing *delta* must be the same or larger than used in Hollow; to be on the safe side, just use the value of *grid_spacing* (e.g. 2.0 for 2 A). The histogram grid is chosen large enough to encompass all data points. If *delta* is bigger than *grid_spacing* or points are not on a regular grid the values are averaged over each bin. Points without data are filled with fillfac * max(histogram). :Returns: histogram, edges (same as :func:`numpy.histogramdd`) """ ranges = self.ranges(padding=delta/2.) bins = ((ranges[1] - ranges[0])/float(delta)).astype(int) h,e = numpy.histogramdd(self.data[:3].T, range=ranges.T, bins=bins, weights=self.data[3]) N,e = numpy.histogramdd(self.data[:3].T, range=ranges.T, bins=bins) h[N>0] /= N[N>0] # average PMF h[N==0] = fillfac*h.max() return h,e
[docs] def Grid(self, delta, **kwargs): """Package the PMF as a :class:`gridData.Grid` object. *delta* should be the original spacing of the points in angstroem. With a *resample_factor*, the data are interpolated on a new grid with *resample_factor* times as many bins as in the original histogram (See :meth:`gridData.Grid.resample_factor`). *interpolation_order* sets the interpolation order for the resampling procedure. .. Warning:: Interpolating can lead to artifacts in the 3D PMF. If in doubt, **do not resample**. """ from gridData import Grid resample_factor = kwargs.pop('resample_factor', None) kwargs.setdefault('interpolation_spline_order', 3) g = Grid(*self.histogramdd(delta), **kwargs) if not resample_factor: return g return g.resample_factor(resample_factor)
def _export_dx(self, filename, delta=2.0, **kwargs): """Write data to dx file *filename*. export(delta[,filename[,kwargs]]) Requires grid spacing delta of original data points. *filename* is automatically chosen if not supplied. Other *kwargs* are passed on to :meth:`AnalElec.Grid`. """ g = self.Grid(delta, **kwargs) g.export(filename, file_format="dx") logger.info("Wrote Born PMF to dx file %(filename)r with spacing %(delta)g A.", vars())