#!/usr/bin/env python
""":module XMDYNPhotonMatterAnalysis: Hosting utilities to analyse and visualize photon-matter trajectories generated by XMDYN."""
import h5py
import numpy
import os
import pylab
import periodictable as pte
ELEMENT_SYMBOL = ['All'] + [e.symbol for e in pte.elements]
from SimEx.Analysis.AbstractAnalysis import AbstractAnalysis, plt
from SimEx.Utilities.IOUtilities import loadPDB
[docs]class XMDYNPhotonMatterAnalysis(AbstractAnalysis):
""":class XMDYNPhotonMatterAnalysis: Class to encapsulate diagnostics of photon matter interaction trajectories. """
def __init__( self, input_path=None, snapshot_indices=None, elements=None, sample_path=None):
"""
:param input_path: Path to the data to analyze.
:type input_path: str
:param snapshot_indices: Snapshot (indices or IDs) to analyze (Default: All).
:type snapshot_indices: list
:param elements: Which elements to include in the analysis (Default: All).
:type elements: list
"""
self.input_path = input_path
self.sample_path = sample_path
self.snapshot_indices=snapshot_indices
self.elements=elements
self.__num_digits = 7
self.__prj = '.'
self.load_trajectory()
@property
def input_path(self):
""" Query the input path. """
return self.__input_path
@input_path.setter
def input_path(self, value):
""" Set the input path to a value.
:param value: The input path to set.
:type value: str
"""
error_message = "The parameter 'input_path' must be a str indicating an existing file or directory. "
if not isinstance(value, str):
raise TypeError(error_message)
if not os.path.exists(value):
raise ValueError(error_message)
self.__input_path = value
@property
def sample_path(self):
""" Query the sample path. """
return self.__sample_path
@sample_path.setter
def sample_path(self, value):
""" Set the sample path to a value.
:param value: The sample path to set.
:type value: str
"""
error_message = "The parameter 'sample_path' must be a str indicating an existing file or directory. "
if not isinstance(value, str):
raise TypeError(error_message)
if not os.path.exists(value):
raise ValueError(error_message)
self.__sample_path = value
@property
def snapshot_indices(self):
""" Query the snapshot indices. """
return self.__snapshot_indices
@snapshot_indices.setter
def snapshot_indices(self, value):
""" Set the snapshot indices.
:param value: The snapshot indices.
:type value: list
"""
error_message = "The parameter 'snapshot_indices' must be a list of integers."
if value is None or value == 'All':
value = ['All']
if isinstance(value, int):
value = [value]
if not hasattr(value, "__iter__"):
raise TypeError(error_message)
if not all([(isinstance(i, int) or i=="All") for i in value]):
raise TypeError(error_message)
self.__snapshot_indices = value
@property
def elements(self):
""" Query the elements to include. """
return self.__elements
@elements.setter
def elements(self, value):
""" Set the elements to include in the analysis.
:param value: Which elements to include.
:type value: list of symbols or element numbers.
"""
error_message = "The parameter 'elements' must be a list of chemical element symbols or integers indicating which elements to include in the analysis. "
if value is None or value == 'All':
value = ['All']
if isinstance(value, int) or value in ELEMENT_SYMBOL:
value = [value]
if not hasattr(value, "__iter__"):
raise TypeError(error_message)
if not all([(isinstance(i, int) or i in ELEMENT_SYMBOL) for i in value]):
raise ValueError(error_message)
self.__elements = value
[docs] def load_snapshot(self, snapshot_index):
""" Load snapshot data from hdf5 file into memory. """
snp = snapshot_index
dbase_root = "/data/snp_" + str( snp ).zfill(self.__num_digits) + "/"
xsnp = dict()
with h5py.File(self.input_path) as fp:
xsnp['Z'] = fp.get(dbase_root+ 'Z').value
xsnp['T'] = fp.get(dbase_root + 'T').value
xsnp['ff'] = fp.get(dbase_root + 'ff').value
xsnp['xyz'] = fp.get(dbase_root + 'xyz').value
xsnp['r'] = fp.get(dbase_root + 'r').value
xsnp['Nph'] = fp.get(dbase_root + 'Nph').value
N = xsnp['Z'].size
xsnp['q'] = numpy.array([xsnp['ff'][pylab.find(xsnp['T']==x), 0] for x in xsnp['xyz']]).reshape(N,)
xsnp['snp'] = snp ;
return xsnp
[docs] def number_of_snapshots(self) :
""" Get number of valid snapshots. """
with h5py.File( self.input_path, 'r') as xfp:
count = len([k for k in xfp['data'].keys() if "snp_" in k])
return count
[docs] def plot_displacement(self):
""" Plot the average displacement per atomic species as function of time. """
#t = self.trajectory['time']
for d in self.__trajectory['displacement'].T:
plt.plot(d) # self.trajectory['disp'][ : , pylab.find( sel_Z == pylab.array( list(data['sample']['selZ'].keys()) ) ) ] , xcolor )
plt.xlabel( 'Time [fs]' )
plt.ylabel( 'Average displacement [$\AA$]' )
[docs] def plot_charge(self):
""" Plot the average number of electrons per atom per atomic species as function of time. """
for d in self.__trajectory['charge'].T:
plt.plot(d)
### TODO: time axis, labels, legend
plt.xlabel( 'Time [fs]' )
plt.ylabel( 'Number of bound electrons per atom' )
[docs] def plot_energies(self):
""" Plot the evolution of MD energies over the simulation time. """
raise RuntimeError("Not implemented yet.")
[docs] def load_trajectory(self):
""" Load the selected snapshots and extract data to analyze. """
trajectory = dict()
sample = None
disp = []
charge = []
time = []
# Read sample data.
try:
sample = load_sample(self.sample_path)
except:
sample = loadPDB(self.sample_path)
snapshot_indices = self.snapshot_indices
if snapshot_indices == "All":
snapshot_indices = range(1, self.number_of_snapshots() + 1)
for si in snapshot_indices:
snapshot = self.load_snapshot(si)
disp.append(calculate_displacement(snapshot, r0=sample['r'], sample=sample))
charge.append(calculate_ion_charge(snapshot, sample))
trajectory['displacement'] = numpy.array(disp)
trajectory['charge'] = numpy.array(charge)
trajectory['time'] = numpy.array(time)
self.__trajectory = trajectory
[docs] def animate(self):
""" Generate an animation of atom trajectories and their ionization. """
pass
[docs]def load_sample(sample_path) :
""" Load a sample file into memory. """
sample = dict()
with h5py.File( sample_path , "r" ) as xfp:
sample['Z'] = xfp.get('Z').value
sample['r'] = xfp.get('r').value
sample['selZ'] = dict()
for sel_Z in numpy.unique(sample['Z']) :
sample['selZ'][sel_Z] = pylab.find(sel_Z == sample['Z'])
return sample
[docs]def read_h5_dataset( path , dataset ) :
""" Read a dataset from hdf5 file. """
with h5py.File( path , "r" ) as xfp:
data = xfp.get( dataset ).value
return data
[docs]def calculate_displacement(snapshot, r0, sample) :
""" Calculate the average displacement per atomic species in a snapshot.
:param snapshot: The snapshot to analyze
:type snapshot: dict
:param r0: Unperturbed positions of the sample atoms.
:type r0: numpy.array (shape=(Natoms, 3))
### CHECKME: Can't we read r0 from the sample dict?
:param sample: Sample data
:type sample: dict
"""
num_Z = len( list(sample['selZ'].keys()) )
all_disp = numpy.zeros( ( num_Z , ) )
count = 0
for sel_Z in list(sample['selZ'].keys()) :
dr = snapshot['r'][sample['selZ'][sel_Z],:] - r0[sample['selZ'][sel_Z],:]
all_disp[count] = numpy.mean( numpy.sqrt( numpy.sum( dr * dr , axis = 1 ) ) ) / 1e-10
count = count + 1
return numpy.array(all_disp)
[docs]def calculate_ion_charge(snapshot, sample):
""" Calculate the remaining electric charge per atomic species of a given snapshot.
:param snapshot: The snapshot to analyze
:type snapshot: dict
:param sample: The sample data.
:type sample: dict
"""
num_Z = len( list(sample['selZ'].keys()) )
all_numE = numpy.zeros( ( num_Z , ) )
count = 0
for sel_Z in list(sample['selZ'].keys()) :
all_numE[count] = numpy.mean( snapshot['q'][sample['selZ'][sel_Z]] )
count = count + 1
return numpy.array(all_numE)