Source code for SimEx.Calculators.EMCOrientation

""":module EMCOrientation: Module that holds the EMCOrientation class.  """
##########################################################################
#                                                                        #
# Copyright (C) 2015-2018 Carsten Fortmann-Grote                         #
# Contact: Carsten Fortmann-Grote <carsten.grote@xfel.eu>                #
#                                                                        #
# This file is part of simex_platform.                                   #
# simex_platform is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by   #
# the Free Software Foundation, either version 3 of the License, or      #
# (at your option) any later version.                                    #
#                                                                        #
# simex_platform is distributed in the hope that it will be useful,      #
# but WITHOUT ANY WARRANTY; without even the implied warranty of         #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the          #
# GNU General Public License for more details.                           #
#                                                                        #
# You should have received a copy of the GNU General Public License      #
# along with this program.  If not, see <http://www.gnu.org/licenses/>.  #
#                                                                        #
##########################################################################

import h5py
import numpy
import os
import subprocess,shlex
import tempfile
import time

from SimEx.Calculators.EMCCaseGenerator import EMCCaseGenerator, _print_to_log
from SimEx.Calculators.AbstractPhotonAnalyzer import AbstractPhotonAnalyzer
from SimEx.Parameters.EMCOrientationParameters import EMCOrientationParameters
from SimEx.Utilities import IOUtilities
from SimEx.Utilities import ParallelUtilities
from SimEx.Utilities.EntityChecks import checkAndSetInstance

