"""
Common reusable functions
cMeta author and developer: (C) 2025-2026 Grigori Fursin
See the cMeta COPYRIGHT and LICENSE files in the project root for details.
"""
import os
from .common import _error
from .cli import print_params_help
from . import files
###################################################################################################
[docs]
def load_module(
module_path: str, # Absolute path to the Python module file.
module_cache: dict, # Dictionary to store cached module information.
fail_on_error: bool = False, # If True, raises exception on error instead of returning error dict.
init_class: str = None, # Class name to instantiate from the loaded module, if provided.
cmeta = None, # CMeta instance to pass to Category initialization.
suffix: str = None, # Optional suffix for module name sanitization.
self_meta: dict = None, # Optional metadata dictionary to attach to the initialized object.
):
"""
Dynamically load a Python module from file path with caching support.
Loads category API modules and manages them in a cache. Handles module naming
sanitization and creates proper package structures for category modules.
Args:
module_path (str): Absolute path to the Python module file.
module_cache (dict): Dictionary to store cached module information.
fail_on_error (bool): If True, raises exception on error instead of returning error dict.
category (bool): If True, initializes the Category class from the module.
cmeta: CMeta instance to pass to Category initialization.
suffix (str | None): Optional suffix for module name sanitization.
init_class (str): Class name to instantiate from the loaded module, if provided.
self_meta (dict): Optional metadata dictionary to attach to the initialized object.
Returns:
dict: Dictionary with 'return': 0 and 'cache' containing module info,
or 'return' > 0 and 'error' on failure.
Raises:
Exception: Propagated runtime errors, if any.
"""
import os, sys, importlib.util, importlib.machinery, re, hashlib
def sanitize(
name, # Object or artifact name.
suffix = None, # Value for suffix.
):
"""
Sanitize dynamic module/package names for safe Python imports.
Args:
name: Object or artifact name.
suffix: Value for suffix.
Returns:
dict: Operation result.
Raises:
Exception: Propagated runtime errors, if any.
"""
cleaned = re.sub(r'[^0-9a-zA-Z_]', '_', name)
if re.match(r'^\d', cleaned):
cleaned = "_" + cleaned
if cleaned != name or cleaned.strip("_") == "":
if suffix is None or suffix == '':
suffix = hashlib.sha1(name.encode("utf-8")).hexdigest()[:8]
if suffix is not None and suffix != '':
cleaned = f"{cleaned}_{suffix}"
return cleaned
if not os.path.isfile(module_path):
return _error(f'Module file not found: {module_path}', 16, None, fail_on_error)
module_path = os.path.abspath(module_path)
module_dir = os.path.dirname(module_path)
module_name = os.path.splitext(os.path.basename(module_path))[0]
category_dir = os.path.dirname(module_dir)
raw_cat = os.path.basename(category_dir)
raw_pkg = os.path.basename(module_dir)
cat_name = sanitize(raw_cat, suffix)
pkg_name = sanitize(raw_pkg)
full_package_name = f"{cat_name}.{pkg_name}"
full_module_name = f"{full_package_name}.{module_name}"
timestamp = os.path.getmtime(module_path)
if module_path in module_cache:
cached = module_cache[module_path]
if cached.get("timestamp") == timestamp:
return {"return": 0, "cache": cached}
try:
# Ensure category package exists
if cat_name not in sys.modules:
spec = importlib.machinery.ModuleSpec(cat_name, loader=None, is_package=True)
pkg = importlib.util.module_from_spec(spec)
pkg.__path__ = [category_dir]
sys.modules[cat_name] = pkg
# Ensure api subpackage exists
if full_package_name not in sys.modules:
spec = importlib.machinery.ModuleSpec(full_package_name, loader=None, is_package=True)
pkg = importlib.util.module_from_spec(spec)
pkg.__path__ = [module_dir]
sys.modules[full_package_name] = pkg
# Load the plugin module
spec = importlib.util.spec_from_file_location(full_module_name, module_path)
module = importlib.util.module_from_spec(spec)
module.__package__ = full_package_name
module.__file__ = module_path
sys.modules[full_module_name] = module
spec.loader.exec_module(module)
cache_data = {
"python_module": module,
"timestamp": timestamp,
"full_module_name": full_module_name,
}
if init_class:
cls = getattr(module, init_class)
obj = cls(cm=cmeta)
if self_meta is not None and type(self_meta) == dict:
obj.cmeta = self_meta
cache_data["initialized_class"] = obj
module_cache[module_path] = cache_data
return {"return": 0, "cache": cache_data}
except Exception as e:
return _error(f'Failed to import module {full_module_name} at "{module_path}"', 1, e, fail_on_error)
###################################################################################################
[docs]
def find_command_func(
category_api, # Category API object instance.
command: str, # Command name string to search for.
):
"""
Find command function in category API object.
Searches for the command function using standard naming conventions:
command_, command__, or command (in that order).
Args:
category_api: Category API object instance.
command (str): Command name string to search for.
Returns:
dict: Dictionary with 'return': 0, 'func' (function object or None),
and 'func_name' (actual function name if found).
Raises:
Exception: Propagated runtime errors, if any.
"""
func = None
for find_command in [command + '_', command + '__', command]:
if hasattr(category_api, find_command):
available_func = getattr(category_api, find_command)
if callable(available_func):
func = available_func
break
result = {'return':0, 'func': func}
if func is not None:
result['func_name'] = find_command
return result
###################################################################################################
[docs]
def get_func_properties(
f, # Function object to inspect.
):
"""
Extract source code properties from a function object.
Gets the source file path, line numbers, and API documentation text
for a given function.
Args:
f: Function object to inspect.
Returns:
dict: Dictionary with 'return': 0 and properties including 'filename',
'start_line', 'end_line', and 'api_text'.
Raises:
Exception: Propagated runtime errors, if any.
"""
import inspect
filename = inspect.getsourcefile(f) or inspect.getfile(f)
lines, start_line = inspect.getsourcelines(f)
end_line = start_line + len(lines) - 1
r = get_api_text(lines, start_line)
if r['return']>0: return r
api_info = r['api_info']
short_func_desc = ''
j = api_info.find('"""')
if j>0:
x = api_info[j+3:].strip()
if x.endswith('"""'):
x = x[:-3]
if len(x)>0:
j = x.find('\n')
short_func_desc = x[:j] if j>0 else x
if short_func_desc.endswith('"""'):
short_func_desc = short_func_desc[:-3]
return {'return':0, 'func': f,
'filename':filename,
'lines': lines,
'start_line':start_line,
'end_line':end_line,
'api_info':api_info,
'short_func_desc': short_func_desc}
###################################################################################################
[docs]
def find_func_definition(
obj, # Object instance to search for the function.
name: str, # Name of the function to find.
):
"""
Find function definition in an object by name.
Locates a function by name in an object's class, unwraps decorators,
and extracts its source code properties.
Args:
obj: Object instance to search for the function.
name (str): Name of the function to find.
Returns:
dict: Dictionary with 'return': 0 and function properties on success,
or 'return': 1 and 'error' if function not found.
Raises:
Exception: Propagated runtime errors, if any.
"""
import inspect
func = getattr(obj.__class__, name, None)
if func is None:
return {'return':1, 'error':f'function "{name}" not found in {obj.__class__.__name__}'}
# Unwrap in case it's decorated
func = inspect.unwrap(func)
return get_func_properties(func)
###################################################################################################
[docs]
def get_api_info(
category_api, # The category API object.
command: str, # The command name.
full_command: str, # Full command string.
control_params_desc = None, # Control parameters description.
category_apis: list = [], # List of category APIs.
):
"""
Extract function definition and docstring for API information.
Args:
category_api: The category API object.
command (str): The command name.
full_command (str): Full command string.
control_params_desc: Control parameters description.
category_apis (list): List of category APIs.
Returns:
dict: Dictionary with 'return': 0 and 'api_info' string for success,
or 'return' > 0 and 'error' for errors.
Raises:
Exception: Propagated runtime errors, if any.
"""
r = find_func_definition(category_api, command)
if r['return'] > 0:
return r
lines = r['lines']
start_line = r['start_line']
end_line = r['end_line']
filename = r['filename']
x = f'for "{full_command}" ' if full_command != '' else ''
api_info = f'Python API {x}({filename}:{start_line}-{end_line}):\n'
r = get_api_text(lines, start_line)
if r['return']>0: return r
api_info += r['api_info']
# Extract redirects
for line in api_info.split('\n'):
linex = line.strip()
func = ''
xcategory_api = category_api
if linex.startswith('@base.'):
func = linex[6:]
xcategory_api = category_apis[-1]['code']
elif linex.startswith('@self.'):
func = linex[6:]
if func == command:
func = ''
if func!='':
j = func.find('(')
if j>0:
func = func[:j]
r = get_api_info(xcategory_api, func, '', category_apis = category_apis)
if r['return']>0: return r
api_info += '\n' + r['api_info']
if control_params_desc is not None:
r = print_params_help(control_params_desc)
if r['return']>0: return r
api_info += '\nCommon CLI flags (remove -- from keys for the Python API):\n'
api_info += '\n' + r['params_info']
return {'return':0, 'api_info': api_info}
###################################################################################################
[docs]
def get_api_text(
lines: list, # List of source code lines.
start_line: int, # Starting line number.
):
"""
Extract API text from function source lines.
Parses function definition and docstring from source code lines.
Args:
lines (list): List of source code lines.
start_line (int): Starting line number.
Returns:
dict: Dictionary with 'return': 0 and 'api_info' containing formatted text.
Raises:
Exception: Propagated runtime errors, if any.
"""
api_info = ''
# Add function definition lines
func_def_lines = []
docstring_lines = []
found_def = False
found_docstring = False
for line in lines:
stripped = line.strip()
if not found_def and (stripped.startswith('def ') or stripped.startswith('async def ')):
found_def = True
func_def_lines.append(line)
continue
if found_def and not found_docstring:
# Check for docstring immediately after function definition
if stripped.startswith('"""'):
found_docstring = True
docstring_lines.append(line)
if len(stripped)>3 and stripped.endswith('"""'):
break
continue
func_def_lines.append(line)
if found_docstring:
docstring_lines.append(line)
if stripped.endswith('"""') and len(stripped) > 3:
break
elif stripped == '"""':
break
# elif found_def and not found_docstring:
# # If not a docstring, break after function definition line
# break
if func_def_lines:
api_info += "\n"
for l in func_def_lines:
api_info += l
if docstring_lines:
api_info += "\n"
for l in docstring_lines:
api_info += l
return {'return': 0, 'api_info': api_info}
###################################################################################################
############################################################
[docs]
def run(
cmd: str, # Command to execute.
cmds: list = None, # Command to execute.
work_dir: str = None, # Working directory.
env: dict = None, # 2nd (current) env to update global ENV.
envs: dict = None, # 1st level of env to update global ENV.
genv: dict = None, # Global ENV (force in the end).
os_env: dict = os.environ, # Environment variables to inject into the subprocess.
capture_output: bool = False, # False by default.
text_cmd: str = 'RUN', # Text prefix for command display.
timeout: int = None, # None by default. TBD: Current timeout doesn't terminate subprocesses.
verbose: bool = False, # If True, print extra info.
hide_in_cmd: list = None, # List of keys in CMD to hide (for secrets).
hide_in_env: list = None, # List of keys in ENV to hide (for secrets).
save_script: str = '', # Save script for reproducibility.
run_script: bool = False, # Run created script (useful for pipes).
script_prefix: str = '', # Add prefix string to script.
skip_run: bool = False, # If True, skip run.
print_cmd: bool = False, # If True, force print CMD.
con: bool = False, # If True, enable console output.
fail_on_error: bool = False, # If True, raise exception on error.
logger = None, # Optional logger for debug messages.
space = '', # Optional indentation prefix for console output formatting.
capture_env: bool = False, # If True, capture and return environment changes produced by the command.
print_env_keys: list = None, # Environment variable keys to print after execution.
print_extra_line: bool = False, # If True, print an extra blank line in console output.
print_cur_dir: bool = False, # If True, print current directory before running command.
open_shell: bool = False, # Open shell instead of running a command.
open_shell_after: bool = False, # Open shell after running a command.
pack_existing_env_values: bool = False, # If part of new value is the same as value in existing env,
# substitute it with new:${key}
print_env_with_os_sep_on_new_lines: bool = True, # If True, separate env print with os sep on new lines
skip_print_env: bool = False, # If True, skip printing ENV even in verbose
):
"""
Run CMD with environment.
Args:
cmd (str): Command to execute.
work_dir (str | None): Working directory.
env (dict | None): 2nd (current) env to update global ENV.
envs (dict | None): 1st level of env to update global ENV.
genv (dict | None): Global ENV (force in the end).
capture_output (bool): False by default.
text_cmd (str): Text prefix for command display.
timeout (int | None): None by default. TBD: Current timeout doesn't terminate subprocesses.
verbose (bool): If True, print extra info.
hide_in_cmd (list | None): List of keys in CMD to hide (for secrets).
hide_in_env (list | None): List of keys in ENV to hide (for secrets).
save_script (str): Save script for reproducibility.
run_script (bool): Run created script (useful for pipes).
script_prefix (str): Add prefix string to script.
skip_run (bool): If True, skip run.
print_cmd (bool): If True, force print CMD.
con (bool): If True, enable console output.
fail_on_error (bool): If True, raise exception on error.
logger: Optional logger for debug messages.
os_env (dict): Environment variables to inject into the subprocess.
space: Optional indentation prefix for console output formatting.
capture_env (bool): If True, capture and return environment changes produced by the command.
print_env_keys (list): Environment variable keys to print after execution.
print_extra_line (bool): If True, print an extra blank line in console output.
print_cur_dir (bool): If True, print current directory before running command
Returns:
dict: Unified output with 'return', 'returncode', 'stdout', 'stderr'.
Raises:
Exception: Propagated runtime errors, if any.
"""
import subprocess
import os
import platform
if not con: verbose = False
cur_dir = os.getcwd()
if work_dir is not None:
if not os.path.isdir(work_dir):
return {'return':1, 'error':f'Directory doesn\'t exist: {work_dir}'}
os.chdir(work_dir)
cur_dir2 = os.getcwd()
# Initialize mutable defaults
if env is None:
env = {}
if envs is None:
envs = {}
if genv is None:
genv = {}
if hide_in_cmd is None:
hide_in_cmd = []
if hide_in_env is None:
hide_in_env = []
# Just in case, check if input comes from CMD
if timeout is not None:
timeout = int(timeout)
bat_ext = '.bat' if os.name == 'nt' else '.sh'
if run_script:
if save_script == '':
save_script = 'cmeta-run' + bat_ext
if save_script and save_script.endswith('{{file_ext_bat}}'):
save_script = save_script.replace('{{file_ext_bat}}', bat_ext)
cur_env = {} if not os_env else os_env.copy()
for e in [envs, env, genv]:
for k in e:
v = e[k]
if type(v) == list:
v = os.pathsep.join(v)
elif v is not None:
v = str(v)
if k.startswith('+'):
if v != '' and v is not None:
k = k[1:].strip()
v1 = cur_env.get(k, '')
if v1 != '':
if not v.endswith(os.pathsep):
v += os.pathsep
v += v1
else:
v = None
if v is not None:
cur_env[k] = v
temp_file_to_collect_env = None
if capture_env and cmds:
return {'return':1, 'error': 'capture env doesn\'t work with CMDs'}
if capture_env and cmd:
r = files.gen_temp_filepath()
if r['return']>0: return r
temp_file_to_collect_env_before = r['filepath']
r = files.gen_temp_filepath()
if r['return']>0: return r
temp_file_to_collect_env_after = r['filepath']
x = 'set' if os.name == 'nt' else 'env'
xcmd = f'{x} > {temp_file_to_collect_env_before} && '
if cmd != '':
xcmd += cmd + ' && '
xcmd += f'{x} > {temp_file_to_collect_env_after}'
cmd = xcmd
if not cmds and cmd:
cmds = [cmd]
env1 = '%' if platform.system() == "Windows" else '${'
env2 = '%' if platform.system() == "Windows" else '}'
print_env = {}
if print_env_keys is None:
print_env_keys = cur_env.keys()
for k in print_env_keys:
if k in cur_env:
v = str(cur_env[k])
if k not in os_env or str(os_env[k]) != str(v):
if pack_existing_env_values and k in os_env:
vv = str(os_env[k])
j = v.find(vv)
if j>=0:
v = v[:j] + env1 + k + env2
print_env[k] = v
script = ''
if save_script != '':
script = '@echo off\n' if os.name == 'nt' else '#!/bin/bash\n'
cur_dir2 = files.quote_path(cur_dir2)
x = '/d ' if os.name == 'nt' else ''
script += f'\ncd {x}{cur_dir2}\n'
if script_prefix != '':
script += '\n' + script_prefix
if len(print_env) > 0:
if verbose and not skip_print_env:
print('')
if save_script != '':
script += '\n'
for k in sorted(print_env):
v = print_env[k]
if verbose and not skip_print_env:
vx = v if k not in hide_in_env else '***'
if print_env_with_os_sep_on_new_lines and os.pathsep in vx:
print(f'{space}ENV {k}:')
for x in vx.split(os.pathsep):
if x:
print (f'{space} - {x}')
else:
print(f'{space}ENV {k}={vx}')
if save_script != '':
if os.name == 'nt':
x = 'set'
vv = v
else:
x = 'export'
vv = v if ' ' not in v else '"' + v + '"'
script += f'{x} {k}={vv}\n'
if save_script != '':
script += '\n'
returncode = 0
stdout = ''
stderr = ''
# Hide secrets from CMD
_cmds = []
for xcmd in cmds:
for h in hide_in_cmd:
j = xcmd.find(h)
if j >= 0:
j1 = xcmd.find(' ', j + len(h))
if j1 < 0:
j1 = len(xcmd)
if j1 >= 0:
xcmd = xcmd[:j+len(h)] + '***' + xcmd[j1:]
_cmds.append(xcmd)
cmds = _cmds
if verbose and print_cur_dir:
print('')
print (f'{space}PWD: {cur_dir}')
if verbose or print_cmd:
print('')
if skip_run:
for _cmd in cmds:
print (f'{space}SKIP {_cmd}')
if save_script is not None and save_script != '':
for _cmd in cmds:
script += _cmd + '\n'
# Note: This assumes utils.save_txt exists in your codebase
# You may need to import or implement this function
r=files.write_file(save_script, script, fail_on_error=fail_on_error, logger=logger, file_format="text")
if r['return'] > 0:
return r
if run_script:
if os.name == 'nt':
cmd = f'call {save_script}'
else:
x = '' if save_script.startswith('.') or save_script.startswith('/') else '. ./'
cmd = f'bash -c "{x}{save_script}"'
if verbose:
x = 'SKIP ' if skip_run else ''
print('')
print(f'{space}{x}{text_cmd} {cmd}')
is_windows = os.name == 'nt'
if open_shell:
if con:
print ('')
print (f'{space}INFO: Opening shell for testing and debugging. Exiting shell will resume cMeta workflow execution:')
print ('')
if is_windows:
shell_cmd = ["cmd.exe"]
else:
shell_cmd = [os.environ.get("SHELL", "/bin/sh")]
subprocess.run(shell_cmd, env = cur_env)
if con:
print ('')
print (f'{space}INFO: Returned to cMeta workflow. Continue executing ...')
print ('')
_result = {'return':0}
returncode = 0
if not skip_run:
for _cmd in cmds:
if con and (verbose or print_cmd):
print (f'{space}{text_cmd} {_cmd}')
if con and print_extra_line:
print ('')
try:
use_popen = (timeout is not None and not is_windows)
if use_popen:
# ----- UNIX: custom Popen + process group handling -----
import signal
process = subprocess.Popen(
_cmd,
stdout=subprocess.PIPE if capture_output else None,
stderr=subprocess.PIPE if capture_output else None,
text=True,
shell=True,
env=cur_env,
preexec_fn=os.setsid
)
try:
stdout, stderr = process.communicate(timeout=timeout)
returncode = process.returncode
except subprocess.TimeoutExpired:
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
stdout, stderr = process.communicate()
returncode = -1
else:
# ----- Windows with timeout: delegate to your wrapper -----
if timeout is not None and is_windows:
returncode, stdout, stderr = run_command_with_timeout_tree_kill_on_windows(
cmd=_cmd,
capture_output=capture_output,
cur_env=cur_env,
timeout=timeout,
shell=True,
text=True,
)
# ----- Normal subprocess.run (any OS) -----
else:
result = subprocess.run(
_cmd,
capture_output=capture_output,
text=True,
shell=True,
env=cur_env,
timeout=timeout,
)
returncode = result.returncode
stdout = result.stdout if capture_output else ''
stderr = result.stderr if capture_output else ''
except Exception as e:
stdout = ''
stderr = format(e)
returncode = -1
if returncode != 0 and stderr != '' and verbose:
print ('')
print (f'{space}WARNING: Command failed: {stderr}')
_result['returncode'] = returncode
_result['stdout'] = stdout
_result['stderr'] = stderr
_result['cur_env'] = cur_env
_result['cmd'] = _cmd
if returncode != 0:
break
if open_shell_after:
if con:
print ('')
print (f'{space}INFO: Opening shell for testing and debugging. Exiting shell will resume cMeta workflow execution:')
print ('')
if is_windows:
shell_cmd = ["cmd.exe"]
else:
shell_cmd = [os.environ.get("SHELL", "/bin/sh")]
subprocess.run(shell_cmd, env = cur_env)
if con:
print ('')
print (f'{space}INFO: Returned to cMeta workflow. Continue executing ...')
print ('')
if work_dir is not None:
os.chdir(cur_dir)
# Check if collect env
if capture_env and cmd:
r = files.read_file(temp_file_to_collect_env_before, fail_on_error = fail_on_error, logger = logger)
if r['return']>0:
return _error(f'Capture env output file not found (before): {temp_file_to_collect_env_before}', 1, None, fail_on_error)
try:
os.remove(temp_file_to_collect_env_before)
except Exception as e:
pass
r = files.parse_env_dump(r['data'])
if r['return']>0: return r
collected_env_before = r['env']
r = files.read_file(temp_file_to_collect_env_after, fail_on_error = fail_on_error, logger = logger)
if r['return']>0:
return _error(f'Capture env output file not found (after): {temp_file_to_collect_env_after}', 1, None, fail_on_error)
try:
os.remove(temp_file_to_collect_env_after)
except Exception as e:
pass
r = files.parse_env_dump(r['data'])
if r['return']>0: return r
collected_env_after = r['env']
_result['collected_env_before'] = collected_env_before
_result['collected_env_after'] = collected_env_after
r = files.diff_env(collected_env_before, collected_env_after)
if r['return']>0: return r
_result['env_added'] = r['env_added']
_result['env_removed'] = r['env_removed']
return _result
###################################################################################################
[docs]
def run_command_with_timeout_tree_kill_on_windows(
cmd: str, # Command string to execute.
capture_output: bool, # If True, capture stdout and stderr.
cur_env: dict, # Environment variables dictionary.
timeout: float, # Timeout in seconds.
shell: bool = True, # If True, run command through shell.
text: bool = True, # If True, decode output as text.
):
"""
Run command on Windows with timeout and process tree termination.
Uses Windows Job Objects to ensure entire process tree is killed on timeout.
Args:
cmd (str): Command string to execute.
capture_output (bool): If True, capture stdout and stderr.
cur_env (dict): Environment variables dictionary.
timeout (float): Timeout in seconds.
shell (bool): If True, run command through shell.
text (bool): If True, decode output as text.
Returns:
tuple: (returncode, stdout, stderr).
Raises:
OSError: If Windows API calls fail.
"""
import subprocess
import ctypes
from ctypes import wintypes
kernel32 = ctypes.windll.kernel32
# Constants
JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE = 0x00002000
JobObjectExtendedLimitInformation = 9
class JOBOBJECT_BASIC_LIMIT_INFORMATION(ctypes.Structure):
_fields_ = [
("PerProcessUserTimeLimit", wintypes.LARGE_INTEGER),
("PerJobUserTimeLimit", wintypes.LARGE_INTEGER),
("LimitFlags", wintypes.DWORD),
("MinimumWorkingSetSize", ctypes.c_size_t),
("MaximumWorkingSetSize", ctypes.c_size_t),
("ActiveProcessLimit", wintypes.DWORD),
("Affinity", ctypes.c_size_t),
("PriorityClass", wintypes.DWORD),
("SchedulingClass", wintypes.DWORD),
]
class IO_COUNTERS(ctypes.Structure):
_fields_ = [
("ReadOperationCount", ctypes.c_ulonglong),
("WriteOperationCount", ctypes.c_ulonglong),
("OtherOperationCount", ctypes.c_ulonglong),
("ReadTransferCount", ctypes.c_ulonglong),
("WriteTransferCount", ctypes.c_ulonglong),
("OtherTransferCount", ctypes.c_ulonglong),
]
class JOBOBJECT_EXTENDED_LIMIT_INFORMATION(ctypes.Structure):
_fields_ = [
("BasicLimitInformation", JOBOBJECT_BASIC_LIMIT_INFORMATION),
("IoInfo", IO_COUNTERS),
("ProcessMemoryLimit", ctypes.c_size_t),
("JobMemoryLimit", ctypes.c_size_t),
("PeakProcessMemoryUsed", ctypes.c_size_t),
("PeakJobMemoryUsed", ctypes.c_size_t),
]
# (Optional but nice) declare arg/return types
kernel32.CreateJobObjectW.argtypes = [wintypes.LPVOID, wintypes.LPCWSTR]
kernel32.CreateJobObjectW.restype = wintypes.HANDLE
kernel32.SetInformationJobObject.argtypes = [
wintypes.HANDLE, wintypes.INT, wintypes.LPVOID, wintypes.DWORD
]
kernel32.SetInformationJobObject.restype = wintypes.BOOL
kernel32.AssignProcessToJobObject.argtypes = [wintypes.HANDLE, wintypes.HANDLE]
kernel32.AssignProcessToJobObject.restype = wintypes.BOOL
kernel32.TerminateJobObject.argtypes = [wintypes.HANDLE, wintypes.UINT]
kernel32.TerminateJobObject.restype = wintypes.BOOL
kernel32.CloseHandle.argtypes = [wintypes.HANDLE]
kernel32.CloseHandle.restype = wintypes.BOOL
# --- Windows + timeout: use a Job Object ---
# Create job object
hjob = kernel32.CreateJobObjectW(None, None)
if not hjob:
raise OSError("CreateJobObjectW failed")
try:
# Configure "kill on job close"
info = JOBOBJECT_EXTENDED_LIMIT_INFORMATION()
info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
ok = kernel32.SetInformationJobObject(
hjob,
JobObjectExtendedLimitInformation,
ctypes.byref(info),
ctypes.sizeof(info),
)
if not ok:
raise OSError("SetInformationJobObject failed")
# Start the process
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE if capture_output else None,
stderr=subprocess.PIPE if capture_output else None,
text=text,
shell=shell,
env=cur_env,
)
# Assign to job object so all its children are part of the job
ok = kernel32.AssignProcessToJobObject(hjob, wintypes.HANDLE(process._handle))
if not ok:
# Clean up process if we can't track it via job
process.kill()
raise OSError("AssignProcessToJobObject failed")
try:
stdout, stderr = process.communicate(timeout=timeout)
returncode = process.returncode
except subprocess.TimeoutExpired:
# Kill whole tree
kernel32.TerminateJobObject(hjob, 1)
# Drain pipes after kill
stdout, stderr = process.communicate()
returncode = -1 # or choose a sentinel you like
finally:
kernel32.CloseHandle(hjob)
if not capture_output:
stdout = ""
stderr = ""
return returncode, stdout, stderr
###################################################################################################
###################################################################################################
[docs]
def get_dir_size(
path: str, # Directory path to measure.
binary: bool = False, # If True, use binary (1024) units, else decimal (1000).
unit: str = None, # Force specific unit for size formatting.
skip_datetime: bool = False, # If True, skip datetime conversion fields in the result.
):
"""
Calculate total size of a directory recursively.
Walks through directory tree and sums file sizes.
Args:
path (str): Directory path to measure.
binary (bool): If True, use binary (1024) units, else decimal (1000).
unit (str | None): Force specific unit for size formatting.
skip_datetime (bool): If True, skip datetime conversion fields in the result.
Returns:
dict: Dictionary with 'return': 0, 'size' in bytes, 'nice_size' formatted,
'total_dirs' count, 'total_files' count, 'latest_modification_dt',
and 'weird_dates' list of files with future modification dates.
Raises:
Exception: Propagated runtime errors, if any.
"""
from datetime import datetime
total = 0
total_dirs = 0
total_files = 0
if not skip_datetime:
latest_mtime = None
weird_dates = []
current_time = datetime.now().timestamp()
for root, dirs, files in os.walk(path):
# Count subdirectories at this level
total_dirs += len(dirs)
# Check modification time of the current directory
if not skip_datetime:
try:
dir_mtime = os.path.getmtime(root)
if latest_mtime is None or dir_mtime > latest_mtime:
latest_mtime = dir_mtime
except (OSError, PermissionError):
pass # Skip directories we can't access
for f in files:
fp = os.path.join(root, f)
if os.path.isfile(fp): # avoid broken symlinks
try:
total += os.path.getsize(fp)
total_files += 1
# Check file modification time
if not skip_datetime:
file_mtime = os.path.getmtime(fp)
# Detect files with future modification dates
if file_mtime > current_time:
weird_dates.append({
'path': fp,
'mtime': file_mtime,
'mtime_dt': datetime.fromtimestamp(file_mtime).isoformat()
})
if latest_mtime is None or file_mtime > latest_mtime:
latest_mtime = file_mtime
except (OSError, PermissionError):
pass # Skip files we can't access
r = format_size(total, binary, unit)
if r['return'] > 0:
return r
nice_size = r['nice_size']
result = {
'return': 0,
'size': total,
'nice_size': nice_size,
'total_dirs': total_dirs,
'total_files': total_files,
}
# Convert timestamp to datetime
if not skip_datetime:
result['latest_modification_dt'] = datetime.fromtimestamp(latest_mtime) if latest_mtime is not None else None
result['weird_dates'] = weird_dates
return result
###################################################################################################
[docs]
def get_min_arch_host_info():
"""
Collect lightweight OS and architecture metadata.
Args:
None.
Returns:
dict: Operation result.
Raises:
Exception: Propagated runtime errors, if any.
"""
import os
import platform
import struct
python_bits = struct.calcsize("P") * 8
system = platform.system()
system_lower = system.lower()
cpu_arch = platform.machine()
if system_lower == "windows":
os_bits = 64 if os.environ.get("PROCESSOR_ARCHITEW6432") else python_bits
elif system_lower == "darwin":
os_bits = 64
else: # Linux / Unix
os_bits = 64 if cpu_arch.endswith("64") else 32
return {
"return": 0,
"os": system, # Windows, Linux, Darwin
"os_lower": system.lower(),
"os_release": platform.release(), # OS version
"os_version": platform.version(), # Detailed version
"python_bits": python_bits,
"os_bits": os_bits,
"cpu_arch": cpu_arch, # x86_64, AMD64, arm64, aarch64, etc.
"cpu_arch_lower": cpu_arch.lower(),
}
###################################################################################################
[docs]
def get_min_raw_host_info():
"""
Collect minimal host hardware information including CPU and memory.
Args:
None.
Returns:
dict: Operation result.
Raises:
Exception: Propagated runtime errors, if any.
"""
import psutil
import os
import platform
import sys
import struct
result = get_min_arch_host_info()
if result['return']>0: return result
# --- Number of CPU cores ---
physical_cores = psutil.cpu_count(logical=False)
logical_cores = psutil.cpu_count(logical=True)
mem_info = psutil.virtual_memory()
total_memory = mem_info.total
free_memory = mem_info.available
# --- Memory used by the current Python process ---
process = psutil.Process(os.getpid())
memory_used = process.memory_info().rss # bytes
result.update({
'physical_cores': physical_cores,
'logical_cores': logical_cores,
'total_memory': total_memory,
'free_memory': free_memory,
'memory_used': memory_used,
})
return result
###################################################################################################
[docs]
def get_min_host_info(
only_memory: bool = False, # If True, only return memory information.
binary: bool = False, # If True, use binary (1024) units for memory sizes.
unit: str = 'GB', # Unit for memory size formatting (default: 'GB').
con: bool = False, # If True, print information to console.
line: int = 0, # Prefix line index used for structured console output.
self_time: bool = False, # If True, include self-timing information in console output.
space = '', # Optional indentation prefix for console output formatting.
):
"""
Get minimal host system information including CPU and memory.
Retrieves system information including CPU core counts, total memory,
free memory, and current process memory usage.
Args:
only_memory (bool): If True, only return memory information.
binary (bool): If True, use binary (1024) units for memory sizes.
unit (str): Unit for memory size formatting (default: 'GB').
con (bool): If True, print information to console.
line (int): Prefix line index used for structured console output.
self_time (bool): If True, include self-timing information in console output.
space: Optional indentation prefix for console output formatting.
Returns:
dict: Dictionary with 'return': 0 and host information including:
'physical_cores' (int): Number of physical CPU cores.
'logical_cores' (int): Number of logical CPU cores.
'total_memory' (int): Total system memory in bytes.
'nice_total_memory' (str): Formatted total memory string.
'free_memory' (int): Free system memory in bytes.
'nice_free_memory' (str): Formatted free memory string.
'memory_used' (int): Memory used by current process in bytes.
'nice_memory_used' (str): Formatted process memory string.
'self_time' (float): Execution time in seconds.
'nice_self_time' (str): Formatted execution time.
'string' (str): Formatted output for display.
Raises:
Exception: Propagated runtime errors, if any.
"""
import time
start_time = time.time()
result = get_min_raw_host_info()
if result['return']>0: return result
# --- Total system memory ---
total_memory = result['total_memory']
free_memory = result['free_memory']
# --- Number of CPU cores ---
physical_cores = result['physical_cores']
logical_cores = result['logical_cores']
# --- Memory used by the current Python process ---
memory_used = result['memory_used']
r = format_size(total_memory, binary=binary, unit=unit)
if r['return']>0: return r
nice_total_memory = r['nice_size']
r = format_size(free_memory, binary=binary, unit=unit)
if r['return']>0: return r
nice_free_memory = r['nice_size']
r = format_size(memory_used, binary=binary, unit=unit)
if r['return']>0: return r
nice_memory_used = r['nice_size']
x = ''
if line>0:
x += '='*line + '\n'
if not only_memory:
x += (f"{space}Host physical cores: {physical_cores}\n"
f"{space}Host logical cores: {logical_cores}\n")
x += (f"{space}Host total memory: {nice_total_memory}\n"
f"{space}Host free memory: {nice_free_memory}\n"
f"{space}Memory used by current process: {nice_memory_used}\n")
end_time = time.time()
self_time = end_time - start_time
nice_self_time = f"{self_time:.3f} sec."
if self_time:
x += f"{space}Time to obtain system info: {nice_self_time}\n"
if con:
print (x)
result.update({
'nice_total_memory': nice_total_memory,
'nice_free_memory': nice_free_memory,
'nice_memory_used': nice_memory_used,
'string': x,
'self_time': self_time,
'nice_self_time': nice_self_time
})
return result
##########################################################################################
[docs]
def get_disk_space(
path: str, # Path to check disk space for.
nice: bool = False, # If True, return human-readable sizes with 'nice_*' keys.
binary: bool = False, # If True, use binary (1024) units, else decimal (1000).
unit: str = None, # Force specific unit for size formatting (e.g., 'GB', 'GiB').
line: int = 0, # Prefix line index used for structured console output.
self_time: bool = False, # If True, include self-timing information in console output.
space = '', # Optional indentation prefix for console output formatting.
):
"""
Get disk space information for a given path.
Retrieves total, used, and free disk space for the filesystem containing
the specified path.
Args:
path (str): Path to check disk space for.
nice (bool): If True, return human-readable sizes with 'nice_*' keys.
binary (bool): If True, use binary (1024) units, else decimal (1000).
unit (str | None): Force specific unit for size formatting (e.g., 'GB', 'GiB').
line (int): Prefix line index used for structured console output.
self_time (bool): If True, include self-timing information in console output.
space: Optional indentation prefix for console output formatting.
Returns:
dict: Dictionary with 'return': 0 and:
'total' (int): Total disk space in bytes.
'used' (int): Used disk space in bytes.
'free' (int): Free disk space in bytes.
'self_time' (float): Execution time in seconds.
'nice_self_time' (str): Formatted execution time.
If nice=True, also includes:
'nice_total' (str): Formatted total size.
'nice_used' (str): Formatted used size.
'nice_free' (str): Formatted free size.
Raises:
Exception: Propagated runtime errors, if any.
"""
from shutil import disk_usage
import time
start_time = time.time()
usage = disk_usage(path)
result = {
'return':0,
'total': usage.total,
'used': usage.used,
'free': usage.free
}
if nice:
x = ''
if line>0:
x += space + '='*line + '\n'
x += f'{space}Path: {path}\n'
for key in ['total', 'used', 'free']:
size = result[key]
r = format_size(size, binary, unit)
if r['return']>0: return r
nice_size = r['nice_size']
result['nice_'+key] = nice_size
x += space + key.capitalize() + f' size: {nice_size}\n'
end_time = time.time()
self_time = end_time - start_time
nice_self_time = f"{self_time:.3f} sec."
if self_time:
x += f"{space}Time to obtain disk space: {nice_self_time}\n"
result['string'] = x
end_time = time.time()
self_time = end_time - start_time
nice_self_time = f"{self_time:.3f} sec."
if nice:
x += f"{space}Self time: {nice_self_time}\n"
result['self_time'] = self_time
result['nice_self_time'] = nice_self_time
return result
##########################################################################################
[docs]
def plus_env(
env, # Environment variable mapping.
):
"""
If key starts with + and value is not list, convert to list as path
Args:
env: Environment variable mapping.
Returns:
dict: Operation result.
Raises:
Exception: Propagated runtime errors, if any.
"""
for k in list(env.keys()):
if k.startswith('+'):
v = env[k]
if type(v) != list:
env[k] = v.split(os.pathsep)
return {'return':0}