Source code for shellbot.machines.base

# -*- coding: utf-8 -*-

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import defaultdict
import logging
from multiprocessing import Manager, Lock, Process, Queue
import signal
import time


[docs]class Machine(object): """ Implements a state machine The life cycle of a machine can be described as follows:: 1. A machine instance is created and configured:: a_bot = ShellBot(...) machine = Machine(bot=a_bot) machine.set(states=states, transitions=transitions, ... 2. The machine is switched on and ticked at regular intervals:: machine.start() 3. Machine can process more events than ticks:: machine.execute('hello world') 4. When a machine is expecting data from the chat space, it listens from the ``fan`` queue used by the shell:: engine.fan.put('special command') 5. When the machine is coming end of life, resources can be disposed:: machine.stop() credit: Alex Bertsch <abertsch@dropbox.com> securitybot/state_machine.py """ DEFER_DURATION = 0.0 # time to pause before working, in seconds TICK_DURATION = 0.2 # time to wait between ticks, in seconds def __init__(self, bot=None, states=None, transitions=None, initial=None, during=None, on_enter=None, on_exit=None, **kwargs): """ Implements a state machine :param bot: the bot linked to this machine :type : ShellBot :param states: All states supported by this machine :type states: list of str :param transitions: Transitions between states. Each transition is a dictionary. Each dictionary must feature following keys: source (str): The source state of the transition target (str): The target state of the transition Each dictionary may contain following keys: condition (function): A condition that must be true for the transition to occur. If no condition is provided then the state machine will transition on a step. action (function): A function to be executed while the transition occurs. :type transitions: list of dict :param initial: The initial state :type initial: str :param during: A mapping of states to functions to execute while in that state. Each key should map to a callable function. :type during: dict :param on_enter: A mapping of states to functions to execute when entering that state. Each key should map to a callable function. :type on_enter: dict :param on_exit: A mapping of states to functions to execute when exiting that state. Each key should map to a callable function. :type on_exit: dict Example:: machine = Machine(bot=bot) """ self.bot = bot self.lock = Lock() # prevent Manager() process to be interrupted handler = signal.signal(signal.SIGINT, signal.SIG_IGN) self.mutables = Manager().dict() # restore current handler for the rest of the program signal.signal(signal.SIGINT, handler) self.mixer = Queue() self.on_init(**kwargs) if states: self.build(states, transitions, initial, during, on_enter, on_exit)
[docs] def on_init(self, **kwargs): """ Adds to machine initialisation This function should be expanded in sub-class, where necessary. Example:: def on_init(self, prefix='my.machine', **kwargs): ... """ pass
[docs] def get(self, key, default=None): """ Retrieves the value of one key :param key: one attribute of this state machine instance :type key: str :param default: default value is the attribute has not been set yet :type default: an type that can be serialized This function can be used across multiple processes, so that a consistent view of the state machine is provided. """ with self.lock: value = self.mutables.get(key, default) if value is not None: return value return default
[docs] def set(self, key, value): """ Remembers the value of one key :param key: one attribute of this state machine instance :type key: str :param value: new value of the attribute :type value: an type that can be serialized This function can be used across multiple processes, so that a consistent view of the state machine is provided. """ with self.lock: self.mutables[key] = value
[docs] def build(self, states, transitions, initial, during=None, on_enter=None, on_exit=None): """ Builds a complete state machine :param states: All states supported by this machine :type states: list of str :param transitions: Transitions between states. Each transition is a dictionary. Each dictionary must feature following keys: source (str): The source state of the transition target (str): The target state of the transition Each dictionary may contain following keys: condition (function): A condition that must be true for the transition to occur. If no condition is provided then the state machine will transition on a step. action (function): A function to be executed while the transition occurs. :type transitions: list of dict :param initial: The initial state :type initial: str :param during: A mapping of states to functions to execute while in that state. Each key should map to a callable function. :type during: dict :param on_enter: A mapping of states to functions to execute when entering that state. Each key should map to a callable function. :type on_enter: dict :param on_exit: A mapping of states to functions to execute when exiting that state. Each key should map to a callable function. :type on_exit: dict """ if during is None: during = {} if on_enter is None: on_enter = {} if on_exit is None: on_exit = {} states = sorted(list(set(states))) self._states = dict() for state in states: self._states[state] = State(state, during.get(state, None), on_enter.get(state, None), on_exit.get(state, None)) try: self.mutables['initial_state'] = self._states[initial].name self.mutables['state'] = self.mutables['initial_state'] except KeyError: raise ValueError(u'Invalid initial state {}'.format(initial)) self._transitions = defaultdict(list) for transition in transitions: try: source_state = self._states[transition['source']] except KeyError: if 'source' not in transition: raise ValueError(u'Missing source state') else: raise ValueError(u'Invalid source state {}'.format( transition['source'])) try: target_state = self._states[transition['target']] except KeyError: if 'target' not in transition: raise ValueError(u'Missing target state') else: raise ValueError(u'Invalid target state {}'.format( transition['target'])) item = Transition(source_state, target_state, transition.get('condition', None), transition.get('action', None)) self._transitions[transition['source']].append(item)
[docs] def state(self, name): """ Provides a state by name :param name: The label of the target state :type name: str :return: State This function raises KeyError if an unknown name is provided. """ return self._states[name]
@property def current_state(self): """ Provides current state :return: State This function raises AttributeError if it is called before ``build()``. """ try: name = self.mutables['state'] except KeyError: raise AttributeError('Machine has not been built') return self._states[name]
[docs] def reset(self): """ Resets a state machine before it is restarted :return: True if the machine has been actually reset, else False This function moves a state machine back to its initial state. A typical use case is when you have to recycle a state machine multiple times, like in the following example:: if new_cycle(): machine.reset() machine.start() If the machine is running, calling ``reset()`` will have no effect and you will get False in return. Therefore, if you have to force a reset, you may have to stop the machine first. Example of forced reset:: machine.stop() machine.reset() """ if self.is_running: logging.warning(u"Cannot reset a running state machine") return False # purge the mixer queue while not self.mixer.empty(): self.mixer.get() # restore initial state self.set('state', self.get('initial_state')) logging.warning(u"Resetting machine to '{}'".format( self.current_state.name)) # do the rest self.on_reset() return True
[docs] def on_reset(self): """ Adds processing to machine reset This function should be expanded in sub-class, where necessary. Example:: def on_reset(self): self.sub_machine.reset() """ pass
[docs] def step(self, **kwargs): """ Brings some life to the state machine Thanks to ``**kwargs``, it is easy to transmit parameters to underlying functions: - ``current_state.during(**kwargs)`` - ``transition.condition(**kwargs)`` Since parameters can vary on complex state machines, you are advised to pay specific attention to the signatures of related functions. If you expect some parameter in a function, use ``kwargs.get()``to get its value safely. For example, to inject the value of a gauge in the state machine on each tick:: def remember(**kwargs): gauge = kwargs.get('gauge') if gauge: db.save(gauge) during = { 'measuring', remember } ... machine.build(during=during, ... ) while machine.is_running: machine.step(gauge=get_measurement()) Or, if you have to transition on a specific threshold for a gauge, you could do:: def if_threshold(**kwargs): gauge = kwargs.get('gauge') if gauge > 20: return True return False def raise_alarm(): mail.post_message() transitions = [ {'source': 'normal', 'target': 'alarm', 'condition': if_threshold, 'action': raise_alarm}, ... ] ... machine.build(transitions=transitions, ... ) while machine.is_running: machine.step(gauge=get_measurement()) Shellbot is using this mechanism for itself, and the function can be called at various occasions: - machine tick - This is done at regular intervals in time - input from the chat - Typically, in response to a question - inbound message - Received from subscription, over the network Following parameters are used for machine ticks: - event='tick' - fixed value Following parameters are used for chat input: - event='input' - fixed value - arguments - the text that is submitted from the chat Following parameters are used for subscriptions: - event='inbound' - fixed value - message - the object that has been transmitted This machine should report on progress by sending messages with one or multiple ``self.bot.say("Whatever message")``. """ self.current_state.during(**kwargs) for transition in self._transitions[self.current_state.name]: if transition.condition(**kwargs): logging.debug('Transitioning: {0}'.format(transition)) transition.action() self.current_state.on_exit() self.mutables['state'] = transition.target.name self.current_state.on_enter() break
[docs] def start(self, tick=None, defer=None): """ Starts the machine :param tick: The duration set for each tick (optional) :type tick: positive number :param defer: wait some seconds before the actual work (optional) :type defer: positive number :return: either the process that has been started, or None This function starts a separate thread to tick the machine in the background. """ if tick: assert tick > 0.0 # number of seconds self.TICK_DURATION = tick if defer is not None: assert defer >= 0.0 # number of seconds self.DEFER_DURATION = defer process = Process(target=self.run) # do not daemonize process.start() while not self.is_running: # prevent race condition on stop() time.sleep(0.001) return process
[docs] def restart(self, **kwargs): """ Restarts the machine This function is very similar to reset(), except that it also starts the machine on successful reset. Parameters given to it are those that are expected by start(). Note: this function has no effect on a running machine. """ if not self.reset(): return False self.start(**kwargs) return True
[docs] def stop(self): """ Stops the machine This function sends a poison pill to the queue that is read on each tick. """ if self.is_running: self.mixer.put(None) time.sleep(self.TICK_DURATION+0.05)
[docs] def run(self): """ Continuously ticks the machine This function is looping in the background, and calls ``step(event='tick')`` at regular intervals. The recommended way for stopping the process is to call the function ``stop()``. For example:: machine.stop() The loop is also stopped when the parameter ``general.switch`` is changed in the context. For example:: engine.set('general.switch', 'off') """ logging.info(u"Starting machine") self.set('is_running', True) self.on_start() time.sleep(self.DEFER_DURATION) try: while self.bot.engine.get('general.switch', 'on') == 'on': try: if self.mixer.empty(): self.on_tick() time.sleep(self.TICK_DURATION) continue item = self.mixer.get(True, self.TICK_DURATION) if item is None: logging.debug('Stopping machine on poison pill') break logging.debug('Processing item') self.execute(arguments=item) except Exception as feedback: logging.exception(feedback) break except KeyboardInterrupt: pass self.on_stop() self.set('is_running', False) logging.info(u"Machine has been stopped")
[docs] def on_start(self): """ Adds to machine start This function is invoked when the machine is started or restarted. It can be expanded in sub-classes where required. Example:: def on_start(self): # clear bot store on machine start self.bot.forget() """ pass
[docs] def on_stop(self): """ Adds to machine stop This function is invoked when the machine is stopped. It can be expanded in sub-classes where required. Example:: def on_stop(self): # dump bot store on machine stop self.bot.publisher.put( self.bot.id, self.bot.recall('input')) """ pass
[docs] def on_tick(self): """ Processes one tick """ self.step(event='tick') message = self.bot.subscriber.get() if message: self.step(event='inbound', message=message)
[docs] def execute(self, arguments=None, **kwargs): """ Processes data received from the chat :param arguments: input to be injected into the state machine :type arguments: str is recommended This function can be used to feed the machine asynchronously """ self.step(event='input', arguments=arguments, **kwargs)
@property def is_running(self): """ Determines if this machine is runnning :return: True or False """ return self.get('is_running', False)
[docs]class State(object): """ Represents a state of the machine Each state has a function to perform while it's active, when it's entered into, and when it's exited. These functions may be None. """ def __init__(self, name, during=None, on_enter=None, on_exit=None): """ Represents a state in the machine :param name: name of the state :type name: str ;param during: A function to call while this state is active. :type during: function :param on_enter: A function to call when transitioning into this state. :type on_enter: function :param on_exit: Function to call when transitioning out of this state. :type on_exit: function """ self.name = name self._during = during self._on_enter = on_enter self._on_exit = on_exit def __repr__(self): """ Provides a representation of this state :rtype: str """ return u"State({0}, {1}, {2}, {3})".format(self.name, self._during, self._on_enter, self._on_exit ) def __str__(self): """ Provides a string handle to this state :rtype: str """ return self.name
[docs] def during(self, **kwargs): """ Does some stuff while in this state """ if self._during is not None: self._during(**kwargs)
[docs] def on_enter(self): """ Does some stuf while transitioning into this state """ if self._on_enter is not None: self._on_enter()
[docs] def on_exit(self): """ Does some stuff while transitioning out of this state """ if self._on_exit is not None: self._on_exit()
[docs]class Transition(object): """ Represents a transition between two states Each transition object holds a reference to its source and destination states, as well as the condition function it requires for transitioning and the action to perform upon transitioning. """ def __init__(self, source, target, condition=None, action=None): """ Represents a transition between two states Args: source (State): The source State for this transition. target (State): The destination State for this transition. condition (function): The transitioning condition callback. action (function): An action to perform upon transitioning. """ self.source = source self.target = target self._condition = condition self._action = action def __repr__(self): """ Provides a representation of this transition :rtype: str """ return u"Transition({0}, {1}, {2}, {3})".format(repr(self.source), repr(self.target), self._condition, self._action ) def __str__(self): """ Provides a string handle to this transition :rtype: str """ return "{0} => {1}".format(self.source, self.target)
[docs] def condition(self, **kwargs): """ Checks if transition can be triggered :return: True or False Condition default to True if none is provided """ return True if self._condition is None else self._condition(**kwargs)
[docs] def action(self): """ Does some stuff while transitioning """ if self._action is not None: self._action()