import os
import sys
import subprocess
import shutil
import copy
import shlex
import yaml
from lwe.core.config import Config
from lwe.core.logger import Logger
import lwe.core.util as util
[docs]
class WorkflowManager:
"""
Manage workflows.
"""
def __init__(self, config=None):
self.config = config or Config()
self.log = Logger(self.__class__.__name__, self.config)
self.user_workflow_dirs = (
self.config.args.workflows_dir
or util.get_environment_variable_list("workflow_dir")
or self.config.get("directories.workflows")
)
self.make_user_workflow_dirs()
self.system_workflow_dirs = [
os.path.join(util.get_package_root(self), "workflows"),
]
self.all_workflow_dirs = self.system_workflow_dirs + self.user_workflow_dirs
self.load_workflows()
[docs]
def get_workflow_dir(self):
package_root = util.get_package_root(self)
workflow_dir = os.path.join(package_root, "backends", "api", "workflow")
return workflow_dir
[docs]
def ensure_workflow(self, workflow_name):
if not workflow_name:
return False, None, "No workflow name specified"
self.log.debug(f"Ensuring workflow {workflow_name} exists")
if workflow_name not in self.workflows:
self.load_workflows()
if workflow_name not in self.workflows:
return False, workflow_name, f"Workflow {workflow_name!r} not found"
message = f"Workflow {workflow_name} exists"
self.log.debug(message)
return True, self.workflows[workflow_name], message
[docs]
def ensure_runnable_workflow(self, workflow_name):
success, workflow, user_message = self.load_workflow(workflow_name)
if not success:
return success, workflow, user_message
if len(workflow) > 0:
if "tasks" in workflow[0]:
return True, workflow, f"Workflow {workflow_name!r} has a valid play with tasks"
return (
False,
workflow,
f"Workflow {workflow_name!r} has no tasks, are you trying to run an 'include' file?",
)
return False, workflow, f"Workflow {workflow_name!r} has invalid format"
[docs]
def make_user_workflow_dirs(self):
for workflow_dir in self.user_workflow_dirs:
if not os.path.exists(workflow_dir):
os.makedirs(workflow_dir)
[docs]
def get_workflow_environment_config(self):
workflow_dir = self.get_workflow_dir()
return {
"ANSIBLE_PYTHON_INTERPRETER": {"op": "add-if-empty", "default": sys.executable},
"ANSIBLE_CONFIG": {
"op": "add-if-empty",
"default": os.path.join(workflow_dir, "ansible.cfg"),
},
# 'ANSIBLE_STDOUT_CALLBACK': {
# 'op': 'add-if-empty',
# 'default': 'community.general.yaml',
# },
# 'ANSIBLE_LIBRARY': {
# 'op': 'append',
# 'default': os.path.join(workflow_dir, 'library'),
# },
# 'ANSIBLE_ROLES_PATH': {
# 'op': 'add-if-empty',
# 'default': os.path.join(workflow_dir, 'roles'),
# },
# 'ANSIBLE_ROLES_PATH': {
# 'op': 'add-if-empty',
# 'default': os.path.join(workflow_dir, 'roles'),
# },
# 'ANSIBLE_PLAYBOOK_DIR': {
# 'op': 'add-if-empty',
# 'default': os.path.join(workflow_dir, 'playbooks'),
# },
}
[docs]
def set_workflow_environment(self):
for var, data in self.get_workflow_environment_config().items():
if data["op"] == "add-if-empty":
if not os.getenv(var):
self.log.debug(
f"Setting workflow environment variable {var}: {data['default']}"
)
os.environ[var] = data["default"]
[docs]
def parse_workflow_args(self, args_string):
args_list = shlex.split(args_string)
final_args = []
for arg in args_list:
key, value = arg.split("=", maxsplit=1)
final_args.append("%s='%s'" % (key, value.replace("'", "\\'")))
if final_args:
return " ".join(final_args)
return ""
[docs]
def run(self, workflow_name, workflow_args):
success, _, user_message = self.ensure_runnable_workflow(workflow_name)
if not success:
return success, workflow_name, user_message
self.set_workflow_environment()
success, workflow_file, message = self.ensure_workflow(workflow_name)
if not success:
return success, workflow_file, message
self.log.info(
f"Running workflow {workflow_name} from {workflow_file} with args: {workflow_args}"
)
env = copy.copy(dict(os.environ))
kwargs = {
"env": env,
"stdin": sys.stdin,
"stdout": sys.stdout,
"stderr": sys.stderr,
"universal_newlines": True,
}
command = [
"ansible-playbook",
workflow_file,
]
args = self.parse_workflow_args(workflow_args)
if args:
command = command + ["--extra-vars", args]
return_code = 1
error = "Unknown error"
try:
proc = subprocess.Popen(command, **kwargs)
return_code = proc.wait()
except Exception as e:
error = e.message if hasattr(e, "message") else str(e)
if return_code == 0:
return True, None, f"Workflow {workflow_name} completed"
message = f"Error running workflow {workflow_name}: {error}"
self.log.error(message)
return False, None, message
[docs]
def load_workflow(self, workflow_name):
success, workflow_file, message = self.ensure_workflow(workflow_name)
if not success:
return success, workflow_file, message
self.log.info(f"Loading workflow {workflow_name} from {workflow_file}")
try:
with open(workflow_file, "r") as f:
workflow = yaml.safe_load(f)
return True, workflow, f"Workflow {workflow_name} successfully loaded"
except Exception as e:
message = f"An error occurred while loading workflow {workflow_name}: {e}"
self.log.error(message)
return False, None, message
[docs]
def load_workflows(self):
self.log.debug("Loading workflows from dirs: %s" % ", ".join(self.all_workflow_dirs))
self.workflows = {}
try:
for workflow_dir in self.all_workflow_dirs:
if os.path.exists(workflow_dir) and os.path.isdir(workflow_dir):
self.log.info(f"Processing directory: {workflow_dir}")
for file_name in os.listdir(workflow_dir):
if file_name.endswith(".yaml") or file_name.endswith(".yml"):
workflow_name = os.path.splitext(file_name)[0]
workflow_file = os.path.join(workflow_dir, file_name)
self.workflows[workflow_name] = workflow_file
else:
message = f"Failed to load workflows: Directory {workflow_dir!r} not found or not a directory"
self.log.error(message)
return False, None, message
return True, self.workflows, "Workflows successfully loaded"
except Exception as e:
message = f"An error occurred while loading workflows: {e}"
self.log.error(message)
return False, None, message
[docs]
def copy_workflow(self, old_name, new_name):
"""
Copies a workflow file to a new location.
:param old_name: The name of the existing workflow file.
:type old_name: str
:param new_name: The name for the new workflow file.
:type new_name: str
:return: A tuple containing a boolean indicating success or failure, the new file path, and a status message.
:rtype: tuple
"""
success, workflow_file, user_message = self.ensure_workflow(old_name)
if not success:
return success, workflow_file, user_message
old_filepath = workflow_file
base_filepath = (
self.user_workflow_dirs[-1]
if self.is_system_workflow(old_filepath)
else os.path.dirname(old_filepath)
)
if not new_name.endswith(".yaml") and not new_name.endswith(".yml"):
new_name += ".yaml"
new_filepath = os.path.join(base_filepath, new_name)
if os.path.exists(new_filepath):
return False, new_filepath, f"{new_filepath} already exists"
shutil.copy2(old_filepath, new_filepath)
self.load_workflows()
return True, new_filepath, f"Copied workflow {old_filepath} to {new_filepath}"
[docs]
def delete_workflow(self, workflow_name, workflow_dir=None):
success, workflow_file, user_message = self.ensure_workflow(workflow_name)
if not success:
return success, workflow_file, user_message
try:
os.remove(workflow_file)
message = f"Successfully deleted workflow {workflow_name!r} from {workflow_file!r}"
self.log.info(message)
return True, workflow_name, message
except Exception as e:
message = f"An error occurred while deleting workflow {workflow_name!r}: {e}"
self.log.error(message)
return False, None, message
[docs]
def is_system_workflow(self, filepath):
for dir in self.system_workflow_dirs:
if filepath.startswith(dir):
return True
return False