|
| 1 | +# -*- coding: utf-8 -*- |
| 2 | +""" |
| 3 | + lantz.ui.scan |
| 4 | + ~~~~~~~~~~~~~ |
| 5 | +
|
| 6 | + A Scan frontend and Backend. Requires scan.ui |
| 7 | +
|
| 8 | + :copyright: 2014 by Lantz Authors, see AUTHORS for more details. |
| 9 | + :license: BSD, see LICENSE for more details. |
| 10 | +""" |
| 11 | + |
| 12 | + |
| 13 | +import time |
| 14 | +import math |
| 15 | +from enum import IntEnum |
| 16 | + |
| 17 | +from lantz.utils.qt import QtCore, QtGui |
| 18 | +from lantz.ui.app import Frontend, Backend, start_gui_app |
| 19 | + |
| 20 | + |
| 21 | +def _linspace_args(start, stop, step_size=None, length=None): |
| 22 | + """Return the step_size and length for a given linspace |
| 23 | + where step_size OR length is defined. |
| 24 | + """ |
| 25 | + |
| 26 | + if step_size is None: |
| 27 | + if length is None: |
| 28 | + length = 10 |
| 29 | + step_size = (stop - start) / (length + 1) |
| 30 | + else: |
| 31 | + if length is not None: |
| 32 | + raise ValueError('step_size and length cannot be both different from None') |
| 33 | + length = math.floor((stop - start) / step_size) + 1 |
| 34 | + |
| 35 | + return step_size, length |
| 36 | + |
| 37 | + |
| 38 | +def _linspace(start, stop, step_size=None, length=None): |
| 39 | + """Yield a linear spacing from start to stop |
| 40 | + with defined step_size OR length. |
| 41 | + """ |
| 42 | + |
| 43 | + step_size, length = _linspace_args(start, stop, step_size, length) |
| 44 | + |
| 45 | + for i in range(length): |
| 46 | + yield start + i * step_size |
| 47 | + |
| 48 | + |
| 49 | +class StepsMode(IntEnum): |
| 50 | + """Step calculation modes. |
| 51 | + """ |
| 52 | + |
| 53 | + #: fixed step size. |
| 54 | + step_size = 0 |
| 55 | + |
| 56 | + #: fixed step count. |
| 57 | + step_count = 1 |
| 58 | + |
| 59 | + |
| 60 | +class Scan(Backend): |
| 61 | + """A backend that iterates over an list of values, |
| 62 | + calling a `body` function in each step. |
| 63 | + """ |
| 64 | + |
| 65 | + #: Signal emitted before starting a new iteration |
| 66 | + #: Parameters: loop counter, step value, overrun |
| 67 | + iteration = QtCore.Signal(int, int, bool) |
| 68 | + |
| 69 | + #: Signal emitted when the loop finished. |
| 70 | + #: The parameter is used to inform if the loop was canceled. |
| 71 | + loop_done = QtCore.Signal(bool) |
| 72 | + |
| 73 | + #: The function to be called. It requires three parameters. |
| 74 | + #: counter - the iteration number. |
| 75 | + #: current value - the current value of the scan. |
| 76 | + #: overrun - a boolean indicating if the time required for the operation |
| 77 | + #: is longer than the interval. |
| 78 | + #: :type: (int, int, bool) -> None |
| 79 | + body = None |
| 80 | + |
| 81 | + #: To be called before the body. Same signature as body |
| 82 | + _pre_body = None |
| 83 | + |
| 84 | + #: To be called after the body. Same signature as body |
| 85 | + _post_body = None |
| 86 | + |
| 87 | + def __init__(self, **kwargs): |
| 88 | + super().__init__(**kwargs) |
| 89 | + self._active = False |
| 90 | + self._internal_func = None |
| 91 | + |
| 92 | + def stop(self): |
| 93 | + """Request the scanning to be stop. |
| 94 | + Will stop when the current iteration is finished. |
| 95 | + """ |
| 96 | + self._active = False |
| 97 | + |
| 98 | + def start(self, body, interval=0, steps=(), timeout=0): |
| 99 | + """Request the scanning to be started. |
| 100 | +
|
| 101 | + :param body: function to be called at each iteration. |
| 102 | + If None, the class body will be used. |
| 103 | + :param interval: interval between starts of the iteration. |
| 104 | + If the body takes too long, the iteration will |
| 105 | + be as fast as possible and the overrun flag will be True |
| 106 | + :param steps: iterable |
| 107 | + :param timeout: total time in seconds that the scanning will take. |
| 108 | + If overdue, the scanning will be stopped. |
| 109 | + If 0, there is no timeout. |
| 110 | + """ |
| 111 | + self._active = True |
| 112 | + body = body or self.body |
| 113 | + |
| 114 | + iterations = len(steps) |
| 115 | + |
| 116 | + def internal(counter, overrun=False, schedule=QtCore.QTimer.singleShot): |
| 117 | + if not self._active: |
| 118 | + self.loop_done.emit(True) |
| 119 | + return |
| 120 | + |
| 121 | + st = time.time() |
| 122 | + self.iteration.emit(counter, iterations, overrun) |
| 123 | + if self._pre_body is not None: |
| 124 | + self._pre_body(counter, steps[counter], overrun) |
| 125 | + if body is not None: |
| 126 | + body(counter, steps[counter], overrun) |
| 127 | + if self._post_body is not None: |
| 128 | + self._post_body(counter, steps[counter], overrun) |
| 129 | + |
| 130 | + if iterations and counter + 1 == iterations: |
| 131 | + self._active = False |
| 132 | + self.loop_done.emit(False) |
| 133 | + return |
| 134 | + elif not self._active: |
| 135 | + self.loop_done.emit(True) |
| 136 | + return |
| 137 | + |
| 138 | + sleep = interval - (time.time() - st) |
| 139 | + schedule(sleep * 1000 if sleep > 0 else 0, |
| 140 | + lambda: self._internal_func(counter + 1, sleep < 0)) |
| 141 | + |
| 142 | + self._internal_func = internal |
| 143 | + if timeout: |
| 144 | + QtCore.QTimer.singleShot(timeout * 1000, self.stop) |
| 145 | + QtCore.QTimer.singleShot(0, lambda: self._internal_func(0)) |
| 146 | + |
| 147 | + |
| 148 | +class ScanUi(Frontend): |
| 149 | + """A frontend to the Scan backend. |
| 150 | +
|
| 151 | + Allows you to create linear sequence of steps between a start a stop, |
| 152 | + with selectable step size or number of steps. |
| 153 | + """ |
| 154 | + |
| 155 | + gui = 'scan.ui' |
| 156 | + |
| 157 | + auto_connect = False |
| 158 | + |
| 159 | + #: Signal emitted when a start is requested. |
| 160 | + #: The parameters are None, interval, vector of steps |
| 161 | + request_start = QtCore.Signal(object, object, object) |
| 162 | + |
| 163 | + #: Signal emitted when a stop is requested. |
| 164 | + request_stop = QtCore.Signal() |
| 165 | + |
| 166 | + def connect_backend(self): |
| 167 | + super().connect_backend() |
| 168 | + |
| 169 | + self.widget.start_stop.clicked.connect(self.on_start_stop_clicked) |
| 170 | + self.widget.mode.currentIndexChanged.connect(self.on_mode_changed) |
| 171 | + |
| 172 | + self.widget.step_count.valueChanged.connect(self.recalculate) |
| 173 | + self.widget.start.valueChanged.connect(self.recalculate) |
| 174 | + self.widget.stop.valueChanged.connect(self.recalculate) |
| 175 | + self.widget.step_size.valueChanged.connect(self.recalculate) |
| 176 | + |
| 177 | + self.widget.progress_bar.setValue(0) |
| 178 | + |
| 179 | + self._ok_palette = QtGui.QPalette(self.widget.progress_bar.palette()) |
| 180 | + self._overrun_palette = QtGui.QPalette(self.widget.progress_bar.palette()) |
| 181 | + self._overrun_palette.setColor(QtGui.QPalette.Highlight, |
| 182 | + QtGui.QColor(QtCore.Qt.red)) |
| 183 | + |
| 184 | + self.backend.iteration.connect(self.on_iteration) |
| 185 | + self.backend.loop_done.connect(self.on_loop_done) |
| 186 | + |
| 187 | + self.request_start.connect(self.backend.start) |
| 188 | + self.request_stop.connect(self.backend.stop) |
| 189 | + |
| 190 | + def on_start_stop_clicked(self, value=None): |
| 191 | + if self.backend._active: |
| 192 | + self.widget.start_stop.setText('...') |
| 193 | + self.widget.start_stop.setEnabled(False) |
| 194 | + self.request_stop.emit() |
| 195 | + return |
| 196 | + |
| 197 | + self.widget.start_stop.setText('Stop') |
| 198 | + self.widget.start_stop.setChecked(True) |
| 199 | + |
| 200 | + vals = [getattr(self.widget, name).value() |
| 201 | + for name in 'start stop step_size step_count wait'.split()] |
| 202 | + start, stop, step_size, step_count, interval = vals |
| 203 | + |
| 204 | + steps = list(_linspace(start, stop, step_size)) |
| 205 | + |
| 206 | + self.request_start.emit(None, interval, steps) |
| 207 | + |
| 208 | + def recalculate(self, *args): |
| 209 | + mode = self.widget.mode.currentIndex() |
| 210 | + if mode == StepsMode.step_size: |
| 211 | + step_size, length = _linspace_args(self.widget.start.value(), |
| 212 | + self.widget.stop.value(), |
| 213 | + self.widget.step_size.value()) |
| 214 | + self.widget.step_count.setValue(length) |
| 215 | + elif mode == StepsMode.step_count: |
| 216 | + step_size, length = _linspace_args(self.widget.start.value(), |
| 217 | + self.widget.stop.value(), |
| 218 | + length=self.widget.step_count.value()) |
| 219 | + self.widget.step_size.setValue(step_size) |
| 220 | + |
| 221 | + def on_iteration(self, counter, iterations, overrun): |
| 222 | + pbar = self.widget.progress_bar |
| 223 | + |
| 224 | + if not counter: |
| 225 | + if iterations: |
| 226 | + pbar.setMaximum(iterations + 1) |
| 227 | + else: |
| 228 | + pbar.setMaximum(0) |
| 229 | + |
| 230 | + if iterations: |
| 231 | + pbar.setValue(counter + 1) |
| 232 | + |
| 233 | + if overrun: |
| 234 | + pbar.setPalette(self._overrun_palette) |
| 235 | + else: |
| 236 | + pbar.setPalette(self._ok_palette) |
| 237 | + |
| 238 | + def on_mode_changed(self, new_index): |
| 239 | + if new_index == StepsMode.step_size: |
| 240 | + self.widget.step_count.setEnabled(False) |
| 241 | + self.widget.step_size.setEnabled(True) |
| 242 | + elif new_index == StepsMode.step_count: |
| 243 | + self.widget.step_count.setEnabled(True) |
| 244 | + self.widget.step_size.setEnabled(False) |
| 245 | + |
| 246 | + self.recalculate() |
| 247 | + |
| 248 | + def on_loop_done(self, cancelled): |
| 249 | + self.widget.start_stop.setText('Start') |
| 250 | + self.widget.start_stop.setEnabled(True) |
| 251 | + self.widget.start_stop.setChecked(False) |
| 252 | + if self.widget.progress_bar.maximum(): |
| 253 | + self.widget.progress_bar.setValue(self.widget.progress_bar.maximum()) |
| 254 | + else: |
| 255 | + self.widget.progress_bar.setMaximum(1) |
| 256 | + |
| 257 | + |
| 258 | +if __name__ == '__main__': |
| 259 | + def func(current, total, overrun): |
| 260 | + print('func', current, total, overrun) |
| 261 | + app = Scan() |
| 262 | + app.body = func |
| 263 | + start_gui_app(app, ScanUi) |
0 commit comments