shellbot.bus module¶
-
class
shellbot.bus.
Bus
(context)[source]¶ Bases:
object
Represents an information bus between publishers and subscribers
In the context of shellbot, channels are channel identifiers, and messages are python objects serializable with json.
A first pattern is the synchronization of direct channels from the group channel:
- every direct channel is a subscriber, and filters messages sent to their own channel identifier
- group channel is a publisher, and broadcast instructions to the list of direct channel identifiers it knows about
A second pattern is the observation by a group channel of what is happening in related direct channels:
- every direct channel is a publisher, and the channel used is their own channel identifier
- group channel is a subscriber, and observed messages received from all direct channels it knows about
For example, a distributed voting system can be built by combining the two patterns. The vote itself can be triggered simultaneously to direct channels on due time, so that every participants are involved more or less at the same time. And data that is collected in direct channels can ce centralised back to the group channel where results are communicated.
-
DEFAULT_ADDRESS
= 'tcp://127.0.0.1:5555'¶
-
check
()[source]¶ Checks configuration settings
This function reads key
bus
and below, and update the context accordingly. It handles following parameters:bus.address
- focal point of bus exchanges on the network. The default value istcp://*:5555
which means ‘use TCP port 5555 on local machine’.
-
publish
()[source]¶ Publishes messages
Returns: Publisher Example:
# get a publisher for subsequent broadcasts publisher = bus.publish() # start the publishing process publisher.start() ... # broadcast information_message publisher.put(channel, message)
-
subscribe
(channels)[source]¶ Subcribes to some channels
Parameters: channels (str or list of str) – one or multiple channels Returns: Subscriber Example:
# subscribe from all direct channels related to this group channel subscriber = bus.subscribe(bot.direct_channels) ... # get next message from these channels message = subscriber.get()
-
class
shellbot.bus.
Publisher
(context)[source]¶ Bases:
multiprocessing.process.Process
Publishes asynchronous messages
For example, from a group channel, you may send instructions to every direct channels:
# get a publisher publisher = bus.publish() # send instruction to direct channels publisher.put(bot.direct_channels, instruction)
From within a direct channel, you may reflect your state to observers:
# get a publisher publish = bus.publish() # share new state publisher.put(bot.id, bit_of_information_here)
-
DEFER_DURATION
= 0.3¶
-
EMPTY_DELAY
= 0.005¶
-
process
(item)[source]¶ Processes items received from the queue
Parameters: item (str) – the item received Note that the item should result from serialization of (channel, message) tuple done previously.
-
put
(channels, message)[source]¶ Broadcasts a message
Parameters: - channels (str or list of str) – one or multiple channels
- message (dict or other json-serializable object) – the message to send
Example:
message = { ... } publisher.put(bot.id, message)
This function actually put the message in a global queue that is handled asynchronously. Therefore, when the function returns there is no guarantee that message has been transmitted nor received.
-
run
()[source]¶ Continuously broadcasts messages
This function is looping on items received from the queue, and is handling them one by one in the background.
Processing should be handled in a separate background process, like in the following example:
publisher = Publisher(address) process = publisher.start()
The recommended way for stopping the process is to change the parameter
general.switch
in the context. For example:engine.set('general.switch', 'off')
Alternatively, the loop is also broken when a poison pill is pushed to the queue. For example:
publisher.fan.put(None)
-
-
class
shellbot.bus.
Subscriber
(context, channels)[source]¶ Bases:
object
Subscribes to asynchronous messages
For example, from a group channel, you may subscribe from direct channels of all participants:
# subscribe from all direct channels related to this group channel subscriber = bus.subscribe(bot.direct_channels) # get messages from direct channels while True: message = subscriber.get() ...
From within a direct channel, you may receive instructions sent by the group channel:
# subscribe for messages sent to me subscriber = bus.subscribe(bot.id) # get and process instructions one at a time while True: instruction = subscriber.get() ...
-
get
(block=False)[source]¶ Gets next message
Returns: dict or other serializable message or None This function returns next message that has been made available, or None if no message has arrived yet.
Example:
message = subscriber.get() # immedaite return if message: ...
Change the parameter
block
if you prefer to wait until next message arrives.Example:
message = subscriber.get(block=True) # wait until available
Note that this function does not preserve the enveloppe of the message. In other terms, the channel used for the communication is lost in translation. Therefore the need to put within messages all information that may be relevant for the receiver.
-