""":module XCSITPhotonDetector: Hosts the XCSITPhotonDetector class."""
##########################################################################
# #
# Copyright (C) 2015-2017 Jan-Philipp Burchert #
# Copyright (C) 2015-2018 Carsten Fortmann-Grote #
# Contact: Carsten Fortmann-Grote <carsten.grote@xfel.eu> #
# #
# This file is part of simex_platform. #
# simex_platform is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# simex_platform is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <http://www.gnu.org/licenses/>. #
# #
##########################################################################
try:
import libpy_detector_interface as lpdi
except ImportError:
print("\nWARNING: Importing libpy_detector_interface failed. This is most probably due to XCSIT and/or Geant4 not being installed properly on this system. The XCSITPhotonDetector class can still be instantiated, but the backengine() method will throw an exception.\n")
except:
raise
import h5py
import os
import numpy
import sys
from SimEx.Calculators.AbstractPhotonDetector import AbstractPhotonDetector
[docs]class XCSITPhotonDetector(AbstractPhotonDetector):
"""
:class XCSITPhotonDetector: Wraps detector simulations with XCSIT.
"""
# Constructor.
def __init__(self,parameters=None,
input_path=None,
output_path=None):
"""
:param parameters: Parameters of the calulator such as the type of
detector
:type parameters: XCSITPhotonDetectorParameters
:param input_path: Path to the hdf5 file holding the input data.
:type input_path: str
:param output_path: Path pointing to the path for output
:type output_path: str
"""
# Failure if no argument is given
if any([parameters is None,
input_path is None]):
raise AttributeError("parameters and input_path are essential to"+
" to init an instance of this class")
# Init base class
super(XCSITPhotonDetector,self).__init__(parameters,input_path,output_path)
# Use the setters to check the input and assign it to the attributes
# the attributes are overwritten in the base class
self.parameters = parameters
self.input_path = input_path
self.output_path = output_path
# Define the input and output structure of the hdf5 file
self.__expected_data = ['/data/data', # <- poissonized (int)
'/data/diffr', # <- intensities (float)
'/data/angle',
'/history/parent/detail',
'/history/parent/parent',
'/info/package_version',
'/info/contact',
'/info/data_description',
'/info/method_description',
'/params/geom/detectorDist',
'/params/geom/pixelWidth',
'/params/geom/pixelHeight',
'/params/geom/mask',
'/params/beam/photonEnergy',
'/params/beam/photons',
'/params/beam/focusArea',
'/params/info',
]
self.__provided_data = ['/data/data', # <- store charge matrix here, in ADU
'/data/photons' # <- store photon/pixel map here, no directions
'/data/interactions',
'/history/parent/',
'/info/package_version',
'/params/geom/detectorDist',
'/params/geom/pixelWidth',
'/params/geom/pixelHeight',
'/params/beam/photonEnergy',
]
@AbstractPhotonDetector.input_path.setter
def input_path(self,value):
# Check the value type
if value is None:
raise ValueError("The parameter 'input_path' must be specified.")
if not isinstance(value,str):
raise TypeError("As input_path attribute only instances of str " +
"are allowed.")
if not value:
raise IOError("You must not enter an empty string as output path.")
# treat the input path:
# Cases
# 1) given is an absolute path
# a) ends with <parent path>/<filename>.h5-> complete file path
# b) ends with <parent path>/<filename> -> add .h5
# c) ends with <parent path>/<folder> -> search for an .h5 file in the
# parentfolder
# 2) given is an relative path to the current directory
# cases like in 1) a-c just with the cwd added previously
# TODO: loop over multiple files
# check if absolute path
# Linux: starts with /
# Windows: starts with \
# Modify value accodingly to create an absolute path and normalize it
if not os.path.isabs(value):
value = os.path.abspath(value)
# Check if the parent folder exists since it has to exists in an correct
# absolute path and check if it is a directory and not a file
(head,tail) = os.path.split(value)
if not os.path.isdir(head):
raise IOError("Parent path is not valid. Please check again: " +
str(head) +".")
# Check cases a)-c)
# Multiple input files are stored in one specifed input folder
in_pathes = []
if value.endswith(".h5") and os.path.isfile(value):
### syntax??
# All is fine
in_pathes.append(value)
elif os.path.isfile(value + ".h5"):
# filename without ending
value += ".h5"
in_pathes.append(value)
elif os.path.isdir(value):
# directory containing many input files
# directory can contain multiple files and multiple .h5 files
for i in os.listdir(value):
if i.endswith(".h5"):
in_pathes.append(os.path.join(value,i))
else:
raise IOError("Last path element causes error: " + str(tail) +
"Please check absolute path again: " + str(value) + ".")
# Check if the content are all strings
if not all(isinstance(i,str) for i in in_pathes):
raise ValueError("One of the pathes is not a python str" +
str(in_pathes) + ".")
# Assign to the variable
self.__input_path = in_pathes
@AbstractPhotonDetector.output_path.setter
def output_path(self,value):
# Check the value type
### Default handling: detector_out.h5 -> in case of many input files, put all detector images into one detector.h5, replicate structure of diffr.h5
if value is None:
value=os.getcwd()
if not isinstance(value,str):
raise TypeError("As output_path attribute only instances of str " +
"are allowed.")
if not value:
raise IOError("You must not enter an empty string as output path.")
# Situation: output_path
# 1) existing path, filename can exist but doesn't need to
# i) absolut path:
# a) ends with <parent>/<file>.h5
# b) ends with <parent>/<file>
# c) ends with <parent> -> create a file
# ii) a relative path such as 1) i)
# 2) a partly or non existing path
# i) absolute path:
# a) create if ends with <file>.h5
# b) ends with <parent> -> create path and add another
# <file>.h5
# ii) relative path:
# same as 2) i)
# check if absolute path
# Linux: starts with /
# Windows: starts with \
# It does not matter if the path exists or not, only the first character
# is necessary to know
# Modify value accodingly to create an absolute path and normalize it
if not os.path.isabs(value):
value = os.path.abspath(value)
# Test the cases:
(head,tail) = os.path.split(value)
out_name = "detector_out.h5"
if os.path.isdir(value):
# input is existing path to directory where to put in the output
# file 1ic) and 1iic)
# Create a file and join
value = os.path.join(value,out_name)
value = os.path.normpath(value)
elif os.path.isdir(head):
# input is absolute path with file name 1ia 1ib 1iia 1iib
# isdir checks also for existance
if value.endswith(".h5"):
# complete path, create the file there
pass
else:
# last path element is a filename without ending
value += ".h5"
elif value.endswith(".h5"):
# a path including non existant folders but a specified outputfile
# name -> nothing to do since the hdf5 will create necessary pathes
if not os.path.isdir(value):
os.makedirs(os.path.dirname(value))
else:
# a path with non existant folders to the parent folder of a not
# specified output file
# Create an outpuf file
value = os.path.join(value,out_name)
value = os.path.normpath(value)
if not os.path.isdir(value):
os.makedirs(os.path.dirname(value))
# set the value to the attribute
self._AbstractBaseCalculator__output_path=value
[docs] def expectedData(self):
return self.__expectedData
[docs] def providedData(self):
return self.__providedData
[docs] def getChargeData(self):
return self.__charge_data
[docs] def getInteractionData(self):
return self.__ia_data
[docs] def getPhotonData(self):
return self.__photon_data
def __createXCSITInteractions(self):
self.__ia_data = [lpdi.InteractionData() for i in self.parameters.patterns]
def __createXCSITChargeMatrix(self):
self.__charge_data = [lpdi.ChargeMatrix() for i in self.parameters.patterns]
# Subengine to calulate the particle simulation: The interaction of the
# photons with the detector of choice
def __backengineIA(self):
"""
Run the particle simulation
"""
is_successful = True
# Check containers
if self.__photon_data is None:
raise RuntimeError("Photon container has not been initialized yet.")
if self.__ia_data is None:
raise RuntimeError("Interaction container has not been initialized yet.")
for i,pd in enumerate(self.__photon_data):
# Run the simulation, catch everything that might happen and report
try:
param = self.parameters
ps = lpdi.ParticleSim()
ps.setInput(pd)
ps.setOutput(self.__ia_data[i])
ps.initialization(param.detector_type)
ps.runSimulation()
except:
err = sys.exc_info()
print("Photon-Detector interaction error:")
print(("Error type: " + str(err[0])))
print(("Error value: " + str(err[1])))
print(("Error traceback: " + str(err[2])))
is_successful = False
print("\nSUMMARY:\nDetected {0:d} interactions from {1:d} photons in pattern {2:s}.\n".format(self.__ia_data[i].size(), pd.size(), self.__pattern_ids[i]))
return is_successful
def __backengineCP(self):
"""
Run the charge simulation
"""
is_successful = True
# Check containers
if self.__ia_data is None:
raise RuntimeError("interaction container has not been initialized"+
" yet.")
# Run the simulation
for i, ia in enumerate(self.__ia_data):
try:
para = self.parameters
cs = lpdi.ChargeSim()
cs.setInput(ia)
cs.setOutput(self.__charge_data[i])
cs.setComponents(para.plasma_search_flag,
para.point_simulation_method,
para.plasma_simulation_flag,
para.detector_type
)
cs.runSimulation()
# Necessary due to the definition of XChargeData.hh
except:
err = sys.exc_info()
print("Charge propagation error:")
print(("Error type: " + str(err[0])))
print(("Error value: " + str(err[1])))
print(("Error traceback: " + str(err[2])))
is_successful = False
return is_successful
# Count how many pixels have charge
counter=0
for x in list(range(self.__charge_data[i].width())):
for y in list(range(self.__charge_data[i].height())):
entry=self.__charge_data[i].getEntry(x,y)
if(entry.getCharge() != 0):
counter+=1
print(("\nSUMMARY:\nFound " + str(counter) + " signals in the detector of size " +
str(self.__charge_data[i].height()) + "x" + str(self.__charge_data[i].width()) +
" for " + str(self.__ia_data[i].size()) + " interactions and " +
str(self.__photon_data[i].size()) + " photons"))
# Return if everything went well
return is_successful
[docs] def backengine(self):
"""
Executes the simulation of the particle and charge simulation.
"""
print("\nSimulating photon-detector interaction.")
# Create interaction container
self.__createXCSITInteractions()
# Run the particle simulation.
if not self.__backengineIA():
raise RuntimeError("Particle simulation caused errors, " +
"interrupting execution.")
else:
print("Interaction simulation of the detector is finished.")
# Create interaction container
self.__createXCSITChargeMatrix()
# Run the charge simulation.
if not self.__backengineCP():
raise RuntimeError("Charge simulation caused errors, " +
"interrupting execution.")
else:
print("Charge propagation simulation in the detector is finished.")
return 0
def _readH5(self):
"""
Reads the hdf5 file and create the storage container for the photons
according to that data
"""
# The __input_path is a non relative path to an existing file
if not os.path.exists(self.__input_path[0]):
raise RuntimeError("Input file " + str(self.__input_path[0]) +
" does not exists")
# Raise exception if there is no input file saved
if len(self.__input_path[0]) == 0 or not os.path.isfile(self.__input_path[0]):
raise RuntimeError("Input path does not lead to a file. Input path"+
" is: " + str(self.__input_path[0]))
infile = self.__input_path[0]
# Reflect the current handeling of multiple files
if len(self.__input_path) > 1:
raise RuntimeError("Currently there should be only one input file.")
# Open the file to read from
with h5py.File(infile,"r") as h5_infile:
keys = list(h5_infile["/data"].keys())
keys.sort()
matrix = h5_infile["/data/"+ keys[0] + "/diffr"].value
pattern_ids = self.parameters.patterns
number_of_patterns = len(pattern_ids)
# Get the array where each pixel contains a number of photons
# Explaination:
# /data/.../data are poissonized patterns
# /data/.../diffr are the intensities
if type(self.parameters.patterns[0]) is int:
pattern_ids = [keys[i] for i in pattern_ids]
self.__pattern_ids = pattern_ids
photons = numpy.empty(shape=(number_of_patterns,
matrix.shape[0],
matrix.shape[1]
)
)
for i,pid in enumerate(pattern_ids):
#photons[i,:,:] = h5_infile["/data/{0:s}/diffr".format(pid)].value
photons[i,:,:] = h5_infile["/data/{0:s}/data".format(pid)].value
#photons = numpy.floor(photons)
photons = photons.astype(int)
x_num, y_num = photons.shape[1:]
print(("Size of input matrix: " + str(x_num) + "x" + str(y_num)))
# Parameters of the matrix
x_pixel = h5_infile["/params/geom/pixelWidth"].value
print("pixel width = {0:e} m.".format(x_pixel))
y_pixel = h5_infile["/params/geom/pixelHeight"].value
print("pixel height= {0:e} m.".format(y_pixel))
center_energy = h5_infile["/params/beam/photonEnergy"].value # missing profile
print("central beam energy = {0:e} eV".format(center_energy))
detector_dist = h5_infile["/params/geom/detectorDist"].value
print("Detector distance = {0:e} m.".format(detector_dist))
# Create the photon instance
self.__photon_data = [lpdi.PhotonData() for i in range(photons.shape[0])]
# Assumptions
# - All the photon originate from the center
# - photon energy is everywhere the same in the beam
print("Creating the photons")
for ipd, pd in enumerate(self.__photon_data):
for i in range(x_num):
for j in range(y_num):
number_of_photons_in_pixel = photons[ipd,i,j]
if number_of_photons_in_pixel < 1:
continue
# Transfer from python to cartesian
direct = numpy.zeros((3,),dtype=numpy.float_)
direct[0] = i
direct[1] = y_num - 1 - j
direct[2] = 0
# Center center and correct for center of element
direct[0] = direct[0] - 0.5* x_num + 0.5
direct[1] = direct[1] - 0.5* y_num + 0.5
direct[2] = direct[2]
# calculate the length
direct[0] = direct[0]*x_pixel
direct[1] = direct[1]*y_pixel
direct[2] = detector_dist
# Calculate the photon flight vector with respect to the matrix
# center and the origin of diffraction
#direct[0] = x_pixel*(i + 0.5 - 0.5 *x_num) # x-coordinate
#direct[1] = y_num*y_pixel - y_pixel*(j + 0.5 -0.5 *y_num)
# python goes from left 2 right top 2 bottom
# in conrast to our coordinate space
#direct[2] = detector_dist
# calculate the normalized direction vector
le=numpy.sqrt(numpy.sum((direct**2)))
normal_direction = numpy.zeros((3,),dtype=numpy.float_)
if le != 0:
normal_direction = direct/le
# For each photon detected at (i,j) create an instance
# shift into the koordinate system where the detector is in
# the origin direct - (0,0,detector_dist)
entries = [ pd.addEntry() for i in range(number_of_photons_in_pixel)]
for entry in entries:
entry.setPositionX(direct[0])
entry.setPositionY(direct[1])
entry.setPositionZ(0)
entry.setDirectionX(numpy.asscalar(normal_direction[0]))
entry.setDirectionY(numpy.asscalar(normal_direction[1]))
entry.setDirectionZ(numpy.asscalar(normal_direction[2]))
#Photon energies have be set in units of keV.
entry.setEnergy(center_energy*1e-3)
# Close the input file
h5_infile.close()
print("XCSITPhotonDetector read {0:d} patterns from the input.".format(len(self.__photon_data)))
[docs] def saveH5(self):
"""
Save the results in a file
"""
# Write the new data into python arrays
# Convert the interaction data to a 2D numpy array (size x 5)
#num_ia = numpy.zeros((self.__ia_data.size(),5),dtype=numpy.float_)
#for i in list(range(self.__ia_data.size())):
#entry = self.__ia_data.getEntry(i)
#num_ia[i][0] = entry.getPositionX()
#num_ia[i][1] = entry.getPositionY()
#num_ia[i][2] = entry.getPositionZ()
#num_ia[i][3] = entry.getEnergy()
#num_ia[i][4] = entry.getTime()
# Convert the ChargeMatrix to a numpy array to be able to store its
# content
x_size = self.__charge_data[0].width()
y_size = self.__charge_data[0].height()
charge_array = [numpy.zeros((x_size,y_size),dtype=numpy.float_) for p in self.__pattern_ids] # float64
for i,ca in enumerate(charge_array):
for x in range(x_size):
for y in range(y_size):
entry = self.__charge_data[i].getEntry(x,y)
ca[x][y] = entry.getCharge()
# TODO settle this issue:
# For unknown reasons the interaction entries and the chargematrix is
# rotated of 180 degree to each other
ca = numpy.fliplr(ca)
ca = numpy.flipud(ca)
## Identify Nan and Inf in ChargeSim output
#parent =os.path.dirname(self.output_path)
#if not os.path.isdir(parent):
#parent=os.makedirs(parent)
#ofn = os.path.join(parent,"NaNInf.txt")
#with open(ofn,'w') as outfile:
#outfile.write("# All the coordinates are in typical python convention")
## Search for Nan and Inf
#for x in list(range(x_size)):
#for y in list(range(y_size)):
#if(numpy.isnan(ca[x][y])):
#print(("Warning: Detected NaN in ChargeMatrix at (" + str(x) +
#"," + str(y) + ") (python conention: l-r, t-b)"))
#outfile.write(str(x) + "\t" + str(y) + "\tNaN")
#if(numpy.isinf(ca[x][y])):
#print(("Warning: Detected Inf in ChargeMatrix at (" + str(x) +
#"," + str(y) + ") (python conention: l-r, t-b)"))
#outfile.write(str(x) + "\t" + str(y) + "\tInf")
# Create the new datasets
# ------------------------------------------------------------
# Open required files
with h5py.File(self.output_path, "w") as h5_outfile:
# Create the necessary output groups
data_gr = h5_outfile.create_group("data")
info_gr = h5_outfile.create_group("info")
param_geom_gr= h5_outfile.create_group("params/geom")
param_beam_gr= h5_outfile.create_group("params/beam")
for i, pid in enumerate(self.__pattern_ids):
# Create the direct data values independent of the input file
data_gr.create_dataset("{0:s}/data".format(pid), data=charge_array[i])
#data_gr.create_dataset("{0:s}/interactions".format(pid), data=num_ia[i])
info_gr.create_dataset("package_version".format(pid),data="1.0")
with h5py.File(self.__input_path[0],"r" ) as h5_infile:
# Link the data from the input file
# -------------------------------------------------------------
# Copy
dist = h5_infile["/params/geom/detectorDist"]
param_geom_gr.create_dataset("detectorDist",data=dist[()])
pw = h5_infile["/params/geom/pixelWidth"]
param_geom_gr.create_dataset("pixelWidth",data=pw[()])
ph = h5_infile["/params/geom/pixelHeight"]
param_geom_gr.create_dataset("pixelHeight", data=ph[()])
pe = h5_infile["/params/beam/photonEnergy"]
param_beam_gr.create_dataset("photonEnergy",data=pe[()])
# Close file since it is not possible to link to it via
# ExternalLink if it is still open
h5_infile.close()
# Link in the input file root.
h5_outfile["/history/parent/"] = h5py.ExternalLink(self.__input_path[0],"/")
# Close file
h5_outfile.close()