shellbot.bus module

class shellbot.bus.Bus(context)

Bases: object

Represents an information bus between publishers and subscribers

In the context of shellbot, channels are channel identifiers, and messages are Python objects that can be serialized as JSON.

A first pattern is the synchronization of direct channels from the group channel:

  • every direct channel is a subscriber, and filters messages sent to its own channel identifier
  • the group channel is a publisher, and broadcasts instructions to the list of direct channel identifiers it knows about

A second pattern is the observation by a group channel of what is happening in related direct channels:

  • every direct channel is a publisher, and the channel used is its own channel identifier
  • the group channel is a subscriber, and observes messages received from all direct channels it knows about

For example, a distributed voting system can be built by combining the two patterns. The vote itself can be triggered in all direct channels at the scheduled time, so that all participants are involved at more or less the same time. Data collected in direct channels can then be centralised back to the group channel, where results are communicated.
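The voting example can be sketched with a toy in-memory bus. This is only an illustration of the two patterns combined: ToyBus, run_vote, the message keys and the channel identifiers are all hypothetical, and nothing here goes over the network or uses shellbot itself.

```python
from collections import defaultdict, deque


class ToyBus:
    """In-memory stand-in for a bus: one FIFO inbox per subscriber."""

    def __init__(self):
        self._inboxes = defaultdict(list)  # channel -> inboxes listening on it

    def subscribe(self, channels):
        """Return an inbox fed by every listed channel."""
        if isinstance(channels, str):
            channels = [channels]
        inbox = deque()
        for channel in channels:
            self._inboxes[channel].append(inbox)
        return inbox

    def put(self, channels, message):
        """Deliver a message to every inbox listening on the channels."""
        if isinstance(channels, str):
            channels = [channels]
        for channel in channels:
            for inbox in self._inboxes[channel]:
                inbox.append(message)


def run_vote(choices):
    """choices maps a (hypothetical) direct channel id to one answer."""
    bus = ToyBus()
    direct_channels = sorted(choices)

    # second pattern: the group channel observes every direct channel
    group_inbox = bus.subscribe(direct_channels)
    # first pattern: each direct channel listens on its own identifier
    direct_inboxes = {ch: bus.subscribe(ch) for ch in direct_channels}

    # the group channel triggers the vote in all direct channels at once
    bus.put(direct_channels, {'type': 'open'})

    # each direct channel reacts to the instruction and publishes a ballot
    for channel, inbox in direct_inboxes.items():
        while inbox:
            message = inbox.popleft()
            if message['type'] == 'open':
                bus.put(channel, {'type': 'ballot', 'choice': choices[channel]})

    # the group channel tallies ballots and skips its own instructions
    tally = {}
    while group_inbox:
        message = group_inbox.popleft()
        if message['type'] == 'ballot':
            tally[message['choice']] = tally.get(message['choice'], 0) + 1
    return tally
```

Because both patterns share the same channel identifiers in this sketch, every receiver filters on a 'type' key to ignore messages it sent itself.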

DEFAULT_ADDRESS = 'tcp://127.0.0.1:5555'
check()

Checks configuration settings

This function reads the key bus and the keys below it, and updates the context accordingly. It handles the following parameter:

  • bus.address - focal point of bus exchanges on the network. The default value is tcp://127.0.0.1:5555, which means ‘use TCP port 5555 on the local machine’.
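As a rough illustration of what applying a default means here, the sketch below models the context as a plain dict with dotted keys. That model and the helper name check_bus_settings are assumptions made for the example, not the actual shellbot context API.

```python
DEFAULT_ADDRESS = 'tcp://127.0.0.1:5555'  # same value as Bus.DEFAULT_ADDRESS


def check_bus_settings(settings):
    """Fill in bus.address when it is missing (hypothetical helper)."""
    settings.setdefault('bus.address', DEFAULT_ADDRESS)
    return settings
```

An address that is already configured is left untouched; only a missing key receives the default.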
publish()

Publishes messages

Returns:Publisher

Example:

# get a publisher for subsequent broadcasts
publisher = bus.publish()

# start the publishing process
publisher.start()

...

# broadcast a message on a channel
publisher.put(channel, message)
subscribe(channels)

Subscribes to some channels

Parameters:channels (str or list of str) – one or multiple channels
Returns:Subscriber

Example:

# subscribe to all direct channels related to this group channel
subscriber = bus.subscribe(bot.direct_channels)

...

# get next message from these channels
message = subscriber.get()
class shellbot.bus.Publisher(context)

Bases: multiprocessing.process.Process

Publishes asynchronous messages

For example, from a group channel, you may send instructions to every direct channel:

# get a publisher
publisher = bus.publish()

# send instruction to direct channels
publisher.put(bot.direct_channels, instruction)

From within a direct channel, you may reflect your state to observers:

# get a publisher
publisher = bus.publish()

# share new state
publisher.put(bot.id, bit_of_information_here)
DEFER_DURATION = 0.3
EMPTY_DELAY = 0.005
process(item)

Processes items received from the queue

Parameters:item (str) – the item received

Note that the item should result from the earlier serialization of a (channel, message) tuple.
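The round trip between a (channel, message) tuple and a string item might look like the sketch below. It assumes JSON as the serialization format, in line with the statement that messages are serializable with json; pack and unpack are hypothetical helper names.

```python
import json


def pack(channel, message):
    """Serialize a (channel, message) pair into a string item."""
    return json.dumps((channel, message))


def unpack(item):
    """Recover the channel and the message from a string item."""
    channel, message = json.loads(item)  # JSON turns the tuple into a list
    return channel, message
```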

put(channels, message)

Broadcasts a message

Parameters:
  • channels (str or list of str) – one or multiple channels
  • message (dict or other json-serializable object) – the message to send

Example:

message = { ... }
publisher.put(bot.id, message)

This function actually puts the message in a global queue that is handled asynchronously. Therefore, when the function returns there is no guarantee that the message has been transmitted or received.
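The deferred nature of put() can be illustrated with a standard queue. This is a simplified stand-in, assuming one JSON item is queued per channel; the real publisher may batch or encode items differently.

```python
import json
import queue

fan = queue.Queue()  # stand-in for the publisher's internal queue


def put(channels, message):
    """Queue one serialized item per channel and return immediately."""
    if isinstance(channels, str):
        channels = [channels]  # accept a single channel as well as a list
    for channel in channels:
        fan.put(json.dumps((channel, message)))


put(['direct-1', 'direct-2'], {'instruction': 'vote'})
pending = fan.qsize()  # items wait here until a background loop sends them
```

Nothing has left the process when put() returns; the items stay queued until a consumer picks them up.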

run()

Continuously broadcasts messages

This function loops on items received from the queue, and handles them one by one in the background.

Processing should be handled in a separate background process, like in the following example:

publisher = Publisher(context)
publisher.start()

The recommended way for stopping the process is to change the parameter general.switch in the context. For example:

engine.set('general.switch', 'off')

Alternatively, the loop is also broken when a poison pill is pushed to the queue. For example:

publisher.fan.put(None)
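The two ways of stopping the loop can be sketched as follows. This is not the actual body of run(): the function name drain, the dict standing in for the context, and the use of a 0.005 second timeout (borrowed from EMPTY_DELAY) are assumptions made for the illustration.

```python
import queue


def drain(fan, settings, process):
    """Handle queued items until a poison pill arrives or the switch is off."""
    handled = 0
    while settings.get('general.switch', 'on') == 'on':
        try:
            item = fan.get(timeout=0.005)  # short wait, then re-check switch
        except queue.Empty:
            continue  # nothing queued yet
        if item is None:  # poison pill: leave the loop
            break
        process(item)
        handled += 1
    return handled
```

Items queued after the poison pill are never processed, so the pill is best pushed only once everything else has been queued.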
class shellbot.bus.Subscriber(context, channels)

Bases: object

Subscribes to asynchronous messages

For example, from a group channel, you may subscribe to the direct channels of all participants:

# subscribe to all direct channels related to this group channel
subscriber = bus.subscribe(bot.direct_channels)

# get messages from direct channels
while True:
    message = subscriber.get()
    ...

From within a direct channel, you may receive instructions sent by the group channel:

# subscribe for messages sent to me
subscriber = bus.subscribe(bot.id)

# get and process instructions one at a time
while True:
    instruction = subscriber.get()
    ...
get(block=False)

Gets next message

Returns:dict or other serializable message or None

This function returns the next message that has been made available, or None if no message has arrived yet.

Example:

message = subscriber.get()  # immediate return
if message:
    ...

Change the parameter block if you prefer to wait until the next message arrives.

Example:

message = subscriber.get(block=True)  # wait until available
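The difference between the two modes can be mimicked with a standard queue. The inbox below is a local stand-in for whatever the subscriber reads from, which is an assumption; only the block parameter and the None return value come from the description above.

```python
import queue

inbox = queue.Queue()  # stand-in for the subscriber's source of messages


def get(block=False):
    """Return the next message, or None when not blocking and none is ready."""
    try:
        return inbox.get(block=block)
    except queue.Empty:
        return None
```

With block=True and an empty inbox this call waits indefinitely, so it suits a dedicated loop rather than code that has other work to do.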

Note that this function does not preserve the envelope of the message. In other words, the channel used for the communication is not passed on to the receiver. Messages should therefore carry all the information that may be relevant to the receiver.
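One way to cope with the lost envelope is to copy the channel into the message before publishing it. The helper name with_envelope and the key 'from' are hypothetical; any key agreed between sender and receiver would do.

```python
def with_envelope(channel, payload):
    """Copy the payload and record the originating channel inside it."""
    message = dict(payload)  # leave the caller's dict untouched
    message['from'] = channel  # hypothetical key name
    return message
```

A direct channel could then publish with_envelope(bot.id, state) so that the group channel can tell which participant each message came from.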