Source code for errbot.backends.xmpp

from __future__ import annotations

import logging
import sys
from datetime import datetime
from functools import lru_cache
from time import sleep
from typing import Callable, List, Optional, Tuple, Union

from errbot.backends.base import (
    AWAY,
    DND,
    OFFLINE,
    ONLINE,
    Identifier,
    Message,
    Person,
    Presence,
    Room,
    RoomNotJoinedError,
    RoomOccupant,
)
from errbot.core import ErrBot
from errbot.rendering import text, xhtml, xhtmlim

log = logging.getLogger(__name__)

try:
    from slixmpp import JID, ClientXMPP
    from slixmpp.exceptions import IqError
    from slixmpp.xmlstream import cert, resolver

except ImportError:
    log.exception("Could not start the XMPP backend")
    log.fatal(
        """
    If you intend to use the XMPP backend please install the support for XMPP with:
    pip install errbot[XMPP]
    """
    )
    sys.exit(-1)

# Maximum number of entries kept by the LRU cache that memoizes
# XMPPBackend.build_identifier() (the JID queries).
IDENTIFIERS_LRU = 1024


class XMPPIdentifier(Identifier):
    """
    Identifier of an XMPP entity, stored as the three parts of a JID.

    The textual form is ``node@domain``, with ``/resource`` appended when a
    resource is set.
    """

    def __init__(self, node, domain, resource):
        """
        :param node: The part of the JID before the ``@``; must be truthy.
        :param domain: The part of the JID after the ``@``; must be truthy.
        :param resource: The part of the JID after the ``/``, if any.
        :raises Exception: if `node` or `domain` is empty or missing.
        """
        if not node:
            raise Exception("An XMPPIdentifier needs to have a node.")
        if not domain:
            raise Exception("An XMPPIdentifier needs to have a domain.")

        self._node, self._domain, self._resource = node, domain, resource
        self._email = ""

    @property
    def node(self) -> str:
        """The node part of the JID."""
        return self._node

    @property
    def domain(self) -> str:
        """The domain part of the JID."""
        return self._domain

    @property
    def resource(self) -> str:
        """The resource part of the JID, as given to the constructor."""
        return self._resource

    @property
    def person(self) -> str:
        """The bare ``node@domain`` form, without the resource."""
        return "@".join((self._node, self._domain))

    @property
    def nick(self) -> str:
        """The nickname; for a plain identifier this is the node."""
        return self._node

    @property
    def fullname(self) -> None:
        """Always `None`: not supported by default on XMPP."""
        return None

    @property
    def email(self):
        """The e-mail address; initialised to an empty string."""
        return self._email

    @property
    def client(self):
        """The client, which on XMPP is the resource."""
        return self._resource

    def __str__(self):
        # The bare part is rebuilt here instead of going through .person:
        # XMPPRoomOccupant overrides .person to return str(self), so calling
        # it from here would recurse forever.
        bare = "@".join((self._node, self._domain))
        if not self._resource:
            return bare
        return bare + "/" + self._resource

    def __unicode__(self):
        return str(self)

    def __eq__(self, other):
        """Two identifiers are equal when node, domain and resource all match."""
        if isinstance(other, XMPPIdentifier):
            mine = (self._domain, self._node, self._resource)
            theirs = (other._domain, other._node, other._resource)
            return mine == theirs

        log.debug("Weird, you are comparing an XMPPIdentifier to a %s", type(other))
        return False
class XMPPPerson(XMPPIdentifier, Person):
    """
    An XMPP user.

    Unlike a plain XMPPIdentifier, two persons compare equal as soon as their
    node and domain match; the resource is ignored.
    """

    # Bound to the base-class property, so it keeps yielding the bare
    # ``node@domain`` form even in subclasses that override ``person``.
    aclattr = XMPPIdentifier.person

    def __eq__(self, other):
        if isinstance(other, XMPPPerson):
            return (self._domain, self._node) == (other._domain, other._node)

        log.debug("Weird, you are comparing an XMPPPerson to a %s", type(other))
        return False
