Add: /ping and /status separated

- /ping can now be used individually to check against any IP address.
- /status will bring up inline keyboard, where you can select either a general status request or per machine

Signed-off-by: hax <hax@lainlounge.org>
This commit is contained in:
h@x 2025-07-22 09:53:21 +00:00
parent e7275ac1de
commit 435c481720

View file

@ -5,7 +5,7 @@
# Dependencies: telebot # Dependencies: telebot
# Usage: python3 lainmonitor.py | or run it as a service # Usage: python3 lainmonitor.py | or run it as a service
# Author: h@x # Author: h@x
# Version: 2.0 # Version: 2.1.0
# -------------------------------------------------------------------------- # --------------------------------------------------------------------------
import subprocess import subprocess
@ -13,45 +13,51 @@ import telebot
import paramiko import paramiko
import requests import requests
import time import time
import socket
import logging import logging
import ssl import ssl
import os import os
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from telebot import types
import config import config
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s") # Configure logging
tlogging_format = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
logging.basicConfig(level=logging.INFO, format=tlogging_format)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Ensure certificate directory exists
CERT_DIR = os.path.join(os.path.dirname(__file__), 'certs') CERT_DIR = os.path.join(os.path.dirname(__file__), 'certs')
os.makedirs(CERT_DIR, exist_ok=True) if not os.path.isdir(CERT_DIR):
os.makedirs(CERT_DIR, exist_ok=True)
bot = telebot.TeleBot(config.TOKEN) bot = telebot.TeleBot(config.TOKEN)
ALLOWED_CHATS = set(config.ALLOWED_CHATS) ALLOWED_CHATS = set(config.ALLOWED_CHATS)
# Utility for command execution with timeout
def run_cmd(cmd, timeout=5): def run_cmd(cmd, timeout=5):
try: try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout) result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
return result.stdout.strip() return result.stdout.strip()
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired as e:
logger.warning(f"Command {cmd} timed out: {e}")
return 'timeout' return 'timeout'
except OSError as e: except OSError as e:
logger.error("OS error running %s: %s", cmd, e) logger.error(f"OS error running {cmd}: {e}")
return 'error' return 'error'
# Local system info
def get_local_info(): def get_local_info():
try: hostname = run_cmd(['hostname'])
hostname = run_cmd(['hostname']) uptime = run_cmd(['uptime', '-p'])
uptime = run_cmd(['uptime', '-p']) load_line = run_cmd(['uptime'])
load_line = run_cmd(['uptime']) load_avg = load_line.split('load average:')[-1].strip() if 'load average:' in load_line else 'unknown'
load_avg = load_line.split('load average:')[-1].strip() if 'load average:' in load_line else 'unknown' memory = run_cmd(['free', '-h'])
memory = run_cmd(['free', '-h']) disk = run_cmd(['df', '-h'])
disk = run_cmd(['df', '-h']) status = 'online' if hostname not in ('', 'error', 'timeout') else 'offline'
status = 'online' if hostname and hostname not in ('error', 'timeout') else 'offline' return {'hostname': hostname, 'uptime': uptime, 'load_avg': load_avg, 'memory': memory, 'disk': disk, 'status': status}
return {'hostname': hostname, 'uptime': uptime, 'load_avg': load_avg, 'memory': memory, 'disk': disk, 'status': status}
except Exception as e:
logger.error("Local info error: %s", e)
return {'hostname': 'error', 'uptime': 'error', 'load_avg': 'error', 'memory': 'error', 'disk': 'error', 'status': 'error'}
# Fetch and store SSL certificate once
def fetch_certificate(host, port): def fetch_certificate(host, port):
cert_path = os.path.join(CERT_DIR, f"{host}.pem") cert_path = os.path.join(CERT_DIR, f"{host}.pem")
if os.path.isfile(cert_path): if os.path.isfile(cert_path):
@ -60,97 +66,123 @@ def fetch_certificate(host, port):
cert = ssl.get_server_certificate((host, port)) cert = ssl.get_server_certificate((host, port))
with open(cert_path, 'w') as f: with open(cert_path, 'w') as f:
f.write(cert) f.write(cert)
logger.info(f"Saved certificate for {host} to {cert_path}")
return cert_path return cert_path
except Exception as e: except Exception as e:
logger.error("Certificate fetch error for %s: %s", host, e) logger.error(f"Failed to fetch certificate for {host}: {e}")
return False return True
# SSH-based info gathering
def get_ssh_info(ip, cfg): def get_ssh_info(ip, cfg):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try: try:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(ip, username=cfg['ssh_user'], password=cfg['ssh_pass'], timeout=5) client.connect(ip, username=cfg['ssh_user'], password=cfg['ssh_pass'], timeout=5)
info = {} info = {}
cmds = {'hostname':'hostname','uptime':'uptime -p','load_avg':'uptime','memory':'free -h','disk':'df -h'} cmds = {'hostname': 'hostname', 'uptime': 'uptime -p', 'load_avg': 'uptime', 'memory': 'free -h', 'disk': 'df -h'}
for key, cmd in cmds.items(): for key, cmd in cmds.items():
try: try:
stdin, stdout, stderr = client.exec_command(cmd) stdin, stdout, stderr = client.exec_command(cmd, timeout=5)
out = stdout.read().decode().strip() out = stdout.read().decode().strip()
if key == 'load_avg' and 'load average:' in out: if key == 'load_avg' and 'load average:' in out:
out = out.split('load average:')[-1].strip() out = out.split('load average:')[-1].strip()
info[key] = out info[key] = out
except Exception as e: except (socket.timeout, paramiko.SSHException) as e:
logger.error("SSH cmd error %s on %s: %s", cmd, ip, e) logger.error(f"SSH command {cmd} on {ip} failed: {e}")
info[key] = 'error' info[key] = 'error'
info['status'] = 'online' info['status'] = 'online'
except Exception as e: except (paramiko.AuthenticationException, paramiko.SSHException, socket.timeout) as e:
logger.error("SSH connection error to %s: %s", ip, e) logger.error(f"SSH connection to {ip} failed: {e}")
info = {'status': 'unreachable'} info = {'status': 'unreachable'}
finally: finally:
try: try: client.close()
client.close() except Exception as e: logger.warning(f"Error closing SSH to {ip}: {e}")
except Exception:
pass
return ip, info return ip, info
# OPNsense API-based info gathering
def get_opnsense_info(ip, cfg): def get_opnsense_info(ip, cfg):
url = cfg['api_url']
host = url.split('//')[1].split('/')[0].split(':')[0]
port = int(url.split('//')[1].split('/')[0].split(':')[1]) if ':' in url.split('//')[1].split('/')[0] else 443
verify = fetch_certificate(host, port)
try: try:
url = cfg['api_url']
host_part = url.split('//')[-1].split('/')[0]
parts = host_part.split(':')
host = parts[0]
port = int(parts[1]) if len(parts) > 1 else 443
verify = fetch_certificate(host, port)
resp = requests.get(f"{url}/core/get/health", auth=(cfg['api_key'], cfg['api_secret']), verify=verify, timeout=5) resp = requests.get(f"{url}/core/get/health", auth=(cfg['api_key'], cfg['api_secret']), verify=verify, timeout=5)
resp.raise_for_status() resp.raise_for_status()
data = resp.json().get('health', {}) data = resp.json().get('health', {})
return ip, { return ip, {'status': data.get('health','unknown'), 'uptime': data.get('uptime','unknown'), 'memory': f"{data.get('mem_used','?')}MB/{data.get('mem_total','?')}MB", 'load_avg': data.get('load_avg','unknown'), 'disk': f"{data.get('disk_used','?')}%/{data.get('disk_total','?')}%"}
'status': data.get('health', 'unknown'), except requests.RequestException as e:
'uptime': data.get('uptime', 'unknown'), logger.error(f"OPNsense API call for {ip} failed: {e}")
'memory': f"{data.get('mem_used', '?')}MB/{data.get('mem_total', '?')}MB",
'load_avg': data.get('load_avg', 'unknown'),
'disk': f"{data.get('disk_used', '?')}%/{data.get('disk_total', '?')}%"
}
except Exception as e:
logger.error("OPNsense API error for %s: %s", ip, e)
return ip, {'status': 'unreachable'} return ip, {'status': 'unreachable'}
def gather_clients(concurrency=5): # Gather info for given host or all hosts
results = {} def gather_host(ip=None):
with ThreadPoolExecutor(max_workers=concurrency) as executor: if ip and ip in config.HOSTS:
futures = {executor.submit(get_ssh_info if cfg['type'] == 'generic' else get_opnsense_info, ip, cfg): ip for ip, cfg in config.HOSTS.items()} cfg = config.HOSTS[ip]
for future in as_completed(futures): return [get_ssh_info(ip, cfg) if cfg['type']=='generic' else get_opnsense_info(ip, cfg)]
host = futures[future] # all hosts
try: return gather_clients()
ip, info = future.result()
except Exception as e:
logger.error("Gather error for %s: %s", host, e)
ip, info = host, {'status': 'error'}
results[ip] = info
return results
@bot.message_handler(commands=['status', 'ping']) # Ping utility
def ping_ip(ip):
res = run_cmd(['ping', '-c', '1', ip], timeout=3)
if '1 packets transmitted, 1 received' in res or '1 packets transmitted, 1 packets received' in res:
return 'reachable'
if res in ('timeout', 'error'):
return res
return 'unreachable'
# Access control decorator
def restricted(func):
def wrapper(msg, *args, **kwargs):
if msg.chat.id not in ALLOWED_CHATS:
bot.reply_to(msg, 'Unauthorized access')
return
return func(msg, *args, **kwargs)
return wrapper
# /status: show menu of available hosts
@bot.message_handler(commands=['status'])
@restricted
def handle_status(msg): def handle_status(msg):
if msg.chat.id not in ALLOWED_CHATS: keyboard = types.InlineKeyboardMarkup()
bot.reply_to(msg, 'Unauthorized access') for ip in config.HOSTS.keys():
return keyboard.add(types.InlineKeyboardButton(ip, callback_data=f'status:{ip}'))
local = get_local_info() keyboard.add(types.InlineKeyboardButton('All', callback_data='status:all'))
clients = gather_clients() bot.send_message(msg.chat.id, 'Select host for status:', reply_markup=keyboard)
lines = [
f"Local: {local['hostname']} ({local['status']})",
f"Uptime: {local['uptime']}",
f"Load Avg: {local['load_avg']}",
f"Memory:\n{local['memory']}",
f"Disk:\n{local['disk']}",
"Clients:"
]
for ip, info in clients.items():
lines.append(f"{ip}: {info.get('status', 'unknown')}")
bot.reply_to(msg, '\n'.join(lines))
# Callback handler for inline menu
@bot.callback_query_handler(func=lambda c: c.data.startswith('status:'))
@restricted
def callback_status(call):
_, key = call.data.split(':', 1)
if key == 'all':
entries = gather_clients()
else:
entries = dict(gather_host(key))
lines = []
for ip, info in entries.items():
lines.append(f"{ip}: {info.get('status','unknown')}")
if info.get('status')=='online':
for field in ('uptime','load_avg','memory','disk'):
lines.append(f" {field}: {info.get(field,'-')}")
bot.send_message(call.message.chat.id, '\n'.join(lines))
# /ping <IP>
@bot.message_handler(func=lambda m: m.text and m.text.startswith('/ping'))
@restricted
def handle_ping(msg):
parts = msg.text.split()
if len(parts) != 2:
bot.reply_to(msg, 'Usage: /ping <IP>')
return
ip = parts[1]
status = ping_ip(ip)
bot.reply_to(msg, f"Ping {ip}: {status}")
# Run polling with retry
while True: while True:
try: try:
bot.polling() bot.polling()
except Exception as e: except Exception as e:
logger.error("Polling error: %s", e) logger.error(f"Polling error: {e}")
time.sleep(5) time.sleep(5)