import logging
import sys
from typing import Any, BinaryIO, List, Optional, Union
from errbot.backends.base import (
ONLINE,
Identifier,
Message,
Person,
Room,
RoomError,
RoomOccupant,
Stream,
)
from errbot.core import ErrBot
from errbot.rendering import text
from errbot.rendering.ansiext import TEXT_CHRS, enable_format
log = logging.getLogger(__name__)
UPDATES_OFFSET_KEY = "_telegram_updates_offset"
try:
import telegram
except ImportError:
log.exception("Could not start the Telegram back-end")
log.fatal(
"You need to install the telegram support in order "
"to use the Telegram backend.\n"
"You should be able to install this package using:\n"
"pip install errbot[telegram]"
)
sys.exit(1)
[docs]class RoomsNotSupportedError(RoomError):
[docs] def __init__(self, message: Optional[str] = None):
if message is None:
message = (
"Room operations are not supported on Telegram. "
"While Telegram itself has groupchat functionality, it does not "
"expose any APIs to bots to get group membership or otherwise "
"interact with groupchats."
)
super().__init__(message)
[docs]class TelegramBotFilter:
"""
This is a filter for the logging library that filters the
"No new updates found." log message generated by telegram.bot.
This is an INFO-level log message that gets logged for every
getUpdates() call where there are no new messages, so is way
too verbose.
"""
[docs] @staticmethod
def filter(record):
if record.getMessage() == "No new updates found.":
return 0
[docs]class TelegramIdentifier(Identifier):
[docs] def __init__(self, id):
self._id = str(id)
@property
def id(self) -> str:
return self._id
def __unicode__(self):
return str(self._id)
def __eq__(self, other):
return self._id == other.id
__str__ = __unicode__
aclattr = id
[docs]class TelegramPerson(TelegramIdentifier, Person):
[docs] def __init__(self, id, first_name=None, last_name=None, username=None):
super().__init__(id)
self._first_name = first_name
self._last_name = last_name
self._username = username
@property
def id(self) -> str:
return self._id
@property
def first_name(self) -> str:
return self._first_name
@property
def last_name(self) -> str:
return self._last_name
@property
def fullname(self) -> str:
fullname = self.first_name
if self.last_name is not None:
fullname += " " + self.last_name
return fullname
@property
def username(self) -> str:
return self._username
@property
def client(self) -> None:
return None
person = id
nick = username
[docs]class TelegramRoom(TelegramIdentifier, Room):
[docs] def __init__(self, id, title=None):
super().__init__(id)
self._title = title
@property
def id(self) -> str:
return self._id
@property
def title(self):
"""Return the groupchat title (only applies to groupchats)"""
return self._title
[docs] def join(self, username: str = None, password: str = None) -> None:
raise RoomsNotSupportedError()
[docs] def create(self) -> None:
raise RoomsNotSupportedError()
[docs] def leave(self, reason: str = None) -> None:
raise RoomsNotSupportedError()
[docs] def destroy(self) -> None:
raise RoomsNotSupportedError()
@property
def joined(self) -> None:
raise RoomsNotSupportedError()
@property
def exists(self) -> None:
raise RoomsNotSupportedError()
@property
def topic(self) -> None:
raise RoomsNotSupportedError()
@property
def occupants(self) -> None:
raise RoomsNotSupportedError()
[docs] def invite(self, *args) -> None:
raise RoomsNotSupportedError()
[docs]class TelegramMUCOccupant(TelegramPerson, RoomOccupant):
"""
This class represents a person inside a MUC.
"""
[docs] def __init__(
self, id, room: TelegramRoom, first_name=None, last_name=None, username=None
):
super().__init__(
id=id, first_name=first_name, last_name=last_name, username=username
)
self._room = room
@property
def room(self) -> TelegramRoom:
return self._room
@property
def username(self) -> str:
return self._username
[docs]class TelegramBackend(ErrBot):
[docs] def __init__(self, config):
super().__init__(config)
logging.getLogger("telegram.bot").addFilter(TelegramBotFilter())
identity = config.BOT_IDENTITY
self.token = identity.get("token", None)
if not self.token:
log.fatal(
"You need to supply a token for me to use. You can obtain "
"a token by registering your bot with the Bot Father (@BotFather)"
)
sys.exit(1)
self.telegram = None # Will be initialized in serve_once
self.bot_instance = None # Will be set in serve_once
compact = config.COMPACT_OUTPUT if hasattr(config, "COMPACT_OUTPUT") else False
enable_format("text", TEXT_CHRS, borders=not compact)
self.md_converter = text()
[docs] def set_message_size_limit(self, limit: int = 1024, hard_limit: int = 1024) -> None:
"""
Telegram message size limit
"""
super().set_message_size_limit(limit, hard_limit)
[docs] def serve_once(self) -> None:
log.info("Initializing connection")
try:
self.telegram = telegram.Bot(token=self.token)
me = self.telegram.getMe()
except telegram.TelegramError as e:
log.error("Connection failure: %s", e.message)
return False
self.bot_identifier = TelegramPerson(
id=me.id,
first_name=me.first_name,
last_name=me.last_name,
username=me.username,
)
log.info("Connected")
self.reset_reconnection_count()
self.connect_callback()
try:
offset = self[UPDATES_OFFSET_KEY]
except KeyError:
offset = 0
try:
while True:
log.debug("Getting updates with offset %s", offset)
for update in self.telegram.getUpdates(offset=offset, timeout=60):
offset = update.update_id + 1
self[UPDATES_OFFSET_KEY] = offset
log.debug("Processing update: %s", update)
if not hasattr(update, "message"):
log.warning("Unknown update type (no message present)")
continue
try:
self._handle_message(update.message)
except Exception:
log.exception("An exception occurred while processing update")
log.debug("All updates processed, new offset is %s", offset)
except KeyboardInterrupt:
log.info("Interrupt received, shutting down..")
return True
except Exception:
log.exception("Error reading from Telegram updates stream:")
finally:
log.debug("Triggering disconnect callback")
self.disconnect_callback()
def _handle_message(self, message: Message) -> None:
"""
Handle a received message.
:param message:
A message with a structure as defined at
https://core.telegram.org/bots/api#message
"""
if message.text is None:
log.warning("Unhandled message type (not a text message) ignored")
return
message_instance = self.build_message(message.text)
if message.chat["type"] == "private":
message_instance.frm = TelegramPerson(
id=message.from_user.id,
first_name=message.from_user.first_name,
last_name=message.from_user.last_name,
username=message.from_user.username,
)
message_instance.to = self.bot_identifier
else:
room = TelegramRoom(id=message.chat.id, title=message.chat.title)
message_instance.frm = TelegramMUCOccupant(
id=message.from_user.id,
room=room,
first_name=message.from_user.first_name,
last_name=message.from_user.last_name,
username=message.from_user.username,
)
message_instance.to = room
message_instance.extras["message_id"] = message.message_id
self.callback_message(message_instance)
[docs] def send_message(self, msg: Message) -> None:
super().send_message(msg)
body = self.md_converter.convert(msg.body)
try:
self.telegram.sendMessage(msg.to.id, body)
except Exception:
log.exception(
f"An exception occurred while trying to send the following message to {msg.to.id}: {msg.body}"
)
raise
[docs] def change_presence(self, status: str = ONLINE, message: str = "") -> None:
# It looks like telegram doesn't supports online presence for privacy reason.
pass
[docs] def build_identifier(self, txtrep: str) -> Union[TelegramPerson, TelegramRoom]:
"""
Convert a textual representation into a :class:`~TelegramPerson` or :class:`~TelegramRoom`.
"""
log.debug("building an identifier from %s.", txtrep)
if not self._is_numeric(txtrep):
raise ValueError("Telegram identifiers must be numeric.")
id_ = int(txtrep)
if id_ > 0:
return TelegramPerson(id=id_)
else:
return TelegramRoom(id=id_)
[docs] def build_reply(
self,
msg: Message,
text: Optional[str] = None,
private: bool = False,
threaded: bool = False,
) -> Message:
response = self.build_message(text)
response.frm = self.bot_identifier
if private:
response.to = msg.frm
else:
response.to = msg.frm if msg.is_direct else msg.to
return response
@property
def mode(self) -> text:
return "telegram"
[docs] def query_room(self, room: TelegramRoom) -> None:
"""
Not supported on Telegram.
:raises: :class:`~RoomsNotSupportedError`
"""
raise RoomsNotSupportedError()
[docs] def rooms(self) -> None:
"""
Not supported on Telegram.
:raises: :class:`~RoomsNotSupportedError`
"""
raise RoomsNotSupportedError()
[docs] def prefix_groupchat_reply(self, message: Message, identifier: Identifier):
super().prefix_groupchat_reply(message, identifier)
message.body = f"@{identifier.nick}: {message.body}"
def _telegram_special_message(
self, chat_id: Any, content: Any, msg_type: str, **kwargs
) -> telegram.Message:
"""Send special message."""
if msg_type == "document":
msg = self.telegram.sendDocument(
chat_id=chat_id, document=content, **kwargs
)
elif msg_type == "photo":
msg = self.telegram.sendPhoto(chat_id=chat_id, photo=content, **kwargs)
elif msg_type == "audio":
msg = self.telegram.sendAudio(chat_id=chat_id, audio=content, **kwargs)
elif msg_type == "video":
msg = self.telegram.sendVideo(chat_id=chat_id, video=content, **kwargs)
elif msg_type == "sticker":
msg = self.telegram.sendSticker(chat_id=chat_id, sticker=content, **kwargs)
elif msg_type == "location":
msg = self.telegram.sendLocation(
chat_id=chat_id,
latitude=kwargs.pop("latitude", ""),
longitude=kwargs.pop("longitude", ""),
**kwargs,
)
else:
raise ValueError(
f"Expected a valid choice for `msg_type`, got: {msg_type}."
)
return msg
def _telegram_upload_stream(self, stream: Stream, **kwargs) -> None:
"""Perform upload defined in a stream."""
msg = None
try:
stream.accept()
msg = self._telegram_special_message(
chat_id=stream.identifier.id,
content=stream.raw,
msg_type=stream.stream_type,
**kwargs,
)
except Exception:
log.exception(f"Upload of {stream.name} to {stream.identifier} failed.")
else:
if msg is None:
stream.error()
else:
stream.success()
[docs] def send_stream_request(
self,
identifier: Union[TelegramPerson, TelegramMUCOccupant],
fsource: Union[str, dict, BinaryIO],
name: Optional[str] = "file",
size: Optional[int] = None,
stream_type: Optional[str] = None,
) -> Union[str, Stream]:
"""Starts a file transfer.
:param identifier: TelegramPerson or TelegramMUCOccupant
Identifier of the Person or Room to send the stream to.
:param fsource: str, dict or binary data
File URL or binary content from a local file.
Optionally a dict with binary content plus metadata can be given.
See `stream_type` for more details.
:param name: str, optional
Name of the file. Not sure if this works always.
:param size: str, optional
Size of the file obtained with os.path.getsize.
This is only used for debug logging purposes.
:param stream_type: str, optional
Type of the stream. Choices: 'document', 'photo', 'audio', 'video', 'sticker', 'location'.
If 'video', a dict is optional as {'content': fsource, 'duration': str}.
If 'voice', a dict is optional as {'content': fsource, 'duration': str}.
If 'audio', a dict is optional as {'content': fsource, 'duration': str, 'performer': str, 'title': str}.
For 'location' a dict is mandatory as {'latitude': str, 'longitude': str}.
For 'venue': TODO # see: https://core.telegram.org/bots/api#sendvenue
:return stream: str or Stream
If `fsource` is str will return str, else return Stream.
"""
def _telegram_metadata(fsource):
if isinstance(fsource, dict):
return fsource.pop("content"), fsource
else:
return fsource, None
def _is_valid_url(url) -> bool:
try:
from urlparse import urlparse
except Exception:
from urllib.parse import urlparse
return bool(urlparse(url).scheme)
content, meta = _telegram_metadata(fsource)
if isinstance(content, str):
if not _is_valid_url(content):
raise ValueError(f"Not valid URL: {content}")
self._telegram_special_message(
chat_id=identifier.id, content=content, msg_type=stream_type, **meta
)
log.debug(
"Requesting upload of %s to %s (size hint: %d, stream type: %s).",
name,
identifier.username,
size,
stream_type,
)
stream = content
else:
stream = Stream(identifier, content, name, size, stream_type)
log.debug(
"Requesting upload of %s to %s (size hint: %d, stream type: %s)",
name,
identifier,
size,
stream_type,
)
self.thread_pool.apply_async(self._telegram_upload_stream, (stream,))
return stream
@staticmethod
def _is_numeric(input_) -> bool:
"""Return true if input is a number"""
try:
int(input_)
return True
except ValueError:
return False