New storage logging script

This commit is contained in:
Akemi Izuko 2023-12-23 20:14:16 -07:00
parent f0b3b67c68
commit af60b166ad
Signed by: akemi
GPG key ID: 8DE0764E1809E9FC
3 changed files with 186 additions and 0 deletions

View file

@ -0,0 +1,9 @@
[Unit]
Description=Log the current non-volatile storage info
Wants=log_storage.timer
[Service]
User=root
Group=root
Type=oneshot
ExecStart=/usr/local/bin/log_storage.py /home/emiliko/safe/loggers/storage/storage.csv

View file

@ -0,0 +1,11 @@
[Unit]
Description=Log the current non-volatile storage
Requires=log_storage.service
[Timer]
Unit=log_storage.service
OnCalendar=*-*-* 0/2:00:00
RandomizedDelaySec=10m
[Install]
WantedBy=timers.target

View file

@ -0,0 +1,166 @@
#!/usr/bin/env python3
import argparse
import csv
import os
import re
import shutil
import subprocess
import sys
import tempfile
from datetime import datetime
from pathlib import Path
from subprocess import PIPE
parser = argparse.ArgumentParser(
prog=f"log_storage",
description=f"Store storage info into a csv file",
)
parser.add_argument(
"file",
type=Path,
help=f"Path to the csv, or - for stdout",
)
args = parser.parse_args()
if str(args.file) == "-":
args.file = Path("/dev/stdout")
class Disk:
columns = [
"date",
"uuid",
"label",
"device",
"fstype",
"partition_number",
"parent_device",
"used_space", # In bytes
"size", # In bytes
]
date = None # This is set later
def __init__(self):
self.uuid = None
self.fstype = None
self.device = None
self.label = None
self.partition_number = None
self.parent_device = None
self.used_space = None
self.size = None
def as_csv(self):
return [getattr(self, c) for c in self.columns]
def read_sysfs(p):
with open(p, "r") as f:
return f.read().strip()
def mount_line_info(p):
with open("/proc/mounts", "r") as f:
for line in [l.strip().split() for l in f]:
if line[0] == str(p):
du = shutil.disk_usage(line[1])
return {
"total": du.total,
"used": du.used,
"fstype": line[2],
}
return None
def mount_info(p):
# Check if partition is already in /proc/mounts
info = mount_line_info(p)
if info is not None:
return info
# Quickly mount and unmount a partition to populate /proc/mounts
tmpd = tempfile.mkdtemp()
mount = subprocess.run(["mount", p, tmpd], stderr=PIPE)
if mount.returncode == 0:
info = mount_line_info(p)
subprocess.run(["umount", p], check=True)
os.rmdir(tmpd)
return info
else:
mount_stderr = mount.stderr.decode("utf-8")
m = re.search("unknown filesystem type '([^' ]+)'\.", mount_stderr)
if m is None:
return None
else:
return {"fstype": m[1]}
sysfs = "/sys/class/block"
uuid_path = "/dev/disk/by-uuid"
disk_paths = [Path(f"{uuid_path}/{x}") for x in os.listdir(uuid_path)]
disk_paths = list(filter(lambda x: x.is_symlink(), disk_paths))
label_path = "/dev/disk/by-label"
try:
label_paths = [Path(f"{label_path}/{x}") for x in os.listdir(label_path)]
except FileNotFoundError:
label_paths = list()
# Get label paths
device_to_label = dict()
for p in label_paths:
os.chdir(label_path)
device_to_label[str(p.readlink().resolve())] = p.name
# Build disks
disks = list()
for p in disk_paths:
disk = Disk()
os.chdir(uuid_path)
disk.device = p.readlink().resolve()
sysfs_disk = Path(f"{sysfs}/{disk.device.name}")
disk.uuid = p.name
disk.parent_device = Path("/dev") / sysfs_disk.readlink().parent.resolve().name
disk.partition_number = read_sysfs(sysfs_disk / "partition")
disk.label = device_to_label.get(str(disk.device))
info = mount_info(disk.device)
if info is not None:
disk.size = info.get("total")
disk.used_space = info.get("used")
disk.fstype = info.get("fstype")
disks.append(disk)
# Write output to csv
Disk.date = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC")
if str(args.file) == "/dev/stdout" or not os.path.exists(args.file):
with open(args.file, "w") as csvf:
writer = csv.writer(
csvf, dialect="unix", delimiter=",", quoting=csv.QUOTE_MINIMAL
)
writer.writerow(Disk.columns)
with open(args.file, "a") as csvf:
writer = csv.writer(csvf, dialect="unix", delimiter=",", quoting=csv.QUOTE_MINIMAL)
for disk in disks:
writer.writerow(disk.as_csv())