forked from mne-tools/mne-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathevoked.py
1269 lines (1096 loc) · 47.5 KB
/
evoked.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
# Authors: Alexandre Gramfort <[email protected]>
# Matti Hämäläinen <[email protected]>
# Denis Engemann <[email protected]>
# Andrew Dykstra <[email protected]>
# Mads Jensen <[email protected]>
# Jona Sassenhagen <[email protected]>
#
# License: BSD (3-clause)
from copy import deepcopy
import numpy as np
from .baseline import rescale
from .channels.channels import (ContainsMixin, UpdateChannelsMixin,
SetChannelsMixin, InterpolationMixin)
from .channels.layout import _merge_grad_data, _pair_grad_sensors
from .filter import detrend, FilterMixin
from .utils import (check_fname, logger, verbose, _time_mask, warn, sizeof_fmt,
SizeMixin, copy_function_doc_to_method_doc, _validate_type,
fill_doc, _check_option)
from .viz import (plot_evoked, plot_evoked_topomap, plot_evoked_field,
plot_evoked_image, plot_evoked_topo)
from .viz.evoked import plot_evoked_white, plot_evoked_joint
from .viz.topomap import _topomap_animation
from .io.constants import FIFF
from .io.open import fiff_open
from .io.tag import read_tag
from .io.tree import dir_tree_find
from .io.pick import (channel_type, pick_types, _pick_data_channels,
_picks_to_idx)
from .io.meas_info import read_meas_info, write_meas_info
from .io.proj import ProjMixin
from .io.write import (start_file, start_block, end_file, end_block,
write_int, write_string, write_float_matrix,
write_id)
from .io.base import ToDataFrameMixin, TimeMixin, _check_maxshield
_aspect_dict = {
'average': FIFF.FIFFV_ASPECT_AVERAGE,
'standard_error': FIFF.FIFFV_ASPECT_STD_ERR,
'single_epoch': FIFF.FIFFV_ASPECT_SINGLE,
'partial_average': FIFF.FIFFV_ASPECT_SUBAVERAGE,
'alternating_subaverage': FIFF.FIFFV_ASPECT_ALTAVERAGE,
'sample_cut_out_by_graph': FIFF.FIFFV_ASPECT_SAMPLE,
'power_density_spectrum': FIFF.FIFFV_ASPECT_POWER_DENSITY,
'dipole_amplitude_cuvre': FIFF.FIFFV_ASPECT_DIPOLE_WAVE,
'squid_modulation_lower_bound': FIFF.FIFFV_ASPECT_IFII_LOW,
'squid_modulation_upper_bound': FIFF.FIFFV_ASPECT_IFII_HIGH,
'squid_gate_setting': FIFF.FIFFV_ASPECT_GATE,
}
_aspect_rev = {val: key for key, val in _aspect_dict.items()}
@fill_doc
class Evoked(ProjMixin, ContainsMixin, UpdateChannelsMixin, SetChannelsMixin,
InterpolationMixin, FilterMixin, ToDataFrameMixin, TimeMixin,
SizeMixin):
"""Evoked data.
Parameters
----------
fname : str
Name of evoked/average FIF file to load.
If None no data is loaded.
condition : int, or str
Dataset ID number (int) or comment/name (str). Optional if there is
only one data set in file.
proj : bool, optional
Apply SSP projection vectors.
kind : str
Either 'average' or 'standard_error'. The type of data to read.
Only used if 'condition' is a str.
allow_maxshield : bool | str (default False)
If True, allow loading of data that has been recorded with internal
active compensation (MaxShield). Data recorded with MaxShield should
generally not be loaded directly, but should first be processed using
SSS/tSSS to remove the compensation signals that may also affect brain
activity. Can also be "yes" to load without eliciting a warning.
%(verbose)s
Attributes
----------
info : dict
Measurement info.
ch_names : list of str
List of channels' names.
nave : int
Number of averaged epochs.
kind : str
Type of data, either average or standard_error.
first : int
First time sample.
last : int
Last time sample.
comment : str
Comment on dataset. Can be the condition.
times : array
Array of time instants in seconds.
data : array of shape (n_channels, n_times)
Evoked response.
times : ndarray
Time vector in seconds. Goes from `tmin` to `tmax`. Time interval
between consecutive time samples is equal to the inverse of the
sampling frequency.
%(verbose)s
Notes
-----
Evoked objects contain a single condition only.
"""
@verbose
def __init__(self, fname, condition=None, proj=True,
kind='average', allow_maxshield=False,
verbose=None): # noqa: D102
_validate_type(proj, bool, "'proj'")
# Read the requested data
self.info, self.nave, self._aspect_kind, self.first, self.last, \
self.comment, self.times, self.data = _read_evoked(
fname, condition, kind, allow_maxshield)
self.verbose = verbose
self.preload = True
# project and baseline correct
if proj:
self.apply_proj()
@property
def kind(self):
"""The data kind."""
return _aspect_rev[self._aspect_kind]
@kind.setter
def kind(self, kind):
_check_option('kind', kind, list(_aspect_dict.keys()))
self._aspect_kind = _aspect_dict[kind]
@property
def data(self):
"""The data matrix."""
return self._data
@data.setter
def data(self, data):
"""Set the data matrix."""
self._data = data
@verbose
def apply_baseline(self, baseline=(None, 0), verbose=None):
"""Baseline correct evoked data.
Parameters
----------
baseline : tuple of length 2
The time interval to apply baseline correction. If None do not
apply it. If baseline is (a, b) the interval is between "a (s)" and
"b (s)". If a is None the beginning of the data is used and if b is
None then b is set to the end of the interval. If baseline is equal
to (None, None) all the time interval is used. Correction is
applied by computing mean of the baseline period and subtracting it
from the data. The baseline (a, b) includes both endpoints, i.e.
all timepoints t such that a <= t <= b.
%(verbose_meth)s
Returns
-------
evoked : instance of Evoked
The baseline-corrected Evoked object.
Notes
-----
Baseline correction can be done multiple times.
.. versionadded:: 0.13.0
"""
self.data = rescale(self.data, self.times, baseline, copy=False)
return self
def save(self, fname):
"""Save dataset to file.
Parameters
----------
fname : str
The name of the file, which should end with -ave.fif or
-ave.fif.gz.
Notes
-----
To write multiple conditions into a single file, use
:func:`mne.write_evokeds`.
"""
write_evokeds(fname, self)
def __repr__(self): # noqa: D105
s = "'%s' (%s, N=%s)" % (self.comment, self.kind, self.nave)
s += ", [%0.5g, %0.5g] sec" % (self.times[0], self.times[-1])
s += ", %s ch" % self.data.shape[0]
s += ", ~%s" % (sizeof_fmt(self._size),)
return "<Evoked | %s>" % s
@property
def ch_names(self):
"""Channel names."""
return self.info['ch_names']
@fill_doc
def crop(self, tmin=None, tmax=None, include_tmax=True):
"""Crop data to a given time interval.
Parameters
----------
tmin : float | None
Start time of selection in seconds.
tmax : float | None
End time of selection in seconds.
%(include_tmax)s
Returns
-------
evoked : instance of Evoked
The cropped Evoked object.
Notes
-----
Unlike Python slices, MNE time intervals include both their end points;
crop(tmin, tmax) returns the interval tmin <= t <= tmax.
"""
mask = _time_mask(self.times, tmin, tmax, sfreq=self.info['sfreq'],
include_tmax=include_tmax)
self.times = self.times[mask]
self.first = int(self.times[0] * self.info['sfreq'])
self.last = len(self.times) + self.first - 1
self.data = self.data[:, mask]
return self
def decimate(self, decim, offset=0):
"""Decimate the evoked data.
.. note:: No filtering is performed. To avoid aliasing, ensure
your data are properly lowpassed.
Parameters
----------
decim : int
The amount to decimate data.
offset : int
Apply an offset to where the decimation starts relative to the
sample corresponding to t=0. The offset is in samples at the
current sampling rate.
Returns
-------
evoked : instance of Evoked
The decimated Evoked object.
See Also
--------
Epochs.decimate
Epochs.resample
mne.io.Raw.resample
Notes
-----
Decimation can be done multiple times. For example,
``evoked.decimate(2).decimate(2)`` will be the same as
``evoked.decimate(4)``.
.. versionadded:: 0.13.0
"""
decim, offset, new_sfreq = _check_decim(self.info, decim, offset)
start_idx = int(round(self.times[0] * (self.info['sfreq'] * decim)))
i_start = start_idx % decim + offset
decim_slice = slice(i_start, None, decim)
self.info['sfreq'] = new_sfreq
self.data = self.data[:, decim_slice].copy()
self.times = self.times[decim_slice].copy()
self.first = int(self.times[0] * self.info['sfreq'])
self.last = len(self.times) + self.first - 1
return self
def shift_time(self, tshift, relative=True):
"""Shift time scale in evoked data.
Parameters
----------
tshift : float
The amount of time shift to be applied if relative is True
else the first time point. When relative is True, positive value
of tshift moves the data forward while negative tshift moves it
backward.
relative : bool
If true, move the time backwards or forwards by specified amount.
Else, set the starting time point to the value of tshift.
Notes
-----
Maximum accuracy of time shift is 1 / evoked.info['sfreq']
"""
times = self.times
sfreq = self.info['sfreq']
offset = self.first if relative else 0
self.first = int(tshift * sfreq) + offset
self.last = self.first + len(times) - 1
self.times = np.arange(self.first, self.last + 1,
dtype=np.float) / sfreq
@copy_function_doc_to_method_doc(plot_evoked)
def plot(self, picks=None, exclude='bads', unit=True, show=True, ylim=None,
xlim='tight', proj=False, hline=None, units=None, scalings=None,
titles=None, axes=None, gfp=False, window_title=None,
spatial_colors=False, zorder='unsorted', selectable=True,
noise_cov=None, time_unit='s', verbose=None):
return plot_evoked(
self, picks=picks, exclude=exclude, unit=unit, show=show,
ylim=ylim, proj=proj, xlim=xlim, hline=hline, units=units,
scalings=scalings, titles=titles, axes=axes, gfp=gfp,
window_title=window_title, spatial_colors=spatial_colors,
zorder=zorder, selectable=selectable, noise_cov=noise_cov,
time_unit=time_unit, verbose=verbose)
@copy_function_doc_to_method_doc(plot_evoked_image)
def plot_image(self, picks=None, exclude='bads', unit=True, show=True,
clim=None, xlim='tight', proj=False, units=None,
scalings=None, titles=None, axes=None, cmap='RdBu_r',
colorbar=True, mask=None, mask_style=None,
mask_cmap='Greys', mask_alpha=.25, time_unit='s',
show_names=None, group_by=None):
return plot_evoked_image(
self, picks=picks, exclude=exclude, unit=unit, show=show,
clim=clim, xlim=xlim, proj=proj, units=units, scalings=scalings,
titles=titles, axes=axes, cmap=cmap, colorbar=colorbar, mask=mask,
mask_style=mask_style, mask_cmap=mask_cmap, mask_alpha=mask_alpha,
time_unit=time_unit, show_names=show_names, group_by=group_by)
@copy_function_doc_to_method_doc(plot_evoked_topo)
def plot_topo(self, layout=None, layout_scale=0.945, color=None,
border='none', ylim=None, scalings=None, title=None,
proj=False, vline=[0.0], fig_background=None,
merge_grads=False, legend=True, axes=None,
background_color='w', noise_cov=None, show=True):
"""
Notes
-----
.. versionadded:: 0.10.0
"""
return plot_evoked_topo(
self, layout=layout, layout_scale=layout_scale, color=color,
border=border, ylim=ylim, scalings=scalings, title=title,
proj=proj, vline=vline, fig_background=fig_background,
merge_grads=merge_grads, legend=legend, axes=axes,
background_color=background_color, noise_cov=noise_cov, show=show)
@copy_function_doc_to_method_doc(plot_evoked_topomap)
def plot_topomap(self, times="auto", ch_type=None, layout=None, vmin=None,
vmax=None, cmap=None, sensors=True, colorbar=True,
scalings=None, units=None, res=64,
size=1, cbar_fmt="%3.1f",
time_unit='s', time_format=None,
proj=False, show=True, show_names=False, title=None,
mask=None, mask_params=None, outlines='head',
contours=6, image_interp='bilinear', average=None,
head_pos=None, axes=None, extrapolate='box'):
return plot_evoked_topomap(
self, times=times, ch_type=ch_type, layout=layout, vmin=vmin,
vmax=vmax, cmap=cmap, sensors=sensors, colorbar=colorbar,
scalings=scalings, units=units, res=res,
size=size, cbar_fmt=cbar_fmt, time_unit=time_unit,
time_format=time_format, proj=proj, show=show,
show_names=show_names, title=title, mask=mask,
mask_params=mask_params, outlines=outlines, contours=contours,
image_interp=image_interp, average=average, head_pos=head_pos,
axes=axes, extrapolate=extrapolate)
@copy_function_doc_to_method_doc(plot_evoked_field)
def plot_field(self, surf_maps, time=None, time_label='t = %0.0f ms',
n_jobs=1):
return plot_evoked_field(self, surf_maps, time=time,
time_label=time_label, n_jobs=n_jobs)
@copy_function_doc_to_method_doc(plot_evoked_white)
def plot_white(self, noise_cov, show=True, rank=None, time_unit='s',
verbose=None):
return plot_evoked_white(
self, noise_cov=noise_cov, rank=rank, show=show,
time_unit=time_unit, verbose=verbose)
@copy_function_doc_to_method_doc(plot_evoked_joint)
def plot_joint(self, times="peaks", title='', picks=None,
exclude='bads', show=True, ts_args=None,
topomap_args=None):
return plot_evoked_joint(self, times=times, title=title, picks=picks,
exclude=exclude, show=show, ts_args=ts_args,
topomap_args=topomap_args)
def animate_topomap(self, ch_type=None, times=None, frame_rate=None,
butterfly=False, blit=True, show=True, time_unit='s'):
"""Make animation of evoked data as topomap timeseries.
The animation can be paused/resumed with left mouse button.
Left and right arrow keys can be used to move backward or forward
in time.
Parameters
----------
ch_type : str | None
Channel type to plot. Accepted data types: 'mag', 'grad', 'eeg'.
If None, first available channel type from ('mag', 'grad', 'eeg')
is used. Defaults to None.
times : array of float | None
The time points to plot. If None, 10 evenly spaced samples are
calculated over the evoked time series. Defaults to None.
frame_rate : int | None
Frame rate for the animation in Hz. If None,
frame rate = sfreq / 10. Defaults to None.
butterfly : bool
Whether to plot the data as butterfly plot under the topomap.
Defaults to False.
blit : bool
Whether to use blit to optimize drawing. In general, it is
recommended to use blit in combination with ``show=True``. If you
intend to save the animation it is better to disable blit.
Defaults to True.
show : bool
Whether to show the animation. Defaults to True.
time_unit : str
The units for the time axis, can be "ms" (default in 0.16)
or "s" (will become the default in 0.17).
.. versionadded:: 0.16
Returns
-------
fig : instance of matplotlib.figure.Figure
The figure.
anim : instance of matplotlib.animation.FuncAnimation
Animation of the topomap.
Notes
-----
.. versionadded:: 0.12.0
"""
return _topomap_animation(
self, ch_type=ch_type, times=times, frame_rate=frame_rate,
butterfly=butterfly, blit=blit, show=show, time_unit=time_unit)
def as_type(self, ch_type='grad', mode='fast'):
"""Compute virtual evoked using interpolated fields.
.. Warning:: Using virtual evoked to compute inverse can yield
unexpected results. The virtual channels have `'_v'` appended
at the end of the names to emphasize that the data contained in
them are interpolated.
Parameters
----------
ch_type : str
The destination channel type. It can be 'mag' or 'grad'.
mode : str
Either `'accurate'` or `'fast'`, determines the quality of the
Legendre polynomial expansion used. `'fast'` should be sufficient
for most applications.
Returns
-------
evoked : instance of mne.Evoked
The transformed evoked object containing only virtual channels.
Notes
-----
.. versionadded:: 0.9.0
"""
from .forward import _as_meg_type_evoked
return _as_meg_type_evoked(self, ch_type=ch_type, mode=mode)
@fill_doc
def detrend(self, order=1, picks=None):
"""Detrend data.
This function operates in-place.
Parameters
----------
order : int
Either 0 or 1, the order of the detrending. 0 is a constant
(DC) detrend, 1 is a linear detrend.
%(picks_good_data)s
Returns
-------
evoked : instance of Evoked
The detrended evoked object.
"""
picks = _picks_to_idx(self.info, picks)
self.data[picks] = detrend(self.data[picks], order, axis=-1)
return self
def copy(self):
"""Copy the instance of evoked.
Returns
-------
evoked : instance of Evoked
"""
evoked = deepcopy(self)
return evoked
def __neg__(self):
"""Negate channel responses.
Returns
-------
evoked_neg : instance of Evoked
The Evoked instance with channel data negated and '-'
prepended to the comment.
"""
out = self.copy()
out.data *= -1
out.comment = '-' + (out.comment or 'unknown')
return out
def get_peak(self, ch_type=None, tmin=None, tmax=None,
mode='abs', time_as_index=False, merge_grads=False,
return_amplitude=False):
"""Get location and latency of peak amplitude.
Parameters
----------
ch_type : 'mag', 'grad', 'eeg', 'seeg', 'ecog', 'hbo', hbr', 'misc', None
The channel type to use. Defaults to None. If more than one sensor
Type is present in the data the channel type has to be explicitly
set.
tmin : float | None
The minimum point in time to be considered for peak getting.
If None (default), the beginning of the data is used.
tmax : float | None
The maximum point in time to be considered for peak getting.
If None (default), the end of the data is used.
mode : {'pos', 'neg', 'abs'}
How to deal with the sign of the data. If 'pos' only positive
values will be considered. If 'neg' only negative values will
be considered. If 'abs' absolute values will be considered.
Defaults to 'abs'.
time_as_index : bool
Whether to return the time index instead of the latency in seconds.
merge_grads : bool
If True, compute peak from merged gradiometer data.
return_amplitude : bool
If True, return also the amplitude at the maximum response.
.. versionadded:: 0.16
Returns
-------
ch_name : str
The channel exhibiting the maximum response.
latency : float | int
The time point of the maximum response, either latency in seconds
or index.
amplitude : float
The amplitude of the maximum response. Only returned if
return_amplitude is True.
.. versionadded:: 0.16
""" # noqa: E501
supported = ('mag', 'grad', 'eeg', 'seeg', 'ecog', 'misc', 'hbo',
'hbr', 'None', 'fnirs_raw', 'fnirs_od')
data_picks = _pick_data_channels(self.info, with_ref_meg=False)
types_used = {channel_type(self.info, idx) for idx in data_picks}
_check_option('ch_type', str(ch_type), supported)
if ch_type is not None and ch_type not in types_used:
raise ValueError('Channel type `{ch_type}` not found in this '
'evoked object.'.format(ch_type=ch_type))
elif len(types_used) > 1 and ch_type is None:
raise RuntimeError('More than one sensor type found. `ch_type` '
'must not be `None`, pass a sensor type '
'value instead')
if merge_grads:
if ch_type != 'grad':
raise ValueError('Channel type must be grad for merge_grads')
elif mode == 'neg':
raise ValueError('Negative mode (mode=neg) does not make '
'sense with merge_grads=True')
meg = eeg = misc = seeg = ecog = fnirs = False
picks = None
if ch_type in ('mag', 'grad'):
meg = ch_type
elif ch_type == 'eeg':
eeg = True
elif ch_type == 'misc':
misc = True
elif ch_type == 'seeg':
seeg = True
elif ch_type == 'ecog':
ecog = True
elif ch_type in ('hbo', 'hbr', 'fnirs_raw', 'fnirs_od'):
fnirs = ch_type
if ch_type is not None:
if merge_grads:
picks = _pair_grad_sensors(self.info, topomap_coords=False)
else:
picks = pick_types(self.info, meg=meg, eeg=eeg, misc=misc,
seeg=seeg, ecog=ecog, ref_meg=False,
fnirs=fnirs)
data = self.data
ch_names = self.ch_names
if picks is not None:
data = data[picks]
ch_names = [ch_names[k] for k in picks]
if merge_grads:
data = _merge_grad_data(data)
ch_names = [ch_name[:-1] + 'X' for ch_name in ch_names[::2]]
ch_idx, time_idx, max_amp = _get_peak(data, self.times, tmin,
tmax, mode)
out = (ch_names[ch_idx], time_idx if time_as_index else
self.times[time_idx])
if return_amplitude:
out += (max_amp,)
return out
def _check_decim(info, decim, offset):
"""Check decimation parameters."""
if decim < 1 or decim != int(decim):
raise ValueError('decim must be an integer > 0')
decim = int(decim)
new_sfreq = info['sfreq'] / float(decim)
lowpass = info['lowpass']
if decim > 1 and lowpass is None:
warn('The measurement information indicates data is not low-pass '
'filtered. The decim=%i parameter will result in a sampling '
'frequency of %g Hz, which can cause aliasing artifacts.'
% (decim, new_sfreq))
elif decim > 1 and new_sfreq < 3 * lowpass:
warn('The measurement information indicates a low-pass frequency '
'of %g Hz. The decim=%i parameter will result in a sampling '
'frequency of %g Hz, which can cause aliasing artifacts.'
% (lowpass, decim, new_sfreq)) # > 50% nyquist lim
offset = int(offset)
if not 0 <= offset < decim:
raise ValueError('decim must be at least 0 and less than %s, got '
'%s' % (decim, offset))
return decim, offset, new_sfreq
@fill_doc
class EvokedArray(Evoked):
"""Evoked object from numpy array.
Parameters
----------
data : array of shape (n_channels, n_times)
The channels' evoked response. See notes for proper units of measure.
info : instance of Info
Info dictionary. Consider using ``create_info`` to populate
this structure.
tmin : float
Start time before event. Defaults to 0.
comment : str
Comment on dataset. Can be the condition. Defaults to ''.
nave : int
Number of averaged epochs. Defaults to 1.
kind : str
Type of data, either average or standard_error. Defaults to 'average'.
%(verbose)s
See Also
--------
EpochsArray, io.RawArray, create_info
Notes
-----
Proper units of measure:
* V: eeg, eog, seeg, emg, ecg, bio, ecog
* T: mag
* T/m: grad
* M: hbo, hbr
* Am: dipole
* AU: misc
"""
@verbose
def __init__(self, data, info, tmin=0., comment='', nave=1, kind='average',
verbose=None): # noqa: D102
dtype = np.complex128 if np.any(np.iscomplex(data)) else np.float64
data = np.asanyarray(data, dtype=dtype)
if data.ndim != 2:
raise ValueError('Data must be a 2D array of shape (n_channels, '
'n_samples)')
if len(info['ch_names']) != np.shape(data)[0]:
raise ValueError('Info (%s) and data (%s) must have same number '
'of channels.' % (len(info['ch_names']),
np.shape(data)[0]))
self.data = data
# XXX: this should use round and be tested
self.first = int(tmin * info['sfreq'])
self.last = self.first + np.shape(data)[-1] - 1
self.times = np.arange(self.first, self.last + 1,
dtype=np.float) / info['sfreq']
self.info = info.copy() # do not modify original info
self.nave = nave
self.kind = kind
self.comment = comment
self.picks = None
self.verbose = verbose
self.preload = True
self._projector = None
_validate_type(self.kind, "str", "kind")
if self.kind not in _aspect_dict:
raise ValueError('unknown kind "%s", should be "average" or '
'"standard_error"' % (self.kind,))
self._aspect_kind = _aspect_dict[self.kind]
def _get_entries(fid, evoked_node, allow_maxshield=False):
"""Get all evoked entries."""
comments = list()
aspect_kinds = list()
for ev in evoked_node:
for k in range(ev['nent']):
my_kind = ev['directory'][k].kind
pos = ev['directory'][k].pos
if my_kind == FIFF.FIFF_COMMENT:
tag = read_tag(fid, pos)
comments.append(tag.data)
my_aspect = _get_aspect(ev, allow_maxshield)[0]
for k in range(my_aspect['nent']):
my_kind = my_aspect['directory'][k].kind
pos = my_aspect['directory'][k].pos
if my_kind == FIFF.FIFF_ASPECT_KIND:
tag = read_tag(fid, pos)
aspect_kinds.append(int(tag.data))
comments = np.atleast_1d(comments)
aspect_kinds = np.atleast_1d(aspect_kinds)
if len(comments) != len(aspect_kinds) or len(comments) == 0:
fid.close()
raise ValueError('Dataset names in FIF file '
'could not be found.')
t = [_aspect_rev[a] for a in aspect_kinds]
t = ['"' + c + '" (' + tt + ')' for tt, c in zip(t, comments)]
t = '\n'.join(t)
return comments, aspect_kinds, t
def _get_aspect(evoked, allow_maxshield):
"""Get Evoked data aspect."""
is_maxshield = False
aspect = dir_tree_find(evoked, FIFF.FIFFB_ASPECT)
if len(aspect) == 0:
_check_maxshield(allow_maxshield)
aspect = dir_tree_find(evoked, FIFF.FIFFB_IAS_ASPECT)
is_maxshield = True
if len(aspect) > 1:
logger.info('Multiple data aspects found. Taking first one.')
return aspect[0], is_maxshield
def _get_evoked_node(fname):
"""Get info in evoked file."""
f, tree, _ = fiff_open(fname)
with f as fid:
_, meas = read_meas_info(fid, tree, verbose=False)
evoked_node = dir_tree_find(meas, FIFF.FIFFB_EVOKED)
return evoked_node
def _check_evokeds_ch_names_times(all_evoked):
evoked = all_evoked[0]
ch_names = evoked.ch_names
for ii, ev in enumerate(all_evoked[1:]):
if ev.ch_names != ch_names:
if set(ev.ch_names) != set(ch_names):
raise ValueError(
"%s and %s do not contain the same channels." % (evoked,
ev))
else:
warn("Order of channels differs, reordering channels ...")
ev = ev.copy()
ev.reorder_channels(ch_names)
all_evoked[ii + 1] = ev
if not np.max(np.abs(ev.times - evoked.times)) < 1e-7:
raise ValueError("%s and %s do not contain the same time instants"
% (evoked, ev))
return all_evoked
def combine_evoked(all_evoked, weights):
"""Merge evoked data by weighted addition or subtraction.
Data should have the same channels and the same time instants.
Subtraction can be performed by calling
``combine_evoked([evoked1, -evoked2], 'equal')``
.. Warning::
If you provide an array of weights instead of using `'equal'` or
`'nave'`, strange things may happen with your resulting signal
amplitude and/or `.nave` attribute.
Parameters
----------
all_evoked : list of Evoked
The evoked datasets.
weights : list of float | str
The weights to apply to the data of each evoked instance.
Can also be ``'nave'`` to weight according to evoked.nave,
or ``"equal"`` to use equal weighting (each weighted as ``1/N``).
Returns
-------
evoked : Evoked
The new evoked data.
Notes
-----
.. versionadded:: 0.9.0
"""
naves = np.array([evk.nave for evk in all_evoked], float)
if isinstance(weights, str):
_check_option('weights', weights, ['nave', 'equal'])
if weights == 'nave':
weights = naves / naves.sum()
else:
weights = np.ones_like(naves)
else:
weights = np.array(weights, float)
if weights.ndim != 1 or weights.size != len(all_evoked):
raise ValueError('weights must be the same size as all_evoked')
# cf. https://en.wikipedia.org/wiki/Weighted_arithmetic_mean, section on
# how variances change when summing Gaussian random variables. The variance
# of a weighted sample mean is:
#
# σ² = w₁² σ₁² + w₂² σ₂² + ... + wₙ² σₙ²
#
# We estimate the variance of each evoked instance as 1 / nave to get:
#
# σ² = w₁² / nave₁ + w₂² / nave₂ + ... + wₙ² / naveₙ
#
# And our resulting nave is the reciprocal of this:
new_nave = 1. / np.sum(weights ** 2 / naves)
# This general formula is equivalent to formulae in Matti's manual
# (pp 128-129), where:
# new_nave = sum(naves) when weights='nave' and
# new_nave = 1. / sum(1. / naves) when weights='equal'
all_evoked = _check_evokeds_ch_names_times(all_evoked)
evoked = all_evoked[0].copy()
# use union of bad channels
bads = list(set(evoked.info['bads']).union(*(ev.info['bads']
for ev in all_evoked[1:])))
evoked.info['bads'] = bads
evoked.data = sum(w * e.data for w, e in zip(weights, all_evoked))
evoked.nave = new_nave
evoked.comment = ' + '.join('%0.3f * %s' % (w, e.comment or 'unknown')
for w, e in zip(weights, all_evoked))
return evoked
@verbose
def read_evokeds(fname, condition=None, baseline=None, kind='average',
proj=True, allow_maxshield=False, verbose=None):
"""Read evoked dataset(s).
Parameters
----------
fname : str
The file name, which should end with -ave.fif or -ave.fif.gz.
condition : int or str | list of int or str | None
The index or list of indices of the evoked dataset to read. FIF files
can contain multiple datasets. If None, all datasets are returned as a
list.
baseline : None (default) or tuple of length 2
The time interval to apply baseline correction. If None do not apply
it. If baseline is (a, b) the interval is between "a (s)" and "b (s)".
If a is None the beginning of the data is used and if b is None then b
is set to the end of the interval. If baseline is equal to (None, None)
all the time interval is used. Correction is applied by computing mean
of the baseline period and subtracting it from the data. The baseline
(a, b) includes both endpoints, i.e. all timepoints t such that
a <= t <= b.
kind : str
Either 'average' or 'standard_error', the type of data to read.
proj : bool
If False, available projectors won't be applied to the data.
allow_maxshield : bool | str (default False)
If True, allow loading of data that has been recorded with internal
active compensation (MaxShield). Data recorded with MaxShield should
generally not be loaded directly, but should first be processed using
SSS/tSSS to remove the compensation signals that may also affect brain
activity. Can also be "yes" to load without eliciting a warning.
%(verbose)s
Returns
-------
evoked : Evoked or list of Evoked
The evoked dataset(s); one Evoked if condition is int or str,
or list of Evoked if condition is None or list.
See Also
--------
write_evokeds
"""
check_fname(fname, 'evoked', ('-ave.fif', '-ave.fif.gz',
'_ave.fif', '_ave.fif.gz'))
logger.info('Reading %s ...' % fname)
return_list = True
if condition is None:
evoked_node = _get_evoked_node(fname)
condition = range(len(evoked_node))
elif not isinstance(condition, list):
condition = [condition]
return_list = False
out = [Evoked(fname, c, kind=kind, proj=proj,
allow_maxshield=allow_maxshield,
verbose=verbose).apply_baseline(baseline)
for c in condition]
return out if return_list else out[0]
def _read_evoked(fname, condition=None, kind='average', allow_maxshield=False):
"""Read evoked data from a FIF file."""
if fname is None:
raise ValueError('No evoked filename specified')
f, tree, _ = fiff_open(fname)
with f as fid:
# Read the measurement info
info, meas = read_meas_info(fid, tree, clean_bads=True)
# Locate the data of interest
processed = dir_tree_find(meas, FIFF.FIFFB_PROCESSED_DATA)
if len(processed) == 0:
raise ValueError('Could not find processed data')
evoked_node = dir_tree_find(meas, FIFF.FIFFB_EVOKED)
if len(evoked_node) == 0:
raise ValueError('Could not find evoked data')
# find string-based entry
if isinstance(condition, str):
if kind not in _aspect_dict.keys():
raise ValueError('kind must be "average" or '
'"standard_error"')
comments, aspect_kinds, t = _get_entries(fid, evoked_node,
allow_maxshield)
goods = (np.in1d(comments, [condition]) &
np.in1d(aspect_kinds, [_aspect_dict[kind]]))
found_cond = np.where(goods)[0]
if len(found_cond) != 1:
raise ValueError('condition "%s" (%s) not found, out of '
'found datasets:\n%s'
% (condition, kind, t))
condition = found_cond[0]
elif condition is None:
if len(evoked_node) > 1:
_, _, conditions = _get_entries(fid, evoked_node,
allow_maxshield)
raise TypeError("Evoked file has more than one "
"condition, the condition parameters "
"must be specified from:\n%s" % conditions)
else:
condition = 0
if condition >= len(evoked_node) or condition < 0:
raise ValueError('Data set selector out of range')
my_evoked = evoked_node[condition]
# Identify the aspects
my_aspect, info['maxshield'] = _get_aspect(my_evoked, allow_maxshield)
# Now find the data in the evoked block
nchan = 0
sfreq = -1
chs = []