Source code for errbot.botplugin

import logging
import re
import shlex
from io import IOBase
from threading import Timer, current_thread
from types import ModuleType
from typing import Any, Callable, List, Mapping, Optional, Sequence, Tuple

from errbot.backends.base import (
    ONLINE,
    Card,
    Identifier,
    Message,
    Presence,
    Reaction,
    Room,
    Stream,
)

from .storage import StoreMixin, StoreNotOpenError

log = logging.getLogger(__name__)


class ValidationException(Exception):
    """
    Signals that a checked structure (for example a plugin configuration)
    does not match the sample/template it was validated against.
    """
def recurse_check_structure(sample: Any, to_check: Any) -> None:
    """
    Recursively verify that ``to_check`` has the same structure as ``sample``.

    * The exact types of both values must be identical, unless ``sample`` is
      ``None`` (in which case anything is accepted).
    * For a list or tuple sample, every element of ``to_check`` is checked
      against the first element of the sample. An empty sample sequence
      provides no reference element, so its members are not checked.
    * For a dict sample, both dicts must have exactly the same keys and every
      value is checked recursively.

    :param sample: the reference structure (the template).
    :param to_check: the structure to validate.
    :raises ValidationException: if a type differs, or a dict key is missing
        or unknown.
    """
    sample_type = type(sample)
    to_check_type = type(to_check)

    # Skip this check if the sample is None because it will always be something
    # other than NoneType when changed from the default. Raising ValidationException
    # would make no sense then because it would defeat the whole purpose of having
    # that key in the sample when it could only ever be None.
    if sample is not None and sample_type != to_check_type:
        raise ValidationException(
            f"{sample} [{sample_type}] is not the same type as {to_check} [{to_check_type}]."
        )

    if sample_type in (list, tuple):
        # An empty sample has no first element to compare against; indexing it
        # would raise IndexError instead of a meaningful validation result.
        if not sample:
            return
        reference = sample[0]
        for element in to_check:
            recurse_check_structure(reference, element)
        return

    if sample_type == dict:
        for key in sample:
            if key not in to_check:
                raise ValidationException(f"{to_check} doesn't contain the key {key}.")
        for key in to_check:
            if key not in sample:
                raise ValidationException(f"{to_check} contains an unknown key {key}.")
        for key in sample:
            recurse_check_structure(sample[key], to_check[key])
        return
class CommandError(Exception):
    """
    Use this class to report an error condition from your commands, the command
    did not proceed for a known "business" reason.
    """

    def __init__(self, reason: str, template: Optional[str] = None):
        """
        :param reason: the reason for the error in the command.
        :param template: apply this specific template to report the error.
        """
        # Forward the arguments to Exception so that ``args`` is populated.
        # The default copy/pickle protocol rebuilds an exception by calling
        # its class with ``args``; with empty ``args`` that call would be
        # ``CommandError()`` and fail on the missing ``reason``.
        super().__init__(reason, template)
        self.reason = reason
        self.template = template

    def __str__(self):
        """Return the reason as the textual form of the error."""
        return str(self.reason)
