import copyreg
import logging
import re
import sys
from time import sleep
from typing import BinaryIO, List, Optional, Union
from ansi.color import fg, fx
from markdown import Markdown
from markdown.extensions.extra import ExtraExtension
from pygments import highlight
from pygments.formatters import Terminal256Formatter
from pygments.lexers import get_lexer_by_name
from errbot.backends.base import (
OFFLINE,
ONLINE,
Identifier,
Message,
Person,
Presence,
Room,
RoomOccupant,
Stream,
)
from errbot.core import ErrBot
from errbot.logs import console_hdlr
from errbot.rendering import ansi, imtext, text, xhtml
from errbot.rendering.ansiext import ANSI_CHRS, AnsiExtension, enable_format
log = logging.getLogger(__name__)
ENCODING_INPUT = sys.stdin.encoding
ANSI = hasattr(sys.stderr, "isatty") and sys.stderr.isatty()
enable_format("borderless", ANSI_CHRS, borders=False)
[docs]def borderless_ansi() -> Markdown:
"""This makes a converter from markdown to ansi (console) format.
It can be called like this:
from errbot.rendering import ansi
md_converter = ansi() # you need to cache the converter
ansi_txt = md_converter.convert(md_txt)
"""
md = Markdown(
output_format="borderless", extensions=[ExtraExtension(), AnsiExtension()]
)
md.stripTopLevelTags = False
return md
[docs]class TextPerson(Person):
"""
Simple Person implementation which represents users as simple text strings.
"""
[docs] def __init__(self, person, client=None, nick=None, fullname=None):
self._person = person
self._client = client
self._nick = nick
self._fullname = fullname
self._email = ""
@property
def person(self) -> Person:
return self._person
@property
def client(self) -> str:
return self._client
@property
def nick(self) -> str:
return self._nick
@property
def fullname(self) -> str:
return self._fullname
@property
def email(self) -> str:
return self._email
@property
def aclattr(self) -> str:
return str(self)
def __str__(self):
return "@" + self._person
def __eq__(self, other):
if not isinstance(other, Person):
return False
return self.person == other.person
def __hash__(self):
return self.person.__hash__()
[docs]class TextRoom(Room):
[docs] def __init__(self, name: str, bot: ErrBot):
self._topic = ""
self._joined = False
self.name = name
self._bot = bot
# get the bot username
bot_config = self._bot.bot_config
default_bot_name = "@errbot"
if hasattr(bot_config, "BOT_IDENTITY"):
bot_name = bot_config.BOT_IDENTITY.get("username", default_bot_name)
else:
bot_name = default_bot_name
# fill up the room with a coherent set of identities.
self._occupants = [
TextOccupant("somebody", self),
TextOccupant(TextPerson(bot.bot_config.BOT_ADMINS[0]), self),
TextOccupant(bot_name, self),
]
[docs] def join(
self, username: Optional[str] = None, password: Optional[str] = None
) -> None:
self._joined = True
[docs] def leave(self, reason: Optional[str] = None) -> None:
self._joined = False
[docs] def create(self) -> None:
self._joined = True
[docs] def destroy(self) -> None:
self._joined = False
@property
def exists(self) -> bool:
return True
@property
def joined(self) -> bool:
return self._joined
@property
def topic(self) -> str:
return self._topic
@topic.setter
def topic(self, topic: str) -> None:
self._topic = topic
@property
def occupants(self) -> List["TextOccupant"]:
return self._occupants
[docs] def invite(self, *args):
pass
def __str__(self):
return "#" + self.name
def __eq__(self, other):
return self.name == other.name
def __hash__(self):
return self.name.__hash__()
[docs]class TextOccupant(TextPerson, RoomOccupant):
[docs] def __init__(self, person, room):
super().__init__(person)
self._room = room
@property
def room(self) -> TextRoom:
return self._room
def __str__(self):
return f"#{self._room.name}/{self._person.person}"
def __eq__(self, other):
return self.person == other.person and self.room == other.room
def __hash__(self):
return self.person.__hash__() + self.room.__hash__()
INTRO = """
---
You start as a **bot admin in a one-on-one conversation** with the bot.
### Context of the chat
- Use `!inroom`{:color='blue'} to switch to a room conversation.
- Use `!inperson`{:color='blue'} to switch back to a one-on-one conversation.
- Use `!asuser`{:color='green'} to talk as a normal user.
- Use `!asadmin`{:color='red'} to switch back as a bot admin.
### Preferences
- Use `!ml`{:color='yellow'} to flip on/off the multiline mode (Enter twice at the end to send).
---
"""
[docs]class TextBackend(ErrBot):
[docs] def __init__(self, config):
super().__init__(config)
log.debug("Text Backend Init.")
if (
hasattr(self.bot_config, "BOT_IDENTITY")
and "username" in self.bot_config.BOT_IDENTITY
):
self.bot_identifier = self.build_identifier(
self.bot_config.BOT_IDENTITY["username"]
)
else:
# Just a default identity for the bot if nothing has been specified.
self.bot_identifier = self.build_identifier("@errbot")
log.debug("Bot username set at %s.", self.bot_identifier)
self._inroom = False
self._rooms = []
self._multiline = False
self.demo_mode = (
self.bot_config.TEXT_DEMO_MODE
if hasattr(self.bot_config, "TEXT_DEMO_MODE")
else False
)
if not self.demo_mode:
self.md_html = xhtml() # for more debug feedback on md
self.md_text = text() # for more debug feedback on md
self.md_borderless_ansi = borderless_ansi()
self.md_im = imtext()
self.md_lexer = get_lexer_by_name("md", stripall=True)
self.md_ansi = ansi()
self.html_lexer = get_lexer_by_name("html", stripall=True)
self.terminal_formatter = Terminal256Formatter(style="paraiso-dark")
self.user = self.build_identifier(self.bot_config.BOT_ADMINS[0])
self._register_identifiers_pickling()
@staticmethod
def _unpickle_identifier(identifier_str):
return TextBackend.__build_identifier(identifier_str)
@staticmethod
def _pickle_identifier(identifier):
return TextBackend._unpickle_identifier, (str(identifier),)
def _register_identifiers_pickling(self):
"""
Register identifiers pickling.
"""
TextBackend.__build_identifier = self.build_identifier
for cls in (TextPerson, TextOccupant, TextRoom):
copyreg.pickle(
cls, TextBackend._pickle_identifier, TextBackend._unpickle_identifier
)
[docs] def serve_forever(self) -> None:
self.readline_support()
if not self._rooms:
# artificially join a room if None were specified.
self.query_room("#testroom").join()
if self.demo_mode:
# disable the console logging once it is serving in demo mode.
root = logging.getLogger()
root.removeHandler(console_hdlr)
root.addHandler(logging.NullHandler())
self.connect_callback() # notify that the connection occured
self.callback_presence(Presence(identifier=self.user, status=ONLINE))
self.send_message(Message(INTRO))
try:
while True:
if self._inroom:
frm = TextOccupant(self.user, self.rooms()[0])
to = self.rooms()[0]
else:
frm = self.user
to = self.bot_identifier
print()
full_msg = ""
while True:
prompt = "[␍] " if full_msg else ">>> "
if ANSI or self.demo_mode:
color = (
fg.red
if self.user.person in self.bot_config.BOT_ADMINS[0]
else fg.green
)
prompt = f"{color}[{frm} ➡ {to}] {fg.cyan}{prompt}{fx.reset}"
entry = input(prompt)
else:
entry = input(f"[{frm} ➡ {to}] {prompt}")
if not self._multiline:
full_msg = entry
break
if not entry:
break
full_msg += entry + "\n"
msg = Message(full_msg)
msg.frm = frm
msg.to = to
self.callback_message(msg)
mentioned = [
self.build_identifier(word)
for word in re.findall(r"(?<=\s)@[\w]+", entry)
]
if mentioned:
self.callback_mention(msg, mentioned)
sleep(0.5)
except EOFError:
pass
except KeyboardInterrupt:
pass
finally:
# simulate some real presence
self.callback_presence(Presence(identifier=self.user, status=OFFLINE))
log.debug("Trigger disconnect callback")
self.disconnect_callback()
log.debug("Trigger shutdown")
self.shutdown()
[docs] def readline_support(self) -> None:
try:
# Load readline for better editing/history behaviour
import readline
# Implement a simple completer for commands
def completer(txt, state):
options = (
[i for i in self.all_commands if i.startswith(txt)]
if txt
else list(self.all_commands.keys())
)
if state < len(options):
return options[state]
readline.parse_and_bind("tab: complete")
readline.set_completer(completer)
except ImportError:
# Readline is Unix-only
log.debug("Python readline module is not available")
[docs] def send_message(self, msg: Message) -> None:
if self.demo_mode:
print(self.md_ansi.convert(msg.body))
else:
bar = "\n╌╌[{mode}]" + ("╌" * 60)
super().send_message(msg)
print(bar.format(mode="MD "))
if ANSI:
print(highlight(msg.body, self.md_lexer, self.terminal_formatter))
else:
print(msg.body)
print(bar.format(mode="HTML"))
html = self.md_html.convert(msg.body)
if ANSI:
print(highlight(html, self.html_lexer, self.terminal_formatter))
else:
print(html)
print(bar.format(mode="TEXT"))
print(self.md_text.convert(msg.body))
print(bar.format(mode="IM "))
print(self.md_im.convert(msg.body))
if ANSI:
print(bar.format(mode="ANSI"))
print(self.md_ansi.convert(msg.body))
print(bar.format(mode="BORDERLESS"))
print(self.md_borderless_ansi.convert(msg.body))
print("\n\n")
[docs] def add_reaction(self, msg: Message, reaction: str) -> None:
# this is like the Slack backend's add_reaction
self._react("+", msg, reaction)
[docs] def remove_reaction(self, msg: Message, reaction: str) -> None:
self._react("-", msg, reaction)
def _react(self, sign: str, msg: Message, reaction: str) -> None:
self.send(msg.frm, f"reaction {sign}:{reaction}:", in_reply_to=msg)
[docs] def change_presence(self, status: str = ONLINE, message: str = "") -> None:
log.debug("*** Changed presence to [%s] %s", status, message)
[docs] def build_identifier(
self, text_representation: str
) -> Union[TextOccupant, TextRoom, TextPerson]:
if text_representation.startswith("#"):
rem = text_representation[1:]
if "/" in text_representation:
room, person = rem.split("/")
return TextOccupant(TextPerson(person), TextRoom(room, self))
return self.query_room("#" + rem)
if not text_representation.startswith("@"):
raise ValueError(
"An identifier for the Text backend needs to start with # for a room or @ for a person."
)
return TextPerson(text_representation[1:])
[docs] def build_reply(
self,
msg: Message,
text: str = None,
private: bool = False,
threaded: bool = False,
) -> Message:
response = self.build_message(text)
response.frm = self.bot_identifier
if private:
response.to = msg.frm
else:
response.to = msg.frm.room if isinstance(msg.frm, RoomOccupant) else msg.frm
return response
@property
def mode(self):
return "text"
[docs] def query_room(self, room: str) -> TextRoom:
if not room.startswith("#"):
raise ValueError("A Room name must start by #.")
text_room = TextRoom(room[1:], self)
if text_room not in self._rooms:
self._rooms.insert(0, text_room)
else:
self._rooms.insert(0, self._rooms.pop(self._rooms.index(text_room)))
return text_room
[docs] def rooms(self) -> List[TextRoom]:
return self._rooms
[docs] def prefix_groupchat_reply(self, message: Message, identifier: Identifier):
message.body = f"{identifier.person} {message.body}"
[docs] def send_stream_request(
self,
user: Identifier,
fsource: BinaryIO,
name: str = None,
size: int = None,
stream_type: str = None,
) -> Stream:
"""
Starts a file transfer. For Slack, the size and stream_type are unsupported
:param user: is the identifier of the person you want to send it to.
:param fsource: is a file object you want to send.
:param name: is an optional filename for it.
:param size: not supported in Slack backend
:param stream_type: not supported in Slack backend
:return Stream: object on which you can monitor the progress of it.
"""
stream = Stream(user, fsource, name, size, stream_type)
log.debug(
"Requesting upload of %s to %s (size hint: %d, stream type: %s).",
stream.name,
stream.identifier,
size,
stream_type,
)
return stream