Source code for errbot.core_plugins.webserver

import logging
import os
import sys
from json import loads
from random import randrange
from threading import Thread
from urllib.request import unquote

from OpenSSL import crypto
from webtest import TestApp
from werkzeug.serving import ThreadedWSGIServer

from errbot import BotPlugin, botcmd, webhook
from errbot.core_plugins import flask_app

def make_ssl_certificate(key_path, cert_path):
    """
    Generate a self-signed certificate

    The generated key will be written out to key_path, with the corresponding
    certificate itself being written to cert_path.

    :param cert_path: path where to write the certificate.
    :param key_path: path where to write the key.
    """
    cert = crypto.X509()
    cert.set_serial_number(randrange(1, sys.maxsize))
    # Valid from now, for 365 days (the offset is expressed in seconds).
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(60 * 60 * 24 * 365)

    subject = cert.get_subject()
    subject.CN = "*"
    setattr(
        subject, "O", "Self-Signed Certificate for Errbot"
    )  # Pep8 annoyance workaround

    issuer = cert.get_issuer()
    issuer.CN = "Self-proclaimed Authority"
    setattr(issuer, "O", "Self-Signed")  # Pep8 annoyance workaround

    # 4096-bit RSA key pair; the certificate is signed with its own key.
    pkey = crypto.PKey()
    pkey.generate_key(crypto.TYPE_RSA, 4096)
    cert.set_pubkey(pkey)
    cert.sign(pkey, "sha256")

    # Context managers guarantee the handles are closed even if a write fails.
    with open(cert_path, "w") as cert_file:
        cert_file.write(
            crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode("utf-8")
        )
    with open(key_path, "w") as key_file:
        key_file.write(
            crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey).decode("utf-8")
        )
