#!/usr/bin/env python3 # -------------------------------------------------------------------------- # Description: A Telegram bot for monitoring critical infrastructur services # Dependencies: telebot # Usage: python3 lainmonitor.py | or run it as a service # Author: h@x # Version: 2.0 # -------------------------------------------------------------------------- import subprocess import telebot import paramiko import requests import time import logging import ssl import os from concurrent.futures import ThreadPoolExecutor, as_completed import config logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s") logger = logging.getLogger(__name__) CERT_DIR = os.path.join(os.path.dirname(__file__), 'certs') os.makedirs(CERT_DIR, exist_ok=True) bot = telebot.TeleBot(config.TOKEN) ALLOWED_CHATS = set(config.ALLOWED_CHATS) def run_cmd(cmd, timeout=5): try: result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout) return result.stdout.strip() except subprocess.TimeoutExpired: return 'timeout' except OSError as e: logger.error("OS error running %s: %s", cmd, e) return 'error' def get_local_info(): try: hostname = run_cmd(['hostname']) uptime = run_cmd(['uptime', '-p']) load_line = run_cmd(['uptime']) load_avg = load_line.split('load average:')[-1].strip() if 'load average:' in load_line else 'unknown' memory = run_cmd(['free', '-h']) disk = run_cmd(['df', '-h']) status = 'online' if hostname and hostname not in ('error', 'timeout') else 'offline' return {'hostname': hostname, 'uptime': uptime, 'load_avg': load_avg, 'memory': memory, 'disk': disk, 'status': status} except Exception as e: logger.error("Local info error: %s", e) return {'hostname': 'error', 'uptime': 'error', 'load_avg': 'error', 'memory': 'error', 'disk': 'error', 'status': 'error'} def fetch_certificate(host, port): cert_path = os.path.join(CERT_DIR, f"{host}.pem") if os.path.isfile(cert_path): return cert_path try: cert = ssl.get_server_certificate((host, port)) with open(cert_path, 'w') as f: f.write(cert) return cert_path except Exception as e: logger.error("Certificate fetch error for %s: %s", host, e) return False def get_ssh_info(ip, cfg): try: client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(ip, username=cfg['ssh_user'], password=cfg['ssh_pass'], timeout=5) info = {} cmds = {'hostname':'hostname','uptime':'uptime -p','load_avg':'uptime','memory':'free -h','disk':'df -h'} for key, cmd in cmds.items(): try: stdin, stdout, stderr = client.exec_command(cmd) out = stdout.read().decode().strip() if key == 'load_avg' and 'load average:' in out: out = out.split('load average:')[-1].strip() info[key] = out except Exception as e: logger.error("SSH cmd error %s on %s: %s", cmd, ip, e) info[key] = 'error' info['status'] = 'online' except Exception as e: logger.error("SSH connection error to %s: %s", ip, e) info = {'status': 'unreachable'} finally: try: client.close() except Exception: pass return ip, info def get_opnsense_info(ip, cfg): try: url = cfg['api_url'] host_part = url.split('//')[-1].split('/')[0] parts = host_part.split(':') host = parts[0] port = int(parts[1]) if len(parts) > 1 else 443 verify = fetch_certificate(host, port) resp = requests.get(f"{url}/core/get/health", auth=(cfg['api_key'], cfg['api_secret']), verify=verify, timeout=5) resp.raise_for_status() data = resp.json().get('health', {}) return ip, { 'status': data.get('health', 'unknown'), 'uptime': data.get('uptime', 'unknown'), 'memory': f"{data.get('mem_used', '?')}MB/{data.get('mem_total', '?')}MB", 'load_avg': data.get('load_avg', 'unknown'), 'disk': f"{data.get('disk_used', '?')}%/{data.get('disk_total', '?')}%" } except Exception as e: logger.error("OPNsense API error for %s: %s", ip, e) return ip, {'status': 'unreachable'} def gather_clients(concurrency=5): results = {} with ThreadPoolExecutor(max_workers=concurrency) as executor: futures = {executor.submit(get_ssh_info if cfg['type'] == 'generic' else get_opnsense_info, ip, cfg): ip for ip, cfg in config.HOSTS.items()} for future in as_completed(futures): host = futures[future] try: ip, info = future.result() except Exception as e: logger.error("Gather error for %s: %s", host, e) ip, info = host, {'status': 'error'} results[ip] = info return results @bot.message_handler(commands=['status', 'ping']) def handle_status(msg): if msg.chat.id not in ALLOWED_CHATS: bot.reply_to(msg, 'Unauthorized access') return local = get_local_info() clients = gather_clients() lines = [ f"Local: {local['hostname']} ({local['status']})", f"Uptime: {local['uptime']}", f"Load Avg: {local['load_avg']}", f"Memory:\n{local['memory']}", f"Disk:\n{local['disk']}", "Clients:" ] for ip, info in clients.items(): lines.append(f"{ip}: {info.get('status', 'unknown')}") bot.reply_to(msg, '\n'.join(lines)) while True: try: bot.polling() except Exception as e: logger.error("Polling error: %s", e) time.sleep(5)