import sys
import os
from json import loads
from random import randrange
from threading import Thread
from webtest import TestApp
from errbot.core_plugins import flask_app
from werkzeug.serving import ThreadedWSGIServer
from errbot import botcmd, BotPlugin, webhook
from urllib.request import unquote
from OpenSSL import crypto
TEST_REPORT = """*** Test Report
URL : %s
Detected your post as : %s
Status code : %i
"""
[docs]def make_ssl_certificate(key_path, cert_path):
"""
Generate a self-signed certificate
The generated key will be written out to key_path, with the corresponding
certificate itself being written to cert_path.
:param cert_path: path where to write the certificate.
:param key_path: path where to write the key.
"""
cert = crypto.X509()
cert.set_serial_number(randrange(1, sys.maxsize))
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(60 * 60 * 24 * 365)
subject = cert.get_subject()
subject.CN = '*'
setattr(subject, 'O', 'Self-Signed Certificate for Errbot') # Pep8 annoyance workaround
issuer = cert.get_issuer()
issuer.CN = 'Self-proclaimed Authority'
setattr(issuer, 'O', 'Self-Signed') # Pep8 annoyance workaround
pkey = crypto.PKey()
pkey.generate_key(crypto.TYPE_RSA, 4096)
cert.set_pubkey(pkey)
cert.sign(pkey, 'sha256')
f = open(cert_path, 'w')
f.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('utf-8'))
f.close()
f = open(key_path, 'w')
f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey).decode('utf-8'))
f.close()
[docs]class Webserver(BotPlugin):
[docs] def __init__(self, *args, **kwargs):
self.server = None
self.server_thread = None
self.ssl_context = None
self.test_app = TestApp(flask_app)
super().__init__(*args, **kwargs)
[docs] def get_configuration_template(self):
return {'HOST': '0.0.0.0',
'PORT': 3141,
'SSL': {'enabled': False,
'host': '0.0.0.0',
'port': 3142,
'certificate': '',
'key': ''}}
[docs] def check_configuration(self, configuration):
# it is a pain, just assume a default config if SSL is absent or set to None
if configuration.get('SSL', None) is None:
configuration['SSL'] = {'enabled': False, 'host': '0.0.0.0', 'port': 3142, 'certificate': '', 'key': ''}
super().check_configuration(configuration)
[docs] def activate(self):
if not self.config:
self.log.info('Webserver is not configured. Forbid activation')
return
if self.server_thread and self.server_thread.is_alive():
raise Exception('Invalid state, you should not have a webserver already running.')
self.server_thread = Thread(target=self.run_server, name='Webserver Thread')
self.server_thread.start()
self.log.debug('Webserver started.')
super().activate()
[docs] def deactivate(self):
if self.server is not None:
self.log.info('Shutting down the internal webserver.')
self.server.shutdown()
self.log.info('Waiting for the webserver thread to quit.')
self.server_thread.join()
self.log.info('Webserver shut down correctly.')
super().deactivate()
[docs] def run_server(self):
try:
host = self.config['HOST']
port = self.config['PORT']
ssl = self.config['SSL']
self.log.info('Starting the webserver on %s:%i', host, port)
ssl_context = (ssl['certificate'], ssl['key']) if ssl['enabled'] else None
self.server = ThreadedWSGIServer(host, ssl['port'] if ssl_context else port, flask_app,
ssl_context=ssl_context)
self.server.serve_forever()
self.log.debug('Webserver stopped')
except KeyboardInterrupt:
self.log.info('Keyboard interrupt, request a global shutdown.')
self.server.shutdown()
except Exception:
self.log.exception('The webserver exploded.')
[docs] @botcmd(template='webstatus')
def webstatus(self, msg, args):
"""
Gives a quick status of what is mapped in the internal webserver
"""
return {'rules': (((rule.rule, rule.endpoint) for rule in flask_app.url_map._rules))}
[docs] @webhook
def echo(self, incoming_request):
"""
A simple test webhook
"""
self.log.debug("Your incoming request is :" + str(incoming_request))
return str(incoming_request)
[docs] @botcmd(split_args_with=' ')
def webhook_test(self, _, args):
"""
Test your webhooks from within err.
The syntax is :
!webhook test [relative_url] [post content]
It triggers the notification and generate also a little test report.
"""
url = args[0]
content = ' '.join(args[1:])
# try to guess the content-type of what has been passed
try:
# try if it is plain json
loads(content)
contenttype = 'application/json'
except ValueError:
# try if it is a form
splitted = content.split('=')
# noinspection PyBroadException
try:
payload = '='.join(splitted[1:])
loads(unquote(payload))
contenttype = 'application/x-www-form-urlencoded'
except Exception as _:
contenttype = 'text/plain' # dunno what it is
self.log.debug('Detected your post as : %s.', contenttype)
response = self.test_app.post(url, params=content, content_type=contenttype)
return TEST_REPORT % (url, contenttype, response.status_code)
[docs] @botcmd(admin_only=True)
def generate_certificate(self, _, args):
"""
Generate a self-signed SSL certificate for the Webserver
"""
yield ('Generating a new private key and certificate. This could take a '
'while if your system is slow or low on entropy')
key_path = os.sep.join((self.bot_config.BOT_DATA_DIR, "webserver_key.pem"))
cert_path = os.sep.join((self.bot_config.BOT_DATA_DIR, "webserver_certificate.pem"))
make_ssl_certificate(key_path=key_path, cert_path=cert_path)
yield f'Certificate successfully generated and saved in {self.bot_config.BOT_DATA_DIR}.'
suggested_config = self.config
suggested_config['SSL']['enabled'] = True
suggested_config['SSL']['host'] = suggested_config['HOST']
suggested_config['SSL']['port'] = suggested_config['PORT'] + 1
suggested_config['SSL']['key'] = key_path
suggested_config['SSL']['certificate'] = cert_path
yield 'To enable SSL with this certificate, the following config is recommended:'
yield f'{suggested_config!r}'