Source code for gabrieltool.statemachine.callable_zoo.predicate_zoo.base

# -*- coding: utf-8 -*-
"""Callable classes for Transition Predicates.

All the classes here should be a callable and return either True/False when
called (to indicate whether or not to take a transition). All classes should
inherit from CallableBase class and annoate their constructor (if there is one)
with the @record_kwargs decorator for proper serialization.
"""
import time

from gabrieltool.statemachine.callable_zoo import record_kwargs
from gabrieltool.statemachine.callable_zoo import CallableBase


[docs]class HasObjectClass(CallableBase): """Check if there is an object class in the extracted information of the sensor data. """ @record_kwargs def __init__(self, class_name): """Constructor. Args: class_name (string): Class name or id. """ super().__init__() self.class_name = class_name def __call__(self, app_state): return self.class_name in app_state
[docs]class Always(CallableBase): """Always take this transition. Useful for welcome message when the application starts. """ def __call__(self, app_state): return True
[docs]class HasObjectClassWhileNotOthers(CallableBase): """Check if there are some object classes in the extracted information while some other classes are not. """ @record_kwargs def __init__(self, has_classes=None, absent_classes=None): """Constructor. Args: existed_classes (list of strings, optional): Names of classes that need to exist. Defaults to None. absent_classes (list of strings, optional): Name of classes that need to be absent. Defaults to None. """ super().__init__() self.has_classes = has_classes if has_classes is not None else [] self.absent_classes = absent_classes if absent_classes is not None else [] def __call__(self, app_state): return all( [class_name in app_state for class_name in self.has_classes]) and all( [class_name not in app_state for class_name in self.absent_classes] )
[docs]class Wait(CallableBase): """Wait for some time before turning true. """ @record_kwargs def __init__(self, wait_time=None): """Constructor. Args: wait_time (int, optional): Wait time in seconds. Defaults to None. """ super().__init__() self.wait_time = wait_time if wait_time is not None else 0 # set when this predicate this first called # or returned True in the last call. self._start_time = None def __call__(self, app_state): if self._start_time is None: self._start_time = time.time() else: cur_time = time.time() if (cur_time - self._start_time) > self.wait_time: # reset self._start_time = None return True return False