Source code for errbot.streaming
import os
import io
from itertools import starmap, repeat
from threading import Thread
from .backends.base import STREAM_WAITING_TO_START, STREAM_TRANSFER_IN_PROGRESS
import logging
CHUNK_SIZE = 4096
log = logging.getLogger(__name__)
[docs]def repeatfunc(func, times=None, *args): # from the itertools receipes
"""Repeat calls to func with specified arguments.
Example: repeatfunc(random.random)
:param args: params to the function to call.
:param times: number of times to repeat.
:param func: the function to repeatedly call.
"""
if times is None:
return starmap(func, repeat(args))
return starmap(func, repeat(args, times))
[docs]class Tee(object):
""" Tee implements a multi reader / single writer """
[docs] def __init__(self, incoming_stream, clients):
""" clients is a list of objects implementing callback_stream """
self.incoming_stream = incoming_stream
self.clients = clients
[docs] def start(self):
""" starts the transfer asynchronously """
t = Thread(target=self.run)
t.start()
return t
[docs] def run(self):
""" streams to all the clients synchronously """
nb_clients = len(self.clients)
pipes = [(io.open(r, 'rb'), io.open(w, 'wb')) for r, w in repeatfunc(os.pipe, nb_clients)]
streams = [self.incoming_stream.clone(pipe[0]) for pipe in pipes]
def streamer(index):
try:
self.clients[index].callback_stream(streams[index])
if streams[index].status == STREAM_WAITING_TO_START:
streams[index].reject()
plugin = self.clients[index].name
logging.warning('%s did not accept nor reject the incoming file transfer', plugin)
logging.warning('I reject it as a fallback.')
except Exception as _:
# internal error, mark the error.
streams[index].error()
else:
if streams[index].status == STREAM_TRANSFER_IN_PROGRESS:
# if the plugin didn't do it by itself, mark the transfer as a success.
streams[index].success()
# stop the stream if the callback_stream returns
read, write = pipes[index]
pipes[index] = (None, None) # signal the main thread to stop streaming
read.close()
write.close()
threads = [Thread(target=streamer, args=(i,)) for i in range(nb_clients)]
for thread in threads:
thread.start()
while True:
if self.incoming_stream.closed:
break
chunk = self.incoming_stream.read(CHUNK_SIZE)
log.debug("dispatch %d bytes", len(chunk))
if not chunk:
break
for (_, w) in pipes:
if w:
w.write(chunk)
log.debug("EOF detected")
for (r, w) in pipes:
if w:
w.close() # close should flush too
# we want to be sure that if we join on the main thread,
# everything is either fully transfered or errored
for thread in threads:
thread.join()