class Command:
    """
    This is a dynamic definition of an errbot command.
    """

    def __init__(
        self,
        function: Callable,
        cmd_type: Optional[Callable] = None,
        cmd_args=None,
        cmd_kwargs=None,
        name: Optional[str] = None,
        doc: Optional[str] = None,
    ):
        """
        Create a Command definition.

        :param function:
            a function or a lambda with the correct signature for the type of command to inject
            for example `def mycmd(plugin, msg, args)` for a botcmd.
            Note: the first parameter will be the plugin itself (equivalent to self).
        :param cmd_type:
            defaults to `botcmd` but can be any decorator function used for errbot commands.
        :param cmd_args: the positional parameters of the decorator (any iterable).
        :param cmd_kwargs: the kwargs parameter of the decorator.
        :param name:
            defaults to the name of the function you are passing if it is a first class function
            or needs to be set if you use a lambda.
        :param doc:
            defaults to the doc of the given function if it is a first class function.
            It can be set for a lambda or overridden for a function with this.
        :raises ValueError: if `function` is a lambda and no `name` is given.
        """
        if cmd_type is None:
            from errbot import (  # TODO refactor this out of __init__ so it can be reusable.
                botcmd,
            )

            cmd_type = botcmd

        if name is None:
            if function.__name__ == "<lambda>":
                raise ValueError(
                    "function is a lambda (anonymous), parameter name needs to be set."
                )
            name = function.__name__

        self.name = name
        if cmd_kwargs is None:
            cmd_kwargs = {}
        if cmd_args is None:
            cmd_args = ()

        # The decorated function itself is renamed / re-documented in place.
        function.__name__ = name
        if doc:
            function.__doc__ = doc

        # Unpacking accepts any iterable of decorator arguments, whereas tuple
        # concatenation only worked when cmd_args was exactly a tuple.
        self.definition = cmd_type(function, *cmd_args, **cmd_kwargs)

    def append_args(self, args, kwargs):
        """
        Add an argument to the command definition's argument parser.

        Only definitions exposing an `_err_command_parser` attribute are
        supported; for any other definition a warning is logged and nothing
        is changed.

        :param args: positional arguments forwarded to `update_wrapper`.
        :param kwargs: keyword arguments forwarded to `update_wrapper`.
        """
        from errbot import update_wrapper

        if hasattr(self.definition, "_err_command_parser"):
            update_wrapper(self.definition, args, kwargs)
        else:
            log.warning(
                f"Attempting to append arguments to {self.definition} isn't supported."
            )
