dotfiles/systemd/system/timer-scripts/log_storage.py

166 lines
3.9 KiB
Python
Executable file

#!/usr/bin/env python3
import argparse
import csv
import os
import re
import shutil
import subprocess
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from subprocess import PIPE
# Command-line interface: one positional argument naming the CSV target.
parser = argparse.ArgumentParser(
    description="Store storage info into a csv file",
    prog="log_storage",
)
parser.add_argument("file", help="Path to the csv, or - for stdout", type=Path)
args = parser.parse_args()

# "-" is shorthand for standard output; map it onto the stdout device node.
if args.file == Path("-"):
    args.file = Path("/dev/stdout")
class Disk:
    """Record describing one storage device, serialisable as a CSV row.

    ``columns`` defines both the CSV header and the order in which the
    attribute values are emitted by :meth:`as_csv`.
    """

    columns = [
        "date",
        "uuid",
        "label",
        "device",
        "fstype",
        "partition_number",
        "parent_device",
        "used_space",  # In bytes
        "size",  # In bytes
    ]

    # Class-level value shared by every record; assigned later by the script.
    date = None

    def __init__(self):
        # Every per-device field starts out unknown.
        self.uuid = None
        self.label = None
        self.device = None
        self.fstype = None
        self.partition_number = None
        self.parent_device = None
        self.used_space = None
        self.size = None

    def as_csv(self):
        """Return this record's values as a list ordered like ``columns``."""
        row = []
        for column in self.columns:
            row.append(getattr(self, column))
        return row
def read_sysfs(p):
    """Return the text content of the file at ``p`` without surrounding whitespace."""
    content = Path(p).read_text()
    return content.strip()
def _unescape_mount_field(field):
    """Decode the three-digit octal escapes (e.g. ``\\040``) in a mounts-file field."""
    return re.sub(r"\\([0-7]{3})", lambda m: chr(int(m[1], 8)), field)


def mount_line_info(p, mounts_file="/proc/mounts"):
    """Look up the mounted filesystem backed by device ``p``.

    Args:
        p: Device path (``str`` or ``Path``) as it appears in the first
            column of the mounts file.
        mounts_file: Mount table to scan; defaults to ``/proc/mounts``.

    Returns:
        A dict with ``total`` and ``used`` (bytes, from
        ``shutil.disk_usage``) and ``fstype`` for the first matching entry,
        or ``None`` when the device is not listed.
    """
    device = str(p)
    with open(mounts_file, "r") as f:
        for line in f:
            fields = line.split()
            # Skip blank or malformed lines instead of failing on the index.
            if len(fields) < 3:
                continue
            if _unescape_mount_field(fields[0]) != device:
                continue
            # Spaces, tabs, newlines and backslashes are written as octal
            # escapes in the mount table; decode them to get the real path.
            mount_point = _unescape_mount_field(fields[1])
            du = shutil.disk_usage(mount_point)
            return {
                "total": du.total,
                "used": du.used,
                "fstype": fields[2],
            }
    return None
def mount_info(p):
    """Return size, usage and filesystem type for the block device ``p``.

    If the device is not currently mounted it is mounted on a temporary
    directory just long enough to read its entry, then unmounted again.

    Args:
        p: Path of the block device.

    Returns:
        The dict produced by ``mount_line_info`` when the device is (or can
        be) mounted; ``{"fstype": name}`` when ``mount`` reports an unknown
        filesystem type; ``None`` when nothing could be determined.

    Raises:
        subprocess.CalledProcessError: If the temporary mount cannot be
            unmounted again.
    """
    # Check if partition is already in /proc/mounts
    info = mount_line_info(p)
    if info is not None:
        return info

    # Quickly mount and unmount a partition to populate /proc/mounts
    tmpd = tempfile.mkdtemp()
    still_mounted = False
    try:
        mount = subprocess.run(["mount", p, tmpd], stderr=PIPE)
        if mount.returncode == 0:
            still_mounted = True
            try:
                info = mount_line_info(p)
            finally:
                # Unmount by mount point so only the mount made here is undone.
                subprocess.run(["umount", tmpd], check=True)
                still_mounted = False
            return info

        # The mount failed; try to recover the filesystem type from the error.
        mount_stderr = mount.stderr.decode("utf-8", errors="replace")
        m = re.search(r"unknown filesystem type '([^' ]+)'\.", mount_stderr)
        if m is None:
            return None
        return {"fstype": m[1]}
    finally:
        # Remove the scratch directory on every path, including a failed
        # mount. It is kept only if the unmount failed, because it is then
        # still a live mount point.
        if not still_mounted:
            os.rmdir(tmpd)
sysfs = "/sys/class/block"
uuid_path = "/dev/disk/by-uuid"
label_path = "/dev/disk/by-label"

# Each entry in by-uuid is expected to be a symlink named after the UUID and
# pointing at the device node; anything that is not a symlink is ignored.
disk_paths = [p for p in Path(uuid_path).iterdir() if p.is_symlink()]

# The by-label directory may be missing; treat that as "no labels".
try:
    label_paths = list(Path(label_path).iterdir())
except FileNotFoundError:
    label_paths = []

# Get label paths
# Path.resolve() follows each link relative to its own directory, so the
# process working directory is never changed and a relative output path
# given on the command line keeps pointing where the user intended.
device_to_label = {str(p.resolve()): p.name for p in label_paths}

# Build disks
disks = []
for p in disk_paths:
    disk = Disk()
    disk.uuid = p.name
    disk.device = p.resolve()
    sysfs_disk = Path(sysfs) / disk.device.name
    try:
        disk.partition_number = read_sysfs(sysfs_disk / "partition")
    except FileNotFoundError:
        # No "partition" attribute: presumably a whole-disk filesystem or a
        # virtual device (LVM, md, ...). Leave the partition number and the
        # parent device empty rather than aborting the whole run.
        pass
    else:
        # The resolved sysfs entry of a partition sits inside its parent
        # disk's directory, so the parent directory name is the disk name.
        disk.parent_device = Path("/dev") / sysfs_disk.resolve().parent.name
    disk.label = device_to_label.get(str(disk.device))
    info = mount_info(disk.device)
    if info is not None:
        disk.size = info.get("total")
        disk.used_space = info.get("used")
        disk.fstype = info.get("fstype")
    disks.append(disk)
# Write output to csv
# All rows of one run share a single timezone-aware UTC timestamp.
Disk.date = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")


def _write_disks(stream, with_header):
    """Write the collected disks to ``stream`` as unix-dialect CSV rows."""
    writer = csv.writer(
        stream, dialect="unix", delimiter=",", quoting=csv.QUOTE_MINIMAL
    )
    if with_header:
        writer.writerow(Disk.columns)
    writer.writerows(disk.as_csv() for disk in disks)


if str(args.file) == "/dev/stdout":
    # Use the already-open stream: re-opening /dev/stdout by path fails when
    # standard output is a socket rather than a file or terminal.
    _write_disks(sys.stdout, with_header=True)
else:
    # Only a new (or still empty) file needs the header row; otherwise the
    # rows are appended to the existing log.
    needs_header = (
        not os.path.exists(args.file) or os.path.getsize(args.file) == 0
    )
    # newline="" lets the csv module control line endings itself.
    with open(args.file, "a", newline="") as csvf:
        _write_disks(csvf, with_header=needs_header)