class XMPPRoom(XMPPIdentifier, Room):
    """
    A multi-user chat room, driven through the ``xep_0045`` plugin of the
    bot's XMPP client.
    """

    def __init__(self, room_jid, bot: ErrBot):
        """
        :param room_jid: Textual JID of the room.
        :param bot: The bot whose connection is used to talk to the room.
        """
        self._bot = bot
        self.xep0045 = self._bot.conn.client.plugin["xep_0045"]
        node, domain, resource = split_identifier(room_jid)
        super().__init__(node, domain, resource)

    def join(
        self, username: Optional[str] = None, password: Optional[str] = None
    ) -> None:
        """
        Join the room and start relaying its presence events to the bot.

        :param username: The nickname to use in the room.
        :param password: The room password, if it is protected.
        """
        room = str(self)
        self.xep0045.join_muc(room, username, password=password)
        self._bot.conn.add_event_handler(
            f"muc::{room}::got_online", self._bot.user_joined_chat
        )
        self._bot.conn.add_event_handler(
            f"muc::{room}::got_offline", self._bot.user_left_chat
        )
        # NOTE(review): configure() is a generator function (see its
        # docstring), so this call only creates a generator object and does
        # not execute any of its body.
        self.configure()
        self._bot.callback_room_joined(self, self._bot.bot_identifier)
        log.info("Joined room %s.", room)

    def leave(self, reason: Optional[str] = None) -> None:
        """
        Leave the room.

        :param reason: An optional string explaining the reason for leaving the room
        """
        if reason is None:
            reason = ""
        room = str(self)
        try:
            # The plugin attribute is `our_nicks`, the same one configure()
            # reads. A missing entry raises KeyError, handled below as
            # "not in this room".
            self.xep0045.leave_muc(
                room=room, nick=self.xep0045.our_nicks[room], msg=reason
            )
            self._bot.conn.del_event_handler(
                f"muc::{room}::got_online", self._bot.user_joined_chat
            )
            self._bot.conn.del_event_handler(
                f"muc::{room}::got_offline", self._bot.user_left_chat
            )
            log.info("Left room %s.", room)
            self._bot.callback_room_left(self, self._bot.bot_identifier)
        except KeyError:
            log.debug("Trying to leave %s while not in this room.", room)

    def create(self) -> None:
        """
        Not supported on this back-end (Slixmpp doesn't support it).
        Will join the room to ensure it exists, instead.
        """
        log.warning(
            "XMPP back-end does not support explicit creation, joining room "
            "instead to ensure it exists."
        )
        self.join(username=str(self))

    def destroy(self) -> None:
        """
        Destroy the room.
        """
        self.xep0045.destroy(str(self))
        log.info("Destroyed room %s.", self)

    @property
    def exists(self) -> bool:
        """
        Boolean indicating whether this room already exists or not.

        This back-end cannot determine that, so the value of :attr:`joined`
        is returned instead.

        :getter: Returns `True` if the room has been joined, `False` otherwise.
        """
        log.warning(
            "XMPP back-end does not support determining if a room exists. Returning the result of joined instead."
        )
        return self.joined

    @property
    def joined(self) -> bool:
        """
        Boolean indicating whether this room has already been joined.

        :getter: Returns `True` if the room has been joined, `False` otherwise.
        """
        return str(self) in self.xep0045.get_joined_rooms()

    @property
    def topic(self) -> Optional[str]:
        """
        The room topic.

        :getter: Returns the topic (a string) if one is set, `None` if no
            topic has been recorded for this room.
        :raises: :class:`~RoomNotJoinedError` if the room has not yet been joined.
        """
        if not self.joined:
            raise RoomNotJoinedError("Must be in a room in order to see the topic.")
        try:
            return self._bot._room_topics[str(self)]
        except KeyError:
            return None

    @topic.setter
    def topic(self, topic: str) -> None:
        """
        Set the room's topic.

        :param topic: The topic to set.
        :raises NotImplementedError: always.
        """
        # Not supported by Slixmpp at the moment :(
        raise NotImplementedError(
            "Setting the topic is not supported on this back-end."
        )

    @property
    def occupants(self) -> List[XMPPRoomOccupant]:
        """
        The room's occupants.

        :getter: Returns a list of :class:`XMPPRoomOccupant` instances.
        :raises: :class:`~RoomNotJoinedError` if the room has not yet been joined.
        """
        occupants = []
        try:
            for occupant in self.xep0045.rooms[str(self)].values():
                room_node, room_domain, _ = split_identifier(occupant["room"])
                nick = occupant["nick"]
                occupants.append(XMPPRoomOccupant(room_node, room_domain, nick, self))
        except KeyError:
            raise RoomNotJoinedError("Must be in a room in order to see occupants.")
        return occupants

    def invite(self, *args) -> None:
        """
        Invite one or more people into the room.

        :*args: One or more JID's to invite into the room.
        """
        room = str(self)
        for jid in args:
            self.xep0045.invite(room, jid)
            log.info("Invited %s to %s.", jid, room)

    def configure(self) -> None:
        """
        Configure the room.

        Polls every half second until the bot's own affiliation in the room
        is known. If it is "owner", the room configuration form is fetched
        from the server and submitted back as-is.

        NOTE(review): because the body contains ``yield from``, this is a
        generator function. Calling it returns a generator and runs none of
        the code below until that generator is iterated. join() calls it
        without iterating the result. Confirm the intended behaviour against
        Slixmpp's API before changing this.
        """
        room = str(self)
        affiliation = None
        while affiliation is None:
            sleep(0.5)
            affiliation = self.xep0045.get_jid_property(
                room=room, nick=self.xep0045.our_nicks[room], jid_property="affiliation"
            )

        if affiliation == "owner":
            log.debug("Configuring room %s: we have owner affiliation.", room)
            form = yield from self.xep0045.get_room_config(room)
            self.xep0045.configure_room(room, form)
        else:
            log.debug(
                "Not configuring room %s: we don't have owner affiliation (affiliation=%s)",
                room,
                affiliation,
            )