# noinspection PyAbstractClass
class BotPluginBase(StoreMixin):
    """
    This class handles the basic needs of bot plugins like loading, unloading and creating a storage.
    It is the main contract between the plugins and the bot.
    """

    def __init__(self, bot, name=None):
        """
        :param bot: the bot instance this plugin is attached to.
        :param name: the name of the plugin; also used to name its logger.
        """
        self.is_activated = False
        self.current_pollers = []  # (method, args, kwargs) of every active poller
        self.current_timers = []  # timers scheduled for the next poll
        self.dependencies = []
        self._dynamic_plugins = {}  # name -> plugin created by create_dynamic_plugin
        self.log = logging.getLogger(f"errbot.plugins.{name}")
        self.log.debug("Logger for plugin %s initialized...", name)
        self._bot = bot
        self.plugin_dir = bot.repo_manager.plugin_dir
        self._name = name
        super().__init__()

    @property
    def name(self) -> str:
        """
        Get the name of this plugin as described in its .plug file.

        :return: The plugin name.
        """
        return self._name

    @property
    def mode(self) -> str:
        """
        Get the current active backend.

        :return: the mode like 'tox', 'xmpp' etc...
        """
        return self._bot.mode

    @property
    def bot_config(self) -> ModuleType:
        """
        Get the bot configuration from config.py.
        For example you can access:
        self.bot_config.BOT_DATA_DIR
        """
        # if BOT_ADMINS is just an unique string make it a tuple for backwards
        # compatibility
        if isinstance(self._bot.bot_config.BOT_ADMINS, str):
            self._bot.bot_config.BOT_ADMINS = (self._bot.bot_config.BOT_ADMINS,)
        return self._bot.bot_config

    @property
    def bot_identifier(self) -> Identifier:
        """
        Get bot identifier on current active backend.

        :return Identifier
        """
        return self._bot.bot_identifier

    def init_storage(self) -> None:
        """Open this plugin's storage, keyed by the plugin name, using the bot's storage plugin."""
        log.debug(f"Init storage for {self.name}.")
        self.open_storage(self._bot.storage_plugin, self.name)

    def activate(self) -> None:
        """
        Override if you want to do something at initialization phase
        (don't forget to super(Gnagna, self).activate())
        """
        self.init_storage()
        self._bot.inject_commands_from(self)
        self._bot.inject_command_filters_from(self)
        self.is_activated = True

    def deactivate(self) -> None:
        """
        Override if you want to do something at tear down phase
        (don't forget to super(Gnagna, self).deactivate())
        """
        if self.current_pollers:
            log.debug(
                "You still have active pollers at deactivation stage, I cleaned them up for you."
            )
            self.current_pollers = []

        # Cancel whatever is still scheduled. Timer.cancel() only affects
        # timers that are still waiting, so it is harmless for the others.
        for timer in self.current_timers:
            timer.cancel()

        try:
            self.close_storage()
        except StoreNotOpenError:
            # The storage was never opened (or is already closed): nothing to do.
            pass
        self._bot.remove_command_filters_from(self)
        self._bot.remove_commands_from(self)
        self.is_activated = False
        for plugin in self._dynamic_plugins.values():
            self._bot.remove_command_filters_from(plugin)
            self._bot.remove_commands_from(plugin)

    def start_poller(
        self,
        interval: float,
        method: Callable[..., None],
        times: int = None,
        args: Tuple = None,
        kwargs: Mapping = None,
    ) -> None:
        """Starts a poller that will be called at a regular interval

        :param interval: interval in seconds
        :param method: targetted method
        :param times:
            number of times polling should happen (defaults to``None`` which
            causes the polling to happen indefinitely)
        :param args: args for the targetted method
        :param kwargs: kwargs for the targetting method
        """
        if not kwargs:
            kwargs = {}
        if not args:
            args = []

        log.debug(
            f"Programming the polling of {method.__name__} every {interval} seconds "
            f"with args {str(args)} and kwargs {str(kwargs)}"
        )
        # noinspection PyBroadException
        try:
            self.current_pollers.append((method, args, kwargs))
            self.program_next_poll(interval, method, times, args, kwargs)
        except Exception:
            log.exception("Poller programming failed.")

    def stop_poller(
        self, method: Callable[..., None], args: Tuple = None, kwargs: Mapping = None
    ) -> None:
        """
        Unregister a poller.

        The (method, args, kwargs) triple must equal the one registered by
        start_poller (empty/None args and kwargs are normalised the same way).

        :param method: the method passed to start_poller.
        :param args: the args passed to start_poller.
        :param kwargs: the kwargs passed to start_poller.
        :raises ValueError: if no such poller is registered.
        """
        if not kwargs:
            kwargs = {}
        if not args:
            args = []
        log.debug(f"Stop polling of {method} with args {args} and kwargs {kwargs}")
        self.current_pollers.remove((method, args, kwargs))

    def program_next_poll(
        self,
        interval: float,
        method: Callable[..., None],
        times: int = None,
        args: Tuple = None,
        kwargs: Mapping = None,
    ) -> None:
        """
        Schedule the next call of `poller` for the given method.

        Nothing is scheduled when `times` is set and has reached 0 or less.

        :param interval: delay in seconds before the next poll.
        :param method: the polled method.
        :param times: remaining number of polls, or None for no limit.
        :param args: args for the polled method.
        :param kwargs: kwargs for the polled method.
        """
        if times is not None and times <= 0:
            return

        t = Timer(
            interval=interval,
            function=self.poller,
            kwargs={
                "interval": interval,
                "method": method,
                "times": times,
                "args": args,
                "kwargs": kwargs,
            },
        )
        # Bound methods are labelled with the class of their owner. Plain
        # functions and lambdas have no __self__, so fall back to their own
        # name instead of failing with AttributeError.
        owner = getattr(method, "__self__", None)
        if owner is not None:
            label = type(owner).__name__
        else:
            label = getattr(method, "__name__", repr(method))
        t.name = f"Poller thread for {label}"
        t.daemon = True  # so it is not locking on exit
        self.current_timers.append(t)  # save the timer to be able to kill it
        t.start()

    def poller(
        self,
        interval: float,
        method: Callable[..., None],
        times: int = None,
        args: Tuple = None,
        kwargs: Mapping = None,
    ) -> None:
        """
        Timer callback: run one poll and schedule the following one.

        The method is only called (and rescheduled) while its
        (method, args, kwargs) triple is still registered in current_pollers.
        Exceptions raised by the method are logged and do not stop the polling.
        """
        # This runs inside the Timer thread that was scheduled for this poll.
        previous_timer = current_thread()
        if previous_timer in self.current_timers:
            log.debug("Previous timer found and removed")
            self.current_timers.remove(previous_timer)

        if (method, args, kwargs) in self.current_pollers:
            # noinspection PyBroadException
            try:
                method(*args, **kwargs)
            except Exception:
                log.exception("A poller crashed")

            if times is not None:
                times -= 1

            self.program_next_poll(interval, method, times, args, kwargs)

    def create_dynamic_plugin(
        self, name: str, commands: Tuple[Command, ...], doc: str = ""
    ) -> None:
        """
        Creates a plugin dynamically and exposes its commands right away.

        :param name: name of the plugin.
        :param commands: a tuple of command definition.
        :param doc: the main documentation of the plugin.
        :raises ValueError: if a dynamic plugin with this name already exists.
        """
        if name in self._dynamic_plugins:
            raise ValueError(f"Dynamic plugin {name} already created.")
        # cleans the name to be a valid python type.
        plugin_class = type(
            re.sub(r"\W|^(?=\d)", "_", name),
            (BotPlugin,),
            {command.name: command.definition for command in commands},
        )
        plugin_class.__errdoc__ = doc
        plugin = plugin_class(self._bot, name=name)
        self._dynamic_plugins[name] = plugin
        self._bot.inject_commands_from(plugin)

    def destroy_dynamic_plugin(self, name: str) -> None:
        """
        Reverse operation of create_dynamic_plugin.

        This allows you to dynamically refresh the list of commands for example.

        :param name: the name of the dynamic plugin given to create_dynamic_plugin.
        :raises ValueError: if no dynamic plugin with this name exists.
        """
        if name not in self._dynamic_plugins:
            raise ValueError(f"Dynamic plugin {name} doesn't exist.")
        plugin = self._dynamic_plugins[name]
        self._bot.remove_command_filters_from(plugin)
        self._bot.remove_commands_from(plugin)
        del self._dynamic_plugins[name]

    def get_plugin(self, name) -> "BotPlugin":
        """
        Gets a plugin your plugin depends on. The name of the dependency needs to be listed in [Core] section
        key DependsOn of your plug file. This method can only be used after your plugin activation
        (or having called super().activate() from activate itself).
        It will return a plugin object.

        :param name: the name
        :return: the BotPlugin object requested.
        :raises Exception: if this plugin is not activated or `name` is not one of its dependencies.
        """
        if not self.is_activated:
            raise Exception(
                "Plugin needs to be in activated state to be able to get its dependencies."
            )

        if name not in self.dependencies:
            raise Exception(
                f"Plugin dependency {name} needs to be listed in section [Core] key "
                f'"DependsOn" to be used in get_plugin.'
            )

        return self._bot.plugin_manager.get_plugin_obj_by_name(name)
