Source code for lwe.backends.api.workflow.library.lwe_sqlite_query
#!/usr/bin/python
import sqlite3
from ansible.module_utils.basic import AnsibleModule
from lwe.core.config import Config
from lwe.core.logger import Logger
# Module-level logging setup: create a Config, set its "debug.log.enabled"
# option to True, and build the logger that main() uses for its debug and
# error messages.
config = Config()
config.set("debug.log.enabled", True)
log = Logger("lwe_sqlite_query", config)
# Module documentation as a YAML document. Nesting matters: each option's
# description/type/required keys must sit under that option, otherwise they
# collapse into duplicate top-level keys.
DOCUMENTATION = r"""
---
module: lwe_sqlite_query
short_description: Run a query or multiple queries in a transaction against a SQLite database
description:
    - This module runs a query or multiple queries in a transaction against a specified SQLite database and stores any returned data in a structured format.
options:
    db:
        description:
            - The path to the SQLite database file.
        type: str
        required: true
    query:
        description:
            - The SQL query to execute. Can be a string for a single query or a list of strings for multiple queries in a transaction.
        type: raw
        required: true
    query_params:
        description:
            - Optional query params to pass to a parameterized query. Should be a list for a single query or a list of lists for multiple queries.
        type: raw
        required: false
author:
    - Chad Phillips (@thehunmonkgroup)
"""
# Usage examples as a YAML task list. Each task's module arguments are nested
# under the module name so the two examples parse as two separate tasks.
EXAMPLES = r"""
- name: Run a single SELECT query against a SQLite database
  lwe_sqlite_query:
    db: "/path/to/your/database.db"
    query: "SELECT * FROM your_table WHERE id = ?"
    query_params:
      - 1

- name: Run multiple queries in a transaction
  lwe_sqlite_query:
    db: "/path/to/your/database.db"
    query:
      - "INSERT INTO table1 (column1, column2) VALUES (?, ?)"
      - "UPDATE table2 SET column1 = ? WHERE id = ?"
    query_params:
      - ["value1", "value2"]
      - ["new_value", 1]
"""
# Return-value documentation as YAML. Each returned key (data, row_count)
# owns its own description/type/returned fields.
RETURN = r"""
data:
    description: The data returned from the query or queries.
    type: list
    returned: success
row_count:
    description: The total number of rows affected or returned from all queries.
    type: int
    returned: success
"""
[docs]
def run_single_query(cursor, query, params=()):
    """Execute one SQL statement on ``cursor`` and collect its outcome.

    Args:
        cursor: An open ``sqlite3`` cursor. Rows are converted with
            ``dict(row)``, so the connection needs a mapping-style row
            factory such as ``sqlite3.Row``.
        query: SQL text of a single statement.
        params: Bind values for the statement's placeholders.

    Returns:
        A ``(data, row_count)`` tuple. For statements that produce a result
        set, ``data`` is a list of dicts (one per row) and ``row_count`` is
        its length. For all other statements ``data`` is empty and
        ``row_count`` is the number of affected rows (0 when SQLite reports
        none).

    Raises:
        sqlite3.Error: Propagated from ``cursor.execute``.
    """
    cursor.execute(query, params)
    # cursor.description is None when the statement produced no result set.
    # Checking it (instead of the SQL text) also captures rows from CTEs
    # ("WITH ... SELECT"), PRAGMA, "INSERT ... RETURNING" and queries that
    # start with a comment.
    if cursor.description is None:
        # rowcount is -1 for non-DML statements (e.g. CREATE TABLE); clamp it
        # so callers that sum row counts are not thrown off.
        return [], max(cursor.rowcount, 0)
    data = [dict(row) for row in cursor.fetchall()]
    return data, len(data)