class XMPPRoomOccupant(XMPPPerson, RoomOccupant):
    """
    A participant of an XMPP room.

    Node and domain are those of the room; the resource is the occupant's
    nickname in that room.
    """

    def __init__(self, node, domain, resource, room):
        """
        :param node: Node part of the room JID.
        :param domain: Domain part of the room JID.
        :param resource: The occupant's nickname in the room.
        :param room: The room this occupant belongs to.
        """
        super().__init__(node, domain, resource)
        self._room = room

    @property
    def person(self):
        """The full identifier, resource included."""
        return str(self)

    @property
    def real_jid(self) -> str:
        """
        The JID of the room occupant, they used to login.
        Will only work if the errbot is moderator in the MUC or it is not anonymous.
        """
        muc_jid = self._node + "@" + self._domain
        raw_jid = self._room.xep0045.get_jid_property(muc_jid, self.resource, "jid")
        return JID(raw_jid).bare

    @property
    def room(self) -> XMPPRoom:
        """The room this occupant belongs to."""
        return self._room

    # For an occupant the nickname is the resource, not the node.
    nick = XMPPPerson.resource
class XMPPConnection:
    """
    Wrapper around a Slixmpp ``ClientXMPP`` that registers the plugins the
    backend relies on and tracks whether ``connect`` has been called.
    """

    def __init__(
        self,
        jid,
        password,
        feature=None,
        keepalive=None,
        ca_cert=None,
        server=None,
        use_ipv6=None,
        bot=None,
        ssl_version=None,
    ):
        """
        :param jid: JID the client logs in with.
        :param password: Password for that JID.
        :param feature: Configuration passed to the ``feature_mechanisms``
            plugin; defaults to an empty dict.
        :param keepalive: Whitespace keepalive interval; left at the client's
            default when `None`.
        :param ca_cert: Value assigned to the client's ``ca_certs``.
        :param server: Passed to the client's ``connect`` when not `None`.
        :param use_ipv6: Overrides the client's ``use_ipv6`` when not `None`.
        :param bot: The owning bot.
        :param ssl_version: Overrides the client's ``ssl_version`` when truthy.
        """
        self._bot = bot
        self.connected = False
        self.server = server

        mechanisms = {} if feature is None else feature
        self.client = ClientXMPP(
            jid, password, plugin_config={"feature_mechanisms": mechanisms}
        )

        required_plugins = (
            "xep_0030",  # Service Discovery
            "xep_0045",  # Multi-User Chat
            "xep_0199",  # XMPP Ping
            "xep_0203",  # XMPP Delayed messages
            "xep_0249",  # XMPP direct MUC invites
        )
        for plugin_name in required_plugins:
            self.client.register_plugin(plugin_name)

        if keepalive is not None:
            # Set explicitly, just in case Slixmpp's default changes to False
            # in the future.
            self.client.whitespace_keepalive = True
            self.client.whitespace_keepalive_interval = keepalive

        if use_ipv6 is not None:
            self.client.use_ipv6 = use_ipv6

        if ssl_version:
            self.client.ssl_version = ssl_version

        # Used for TLS certificate validation
        self.client.ca_certs = ca_cert

        self.client.add_event_handler("session_start", self.session_start)

    def session_start(self, _):
        """On session start, send the initial presence and request the roster."""
        self.client.send_presence()
        self.client.get_roster()

    def connect(self) -> XMPPConnection:
        """
        Connect the client unless this has already been done.

        :returns: This connection.
        """
        if self.connected:
            return self

        connect_args = () if self.server is None else (self.server,)
        self.client.connect(*connect_args)
        self.connected = True
        return self

    def disconnect(self) -> None:
        """Disconnect the client and mark the connection as closed."""
        self.client.disconnect(wait=True)
        self.connected = False

    def serve_forever(self) -> None:
        """Hand control over to the client's processing loop."""
        self.client.process()

    def add_event_handler(self, name: str, cb: Callable) -> None:
        """Register `cb` on the client for the event called `name`."""
        self.client.add_event_handler(name, cb)

    def del_event_handler(self, name: str, cb: Callable) -> None:
        """Remove `cb` from the client's handlers for the event called `name`."""
        self.client.del_event_handler(name, cb)
