Source code for errbot.streaming
import io
import logging
import os
from itertools import repeat, starmap
from threading import Thread
from typing import Callable, Optional
from .backends.base import STREAM_TRANSFER_IN_PROGRESS, STREAM_WAITING_TO_START
CHUNK_SIZE = 4096
log = logging.getLogger(__name__)
[docs]
def repeatfunc(
    func: Callable[..., None], times: Optional[int] = None, *args
):  # adapted from the itertools recipes
    """Lazily call ``func(*args)`` again and again.

    Example: repeatfunc(random.random)

    :param func: the function to repeatedly call.
    :param times: number of times to repeat; ``None`` repeats endlessly.
    :param args: params to the function to call.
    :return: an iterator yielding the result of each successive call.
    """
    # Build the (possibly endless) stream of identical argument tuples first,
    # then let starmap unpack each tuple into a call to ``func``.
    argument_stream = repeat(args) if times is None else repeat(args, times)
    return starmap(func, argument_stream)
[docs]
class Tee:
    """Tee implements a multi reader / single writer.

    Every chunk read from one incoming stream is copied to several clients.
    Each client gets its own OS pipe and a clone of the incoming stream
    created from that pipe's read end; the client consumes it through its
    ``callback_stream`` method, running in a dedicated thread.
    """

    def __init__(self, incoming_stream, clients):
        """Remember the stream to duplicate and the clients to feed.

        :param incoming_stream: the stream to read from; ``run`` relies on
            its ``clone()``, ``closed`` and ``read()`` members.
        :param clients: a list of objects implementing ``callback_stream``.
        """
        self.incoming_stream = incoming_stream
        self.clients = clients

    def start(self) -> Thread:
        """Start the transfer asynchronously.

        :return: the already started thread that executes :meth:`run`.
        """
        t = Thread(target=self.run)
        t.start()
        return t

    def run(self):
        """Stream to all the clients synchronously.

        Creates one pipe and one stream clone per client, starts one thread
        per client calling its ``callback_stream``, then copies the incoming
        stream chunk by chunk into every pipe that is still open. Returns
        once the incoming stream is exhausted (or closed) and every client
        thread has finished.
        """
        nb_clients = len(self.clients)
        pipes = [
            (io.open(r, "rb"), io.open(w, "wb"))
            for r, w in repeatfunc(os.pipe, nb_clients)
        ]
        streams = [self.incoming_stream.clone(pipe[0]) for pipe in pipes]

        def streamer(index):
            """Run client ``index`` to completion, then tear down its pipe."""
            try:
                self.clients[index].callback_stream(streams[index])
                if streams[index].status == STREAM_WAITING_TO_START:
                    streams[index].reject()
                    plugin = self.clients[index].name
                    log.warning(
                        "%s did not accept nor reject the incoming file transfer",
                        plugin,
                    )
                    log.warning("I reject it as a fallback.")
            except Exception:
                # internal error: keep a trace of it, then mark the error.
                log.exception("Error while streaming to client #%d", index)
                streams[index].error()
            else:
                if streams[index].status == STREAM_TRANSFER_IN_PROGRESS:
                    # if the plugin didn't do it by itself, mark the transfer
                    # as a success.
                    streams[index].success()
            finally:
                # Stop the stream once callback_stream has returned. This is
                # in ``finally`` so the pipe is released even when marking
                # the status above raises.
                read, write = pipes[index]
                pipes[index] = (None, None)  # signal the main thread to stop streaming
                for pipe_end in (read, write):
                    try:
                        pipe_end.close()
                    except OSError:
                        # Closing the write end flushes it, which fails when
                        # unread data is pending and the read end is gone.
                        log.debug(
                            "Error closing the pipe of client #%d",
                            index,
                            exc_info=True,
                        )

        threads = [Thread(target=streamer, args=(i,)) for i in range(nb_clients)]
        for thread in threads:
            thread.start()

        while not self.incoming_stream.closed:
            chunk = self.incoming_stream.read(CHUNK_SIZE)
            log.debug("dispatch %d bytes", len(chunk))
            if not chunk:
                break
            for _, w in pipes:
                if w is None:
                    continue  # this client is already done
                try:
                    w.write(chunk)
                except (OSError, ValueError):
                    # The client finished (and closed its pipe) between the
                    # check above and the write; the other clients must
                    # still receive their data, so just skip this one.
                    log.debug("Could not write to a client pipe", exc_info=True)
        log.debug("EOF detected")

        for _, w in pipes:
            if w is None:
                continue
            try:
                w.close()  # close should flush too
            except (OSError, ValueError):
                # A failing flush for one client must not prevent the other
                # write ends from being closed, or their readers would never
                # see EOF.
                log.debug("Could not close a client pipe", exc_info=True)

        # we want to be sure that if we join on the main thread,
        # everything is either fully transferred or errored
        for thread in threads:
            thread.join()