Source code for fdp.classes.fdpsignal

# -*- coding: utf-8 -*-
"""
signals.py - module containing Signal class

**Classes**

* Signal - signal class for data objects

Created on Tue Jun 23 2015

@author: hyuh
"""
import sys
if sys.version_info > (3,):
    long = int

import inspect
import types
import numpy as np
from . import fdp_globals
#from functools import wraps

MDS_SERVERS = fdp_globals.MDS_SERVERS
# implemented MDS_SERVERS from fdp_globals in place of hard-coded MDS server - DRS 10/18/15
FdpError = fdp_globals.FdpError


# commented out and replaced with FdpError - DRS 10/18/15
#class MdsError(Exception):
#    pass

[docs]class Signal(np.ndarray): """ sig=fdp.Signal(signal_ndarray, units='m/s', axes=['radius','time'], axes_values=[ax1_1Darray, ax2_1Darray], axes_units=['s','cm'] e.g.: mds.Signal(np.arange((20*10)).reshape((10,20)), units='keV', axes=['radius','time'], axes_values=[100+np.arange(10)*5, np.arange(20)*0.1], axes_units=['s','cm']) or an empty signal: s=mds.Signal() default axes order=[time, space] sig=fdp.Signal(units='m/s', axes=['radius','time'], axes_values=[radiusSignal, timeSignal]) """ def __init__(self, **kwargs): pass def __new__(cls, input_array=[], verbose=False, **kwargs): #maybe an **kwargs dict for more attr #name is name of signal...e.g. Te #__doc__ for the signal...decriptor (filled in when? XML/MDSvalue?) if verbose: print('Called __new__:') obj = np.asanyarray(input_array).view(cls).copy() #arr = np.asanyarray(input_array).view(cls) #print '__new__: type(arr) %s' % type(arr) #obj = np.array(arr,copy=True) #obj = np.ndarray.__new__(cls, shape=arr.shape, buffer=arr,dtype=arr.dtype) #obj = np.ndarray.__new__(cls, shape=arr.shape, dtype=arr.dtype) #if input_array is not None: # obj[:] = input_array # obj = np.asarray(input_array).view(cls) #arr = np.asarray(input_array) #obj = np.ndarray.__new__(cls, shape=arr.shape, buffer=arr, dtype=arr.dtype) if verbose: print('__new__: type(obj) %s' % type(obj)) print('__new__: setting attributes') #obj.units = units #obj.axes = axes #obj._parent = parent #obj._root = root #obj._dim_of = dim_of obj._verbose = verbose #obj._slic = slice(None) obj._empty = True #obj._name = name #not necessary but can be defined #obj.mdstree = mdstree #obj.mdsnode = mdsnode #obj.mdsshot = mdsshot for key,value in iter(kwargs.items()): setattr(obj,key,value) #Initiate slic attribute to hold slice index info # try: # for i, axis in enumerate(axes): # print(name, axis) # setattr(obj, axis, axes_values[i]) # for axis, reference in zip(axes, axes_values): # setattr(obj, axis, reference) # except: # pass return obj # def __array_prepare__(self, context=None): # print 'In __array_prepare__:' # print ' context is' , context def __array_finalize__(self, obj): # if self._verbose: # print('Called __array_finalize__:') # print '__array_finalize__: self is type %s ' % type(self) # # try: # print '__array_finalize__: self has len %s' % len(self) # except: # print '__array_finalize__: self has undefined len' # # print('__array_finalize__: self hasattr(self,"slic") is', hasattr(self,'slic')) # print('__array_finalize__: obj is type ', type(obj)) # try: # print '__array_finalize__: obj has len %s' % len(obj) # except: # print '__array_finalize__: obj has undefined len' # print('__array_finalize__: hasattr(obj,"_slic") is', hasattr(obj,"_slic")) # if hasattr(obj,'_slic'): # print('__array_finalize__: obj._slic is', obj._slic) if obj is None: return #simple copying over of attributes. Defaults to None so hasattr check skipped #if hasattr(obj,'units'): objaxes= getattr(obj, 'axes',None) objdict= getattr(obj, '__dict__', None) _nodeltmpattr=False if objdict is not None: #print(type(objdict)) #print(type(objaxes)) if objaxes is not None: for key,val in objdict.items(): if key not in objaxes: setattr(self, key, val) else: for key,val in objdict.items(): setattr(self, key, val) if '_verbose' in objdict: if objdict['_verbose']: try: print("__array_finalize__:Function name {}".format(obj._fname)) except AttributeError: pass try: print("__array_finalize__:Function args {}".format(obj._fargs)) except AttributeError: pass try: print("__array_finalize__:Function kwargs {}".format(obj._fkwargs)) except AttributeError: pass if '_fname' in objdict: if objdict['_fname'] == 'transpose': if objaxes is not None: self.axes = [obj.axes[i] for i in objdict['_fargs'][0]] if objdict.has_key('_fargs') else obj.axes[::-1] if '_debug' in objdict: if objdict['_debug']: _nodeltmpattr=True if objaxes is not None: for axis in objaxes: if '_slic' in obj: #slice axis according to _slic if '_verbose' in objdict: if objdict['_verbose']: print('__array_finalize__: type(obj._slic) is ', type(obj._slic)) print('__array_finalize__: obj._slic is ',obj._slic) try: #1-D if type(obj._slic) is slice or type(obj._slic) is list: setattr(self,axis,getattr(obj, axis)[obj._slic]) #>1-D elif type(obj._slic) is tuple: #if getattr(obj, axis).axes != []: #axes is multidimensional, build correct _slicaxis=tuple([obj._slic[obj.axes.index(axisaxis)] for axisaxis in (getattr(obj, axis).axes + [axis])]) if self._verbose: print('__array_finalize__: Assigning axis ',axis) print('__array_finalize__: type(_slicaxis) is ',type(_slicaxis)) print('__array_finalize__: _slicaxis is ',_slicaxis) print('__array_finalize__: axis shape is ', getattr(obj, axis)[_slicaxis].shape) #if isinstance(_slicaxis[0], (int, long, float, np.generic)): # self.axes=self.axes+[self.axes.pop(self.axes.index(axis))] setattr(self,axis,getattr(obj, axis)[_slicaxis]) if '_verbose' in objdict: if objdict['_verbose']: print('__array_finalize__: Fixing {0} axes'.format(axis)) for axisaxis in getattr(obj, axis).axes: if isinstance(obj._slic[obj.axes.index(axisaxis)], (int, long, float, np.generic)): if '_verbose' in objdict: if objdict['_verbose']: print('__array_finalize__: Removing {0} axis from {1}'.format(axisaxis,axis)) self.axis.axes.remove(axisaxis) else: if '_verbose' in objdict: if objdict['_verbose']: print('__array_finalize__: {0} is not primitive'. format(type(obj._slic[obj.axes.index(axisaxis)]))) else: if '_verbose' in objdict: if objdict['_verbose']: print('_slic is neither slice, list, nor tuple type for ',axis) except: #must not have a len(), e.g. int type if '_verbose' in objdict: if objdict['_verbose']: print('Exception: Axes parsing for ',axis,' failed') pass else: #no slicing, copy each axis as is setattr(self,axis,getattr(obj, axis, None)) #clean-up temp attributes def delattrtry(ob,at): try: delattr(ob,at) except: pass return if _nodeltmpattr: pass else: delattrtry(self,'_slic') delattrtry(self,'_fname') delattrtry(self,'_fargs') delattrtry(self,'_fkwargs') delattrtry(obj,'_slic') delattrtry(obj,'_fname') delattrtry(obj,'_fargs') delattrtry(obj,'_fkwargs') def __array_wrap__(self, out_arr, context=None): if self._verbose: print('Called __array_wrap__:') print('__array_wrap__: self is %s' % type(self)) print('__array_wrap__: arr is %s' % type(out_arr)) # then just call the parent print('__array_wrap__: context is %s' % context) return np.ndarray.__array_wrap__(self, out_arr, context) def __array_prepare__(self, out_arr, context=None): if self._verbose: print('Called __array_prepare__:') print('__array_prepare__: self is %s' % type(self)) print('__array_prepare__: arr is %s' % type(out_arr)) # then just call the parent print('__array_prepare__: context is %s' % context) return np.ndarray.__array_prepare__(self, out_arr, context) def __getitem__(self,index): ''' self must be Signal class for this to be called, so therefore must have the _slic attribute. The _slic attribute preserves indexing for attributes ''' #This passes index to array_finalize after a new signal obj is created to assign axes def parseindex(index, dims): #format index to account for single elements and pad with appropriate slices. #int2slc=lambda i: slice(-1,-2,-1) if int(i) == -1 else slice(int(i),int(i)+1) if isinstance(index, (list, slice, np.ndarray)): if dims <= 1: return index else: newindex=[index] #elif isinstance(index, (int, long, float, np.generic)): newindex=[int2slc(index)] elif isinstance(index, (int, long, float, np.generic)): newindex=[int(index)] elif isinstance(index, tuple): #newindex = [int2slc(i) if isinstance(i, (int, long, float, np.generic)) else i for i in index] newindex = [int(i) if isinstance(i, (int, long, float, np.generic)) else i for i in index] ellipsisbool=[Ellipsis is i for i in newindex] if sum(ellipsisbool) > 0: ellipsisindex=ellipsisbool.index(True) slcpadding=([slice(None)]*(dims-len(newindex)+1)) newindex=newindex[:ellipsisindex] + slcpadding + newindex[ellipsisindex+1:] else: newindex=newindex + ([slice(None)]*(dims-len(newindex))) return tuple(newindex) if self._verbose: print('Called __getitem__:') slcindex=parseindex(index, self.ndim) self._slic=slcindex #Get the data if self._empty is True: # try: self._empty=False data = self._root._get_mdsdata(self) self.resize(data.shape, refcheck=False) self[:] = data # except: # print 'Something went wrong with getting data' #Exec userfunc if method defined: if self._verbose: print('__getitem__: index is ', index) print('__getitem__: type(self._slic) is ', type(self._slic)) print('__getitem__: self._slic is ', self._slic) #print '__getitem__: new is type %s' % type(new) print('__getitem__: self is type %s' % type(self)) #print('__getitem__: self has len %s ' % len(self)) return super(Signal,self).__getitem__(slcindex) def __getattr__(self, attribute): if attribute is '_parent': raise AttributeError("'{}' object has no attribute '{}'".format( type(self), attribute)) if self._parent is None: raise AttributeError("'{}' object has no attribute '{}'".format( type(self), attribute)) attr = getattr(self._parent, attribute) if inspect.ismethod(attr): return types.MethodType(attr.__func__, self) else: return attr def __repr__(self): if self._verbose: print('Called custom __repr__') if self._empty is True: data = self._root._get_mdsdata(self) self.resize(data.shape, refcheck=False) self[:] = data self._empty=False return super(Signal,self).__repr__() #return np.asarray(self).__repr__() def __str__(self): if self._verbose: print('Called custom __str__') if self._empty is True: data = self._root._get_mdsdata(self) self.resize(data.shape, refcheck=False) self[:] = data self._empty=False return super(Signal,self).__str__() #return np.asarray(self).__str__() def __getslice__(self, start, stop): if self._verbose: print('Called __getslice__:') """ This solves a subtle bug, where __getitem__ is not called, and all the dimensional checking not done, when a slice of only the first dimension is taken, e.g. a[1:3]. From the Python docs: Deprecated since version 2.0: Support slice objects as parameters to the __getitem__() method. (However, built-in types in CPython currently still implement __getslice__(). Therefore, you have to override it in derived classes when implementing slicing.) """ return self.__getitem__(slice(start, stop)) def __call__(self, **kwargs): try: slc = [slice(None)] * len(self.axes) except TypeError: print('No axes present for signal {}.'.format(self._name)) return None for kwarg, values in kwargs.items(): if kwarg not in self.axes: print('{} is not a valid axis.'.format(kwarg)) raise TypeError axis = self.axes.index(kwarg) axis_value = getattr(self, kwarg) try: axis_inds = [np.abs(value-axis_value[:]).argmin() for value in values] slc[axis] = slice(axis_inds[0], axis_inds[1]) except TypeError: axis_ind = np.abs(values-axis_value[:]).argmin() #axis_inds = [axis_ind, axis_ind+1] slc[axis] = axis_ind return self[tuple(slc)] def __nonzero__(self): return True def sigwrapper(f): def inner(*args, **kwargs): #print("getarg decorator: Function {} arguments were: {}, {}".format(f.__name__,args, kwargs)) args[0]._fname=f.__name__ if len(args)>1: args[0]._fargs=args[1:] args[0]._fkwargs=kwargs return f(*args, **kwargs) return inner @sigwrapper def amin(self, *args, **kwargs): args[0]._fname=f.__name__ args[0]._fkwargs=kwargs return super(Signal,self).amin(*args, **kwargs) @sigwrapper def transpose(self, *args, **kwargs): return super(Signal,self).transpose(*args, **kwargs)