from typing import Tuple, Union, Sequence, List, Generator, Dict
import json
import re
import logging
import os
import shutil
import subprocess
from collections import namedtuple
from datetime import timedelta, datetime
from os import path
import tarfile
from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.request import urlopen
from urllib.parse import urlparse
from errbot.storage import StoreMixin
from errbot.storage.base import StoragePluginBase
from .utils import git_clone
from .utils import git_pull
from .utils import ON_WINDOWS
log = logging.getLogger(__name__)
[docs]def human_name_for_git_url(url):
# try to humanize the last part of the git url as much as we can
s = url.split(':')[-1].split('/')[-2:]
if s[-1].endswith('.git'):
s[-1] = s[-1][:-4]
return str('/'.join(s))
INSTALLED_REPOS = 'installed_repos'
REPO_INDEXES_CHECK_INTERVAL = timedelta(hours=1)
REPO_INDEX = 'repo_index'
LAST_UPDATE = 'last_update'
RepoEntry = namedtuple('RepoEntry', 'entry_name, name, python, repo, path, avatar_url, documentation')
FIND_WORDS_RE = re.compile(r"(\w[\w']*\w|\w)")
[docs]class RepoException(Exception):
pass
[docs]def makeEntry(repo_name: str, plugin_name: str, json_value):
return RepoEntry(entry_name=repo_name,
name=plugin_name,
python=json_value['python'],
repo=json_value['repo'],
path=json_value['path'],
avatar_url=json_value['avatar_url'],
documentation=json_value['documentation'])
[docs]def tokenizeJsonEntry(json_dict):
"""
Returns all the words in a repo entry.
"""
search = ' '.join((str(word) for word in json_dict.values()))
return set(FIND_WORDS_RE.findall(search.lower()))
[docs]def which(program):
if ON_WINDOWS:
program += '.exe'
def is_exe(file_path):
return os.path.isfile(file_path) and os.access(file_path, os.X_OK)
fpath, fname = os.path.split(program)
if fpath:
if is_exe(program):
return program
else:
for path in os.environ["PATH"].split(os.pathsep):
exe_file = os.path.join(path, program)
if is_exe(exe_file):
return exe_file
return None
[docs]def check_dependencies(req_path: Path) -> Tuple[Union[str, None], Sequence[str]]:
""" This methods returns a pair of (message, packages missing).
Or None, [] if everything is OK.
"""
log.debug('check dependencies of %s', req_path)
# noinspection PyBroadException
try:
from pkg_resources import get_distribution
missing_pkg = []
if not req_path.is_file():
log.debug('%s has no requirements.txt file', req_path)
return None, missing_pkg
with req_path.open() as f:
for line in f:
stripped = line.strip()
# skip empty lines.
if not stripped:
continue
# noinspection PyBroadException
try:
get_distribution(stripped)
except Exception:
missing_pkg.append(stripped)
if missing_pkg:
return f'You need these dependencies for {req_path}: ' + ','.join(missing_pkg), missing_pkg
return None, missing_pkg
except Exception:
log.exception('Problem checking for dependencies.')
return 'You need to have setuptools installed for the dependency check of the plugins', []
[docs]class BotRepoManager(StoreMixin):
"""
Manages the repo list, git clones/updates or the repos.
"""
[docs] def __init__(self, storage_plugin: StoragePluginBase, plugin_dir: str, plugin_indexes: Tuple[str, ...]) -> None:
"""
Make a repo manager.
:param storage_plugin: where the manager store its state.
:param plugin_dir: where on disk it will git clone the repos.
:param plugin_indexes: a list of URL / path to get the json repo index.
"""
super().__init__()
self.plugin_indexes = plugin_indexes
self.storage_plugin = storage_plugin
self.plugin_dir = plugin_dir
self.open_storage(storage_plugin, 'repomgr')
[docs] def shutdown(self) -> None:
self.close_storage()
[docs] def check_for_index_update(self) -> None:
if REPO_INDEX not in self:
log.info('No repo index, creating it.')
self.index_update()
return
if datetime.fromtimestamp(self[REPO_INDEX][LAST_UPDATE]) < datetime.now() - REPO_INDEXES_CHECK_INTERVAL:
log.info('Index is too old, update it.')
self.index_update()
[docs] def index_update(self) -> None:
index = {LAST_UPDATE: datetime.now().timestamp()}
for source in reversed(self.plugin_indexes):
try:
if urlparse(source).scheme in ('http', 'https'):
with urlopen(url=source, timeout=10) as request: # nosec
log.debug('Update from remote source %s...', source)
encoding = request.headers.get_content_charset()
content = request.read().decode(encoding if encoding else 'utf-8')
else:
with open(source, encoding='utf-8', mode='r') as src_file:
log.debug('Update from local source %s...', source)
content = src_file.read()
index.update(json.loads(content))
except (HTTPError, URLError, IOError):
log.exception('Could not update from source %s, keep the index as it is.', source)
break
else:
# nothing failed so ok, we can store the index.
self[REPO_INDEX] = index
log.debug('Stored %d repo entries.', len(index) - 1)
[docs] def get_repo_from_index(self, repo_name: str) -> List[RepoEntry]:
"""
Retrieve the list of plugins for the repo_name from the index.
:param repo_name: the name of the repo
:return: a list of RepoEntry
"""
plugins = self[REPO_INDEX].get(repo_name, None)
if plugins is None:
return None
result = []
for name, plugin in plugins.items():
result.append(makeEntry(repo_name, name, plugin))
return result
[docs] def search_repos(self, query: str) -> Generator[RepoEntry, None, None]:
"""
A simple search feature, keywords are AND and case insensitive on all the fields.
:param query: a string query
:return: an iterator of RepoEntry
"""
# first see if we are up to date.
self.check_for_index_update()
if REPO_INDEX not in self:
log.error('No index.')
return
query_work_set = set(FIND_WORDS_RE.findall(query.lower()))
for repo_name, plugins in self[REPO_INDEX].items():
if repo_name == LAST_UPDATE:
continue
for plugin_name, plugin in plugins.items():
if query_work_set.intersection(tokenizeJsonEntry(plugin)):
yield makeEntry(repo_name, plugin_name, plugin)
[docs] def get_installed_plugin_repos(self) -> Dict[str, str]:
return self.get(INSTALLED_REPOS, {})
[docs] def add_plugin_repo(self, name: str, url: str) -> None:
with self.mutable(INSTALLED_REPOS, {}) as repos:
repos[name] = url
[docs] def set_plugin_repos(self, repos: Dict[str, str]) -> None:
""" Used externally.
"""
self[INSTALLED_REPOS] = repos
[docs] def get_all_repos_paths(self) -> List[str]:
return [os.path.join(self.plugin_dir, d) for d in self.get(INSTALLED_REPOS, {}).keys()]
[docs] def install_repo(self, repo: str) -> str:
"""
Install the repository from repo
:param repo:
The url, git url or path on disk of a repository. It can point to either a git repo or
a .tar.gz of a plugin
:returns:
The path on disk where the repo has been installed on.
:raises: :class:`~RepoException` if an error occured.
"""
self.check_for_index_update()
human_name = None
# try to find if we have something with that name in our index
if repo in self[REPO_INDEX]:
human_name = repo
repo_url = next(iter(self[REPO_INDEX][repo].values()))['repo']
elif not repo.endswith('tar.gz'):
# This is a repo url, make up a plugin definition for it
human_name = human_name_for_git_url(repo)
repo_url = repo
else:
repo_url = repo
# TODO: Update download path of plugin.
if repo_url.endswith('tar.gz'):
fo = urlopen(repo_url) # nosec
tar = tarfile.open(fileobj=fo, mode='r:gz')
tar.extractall(path=self.plugin_dir)
s = repo_url.split(':')[-1].split('/')[-1]
human_name = s[:-len('.tar.gz')]
else:
human_name = human_name or human_name_for_git_url(repo_url)
try:
git_clone(repo_url, os.path.join(self.plugin_dir, human_name))
except Exception as exception: # dulwich errors all base on exceptions.Exception
raise RepoException(f'Could not load this plugin: \n\n{repo_url}\n\n---\n\n{exception}')
self.add_plugin_repo(human_name, repo_url)
return os.path.join(self.plugin_dir, human_name)
[docs] def update_repos(self, repos) -> Generator[Tuple[str, int, str], None, None]:
"""
This git pulls the specified repos on disk.
Yields tuples like (name, success, reason)
"""
# protects for update outside of what we know is installed
names = set(self.get_installed_plugin_repos().keys()).intersection(set(repos))
for d in (path.join(self.plugin_dir, name) for name in names):
success = 1
try:
git_pull(d)
feedback = "Pulled remote"
success = 0
except Exception as exception:
feedback = f"Error pulling remote {exception}"
pass
dep_err, missing_pkgs = check_dependencies(Path(d) / 'requirements.txt')
if dep_err:
feedback += dep_err + '\n'
yield d, success, feedback
[docs] def update_all_repos(self) -> Generator[Tuple[str, int, str], None, None]:
return self.update_repos(self.get_installed_plugin_repos().keys())
[docs] def uninstall_repo(self, name: str) -> None:
repo_path = path.join(self.plugin_dir, name)
# ignore errors because the DB can be desync'ed from the file tree.
shutil.rmtree(repo_path, ignore_errors=True)
repos = self.get_installed_plugin_repos()
del(repos[name])
self.set_plugin_repos(repos)