# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from builtins import str
import json
import logging
from multiprocessing import Process, Queue
from six import string_types
import time
import zmq
class Bus(object):
    """
    Represents an information bus between publishers and subscribers

    In the context of shellbot, channels are channel identifiers, and
    messages are python objects serializable with json.

    A first pattern is the synchronization of direct channels from the group
    channel:

    - every direct channel is a subscriber, and filters messages sent to
      its own channel identifier

    - the group channel is a publisher, and broadcasts instructions to the
      list of direct channel identifiers it knows about

    A second pattern is the observation by a group channel of what is
    happening in related direct channels:

    - every direct channel is a publisher, and the channel used is its own
      channel identifier

    - the group channel is a subscriber, and observes messages received
      from all direct channels it knows about

    For example, a distributed voting system can be built by combining
    the two patterns. The vote itself can be triggered simultaneously in
    direct channels on due time, so that all participants are involved
    more or less at the same time. And data that is collected in direct
    channels can be centralised back to the group channel where results
    are communicated.
    """

    # Alternative transport kept for reference: 'ipc://shellbot'
    DEFAULT_ADDRESS = 'tcp://127.0.0.1:5555'

    def __init__(self, context):
        """
        Represents an information bus between publishers and subscribers

        :param context: general settings
        :type context: Context

        """
        self.context = context

    def check(self):
        """
        Checks configuration settings

        This function hands the key ``bus.address`` and its default value
        over to the ``check()`` function of the context. It handles
        following parameters:

        * ``bus.address`` - focal point of bus exchanges on the network.
          The default value is ``tcp://127.0.0.1:5555``, which means
          'use TCP port 5555 on the loopback interface of this machine'.

        """
        self.context.check('bus.address', self.DEFAULT_ADDRESS)

    def subscribe(self, channels):
        """
        Subscribes to some channels

        :param channels: one or multiple channels
        :type channels: str or list of str

        :return: Subscriber

        Example::

            # subscribe from all direct channels related to this group channel
            subscriber = bus.subscribe(bot.direct_channels)

            ...

            # get next message from these channels
            message = subscriber.get()

        """
        return Subscriber(context=self.context, channels=channels)

    def publish(self):
        """
        Publishes messages

        :return: Publisher

        Example::

            # get a publisher for subsequent broadcasts
            publisher = bus.publish()

            # start the publishing process
            publisher.start()

            ...

            # broadcast information_message
            publisher.put(channel, message)

        """
        return Publisher(context=self.context)
class Subscriber(object):
    """
    Subscribes to asynchronous messages

    For example, from a group channel, you may subscribe from
    direct channels of all participants::

        # subscribe from all direct channels related to this group channel
        subscriber = bus.subscribe(bot.direct_channels)

        # get messages from direct channels
        while True:
            message = subscriber.get()
            ...

    From within a direct channel, you may receive instructions sent
    by the group channel::

        # subscribe for messages sent to me
        subscriber = bus.subscribe(bot.id)

        # get and process instructions one at a time
        while True:
            instruction = subscriber.get()
            ...

    """

    def __init__(self, context, channels):
        """
        Subscribes to asynchronous messages

        :param context: general settings
        :type context: Context

        :param channels: one or multiple channels; the empty string
            stands for all channels
        :type channels: str or list of str

        :raises AssertionError: if ``channels`` is empty and is not
            the empty string

        """
        self.context = context

        address = self.context.get('bus.address')
        logging.debug(u"Subscribing at {}".format(address))

        # '' is falsy yet legitimate: it is logged below as '<all channels>'
        assert channels or channels == ''
        if isinstance(channels, string_types):
            channels = [channels]
        self.channels = channels

        for channel in self.channels:
            if channel:
                logging.debug(u"- {}".format(channel))
            else:
                logging.debug(u"- {}".format('<all channels>'))

        self.socket = None  # the socket is created on first get()

    def get(self, block=False):
        """
        Gets next message

        :param block: wait for a message instead of returning immediately
        :type block: bool

        :return: dict or other serializable message or None

        This function returns next message that has been made available,
        or None if no message has arrived yet.

        Example::

            message = subscriber.get()  # immediate return
            if message:
                ...

        Change the parameter ``block`` if you prefer to wait until next
        message arrives.

        Example::

            message = subscriber.get(block=True)  # wait until available

        Note that this function does not preserve the envelope of the
        message. In other terms, the channel used for the communication is
        lost in translation. Therefore the need to put within messages all
        information that may be relevant for the receiver.

        """
        if not self.socket:
            # first call: create the SUB socket and register every channel
            zmq_context = zmq.Context.instance()
            self.socket = zmq_context.socket(zmq.SUB)
            self.socket.linger = 0
            self.socket.connect(self.context.get('bus.address'))
            for channel in self.channels:
                self.socket.setsockopt_string(zmq.SUBSCRIBE, str(channel))

        flags = 0 if block else zmq.NOBLOCK
        try:
            frame = self.socket.recv(flags=flags)
        except zmq.error.Again:  # nothing available in non-blocking mode
            return None

        # A bytes frame has to be decoded: on Python 3, str(b'...') gives
        # the repr "b'...'", which corrupts both the channel and the json.
        if isinstance(frame, bytes):
            frame = frame.decode('utf-8')
        else:
            frame = str(frame)

        # a frame is '<channel> <json text>'; only the payload is returned
        (_, text) = frame.split(' ', 1)
        return json.loads(text)