[docs]class EMCOrientation(AbstractPhotonAnalyzer): """ :class EMCOrientation: Representing photon data analysis for orientation of 2D diffraction patterns to a 3D diffraction volume. """ def __init__(self, parameters=None, input_path=None, output_path=None, tmp_files_path=None, run_files_path=None): """ :param parameters: Parameters for the EMC orientation calculator. :type parameters: EMCOrientationParameters instance :param input_path: Path to directory holding input data for EMC. :type input_path: str :param output_path: Path to file where output data will be stored. :type output_path: str :param tmp_files_path: Path to directory where temporary files will be stored. :type tmp_files_path: str :param run_files_path: Path to directory where run data will be stored, in particular the sparse photons file 'photons.dat' and 'detector.dat'. :type run_files_path: str :note: If 'run_files_path' is an existing directory that contains data from a previous EMC run, the current run will append to the existing data. A consistency check is performed. """ # Check parameters. if isinstance( parameters, dict ): parameters = EMCOrientationParameters( parameters_dictionary = parameters ) # Set default parameters is no parameters given. if parameters is None: parameters = checkAndSetInstance( EMCOrientationParameters, parameters, EMCOrientationParameters() ) else: parameters = checkAndSetInstance( EMCOrientationParameters, parameters, None ) # Initialize base class. super(EMCOrientation, self).__init__(parameters,input_path,output_path) self.__provided_data = ['data/data', 'data/angle', 'data/center', 'params/info', 'version',] self.__expected_data = ['/data/data', '/data/diffr', '/data/angle', '/params/geom/detectorDist', '/params/geom/pixelWidth', '/params/geom/pixelHeight', '/params/geom/mask', '/params/beam/photonEnergy', '/params/beam/photons', '/params/beam/focusArea', ] # Set run and tmp files paths. if _checkPaths( run_files_path, tmp_files_path ): self.run_files_path = run_files_path self.tmp_files_path = tmp_files_path
[docs] def expectedData(self): """ Query for the data expected by the Analyzer. """ return self.__expected_data
[docs] def providedData(self): """ Query for the data provided by the Analyzer. """ return self.__provided_data
@property def data(self): """ Query for the field data. """ return self.__data @property def run_files_path(self): return self.__run_instance_dir @run_files_path.setter def run_files_path(self, value): """ Set the path to runtime files. :param value: The path where runtime generated files shall be saved. :type value: str """ if isinstance( value, str ) or value is None: self.__run_instance_dir = value else: raise IOError( "Parameter 'run_files_path' must be a string or None." ) @property def tmp_files_path(self): return self.__tmp_out_dir @tmp_files_path.setter def tmp_files_path(self, value): """ Set the path to tmptime files. :param value: The path where tmptime generated files shall be saved. :type value: str """ if isinstance( value, str ) or value is None: self.__tmp_out_dir = value else: raise IOError( "Parameter 'tmp_files_path' must be a string or None." ) def _readH5(self): """ """ """ Private method for reading the hdf5 input and extracting the parameters and data relevant to initialize the object. """ pass # Nothing to be done since IO happens in backengine.
[docs] def saveH5(self): """ """ """ Private method to save the object to a file. :param output_path: The file where to save the object's data. :type output_path: string """ pass # No action required since output is written in backengine.
def _setupPaths(self): """ """ """ Private method do setup all needed directories for temp and persistant output. """ # If tmp dir is set to None, create a temporary directory and store path on object and return value. if self.tmp_files_path is None: tmp_out_dir = tempfile.mkdtemp(prefix='emc_out_') self.tmp_files_path = tmp_out_dir # Else, check if path exists and store on return value. else: if not os.path.isdir( self.tmp_files_path ): os.mkdir( self.tmp_files_path ) tmp_out_dir = self.tmp_files_path # Same for run dir. if self.run_files_path is None: run_instance_dir = tempfile.mkdtemp(prefix='emc_run_') self.__run_instance_dir = run_instance_dir else: if not os.path.isdir( self.run_files_path ): os.mkdir( self.run_files_path ) # If run dir already existed, this would have been caught earlier. run_instance_dir = self.run_files_path self._sparsePhotonFile = os.path.join(tmp_out_dir, "photons.dat") self._detectorFile = os.path.join(tmp_out_dir, "detector.dat") self._outputLog = os.path.join(run_instance_dir, "EMC_extended.log") self._avgPatternFile = os.path.join(tmp_out_dir, "avg_photon.h5") self._lockFile = os.path.join(tmp_out_dir, "write.lock") self._run_instance_dir = run_instance_dir self._tmp_out_dir = tmp_out_dir return run_instance_dir,tmp_out_dir
[docs] def computeNTasks(self): resources=ParallelUtilities.getParallelResourceInfo() ncores=resources['NCores'] nnodes=resources['NNodes'] if self.parameters.cpus_per_task=="MAX": np=nnodes else: np=max(int(ncores/int(self.parameters.cpus_per_task)),1) return np
[docs] def backengine(self): """ Starts EMC simulations in parallel in a subprocess """ # Set paths. self._setupPaths() fname = IOUtilities.getTmpFileName() self.dumpToFile(fname) # collect MPI arguments if self.parameters.forced_mpi_command=="": np=self.computeNTasks() #mpicommand=ParallelUtilities.prepareMPICommandArguments(np) mpicommand="" else: mpicommand=self.parameters.forced_mpi_command # collect program arguments command_sequence = ['python', __file__, fname, ] # put MPI and program arguments together args = shlex.split(mpicommand) + command_sequence print(' '.join(args)) if 'SIMEX_VERBOSE' in os.environ: if 'MPI' in os.environ['SIMEX_VERBOSE']: print(("EMCOrientation backengine mpicommand: "+mpicommand)) print(' '.join(args)) if 'PYTHON' in os.environ['SIMEX_VERBOSE']: import platform print("Running python version %s." % (platform.python_version())) # Run the backengine command. proc = subprocess.Popen(args) proc.wait() os.remove(fname) # Return the return code from the backengine. return proc.returncode
def _need_prepare_photon_files(self,thisProcess): ############################################################### # A lock file is created if subprocess is converting sparse photons # so that another subprocess does not clobber an ongoing conversion. # Make photons.dat and detector.dat if they don't exist. # Create time-tagged output subdirectory for intermediate states. ############################################################### while (os.path.isfile(self._lockFile)): # Sleep in 30 s intervals, then check if sparse photon lock has been released. sleep_duration = 30 msg = "Lock file in " + self._tmp_out_dir + ". " msg += "Photons.dat likely being written to tmpDir by another programm. " msg += "Sleeping this programm for %d s." % sleep_duration if thisProcess == 0: _print_to_log(msg, log_file=self._outputLog) time.sleep(sleep_duration) return not (os.path.isfile(self._sparsePhotonFile) and os.path.isfile(self._detectorFile)) def _join_photon_files(self,numProcesses): outf = open(self._sparsePhotonFile, "w") for n in range(0,numProcesses): fname=self._sparsePhotonFile + "_"+str(n) with open(fname) as infile: for line in infile: outf.write(line) os.system("rm %s " % fname) outf.close() outh5 = h5py.File(self._avgPatternFile, 'w') for n in range(0,numProcesses): fname=self._avgPatternFile + "_"+str(n) f = h5py.File(fname, 'r') if n == 0: mask = f["mask"].value avg = f["average"].value else: avg += f["average"].value os.system("rm %s " % fname) outh5.create_dataset("average", data=avg, compression="gzip", compression_opts=9) outh5.create_dataset("mask", data=mask, compression="gzip", compression_opts=9) outh5.close() def _prepare_photon_files(self,comm=None): #thisProcess = comm.rank thisProcess = 0 #numProcesses = comm.size numProcesses = 1 # Prepare for reading input. if os.path.isdir(self.input_path): photonFiles = [ os.path.join(self.input_path, pf) for pf in os.listdir( self.input_path ) ] photonFiles.sort() elif os.path.isfile(self.input_path): photonFiles = [self.input_path] else: raise IOError( " Input file %s not found." % self.input_path ) gen = EMCCaseGenerator(self._outputLog) gen.readGeomFromPhotonData(photonFiles[0],thisProcess) if thisProcess == 0: os.system("touch %s" % self._lockFile) gen.writeDetectorToFile(filename=self._detectorFile) gen.writeSparsePhotonFile(photonFiles, self._sparsePhotonFile+ "_"+str(thisProcess), self._avgPatternFile + "_"+str(thisProcess), thisProcess,numProcesses) #comm.Barrier() if thisProcess == 0: self._join_photon_files(numProcesses) #comm.Barrier() if thisProcess == 0: _print_to_log(msg="Sparse photons file created. Deleting lock file now", log_file=self._outputLog) # _print_to_log(msg="Detector parameters: %d %d %d"%(gen.qmax, len(gen.detector), len(gen.beamstop)), log_file=self._outputLog) os.system("rm %s " % self._lockFile) def _run(self): """ """ """ Private method to run the Expand-Maximize-Compress (EMC) algorithm. :return: 0 if EMC returns successfully, 1 if not. :note: Copied and adapted from the main routine in s2e_recon/EMC/runEMC.py """ #from mpi4py import MPI #rc.finalize = False # MPI info #comm = MPI.COMM_WORLD #thisProcess = comm.Get_rank() #if self._need_prepare_photon_files(thisProcess): if True: # if thisProcess == 0: if True: msg = "Photons.dat and detector.dat not found in " + self._tmp_out_dir + ". Will create them now..." _print_to_log(msg=msg, log_file=self._outputLog) print(msg) self._prepare_photon_files(comm=None) else: if thisProcess == 0: msg = "Photons.dat and detector.dat already exists in " + self._tmp_out_dir + "." _print_to_log(msg=msg, log_file=self._outputLog) print(msg) # the rest is non-parallel (yet) # if thisProcess != 0: # MPI.Finalize() # return 0 ############################################################### # Instantiate a reconstruction object ############################################################### # If parameters are given, map them to command line arguments. initial_number_of_quaternions = self.parameters.initial_number_of_quaternions max_number_of_quaternions = self.parameters.max_number_of_quaternions max_number_of_iterations = self.parameters.max_number_of_iterations min_error = self.parameters.min_error beamstop = self.parameters.beamstop detailed_output = self.parameters.detailed_output quaternion_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'CalculatorUtilities', 'quaternions') gen = EMCCaseGenerator(self._outputLog) gen.readGeomFromDetectorFile(self._detectorFile) _print_to_log(msg="Detector parameters: %d %d %d"%(gen.qmax, len(gen.detector),len(gen.beamstop)), log_file=self._outputLog) if not (os.path.isfile(os.path.join(self._run_instance_dir,"detector.dat"))): os.symlink(os.path.join(self._tmp_out_dir,"detector.dat"), os.path.join(self._run_instance_dir,"detector.dat")) if not (os.path.isfile(os.path.join(self._run_instance_dir,"photons.dat"))): os.symlink(os.path.join(self._tmp_out_dir,"photons.dat"), os.path.join(self._run_instance_dir,"photons.dat")) ############################################################### # Create dummy destination h5 for intermediate output from EMC ############################################################### cwd = os.path.abspath(os.curdir) os.chdir(self._run_instance_dir) #Output file is kept in tmpOutDir. outFile = self.output_path offset_iter = 0 if not (os.path.isfile(outFile)): f = h5py.File(outFile, "w") f.create_group("data") f.create_group("misc") f.create_group("info") f.create_group("params") f.create_group("history") gg = f["history"] gg.create_group("intensities") gg.create_group("error") gg.create_group("angle") gg.create_group("mutual_info") gg.create_group("quaternion") gg.create_group("time") c = numpy.array([gen.qmax, gen.qmax, gen.qmax]) f.create_dataset("data/center", data=c) f.create_dataset("misc/qmax", data=gen.qmax) f.create_dataset("misc/detector", data=gen.detector) f.create_dataset("misc/beamstop", data=gen.beamstop) f.create_dataset("version", data=h5py.version.hdf5_version) f.close() else: f = h5py.File(outFile, 'r') offset_iter = len(list(f["/history/intensities"].keys())) f.close() msg = "Output will be appended to the results of %d iterations before this."%offset_iter _print_to_log(msg=msg, log_file=self._outputLog) ############################################################### # Iterate EMC ############################################################### intensL = 2*gen.qmax + 1 iter_num = 1 currQuat = initial_number_of_quaternions try: while(currQuat <= max_number_of_quaternions): if os.path.isfile(os.path.join(self._run_instance_dir,"quaternion.dat")): os.remove(os.path.join(self._run_instance_dir,"quaternion.dat")) os.symlink(os.path.join(quaternion_dir ,"quaternion"+str(currQuat)+".dat"), os.path.join(self._run_instance_dir,"quaternion.dat")) diff = 1. while (iter_num <= max_number_of_iterations): if (iter_num > 1 and diff < min_error): _print_to_log(msg="Error %0.3e is smaller than threshold %0.3e. Going to next quaternion."%(diff, min_error), log_file=self._outputLog) break _print_to_log("Beginning iteration %d, with quaternion %d %s"%(iter_num+offset_iter, currQuat, "."*20), log_file=self._outputLog) # Here is the actual timed EMC iteration, which calls the EMC.c code. start_time = time.clock() #command_sequence = ['EMC.x', '1'] command_sequence = ['EMC', '1'] process_handle = subprocess.Popen(command_sequence) process_handle.wait() time_taken = time.clock() - start_time _print_to_log("Took %lf s"%(time_taken), log_file=self._outputLog) # Read intermediate output of EMC.c and stuff them into a h5 file # Delete these EMC.c-generated intermediate files afterwards, # except finish_intensity.dat --> start_intensity.dat for next iteration. gen.intensities = (numpy.fromfile("finish_intensity.dat", sep=" ")).reshape(intensL, intensL, intensL) data_info = numpy.fromfile("mutual_info.dat", sep=" ") most_likely_orientations = numpy.fromfile("most_likely_orientations.dat", sep=" ") if(os.path.isfile("start_intensity.dat")): intens1 = numpy.fromfile("start_intensity.dat", sep=" ") diff = numpy.sqrt(numpy.mean(numpy.abs(gen.intensities.flatten()-intens1)**2)) else: diff = 2.*min_error f = h5py.File(outFile, "a") gg = f["history/intensities"] if detailed_output: gg.create_dataset("%04d"%(iter_num + offset_iter), data=gen.intensities, compression="gzip", compression_opts=9) try: f.create_dataset("data/data", data=gen.intensities, compression="gzip", compression_opts=9) except: temp = f["data/data"] temp[...] = gen.intensities gg = f["history/error"] gg.create_dataset("%04d"%(iter_num + offset_iter), data=diff) _print_to_log("rms change in intensities %e"%(diff), log_file=self._outputLog) gg = f["history/angle"] gg.create_dataset("%04d"%(iter_num + offset_iter), data=most_likely_orientations, compression="gzip", compression_opts=9) try: f.create_dataset("data/angle", data=most_likely_orientations, compression="gzip", compression_opts=9) except: temp = f["data/angle"] temp[...] = most_likely_orientations gg = f["history/mutual_info"] gg.create_dataset("%04d"%(iter_num + offset_iter), data=data_info) gg = f["history/quaternion"] gg.create_dataset("%04d"%(iter_num + offset_iter), data=currQuat) gg = f["history/time"] gg.create_dataset("%04d"%(iter_num + offset_iter), data=time_taken) f.close() f = open(self._outputLog, "a") f.write("%e\t %lf\n"%(diff, time_taken)) f.close() os.system("cp finish_intensity.dat start_intensity.dat") _print_to_log("Iteration number %d completed"%(iter_num), log_file=self._outputLog) iter_num += 1 currQuat += 1 _print_to_log("All EMC iterations completed", log_file=self._outputLog) os.chdir(cwd) #MPI.Finalize() return 0 except: os.chdir(cwd) #MPI.Finalize() return 1
def _checkPaths(run_files_path, tmp_files_path): """ """ """ Private (hidden) utility to check validity of paths given to constructor. """ if not all([ (isinstance( path, str ) or path is None) for path in [run_files_path, tmp_files_path] ]): raise IOError( "Paths must be strings.") if run_files_path is not None: if os.path.isdir( run_files_path ): raise IOError( "Run files path already exists, cowardly refusing to overwrite.") return True if __name__ == '__main__': EMCOrientation.runFromCLI()