Source code for errbot.backends.test

import importlib
import logging
import sys
import textwrap
import unittest
from os.path import abspath, sep
from queue import Empty, Queue
from tempfile import mkdtemp
from threading import Thread
from typing import BinaryIO, List, Optional

import pytest

from errbot.backends.base import (
    ONLINE,
    Identifier,
    Message,
    Person,
    Presence,
    Room,
    RoomOccupant,
)
from errbot.bootstrap import setup_bot
from errbot.core import ErrBot
from errbot.core_plugins.wsview import reset_app
from errbot.rendering import text
from errbot.utils import deprecated

log = logging.getLogger(__name__)

QUIT_MESSAGE = "$STOP$"

STZ_MSG = 1
STZ_PRE = 2
STZ_IQ = 3


[docs] class TestPerson(Person): """ This is an identifier just represented as a string. DO NOT USE THIS DIRECTLY AS IT IS NOT COMPATIBLE WITH MOST BACKENDS, use self.build_identifier(identifier_as_string) instead. Note to back-end implementors: You should provide a custom <yourbackend>Identifier object that adheres to this interface. You should not directly inherit from SimpleIdentifier, inherit from object instead and make sure it includes all properties and methods exposed by this class. """ __test__ = False
[docs] def __init__(self, person, client=None, nick=None, fullname=None, email=None): self._person = person self._client = client self._nick = nick self._fullname = fullname self._email = email
@property def person(self): """This needs to return the part of the identifier pointing to a person.""" return self._person @property def client(self) -> Optional[str]: """This needs to return the part of the identifier pointing to a client from which a person is sending a message from. Returns None is unspecified""" return self._client @property def nick(self) -> Optional[str]: """This needs to return a short display name for this identifier e.g. gbin. Returns None is unspecified""" return self._nick @property def fullname(self) -> Optional[str]: """This needs to return a long display name for this identifier e.g. Guillaume Binet. Returns None is unspecified""" return self._fullname @property def email(self) -> Optional[str]: """This needs to return an email for this identifier e.g. Guillaume.Binet@gmail.com. Returns None is unspecified""" return self._email aclattr = person def __unicode__(self) -> str: if self.client: return f"{self._person}/{self._client}" return f"{self._person}" __str__ = __unicode__ def __eq__(self, other): if not isinstance(other, Person): return False return self.person == other.person
# noinspection PyAbstractClass
[docs] class TestOccupant(TestPerson, RoomOccupant): """This is a MUC occupant represented as a string. DO NOT USE THIS DIRECTLY AS IT IS NOT COMPATIBLE WITH MOST BACKENDS, """ __test__ = False
[docs] def __init__(self, person, room): super().__init__(person) self._room = room
@property def room(self) -> Room: return self._room def __unicode__(self): return self._person + "@" + str(self._room) __str__ = __unicode__ def __eq__(self, other): return self.person == other.person and self.room == other.room
[docs] class TestRoom(Room): __test__ = False
[docs] def invite(self, *args): pass
[docs] def __init__( self, name: str, occupants: Optional[List[TestOccupant]] = None, topic: Optional[str] = None, bot: ErrBot = None, ): """ :param name: Name of the room :param occupants: Occupants of the room :param topic: The MUC's topic """ if occupants is None: occupants = [] self._occupants = occupants self._topic = topic self._bot = bot self._name = name self._bot_mucid = TestOccupant( self._bot.bot_config.BOT_IDENTITY["username"], self._name )
@property def occupants(self) -> List[TestOccupant]: return self._occupants
[docs] def find_croom(self) -> Optional["TestRoom"]: """find back the canonical room from a this room""" for croom in self._bot._rooms: if croom == self: return croom return None
@property def joined(self) -> bool: room = self.find_croom() if room: return self._bot_mucid in room.occupants return False
[docs] def join( self, username: Optional[str] = None, password: Optional[str] = None ) -> None: if self.joined: logging.warning( "Attempted to join room %s, but already in this room.", self ) return if not self.exists: log.debug("Room %s doesn't exist yet, creating it.", self) self.create() room = self.find_croom() room._occupants.append(self._bot_mucid) log.info("Joined room %s.", self) self._bot.callback_room_joined(room, self._bot_mucid)
[docs] def leave(self, reason: Optional[str] = None) -> None: if not self.joined: logging.warning("Attempted to leave room %s, but not in this room.", self) return room = self.find_croom() room._occupants.remove(self._bot_mucid) log.info("Left room %s.", self) self._bot.callback_room_left(room, self._bot_mucid)
@property def exists(self) -> bool: return self.find_croom() is not None
[docs] def create(self) -> None: if self.exists: logging.warning("Room %s already created.", self) return self._bot._rooms.append(self) log.info("Created room %s.", self)
[docs] def destroy(self) -> None: if not self.exists: logging.warning("Cannot destroy room %s, it doesn't exist.", self) return self._bot._rooms.remove(self) log.info("Destroyed room %s.", self)
@property def topic(self) -> str: return self._topic @topic.setter def topic(self, topic: str): self._topic = topic room = self.find_croom() room._topic = self._topic log.info("Topic for room %s set to %s.", self, topic) self._bot.callback_room_topic(self) def __unicode__(self): return self._name def __str__(self): return self._name def __eq__(self, other): return self._name == other._name
[docs] class TestRoomAcl(TestRoom):
[docs] def __init__(self, name, occupants=None, topic=None, bot=None): super().__init__(name, occupants, topic, bot)
@property def aclattr(self): return self._name def __str__(self): return "not room name"
[docs] class TestBackend(ErrBot):
[docs] def change_presence(self, status: str = ONLINE, message: str = "") -> None: pass
[docs] def __init__(self, config): config.BOT_LOG_LEVEL = logging.DEBUG config.CHATROOM_PRESENCE = ( "testroom", ) # we are testing with simple identfiers config.BOT_IDENTITY = { "username": "err" } # we are testing with simple identfiers self.bot_identifier = self.build_identifier("Err") # whatever super().__init__(config) self.incoming_stanza_queue = Queue() self.outgoing_message_queue = Queue() self.sender = self.build_identifier( config.BOT_ADMINS[0] ) # By default, assume this is the admin talking self.reset_rooms() self.md = text()
[docs] def send_message(self, msg: Message) -> None: log.info("\n\n\nMESSAGE:\n%s\n\n\n", msg.body) super().send_message(msg) self.outgoing_message_queue.put(self.md.convert(msg.body))
[docs] def send_stream_request( self, user: Identifier, fsource: Optional[BinaryIO], name: Optional[str], size: Optional[int], stream_type: Optional[str], ) -> None: # Just dump the stream contents to the message queue self.outgoing_message_queue.put(fsource.read())
[docs] def serve_forever(self) -> None: self.connect_callback() # notify that the connection occured try: while True: log.debug("waiting on queue") stanza_type, entry, extras = self.incoming_stanza_queue.get() log.debug("message received") if entry == QUIT_MESSAGE: log.info("Stop magic message received, quitting...") break if stanza_type is STZ_MSG: msg = Message(entry, extras=extras) msg.frm = self.sender msg.to = self.bot_identifier # To me only self.callback_message(msg) # implements the mentions. mentioned = [ self.build_identifier(word[1:]) for word in entry.split() if word.startswith("@") ] if mentioned: self.callback_mention(msg, mentioned) elif stanza_type is STZ_PRE: log.info("Presence stanza received.") self.callback_presence(entry) elif stanza_type is STZ_IQ: log.info("IQ stanza received.") else: log.error("Unknown stanza type.") except EOFError: pass except KeyboardInterrupt: pass finally: log.debug("Trigger disconnect callback") self.disconnect_callback() log.debug("Trigger shutdown") self.shutdown()
[docs] def shutdown(self) -> None: if self.is_open_storage(): self.close_storage() self.plugin_manager.shutdown() self.repo_manager.shutdown()
# super().shutdown()
[docs] def connect(self) -> None: return
[docs] def build_identifier(self, text_representation) -> TestPerson: return TestPerson(text_representation)
[docs] def build_reply( self, msg: Message, text=None, private: bool = False, threaded: bool = False ) -> Message: msg = self.build_message(text) msg.frm = self.bot_identifier msg.to = msg.frm return msg
@property def mode(self) -> str: return "test"
[docs] def rooms(self) -> List[TestRoom]: return [r for r in self._rooms if r.joined]
[docs] def query_room(self, room: TestRoom) -> TestRoom: try: return [r for r in self._rooms if str(r) == str(room)][0] except IndexError: r = TestRoom(room, bot=self) return r
[docs] def prefix_groupchat_reply(self, message: Message, identifier: Identifier): super().prefix_groupchat_reply(message, identifier) message.body = f"@{identifier.nick} {message.body}"
[docs] def pop_message(self, timeout: int = 5, block: bool = True): return self.outgoing_message_queue.get(timeout=timeout, block=block)
[docs] def push_message(self, msg: Message, extras=""): self.incoming_stanza_queue.put((STZ_MSG, msg, extras), timeout=5)
[docs] def push_presence(self, presence): """presence must at least duck type base.Presence""" self.incoming_stanza_queue.put((STZ_PRE, presence), timeout=5)
[docs] def zap_queues(self) -> None: while not self.incoming_stanza_queue.empty(): msg = self.incoming_stanza_queue.get(block=False) log.error("Message left in the incoming queue during a test: %s.", msg) while not self.outgoing_message_queue.empty(): msg = self.outgoing_message_queue.get(block=False) log.error("Message left in the outgoing queue during a test: %s.", msg)
[docs] def reset_rooms(self) -> None: """Reset/clear all rooms""" self._rooms = []
[docs] class ShallowConfig: pass
[docs] class TestBot: """ A minimal bot utilizing the TestBackend, for use with unit testing. Only one instance of this class should globally be active at any one time. End-users should not use this class directly. Use :func:`~errbot.backends.test.testbot` or :class:`~errbot.backends.test.FullStackTest` instead, which use this class under the hood. """
[docs] def __init__( self, extra_plugin_dir=None, loglevel=logging.DEBUG, extra_config=None ): self.bot_thread = None self.setup( extra_plugin_dir=extra_plugin_dir, loglevel=loglevel, extra_config=extra_config, )
[docs] def setup( self, extra_plugin_dir: Optional[str] = None, loglevel=logging.DEBUG, extra_config=None, ): """ :param extra_config: Piece of extra configuration you want to inject to the config. :param extra_plugin_dir: Path to a directory from which additional plugins should be loaded. :param loglevel: Logging verbosity. Expects one of the constants defined by the logging module. """ tempdir = mkdtemp() # This is for test isolation. config = ShallowConfig() config.__dict__.update( importlib.import_module("errbot.config-template").__dict__ ) config.BOT_DATA_DIR = tempdir config.BOT_LOG_FILE = tempdir + sep + "log.txt" config.STORAGE = "Memory" if extra_config is not None: log.debug("Merging %s to the bot config.", repr(extra_config)) for k, v in extra_config.items(): setattr(config, k, v) # reset logging to console logging.basicConfig(format="%(levelname)s:%(message)s") file = logging.FileHandler(config.BOT_LOG_FILE, encoding="utf-8") self.logger = logging.getLogger("") self.logger.setLevel(loglevel) self.logger.addHandler(file) config.BOT_EXTRA_PLUGIN_DIR = extra_plugin_dir config.BOT_LOG_LEVEL = loglevel self.bot_config = config
[docs] def start(self, timeout: int = 2) -> None: """ Start the bot Calling this method when the bot has already started will result in an Exception being raised. :param timeout: Timeout for the ready message pop. pop will be done 60 times so the total timeout is 60*timeout """ if self.bot_thread is not None: raise Exception("Bot has already been started") self._bot = setup_bot("Test", self.logger, self.bot_config) self.bot_thread = Thread( target=self.bot.serve_forever, name="TestBot main thread", daemon=True, ) self.bot_thread.start() self.bot.push_message("!echo ready") # Ensure bot is fully started and plugins are loaded before returning try: for i in range(60): # Gobble initial error messages... msg = self.bot.pop_message(timeout=timeout) if msg == "ready": break log.warning("Queue was not empty, the non-consumed message is:") log.warning(msg) log.warning("Check the previous test and remove spurrious messages.") except Empty: raise AssertionError('The "ready" message has not been received (timeout).')
@property def bot(self) -> ErrBot: return self._bot
[docs] def stop(self) -> None: """ Stop the bot Calling this method before the bot has started will result in an Exception being raised. """ if self.bot_thread is None: raise Exception("Bot has not yet been started") self.bot.push_message(QUIT_MESSAGE) self.bot_thread.join() reset_app() # empty the bottle ... hips! log.info("Main bot thread quits") self.bot.zap_queues() self.bot.reset_rooms() self.bot_thread = None
[docs] def pop_message(self, timeout: int = 5, block: bool = True): return self.bot.pop_message(timeout, block)
[docs] def push_message(self, msg: Message, extras=""): return self.bot.push_message(msg, extras=extras)
[docs] def push_presence(self, presence: Presence): """presence must at least duck type base.Presence""" return self.bot.push_presence(presence)
[docs] def exec_command(self, command, timeout: int = 5): """Execute a command and return the first response. This makes more py.test'ist like: assert 'blah' in exec_command('!hello') """ self.bot.push_message(command) return self.bot.pop_message(timeout)
[docs] def zap_queues(self): return self.bot.zap_queues()
[docs] def assertInCommand(self, command, response, timeout=5, dedent=False): """Assert the given command returns the given response""" if dedent: command = "\n".join(textwrap.dedent(command).splitlines()[1:]) self.bot.push_message(command) msg = self.bot.pop_message(timeout) assert response in msg, f"{response} not in {msg}."
[docs] @deprecated(assertInCommand) def assertCommand(self, command, response, timeout=5, dedent=False): """Assert the given command returns the given response""" pass
[docs] def assertCommandFound(self, command, timeout=5): """Assert the given command exists""" self.bot.push_message(command) assert "not found" not in self.bot.pop_message(timeout)
[docs] def inject_mocks(self, plugin_name: str, mock_dict: dict): """Inject mock objects into the plugin Example:: mock_dict = { 'field_1': obj_1, 'field_2': obj_2, } testbot.inject_mocks(HelloWorld, mock_dict) assert 'blah' in testbot.exec_command('!hello') """ plugin = self.bot.plugin_manager.get_plugin_obj_by_name(plugin_name) if plugin is None: raise Exception(f'"{plugin_name}" is not loaded.') for field, mock_obj in mock_dict.items(): if not hasattr(plugin, field): raise ValueError(f'No property/attribute named "{field}" attached.') setattr(plugin, field, mock_obj)
[docs] class FullStackTest(unittest.TestCase, TestBot): """ Test class for use with Python's unittest module to write tests against a fully functioning bot. For example, if you wanted to test the builtin `!about` command, you could write a test file with the following:: from errbot.backends.test import FullStackTest class TestCommands(FullStackTest): def test_about(self): self.push_message('!about') self.assertIn('Err version', self.pop_message()) """
[docs] def setUp( self, extra_plugin_dir=None, extra_test_file=None, loglevel=logging.DEBUG, extra_config=None, ) -> None: """ :param extra_plugin_dir: Path to a directory from which additional plugins should be loaded. :param extra_test_file: [Deprecated but kept for backward-compatibility, use extra_plugin_dir instead] Path to an additional plugin which should be loaded. :param loglevel: Logging verbosity. Expects one of the constants defined by the logging module. :param extra_config: Piece of extra bot config in a dict. """ if extra_plugin_dir is None and extra_test_file is not None: extra_plugin_dir = sep.join(abspath(extra_test_file).split(sep)[:-2]) self.setup( extra_plugin_dir=extra_plugin_dir, loglevel=loglevel, extra_config=extra_config, ) self.start()
[docs] def tearDown(self) -> None: self.stop()
[docs] @pytest.fixture def testbot(request) -> TestBot: """ Pytest fixture to write tests against a fully functioning bot. For example, if you wanted to test the builtin `!about` command, you could write a test file with the following:: def test_about(testbot): testbot.push_message('!about') assert "Err version" in testbot.pop_message() It's possible to provide additional configuration to this fixture, by setting variables at module level or as class attributes (the latter taking precedence over the former). For example:: extra_plugin_dir = '/foo/bar' def test_about(testbot): testbot.push_message('!about') assert "Err version" in testbot.pop_message() ..or:: extra_plugin_dir = '/foo/bar' class Tests: # Wins over `extra_plugin_dir = '/foo/bar'` above extra_plugin_dir = '/foo/baz' def test_about(self, testbot): testbot.push_message('!about') assert "Err version" in testbot.pop_message() ..to load additional plugins from the directory `/foo/bar` or `/foo/baz` respectively. This works for the following items, which are passed to the constructor of :class:`~errbot.backends.test.TestBot`: * `extra_plugin_dir` * `loglevel` """ def on_finish() -> TestBot: bot.stop() # setup the logging to something digestable. logger = logging.getLogger("") logging.getLogger("MARKDOWN").setLevel( logging.ERROR ) # this one is way too verbose in debug logger.setLevel(logging.DEBUG) console_hdlr = logging.StreamHandler(sys.stdout) console_hdlr.setFormatter( logging.Formatter("%(levelname)-8s %(name)-25s %(message)s") ) logger.handlers = [] logger.addHandler(console_hdlr) kwargs = {} for attr, default in ( ("extra_plugin_dir", None), ("extra_config", None), ("loglevel", logging.DEBUG), ): if hasattr(request, "instance"): kwargs[attr] = getattr(request.instance, attr, None) if kwargs[attr] is None: kwargs[attr] = getattr(request.module, attr, default) bot = TestBot(**kwargs) bot.start() request.addfinalizer(on_finish) return bot