from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from six.moves import range
from six.moves import zip
from six.moves import input
#!/usr/bin/python
# Filename: xrs_read.py
#/*##########################################################################
#
# The XRStools software package for XRS spectroscopy
#
# Copyright (c) 2013-2014 European Synchrotron Radiation Facility
#
# This file is part of the XRStools XRS spectroscopy package developed at
# the ESRF by the DEC and Software group.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
#############################################################################*/
__author__ = "Christoph J. Sahle - ESRF"
__contact__ = "christoph.sahle@esrf.fr"
__license__ = "MIT"
__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France"
from . import xrs_rois, xrs_scans, xrs_utilities, math_functions, xrs_fileIO, roifinder_and_gui
import h5py
import scipy.io
import traceback
import sys
import os
import numpy as np
import array as arr
import pickle
import matplotlib.pyplot as plt
from numpy import array
from itertools import groupby
from scipy.integrate import trapz
from scipy.interpolate import interp1d, Rbf
from scipy import signal, optimize
from scipy.ndimage import measurements
# try to import the fast PyMCA parsers
try:
import PyMca5.PyMcaIO.EdfFile as EdfIO
import PyMca5.PyMcaIO.specfilewrapper as SpecIO
use_PyMca = True
except:
use_PyMca = False
print( " >>>>>>>> use_PyMca " , use_PyMca)
__metaclass__ = type # new style classes
[docs]def print_citation_message():
"""Prints plea for citing the XRStools article when using this software.
"""
print (' ')
print (' ############################# Welcome to XRStools #############################')
print (' # If you are using this software, please cite the following work: #')
print (' # Ch.J. Sahle, A. Mirone, J. Niskanen, J. Inkinen, M. Krisch, and S. Huotari: #')
print (' # "Planning, performing, and analyzing X-ray Raman scattering experiments." #')
print (' # Journal of Synchrotron Radiation 22, No. 2 (2015): 400-409. #')
print (' ###############################################################################')
print (' ')
print_citation_message()
[docs]class Hydra:
"""Main class for handling XRS data from ID20's multi-analyzer spectrometer 'Hydra'.
This class is intended to read SPEC- and according EDF-files and generate spectra from
multiple individual energy loss scans.
Note:
Hydra is the name of the multi-analyzer x-ray Raman scattering spectrometer at ESRF's
ID20 beamline. This class has been adopted specifically for this spectrometer.
If you are using this program, please cite the following work:
Sahle, Ch J., A. Mirone, J. Niskanen, J. Inkinen, M. Krisch, and S. Huotari.
"Planning, performing and analyzing X-ray Raman scattering experiments."
Journal of Synchrotron Radiation 22, No. 2 (2015): 400-409.
Args:
* path (str): Absolute path to directory holding the data.
* SPECfname (str): Name of the SPEC-file ('hydra' is the default).
* EDFprefix (str): Prefix for the EDF-files ('/edf/' is the default).
* EDFname (str): Filename of the EDF-files ('hydra_' is the default).
* EDFpostfix (str): Postfix for the EDF-files ('.edf' is the default).
* en_column (str): Counter mnemonic for the energy motor ('energy' is the default).
* moni_column (str): Mnemonic for the monitor counter ('izero' is the default).
Attributes:
* path (str): Absolute path to directory holding the data.
* SPECfname (str): Name of the SPEC-file ('hydra' is the default).
* EDFprefix (str): Prefix for the EDF-files ('/edf/' is the default).
* EDFname (str): Filename of the EDF-files ('hydra_' is the default).
* EDFpostfix (str): Postfix for the EDF-files ('.edf' is the default).
* en_column (str): Counter mnemonic for the energy motor ('energy' is the default).
* moni_column (str): Mnemonic for the monitor counter ('izero' is the default).
* scans (dict): Dictionary holding all loaded scans.
* scan_numbers (list): List with scan number of all loaded scans.
* eloss (np.array): Array with the common energy loss scale for all analyzers.
* energy (np.array): Array with the common energy scale for all analyzers.
* signals (np.array): Array with the signals for all analyzers (one column per anayzer).
* errors (np.array): Array with the poisson errors for all analyzers (one column per anayzer).
* qvalues (list): List of momentum transfer values for all analyzers.
* groups (dict): Dictionary of groups of scans (instances of the 'scangroup' class,
such as 2 'elastic', or 5 'edge1', etc.).
* cenom (list): List of center of masses of the elastic lines.
* E0 (int): Elastic line energy value, mean value of all center of masses.
* tth (list): List of all scattering angles (one value for each ROI).
* resolution (list): List of FWHM of the elastic lines (one for each analyzer).
* comp_factor (float): Compensation factor for line-by-line energy dispersion compensation.
* cenom_dict (dict): Dictionary holding center-of-masses for of the elastic line.
* raw_signals (dict): Dictionary holding pixel- or line-wise signals.
* raw_errors (dict): Dictionary holding pixel- or line-wise Poisson errors.
* TTH_OFFSETS1 (np.array): Two-Theta offsets between individual analyzers inside each analyzer
module in one direction (horizontal for V-boxes, vertical for H-boxes).
* TTH_OFFSETS2 (np.array): Two-Theta offsets between individual analyzers inside each analyzer
module in one direction (horizontal for H-boxes, vertical for V-boxes).
* roi_obj (instance): Instance of the roi_object class from the xrs_rois module defining all
ROIs for the current dataset (default is 'None').
"""
def __init__( self, path, SPECfname='hydra', EDFprefix='/edf/', EDFname='hydra_', \
EDFpostfix='.edf', en_column='energy', moni_column='izero' ):
self.path = path
self.SPECfname = SPECfname
if not os.path.isfile(os.path.join(path, SPECfname)):
raise Exception('IOError! No such file or directory.')
self.EDFprefix = EDFprefix
self.EDFname = EDFname
self.EDFpostfix = EDFpostfix
self.en_column = en_column.lower()
self.moni_column = moni_column.lower()
self.scans = {}
self.scan_numbers = []
self.eloss = np.array([])
self.energy = np.array([])
self.signals = np.array([])
self.errors = np.array([])
self.q_values = []
self.groups = {}
self.cenom = []
self.E0 = []
self.tth = []
self.resolution = []
self.comp_factor = None
self.cenom_dict = {}
self.raw_signals = {}
self.raw_errors = {}
self.TTH_OFFSETS1 = np.array([5.0, 0.0, -5.0, 5.0, 0.0, -5.0, 5.0, 0.0, -5.0, 5.0, 0.0, -5.0])
self.TTH_OFFSETS2 = np.array([-9.71, -9.75, -9.71, -3.24, -3.25, -3.24, 3.24, 3.25, 3.24, 9.71, 9.75, 9.71])
self.PIXEL_SIZE = 0.055 # pixel size in mm
self.roi_obj = None
print_citation_message()
[docs] def save_state_hdf5( self, file_name, group_name, comment="" ):
""" **save_state_hdf5**
Save the status of the current instance in an HDF5 file.
Args:
file_name (str): Path and file name for the HDF5-file to be created.
group_name (str): Group name under which to store status in the HDF5-file.
comment (str): Optional comment (no comment is default).
"""
h5 = h5py.File(file_name,"a")
h5.require_group(group_name)
h5group = h5[group_name]
for key in self.__dict__.keys():
if key in h5group.keys():
raise Exception( 'Data \'' + key + '\' already present in ' + file_name + ':' + group_name )
else:
h5group[key] = getattr( self, key )
h5group["comment"] = comment
h5.flush()
h5.close()
[docs] def load_state_hdf5( self, file_name, group_name ):
""" **load_state_hdf5**
Load the status of an instance from an HDF5 file.
Args:
file_name (str): Path and filename for the HDF5-file to be created.
group_name (str): Group name under which to store status in the HDF5-file.
"""
h5 = h5py.File( file_name,"r" )
h5group = h5[group_name]
keys = {"eloss":array, "energy":array, "signals":array, "errors":array, "q_values":array,
"cenom":array, "E0":float, "tth":array, "resolution":array }
for key in keys:
setattr(self, key, keys[key](array(h5group[key])))
h5.flush()
h5.close()
[docs] def set_roiObj( self,roiobj ):
""" **set_roiObj**
Assign an instance of the 'roi_object' class to the current data set.
Args:
roiobj (instance): Instance of the 'roi_object' class holding all
information about the definition of the ROIs.
"""
self.roi_obj = roiobj
[docs] def load_scan( self, scan_numbers, scan_type='generic', direct=True, scaling=None, method='sum' ):
"""**load_scan**
Load a single or multiple scans.
Note:
When composing a spectrum later, scans of the same scan_type will
be averaged over. Scans with scan type 'elastic' or long in their
names are recognized and will be treated specially.
Args:
* scan_numbers (int or list): Integer or iterable of scan numbers to be loaded.
* scan_type (str): String describing the scan to be loaded (e.g. 'edge1' or 'K-edge').
* direct (boolean): Flag, 'True' if the EDF-files should be deleted after loading/integrating the scan.
"""
# make sure scannumbers are iterable (list)
numbers = []
if not isinstance(scan_numbers,list):
numbers.append(scan_numbers)
else:
numbers = scan_numbers
# make sure there is a cenom_dict available if direct=True AND method='sum'
if direct and method=='pixel' and not self.cenom_dict:
print('Please run the get_compensation_factor method first for pixel-wise compensation.')
return
# go throught list of scan_numbers and load scans
for number in numbers:
# create a name for each scan
scan_name = 'Scan%03d' % number
# create an instance of the Scan class
scan = xrs_scans.Scan()
# load the scan
scan.load( self.path, self.SPECfname, self.EDFprefix, self.EDFname, self.EDFpostfix, number, \
direct=direct, roi_obj=self.roi_obj, scaling=scaling, scan_type=scan_type, \
en_column=self.en_column, moni_column=self.moni_column, method=method, \
cenom_dict=self.cenom_dict, comp_factor=self.comp_factor )
# add it to the scans dict
self.scans[scan_name] = scan
# add the number to the list of scan numbers
if not number in self.scan_numbers:
self.scan_numbers.extend([number])
[docs] def load_loop( self, beg_nums, num_of_regions, direct=True, method='sum' ):
""" **load_loop**
Loads a whole loop of scans based on their starting numbers and
the number of single scans in the loop.
Args:
beg_nums (list): List of scan numbers of the first scans in each loop.
num_of_regions (int): Number of scans in each loop.
"""
type_names = []
for n in range(num_of_regions):
type_names.append('edge'+str(n+1))
numbers = []
for n in range(len(beg_nums)):
for m in range(num_of_regions):
numbers.append((beg_nums[n]+m))
type_names = type_names*len(beg_nums)
for n in range(len(type_names)):
number = []
number.append(numbers[n])
self.load_scan( number, type_names[n], direct=True, method=method )
[docs] def delete_scan( self, scan_numbers ):
""" **delete_scan**
Deletes scans from the dictionary of scans.
Args:
scan_numbers (int or list): Integer or list of integers (SPEC scan numbers) to be deleted.
"""
# make sure scannumbers are iterable (list)
numbers = []
if not isinstance(scan_numbers,list):
numbers.append(scan_numbers)
else:
numbers = scan_numbers
# delete the scan
for number in numbers:
scan_name = 'Scan%03d' % number
del(self.scans[scan_name])
self.scan_numbers.remove(number)
[docs] def get_raw_data( self, method='sum', scaling=None ):
""" **get_raw_data**
Applies the ROIs to the EDF-files.
This extracts the raw-data from the EDF-files subject to three
different methods: 'sum' will simply sum up all pixels inside a
ROI, 'row' will sum up over the dispersive direction, and 'pixel'
will not sum at all.
Args:
method (str): Keyword specifying the selected choice of data treatment:
can be 'sum', 'row', or 'pixel'. Default is 'sum'.
"""
if not self.roi_obj:
print ( 'Did not find a ROI object, please set one first.' )
return
for scan in self.scans:
if len(self.scans[scan].edfmats):
print ( "Integrating " + scan + " using method \'" + method + '\'.')
self.scans[scan].get_raw_signals( self.roi_obj , method=method, scaling=scaling )
[docs] def get_data_new(self, method='sum', scaling=None):
""" **get_data_new**
Applies the ROIs to the EDF-files.
This extracts the raw-data from the EDF-files subject to three
different methods: 'sum' will simply sum up all pixels inside a
ROI, 'row' will sum up over the dispersive direction, and 'pixel'
will not sum at all.
Args:
method (str): Keyword specifying the selected choice of data treatment:
can be 'sum', 'row', or 'pixel'. Default is 'sum'.
"""
if not self.cenom_dict:
print ( 'No compensation factor/center-of-mass info found, please provide first.' )
return
for scan in self.scans:
self.scans[scan].get_signals( method=method, cenom_dict=self.cenom_dict, comp_factor=self.comp_factor, scaling=scaling )
[docs] def get_data(self):
""" **get_data**
Applies the ROIs and sums up intensities.
Returns:
'None', if no ROI object is available.
"""
if not self.roi_obj:
print ( 'Did not find a ROI object, please set one first.' )
return
for scan in self.scans:
if len(self.scans[scan].edfmats):
print ( "Integrating " + scan )
self.scans[scan].apply_rois( self.roi_obj )
[docs] def get_data_pw(self):
""" **get_data_pw**
Extracts intensities for each pixel in each ROI.
Returns:
'None', if no ROI object is available.
"""
if not np.any(self.roi_obj.indices):
print ( 'Did not find a ROI object, please set one first.' )
return
for scan in self.scans:
if len(self.scans[scan].edfmats):
print ( "Integrating pixelwise " + scan )
self.scans[scan].apply_rois_pw(self.roi_obj)
[docs] def SumDirect( self, scan_numbers, index=None ):
""" **SumDirect**
Creates a summed 2D image of a given scan or list of scans.
Args:
scan_numbers (int or list): Scan number or list of scan numbers to be added up.
Returns:
A 2D np.array of the same size as the detector with the summed image.
"""
# make sure scannumbers are iterable (list)
numbers = []
if not isinstance(scan_numbers,list):
numbers.append(scan_numbers)
else:
numbers = scan_numbers
im_sum = None
en_column = None # uses first column in SPEC file as scanned motor
for number in numbers:
scan = xrs_scans.Scan()
scan.load(self.path, self.SPECfname, self.EDFprefix, self.EDFname, self.EDFpostfix, number, \
direct=False, roi_obj=None, scaling=None, scan_type='generic', \
en_column=en_column, moni_column=self.moni_column)
if im_sum is None:
im_sum1 = np.zeros(scan.edfmats[0].shape ,"f")
im_sum2 = np.zeros(scan.edfmats[0].shape ,"f")
if not index:
im_sum1[:] += scan.edfmats.sum(axis=0)
else:
im_sum1[:] += scan.edfmats[0:index,:,:].sum(axis=0)
im_sum2[:] += scan.edfmats[index:,:,:].sum(axis=0)
if not index:
return im_sum1
else:
return im_sum2-im_sum1
[docs] def get_eloss_new(self, method='sum'):
""" **get_eloss_new**
Defines the energy loss scale for all ROIs and applies dispersion
compensation if applicable.
Args:
method (str): Keyword describing which dispersion compensation
method to use. Possible choices are 'sum' (no compensation),
'pixel' (pixel-by-pixel compensation), or 'row' (line-by-line)
compensation.
"""
# make sure an elastic line was loaded
if not 'elastic' in self.groups:
print( 'Please load/integrate at least one elastic scan first!' )
return
# make sure there is data
elif not np.any(self.groups['elastic'].raw_signals):
self.get_raw_data( method=method )
# make sure there is center of masses available
elif not self.cenom_dict and method != 'row':
for scan in self.scans:
if self.scans[scan].get_type() == 'elastic':
print('GETIING COMP FACTOR!!!')
self.get_compensation_factor( self.scans[scan].scan_number, method=method )
first_key = list(self.raw_signals.keys())[0]
# 'sum'
if method == 'sum':
# master eloss scale in eV is the one of the first ROI
self.signals = np.zeros((len(self.energy),len(self.cenom_dict)))
self.errors = np.zeros((len(self.energy),len(self.cenom_dict)))
master_eloss = (self.energy - np.median([self.cenom_dict[key] for key in self.cenom_dict]))*1.0e3
self.E0 = np.median([self.cenom_dict[key] for key in self.cenom_dict])
for key,ii in zip(sorted(self.cenom_dict, key = lambda x: int(''.join(filter(str.isdigit, str(x) ))) ), range(len(self.cenom_dict))):
# signals
x = ( self.energy - self.cenom_dict[key] )*1.0e3
y = self.raw_signals[key][:]
#try:
# rbfi = Rbf( x, y, function='linear' )
# self.signals[:,ii] = rbfi( master_eloss )
#except:
self.signals[:,ii] = np.interp( master_eloss, x, y )
# errors
y = self.raw_errors[key][:]
#try:
# rbfi = Rbf( x, y, function='linear' )
# self.errors[:,ii] = rbfi( master_eloss )
#except:
self.errors[:,ii] = np.interp( master_eloss, x, y )
self.eloss = master_eloss
# 'pixel'
elif method == 'pixel':
# master eloss scale in eV is the one of central pixel in first ROI
self.signals = np.zeros((len(self.energy),len(self.cenom_dict)))
self.errors = np.zeros((len(self.energy),len(self.cenom_dict)))
master_eloss = ( self.energy - np.median(self.cenom_dict[first_key][self.cenom_dict[first_key] > 0.0]) )*1.0e3
self.E0 = np.median(self.cenom_dict[first_key][self.cenom_dict[first_key] > 0.0])
for key,ii in zip(sorted(self.cenom_dict, key = lambda x: int(''.join(filter(str.isdigit, str(x) ))) ), range(len(self.cenom_dict))):
print ('Pixel-by-pixel compensation for ' + key +'.')
signal = np.zeros(len(master_eloss))
error = np.zeros(len(master_eloss))
for dim1 in range(self.cenom_dict[key].shape[0]):
for dim2 in range(self.cenom_dict[key].shape[1]):
x = ( self.energy - self.cenom_dict[key][dim1, dim2] )*1.0e3
# signals
y = self.raw_signals[key][:, dim1, dim2]
if np.any(y)>0.0:
#print "Y AMAX", signal.max()
#rbfi = Rbf( x, y, function='linear' )
rbfi = interp1d(x, y,bounds_error=False, fill_value=0.0)
#print "rbf AMAX", Rbf( x, y, function='linear' )
#signal += rbfi( master_eloss )
signal += rbfi( master_eloss )
#print "SIGNAL AMAX", signal.max()
# errors
y = self.raw_errors[key][:, dim1, dim2]
rbfi = Rbf( x, y, function='linear' )
error += rbfi( master_eloss )**2
self.signals[:,ii] = signal
self.errors[:,ii] = np.sqrt(error)
self.eloss = master_eloss
# 'row'
elif method == 'row':
self.signals = np.zeros((len(self.energy),len(self.roi_obj.red_rois)))
self.errors = np.zeros((len(self.energy),len(self.roi_obj.red_rois)))
energy = self.energy * 1e3 # energy in eV
for key,ii in zip(sorted(self.raw_signals, key = lambda x: int(''.join(filter(str.isdigit, str(x) ))) ), range(len(self.raw_signals))):
y = self.raw_signals[key]
meanii = len(range(y.shape[1]))/2
yc = np.zeros_like(y)
for jj in range(y.shape[1]):
sort = np.argsort(energy)
yc[:,jj] = np.interp( energy[sort] + (jj-meanii)*self.comp_factor*self.PIXEL_SIZE, energy[sort], y[sort,jj],
left=float('nan'), right=float('nan') )
self.signals[:,ii] = xrs_utilities.nonzeroavg(yc)
self.errors[:,ii] = np.sqrt(self.signals[:,ii])
else:
print('Method \''+method+'\' not supported, use either \'sum\', \'pixel\', or \'row\'.')
return
[docs] def get_eloss( self ):
""" **get_eloss**
Finds the energy loss scale for all ROIs by calculating the center
of mass (COM) for each ROI's elastic line. Calculates the resolution
function (FWHM) of the elastic lines.
"""
# make sure an elastic line was loaded
if not 'elastic' in self.groups:
print( 'Please load/integrate at least one elastic scan first!' )
return
# make sure there is data
elif not self.groups['elastic'].signals.shape[1] == len(self.roi_obj.indices):
self.get_data()
else:
# reset values, in case this is run several times
self.cenom = []
self.resolution = []
Origin = None
valid_cenoms = []
for n in range(self.groups['elastic'].signals.shape[1]):
# find the center of mass for each ROI
cofm = xrs_utilities.find_center_of_mass(self.groups['elastic'].energy,self.groups['elastic'].signals[:,n])
self.cenom.append(cofm)
if self.there_is_a_valid_roi_at( n ):
valid_cenoms.append(cofm)
if Origin is None:
Origin = cofm
# find the FWHM/resolution for each ROI
en_scale = (self.groups['elastic'].energy - self.cenom[n])*1e3
intensity = self.groups['elastic'].signals_orig[:,n]
FWHM, x0 = xrs_utilities.fwhm(en_scale,intensity)
# find master E0
self.E0 = np.mean(valid_cenoms)
# define last eloss scale as the 'master' scale for all ROIs
self.eloss = (self.energy - cofm)*1e3 # energy loss in eV
# define eloss-scale for each ROI and interpolate onto the 'master' eloss-scale
for n in range(self.signals.shape[1]):
# inserting zeros at beginning and end of the vectors to avoid interpolation errors
x = (self.energy-self.cenom[n])*1e3
x = np.insert(x,0,-1e10)
x = np.insert(x,-1,1e10)
y = self.signals[:,n]
y = np.insert(y,0,0)
y = np.insert(y,-1,0)
f = interp1d(x,y, bounds_error=False,fill_value=0.0)
self.signals[:,n] = f(self.eloss)
# do the same for the errors
for n in range(self.signals.shape[1]):
# inserting zeros at beginning and end of the vectors to avoid interpolation errors
x = (self.energy-self.cenom[n])*1e3
x = np.insert(x,0,-1e10)
x = np.insert(x,-1,1e10)
y = self.errors[:,n]
y = np.insert(y,0,0)
y = np.insert(y,-1,0)
f = interp1d(x,y, bounds_error=False,fill_value=0.0)
self.errors[:,n] = f(self.eloss)
[docs] def get_spectrum( self, include_elastic=False, abs_counts=False ):
""" **get_spectrum**
Constructs a spectrum based on the scans loaded so far. Defines the energy
loss scale based on the elastic lines.
Args:
include_elastic (boolean): Boolean flag, does not include the elastic line if
set to 'False' (this is the default).
abs_counts (boolean): Boolean flag, constructs the spectrum in absolute
counts if set to 'True' (default is 'False')
"""
# find the groups
all_groups = xrs_scans.findgroups(self.scans)
# append scans
for group in all_groups:
one_group = xrs_scans.makegroup_nointerp(group, absCounts=abs_counts)
self.groups[one_group.get_type()] = one_group
self.energy, self.signals, self.errors = xrs_scans.appendScans(self.groups,include_elastic)
# define the energy loss scale
self.get_eloss()
[docs] def get_spectrum_new( self, method='sum', include_elastic=False, abs_counts=False, interpolation=False ):
""" **get_spectrum_new**
Constructs a spectrum from the scans loaded so far based on the chosen method:
detector pixels can either be summed up (method='sum'), pixel-by-pixel
compensation can be applied (method='pixel'), or a line-by-line compensation
scheme can be applied (method='row').
The energy loss scale will be defined in the process and the data will be
interpolated onto this scale using norm-conserving wavelet interpolation.
Args:
method (str): Keyword describing the kind of integration scheme
to be used (possible values are 'sum', 'pixel', or 'row'), default is 'sum'.
include_elastic (boolean): Boolean flag, does not include the elastic line if
set to 'False' (this is the default).
abs_counts (boolean): Boolean flag, constructs the spectrum in absolute
counts if set to 'True' (default is 'False')
interpolation (boolean): Boolean flag, if True, signals are interpolated
onto energy grid of the first scan in each group of scans.
"""
if not method in ['pixel', 'sum', 'row']:
print('Unknown integration method. Use either \'pixel\', \'sum\', or \'row\'')
return
# make sure there is an elastic line available
elastic_number = None
for key in self.scans:
if self.scans[key].scan_type == 'elastic':
elastic_number = self.scans[key].scan_number
print ('Using scan No. %d for CENOMs.'%elastic_number)
break
else:
pass
if not elastic_number:
print( 'Please load/integrate at least one elastic scan first!' )
return
# get compensation factor/CENOM for an elastic line
if method in [ 'sum' , 'pixel'] and not self.cenom_dict:
self.get_compensation_factor( elastic_number, method=method )
if method == 'row' and not self.comp_factor:
self.get_compensation_factor( elastic_number, method=method )
# get raw data
for key in self.scans:
if not self.scans[key].raw_signals:
self.scans[key].get_raw_signals( self.roi_obj, method=method )
# sum up similar scans
# find all groups of scans
all_groups = xrs_scans.findgroups(self.scans)
# initiate groups
self.groups = {}
# sum up similar scans
for group in all_groups:
self.groups[group[0].get_type()] = xrs_scans.sum_scans_to_group( group, method=method, interp=interpolation )
# stitch groups together into a spectrum
spectrum = xrs_scans.stitch_groups_to_spectrum( self.groups, method=method, include_elastic=include_elastic )
self.energy = spectrum.energy
self.signals = spectrum.signals
self.errors = spectrum.errors
self.raw_signals = spectrum.raw_signals
self.raw_errors = spectrum.raw_errors
# define energy loss scale and apply compensation if applicable
self.get_eloss_new( method=method )
[docs] def get_q_values( self, inv_angstr=False, energy_loss=None ):
""" **get_q_values**
Calculates the momentum transfer for each analyzer.
Args:
inv_angstr (boolean): Boolean flag, if 'True' momentum transfers are calculated in
inverse Angstroms.
energy_loss (float): Energy loss value at which the momentum transfer is to be
calculated. If 'None' is given, the momentum transfer is calculated for every
energy loss point of the spectrum.
Returns:
If an energy loss value is passed, the function returns the momentum transfers
at this energy loss value for each analyzer crystal.
"""
# one q-value per analyzer and energy loss point
q_vals = np.zeros_like(self.signals)
if inv_angstr:
for n in range( self.signals.shape[1] ):
q_vals[:,n] = xrs_utilities.momtrans_inva(self.E0+self.eloss/1e3,self.E0,self.tth[n])
else:
for n in range( self.signals.shape[1] ):
q_vals[:,n] = xrs_utilities.momtrans_au(self.E0+self.eloss/1e3,self.E0,self.tth[n])
self.q_values = q_vals
# return all q-values if a specific energy loss is given
if energy_loss:
ind = np.abs(self.eloss - energy_loss).argmin()
return self.q_values[ind,:]
[docs] def copy_edf_files( self, scan_numbers, dest_dir ):
""" **copy_edf_files**
Copies all EDF-files from given scan_numbers into given directory.
Args:
* scan_numbers (int or list) = Integer or list of integers defining the scan
numbers of scans to be copied.
* dest_dir (str) = String with absolute path for the destination.
"""
import shutil
# make scan_numbers iterable
numbers = []
if not isinstance(scan_numbers,list):
numbers.append(scan_numbers)
else:
numbers = scan_numbers
fname = self.path + self.SPECfname
# go through the scans, find the EDF-files and copy them
for nscan in numbers:
if use_PyMca:
data, motors, counters = xrs_fileIO.PyMcaSpecRead(fname,nscan)
else:
data, motors, counters = xrs_fileIO.SpecRead(fname,nscan)
for m in range(len(counters['ccdno'])):
ccdnumber = counters['ccdno'][m]
edfname = self.path + self.EDFprefix + self.EDFname + "%04d" % ccdnumber + self.EDFpostfix
shutil.copy2(edfname, dest_dir)
[docs] def dump_spectrum_ascii( self, file_name, header='' ):
""" **dump_spectrum_ascii**
Stores the energy loss and signals in a txt-file.
Args:
filename (str): Path and filename to the file to be written.
"""
data = np.zeros((len(self.eloss),self.signals.shape[1]*2+1))
data[:,0] = self.eloss
col = 1
for ii in range(self.signals.shape[1]):
data[:,col] = self.signals[:,ii]
col += 1
data[:,col] = self.errors[:,ii]
col += 1
np.savetxt( file_name, data, header=header )
[docs] def dump_spectrum_hdf5( self, file_name, group_name, comment='' ):
""" **dump_spectrum_hdf5**
Writes the summed spectrum into an HDF5 file.
Args:
file_name (str): Path and file name for the HDF5-file to be created.
group_name (str): Group name under which to store status in the HDF5-file.
comment (str): Optional comment (no comment is default).
"""
h5 = h5py.File(file_name,"a")
h5.require_group(group_name)
h5group = h5[group_name]
keys = ['energy', 'eloss', 'signals', 'errors']
for key in keys:
h5group[key] = getattr( self, key )
h5group["comment"] = comment
h5.flush()
h5.close()
[docs] def dump_scans_ascii( self, scan_numbers, pre_fix, f_name, post_fix='.dat', header='' ):
""" **dump_scans_ascii**
Produce ASCII-type files with columns of energy, signal, and Poisson error.
Args:
scan_numbers (int or list): SPEC scan numbers of scans to be safed in ASCII format.
pre_fix (str): Path to directory where files should be written into.
f_name (str): Base name for the files to be written.
post_fix (str): Extention for the files to be written (default is '.dat').
"""
# make sure scan_numbers are iterable
numbers = []
if not isinstance(scan_numbers,list):
numbers.append(scan_numbers)
else:
numbers = scan_numbers
for number in numbers:
scan_name = 'Scan%03d' % number
if not scan_name in self.scans.keys():
print ('Scan No. %03d is currently not loaded.'% number)
return
x = self.scans[scan_name].energy
y = self.scans[scan_name].signals
z = self.scans[scan_name].errors
data = np.zeros((x.shape[0], y.shape[1]+z.shape[1]+1 ))
data[:,0] = x
data[:,1:1+y.shape[1]] = y
data[:,1+y.shape[1]:1+y.shape[1]+z.shape[1]] = z
file_name = pre_fix + f_name + '_' + scan_name + post_fix
np.savetxt(file_name, data, header=header)
[docs] def print_scan_length( self,scan_numbers ):
""" **print_scan_length**
Print out the numper of points in given scans.
Args:
scan_numbers (int or list): Scan number or list of scan numbers.
"""
# make scan_numbers iterable
numbers = []
if not isinstance(scan_numbers,list):
numbers.append(scan_numbers)
else:
numbers = scan_numbers
# print out the number of points for all scan_numbers
for ii in numbers:
name = 'Scan%03d' % ii
print( 'Length of scan %03d ' %ii + ' is ' + str(len(self.scans[name].energy)) + '.')
[docs] def get_tths( self, rvd=None, rvu=None, rvb=None, rhl=None, rhr=None, rhb=None, order=[0,1,2,3,4,5] ):
""" **get_tths**
Calculates the scattering angles for all analyzer crystals based on
the mean angle of the analyzer modules.
Args:
* rhl (float): Mean scattering angle of the HL module (default is 0.0).
* rhr (float): Mean scattering angle of the HR module (default is 0.0).
* rhb (float): Mean scattering angle of the HB module (default is 0.0).
* rvd (float): Mean scattering angle of the VD module (default is 0.0).
* rvu (float): Mean scattering angle of the VU module (default is 0.0).
* rvb (float): Mean scattering angle of the VB module (default is 0.0).
* order (list): List of integers (0-5) that describe the order of modules in which the
ROIs were defined (default is VD, VU, VB, HR, HL, HB; i.e. [0,1,2,3,4,5]).
"""
# reset all tth values
self.tth = []
# try to grab from positions from SPEC-file (first scan in list)
scan_name = list(self.scans.keys())[0]
if not rvd:
rvd = self.scans[scan_name].motors['RVD']
if not rvu:
rvu = self.scans[scan_name].motors['RVU']
if not rvb:
rvb = self.scans[scan_name].motors['RVB']
if not rhr:
rhr = self.scans[scan_name].motors['RHR']
if not rhl:
rhl = self.scans[scan_name].motors['RHL']
if not rhb:
rhb = self.scans[scan_name].motors['RHB']
# horizontal modules
# HL (motor name rhl)
v_angles = self.TTH_OFFSETS1
h_angles = self.TTH_OFFSETS2 + rhl
HLtth = []
for n in range(len(h_angles)):
HLtth.append( np.arccos(np.cos(np.radians(h_angles[n]))*np.cos(np.radians(v_angles[n])))*180.0/np.pi)
# HR (motor name rhr)
v_angles = self.TTH_OFFSETS1
h_angles = self.TTH_OFFSETS2 + rhr
HRtth = []
for n in range(len(h_angles)):
HRtth.append( np.arccos(np.cos(np.radians(h_angles[n]))*np.cos(np.radians(v_angles[n])))*180.0/np.pi)
# HB (motor name rhb)
v_angles = self.TTH_OFFSETS1
h_angles = self.TTH_OFFSETS2 + rhb
HBtth = []
for n in range(len(h_angles)):
HBtth.append( np.arccos(np.cos(np.radians(h_angles[n]))*np.cos(np.radians(v_angles[n])))*180.0/np.pi)
# vertical modules
# VD
v_angles = self.TTH_OFFSETS2 + rvd
h_angles = self.TTH_OFFSETS1
VDtth = []
for n in range(len(h_angles)):
VDtth.append( np.arccos(np.cos(np.radians(v_angles[n]))*np.cos(np.radians(h_angles[n])))*180.0/np.pi)
# VU
v_angles = self.TTH_OFFSETS2 + rvu
h_angles = self.TTH_OFFSETS1
VUtth = []
for n in range(len(h_angles)):
VUtth.append( np.arccos(np.cos(np.radians(v_angles[n]))*np.cos(np.radians(h_angles[n])))*180.0/np.pi)
# VB
v_angles = self.TTH_OFFSETS2 + rvb
h_angles = self.TTH_OFFSETS1
VBtth = []
for n in range(len(h_angles)):
VBtth.append( np.arccos(np.cos(np.radians(v_angles[n]))*np.cos(np.radians(h_angles[n])))*180.0/np.pi)
# list of TTH values
tth = [VDtth, VUtth, VBtth, HRtth, HLtth, HBtth]
# list all TTH values in one long list ordered by the 'order'-keyword
for n in order:
self.tth.extend(tth[n])
[docs] def there_is_a_valid_roi_at( self,n ):
""" **there_is_a_valid_roi_at**
Checks if n is a valid ROI index.
Args:
n (int): Index to be checked.
Returns:
True, if n is a valid ROI index.
"""
return n<len(self.roi_obj.indices) and len(self.roi_obj.indices[n])
[docs] def get_pw_matrices( self, scan_numbers, method='pixel' ):
""" **get_pw_matrices**
Sums scans from pixelwise ROI integration for use in the pixel-wise ROI
refinement.
Args:
* scan_numbers (int, list): Integer or list of scan numbers to be added.
* method (str): Keyword describing how to return the data
(possible values are: 'pixel' for pixel-wise integration, 'column'
for column-wise summation, and 'row' for row-wise summation.
Returns:
* Raw data in pixel-wise format.
"""
# make scan_numbers iterable
if isinstance(scan_numbers,list):
scannums = scan_numbers
elif isinstance(scan_numbers,int):
scannums = [scan_numbers]
else:
print( 'Please provide keyword \'scan_numbers\' as integer or list of integers.' )
return
# make sure method is known
if not method in ['pixel', 'column', 'row']:
print('Unknown integration method. Use either \'pixel\', \'column\', or \'row\'')
return
# make sure scan exists
for scannum in scannums:
if not scannum in self.scan_numbers:
self.load_scan(scannum, direct=True, method=method)
# make sure raw_signals is existing
if not self.scans['Scan%03d' % scannums[0]].raw_signals:
self.get_raw_data( method=method )
# one scan only
if len(scannums)==1:
scanname = 'Scan%03d' % scannums[0]
raw_signals = self.scans[scanname].raw_signals # dict with raw_signals
monitor = self.scans[scanname].monitor
# normalize data
pw_matrices_norm = []
for key in sorted(raw_signals, key = lambda x: int(''.join(filter(str.isdigit, str(x) ))) ):
if method == 'pixel':
unrav_mat = np.zeros((raw_signals[key].shape[0], raw_signals[key].shape[1]*raw_signals[key].shape[2]))
elif method in [ 'column' , 'row']:
unrav_mat = np.zeros((raw_signals[key].shape[0], raw_signals[key].shape[1]))
else:
print('Method \''+method+'\' not supported, use either \'pixel\', \'column\', or \'row\'.')
return
for ii in range(raw_signals[key].shape[0]):
if method == 'pixel':
unrav_mat[ii,:] = raw_signals[key][ii,:,:].ravel() / monitor[ii]
else:
unrav_mat[ii,:] = raw_signals[key][ii,:] / monitor[ii]
pw_matrices_norm.append(unrav_mat)
# multiple scans to be added
else:
# start with first scan
scanname = 'Scan%03d' % scannums[0]
raw_signals = self.scans[scanname].raw_signals # dict with raw_signals
monitor = self.scans[scanname].monitor
# normalize data (first scan)
pw_matrices_norm = []
for key in sorted(raw_signals, key = lambda x: int(''.join(filter(str.isdigit, str(x) ))) ):
if method == 'pixel':
unrav_mat = np.zeros((raw_signals[key].shape[0], raw_signals[key].shape[1]*raw_signals[key].shape[2]))
elif method in [ 'column' , 'row' ]:
unrav_mat = np.zeros((raw_signals[key].shape[0], raw_signals[key].shape[1]))
else:
print('Method \''+method+'\' not supported, use either \'pixel\', \'column\', or \'row\'.')
return
for ii in range(raw_signals[key].shape[0]):
if method == 'pixel':
unrav_mat[ii,:] = raw_signals[key][ii,:,:].ravel() / monitor[ii]
else:
unrav_mat[ii,:] = raw_signals[key][ii,:] / monitor[ii]
pw_matrices_norm.append(unrav_mat)
# add all other scans
for ii in scannums[1:]:
scanname = 'Scan%03d' % ii
raw_signals = self.scans[scanname].raw_signals # dict with raw_signals
monitor = self.scans[scanname].monitor
for key,jj in zip(sorted(raw_signals, key = lambda x: int(''.join(filter(str.isdigit, str(x) ))) ), range(len(raw_signals))):
if method == 'pixel':
unrav_mat = np.zeros((raw_signals[key].shape[0], raw_signals[key].shape[1]*raw_signals[key].shape[2]))
elif method in [ 'column' , 'row']:
unrav_mat = np.zeros((raw_signals[key].shape[0], raw_signals[key].shape[1]))
for ii in range(raw_signals[key].shape[0]):
if method == 'pixel':
unrav_mat[ii,:] = raw_signals[key][ii,:].ravel() / monitor[ii]
else:
unrav_mat[ii,:] = raw_signals[key][ii,:] / monitor[ii]
pw_matrices_norm[jj] += unrav_mat
return pw_matrices_norm
[docs] def get_compensation_factor( self, scan_number, method='sum', roi_number=None ):
""" **get_compensation_factor**
Calculates the compensation factor from a given elastic line scan:
- a pixel-wise center of mass for 'pixel' compensation.
- a slope (eV/mm) for row-by-row compensation.
- the center of mass for each ROI for no compensation.
Args:
scan_number (int): Scan number of elastic line scan to be used
for finding the compensation factors.
method (str): Keyword describing what kind of compensation
to be used. Can be \'sum\', \'row\', or \'pixel\'.
roi_number (int): ROI number (first ROI is Nr. 0) for which to
calculate the line-by-line compensation factor.
"""
# make sure a valid method is selected
if not method in ['sum', 'row', 'pixel']:
print( 'get_compensation_factor' )
print ( 'Method \''+method+'\' not supported, use either \'sum\', \'row\', or \'pixel\'.')
return
# make sure there is a ROI defined
if not self.roi_obj:
print( 'Please set a ROI object first.' )
return
# simple sum: find center of mass for each ROI
if method == 'sum':
# reset values
self.cenom_dict = {}
self.resolution_dict = {}
# get EDF-files and pw_data
self.load_scan(scan_number, direct=False, scan_type='elastic' )
scan_name = 'Scan%03d' % scan_number
self.scans[scan_name].get_raw_signals( self.roi_obj, method='sum' )
# find CENOM of each ROI
for key in sorted(self.scans[scan_name].raw_signals, key = lambda x: int(''.join(filter(str.isdigit, str(x) ))) ):
cofm = xrs_utilities.find_center_of_mass(self.scans[scan_name].energy,self.scans[scan_name].raw_signals[key])
self.cenom_dict[key] = cofm
# set compensation factor
self.comp_factor = 'sum'
# pixel-by-pixel: find center of mass for each pixel in each ROI
if method == 'pixel':
# reset values
self.cenom_dict = {}
self.resolution_dict = {}
# get EDF-files and pw_data
self.load_scan(scan_number, direct=False, scan_type='elastic' )
scan_name = 'Scan%03d' % scan_number
self.scans[scan_name].get_raw_signals( self.roi_obj, method='pixel' )
# find CENOM for each pixel of each ROI
for key in sorted(self.scans[scan_name].raw_signals, key = lambda x: int(''.join(filter(str.isdigit, str(x) ))) ):
self.cenom_dict[key] = np.zeros_like(self.roi_obj.red_rois[key][1],dtype='float')
for ii in range(self.cenom_dict[key].shape[0]):
for jj in range(self.cenom_dict[key].shape[1]):
cofm = xrs_utilities.find_center_of_mass(self.scans[scan_name].energy, self.scans[scan_name].raw_signals[key][:,ii,jj])
self.cenom_dict[key][ii,jj] = cofm
# set compensation factor
self.comp_factor = 'pixel'
# line-by-line: find a RIXS-type compensation factor
if method == 'row':
# reset values
self.cenom_dict = {}
self.resolution_dict = {}
# which ROI should the compensation factor be calculated for:
if not roi_number:
roi_number = int(raw_input("Which ROI to calculate the factor (Python counting)? "))
# get EDF-files and pw_data
self.load_scan(scan_number, direct=False, scan_type='elastic' )
scan_name = 'Scan%03d' % scan_number
self.scans[scan_name].get_raw_signals( self.roi_obj, method='row' )
# fit the response for each energy
plt.ion()
raw_signals = self.scans[scan_name].raw_signals['ROI%02d' % roi_number]
el_positions = np.zeros(raw_signals.shape[0] )
for ii in range(el_positions.shape[0]):
x = np.arange(raw_signals.shape[1])*self.PIXEL_SIZE+self.PIXEL_SIZE
y = raw_signals[ii,:]
try:
popt = optimize.curve_fit(math_functions.gauss_forcurvefit, x, y)[0]
g = math_functions.gauss_forcurvefit(x,popt[0],popt[1],popt[2])
el_positions[ii] = (popt[1])
except:
g = np.zeros_like(x)
el_positions[ii] = 0.0
plt.cla()
plt.plot(x,y,x,g)
plt.draw()
# fit the dispersion (eV/mm)
plt.cla()
# activate the zoom function already
thismanager = plt.get_current_fig_manager()
thismanager.toolbar.zoom()
# make sure the energy vector is ascending
sort = np.argsort(np.array(self.scans[scan_name].energy)*1e3)
energy = self.scans[scan_name].energy*1e3
center = el_positions[sort]
energy = energy[sort]
# make user zoom into fig to define fitting-range
plt.ion()
plt.plot(center,energy)
plt.xlabel('x_0 [mm]')
plt.ylabel('energy [eV]')
plt.draw()
raw_input('Zoom in and press enter to continue.')
limits = plt.axis()
inds1 = np.where(np.logical_and(center >= limits[0], center <= limits[1]))[0]
inds2 = np.where(np.logical_and(energy >= limits[2], energy <= limits[3]))[0]
# make sure to set the correct window (vertical and horizontal)
inds = []
for ind in inds1:
if ind in inds2:
inds.append(ind)
inds = np.array(inds)
# do the fit
fact = np.polyfit( center[inds], energy[inds], 1)
# plot the result
plt.cla()
plt.plot(center,energy,center,np.polyval(fact, center), center[inds], energy[inds])
plt.legend(['dispersion', 'fit', 'data-points used'])
# assign compensation factor
self.comp_factor = comp_factor = fact[0]
print (' >>>> The compensation factor is: %0.6f [eV/mm].' %self.comp_factor )
plt.ioff()
[docs]class Hydra_imaging(Hydra):
""" **Hydra_imaging**
"""
def __init__( self, path, SPECfname='hydra', EDFprefix='/edf/', EDFname='hydra_', \
EDFpostfix='.edf', en_column='sty', moni_column='izero' ):
Hydra.__init__(self, path, SPECfname='hydra', EDFprefix='/edf/', EDFname='hydra_', \
EDFpostfix='.edf', en_column='sty', moni_column='izero')
[docs] def load_scan( self, scan_numbers, scan_type='imaging', direct=True, scaling=None, method='column', scan_motor='STZ' ):
""" **load_scan**
Load a single or multiple scans.
Note:
When composing a spectrum later, scans of the same 'scan_type' will
be averaged over. Scans with scan type 'elastic' or 'long' in their
names are recognized and will be treated specially.
Args:
* scan_numbers (int or list): Integer or iterable of scan numbers to be loaded.
* scan_type (str): String describing the scan to be loaded (e.g. 'edge1' or 'K-edge').
* direct (boolean): Flag, 'True' if the EDF-files should be deleted after loading/integrating the scan.
"""
# make sure scannumbers are iterable (list)
numbers = []
if not isinstance(scan_numbers,list):
numbers.append(scan_numbers)
else:
numbers = scan_numbers
# make sure there is a cenom_dict available if direct=True AND method='sum'
if direct and method=='pixel' and not self.cenom_dict:
print('Please run the get_compensation_factor method first for pixel-wise compensation.')
return
# keep track of the y- and z-axes
y_scale = np.array([])
z_scale = []
# go throught list of scan_numbers and load scans
for number in numbers:
# create a name for each scan
scan_name = 'Scan%03d' % number
# create an instance of the Scan class
scan = xrs_scans.Scan()
# load the scan
scan.load( self.path, self.SPECfname, self.EDFprefix, self.EDFname, self.EDFpostfix, number, \
direct=direct, roi_obj=self.roi_obj, scaling=scaling, scan_type=scan_type, \
en_column=self.en_column, moni_column=self.moni_column, method=method, cenom_dict=self.cenom_dict )
# keep track of scales
y_scale = scan.energy
z_scale.append(scan.motors[scan_motor])
# add it to the scans dict
self.scans[scan_name] = scan
# add the number to the list of scan numbers
if not number in self.scan_numbers:
self.scan_numbers.extend([number])
self.y_scale = y_scale
self.z_scale = np.sort(np.array(list(set(z_scale))))
[docs] def load_reference_scan(self, scan_number, scan_type='image_ref', direct=True, scaling=None,):
scan.load( self.path, self.SPECfname, self.EDFprefix, self.EDFname, self.EDFpostfix, number, \
direct=direct, roi_obj=self.roi_obj, scaling=scaling, scan_type=scan_type, \
en_column='stx', moni_column=self.moni_column, method=method, cenom_dict=self.cenom_dict )
[docs] def get_compensation_factor( self, el_scan_numbers, scan_motor='sty', plotting=False):
""" **get_compensation_factor**
Calculates the compensation factor for the case of imaging:
Args:
scan_number (int): Scan number of elastic line scan to be used
for finding the compensation factors.
method (str): Keyword describing what kind of compensation
to be used. Can be \'sum\', \'row\', or \'pixel\'.
roi_number (int): ROI number (first ROI is Nr. 0) for which to
calculate the line-by-line compensation factor.
"""
# make sure there is a ROI defined
if not self.roi_obj:
print( 'Please set a ROI object first.' )
return
# load the scans around the elastic line
self.load_scan( el_scan_numbers, method='column', direct=True, scan_type='elastic')
## parse energy scale and signals for each ROI
#energy = {} #np.zeros(len(range(440,589)))
#signals = {} #np.zeros(len(range(440,589)))
#for roi_key in sorted(self.roi_obj.red_rois):
# energy[roi_key] = np.zeros(len(el_scan_numbers))
# signals[roi_key] = np.zeros(len(el_scan_numbers))
# for scan_number, step in zip(el_scan_numbers, range(len(el_scan_numbers))):
# scan_name = 'Scan%03d'%scan_number
# energy[roi_key][step] = self.scans[scan_name].motors['energy']
# signals[roi_key][step] = np.sum(self.scans[scan_name].raw_signals[roi_key])
# parse energy scale
energy = np.array([])
for scan_key in sorted(self.scans, key = lambda x: int(''.join(filter(str.isdigit, str(x) ))) ):
energy = np.append(energy, self.scans[scan_key].motors['energy'])
energy = np.array(list(set(energy)))
# parse signals
signals = {}
for roi_key, (pos,M) in sorted(self.roi_obj.red_rois.items()):
signals[roi_key] = np.zeros( (len(energy), ) )
for en,estep in zip(energy, range(len(energy))):
for scan_key in sorted(self.scans, key = lambda x: int(''.join(filter(str.isdigit, str(x) ))) ):
if self.scans[scan_key].motors['energy'] == en:
signals[roi_key][estep] += np.sum(self.scans[scan_key].raw_signals[roi_key])
# get centers of mass for each ROI and save in self.cenom_dict
for key in signals:
inds = np.argsort(energy)
cofm = xrs_utilities.find_center_of_mass(energy[inds],signals[key][inds])
self.cenom_dict[key] = cofm
if plotting:
plt.cla()
plt.plot(energy[inds],signals[key][inds],'b-')
plt.plot([cofm, cofm], [np.amin(signals[key]), np.amax(signals[key])], 'k-')
plt.xlabel('energy [keV]')
plt.ylabel('intensity [arb. units]')
plt.title('CENOM for %s'%(key))
plt.waitforbuttonpress()
self.E0 = np.mean( [self.cenom_dict[key] for key in self.cenom_dict if self.cenom_dict[key]>0.0] )
[docs] def interpolate_scans(self, step_motor='STZ'):
""" **compensate_scans**
Interpolate signals onto common energy-loss grid.
"""
# define master energy-loss scale
energy = np.array([])
for scan_key in sorted(self.scans, key = lambda x: int(''.join(filter(str.isdigit, str(x) ))) ):
energy = np.append(energy, self.scans[scan_key].motors['energy'])
energy = np.sort(np.array(list(set(energy))))
master_eloss = ( energy - self.E0 )*1.0e3
self.eloss = master_eloss
# define master sty scale
scan_scale_1 = self.scans[scan_key].energy
# define master stz scale
dmy = []
for scan_key in self.scans:
dmy.append(self.scans[scan_key].motors[step_motor])
scan_scale_2 = np.array(list(set(dmy)))
# concatenate scans to volume
for roi_key, (pos,M) in sorted(self.roi_obj.red_rois.items()):
self.raw_signals[roi_key] = np.zeros( (len(energy), len(scan_scale_1), M.shape[1], len(scan_scale_2)) )
for en,estep in zip(energy, range(len(energy))):
for scan_key in sorted(self.scans, key = lambda x: int(''.join(filter(str.isdigit, str(x) ))) ):
if self.scans[scan_key].motors['energy'] == en:
for zstep in range(len(scan_scale_2)):
if self.scans[scan_key].motors[step_motor] == scan_scale_2[zstep]:
self.raw_signals[roi_key][estep,:,:,zstep] = self.scans[scan_key].raw_signals[roi_key]
# interpolate everything onto master energy-loss scale
self.raw_signals_int = {}
for roi_key in sorted(self.raw_signals, key = lambda x: int(''.join(filter(str.isdigit, str(x) ))) ):
x = (energy - self.cenom_dict[roi_key])*1.0e3
y = self.raw_signals[roi_key]
f = interp1d(x, y, kind='linear', axis=0, bounds_error=False, fill_value=0.0)
self.raw_signals_int[roi_key] = f(master_eloss)
[docs]class Fourc:
"""Main class for handling RIXS data from ID20's high-resolution spectrometer 'Fourc'.
This class is intended to read SPEC- and according EDF-files and perform dispersion
compensations.
Note:
'Fourc' is the name of the high-energy-resolution spectrometer at ESRF's
ID20 beamline. This class has been adopted specifically for this spectrometer.
If you are using this program, please cite the following work:
Sahle, Ch J., A. Mirone, J. Niskanen, J. Inkinen, M. Krisch, and S. Huotari.
"Planning, performing and analyzing X-ray Raman scattering experiments."
Journal of Synchrotron Radiation 22, No. 2 (2015): 400-409.
Args:
* path (str): Absolute path to directory holding the data.
* SPECfname (str): Name of the SPEC-file ('rixs' is the default).
* EDFprefix (str): Prefix for the EDF-files ('/edf/' is the default).
* EDFname (str): Filename of the EDF-files ('rixs_' is the default).
* EDFpostfix (str): Postfix for the EDF-files ('.edf' is the default).
* en_column (str): Counter mnemonic for the energy motor ('energy' is the default).
* moni_column (str): Mnemonic for the monitor counter ('izero' is the default).
* EinCoor (list): Coordinates, where to find the incident energy value in the
SPEC-file (default is [9,0])
Attributes:
* path (str): Absolute path to directory holding the data.
* SPECfname (str): Name of the SPEC-file ('hydra' is the default).
* EDFprefix (str): Prefix for the EDF-files ('/edf/' is the default).
* EDFname (str): Filename of the EDF-files ('hydra_' is the default).
* EDFpostfix (str): Postfix for the EDF-files ('.edf' is the default).
* en1_column (str): Counter mnemonic for the energy motor ('anal energy' is the default).
* en2_column (str): Counter mnemonic for the energy motor ('energy' is the default).
* moni_column (str): Mnemonic for the monitor counter ('izero' is the default).
* EinCoor (list): Coordinates, where to find the incident energy value in the
SPEC-file (default is [9,0])
* scans (dict): Dictionary holding all loaded scans.
* scan_numbers (list): List with scan number of all loaded scans.
* energy (np.array): Array with the common energy scale.
* energy2 (np.array): Array with the common energy2 scale.
* signals (np.array): Array with the signals for all analyzers (one column per anayzer).
* errors (np.array): Array with the poisson errors for all analyzers (one column per anayzer).
* groups (dict): Dictionary of groups of scans (instances of the 'scangroup' class,
such as 2 'elastic', or 5 'edge1', etc.).
* tth (list): List of all scattering angles (one value for each ROI).
* resolution (list): List of FWHM of the elastic lines (one for each analyzer).
* roi_obj (instance): Instance of the roi_object class from the x0.055 # pixel size in mmrs_rois module defining all
ROIs for the current dataset (default is 'None').
* comp_factor (float): Compensation factor used for the dispersion correction.
* PIXEL_SIZE (float): Pixel size of the used Maxipix detector (in mm).
"""
def __init__( self, path, SPECfname='rixs', EDFprefix='/edf/', EDFname='rixs_', \
EDFpostfix='.edf', moni_column='izero', EinCoor='energy' ):
self.path = path
self.SPECfname = SPECfname
if not os.path.isfile( path + SPECfname ):
raise Exception( 'IOError! No such file or directory.' )
self.EDFprefix = EDFprefix
self.EDFname = EDFname
self.EDFpostfix = EDFpostfix
#self.en1_column = en1_column.lower()
#self.en2_column = en2_column.lower()
self.en_column = None
self.moni_column = moni_column.lower()
self.EinCoor = EinCoor
self.scans = {}
self.scan_numbers = []
self.energy = []
self.energy2 = []
self.signals = []
self.errors = []
self.groups = {}
self.tth = []
self.resolution = []
self.cenom_dict = {}
self.raw_signals = {}
self.raw_errors = {}
self.roi_obj = []
self.comp_factor = 0.0
self.comp_type = None
self.PIXEL_SIZE = 0.055 # pixel size in mm
print_citation_message( )
[docs] def SumDirect( self,scan_numbers , clean_edf_stack=False ):
""" **SumDirect**
Creates a summed 2D image of a given scan or list of scans.
Args:
scan_numbers (int or list): Scan number or list of scan numbers to be added up.
Returns:
A 2D np.array of the same size as the detector with the summed image.
"""
# make sure scannumbers are iterable (list)
numbers = []
if not isinstance(scan_numbers,list):
numbers.append(scan_numbers)
else:
numbers = scan_numbers
# sum up all EDF-images from the given scans
im_sum = None
en_column = None # uses first column in SPEC file as scanned motor
for number in numbers:
scan = xrs_scans.Scan()
scan.load(self.path, self.SPECfname, self.EDFprefix, self.EDFname, self.EDFpostfix, number, \
direct=False, roi_obj=None, scaling=None, scan_type='generic', \
en_column=en_column, moni_column=self.moni_column, clean_edf_stack=clean_edf_stack)
if im_sum is None:
im_sum = np.zeros(scan.edfmats[0].shape ,"f")
im_sum[:] += scan.edfmats.sum(axis=0)
return im_sum
[docs] def set_roiObj( self, roiobj ):
""" **set_roiObj**
Assign an instance of the 'roi_object' class to the current data set.
Args:
roiobj (instance): Instance of the 'roi_object' class holding all
information about the definition of the ROIs.
"""
self.roi_obj = roiobj
[docs] def load_scan( self, scan_numbers, direct=True, comp_factor=None, scan_type='generic', scaling=None, method='sum', rot_angles=None, clean_edf_stack=False ):
""" **load_scan**
Loads given scans and applies the dispersion compensation.
Args:
scan_numbers (int or list): Scan number(s) of scans to be loaded.
direct (boolean): Flag, if set to 'True', EDF-files are
deleted after loading the scan (this is the default).
comp_factor (float): Compensation factor to be used. If 'None',
the global compensation factor will be used. If provided, the global
compensation factor will be overwritten.
scan_type (str): String describing the scan to be loaded.
Note:
If a compensation factor is passed to this function, the classes 'globel'
compensation factor is overwritten.
"""
# make sure scan_numbers are iterable
if not isinstance(scan_numbers,list):
numbers = []
numbers.append(scan_numbers)
else:
numbers = scan_numbers
# load scan/scans
for number in numbers:
# create a name for each scan
scan_name = 'Scan%03d' % number
# create an instance of the Scan class
scan = xrs_scans.Scan()
# load scan, first column in SPEC file will be scanned motor
scan.load( self.path, self.SPECfname, self.EDFprefix, self.EDFname, \
self.EDFpostfix, number, direct=direct, roi_obj=self.roi_obj, \
scaling=scaling, scan_type=scan_type, en_column=self.en_column, \
moni_column=self.moni_column, method=method, cenom_dict=self.cenom_dict,\
comp_factor=comp_factor,rot_angles=rot_angles, clean_edf_stack=clean_edf_stack )
# assign one dictionary entry to each scan
self.scans[scan_name] = scan
if not number in self.scan_numbers:
self.scan_numbers.extend([number])
# save the incident energy
try:
self.scans[scan_name].Ein = scan.motors[self.EinCoor]
except:
self.scans[scan_name].Ein = None
[docs] def get_compensation_factor( self, scan_number, method='sum', roi_number=None, rot_angles=None ):
""" **get_compensation_factor**
Calculates the compensation factor from a given elastic line scan:
- a pixel-wise center of mass for 'pixel' compensation.
- a slope (eV/mm) for row-by-row compensation.
- the center of mass for each ROI for no compensation.
Args:
scan_number (int): Scan number of elastic line scan to be used
for finding the compensation factors.
method (str): Keyword describing what kind of compensation
to be used. Can be \'sum\', \'row\', or \'pixel\'.
roi_number (int): ROI number (first ROI is Nr. 0) for which to
calculate the line-by-line compensation factor.
"""
# make sure there is a ROI defined
if not self.roi_obj:
print( 'Please set a ROI object first.' )
return
# simple sum: find center of mass for each ROI
if method == 'sum':
# reset values
self.cenom_dict = {}
self.resolution_dict = {}
# get EDF-files and pw_data
elastic_scan = xrs_scans.Scan()
elastic_scan.load( self.path, self.SPECfname, self.EDFprefix, \
self.EDFname, self.EDFpostfix, scan_number, \
direct=False, scan_type='elastic', \
moni_column=self.moni_column, method='sum' )
elastic_scan.get_raw_signals( self.roi_obj, method='sum' )
# find CENOM of each ROI
for key in elastic_scan.raw_signals:
cofm = xrs_utilities.find_center_of_mass( elastic_scan.energy,elastic_scan.raw_signals[key] )
self.cenom_dict[key] = cofm
# set compensation factor
self.comp_type = 'sum'
# pixel-by-pixel: find center of mass for each pixel in each ROI
elif method == 'pixel':
# reset values
self.cenom_dict = {}
self.resolution_dict = {}
# get EDF-files and pw_data
elastic_scan = xrs_scans.Scan()
elastic_scan.load( self.path, self.SPECfname, self.EDFprefix, \
self.EDFname, self.EDFpostfix, scan_number, \
direct=False, scan_type='elastic', \
moni_column=self.moni_column, method='pixel' )
elastic_scan.get_raw_signals( self.roi_obj, method='pixel' )
# find CENOM for each pixel of each ROI
for key in sorted(elastic_scan.raw_signals, key = lambda x: int(''.join(filter(str.isdigit, str(x) ))) ):
self.cenom_dict[key] = np.zeros_like(self.roi_obj.red_rois[key][1])
for ii in range(self.cenom_dict[key].shape[0]):
for jj in range(self.cenom_dict[key].shape[1]):
cofm = xrs_utilities.find_center_of_mass(elastic_scan.energy, elastic_scan.raw_signals[key][:,ii,jj])
self.cenom_dict[key][ii,jj] = cofm
# set compensation factor
self.comp_type = 'pixel'
# line-by-line: find a RIXS-type compensation factor
elif method == 'row':
# reset values
self.cenom_dict = {}
self.resolution_dict = {}
self.comp_type = 'row'
# which ROI should the compensation factor be calculated for:
if not roi_number:
roi_number = int(raw_input("Which ROI to calculate the factor (Python counting)? "))
self.comp_factor = 0.0
# get EDF-files and pw_data
elastic_scan = xrs_scans.Scan()
elastic_scan.load( self.path, self.SPECfname, self.EDFprefix, \
self.EDFname, self.EDFpostfix, scan_number, \
direct=False, scan_type='elastic', \
moni_column=self.moni_column, method='row', rot_angles=rot_angles )
elastic_scan.get_raw_signals( self.roi_obj, method='row' )
# fit the response for each energy
plt.ion()
raw_signals = elastic_scan.raw_signals['ROI%02d' % roi_number]
el_positions = np.zeros(raw_signals.shape[0] )
for ii in range(el_positions.shape[0]):
x = np.arange(raw_signals.shape[1])*self.PIXEL_SIZE+self.PIXEL_SIZE
y = raw_signals[ii,:]
try:
p0 = [ np.amax(y), xrs_utilities.find_center_of_mass(x,y), xrs_utilities.fwhm(x,y)[1]]
popt = optimize.curve_fit(math_functions.gauss_forcurvefit, x, y, p0=p0)[0]
g = math_functions.gauss_forcurvefit(x,popt[0],popt[1],popt[2])
el_positions[ii] = (popt[1])
except:
g = np.zeros_like(x)
el_positions[ii] = 0.0
plt.cla()
plt.plot(x,y,x,g)
plt.draw()
# fit the dispersion (eV/mm)
plt.cla()
# activate the zoom function already
thismanager = plt.get_current_fig_manager()
thismanager.toolbar.zoom()
# make sure the energy vector is ascending
sort = np.argsort(np.array(elastic_scan.energy)*1e3)
energy = elastic_scan.energy*1e3
center = el_positions[sort]
energy = energy[sort]
# make user zoom into fig to define fitting-range
plt.ion()
plt.plot(center,energy)
plt.xlabel('x_0 [mm]')
plt.ylabel('energy [eV]')
plt.draw()
raw_input('Zoom in and press enter to continue.')
limits = plt.axis()
inds1 = np.where(np.logical_and(center >= limits[0], center <= limits[1]))[0]
inds2 = np.where(np.logical_and(energy >= limits[2], energy <= limits[3]))[0]
# make sure to set the correct window (vertical and horizontal)
inds = []
for ind in inds1:
if ind in inds2:
inds.append(ind)
inds = np.array(inds)
# do the fit
fact = np.polyfit( center[inds], energy[inds], 1)
# plot the result
plt.cla()
plt.plot(center,energy,center,np.polyval(fact, center), center[inds], energy[inds])
plt.legend(['dispersion', 'fit', 'data-points used'])
# assign compensation factor
self.comp_factor = fact[0]
print (' >>>> The compensation factor is: %0.6f [eV/mm].' %self.comp_factor )
plt.ioff()
# if method is unknown
else:
print ( 'Method \''+method+'\' not supported, use either \'sum\', \'row\', or \'pixel\'.')
return
[docs] def get_raw_data( self, method='sum', scaling=None ):
""" **get_raw_data**
Applies the ROIs to extract the raw signals from the
EDF-files.
"""
# make sure there are some ROIs set
if not self.roi_obj:
print ( 'Did not find a ROI object, please set one first.' )
return
# call the get_raw_signals function for each scan
for scan in self.scans:
if len(self.scans[scan].edfmats):
print ( "Integrating " + scan + " using method \'" + method + '\'.')
self.scans[scan].get_raw_signals( self.roi_obj , method=method, scaling=scaling )
[docs] def get_data( self, method='sum', scaling=None ):
""" **get_data**
Applies the ROIs to the EDF-files.
This extracts the raw-data from the EDF-files subject to three
different methods: 'sum' will simply sum up all pixels inside a
ROI, 'row' will sum up over the dispersive direction, and 'pixel'
will not sum at all.
Args:
method (str): Keyword specifying the selected choice of data treatment:
can be 'sum', 'row', or 'pixel'. Default is 'sum'.
scaling (list): Optional scaling factors (one per ROI) to scale
the data with.
"""
if not self.cenom_dict:
print ( 'No compensation factor/center-of-mass info found, please provide first.' )
return
for scan in self.scans:
# make sure there is raw data available
if not self.scans[scan].raw_signals:
self.get_raw_data( self, method=method, scaling=scaling )
# apply compensation / get signals
self.scans[scan].get_signals( method=method, cenom_dict=self.cenom_dict, comp_factor=self.comp_factor, scaling=scaling )
[docs] def get_XES_spectrum( self, method='sum', interpolation=False ):
""" **get_XES_spectrum**
Constructs an emission spectrum based on the loaded single spectra.
NOTE: this needs support for all methods ('sum', 'pixel', 'row')
"""
# cenom_dict for XES is zeros everywhere
if method == 'sum':
self.cenom_dict = {}
for key in self.roi_obj.red_rois:
self.cenom_dict[key] = 0.0
# make sure there is data available
for scan in self.scans:
if not np.any(self.scans[scan].signals):
self.get_data( method=method )
# find the groups
all_groups = xrs_scans.findgroups( self.scans )
for group in all_groups:
self.groups[group[0].get_type()] = xrs_scans.sum_scans_to_group( group, method=method, interp=interpolation )
if method == 'sum':
self.groups[group[0].get_type()].get_signals(method='sum', cenom_dict=self.cenom_dict )
self.energy, self.signals, self.errors = xrs_scans.get_XES_spectrum( self.groups )
[docs] def get_Ein_RIXS_map( self, scan_numbers, roi_number, logscaling=False, file_name=None):
""" **get_Ein_RIXS_map**
Returns a RIXS map that plots energy loss vs inciden energy for the
specified scans.
Args:
* scan_numbers (int or list): SPEC scan numbers to be deleted.
* logscaling (boolean): If true numbers are returned on logarithmic scale
(False by default)
* file_name (str): Absolute path, if map should also be written into
an ascii-file.
* roi_number (int): ROI to use for creating the RIXS map (Python counting).
"""
# make sure scan_numbers are iterable
numbers = []
if not isinstance(scan_numbers,list):
numbers.append(scan_numbers)
else:
numbers = scan_numbers
# create the matrix
scanname = 'Scan%03d' % numbers[0]
pre_map = np.zeros( ( len(self.scans[scanname].energy), len(numbers) ) )
rixs_map = np.zeros( ( len(self.scans[scanname].energy), len(numbers) ) )
e_transfer = np.flipud(self.scans[scanname].energy - self.scans[scanname].Ein)*1e3
# save the incident energies
e_incident = []
# fill the matrix scan-by-scan
for number,ii in zip(numbers, range(len(numbers))):
scanname = 'Scan%03d' % number
e_incident.append(self.scans[scanname].Ein)
pre_map[:,ii] = np.interp( e_transfer, np.flipud(self.scans[scanname].energy - self.scans[scanname].Ein)*1e3, np.flipud(self.scans[scanname].signals[:,roi_number]))
sort_inds = np.argsort(e_incident)
e_incident_sort = []
for ii in range(pre_map.shape[1]):
rixs_map[:,ii] = pre_map[:,sort_inds[ii]]
e_incident_sort.append(e_incident[sort_inds[ii]])
return np.array(rixs_map), np.array(e_incident_sort), -np.flipud(np.array(e_transfer))
[docs] def delete_scan( self, scan_numbers ):
""" **delete_scan**
Deletes scans of given scan numbers.
Args:
scan_numbers (int or list): SPEC scan numbers to be used for the RIXS map.
"""
# make sure scan_numbers are iterable
numbers = []
if not isinstance(scan_numbers,list):
numbers.append(scan_numbers)
else:
numbers = scan_numbers
# delete scans
for number in numbers:
scanname = 'Scan%03d' % number
del(self.scans[scanname])
self.scan_numbers.remove(number)
[docs] def dump_scans_ascii( self, scan_numbers, pre_fix, f_name, post_fix='.dat', header='' ):
""" **dum_scans_ascii**
Produce ASCII-type files with columns of energy, signal, and Poisson error.
Args:
scan_numbers (int or list): SPEC scan numbers of scans to be safed in ASCII format.
pre_fix (str): Path to directory where files should be written into.
f_name (str): Base name for the files to be written.
post_fix (str): Extention for the files to be written (default is '.dat').
"""
# make sure scan_numbers are iterable
numbers = []
if not isinstance(scan_numbers,list):
numbers.append(scan_numbers)
else:
numbers = scan_numbers
for number in numbers:
scan_name = 'Scan%03d' % number
if not scan_name in self.scans.keys():
print ('Scan No. %03d is currently not loaded.'% number)
return
x = self.scans[scan_name].energy
y = self.scans[scan_name].signals
z = self.scans[scan_name].errors
data = np.zeros((x.shape[0], y.shape[1]+z.shape[1]+1 ))
data[:,0] = x
data[:,1:1+y.shape[1]] = y
data[:,1+y.shape[1]:1+y.shape[1]+z.shape[1]] = z
file_name = pre_fix + f_name + '_' + scan_name + post_fix
if len(header) == 0:
header = ' Ein = ' + str(self.scans[scan_name].Ein) + ' keV'
np.savetxt(file_name, data, header=header)
[docs] def copy_edf_files( self, scan_numbers, dest_dir ):
""" **copy_edf_files**
Copies all EDF-files from given scan_numbers into given directory.
Args:
* scan_numbers (int or list) = Integer or list of integers defining the scan
numbers of scans to be copied.
* dest_dir (str) = String with absolute path for the destination.
"""
import shutil
# make sure scan_numbers is iterable
numbers = []
if not isinstance(scan_numbers,list):
numbers.append(scan_numbers)
else:
numbers = scan_numbers
fname = self.path + self.SPECfname
# find EDF-file names and copy them
for nscan in numbers:
if use_PyMca:
data, motors, counters = xrs_fileIO.PyMcaSpecRead(fname,nscan)
else:
data, motors, counters = xrs_fileIO.SpecRead(fname,nscan)
for m in range(len(counters['ccdno'])):
ccdnumber = counters['ccdno'][m]
edfname = self.path + self.EDFprefix + self.EDFname + "%04d" % ccdnumber + self.EDFpostfix
shutil.copy2( edfname, dest_dir )
[docs] def get_pw_matrices( self, scan_numbers, method='pixel' ):
""" **get_pw_matrices**
Sums scans from pixelwise ROI integration for use in the pixel-wise ROI
refinement.
Args:
* scan_numbers (int, list): Integer or list of scan numbers to be added.
* method (str): Keyword describing how to return the data
(possible values are: 'pixel' for pixel-wise integration, 'column'
for column-wise summation, and 'row' for row-wise summation.
Returns:
* Raw data in pixel-wise format.
"""
# make scan_numbers iterable
if isinstance(scan_numbers,list):
scannums = scan_numbers
elif isinstance(scan_numbers,int):
scannums = [scan_numbers]
else:
print( 'Please provide keyword \'scan_numbers\' as integer or list of integers.' )
return
# make sure method is known
if not method in ['pixel', 'column', 'row']:
print('Unknown integration method. Use either \'pixel\', \'column\', or \'row\'')
return
# make sure scan exists
for scannum in scannums:
if not scannum in self.scan_numbers:
self.load_scan(scannum, direct=False, method=method)
# make sure raw_signals is existing
if not self.scans['Scan%03d' % scannums[0]].raw_signals:
self.get_raw_data( method=method )
# one scan only
if len(scannums)==1:
scanname = 'Scan%03d' % scannums[0]
raw_signals = self.scans[scanname].raw_signals # dict with raw_signals
monitor = self.scans[scanname].monitor
# normalize data
pw_matrices_norm = []
for key in sorted(raw_signals, key = lambda x: int(''.join(filter(str.isdigit, str(x) ))) ):
if method == 'pixel':
unrav_mat = np.zeros((raw_signals[key].shape[0], raw_signals[key].shape[1]*raw_signals[key].shape[2]))
elif method in [ 'column' , 'row']:
unrav_mat = np.zeros((raw_signals[key].shape[0], raw_signals[key].shape[1]))
else:
print('Method \''+method+'\' not supported, use either \'pixel\', \'column\', or \'row\'.')
return
for ii in range(raw_signals[key].shape[0]):
if method == 'pixel':
unrav_mat[ii,:] = raw_signals[key][ii,:,:].ravel() / monitor[ii]
else:
unrav_mat[ii,:] = raw_signals[key][ii,:] / monitor[ii]
pw_matrices_norm.append(unrav_mat)
# multiple scans to be added
else:
# start with first scan
scanname = 'Scan%03d' % scannums[0]
raw_signals = self.scans[scanname].raw_signals # dict with raw_signals
monitor = self.scans[scanname].monitor
# normalize data (first scan)
pw_matrices_norm = []
for key in sorted(raw_signals, key = lambda x: int(''.join(filter(str.isdigit, str(x) ))) ):
if method == 'pixel':
unrav_mat = np.zeros((raw_signals[key].shape[0], raw_signals[key].shape[1]*raw_signals[key].shape[2]))
elif method in [ 'column' , 'row']:
unrav_mat = np.zeros((raw_signals[key].shape[0], raw_signals[key].shape[1]))
else:
print('Method \''+method+'\' not supported, use either \'pixel\', \'column\', or \'row\'.')
return
for ii in range(raw_signals[key].shape[0]):
if method == 'pixel':
unrav_mat[ii,:] = raw_signals[key][ii,:,:].ravel() / monitor[ii]
else:
unrav_mat[ii,:] = raw_signals[key][ii,:] / monitor[ii]
pw_matrices_norm.append(unrav_mat)
# add all other scans
for ii in scannums[1:]:
scanname = 'Scan%03d' % ii
raw_signals = self.scans[scanname].raw_signals # dict with raw_signals
monitor = self.scans[scanname].monitor
for key,jj in zip(sorted(raw_signals, key = lambda x: int(''.join(filter(str.isdigit, str(x) ))) ), range(len(raw_signals))):
if method == 'pixel':
unrav_mat = np.zeros((raw_signals[key].shape[0], raw_signals[key].shape[1]*raw_signals[key].shape[2]))
elif method in [ 'column' ,'row'] :
unrav_mat = np.zeros((raw_signals[key].shape[0], raw_signals[key].shape[1]))
for ii in range(raw_signals[key].shape[0]):
if method == 'pixel':
unrav_mat[ii,:] = raw_signals[key][ii,:].ravel() / monitor[ii]
else:
unrav_mat[ii,:] = raw_signals[key][ii,:] / monitor[ii]
pw_matrices_norm[jj] += unrav_mat
return pw_matrices_norm
[docs]class read_id20:
"""
Main class for handling raw data from XRS experiments on ESRF's ID20. This class
is used to read scans from SPEC files and the according EDF-files, it provides access
to all tools from the xrs_rois module for defining ROIs, it can be used to integrate
scans, sum them up, stitch them together, and define the energy loss scale.
INPUT:
* absfilename = path and filename of the SPEC-file
* energycolumn = name (string) of the counter for the energy as defined in the SPEC session (counter mnemonic)
* monitorcolumn = name (string) of the counter for the monitor signals as defined in the SPEC session (counter mnemonic)
* edfName = name/prefix (string) of the EDF-files (default is the same as the SPEC-file name)
* single_image = boolean switch, 'True' (default) if all 6 detectors are merged in a single image,
'False' if two detector images per point exist.
"""
def __init__(self,absfilename,energycolumn='energy',monitorcolumn='kap4dio',edfName=None,single_image=True):
self.scans = {} # was a dictionary before
if absfilename is not None:
if not os.path.isfile(absfilename):
raise Exception('IOError! No such file %s, please check filename.' % absfilename )
self.path = os.path.split(absfilename)[0] + '/'
self.filename = os.path.split(absfilename)[1]
if not edfName:
self.edfName = os.path.split(absfilename)[1]
else:
self.edfName = edfName
self.single_image = single_image
self.scannumbers = []
self.EDF_PREFIXh = 'edf/h_'
self.EDF_PREFIXv = 'edf/v_'
self.EDF_PREFIX = 'edf/'
self.EDF_POSTFIX = '.edf'
self.DET_PIXEL_NUMx = 768
self.DET_PIXEL_NUMy = 512
self.DET_PIXEL_NUM = 256
# which column in the SPEC file to be used for the energy and monitor
self.encolumn = energycolumn.lower()
self.monicolumn = monitorcolumn.lower()
# here are the attributes of the old rawdata class
self.eloss = [] # common eloss scale for all analyzers
self.energy = [] # common energy scale for all analyzers
self.signals = [] # signals for all analyzers
self.errors = [] # poisson errors
self.qvalues = [] # for all analyzers
self.groups = {} # dictionary of groups (such as 2 'elastic', or 5 'edge1', etc.)
self.cenom = [] # list of center of masses of the elastic lines
self.E0 = [] # energy value, mean value of self.cenom
self.tth = [] # list of scattering angles (one for each ROI)
self.VDtth = []
self.VUtth = []
self.VBtth = []
self.HRtth = []
self.HLtth = []
self.HBtth = []
self.resolution = [] # list of FWHM of the elastic lines for each analyzer
self.signals_orig = [] # signals for all analyzers before interpolation
self.errors_orig = [] # errors for all analyzers before interpolation
# TTH offsets from center of V and H modules
# tth offsets in one direction (horizontal for V-boxes, vertical for H-boxes)
self.TTH_OFFSETS1 = np.array([5.0, 0.0, -5.0, 5.0, 0.0, -5.0, 5.0, 0.0, -5.0, 5.0, 0.0, -5.0])
# tth offsets in the other direction (horizontal for H-boxes, vertical for V-boxes)
self.TTH_OFFSETS2 = np.array([-9.71, -9.75, -9.71, -3.24, -3.25, -3.24, 3.24, 3.25, 3.24, 9.71, 9.75, 9.71])
# input
self.roi_obj = [] # an instance of the roi_object class from the xrs_rois module (new)
[docs] def save_state_hdf5(self, filename, groupname, comment=""):
import h5py
h5 = h5py.File(filename,"a")
h5.require_group(groupname)
h5group = h5[groupname]
if( "eloss" in h5group.keys() ):
raise Exception(" Read data already present in " + filename+":"+groupname)
for key in [
"eloss",
"energy",
"signals",
"errors",
"qvalues",
########### "groups",
"cenom",
"E0",
"tth",
"VDtth",
"VUtth",
"VBtth",
"HRtth",
"HLtth",
"HBtth",
"resolution",
"signals_orig",
"errors_orig"
]:
h5group[key] = getattr(self,key)
h5group["comment"] = comment
h5.flush()
h5.close()
[docs] def load_state_hdf5(self, filename, groupname):
import h5py
print ( "filename " , filename )
print ( "groupname " , groupname )
h5 = h5py.File(filename,"r")
h5group = h5[groupname]
chiavi = {"eloss":array,
"energy":array,
"signals":array,
"errors":array,
"qvalues":array,
########### "groups":array,
"cenom":array,
"E0":float,
"tth":array,
"VDtth":array,
"VUtth":array,
"VBtth":array,
"HRtth":array,
"HLtth":array,
"HBtth":array,
"resolution":array,
"signals_orig":array,
"errors_orig":array
}
for key in chiavi:
# print key
setattr(self,key,chiavi[key]( array(h5group[key])))
h5.flush()
h5.close()
[docs] def save_state(self):
d={}
for key in [
"eloss",
"energy",
"signals",
"errors",
"qvalues",
"groups",
"cenom",
"E0",
"tth",
"VDtth",
"VUtth",
"VBtth",
"HRtth",
"HLtth",
"HBtth",
"resolution",
"signals_orig",
"errors_orig"
]:
d[key] = getattr(self,key)
f=open("datas.pick","w")
pickle.dump(d,f)
f.close()
[docs] def there_is_a_valid_roi_at(self,n):
return n<len(self.roi_obj.indices) and len(self.roi_obj.indices[n])
[docs] def read_just_first_scanimage(self,scannumber):
fn = self.path + self.filename
if use_PyMca == True:
data, motors, counters, lables = xrs_fileIO.PyMcaSpecRead_my(fn,scannumber)
else:
data, motors, counters = xrs_fileIO.SpecRead(fn,scannumber)
edfmat = xrs_fileIO.ReadEdf_justFirstImage(counters['ccdno'],
self.path,
self.EDF_PREFIX,
self.edfName, self.EDF_POSTFIX)
return edfmat
[docs] def readscan_new(self,scannumber,fromtofile=False):
"""
Returns the data, motors, counter-names, and edf-files from the SPEC file defined when
the xrs_read object was initiated.
There should be an alternative that uses the PyMca module if installed.
INPUT:
* scannumber = number of the scan to be loaded
* fromtofile = boolean flag, 'True' if the scan should be saved in a pickle-file (this is developmental)
"""
# first see if scan can be loaded from npz-file
if fromtofile:
scanname = 'Scan%03d' % scannumber
sub_path = os.path.split(self.path[:-1])[0]
fname = sub_path + '/scans/' + scanname + '.npz'
return ReadScanFromFile(fname)
else:
print( 'Parsing EDF- and SPEC-files of scan No. %s' % scannumber )
# load SPEC-file
fn = self.path + self.filename
if use_PyMca == True:
data, motors, counters, lables = xrs_fileIO.PyMcaSpecRead_my(fn,scannumber)
else:
data, motors, counters = xrs_fileIO.SpecRead(fn,scannumber)
# load EDF-files
if not self.single_image:
edfmats = xrs_fileIO.ReadEdfImages_TwoImages(counters['ccdno'], self.DET_PIXEL_NUMx, self.DET_PIXEL_NUMy, self.path, self.EDF_PREFIXh, self.EDF_PREFIXv, self.edfName, self.EDF_POSTFIX)
else:
edfmats = xrs_fileIO.ReadEdfImages(counters['ccdno'], self.DET_PIXEL_NUMx, self.DET_PIXEL_NUMy, self.path, self.EDF_PREFIX, self.edfName, self.EDF_POSTFIX)
# add the scannumber to self.scannumbers, if not already present
if not scannumber in self.scannumbers:
self.scannumbers.extend([scannumber])
# store scan in numpy zip-archive, if desired
if fromtofile:
scanname = 'Scan%03d' % scannumber
sub_path = os.path.split(self.path[:-1])[0]
fname = sub_path + '/scans/' + scanname
xrs_fileIO.WriteScanToFile(fname,data,motors,counters,edfmats)
return data, motors, counters, edfmats #<type 'list'> <type 'list'> <type 'dict'> <type 'numpy.ndarray'
[docs] def readscan(self,scannumber,fromtofile=False):
"""
Returns the data, motors, counter-names, and edf-files from the SPEC file defined when
the xrs_read object was initiated.
There should be an alternative that uses the PyMca module if installed.
INPUT:
* scannumber = number of the scan to be loaded
* fromtofile = boolean flag, 'True' if the scan should be saved in a pickle-file (this is developmental)
"""
# first see if scan can be loaded from npz-file
if fromtofile:
try:
print( 'Trying to load scan from file.' )
scanname = 'Scan%03d' % scannumber
sub_path = os.path.split(self.path[:-1])[0]
fname = sub_path + '/scans/' + scanname + '.npz'
scan = np.load(fname)
data = list(scan['data'])
motors = list(scan['motors'])
counters = scan['counters'].item()
edfmats = scan['edfmats']
return data, motors, counters, edfmats
except:
print( 'Failed loading scan from file, will read EDF- and SPEC-file.' )
pass
# proceed with parsing edf- and SPEC-files, if scan could not be loaded from zip-archive
if not fromtofile:
print( 'Parsing EDF- and SPEC-files of scan No. %s' % scannumber )
# load SPEC-file
fn = self.path + self.filename
# print( " READING ", fn, scannumber)
data, motors, counters = xrs_utilities.specread(fn,scannumber)
if not self.single_image:
# initiate arrays for the edf-files
edfmatsh = np.array(np.zeros((len(counters['ccdno']),self.DET_PIXEL_NUMy/2,self.DET_PIXEL_NUMx)))
edfmatsv = np.array(np.zeros((len(counters['ccdno']),self.DET_PIXEL_NUMy/2,self.DET_PIXEL_NUMx)))
edfmats = np.array(np.zeros((len(counters['ccdno']),self.DET_PIXEL_NUMy,self.DET_PIXEL_NUMx)))
# load edf-files
for m in range(len(counters['ccdno'])):
ccdnumber = counters['ccdno'][m]
edfnameh = self.path + self.EDF_PREFIXh + self.edfName + '_' + "%04d" % ccdnumber + self.EDF_POSTFIX
edfnamev = self.path + self.EDF_PREFIXv + self.edfName + '_' + "%04d" % ccdnumber + self.EDF_POSTFIX
edfmatsh[m,:,:] = (xrs_utilities.edfread_test(edfnameh))
edfmatsv[m,:,:] = (xrs_utilities.edfread_test(edfnamev))
edfmats[m,0:self.DET_PIXEL_NUMy/2,:] = edfmatsv[m,:,:]
edfmats[m,self.DET_PIXEL_NUMy/2:,:] = edfmatsh[m,:,:]
else:
# initiate arrays for the edf-files
#edfmats = np.array(np.zeros((len(counters['ccdno']),self.DET_PIXEL_NUMy,self.DET_PIXEL_NUMx)))
#edfmats = np.array([])
# load edf-files
# summa=0
for m in range(len(counters['ccdno'])):
ccdnumber = counters['ccdno'][m]
edfname = self.path + self.EDF_PREFIX + self.edfName + '_' + "%04d" % ccdnumber + self.EDF_POSTFIX
# print " LEGGO ", edfname
if m == 0:
edfshape = xrs_fileIO.EdfRead(edfname).shape
edfmats = np.zeros(( len(counters['ccdno']), edfshape[0] , edfshape[1] ))
edfmats[m,:,:] = (xrs_fileIO.EdfRead(edfname))
else:
edfmats[m,:,:] = (xrs_fileIO.EdfRead(edfname))
# summa=summa+xrs_fileIO.EdfRead(edfname)
#print " SHAPE ", edfmats.shape
#spia = edfmats.sum(axis=0)
#from pylab import *
#spia=summa
#imshow(log(spia))
#show()
# add the scannumber to self.scannumbers, if not already present
if not scannumber in self.scannumbers:
self.scannumbers.extend([scannumber])
# store scan in numpy zip-archive, if desired
if fromtofile:
scanname = 'Scan%03d' % scannumber
sub_path = os.path.split(self.path[:-1])[0]
fname = sub_path + '/scans/' + scanname
if os.path.exists(sub_path):
print( 'trying to save file in numpy-archive.' )
np.savez(fname, data=data, motors=motors, counters=counters, edfmats=edfmats)
else:
print( 'please create a folder ' + fname + ' first!' )
pass
return data, motors, counters, edfmats #<type 'list'> <type 'list'> <type 'dict'> <type 'numpy.ndarray'>
[docs] def loadscan(self,scannumbers,scantype='generic',fromtofile=False):
"""
Loads the files belonging to scan No. "scannumber" and puts it into an instance
of the xrs_scan-class 'scan'. The default scantype is 'generic', later the scans
will be grouped (and added) based on the scantype.
INPUT:
* scannumbers = integer or list of scannumbers that should be loaded
* scantype = string describing the scan to be loaded (e.g. 'edge1' or 'K-edge')
* fromtofile = boolean flag, 'True' if the scan should be saved in a pickle-file (this is developmental)
"""
# make sure scannumbers are iterable (list)
if not isinstance(scannumbers,list):
scannums = []
scannums.append(scannumbers)
else:
scannums = scannumbers
for number in scannums:
scanname = 'Scan%03d' % number
data, motors, counters, edfmats = self.readscan(number,fromtofile)
# can assign some things here already (even if maybe redundant)
monitor = counters[self.monicolumn]
monoangle = 1 # have to check for this later ... !!! counters['pmonoa']
energy = counters[self.encolumn]
# create an instance of "scan" class for every scan
onescan = xrs_scans.scan(edfmats,number,energy,monitor,counters,motors,data,scantype)
# assign one dictionary entry to each scan
self.scans[scanname] = onescan
[docs] def loadloop(self,begnums,numofregions,fromtofile=False):
"""
Loads a whole loop of scans based on their starting scannumbers and the number of single
scans in the loop.
INPUT:
* begnums = list of scannumbers of the first scans of each loop (is a list)
* numofregions = number of scans in each loop (integer)
"""
typenames = []
for n in range(numofregions):
typenames.append('edge'+str(n+1))
numbers = []
for n in range(len(begnums)):
for m in range(numofregions):
numbers.append((begnums[n]+m))
typenames = typenames*len(begnums)
for n in range(len(typenames)):
thenumber = []
thenumber.append(numbers[n]) # make the scannumber into an interable list, sorry it's messy
self.loadscan(thenumber,typenames[n],fromtofile)
[docs] def loadelastic(self,scann,fromtofile=False):
"""
Loads a scan using the loadscan function and sets the scantype attribute to 'elastic'.
I.e. shorthand for 'obj.loadscan(scannumber,type='elastic')'.
INPUT:
* scann = integer or list of integers
* fromtofile = boolean flag, 'True' if the scan should be saved in a pickle-file (this is developmental)
"""
self.loadscan(scann,'elastic',fromtofile)
[docs] def loadlong(self,scann,fromtofile=False):
"""
Loads a scan using the loadscan function and sets the scantype attribute to 'long'.
I.e. shorthand for 'obj.loadscan(scannumber,type='long')'.
INPUT:
* scann = integer or list of integers
* fromtofile = boolean flag, 'True' if the scan should be saved in a pickle-file (this is developmental)
"""
self.loadscan(scann,'long',fromtofile)
[docs] def set_roiObj(self,roiobj):
self.roi_obj = roiobj
[docs] def orderrois(self,arrangement='vertical',missing=None):
"""
order the rois in an order provided such that e.g. autorois have the correct order
"""
if not self.roi_obj:
print( 'Please select some ROIs first!' )
return
# get some detector infos
all_det_names = ['VD','VU','VB','HR','HL','HB']
all_dets = []
for name in all_det_names:
all_dets.append(xrs_utilities.maxipix_det(name,arrangement))
# find the ROIs for each detector
# go through all detectors and ROIs defined and see which one has centers in which detector
for det in all_dets:
det_x_min = det.get_pixel_range()[0]
det_x_max = det.get_pixel_range()[1]
det_y_min = det.get_pixel_range()[2]
det_y_max = det.get_pixel_range()[3]
for ii in range(len(self.roi_obj.indices)):
x_mean = np.mean(self.roi_obj.x_indices[ii])
y_mean = np.mean(self.roi_obj.y_indices[ii])
if x_mean >= det_x_min and x_mean <= det_x_max and y_mean >= det_y_min and y_mean <= det_y_max:
det.roi_indices.append(self.roi_obj.indices[ii])
det.roi_x_indices.append(self.roi_obj.x_indices[ii])
det.roi_y_indices.append(self.roi_obj.y_indices[ii])
det.roi_x_means.append(x_mean)
det.roi_y_means.append(y_mean)
# count, check with missing, break if something is wrong
for det in all_dets:
if not len(det.roi_indices) == 12:
print( 'WARNING! Module ' + det.name + ' only has ' + '%d' % len(det.roi_indices) + ' ROIs defined, your numbering will be messed up!' )
# rearrange according to 'arrangement' keyword
if arrangement == 'vertical':
verticalIndex = [0,3,6,9,1,4,7,10,2,5,8,11]
for det in all_dets:
# order from low to high y-mean
roi_coords = np.array(det.roi_indices)
roi_y_means = np.array(det.roi_x_means)
sorting_inds = roi_y_means.argsort()
roi_coords_increasing = [roi_coords[i] for i in sorting_inds]
if det.get_det_name() in ['VD','VU','VB']:
# sort from high to low y-center
det.roi_indices = [roi_coords_increasing[i] for i in verticalIndex[::-1]]
elif det.get_det_name() in ['HR','HL','HB']:
# sort from low to high y-center
det.roi_indices = [roi_coords_increasing[i] for i in verticalIndex]
else:
print( 'Sorry, no such module.' )
# reassign all roi_obj variables...
allrois = []
for det in all_dets:
allrois.extend(det.roi_indices)
self.roi_obj.indices = allrois
self.roi_obj.roi_matrix = xrs_rois.convert_inds_to_matrix(self.roi_obj.indices,(512,768))
self.roi_obj.red_rois = xrs_rois.convert_matrix_to_redmatrix(self.roi_obj.roi_matrix)
self.roi_obj.x_indices = xrs_rois.convert_inds_to_xinds(self.roi_obj.indices)
self.roi_obj.y_indices = xrs_rois.convert_inds_to_yinds(self.roi_obj.indices)
self.roi_obj.masks = xrs_rois.convert_roi_matrix_to_masks(self.roi_obj.roi_matrix)
self.roi_obj.number_of_rois = np.amax(self.roi_obj.roi_matrix)
[docs] def getrawdata(self):
"""
Goes through all instances of the scan class and calls it's applyrois method
to sum up over all rois.
"""
if not np.any(self.roi_obj.indices):
print( 'Please define some ROIs first.' )
return
for scan in self.scans:
if len(self.scans[scan].edfmats):
print ("integrating "+scan)
self.scans[scan].applyrois(self.roi_obj.indices)
[docs] def getrawdata_pixelwise(self):
"""
Goes through all instances of the scan class and calls it's applyrois_pw method
to extract intensities for all pixels in each ROI.
"""
if not np.any(self.roi_obj.indices):
print( 'Please define some ROIs first.' )
return
for scan in self.scans:
if len(self.scans[scan].edfmats):
print ("integrating pixelwise "+scan)
self.scans[scan].applyrois_pw(self.roi_obj.indices)
[docs] def SumDirect(self,scannumbers):
sum=None
for number in scannumbers:
data, motors, counters, edfmats = self.readscan(number)
if sum is None:
sum = np.zeros(edfmats[0].shape ,"f")
sum[:] += edfmats.sum(axis=0)
return sum
[docs] def loadscandirect(self,scannumbers,scantype='generic',fromtofile=False,scaling=None):
"""
Loads a scan without saving the edf files in matrices.
scannumbers = integer or list of integers defining the scannumbers from the SPEC file
scantype = string describing the scan to be loaded (e.g. 'edge1' or 'K-edge')
fromtofile = boolean flag, 'True' if the scan should be saved in a pickle-file (this is developmental)
scaling = list of scaling factors to be applied, one for each ROI defined
"""
# make sure scannumbers are iterable (list)
if not isinstance(scannumbers,list):
scannums = []
scannums.append(scannumbers)
else:
scannums = scannumbers
# check if there are ROIs defined
if not self.roi_obj:
print( 'Please define some ROIs first' )
return
for number in scannums:
scanname = 'Scan%03d' % number
data, motors, counters, edfmats = self.readscan(number,fromtofile)
# can assign some things here already (even if maybe redundant)
monitor = counters[self.monicolumn]
monoangle = 1 # counters['pmonoa'] # this still needs checking
energy = counters[self.encolumn]
# create an instance of "scan" class for every scan
onescan = xrs_scans.scan(edfmats,number,energy,monitor,counters,motors,data,scantype)
onescan.applyrois(self.roi_obj.indices,scaling=scaling)
print( 'Deleting -- EDF-files of Scan No. %03d' % number )
onescan.edfmats = [] # delete the edfmats
self.scans[scanname] = onescan
[docs] def loadloopdirect(self,begnums,numofregions,fromtofile=False,scaling=None):
"""
Loads a whole loop of scans based on their starting scannumbers and the number of single
INPUT:
* begnums = list of scannumbers of the first scans of each loop (is a list)
* numofregions = number of scans in each loop (integer)
* fromtofile = boolean flag, 'True' if the scan should be saved in a pickle-file (this is developmental)
* scaling = list of scaling factors to be applied, one for each ROI defined
"""
typenames = []
for n in range(numofregions):
typenames.append('edge'+str(n+1))
numbers = []
for n in range(len(begnums)):
for m in range(numofregions):
numbers.append((begnums[n]+m))
typenames = typenames*len(begnums)
for n in range(len(typenames)):
thenumber = []
thenumber.append(numbers[n]) # make the scannumber into an interable list, sorry it's messy
self.loadscandirect(thenumber,typenames[n],fromtofile,scaling=scaling)
[docs] def loadelasticdirect(self,scann,fromtofile=False):
"""
Loads a scan using the loadscan function and sets the scantype attribute to 'elastic'.
I.e. shorthand for 'obj.loadscan(scannumber,type='elastic')'.
INPUT:
* scann = integer or list of integers
* fromtofile = boolean flag, 'True' if the scan should be saved in a pickle-file (this is developmental)
"""
self.loadscandirect(scann,'elastic',fromtofile)
[docs] def loadlongdirect(self,scann,fromtofile=False,scaling=None):
"""
Loads a scan using the loadscan function and sets the scantype attribute to 'long'.
I.e. shorthand for 'obj.loadscan(scannumber,type='long')'.
INPUT:
* scann = integer or list of integers
* fromtofile = boolean flag, 'True' if the scan should be saved in a pickle-file (this is developmental)
"""
self.loadscandirect(scann,'long',fromtofile,scaling=scaling)
[docs] def deletescan(self,scannumbers):
"""
Deletes scans from the class.
INPUT:
* scannumbers = integer or list of integers (SPEC scan numbers) to delete
"""
numbers = []
if not isinstance(scannumbers,list):
numbers.append(scannumbers)
else:
numbers = scannumbers
for number in numbers:
scanname = 'Scan%03d' % number
del(self.scans[scanname])
self.scannumbers.remove(number)
[docs] def getspectrum(self, include_elastic=False, absCounts=False):
"""
Groups the instances of the scan class by their scantype attribute,
adds equal scans (each group of equal scans) and appends them.
INPUT:
* include_elastic = boolean flag, skips the elastic line if set to 'False' (default)
"""
# find the groups
allgroups = xrs_scans.findgroups(self.scans)
for group in allgroups:
# self.makegroup(group)
onegroup = xrs_scans.makegroup_nointerp(group, absCounts=absCounts)
self.groups[onegroup.get_type()] = onegroup
self.energy,self.signals,self.errors = xrs_scans.appendScans(self.groups,include_elastic)
self.signals_orig = self.signals
self.errors_orig = self.errors
[docs] def geteloss(self):
"""
Defines the energy loss scale for all ROIs by finding the center of mass for each ROI's elastic line.
Interpolates the signals and errors onto a commom energy loss scale. Finds the resolution (FWHM) of the
'elastic' groups.
"""
if not 'elastic' in self.groups:
print( 'Please load/integrate at least one elastic scan first!' )
return
else:
# reset values, in case this is run several times
self.cenom = []
self.resolution = []
Origin=None
valid_cenoms = []
for n in range(len(self.roi_obj.indices)):
# find the center of mass for each ROI
cofm = xrs_utilities.find_center_of_mass(self.groups['elastic'].energy,self.groups['elastic'].signals_orig[:,n])
self.cenom.append(cofm)
if self.there_is_a_valid_roi_at(n) :
valid_cenoms.append(cofm)
if Origin is None :
Origin = cofm
# try finden the FWHM/resolution for each ROI
FWHM,x0 = xrs_utilities.fwhm((self.groups['elastic'].energy - self.cenom[n])*1e3,self.groups['elastic'].signals_orig[:,n])
# try:
# FWHM,x0 = xrs_utilities.fwhm((self.groups['elastic'].energy - self.cenom[n])*1e3,self.groups['elastic'].signals_orig[:,n])
# self.resolution.append(FWHM)
# # append a zero if the FWHM routine fails
# except:
# exc_type, exc_value, exc_traceback = sys.exc_info()
# print "*** print_tb:"
# traceback.print_tb(exc_traceback, limit=None, file=sys.stdout)
# print " need a more sofisticated way of finding the FWHM "
# self.resolution.append(0.0) # need a more sofisticated way of finding the FWHM
self.E0 = np.mean(valid_cenoms)
# define the first eloss scale as the 'master' scale for all ROIs
self.eloss = (self.energy - cofm)*1e3 # energy loss in eV
# define eloss-scale for each ROI and interpolate onto the 'master' eloss-scale
for n in range(len(self.roi_obj.indices)):
# inserting zeros at beginning and end of the vectors to avoid interpolation errors
x = (self.energy-self.cenom[n])*1e3
x = np.insert(x,0,-1e10)
x = np.insert(x,-1,1e10)
y = self.signals_orig[:,n]
y = np.insert(y,0,0)
y = np.insert(y,-1,0)
f = interp1d(x,y, bounds_error=False,fill_value=0.0)
self.signals[:,n] = f(self.eloss)
# do the same for the errors
for n in range(len(self.roi_obj.indices)):
# inserting zeros at beginning and end of the vectors to avoid interpolation errors
x = (self.energy-self.cenom[n])*1e3
x = np.insert(x,0,-1e10)
x = np.insert(x,-1,1e10)
y = self.errors_orig[:,n]
y = np.insert(y,0,0)
y = np.insert(y,-1,0)
f = interp1d(x,y, bounds_error=False,fill_value=0.0)
self.errors[:,n] = f(self.eloss)
[docs] def gettths(self,rvd=0.0,rvu=0.0,rvb=0.0,rhl=0.0,rhr=0.0,rhb=0.0,order=[0,1,2,3,4,5]):
"""
Uses the defined TT_OFFSETS of the read_id20 class to set all scattering angles tth
from the mean angle avtth of the analyzer modules.
INPUT:
* rhl = mean tth angle of HL module (default is 0.0)
* rhr = mean tth angle of HR module (default is 0.0)
* rhb = mean tth angle of HB module (default is 0.0)
* rvd = mean tth angle of VD module (default is 0.0)
* rvu = mean tth angle of VU module (default is 0.0)
* rvb = mean tth angle of VB module (default is 0.0)
* order = list of integers (0-5) which describes the order of modules in which the
ROIs were defined (default is VD, VU, VB, HR, HL, HB; i.e. [0,1,2,3,4,5])
"""
# reset all values, just in case mean angles are redefined (values are otherwise appended to existing values)
self.VDtth = []
self.VUtth = []
self.VBtth = []
self.HRtth = []
self.HLtth = []
self.HBtth = []
self.tth = []
# horizontal modules
# HL (motor name rhl)
v_angles = self.TTH_OFFSETS1
h_angles = self.TTH_OFFSETS2 + rhl
for n in range(len(h_angles)):
self.HLtth.append( np.arccos(np.cos(np.radians(h_angles[n]))*np.cos(np.radians(v_angles[n])))*180.0/np.pi)
# HR (motor name rhr)
v_angles = self.TTH_OFFSETS1
h_angles = self.TTH_OFFSETS2 + rhr
for n in range(len(h_angles)):
self.HRtth.append( np.arccos(np.cos(np.radians(h_angles[n]))*np.cos(np.radians(v_angles[n])))*180.0/np.pi)
# HB (motor name rhb)
v_angles = self.TTH_OFFSETS1
h_angles = self.TTH_OFFSETS2 + rhb
for n in range(len(h_angles)):
self.HBtth.append( np.arccos(np.cos(np.radians(h_angles[n]))*np.cos(np.radians(v_angles[n])))*180.0/np.pi)
# vertical modules
# VD
v_angles = self.TTH_OFFSETS2 + rvd
h_angles = self.TTH_OFFSETS1
for n in range(len(h_angles)):
self.VDtth.append( np.arccos(np.cos(np.radians(v_angles[n]))*np.cos(np.radians(h_angles[n])))*180.0/np.pi)
# VU
v_angles = self.TTH_OFFSETS2 + rvu
h_angles = self.TTH_OFFSETS1
for n in range(len(h_angles)):
self.VUtth.append( np.arccos(np.cos(np.radians(v_angles[n]))*np.cos(np.radians(h_angles[n])))*180.0/np.pi)
# VB
v_angles = self.TTH_OFFSETS2 + rvb
h_angles = self.TTH_OFFSETS1
for n in range(len(h_angles)):
self.VBtth.append( np.arccos(np.cos(np.radians(v_angles[n]))*np.cos(np.radians(h_angles[n])))*180.0/np.pi)
# list of TTH values
tth = [self.VDtth, self.VUtth, self.VBtth, self.HRtth, self.HLtth, self.HBtth]
# list all TTH values in one long list ordered by the 'order'-keyword
for n in order:
self.tth.extend(tth[n])
[docs] def getqvals(self,invangstr=False):
"""
Calculates q-values from E0 and tth values in either atomic units (defalt) or
inverse angstroms.
"""
theqvals = np.zeros_like(self.signals)
if invangstr:
for n in range(len(self.signals[0,:])):
theqvals[:,n] = xrs_utilities.momtrans_inva(self.E0+self.eloss/1e3,self.E0,self.tth[n])
else:
for n in range(len(self.signals[0,:])):
theqvals[:,n] = xrs_utilities.momtrans_au(self.E0+self.eloss/1e3,self.E0,self.tth[n])
self.qvalues = theqvals
[docs] def getqvals_energy(self,energy):
"""
Returns all q-values at a certain energy loss.
INPUT:
* energy = energy loss value for which all q-values are stored
"""
ind = np.abs(self.eloss - energy).argmin()
return self.qvalues[ind,:]
[docs] def copy_edf_files(self,scannumbers,destdir):
"""
Copies all edf-files from scan with scannumber or scannumbers into directory 'destdir'
INPUT:
* scannumbers = integer or list of integers defining the scannumbers from the SPEC file
* destdir = string with absolute path for the destination
"""
import shutil
numbers = []
if not isinstance(scannumbers,list):
numbers.append(scannumbers)
else:
numbers = scannumbers
fn = self.path + self.filename
if not self.single_image:
for n in range(len(numbers)):
data, motors, counters = xrs_utilities.specread(fn,numbers[n])
for m in range(len(counters['ccdno'])):
ccdnumber = counters['ccdno'][m]
edfnameh = self.path + self.EDF_PREFIXh + self.filename + '_' + "%04d" % ccdnumber + self.EDF_POSTFIX
edfnamev = self.path + self.EDF_PREFIXv + self.filename + '_' + "%04d" % ccdnumber + self.EDF_POSTFIX
shutil.copy2(edfnameh, destdir)
shutil.copy2(edfnamev, destdir)
if self.single_image:
for n in range(len(numbers)):
data, motors, counters = xrs_utilities.specread(fn,numbers[n])
for m in range(len(counters['ccdno'])):
ccdnumber = counters['ccdno'][m]
edfname = self.path + self.EDF_PREFIX + self.filename + '_' + "%04d" % ccdnumber + self.EDF_POSTFIX
shutil.copy2(edfname, destdir)
[docs] def printlength(self,scannumbers):
"""
Prints the number of energy points in a scan or a number of scans.
INPUT:
* scannumbers = integer or list of integers
"""
numbers = []
if not isinstance(scannumbers,list):
numbers.append(scannumbers)
else:
numbers = scannumbers
for i in numbers:
name = 'Scan%03d' % i
print( 'length of scan %03d ' %i + ' is ' + str(len(self.scans[name].energy)))
[docs] def removeBackgroundRoi(self,backroinum,estart=None,estop=None):
if not estart:
estart = self.eloss[0]
if not estop:
estop = self.elsos[-1]
if not self.tth:
print( 'Please define the scattering angles first using the gettth method!' )
return
for ii in range(len(self.tth)):
if ii != backroinum:
inds = np.where(np.logical_and(self.eloss>=estart, mpc96.eloss<=estop))
expnorm = np.trapz(self.signals[inds,ii],self.eloss[inds])
backnorm = np.trapz(self.signals[inds,backroinum],self.eloss[inds])
background = self.signals[:,backroinum]/backnorm*expnorm
self.signals[:,ii] -= background # subtract background roi
[docs] def save_raw_data(self,filename):
data = np.zeros((len(self.eloss),len(self.signals[0,:])))
data[:,0] = self.eloss
data[:,1::] = self.signals
np.savetxt(filename,data)
[docs]def animation(id20read_object,scannumber,logscaling=True,timeout=-1,colormap='jet'):
"""
Shows the edf-files of a scan as a 'movie'.
INPUT:
* scannumber = integer/scannumber
* logscaling = set to 'True' (default) if edf-images are to be shown on logarithmic-scale
* timeout = time in seconds defining pause between two images, if negative (default)
images are renewed by mouse clicks
* colormap = matplotlib color scheme used in the display
"""
if isinstance(scannumber,list):
if len(scannumber)>1:
print( 'this only works for a single scan, sorry' )
return
else:
scannumber = scannumber[0]
scanname = 'Scan%03d' % scannumber
edfmats = id20read_object.scans[scanname].edfmats
scanlen = np.shape(edfmats)[0]
plt.ion()
plt.clf()
for n in range(scanlen):
plt.clf()
if logscaling:
theimage = plt.imshow(np.log(edfmats[n,:,:]))
else:
theimage = plt.imshow(edfmats[n,:,:])
plt.xlabel('detector x-axis [pixel]')
plt.ylabel('detector y-axis [pixel]')
if timeout<0:
titlestring = 'Frame No. %d' % (n+1) + ' of %d' % scanlen + ', press key or mouse botton to continue'
plt.title(titlestring)
else:
titlestring = 'Frame No. %d' % (n+1) + ' of %d' % scanlen + ', updating every %2.2f ' % timeout + ' seconds'
plt.title(titlestring)
theimage.set_cmap(colormap)
plt.draw()
plt.waitforbuttonpress(timeout=timeout)
[docs]def alignment_image(id20read_object,scannumber,motorname,filename=None):
"""
Loads a scan from a sample position scan (x-scan, y-scan, z-scan), lets you choose a zoomroi and constructs a 2D image from this
INPUT:
* scannumber = number of the scan
* motorname = string that contains the motor name (must be the same as in the SPEC file)
* filename = optional parameter with filename to store the image
"""
# load the scan
data, motors, counters, edfmats = id20read_object.readscan(scannumber)
# the scan motor
position = counters[motorname.lower()]
# define a zoom ROI
image = xrs_utilities.sumx(edfmats)
roi_finder_obj = roifinder_and_gui.roi_finder()
roi_finder_obj.get_zoom_rois(image)
# construct the image
roixinds = roi_finder_obj.roi_obj.x_indices[0]
roiyinds = roi_finder_obj.roi_obj.y_indices[0]
# go through all edf files of the scan, sum over the height of the roi and stack the resulting lines into a matrix
axesrange = [0,roiyinds[-1],position[-1],position[0]]
theimage = (np.sum(edfmats[:,np.amin(roixinds):np.amax(roixinds)+1,np.amin(roiyinds):np.amax(roiyinds)+1],axis=1))
plt.close()
fig = plt.figure()
ax = fig.add_subplot(111)
ax.imshow(np.log(theimage),extent=axesrange)
ax.set_aspect('auto')
plt.xlabel('pixel along the beam')
ylabelstr = motorname.lower() + ' position [mm]'
plt.ylabel(ylabelstr)
plt.show()
# save the image, if a filename is provided
if filename:
from .xrs_imaging import LRimage
f = open(filename, 'wb')
yrange = np.arange(np.amin(roixinds),np.amax(roixinds)+1)
theobject = LRimage(theimage, position, yrange)
pickle.dump(theobject, f, protocol=-1)
f.close()
[docs]def alignment_image_old(so,scan_number,motorname):
"""
Loads a scan from a sample position scan (x-scan, y-scan, z-scan), lets you choose a zoomroi and constructs a 2D image from this
INPUT:
* scannumber = number of the scan
* motorname = string that contains the motor name (must be the same as in the SPEC file)
* filename = optional parameter with filename to store the image
"""
# load the scan
scan = xrs_scans.Scan()
scan.load(so.path, so.SPECfname, so.EDFprefix, so.EDFname, so.EDFpostfix, scan_number)
motors = scan.motors
counters = scan.counters
edfmats = scan.edfmats
# the scan motor
position = counters[motorname.lower()]
# define a zoom ROI
image = xrs_utilities.sumx(edfmats)
roi_finder_obj = roifinder_and_gui.roi_finder()
roi_finder_obj.get_zoom_rois(image)
# construct the image
roixinds = roi_finder_obj.roi_obj.x_indices[0]
roiyinds = roi_finder_obj.roi_obj.y_indices[0]
# go through all edf files of the scan, sum over the height of the roi and stack the resulting lines into a matrix
axesrange = [0,roiyinds[-1],position[-1],position[0]]
theimage = (np.sum(edfmats[:,np.amin(roixinds):np.amax(roixinds)+1,np.amin(roiyinds):np.amax(roiyinds)+1],axis=1))
plt.close()
fig = plt.figure()
ax = fig.add_subplot(111)
ax.imshow(np.log(theimage),extent=axesrange)
ax.set_aspect('auto')
plt.xlabel('pixel along the beam')
ylabelstr = motorname.lower() + ' position [mm]'
plt.ylabel(ylabelstr)
plt.show()
[docs]def alignment_image_new( so, scan_number, log_scaling=True, cmap='Blues',
interpolation='nearest'):
"""
Loads a scan from a sample position scan (x-scan, y-scan, z-scan), lets
you choose a zoomroi and constructs a 2D image from this
INPUT:
so = Hydra object
scannumber = number of the scan
"""
# try import cursor
try:
from matplotlib.widgets import Cursor
except:
pass
# load the scan
scan = xrs_scans.Scan()
scan.load(so.path, so.SPECfname, so.EDFprefix, so.EDFname, so.EDFpostfix, scan_number)
motors = scan.motors
counters = scan.counters
edfmats = scan.edfmats
# the scan motor
position = scan.energy
# define a zoom ROI
image = xrs_utilities.sumx(edfmats)
roi_finder = roifinder_and_gui.roi_finder()
roi_finder.get_zoom_rois(image)
# get the image
scan.get_raw_signals( roi_finder.roi_obj, method='column' )
if log_scaling:
im = np.log(scan.raw_signals['ROI00'])
im_orig = scan.raw_signals['ROI00']
else:
im = scan.raw_signals['ROI00']
im_shape = im.shape
# plot the image
axes_range = [0,im.shape[1], position[-1], position[0]]
plt.close()
fig = plt.figure( ) #figsize=(7, 20) )
# fig, ((ax0, ax1), (ax2, ax3)) = plt.subplot(2, 2)
# main plot
ax0 = plt.subplot2grid((16, 16), (0, 0), colspan=12, rowspan=12)
ax0.imshow( im, extent=axes_range, cmap=cmap, interpolation=interpolation )
ax0.set_ylabel( '%s position [mm]'%scan.scan_motor.lower() )
ax0.set_aspect( 'auto' )
ax0.tick_params( left=True,
labelleft=True,
right=True,
bottom=True,
top=True,
labelbottom=False,
direction='in')
# projection onto y
ax1 = plt.subplot2grid((16, 16), (0, 12), colspan=4, rowspan=12)
ax1.plot( np.sum(im_orig,axis=1), position, '-k' )
ax1.set_ylim(ax0.get_ylim())
ax1.yaxis.set_label_position("right")
ax1.set_ylabel( '%s position [mm]'%scan.scan_motor.lower() )
ax1.tick_params( left=True,
labelleft=False,
right=True,
labelright=True,
bottom=True,
top=True,
labelbottom=False,
direction='in')
fwhm_lab = xrs_utilities.fwhm(position, np.sum(im_orig,axis=1) )
max_int_y = np.amax(np.sum(im_orig,axis=1))
max_int_ind = np.where( np.sum(im_orig,axis=1) == max_int_y )[0]
ax1.annotate('Max. @ %0.3f \n FWHM @ %0.3f'%( position[max_int_ind], fwhm_lab[1] ), (0.1,0.8), xycoords='axes fraction')
# ax1.legend(['FWHM %0.3f \n @%f'%(fwhm_lab[0], fwhm_lab[1])], frameon=False, loc='best', numpoints=0.1)
# projection onto x
ax2 = plt.subplot2grid((16, 16), (12, 0), colspan=12, rowspan=4)
ax2.plot( np.sum(im_orig,axis=0), '-k' )
ax2.set_xlim([ 0, im.shape[1]])
ax2.tick_params( left=True,
labelleft=False,
right=True,
labelright=False,
bottom=True,
top=True,
labelbottom=True,
direction='in')
ax2.set_xlabel( 'direction along beam [pixel]' )
fwhm_lab = xrs_utilities.fwhm( np.arange(len(np.sum(im_orig,axis=0))) , np.sum(im_orig,axis=0) )
max_int_x = np.amax(np.sum(im_orig,axis=0))
max_int_ind = np.where( np.sum(im_orig,axis=0) == max_int_x )[0]
ax2.annotate('Max. @ %0.3f \n FWHM @ %0.3f'%( np.arange(im_orig.shape[1])[max_int_ind], fwhm_lab[1] ), (0.05,0.7), xycoords='axes fraction')
try:
cursor1 = Cursor(ax0, useblit=True, color='black', linewidth=1)
cursor2 = Cursor(ax1, useblit=True, color='black', linewidth=1, vertOn=False, linestyle='--' )
cursor3 = Cursor(ax2, useblit=True, color='black', linewidth=1, horizOn=False, linestyle='--' )
except:
pass
plt.show()
[docs]def get_scans_pw(id20read_object,scannumbers):
""" **get_scans_pw**
Sums scans from pixelwise ROI integration for use in the PW roi refinement.
"""
if isinstance(scannumbers,list):
scannums = scannumbers
elif isinstance(scannumbers,int):
scannums = [scannumbers]
else:
print('Please provide keyword \'scannumbers\' as integer or list of integers.')
return
if len(scannums)==1:
scanname = 'Scan%03d' % scannums[0]
pw_matrices = id20read_object.scans[scanname].signals_pw
# normalize data
pw_matrices_norm = []
for matrix in pw_matrices:
for ii in range(matrix.shape[1]):
matrix[:,ii] /= id20read_object.scans[scanname].monitor
pw_matrices_norm.append(matrix)
else:
scanname = 'Scan%03d' % scannums[0]
pw_matrices = id20read_object.scans[scanname].signals_pw
# normalize data
pw_matrices_norm = []
for matrix in pw_matrices:
for ii in range(matrix.shape[1]):
matrix[:,ii] /= id20read_object.scans[scanname].monitor
pw_matrices_norm.append(matrix)
for ii in scannums[1:]:
scanname = 'Scan%03d' % ii
for jj in range(len(pw_matrices)):
pw_matrix = id20read_object.scans[scanname].signals_pw[jj]
pw_matrices[jj] += pw_matrix
for kk in range(pw_matrix.shape[1]):
pw_matrix[:,kk] /= id20read_object.scans[scanname].monitor
pw_matrices_norm[jj] += pw_matrix
return pw_matrices_norm