class Publisher(Process):
    """
    Publishes asynchronous messages

    For example, from a group channel, you may send instructions to every
    direct channels::

        # get a publisher
        publisher = bus.publish()

        # send instruction to direct channels
        publisher.put(bot.direct_channels, instruction)

    From within a direct channel, you may reflect your state to observers::

        # get a publisher
        publisher = bus.publish()

        # share new state
        publisher.put(bot.id, bit_of_information_here)

    """

    DEFER_DURATION = 0.3  # allow subscribers to connect
    EMPTY_DELAY = 0.005  # time to wait if queue is empty

    def __init__(self, context):
        """
        Publishes asynchronous messages

        :param context: general settings
        :type context: Context

        """
        Process.__init__(self)
        self.daemon = True

        self.context = context

        self.fan = Queue()  # items queued by put() and consumed by run()
        self.socket = None  # allow socket injection for tests

    def run(self):
        """
        Continuously broadcasts messages

        This function is looping on items received from the queue, and
        is handling them one by one in the background.

        Processing should be handled in a separate background process, like
        in the following example::

            publisher = Publisher(context)
            publisher.start()

        The recommended way for stopping the process is to change the
        parameter ``general.switch`` in the context. For example::

            engine.set('general.switch', 'off')

        Alternatively, the loop is also broken when a poison pill is pushed
        to the queue. For example::

            publisher.fan.put(None)

        The socket is closed when the loop ends, including when an
        unexpected exception escapes from it.

        """
        address = self.context.get('bus.address')

        if not self.socket:
            zmq_context = zmq.Context.instance()
            self.socket = zmq_context.socket(zmq.PUB)
            self.socket.linger = 0
            self.socket.bind(address)
            time.sleep(self.DEFER_DURATION)  # allow subscribers to connect

        logging.info(u"Starting publisher")
        logging.debug(u"- publishing at {}".format(address))

        try:
            self.context.set('publisher.counter', 0)
            while self.context.get('general.switch', 'on') == 'on':

                if self.fan.empty():
                    time.sleep(self.EMPTY_DELAY)
                    continue

                try:
                    item = self.fan.get_nowait()
                    if item is None:  # poison pill
                        break

                    self.context.increment('publisher.counter')
                    self.process(item)

                except Exception as feedback:
                    # a faulty item is logged and does not stop the loop
                    logging.exception(feedback)

        except KeyboardInterrupt:
            pass

        finally:
            # release the socket whatever the reason for leaving the loop
            self.socket.close()
            self.socket = None

        logging.info("Publisher has been stopped")

    def process(self, item):
        """
        Processes items received from the queue

        :param item: the item received
        :type item: str

        Note that the item should result from serialization
        of (channel, message) tuple done previously.

        """
        logging.debug(u"Publishing {}".format(item))
        self.socket.send_string(item)

    def put(self, channels, message):
        """
        Broadcasts a message

        :param channels: one or multiple channels
        :type channels: str or list of str

        :param message: the message to send
        :type message: dict or other json-serializable object

        :raises AssertionError: if ``channels`` or ``message`` is empty

        Example::

            message = { ... }
            publisher.put(bot.id, message)

        This function actually puts the message in a global queue that is
        handled asynchronously. Therefore, when the function returns there is
        no guarantee that message has been transmitted nor received.

        """
        assert channels
        if isinstance(channels, string_types):
            channels = [channels]

        assert message
        text = json.dumps(message)  # serialized once for all channels

        for channel in channels:
            # subscribers register with str(channel), so a channel
            # identifier that is not a string is converted the same way
            if not isinstance(channel, string_types):
                channel = str(channel)
            item = channel + ' ' + text
            logging.debug(u"Queuing {}".format(item))
            self.fan.put(item)