# noinspection PyAbstractClass
class BotPlugin(BotPluginBase):
    """
    Base class for plugins: configuration hooks, event callbacks that can be
    overridden, and proxies to the features exposed by the bot.
    """

    def get_configuration_template(self) -> Mapping:
        """
        Return the configuration template of the plugin.

        Override this method if your plugin needs a configuration and return
        a template, for example a dictionary like:

        return {'LOGIN' : 'example@example.com', 'PASSWORD' : 'password'}

        Note: if this method returns None (the default), the plugin won't be configured.
        """
        return None

    def check_configuration(self, configuration: Mapping) -> None:
        """
        Validate a configuration against the configuration template.

        The default implementation only does a BASIC, recursive structural check;
        override it if you want to do more complex checks. It will be called before
        the configure callback. Note: if the configuration template is None, it will
        never be called.

        The basic check means:

        1. for a dictionary, all the entries of the template must be present, with
           the same types, and no additional entry is accepted.
        2. for a list or tuple, all the members are expected to have the type of the
           first element of the template (mixed types are not supported).

        In case of validation error it should raise a errbot.ValidationException.

        :param configuration: the configuration to be checked.
        """
        template = self.get_configuration_template()
        recurse_check_structure(template, configuration)  # default behavior

    def configure(self, configuration: Mapping) -> None:
        """
        Store the given configuration in the `config` attribute of the plugin.

        If this plugin has no configuration yet, the framework will call this
        function anyway with None.

        This method will be called before activation so don't expect to be
        activated at that point.

        :param configuration: injected configuration for the plugin.
        """
        self.config = configuration

    def activate(self) -> None:
        """
        Called when the plugin is activated.

        Override this method if you want to do something at initialization
        phase (don't forget to call `super().activate()`).
        """
        super().activate()

    def deactivate(self) -> None:
        """
        Called when the plugin is deactivated.

        Override this method if you want to do something at tear-down phase
        (don't forget to call `super().deactivate()`).
        """
        super().deactivate()

    def callback_connect(self) -> None:
        """
        Called when the bot has successfully connected to the chat network.

        Override this method to get notified when the bot is connected.
        The default implementation does nothing.
        """

    def callback_message(self, message: Message) -> None:
        """
        Called on every message not coming from the bot itself.

        Override this method to get notified on *ANY* message.
        The default implementation does nothing.

        :param message: representing the message that was received.
        """

    def callback_mention(
        self, message: Message, mentioned_people: Sequence[Identifier]
    ) -> None:
        """
        Called if there are mentioned people in a message.

        Override this method to get notified when someone was mentioned in a message.
        [Note: This might not be implemented by all backends.]
        The default implementation does nothing.

        :param message: representing the message that was received.
        :param mentioned_people: all mentioned people in this message.
        """

    def callback_presence(self, presence: Presence) -> None:
        """
        Called on every presence change. The default implementation does nothing.

        :param presence:
            An instance of :class:`~errbot.backends.base.Presence`
            representing the new presence state that was received.
        """

    def callback_reaction(self, reaction: Reaction) -> None:
        """
        Called on every reaction event. The default implementation does nothing.

        :param reaction:
            An instance of :class:`~errbot.backends.base.Reaction`
            representing the new reaction event that was received.
        """

    def callback_stream(self, stream: Stream) -> None:
        """
        Called asynchronously (in a different thread context) on every incoming
        stream request or file transfer request.

        You can block this call until you are done with the stream.
        To signal that you accept / reject the file, simply call
        stream.accept() or stream.reject() and return.

        :param stream: the incoming stream request.
        """
        # A plugin that does not override this callback does not want the file.
        stream.reject()

    def callback_botmessage(self, message: Message) -> None:
        """
        Called on every message coming from the bot itself.

        Override this method to get notified on all messages coming from
        the bot itself (including those from other plugins).
        The default implementation does nothing.

        :param message:
            An instance of :class:`~errbot.backends.base.Message`
            representing the message that was received.
        """

    def callback_room_joined(
        self,
        room: Room,
        identifier: Identifier,
        invited_by: Optional[Identifier] = None,
    ) -> None:
        """
        Called when a user has joined a MUC. The default implementation does nothing.

        :param room:
            An instance of :class:`~errbot.backends.base.Room`
            representing the room that was joined.
        :param identifier: An instance of Identifier (Person). Defaults to bot
        :param invited_by: An instance of Identifier (Person). Defaults to None
        """

    def callback_room_left(
        self, room: Room, identifier: Identifier, kicked_by: Optional[Identifier] = None
    ) -> None:
        """
        Called when a user has left a MUC. The default implementation does nothing.

        :param room:
            An instance of :class:`~errbot.backends.base.Room`
            representing the room that was left.
        :param identifier: An instance of Identifier (Person). Defaults to bot
        :param kicked_by: An instance of Identifier (Person). Defaults to None
        """

    def callback_room_topic(self, room: Room) -> None:
        """
        Called when the topic in a MUC changes. The default implementation does nothing.

        :param room:
            An instance of :class:`~errbot.backends.base.Room`
            representing the room for which the topic changed.
        """

    # The methods below proxy useful features of the bot itself:
    # they are basically the contract between the plugins and the main bot.

    def warn_admins(self, warning: str) -> None:
        """
        Send a warning to the administrators of the bot.

        :param warning: The markdown-formatted text of the message to send.
        """
        self._bot.warn_admins(warning)

    def send(
        self,
        identifier: Identifier,
        text: str,
        in_reply_to: Message = None,
        groupchat_nick_reply: bool = False,
    ) -> None:
        """
        Send a message to a room or a user.

        :param identifier: An Identifier representing the user or room to message.
                           Identifiers may be created with :func:`build_identifier`.
        :param text: markdown formatted text to send to the user.
        :param in_reply_to: the original message this message is a reply to (optional).
                            In some backends it will start a thread.
        :param groupchat_nick_reply: if True the message will mention the user in the chatroom.
        :raises ValueError: if `identifier` is not an Identifier instance.
        """
        if isinstance(identifier, Identifier):
            return self._bot.send(identifier, text, in_reply_to, groupchat_nick_reply)
        raise ValueError(
            "identifier needs to be of type Identifier, the old string behavior is not supported"
        )

    def send_card(
        self,
        body: str = "",
        to: Identifier = None,
        in_reply_to: Message = None,
        summary: str = None,
        title: str = "",
        link: str = None,
        image: str = None,
        thumbnail: str = None,
        color: str = "green",
        fields: Tuple[Tuple[str, str], ...] = (),
    ) -> None:
        """
        Sends a card.

        A Card is a special type of preformatted message. If it matches with a backend similar concept like on
        Slack it will be rendered natively, otherwise it will be sent as a regular formatted message.

        The sender of the card is the recipient of `in_reply_to` when it is given,
        the bot identifier otherwise. When `to` is not given, the card is sent to
        the sender of `in_reply_to`.

        :param body: main text of the card in markdown.
        :param to: the card is sent to this identifier (Room, RoomOccupant, Person...).
        :param in_reply_to: the original message this message is a reply to (optional).
        :param summary: (optional) One liner summary of the card, possibly collapsed to it.
        :param title: (optional) Title possibly linking.
        :param link: (optional) url the title link is pointing to.
        :param image: (optional) link to the main image of the card.
        :param thumbnail: (optional) link to an icon / thumbnail.
        :param color: (optional) background color or color indicator.
        :param fields: (optional) a tuple of (key, value) pairs.
        :raises ValueError: if neither `to` nor `in_reply_to` is given.
        """
        if in_reply_to:
            frm = in_reply_to.to
        else:
            frm = self.bot_identifier

        if to is None and in_reply_to is None:
            raise ValueError("Either to or in_reply_to needs to be set.")
        recipient = in_reply_to.frm if to is None else to

        card = Card(
            body,
            frm,
            recipient,
            in_reply_to,
            summary,
            title,
            link,
            image,
            thumbnail,
            color,
            fields,
        )
        self._bot.send_card(card)

    def change_presence(self, status: str = ONLINE, message: str = "") -> None:
        """
        Changes the presence/status of the bot.

        :param status: One of the constant defined in base.py : ONLINE, OFFLINE, DND,...
        :param message: Additional message
        :return: None
        """
        self._bot.change_presence(status, message)

    def send_templated(
        self,
        identifier: Identifier,
        template_name: str,
        template_parameters: Mapping,
        in_reply_to: Message = None,
        groupchat_nick_reply: bool = False,
    ) -> None:
        """
        Sends asynchronously a message to a room or a user.

        Same as send but passing a template name and parameters instead of directly the markdown text.

        :param identifier: identifier of the user or room to which you want to send a message to.
        :param template_name: name of the template to use.
        :param template_parameters: arguments for the template.
        :param in_reply_to: optionally, the original message this message is the answer to.
        :param groupchat_nick_reply: if True it will mention the user in the chatroom.
        """
        outcome = self._bot.send_templated(
            identifier=identifier,
            template_name=template_name,
            template_parameters=template_parameters,
            in_reply_to=in_reply_to,
            groupchat_nick_reply=groupchat_nick_reply,
        )
        return outcome

    def build_identifier(self, txtrep: str) -> Identifier:
        """
        Transform a textual representation of a user identifier to the correct
        Identifier object you can set in Message.to and Message.frm.

        :param txtrep: the textual representation of the identifier (it is backend dependent).
        :return: a user identifier.
        """
        identifier = self._bot.build_identifier(txtrep)
        return identifier

    def send_stream_request(
        self,
        user: Identifier,
        fsource: IOBase,
        name: str = None,
        size: int = None,
        stream_type: str = None,
    ) -> Callable:
        """
        Sends asynchronously a stream/file to a user.

        :param user: is the identifier of the person you want to send it to.
        :param fsource: is a file object you want to send.
        :param name: is an optional filename for it.
        :param size: is optional and is the expected size for it.
        :param stream_type: is optional for the mime_type of the content.

        It will return a Stream object on which you can monitor the progress of it.
        """
        stream = self._bot.send_stream_request(user, fsource, name, size, stream_type)
        return stream

    def rooms(self) -> Sequence[Room]:
        """
        The list of rooms the bot is currently in.
        """
        joined = self._bot.rooms()
        return joined

    def query_room(self, room: str) -> Room:
        """
        Query a room for information.

        :param room:
            The JID/identifier of the room to query for.
        :returns:
            An instance of :class:`~errbot.backends.base.Room`.
        :raises:
            :class:`~errbot.backends.base.RoomDoesNotExistError` if the room doesn't exist.
        """
        found = self._bot.query_room(room)
        return found

    def start_poller(
        self,
        interval: float,
        method: Callable[..., None],
        times: int = None,
        args: Tuple = None,
        kwargs: Mapping = None,
    ):
        """
        Start to poll a method at specific interval in seconds.

        Note: it will call the method with the initial interval delay for
        the first time.

        Also, you can program for example : self.start_poller(30, self.fetch_stuff)
        where you have def fetch_stuff(self) in your plugin

        :param interval: interval in seconds
        :param method: targetted method
        :param times:
            number of times polling should happen (defaults to``None``
            which causes the polling to happen indefinitely)
        :param args: args for the targetted method
        :param kwargs: kwargs for the targetting method
        """
        super().start_poller(interval, method, times, args, kwargs)

    def stop_poller(
        self, method: Callable[..., None], args: Tuple = None, kwargs: Mapping = None
    ):
        """
        Stop a poller.

        You need to give the same parameters as in the original start_poller
        call so that the specific poller to stop can be matched.

        :param method: The initial method you passed to start_poller.
        :param args: The initial args you gave to start_poller.
        :param kwargs: The initial kwargs you gave to start_poller.
        """
        super().stop_poller(method, args, kwargs)
