import io
import json
from errbot import BotPlugin, arg_botcmd, botcmd
from errbot.core_plugins.acls import get_acl_usr, glob
from errbot.flow import FLOW_END, Flow, FlowNode, FlowRoot
[docs]
class Flows(BotPlugin):
"""Management commands related to flows / conversations."""
[docs]
def recurse_node(
self, response: io.StringIO, stack, f: FlowNode, flow: Flow = None
):
if f in stack:
response.write(f'{"  " * (len(stack))}↺<br>')
return
if isinstance(f, FlowRoot):
doc = f.description if flow else ""
response.write("Flow [" + f.name + "] " + doc + " <br>")
if flow and flow.current_step == f:
response.write(f"↪ Start (_{flow.requestor}_)<br>")
else:
cmd = "END" if f is FLOW_END else self._bot.all_commands[f.command]
requestor = (
f"(_{str(flow.requestor)}_)" if flow and flow.current_step == f else ""
)
doc = cmd.__doc__ if flow and f is not FLOW_END else ""
response.write(
f'{"  " * len(stack)}'
f'↪ **{f if f is not FLOW_END else "END"}** {doc if doc else ""} {requestor}<br>'
)
for _, sf in f.children:
self.recurse_node(response, stack + [f], sf, flow)
[docs]
@botcmd(syntax="<name>")
def flows_show(self, _, args):
"""Shows the structure of a flow."""
if not args:
return "You need to specify a flow name."
with io.StringIO() as response:
flow_node = self._bot.flow_executor.flow_roots.get(args, None)
if flow_node is None:
return f"Flow {args} doesn't exist."
self.recurse_node(response, [], flow_node)
return response.getvalue()
# noinspection PyUnusedLocal
[docs]
@botcmd
def flows_list(self, msg, args):
"""Displays the list of setup flows."""
with io.StringIO() as response:
for name, flow_node in self._bot.flow_executor.flow_roots.items():
if flow_node.description:
response.write("- **" + name + "** " + flow_node.description + "\n")
else:
response.write("- **" + name + "** " + "No description" + "\n")
return response.getvalue()
[docs]
@botcmd(split_args_with=" ", syntax="<name> [initial_payload]")
def flows_start(self, msg, args):
"""Manually start a flow within the context of the calling user.
You can prefeed the flow data with a json payload.
Example:
!flows start poll_setup {"title":"yeah!","options":["foo","bar","baz"]}
"""
if not args:
return "You need to specify a flow to manually start"
context = {}
flow_name = args[0]
if len(args) > 1:
json_payload = " ".join(args[1:])
try:
context = json.loads(json_payload)
except Exception as e:
return f"Cannot parse json {json_payload}: {e}."
self._bot.flow_executor.start_flow(flow_name, msg.frm, context)
return f"Flow **{flow_name}** started ..."
[docs]
@botcmd()
def flows_status(self, msg, args):
"""Displays the list of started flows."""
with io.StringIO() as response:
if not self._bot.flow_executor.in_flight:
response.write("No Flow started.\n")
else:
if not [
flow
for flow in self._bot.flow_executor.in_flight
if self.check_user(msg, flow)
]:
response.write(
f"No Flow started for current user: {get_acl_usr(msg)}.\n"
)
else:
if args:
for flow in self._bot.flow_executor.in_flight:
if self.check_user(msg, flow):
if flow.name == args:
self.recurse_node(response, [], flow.root, flow)
else:
for flow in self._bot.flow_executor.in_flight:
if self.check_user(msg, flow):
next_steps = [
f"\\*{str(step[1].command)}\\*"
for step in flow._current_step.children
if step[1].command
]
next_steps_str = "\n".join(next_steps)
text = (
f"\\>>> {str(flow.requestor)} is using flow \\*{flow.name}\\* on step "
f"\\*{flow.current_step}\\*\nNext Step(s): \n{next_steps_str}"
)
response.write(text)
return response.getvalue()
[docs]
@botcmd(syntax="[flow_name]")
def flows_stop(self, msg, args):
"""Stop flows you are in.
optionally, stop a specific flow you are in.
"""
if args:
flow = self._bot.flow_executor.stop_flow(args, msg.frm)
if flow:
yield flow.name + " stopped."
return
yield "Flow not found."
return
one_stopped = False
for flow in self._bot.flow_executor.in_flight:
if flow.requestor == msg.frm:
flow = self._bot.flow_executor.stop_flow(flow.name, msg.frm)
if flow:
one_stopped = True
yield flow.name + " stopped."
if not one_stopped:
yield "No Flow found."
[docs]
@arg_botcmd("flow_name", type=str)
@arg_botcmd("user", type=str)
def flows_kill(self, _, user, flow_name):
"""Admin command to kill a specific flow."""
flow = self._bot.flow_executor.stop_flow(flow_name, self.build_identifier(user))
if flow:
return flow.name + " killed."
return "Flow not found."
[docs]
def check_user(self, msg, flow):
"""Checks to make sure that either the user started the flow, or is a bot admin"""
if glob(get_acl_usr(msg), self.bot_config.BOT_ADMINS):
return True
elif glob(get_acl_usr(msg), flow.requestor.person):
return True
return False