[docs]
def run_query(db, query, params=()):
    """Run one query, or several queries in one transaction, against ``db``.

    Args:
        db: Path of the SQLite database file, passed to ``sqlite3.connect``.
        query: A single SQL string, or an iterable of SQL strings that are
            executed in order inside one explicit transaction.
        params: For a single query, the bind values for that query. For
            multiple queries, one sequence of bind values per query; it may
            be left empty when none of the queries take parameters.

    Returns:
        A ``(total_data, total_row_count)`` tuple aggregating the results of
        ``run_single_query`` over every executed statement.

    Raises:
        ValueError: Multiple queries were given with a non-empty ``params``
            whose length differs from the number of queries.
        Exception: Any error raised while executing is re-raised after the
            transaction has been rolled back.
    """
    single = isinstance(query, str)
    if not single:
        queries = list(query)
        # Empty params means "no bind values for any statement". Pairing with
        # zip() directly would silently execute nothing in that case.
        param_sets = list(params) if params else [() for _ in queries]
        if len(queries) != len(param_sets):
            raise ValueError(
                f"query and params must have the same length "
                f"(got {len(queries)} queries and {len(param_sets)} parameter sets)"
            )

    conn = sqlite3.connect(db)
    conn.row_factory = sqlite3.Row
    total_data = []
    total_row_count = 0
    try:
        cursor = conn.cursor()
        if single:
            data, row_count = run_single_query(cursor, query, params or ())
            total_data.extend(data)
            total_row_count += row_count
        else:
            conn.execute('BEGIN TRANSACTION')
            for q, p in zip(queries, param_sets):
                data, row_count = run_single_query(cursor, q, p)
                total_data.extend(data)
                total_row_count += row_count
        conn.commit()
    except Exception:
        conn.rollback()
        # Bare raise keeps the original traceback intact.
        raise
    finally:
        conn.close()
    return total_data, total_row_count
[docs]
def main():
    """Ansible entry point: validate arguments, run the query, report results.

    Reads the ``db``, ``query`` and ``query_params`` module parameters. In
    check mode the module exits immediately without touching the database.
    On success it exits with ``changed=True`` plus the ``data`` and
    ``row_count`` returned by ``run_query``; on any error it logs the failure
    and exits through ``fail_json``.
    """
    result = dict(changed=False, response=dict())
    module = AnsibleModule(
        argument_spec=dict(
            db=dict(type="str", required=True),
            query=dict(type="raw", required=True),
            query_params=dict(type="raw", required=False, default=[]),
        ),
        supports_check_mode=True,
    )
    db = module.params["db"]
    query = module.params["query"]
    query_params = module.params["query_params"]
    if module.check_mode:
        module.exit_json(**result)
    # Validate input
    if isinstance(query, list):
        if not all(isinstance(q, str) for q in query):
            module.fail_json(msg="Each item in query must be a string when query is a list")
        if not isinstance(query_params, list):
            module.fail_json(msg="query_params must be a list when query is a list")
        if not query_params:
            # query_params is optional: when omitted for a list of queries,
            # give every query an empty parameter list instead of failing the
            # length check below.
            query_params = [[] for _ in query]
        if len(query) != len(query_params):
            module.fail_json(msg="query and query_params must have the same length")
        if not all(isinstance(p, list) for p in query_params):
            module.fail_json(msg="Each item in query_params must be a list when query is a list")
    elif isinstance(query, str):
        if not isinstance(query_params, list):
            module.fail_json(msg="query_params must be a list")
    else:
        # "raw" accepts any YAML value; reject shapes the runner cannot execute.
        module.fail_json(msg="query must be a string or a list of strings")
    try:
        log.debug(f"Running query on database: {db}: query: {query}, params: {query_params}")
        data, row_count = run_query(db, query, query_params)
        result["changed"] = True
        result["data"] = data
        result["row_count"] = row_count
        module.exit_json(**result)
    except Exception as e:
        result["failed"] = True
        message = f"Failed to run query: {query}, error: {e}"
        log.error(message)
        module.fail_json(msg=message, **result)
# Run the module only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()