class ArgParserBase:
    """
    Base class defining the API used for argument splitting
    (used by the `split_args_with` parameter on :func:`~errbot.decorators.botcmd`).
    """

    def parse_args(self, args: str):
        """
        Parse a string of un-split arguments and return the list resulting
        from the split.

        Implementations should raise an exception of some kind if the
        splitting fails for any reason. This base implementation always
        raises NotImplementedError.

        :param args: string to parse
        """
        raise NotImplementedError
class SeparatorArgParser(ArgParserBase):
    """
    Argument splitter cutting the arguments on a given separator,
    the way :func:`str.split` does.
    """

    def __init__(self, separator: str = None, maxsplit: int = -1):
        """
        :param separator:
            The separator on which arguments should be split. If it is
            None, any whitespace string is a separator and empty strings are
            removed from the result.
        :param maxsplit:
            If given, do at most this many splits.
        """
        self.separator, self.maxsplit = separator, maxsplit

    def parse_args(self, args: str) -> List:
        """
        Split `args` on the configured separator.

        :param args: string to parse
        :return: the list of split arguments.
        """
        pieces = args.split(self.separator, self.maxsplit)
        return pieces
class ShlexArgParser(ArgParserBase):
    """
    Argument splitter applying posix shell quoting rules,
    the way :func:`shlex.split` does.
    """

    def parse_args(self, args):
        """
        Split `args` with :func:`shlex.split`.

        :param args: string to parse
        :return: the list of split arguments.
        """
        tokens = shlex.split(args)
        return tokens