Source code for choirbot.guidance.optimization_thread

from threading import Thread, Event, Lock
from rclpy.guard_condition import GuardCondition

from . import guidance as guidance_module
from ..optimizer import Optimizer
from ..utils import OrEvent


[docs]class OptimizationThread(Thread): """ Base class representing separate thread in which optimization tasks are performed. This is an abstract class and is intended to be extended before instantiation. Attributes: optimizer: Optimizer class """
[docs] def __init__(self, guidance: 'guidance_module.Guidance', optimizer: Optimizer, gc_end: GuardCondition): """ Args: guidance (:class:`~choirbot.guidance.Guidance`): Guidance object optimizer (:class:`~choirbot.optimizer.Optimizer`): Optimizer object gc_end (:class:`rclpy.guard_condition.GuardCondition`): Guard Condition triggered at end of each optimization """ super().__init__() self._halt_event = Event() # event to trigger optimization stop self._begin_event = Event() # event to trigger optimization start self._quit_event = Event() # event to trigger thread quitting self._lock = Lock() self._is_optimizing = False # True if there is an optimization in progress self._gc_end = gc_end # guard condition to be triggered at end of optimization self.guidance = guidance self.optimizer = optimizer self.optimizer.initialize(guidance, self._halt_event)
def run(self): # event used to trigger either optimization stop or thread quitting begin_or_quit = OrEvent(self._begin_event, self._quit_event) # loop until quit event is set while not self._quit_event.is_set(): begin_or_quit.wait() # check if optimization if event has been set if self._begin_event.is_set(): # clear begin event and mark optimization as in progress self._begin_event.clear() with self._lock: self._is_optimizing = True # start optimization self.do_optimize() # mark optimization as ended and (possibly) clear halt event with self._lock: self._is_optimizing = False # clear halt event if previously set if self._halt_event.is_set(): self._halt_event.clear() else: # otherwise trigger guard condition to notify end of optimization self._gc_end.trigger() # clear quit event before actually quitting self._quit_event.clear()
[docs] def do_optimize(self): """ TODO """ raise NotImplementedError
[docs] def halt(self): """ TODO """ self.guidance.get_logger().info('Halting optimization') self._halt_event.set()
def _halt_if_optimizing(self): with self._lock: if self._is_optimizing: self.guidance.get_logger().info('Halting previous optimization') self._halt_event.set()
[docs] def optimize(self): """ TODO """ self._halt_if_optimizing() self.guidance.get_logger().info('Beginning optimization') self._begin_event.set()
[docs] def quit(self): """ TODO """ self._halt_if_optimizing() self.guidance.get_logger().info('Quitting optimization thread') self._quit_event.set()