Edit File: logger_config.py
"""Logger configuration for CSF migration scripts.""" import logging import subprocess import sys from pathlib import Path import sentry_sdk class StdoutFilter(logging.Filter): """Filter to send debug and info logs to stdout.""" def filter(self, record): return record.levelno <= logging.INFO class StderrFilter(logging.Filter): """Filter to send warning and error logs to stderr.""" def filter(self, record): return record.levelno >= logging.WARNING class ColoredFormatter(logging.Formatter): """Formatter that adds colors to log levels for console output.""" # ANSI color codes COLORS = { logging.WARNING: "\033[93m", # Yellow logging.ERROR: "\033[91m", # Red logging.CRITICAL: "\033[91m", # Red } RESET = "\033[0m" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Check if output supports colors (is a TTY) self.use_colors = hasattr(sys.stderr, "isatty") and sys.stderr.isatty() def format(self, record): if self.use_colors and record.levelno in self.COLORS: # Apply color to the entire log message formatted = super().format(record) return f"{self.COLORS[record.levelno]}{formatted}{self.RESET}" else: return super().format(record) def _collect_output(cmd): """Execute command and return stdout, similar to defence360agent.sentry.collect_output""" try: cp = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=10 ) if cp.returncode != 0: return "" return cp.stdout.decode().strip() except (subprocess.TimeoutExpired, FileNotFoundError, OSError): return "" def _get_rpm_version(pkg: str) -> str: """Get RPM package version, similar to defence360agent.sentry.get_rpm_version""" cmd = ["rpm", "-q", "--queryformat=%{VERSION}-%{RELEASE}", pkg] return _collect_output(cmd) def _get_dpkg_version(pkg: str) -> str: """Get DEB package version, similar to defence360agent.sentry.get_dpkg_version""" cmd = ["dpkg-query", "--showformat=${Version}", "--show", pkg] return _collect_output(cmd) def _get_imunify_core_version(): """Get imunify-core package version using system package manager.""" # Try RPM first (most common for CentOS/RHEL) version = _get_rpm_version("imunify-core") if version: return version # Try DEB package manager (for Ubuntu/Debian) version = _get_dpkg_version("imunify-core") if version: return version # Fallback to default if package not found return "1.0.0" def _configure_sentry(): """Configure Sentry for the migrate_csf script with appropriate tags.""" try: # Get the actual imunify-core package version release_version = _get_imunify_core_version() logger = logging.getLogger("migrate_csf") logger.info(f"imunify-core package version: {release_version}") sentry_sdk.init( dsn="https://35eed6af8d418fa4c59c8a9da5cab99a@im360.sentry.cloudlinux.com/36", debug=False, release=release_version, attach_stacktrace="on", ) with sentry_sdk.configure_scope() as scope: # Add the specific script tag as requested scope.set_tag("script", "migrate_csf") return True except Exception: # If Sentry configuration fails, continue without it return False def setup_logger(debug_enabled: bool = False) -> logging.Logger: """ Setup logger with the following configuration: - Debug and info logs to stdout (debug only if debug_enabled=True) - Warning and error logs to stderr (warnings in yellow, errors in red) - All logs to /var/log/imunify360/migrate_csf.log (without colors) - Error logs to Sentry for monitoring and alerting Args: debug_enabled: If True, debug logs will be shown on stdout Returns: Configured logger instance """ # Create logger logger = logging.getLogger("migrate_csf") logger.setLevel(logging.DEBUG) # Clear any existing handlers logger.handlers.clear() # Create formatters colored_console_formatter = ColoredFormatter("%(levelname)s: %(message)s") plain_file_formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) # Setup stdout handler (info and debug) stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler.addFilter(StdoutFilter()) stdout_handler.setFormatter(colored_console_formatter) # Set level based on debug flag if debug_enabled: stdout_handler.setLevel(logging.DEBUG) else: stdout_handler.setLevel(logging.INFO) logger.addHandler(stdout_handler) # Setup stderr handler (warning and error) stderr_handler = logging.StreamHandler(sys.stderr) stderr_handler.addFilter(StderrFilter()) stderr_handler.setFormatter(colored_console_formatter) stderr_handler.setLevel(logging.WARNING) logger.addHandler(stderr_handler) # Setup file handler (all logs) log_file_path = Path("/var/log/imunify360/migrate_csf.log") # Create directory if it doesn't exist log_file_path.parent.mkdir(parents=True, exist_ok=True) try: file_handler = logging.FileHandler(log_file_path) file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(plain_file_formatter) logger.addHandler(file_handler) # Log the file location message logger.info(f"All migration logs will be saved to {log_file_path}") except (PermissionError, OSError) as e: # If we can't write to the log file, log the error but continue logger.warning(f"Could not create log file {log_file_path}: {e}") logger.warning("Migration will continue without file logging") # Add Sentry handler for warning-level logs if Sentry was configured successfully sentry_configured = _configure_sentry() if sentry_configured: logger.debug( "Sentry error reporting initialized for migrate_csf script" ) try: sentry_handler = sentry_sdk.integrations.logging.SentryHandler() sentry_handler.setLevel(logging.WARNING) logger.addHandler(sentry_handler) except Exception: logger.warning( "Could not add Sentry handler for warning-level logs" ) return logger def get_logger() -> logging.Logger: """Get the configured logger instance.""" return logging.getLogger("migrate_csf") def capture_exception(exception: Exception, extra_context: dict = None): """ Capture an exception to Sentry with optional extra context. Args: exception: The exception to capture extra_context: Optional dictionary of extra context to include """ if extra_context: with sentry_sdk.push_scope() as scope: for key, value in extra_context.items(): scope.set_extra(key, value) sentry_sdk.capture_exception(exception) else: sentry_sdk.capture_exception(exception)
Back to File Manager