class Webserver(BotPlugin):
    """
    Plugin that serves ``flask_app`` from a background thread.

    The server is a werkzeug ``ThreadedWSGIServer`` started by ``activate``
    and stopped by ``deactivate``. A WebTest ``TestApp`` wrapping the same
    Flask application is kept around so webhooks can be exercised in-process
    through the ``webhook_test`` command.
    """

    def __init__(self, *args, **kwargs):
        # These attributes are set before delegating to the base class.
        self.server = None  # ThreadedWSGIServer instance while running
        self.server_thread = None  # Thread executing run_server()
        self.ssl_context = None
        self.test_app = TestApp(flask_app)
        super().__init__(*args, **kwargs)

    @staticmethod
    def _default_ssl_config():
        # Return a fresh copy of the default (disabled) SSL settings.
        return {
            "enabled": False,
            "host": "",
            "port": 3142,
            "certificate": "",
            "key": "",
        }

    @staticmethod
    def _guess_content_type(content):
        # Best-effort guess of the MIME type of a test payload.
        try:
            # try if it is plain json
            loads(content)
            return "application/json"
        except ValueError:
            pass
        # try if it is a form: everything after the first "=" must be
        # URL-encoded JSON for the payload to be treated as one.
        payload = "=".join(content.split("=")[1:])
        # noinspection PyBroadException
        try:
            loads(unquote(payload))
            return "application/x-www-form-urlencoded"
        except Exception:
            return "text/plain"  # dunno what it is

    def get_configuration_template(self):
        return {
            "HOST": "",
            "PORT": 3141,
            "SSL": self._default_ssl_config(),
        }

    def check_configuration(self, configuration):
        # it is a pain, just assume a default config if SSL is absent or set to None
        if configuration.get("SSL", None) is None:
            configuration["SSL"] = self._default_ssl_config()
        super().check_configuration(configuration)

    def activate(self):
        if not self.config:
            self.log.info("Webserver is not configured. Forbid activation")
            return

        if self.server_thread and self.server_thread.is_alive():
            raise Exception(
                "Invalid state, you should not have a webserver already running."
            )
        self.server_thread = Thread(target=self.run_server, name="Webserver Thread")
        self.server_thread.start()
        self.log.debug("Webserver started.")

        super().activate()

    def deactivate(self):
        if self.server is not None:
            self.log.info("Shutting down the internal webserver.")
            self.server.shutdown()
            self.log.info("Waiting for the webserver thread to quit.")
            self.server_thread.join()
            # shutdown() only stops the serve loop; release the listening
            # socket as well so a later activation can bind the port again.
            self.server.server_close()
            self.server = None
            self.log.info("Webserver shut down correctly.")
        super().deactivate()

    def run_server(self):
        # Thread target: build the WSGI server and serve until shut down.
        try:
            host = self.config["HOST"]
            port = self.config["PORT"]
            ssl = self.config["SSL"]
            ssl_context = (ssl["certificate"], ssl["key"]) if ssl["enabled"] else None
            # With SSL enabled the server binds to the SSL port instead of
            # the plain one; log the port that is actually used.
            listen_port = ssl["port"] if ssl_context else port
            self.log.info("Starting the webserver on %s:%i", host, listen_port)
            self.server = ThreadedWSGIServer(
                host,
                listen_port,
                flask_app,
                ssl_context=ssl_context,
            )
            wsgi_log = logging.getLogger("werkzeug")
            wsgi_log.setLevel(self.bot_config.BOT_LOG_LEVEL)
            self.server.serve_forever()
            self.log.debug("Webserver stopped")
        except KeyboardInterrupt:
            self.log.info("Keyboard interrupt, request a global shutdown.")
            # The interrupt may arrive before the server object exists.
            if self.server is not None:
                self.server.shutdown()
        except Exception:
            self.log.exception("The webserver exploded.")

    @botcmd(template="webstatus")
    def webstatus(self, msg, args):
        """
        Gives a quick status of what is mapped in the internal webserver
        """
        # iter_rules() is the public accessor for the URL map; a list lets
        # the template iterate the result more than once.
        return {
            "rules": [
                (rule.rule, rule.endpoint) for rule in flask_app.url_map.iter_rules()
            ]
        }

    @webhook
    def echo(self, incoming_request):
        """
        A simple test webhook
        """
        self.log.debug("Your incoming request is: %s", incoming_request)
        return str(incoming_request)

    @botcmd(split_args_with=" ", template="webserver")
    def webhook_test(self, _, args):
        """
        Test your webhooks from within err.

        The syntax is :
        !webhook test [relative_url] [post content]

        It triggers the notification and generate also a little test report.
        """
        url = args[0]
        content = " ".join(args[1:])

        # try to guess the content-type of what has been passed
        contenttype = self._guess_content_type(content)
        self.log.debug("Detected your post as : %s.", contenttype)

        # POST through the in-process WebTest client created in __init__.
        response = self.test_app.post(url, params=content, content_type=contenttype)
        return {
            "url": url,
            "content": content,
            "contenttype": contenttype,
            "response": response,
        }

    @botcmd(admin_only=True)
    def generate_certificate(self, _, args):
        """
        Generate a self-signed SSL certificate for the Webserver
        """
        yield (
            "Generating a new private key and certificate. This could take a "
            "while if your system is slow or low on entropy"
        )
        data_dir = self.bot_config.BOT_DATA_DIR
        key_path = os.path.join(data_dir, "webserver_key.pem")
        cert_path = os.path.join(data_dir, "webserver_certificate.pem")
        make_ssl_certificate(key_path=key_path, cert_path=cert_path)
        yield f"Certificate successfully generated and saved in {self.bot_config.BOT_DATA_DIR}."

        # Build the suggestion on copies so the live plugin configuration is
        # not modified as a side effect; fall back to the template when the
        # plugin has not been configured yet.
        current = self.config or self.get_configuration_template()
        suggested_config = dict(current)
        suggested_config["SSL"] = dict(
            current.get("SSL") or self._default_ssl_config(),
            enabled=True,
            host=current["HOST"],
            port=current["PORT"] + 1,
            key=key_path,
            certificate=cert_path,
        )
        yield "To enable SSL with this certificate, the following config is recommended:"
        yield f"{suggested_config!r}"