Source code for errbot.streaming

import io
import logging
import os
from itertools import repeat, starmap
from threading import Thread
from typing import Callable, Optional

from .backends.base import STREAM_TRANSFER_IN_PROGRESS, STREAM_WAITING_TO_START

CHUNK_SIZE = 4096

log = logging.getLogger(__name__)


[docs] def repeatfunc( func: Callable[..., None], times: Optional[int] = None, *args ): # from the itertools receipes """Repeat calls to func with specified arguments. Example: repeatfunc(random.random) :param args: params to the function to call. :param times: number of times to repeat. :param func: the function to repeatedly call. """ if times is None: return starmap(func, repeat(args)) return starmap(func, repeat(args, times))
[docs] class Tee: """Tee implements a multi reader / single writer"""
[docs] def __init__(self, incoming_stream, clients): """clients is a list of objects implementing callback_stream""" self.incoming_stream = incoming_stream self.clients = clients
[docs] def start(self) -> Thread: """starts the transfer asynchronously""" t = Thread(target=self.run) t.start() return t
[docs] def run(self): """streams to all the clients synchronously""" nb_clients = len(self.clients) pipes = [ (io.open(r, "rb"), io.open(w, "wb")) for r, w in repeatfunc(os.pipe, nb_clients) ] streams = [self.incoming_stream.clone(pipe[0]) for pipe in pipes] def streamer(index): try: self.clients[index].callback_stream(streams[index]) if streams[index].status == STREAM_WAITING_TO_START: streams[index].reject() plugin = self.clients[index].name logging.warning( "%s did not accept nor reject the incoming file transfer", plugin, ) logging.warning("I reject it as a fallback.") except Exception as _: # internal error, mark the error. streams[index].error() else: if streams[index].status == STREAM_TRANSFER_IN_PROGRESS: # if the plugin didn't do it by itself, mark the transfer as a success. streams[index].success() # stop the stream if the callback_stream returns read, write = pipes[index] pipes[index] = (None, None) # signal the main thread to stop streaming read.close() write.close() threads = [Thread(target=streamer, args=(i,)) for i in range(nb_clients)] for thread in threads: thread.start() while True: if self.incoming_stream.closed: break chunk = self.incoming_stream.read(CHUNK_SIZE) log.debug("dispatch %d bytes", len(chunk)) if not chunk: break for _, w in pipes: if w: w.write(chunk) log.debug("EOF detected") for r, w in pipes: if w: w.close() # close should flush too # we want to be sure that if we join on the main thread, # everything is either fully transfered or errored for thread in threads: thread.join()