# Source code for cmeta.core

"""
cMeta core class and functions

cMeta author and developer: (C) 2025-2026 Grigori Fursin

See the cMeta COPYRIGHT and LICENSE files in the project root for details.
"""

import json
import logging
import os
import sys
import time
import inspect

from typing import Dict, Any, List, Optional
from pathlib import Path

from . import config
from . import utils
from .repos import Repos
from .packages import Packages
from .version import __version__

# Descriptions of every control parameter that access() strips from a request
# before the remaining keys are handed to a category command. The four groups
# are concatenated in this fixed order.
control_params_desc = (
    config.params_desc
    + config.params_command2_desc
    + config.params_command_desc
    + config.params_init_desc
)

class CMeta:
    """A common meta system to manage and reuse common artifacts and their automations."""

    # NOTE(review): this class was reconstructed from a whitespace-collapsed
    # listing in which indentation was lost. Statement nesting was rebuilt from
    # syntax and intent; places where more than one nesting was plausible are
    # marked NOTE(review) below. Spacing inside string literals is reproduced
    # as it appeared in that listing - confirm against the upstream file.

    ###################################################################################################
    def __init__(
        self,
        home: Optional[str] = None,                    # Path to cMeta home directory.
        debug: Optional[bool] = None,                  # If True, sets log_level to "DEBUG" (overrides log_level parameter).
        fail_on_error: Optional[bool] = None,          # If True, raise error instead of returning dictionary with error
        log_level: Optional[str] = None,               # Logging level as string (case-insensitive).
        log_file: Optional[str] = None,                # Path to log file. If provided and not empty, logs will be written to this file.
        log_format: Optional[str] = None,              # Logging format string.
        pause_if_error: Optional[bool] = None,         # If True, pause before exiting after errors.
        package_allow_install: Optional[bool] = True,  # If True, allow automatic Python package installation.
        package_timeout: Optional[float] = None,       # Default timeout for package installation and checks, in seconds.
        print_host_info: Optional[bool] = False,       # If True, print host system information during access calls.
    ):
        """
        Initialize CMeta with repositories

        Args:
            home (Optional[str]): Path to cMeta home directory.
            debug (Optional[bool]): If True, sets log_level to "DEBUG" (overrides log_level parameter).
            fail_on_error (Optional[bool]): If True, raise error instead of returning dictionary with error
            log_level (Optional[str]): Logging level as string (case-insensitive).
            log_file (Optional[str]): Path to log file. If provided and not empty, logs will be written to this file.
            log_format (Optional[str]): Logging format string.
            pause_if_error (Optional[bool]): If True, pause before exiting after errors.
            package_allow_install (Optional[bool]): If True, allow automatic Python package installation.
            package_timeout (Optional[float]): Default timeout for package installation and checks, in seconds.
            print_host_info (Optional[bool]): If True, print host system information during access calls.

        Returns:
            None: This is a constructor that initializes the CMeta instance.

        Raises:
            FileExistsError: If the home path already exists but is not a directory.
            OSError: If there's an error creating the home directory due to permissions or other OS-level issues.
            PermissionError: If the process lacks permissions to create the home directory.
            Exception: If reading init variables from the environment or setting up the logger
                reports an error, or for any other unexpected error during initialization.
        """

        ###################################################################################################
        # Initialize logging
        self.config = config
        self.cfg = self.config.cfg
        cfg = self.cfg

        self.__version__ = __version__

        r = config.check_init_vars_from_env()
        if r['return'] > 0:
            raise Exception(f"cMeta init failed: {r['error']}")
        cmeta_init = r['init']

        # Setup logger and vars from environment if not explicitly set:
        # only arguments that were explicitly given (not None) are forced.
        explicit_args = {
            'home': home,
            'debug': debug,
            'fail_on_error': fail_on_error,
            'log_level': log_level,
            'log_file': log_file,
            'log_format': log_format,
            'pause_if_error': pause_if_error,
        }
        cmeta_init_forced = {key: value for key, value in explicit_args.items() if value is not None}

        r = self.config.update_init_and_setup_logger(__name__, cmeta_init, cmeta_init_forced)
        if r['return'] > 0:
            raise Exception(f"cMeta init failed: {r['error']}")

        self.logger = r['logger']
        init = r['init']

        self.debug = init['debug']
        self.fail_on_error = init['fail_on_error']
        self.pause_if_error = init.get('pause_if_error', False)

        self.logger.debug("Initializing CMeta class ...")

        self.home_path = Path(init['home'])
        self.repos_path = self.home_path / cfg["repos_dir"]
        self.repos_config_path = self.home_path / cfg["repos_config_filename"]
        self.index_path = self.home_path / cfg["index_dir"]

        self.path = os.path.dirname(__file__)

        paths_list = _list_paths(self)
        for log_path in paths_list:
            self.logger.info(log_path)

        self._error = utils.common._error
        self.check_params = utils.common.check_params

        self.module_cache = {}

        # Some debug functions
        self.j = utils.common.safe_print_json
        self.jj = utils.common.safe_print_json_with_enter
        self.jv = utils.common.print_module_vars
        self.js = utils.common.safe_print_json_to_str

        self.q = utils.files.quote_path
        self.qq = utils.files.quote_path2

        self.utils = utils

        self.print_host_info = print_host_info

        #################################################################################
        # Create directory if it doesn't exist (thread/process safe)
        for path in [self.home_path]:  # , self.repos_path, self.index_path]:
            try:
                path.mkdir(parents=True, exist_ok=True)
            except FileExistsError:
                # With exist_ok=True this is only raised when the path exists
                # and is not a directory; a concurrently created directory is
                # fine, anything else must not be silently ignored.
                if not path.is_dir():
                    self.logger.error(f"Error creating home directory: {path} exists and is not a directory")
                    raise
            except Exception as e:
                self.logger.error(f"Error creating home directory: {e}")
                raise

        #################################################################################
        # Initialize package manager
        self.packages = Packages(
            cfg=self.cfg,
            logger=self.logger,
            fail_on_error=self.fail_on_error,
            allow_install=package_allow_install,
            timeout=package_timeout,
            add_install_args=os.environ.get(cfg['env_var_pip_install_args']),
        )

        #################################################################################
        # Initialize repositories manager
        self.repos = Repos(
            cfg=self.cfg,
            home_path=self.repos_path,
            index_path=self.index_path,
            repos_config_path=self.repos_config_path,
            logger=self.logger,
            fail_on_error=self.fail_on_error,
            match_version_func=self.packages.match_version
        )

    ###################################################################################################
    def error(
        self,
        error_msg,             # Error message text.
        return_code = 1,       # Numeric return code.
        exception = None,      # Exception object associated with the error.
        fail16 = False,        # If True, treat return code 16 as a fatal error.
        fail_on_error = None,  # True/False to force the behaviour; None to use the instance default.
        extra = None,          # Extra data forwarded to utils.common._error (empty dict if omitted).
    ):
        """
        Create or raise a cMeta error using framework-level defaults.

        Args:
            error_msg: Error message text.
            return_code: Numeric return code.
            exception: Exception object associated with the error.
            fail16: If True, treat return code 16 as a fatal error.
            fail_on_error: If None, self.fail_on_error is used. An explicit False is
                honoured (it is NOT replaced by the instance default).
            extra: Extra data forwarded to utils.common._error.

        Returns:
            dict: Whatever utils.common._error returns.

        Raises:
            Exception: Propagated from utils.common._error, if any.
        """

        # Only fall back to the instance default when the caller did not decide;
        # "not fail_on_error" would also discard an explicit False.
        if fail_on_error is None:
            fail_on_error = self.fail_on_error

        # Fresh dict per call instead of a shared mutable default argument.
        if extra is None:
            extra = {}

        return utils.common._error(error_msg, return_code, exception,
                                   fail_on_error=fail_on_error,
                                   fail_on_16 = fail16,
                                   extra = extra)

    ###################################################################################################
    def catch_error(
        self,
        r,               # cMeta access return dict
        fail16 = False,  # If True, treat return code 16 as a fatal error.
    ):
        """
        Catches error and creates return dictionary or raise exception based on fail_on_error flag.

        When r['return'] > 0 the error is passed through self._error and the
        resulting 'error' text is written back into r.

        Args:
            r: cMeta access return dict
            fail16: If True, treat return code 16 as a fatal error.

        Returns:
            bool: True when r['return'] is non-zero, except for code 16 which only
                counts as an error when fail16 is set.

        Raises:
            Exception: Propagated from self._error, if any.

        Example:
            if self.cm.catch_error(r): return r
        """

        ret = r['return']

        if ret > 0:
            rr = self._error(r.get('error'), ret, exception = None,
                             fail_on_error = self.fail_on_error, fail_on_16 = fail16)
            r['error'] = rr['error']

        return ret != 0 and (ret != 16 or fail16)

    ###################################################################################################
    def catch_error_and_halt(
        self,
        r,               # cMeta return dictionary.
        fail16 = False,  # If True, treat return code 16 as a fatal error.
    ):
        """
        Raise or print an error and terminate execution when needed.

        Args:
            r (dict): cMeta return dictionary.
            fail16 (bool): If True, treat return code 16 as a fatal error.

        Returns:
            dict: Original input dictionary (when no error was detected by catch_error).

        Raises:
            Exception: Propagated runtime errors, if any.
        """

        if self.catch_error(r, fail16):
            self.halt(r)

        return r

    ############################################################
    def halt(
        self,
        r,  # output from CM function with "return" and "error"
    ):
        """
        If r['return']>0: print error and halt

        Args:
            r (dict): output from CM function with "return" and "error"

        Returns:
            (dict): r, unchanged, when r['return'] is not positive.

        Raises:
            SystemExit: With exit code r['return'] when r['return'] > 0.
        """

        if r['return'] > 0:
            error_text = self.cfg['con_error_prefix'] + r['error'] + '!'
            sys.stderr.write('\n' + error_text)
            sys.exit(r['return'])

        return r

    ############################################################
    def dump(
        self,
        filename,      # File name or path string.
        meta,          # Metadata dictionary to persist.
        wait = False,  # If True, wait for interactive confirmation after writing data.
    ):
        """
        Write metadata to a file and optionally wait for user confirmation.

        Args:
            filename: File name or path string.
            meta: Metadata dictionary to persist.
            wait: If True, wait for interactive confirmation after writing data.

        Returns:
            dict: Result of utils.files.write_file.

        Raises:
            Exception: Propagated runtime errors, if any.
        """

        r = utils.files.write_file(filename, meta)
        if self.catch_error(r): return r

        if wait:
            print ('')
            print (f'Data was dumped to "{filename}"')
            print ('')
            input ('Press Enter to continue:')

        return r

    ###################################################################################################
    def outdated(
        self,
        path,  # Location reported in the warning.
        meta,  # Dictionary; 'last_api_version' is read from it if present.
    ):
        """
        Print a warning that API version 1 is outdated for the given path.

        Args:
            path: Location reported in the warning.
            meta (dict): If it has a truthy 'last_api_version', that value is
                appended to the warning.

        Returns:
            dict: {'return': 0}
        """

        msg = f'WARNING: API version 1 is outdated in {path}.'

        last_api_ver = meta.get('last_api_version')
        if last_api_ver:
            msg += f' The last one is {last_api_ver}.'

        print ('*'*80)
        print (msg)

        return {'return':0}

    ###################################################################################################
    def access(
        self,
        request: Dict[str, Any],  # Dictionary containing the request data
    ) -> Dict[str, Any]:
        """
        Access common meta framework in a unified way

        Outline of what the code does:
          1. shallow-copies the request and prepares the shared 'ctx' (origin, nested_call);
          2. splits control parameters from command parameters via utils.check_params;
          3. optionally records reproducibility files;
          4. without a category: handles version / reindex requests;
          5. with a category: finds the category artifact, loads its API module(s),
             then prints help or calls the resolved command function;
          6. records timing, restores ctx['control'] and writes optional outputs.

        Args:
            request (Dict[str, Any]): Dictionary containing the request data

        Returns:
            Dictionary with {"return": 0, ...} for success or {"return": >0, "error": "error text"} for errors

        Raises:
            Exception: Propagated runtime errors, if any.
        """

        self_time_start = time.perf_counter()

        if self.print_host_info:
            utils.sys.get_min_host_info(only_memory=True, con=True, line=80)

        # Manual override of global self.debug and self.fail_on_error
        self_fail_on_error = self.fail_on_error
        self_debug = self.debug

        # Log where this call is coming from if debug
        if self_debug:
            self.logger.debug(60*'=')
            self.logger.debug(f'ACCESS({self.js(request, indent=2)})')

            stack = inspect.stack()
            if len(stack) > 1:
                caller_frame = stack[1]
                caller_filename = caller_frame.filename
                abs_path = os.path.abspath(caller_filename)
                self.logger.debug(f'ACCESS is from "{abs_path}"')

            r = utils.sys.get_min_host_info()
            if r['return'] == 0:
                self.logger.debug(r['string'])

        # Make shallow copy of top keys to avoid altering original input keys
        # It's relatively fast in comparison with deep copy
        # particularly for nested calls with a large "ctx" ...
        params = request.copy()

        # Prepare context and origin if first run ...
        if 'ctx' not in params:
            params['ctx'] = {}
        ctx = params.get('ctx', {})

        cur_dir = os.getcwd()

        # If origin(al) call is not in the context, add it for further
        # reuse, debugging and reproducibility
        if 'origin' not in ctx:
            origin = {}

            # This helps deep scripts get original directory
            # (useful for various automations)
            origin['pwd'] = cur_dir

            if '_cli' in params:
                origin['cli'] = params['_cli']
                del(params['_cli'])

            origin['params'] = request

            ctx['origin'] = origin

        # Check nested call
        if 'nested_call' not in ctx:
            nested_call = 0
        else:
            nested_call = ctx['nested_call'] + 1
            self.logger.debug(f'ACCESS nested call: {nested_call}')
        ctx['nested_call'] = nested_call

        # Check and extract control params
        r = utils.check_params(params, control_params_desc, fail_on_error=self_fail_on_error)
        if self.catch_error(r): return r

        # remaining params are command params
        command_params = r['remaining_params']
        control_params = r['checked_params']

        # Remember the caller's control params so they can be restored at the end
        saved_control = ctx.get('control')
        ctx['control'] = control_params

        # Continue processing request
        con = control_params.get('con', False)
        # Force con in control_params to simplify APIs
        control_params['con'] = con

        repro = control_params.get('repro', False)

        result = {'return':0}

        ###########################################################################################
        if repro:
            for x in ['repro_input_file', 'repro_input_file_rt', 'repro_output_file']:
                if os.path.isfile(self.cfg[x]):
                    r = utils.files.remove_files_and_dirs_in_path(self.cfg[x])
                    if self.catch_error(r): return r

            request_copy = request.copy()

            # Clean repro request to avoid overwriting reproducibility files
            # NOTE(review): request_copy is a shallow copy, so x_cmd is the same
            # object as the caller's request['_cli']['cmd'] and removals below
            # are visible to the caller - confirm this is intended.
            x_cmd = request_copy.get('_cli', {}).get('cmd',{})
            if x_cmd:
                for x in ['r', 'repro']:
                    if x in request_copy:
                        del(request_copy[x])
                    for x1 in ['-', '--']:
                        x2 = x1 + x
                        if x2 in x_cmd:
                            x_cmd.remove(x2)

            r = utils.files.write_file(self.cfg['repro_input_file'], request_copy)
            if self.catch_error(r): return r

            ctx_repro = ctx.setdefault('repro', {})

        ###########################################################################################
        category_obj = control_params.get('category')

        # Check if runs for the first time (there is no repos.json and index)
        if 'verbose' not in control_params and config.is_on(os.environ.get(self.cfg['env_var_cmeta_verbose'])):
            control_params['verbose'] = True

        verbose = control_params.get('verbose', False)

        if 'control' not in ctx['origin']:
            ctx['origin']['control'] = control_params

        r = self.repos.init(con=con, verbose=verbose)
        if self.catch_error(r): return r

        if category_obj is None:
            if params.get('version', False) or params.get('V', False):
                from .version import __version__

                result['version'] = __version__

                if con:
                    print (self.cfg['name'] + f' version {__version__}')
                    print ('')
                    print (self.cfg['copyright'])

                if con:
                    print ('')
                    paths_list = _list_paths(self)
                    for log_path in paths_list:
                        print (log_path)

                # Check latest version
                r = utils.net.access_api(url = self.cfg['default_ctuning_api'],
                                         params = {'command':'get-last-cmeta-version'},
                                         timeout = 3)
                if r['return'] == 0:
                    rr = r['response']
                    if rr['return'] == 0:
                        last_cmeta_version = rr['last_cmeta_version']
                        result['last_cmeta_version'] = last_cmeta_version

                        r = utils.common.compare_versions(last_cmeta_version, __version__)
                        if r['return'] == 0:
                            if r['comparison'] == '>':
                                result['requires_update'] = True
                                if con:
                                    print ('')
                                    print (f'WARNING: Your cMeta version ({__version__}) is outdated.')
                                    print (f' Latest version: {last_cmeta_version}')
                                    print (f' Update via: pip install -U cmeta')
                            else:
                                if con:
                                    print ('')
                                    print (f'Your cMeta version is up-to-date!')
                else:
                    # NOTE(review): this 'else' is paired with the access_api
                    # result check (matching the message text) - confirm.
                    return self.error(f'Accessing latest version info failed: {r["error"]}')

            elif control_params.get('reindex', False):
                r = self.repos.reindex(con=con, verbose=verbose)
                if self.catch_error(r): return r

            else:
                return self.error('"category" is not defined')

        else:
            # Prepare to search for category record as artifact (category_name -> artifact_name, category_name = "category")!
            r = utils.names.parse_cmeta_obj(category_obj, key = "artifact", fail_on_error = self_fail_on_error)
            if self.catch_error(r): return r

            cmeta_ref_parts = r['obj_parts']
            cmeta_ref_parts['category_alias'] = 'category'
            cmeta_ref_parts['category_uid'] = 'dd9ea50e7f76467f'

            r = self.repos.find(cmeta_ref_parts)
            if self.catch_error(r, fail16=True): return r

            category_artifacts = r['artifacts']

            if len(category_artifacts) == 0 or len(category_artifacts)>1:
                if len(category_artifacts) == 0:
                    return self.error(f'category "{category_obj}" not found', 8)
                else:
                    err = f'Ambiguity for category "{category_obj}" - please specify the full name:'
                    for c in category_artifacts:
                        r = utils.names.restore_cmeta_obj(c['cmeta_ref_parts'], key='artifact', fail_on_error = self_fail_on_error)
                        if r['return']>0: return r
                        category_str = r['obj']
                        err += f"\n* {category_str} ({c['path']})"
                    return self.error(err, 8)

            # Prepare command
            command = control_params.get('command', None)
            if command is None:
                command = ''
            else:
                command = command.strip().lower().replace('-', '_')
                if command.endswith('_'):
                    return self.error(f"command shouldn't end with _ ({command})")

            ctx['command'] = command

            # Unique category found - check meta and code
            category_artifact = category_artifacts[0]
            category_meta = category_artifact['cmeta']
            category_uid = category_artifact['cmeta_ref_parts']['artifact_uid']

            # Update context with some duplication for simplicity of further use ...
            ctx['category_artifact'] = category_artifact
            ctx['category_cmeta'] = category_meta
            ctx['category'] = category_artifact['cmeta_ref_parts']

            ###################################################################################################
            # Initialize main and base categories for API unless already in cache
            base_command = control_params.get('base', False)
            category_api_ver = control_params.get('api', None)

            category_api_module_ver = '1'
            base_category_api_module_ver = '1'

            str_category_api_ver = None if category_api_ver is None else str(category_api_ver)

            # NOTE(review): the base-version defaults from category meta are
            # placed inside the non-base branch so that they cannot override an
            # explicitly requested base API version - confirm nesting.
            if base_command:
                if category_meta.get('skip_base_category_commands', False):
                    return self.error('this category doesn\'t use base commands')

                if str_category_api_ver is not None:
                    base_category_api_module_ver = str_category_api_ver
                elif 'base_category_last_api_version' in self.cfg:
                    base_category_api_module_ver = self.cfg['base_category_last_api_version']
            else:
                if category_api_ver is not None:
                    category_api_module_ver = str_category_api_ver
                elif 'last_api_version' in category_meta:
                    category_api_module_ver = str(category_meta['last_api_version'])

                if not category_meta.get('skip_base_category_commands', False):
                    if category_meta.get('base_category_default_api_versions', {}).get(category_api_module_ver) is not None:
                        base_category_api_module_ver = str(category_meta['base_category_default_api_versions'][category_api_module_ver])
                    elif category_meta.get('base_category_default_api_version') is not None:
                        base_category_api_module_ver = str(category_meta['base_category_default_api_version'])

            # Check min cMeta versions
            category_min_cmeta_version = category_meta.get('min_cmeta_version_api')
            if category_min_cmeta_version is None and category_api_module_ver is not None:
                category_min_cmeta_version = category_meta.get('min_cmeta_version',{}).get(str(category_api_module_ver))

            if category_min_cmeta_version is not None:
                from .version import __version__

                r = utils.common.compare_versions(category_min_cmeta_version, __version__)
                if self.catch_error(r): return r

                if r['comparison'] == '>':
                    return self.error(f'this category requires min cMeta version "{category_min_cmeta_version}" but "{__version__}" is installed')

            if repro:
                ctx_repro['api'] = str(category_api_module_ver)

            # Prepare paths to APIs
            category_apis = []

            if not base_command and category_api_module_ver is not None:
                category_api_path = os.path.join(category_artifact['path'], 'api', f'v{category_api_module_ver}.py')

                if os.path.isfile(category_api_path):
                    category_apis.append({'path':category_api_path,
                                          'suffix': category_uid,
                                          'api_version': category_api_module_ver})
                elif category_api_ver is not None or category_api_module_ver != '1':
                    return self.error(f'couldn\'t find category API "{category_api_path}"')

            # Either base command or API file doesn't
            if base_category_api_module_ver is not None and not category_meta.get('skip_base_category_commands', False):
                category_api_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                                 f'category_api_v{base_category_api_module_ver}.py')

                if not os.path.isfile(category_api_path):
                    return self.error(f'couldn\'t find category API "{category_api_path}"')

                category_apis.append({'path':category_api_path,
                                      'base':True,
                                      'api_version': base_category_api_module_ver})

            # Load categories
            for category_api in category_apis:
                # category api path should be resolved by now
                suffix = category_api.get('suffix')

                r = utils.sys.load_module(category_api['path'],
                                          self.module_cache,
                                          fail_on_error = self_fail_on_error,
                                          init_class = "Category",
                                          cmeta = self,
                                          suffix = suffix,
                                          self_meta = category_meta,
                                          )
                if self.catch_error(r): return r

                category_api['code'] = r['cache']['initialized_class']
                category_api['code'].api_version = category_api['api_version']
                category_api['full_module_name'] = r['cache']['full_module_name']

            ###################################################################################################
            # If empty command, print help
            if command == '':
                if params.get('version', False) or params.get('V', False):
                    # NOTE(review): everything below, including the result[...]
                    # assignments, is kept under 'if con' - confirm nesting.
                    if con:
                        category_version = category_meta.get('version', None)
                        result['version'] = category_version
                        print (f'Category version: {category_version}')

                        category_copyright = category_meta.get('copyright', None)
                        result['copyright'] = category_copyright
                        print (f'Category copyright: {category_copyright}')

                        print ('')

                        from .version import __version__
                        result['cmeta_version'] = __version__

                        print (self.cfg['name'] + f' version: {__version__}')
                        print (self.cfg['name'] + ' copyright: ' + self.cfg['copyright'])

                else:
                    caller = params.get('_cli', {}).get('caller')
                    if caller is None:
                        caller = os.path.basename(sys.executable) + f" -m {__package__}"

                    if not con:
                        return self.error(f'"command" key is missing in the request {request}')
                    else:
                        if not control_params.get('help', False):
                            print(self.cfg['con_error_prefix'] + '<command> is missing after <category>!')
                            print('')

                        print(f'{caller} {category_obj} <command> --help | <flags>')

                        for category_api in category_apis:
                            names = []

                            category_api_code = category_api['code']

                            x1 = ''
                            x2 = ''
                            if category_api.get('base', False):
                                x1 = ' base (common)'
                                x2 = ' for all categories'

                            for name in sorted(dir(category_api_code)):
                                if callable(getattr(category_api_code, name)) and not name.startswith('_'):
                                    # Strip up to three trailing underscores to get the command name
                                    nname = name
                                    if name.endswith('___'):
                                        nname = name[:-3]
                                    elif name.endswith('__'):
                                        nname = name[:-2]
                                    elif name.endswith('_'):
                                        nname = name[:-1]

                                    r = utils.sys.find_func_definition(category_api_code, name)
                                    if self.catch_error(r): return r

                                    filename = r['filename']
                                    start_line = r['start_line']
                                    end_line = r['end_line']

                                    short_func_desc = r['short_func_desc']
                                    short_func_desc += f' ({filename}:{start_line}-{end_line})'

                                    # Check aliases
                                    nname_aliases = []
                                    for where in [category_artifact['cmeta'], self.cfg]:
                                        command_aliases = where.get('command_aliases',{})
                                        for command_alias in command_aliases:
                                            real_command = command_aliases[command_alias]
                                            if real_command == nname:
                                                nname_aliases.append(command_alias)

                                    if len(nname_aliases)>0:
                                        nname += ' (' + '|'.join(nname_aliases) + ')'

                                    names.append((f'{nname}', short_func_desc))

                            if len(names)>0:
                                longest_name = max((len(item[0]) for item in names), default=0)

                                print ('')
                                print(f"Available{x1} commands{x2}:")
                                for name in names:
                                    print(f' {name[0]:<{longest_name}} {name[1]}')

            ###################################################################################################
            else:
                func = None
                command_func_name = None

                # Check command aliases:
                command_alias = command

                # First in meta
                for where in [category_artifact['cmeta'], self.cfg]:
                    tmp_command_alias = where.get('command_aliases',{}).get(command)
                    if tmp_command_alias is not None and tmp_command_alias != '':
                        command_alias = tmp_command_alias
                        break

                if self_debug:
                    self.logger.debug(f'Resolved command alias: {command_alias}')

                for category_api in category_apis:
                    category_api_code = category_api['code']

                    # Select which function to use (we check names with __ to differentiate from internal Python names if needed)
                    r = utils.sys.find_command_func(category_api_code, command_alias)
                    if self.catch_error(r): return r

                    func = r['func']
                    if func is not None:
                        command_func_name = r['func_name']
                        break

                if func is None:
                    x = command
                    if command_alias != command:
                        x += f' ({command_alias})'

                    # Shouldn't fail in debug since it's used to check multiple functions ...
                    return self.error(f'command "{x}" doesn\'t exist in category API "{category_api_path}"', 32, fail_on_error=False)

                if control_params.get('help', False):
                    r = utils.names.restore_cmeta_obj(cmeta_ref_parts, key='artifact', fail_on_error = self_fail_on_error)
                    if self.catch_error(r): return r
                    category_str = r['obj']

                    r = utils.sys.get_api_info(category_api_code, command_func_name, f'{category_str} {command}', control_params_desc, category_apis=category_apis)
                    if self.catch_error(r): return r

                    help_text = r['api_info']

                    if con:
                        print (help_text)

                    result['help'] = help_text

                else:
                    if self_debug:
                        r = utils.sys.find_func_definition(category_api_code, command_func_name)
                        if self.catch_error(r): return r

                        filename = r['filename']
                        start_line = r['start_line']
                        end_line = r['end_line']

                        self.logger.debug(f'Calling {command_func_name}() @ {filename}:{start_line}-{end_line}')
                        self.logger.debug(f' with parameters {command_params} ...')

                    try:
                        command_params['ctx'] = ctx

                        # A name ending in exactly one underscore is called with
                        # keyword arguments; anything else gets the params dict.
                        if command_func_name.endswith('_') and not command_func_name.endswith('__'):
                            result = func(**command_params)
                        else:
                            result = func(command_params)

                    except TypeError as te:
                        if self_fail_on_error:
                            raise

                        ste = str(te)

                        r = utils.names.restore_cmeta_obj(cmeta_ref_parts, key='artifact', fail_on_error = self_fail_on_error)
                        if self.catch_error(r): return r
                        category_str = r['obj']

                        extra_flags_help = category_meta.get('extra_flags_help', '')

                        err = f'API call "{category_str} {command}" failed - {ste}.\n\nRerun with {extra_flags_help}--help to view API usage and options'

                        return self.error(err)

        # Get self timing
        self_time = time.perf_counter() - self_time_start

        ctx['last_self_time'] = self_time
        ctx['nested_call'] -= 1

        if saved_control:
            ctx['control'] = saved_control
        else:
            del(ctx['control'])

        # Timer for debugging
        if self_debug:
            self.logger.debug('')
            self.logger.debug(f'SELF TIME: {self_time:.3f} sec.')
            self.logger.debug('')

        # Save reproducibility info (similar to json_file but with fixed file)
        if repro:
            r = utils.files.write_file(self.cfg['repro_input_file_rt'], ctx_repro)
            if self.catch_error(r): return r

            r = utils.files.write_file(self.cfg['repro_output_file'], result)
            if self.catch_error(r): return r

        # Finalize call
        if control_params.get('json', False):
            print (60*'-')
            utils.common.safe_print_json(result)

            if result['return']>0:
                result['skip_print_error'] = True

        json_file = control_params.get('json_file')
        if json_file is not None and json_file!='':
            r = utils.files.write_file(json_file, result)
            if self.catch_error(r): return r

        if control_params.get('dump', False):
            # The ctx dump is written relative to the directory this call
            # started in; always switch back, even if writing fails.
            cur_dir2 = os.getcwd()
            os.chdir(cur_dir)
            try:
                r = utils.files.write_file(self.cfg['dump_ctx_output_file'], ctx)
            finally:
                os.chdir(cur_dir2)
            if self.catch_error(r): return r

        if control_params.get('pause_at_the_end', False):
            print ('')
            input ('Press Enter to finish the command!')

        return result
def _list_paths(
    cmeta,  # CMeta instance.
):
    """
    Build the human-readable path summary for a CMeta instance.

    Args:
        cmeta: CMeta instance; its home_path, repos_path, repos_config_path,
            index_path and path attributes are read.

    Returns:
        list: Seven strings - four configuration paths, an empty separator
            line, then the Python executable and the package path.

    Raises:
        Exception: Propagated runtime errors, if any.
    """
    import sys

    # Locations derived from the cMeta home directory
    storage_lines = [
        f"cMeta home path: {cmeta.home_path}",
        f"cMeta repositories path: {cmeta.repos_path}",
        f"cMeta repositories config: {cmeta.repos_config_path}",
        f"cMeta index path: {cmeta.index_path}",
    ]

    # Interpreter and installed package locations
    runtime_lines = [
        f"cMeta python path: {sys.executable}",
        f"cMeta package path: {cmeta.path}",
    ]

    return storage_lines + [""] + runtime_lines