# Translates the "type" of an XMPP presence event into the matching errbot
# status constant. XMPPBackend.user_changed_status() falls back to the raw
# type when it is not listed here.
XMPP_TO_ERR_STATUS = {
    "available": ONLINE,
    "away": AWAY,
    "dnd": DND,
    "unavailable": OFFLINE,
}
def split_identifier(txtrep: str) -> Tuple[str, str, Optional[str]]:
    """
    Split the textual form of a JID into its three parts.

    The resource is cut off at the first ``/`` before the node is looked for,
    so an ``@`` inside the resource (``domain/nick@home``) is not mistaken for
    the node separator.

    :param txtrep: A JID such as ``node@domain/resource``.
    :returns: A ``(node, domain, resource)`` tuple. `node` is an empty string
        when there is no ``@`` in the bare part, and `resource` is `None`
        when there is no ``/`` at all.
    """
    bare, slash, resource = txtrep.partition("/")
    node, at, domain = bare.partition("@")
    if not at:
        # No "@": the whole bare part is the domain.
        node, domain = "", bare
    return node, domain, (resource if slash else None)
class XMPPBackend(ErrBot):
    """
    errbot backend that talks XMPP through an :class:`XMPPConnection`.
    """

    room_factory = XMPPRoom
    roomoccupant_factory = XMPPRoomOccupant

    def __init__(self, config):
        """
        Read the XMPP settings from `config`, create the connection and
        register the event handlers on it.

        :param config: The bot configuration. ``BOT_IDENTITY`` must provide
            ``username`` and ``password`` and may provide ``server``; the
            optional ``XMPP_*`` settings are read with their defaults below.
        """
        super().__init__(config)
        identity = config.BOT_IDENTITY

        self.jid = identity["username"]  # backward compatibility
        self.password = identity["password"]
        self.server = identity.get("server", None)
        self.feature = config.__dict__.get("XMPP_FEATURE_MECHANISMS", {})
        self.keepalive = config.__dict__.get("XMPP_KEEPALIVE_INTERVAL", None)
        self.ca_cert = config.__dict__.get(
            "XMPP_CA_CERT_FILE", "/etc/ssl/certs/ca-certificates.crt"
        )
        self.xhtmlim = config.__dict__.get("XMPP_XHTML_IM", False)
        self.use_ipv6 = config.__dict__.get("XMPP_USE_IPV6", None)
        self.ssl_version = config.__dict__.get("XMPP_SSL_VERSION", None)

        # generic backend compatibility
        self.bot_identifier = self._build_person(self.jid)

        self.conn = self.create_connection()
        self.conn.add_event_handler("message", self.incoming_message)
        self.conn.add_event_handler("session_start", self.connected)
        self.conn.add_event_handler("disconnected", self.disconnected)
        # presence related handlers
        self.conn.add_event_handler("got_online", self.contact_online)
        self.conn.add_event_handler("got_offline", self.contact_offline)
        self.conn.add_event_handler("changed_status", self.user_changed_status)
        # MUC subject events
        self.conn.add_event_handler("groupchat_subject", self.chat_topic)
        # Last known topic per room, filled in by chat_topic().
        self._room_topics = {}
        self.md_xhtml = xhtml()
        self.md_text = text()

    def create_connection(self) -> XMPPConnection:
        """Build the :class:`XMPPConnection` from the settings read in ``__init__``."""
        return XMPPConnection(
            jid=self.jid,  # textual and original representation
            password=self.password,
            feature=self.feature,
            keepalive=self.keepalive,
            ca_cert=self.ca_cert,
            server=self.server,
            use_ipv6=self.use_ipv6,
            bot=self,
            ssl_version=self.ssl_version,
        )

    def _build_room_occupant(self, txtrep: str) -> XMPPRoomOccupant:
        """Build a room occupant (and its room) from a ``room@domain/nick`` JID."""
        node, domain, resource = split_identifier(txtrep)
        return self.roomoccupant_factory(
            node, domain, resource, self.query_room(node + "@" + domain)
        )

    def _build_person(self, txtrep: str) -> XMPPPerson:
        """Build a person from the textual form of a JID."""
        return XMPPPerson(*split_identifier(txtrep))

    def incoming_message(self, xmppmsg: dict) -> None:
        """Callback for message events"""
        if xmppmsg["type"] == "error":
            log.warning("Received error message: %s", xmppmsg)
            return

        msg = Message(xmppmsg["body"])
        if "html" in xmppmsg.keys():
            msg.html = xmppmsg["html"]
        if xmppmsg["type"] == "groupchat":
            msg.frm = self._build_room_occupant(xmppmsg["from"].full)
            msg.to = msg.frm.room
        else:
            msg.frm = self._build_person(xmppmsg["from"].full)
            msg.to = self._build_person(xmppmsg["to"].full)
        # Logged only once the sender has been set on the message; doing it
        # earlier would log the message's initial, still unset, sender.
        log.debug("incoming_message from: %s", msg.frm)

        msg.nick = xmppmsg["mucnick"]
        now = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
        delay = xmppmsg["delay"]._get_attr(
            "stamp"
        )  # this is a bug in sleekxmpp it should be ['from']
        # Delayed when a stamp is present and differs from the current time.
        msg.delayed = bool(delay and delay != now)

        self.callback_message(msg)

    def _idd_from_event(self, event) -> Union[XMPPRoomOccupant, XMPPPerson]:
        """Build the identifier of the sender of a presence event."""
        txtrep = event["from"].full
        return (
            self._build_room_occupant(txtrep)
            if "muc" in event
            else self._build_person(txtrep)
        )

    def contact_online(self, event) -> None:
        """Callback for ``got_online``: report the sender as ONLINE."""
        log.debug("contact_online %s.", event)
        self.callback_presence(
            Presence(identifier=self._idd_from_event(event), status=ONLINE)
        )

    def contact_offline(self, event) -> None:
        """Callback for ``got_offline``: report the sender as OFFLINE."""
        log.debug("contact_offline %s.", event)
        self.callback_presence(
            Presence(identifier=self._idd_from_event(event), status=OFFLINE)
        )

    def user_joined_chat(self, event) -> None:
        """Callback for a room's ``got_online``: report the sender as ONLINE."""
        log.debug("user_join_chat %s", event)
        self.callback_presence(
            Presence(identifier=self._idd_from_event(event), status=ONLINE)
        )

    def user_left_chat(self, event) -> None:
        """Callback for a room's ``got_offline``: report the sender as OFFLINE."""
        log.debug("user_left_chat %s", event)
        self.callback_presence(
            Presence(identifier=self._idd_from_event(event), status=OFFLINE)
        )

    def chat_topic(self, event) -> None:
        """Callback for ``groupchat_subject``: record the topic and notify the bot."""
        log.debug("chat_topic %s.", event)
        room = event.values["mucroom"]
        topic = event.values["subject"]
        if topic == "":
            # An empty subject is stored as "no topic".
            topic = None
        self._room_topics[room] = topic
        room = XMPPRoom(event.values["mucroom"], self)
        self.callback_room_topic(room)

    def user_changed_status(self, event) -> None:
        """Callback for ``changed_status``: forward the new status and its message."""
        log.debug("user_changed_status %s.", event)
        errstatus = XMPP_TO_ERR_STATUS.get(event["type"], None)
        message = event["status"]
        if not errstatus:
            # Unknown presence type: pass it through untranslated.
            errstatus = event["type"]
        self.callback_presence(
            Presence(
                identifier=self._idd_from_event(event),
                status=errstatus,
                message=message,
            )
        )

    def connected(self, data) -> None:
        """Callback for connection events"""
        self.connect_callback()

    def disconnected(self, data) -> None:
        """Callback for disconnection events"""
        self.disconnect_callback()

    def send_message(self, msg: Message) -> None:
        """
        Send `msg` as plain text, plus an XHTML-IM body when ``XMPP_XHTML_IM``
        is enabled.
        """
        super().send_message(msg)

        log.debug("send_message to %s", msg.to)

        # We need to unescape the unicode characters (not the markup incompatible ones)
        mhtml = (
            xhtmlim.unescape(self.md_xhtml.convert(msg.body)) if self.xhtmlim else None
        )

        self.conn.client.send_message(
            mto=str(msg.to),
            mbody=self.md_text.convert(msg.body),
            mhtml=mhtml,
            mtype="chat" if msg.is_direct else "groupchat",
        )

    def change_presence(self, status: str = ONLINE, message: str = "") -> None:
        """Send a presence stanza with the given status and message."""
        log.debug("Change bot status to %s, message %s.", status, message)
        self.conn.client.send_presence(pshow=status, pstatus=message)

    def serve_forever(self) -> None:
        """
        Connect and process events until the client's loop returns, then run
        the disconnect callback and shut the bot down, even on error.
        """
        self.conn.connect()

        try:
            self.conn.serve_forever()
        finally:
            log.debug("Trigger disconnect callback")
            self.disconnect_callback()
            log.debug("Trigger shutdown")
            self.shutdown()

    @lru_cache(IDENTIFIERS_LRU)
    def build_identifier(
        self, txtrep: str
    ) -> Union[XMPPRoomOccupant, XMPPRoom, XMPPPerson]:
        """
        Turn the textual form of a JID into a room, a room occupant or a
        person, using service discovery (``xep_0030``) to tell them apart.

        Results are memoized per ``(self, txtrep)``; ``__hash__`` on this
        class is what makes the instance usable as part of the cache key.

        :param txtrep: The textual form of the JID.
        :returns: A room when an identity of category "conference" is
            advertised, a room occupant for a "client" identity advertising
            the MUC feature, a person otherwise or when the query fails with
            an ``IqError``.
        """
        log.debug("build identifier for %s", txtrep)
        try:
            xep0030 = self.conn.client.plugin["xep_0030"]
            info = xep0030.get_info(jid=txtrep)
            disco_info = info["disco_info"]
            if disco_info:
                for category, _type, _lang, _name in disco_info["identities"]:
                    if category == "conference":
                        log.debug("This is a room ! %s", txtrep)
                        return self.query_room(txtrep)
                    if (
                        category == "client"
                        and "http://jabber.org/protocol/muc" in disco_info["features"]
                    ):
                        log.debug("This is room occupant ! %s", txtrep)
                        return self._build_room_occupant(txtrep)
        except IqError as iq:
            log.debug("xep_0030 is probably not implemented on this server. %s.", iq)
        log.debug("This is a person ! %s", txtrep)
        return self._build_person(txtrep)

    def build_reply(
        self,
        msg: Message,
        text: Optional[str] = None,
        private: bool = False,
        threaded: bool = False,
    ) -> Message:
        """
        Build the response to `msg`, addressed according to where it came from.

        :param msg: The message being replied to.
        :param text: The body of the reply.
        :param private: Reply to the sender directly even if `msg` came from a room.
        :param threaded: Not used by this back-end.
        :returns: The reply, with sender and recipient filled in.
        """
        response = self.build_message(text)
        response.frm = self.bot_identifier

        if msg.is_group and not private:
            # stripped returns the full bot@conference.domain.tld/chat_username
            # but in case of a groupchat, we should only try to send to the MUC address
            # itself (bot@conference.domain.tld)
            response.to = XMPPRoom(msg.frm.node + "@" + msg.frm.domain, self)
        elif msg.is_direct:
            # preserve from in case of a simple chat message.
            # it is either a user to user or user_in_chatroom to user case.
            # so we need resource.
            response.to = msg.frm
        elif (
            hasattr(msg.to, "person")
            and msg.to.person == self.bot_config.BOT_IDENTITY["username"]
        ):
            # This is a direct private message, not initiated through a MUC. Use
            # stripped to remove the resource so that the response goes to the
            # client with the highest priority
            response.to = XMPPPerson(msg.frm.node, msg.frm.domain, None)
        else:
            # This is a private message that was initiated through a MUC. Don't use
            # stripped here to retain the resource, else the XMPP server doesn't
            # know which user we're actually responding to.
            response.to = msg.frm
        return response

    @property
    def mode(self):
        """The name of this back-end."""
        return "xmpp"

    def rooms(self) -> List[XMPPRoom]:
        """
        Return a list of rooms the bot is currently in.

        :returns: A list of :class:`XMPPRoom` instances.
        """
        xep0045 = self.conn.client.plugin["xep_0045"]
        return [XMPPRoom(room, self) for room in xep0045.get_joined_rooms()]

    def query_room(self, room) -> XMPPRoom:
        """
        Query a room for information.

        :param room: The JID/identifier of the room to query for.
        :returns: An instance of :class:`XMPPRoom`.
        """
        return XMPPRoom(room, self)

    def prefix_groupchat_reply(self, message: Message, identifier: Identifier):
        """Prefix the body of `message` with ``@nick`` of `identifier`."""
        super().prefix_groupchat_reply(message, identifier)
        message.body = f"@{identifier.nick} {message.body}"

    def __hash__(self):
        # Constant hash so the instance can be part of the lru_cache key of
        # build_identifier().
        return 0