Source code for ossom.monitor

# -*- coding: utf-8 -*-
"""
Lacks documentation!

Created on Sat Jun 27 20:25:33 2020

@author: João Vitor Gutkoski Paes
"""

import numpy as np
import time
import multiprocessing as mp
from ossom import Recorder, Player, Configurations
from typing import Union


config = Configurations()


[docs]class Monitor(object): """Monitor class."""
[docs] def __init__(self, target: callable = lambda x: x, samplerate: int = config.samplerate, waittime: float = 1., args: tuple = (0,)): """ Control a multiprocessing.Process to visualize data from recording or playing. Parameters ---------- target : callable, optional DESCRIPTION. The default is None. args : tuple, optional DESCRIPTION. The default is None. Returns ------- None. """ self.target = target self.waitTime = waittime self.samplerate = samplerate self.readLen = int(np.ceil(waittime * samplerate)) self.args = args return
[docs] def __call__(self, strm: Union[Recorder, Player] = None, blocksize: int = None): """ Configure the monitor and starts de process. Parameters ---------- strm : Union[Recorder, Player], optional DESCRIPTION. The default is None. buffersize : int, optional DESCRIPTION. The default is None. Returns ------- None. """ self.running = strm.running self.finished = strm.finished self._buffer = strm.get_buffer(blocksize=self.readLen) # assert self.buffer.data is strm.data self._process = mp.Process(target=self._loop) return
[docs] def setup(self): """Any setup step needed to the end monitoring object. Must be overriden on subclasses.""" pass
[docs] def tear_down(self): """Any destroying step needed to finish the end monitoring object. Must be overriden on subclasses.""" pass
def _loop(self): """ Actual monitoring loop. Parameters ---------- buffer : Audio DESCRIPTION. Returns ------- None. """ self.setup() self.running.wait() time.sleep(0.25) self.nextTime = time.time() + self.waitTime while self.running.is_set(): sleepTime = self.nextTime - time.time() if sleepTime > 0.: time.sleep(sleepTime) self.target(self._buffer.read_next(self.readLen), *self.args) self.nextTime = time.time() + self.waitTime if self.finished.is_set(): break self.tear_down() return
[docs] def start(self): """Start the parallel process.""" self._process.start() return
[docs] def wait(self): """Finish the parallel process.""" self._process.join() self._process.close() # self._buffer.close() return