167 lines
3.9 KiB
Python
167 lines
3.9 KiB
Python
|
#!/usr/bin/env python3
|
||
|
import argparse
|
||
|
import csv
|
||
|
import os
|
||
|
import re
|
||
|
import shutil
|
||
|
import subprocess
|
||
|
import sys
|
||
|
import tempfile
|
||
|
from datetime import datetime
|
||
|
from pathlib import Path
|
||
|
from subprocess import PIPE
|
||
|
|
||
|
parser = argparse.ArgumentParser(
|
||
|
prog=f"log_storage",
|
||
|
description=f"Store storage info into a csv file",
|
||
|
)
|
||
|
|
||
|
parser.add_argument(
|
||
|
"file",
|
||
|
type=Path,
|
||
|
help=f"Path to the csv, or - for stdout",
|
||
|
)
|
||
|
|
||
|
args = parser.parse_args()
|
||
|
if str(args.file) == "-":
|
||
|
args.file = Path("/dev/stdout")
|
||
|
|
||
|
|
||
|
class Disk:
|
||
|
columns = [
|
||
|
"date",
|
||
|
"uuid",
|
||
|
"label",
|
||
|
"device",
|
||
|
"fstype",
|
||
|
"partition_number",
|
||
|
"parent_device",
|
||
|
"used_space", # In bytes
|
||
|
"size", # In bytes
|
||
|
]
|
||
|
|
||
|
date = None # This is set later
|
||
|
|
||
|
def __init__(self):
|
||
|
self.uuid = None
|
||
|
self.fstype = None
|
||
|
self.device = None
|
||
|
self.label = None
|
||
|
self.partition_number = None
|
||
|
self.parent_device = None
|
||
|
self.used_space = None
|
||
|
self.size = None
|
||
|
|
||
|
def as_csv(self):
|
||
|
return [getattr(self, c) for c in self.columns]
|
||
|
|
||
|
|
||
|
def read_sysfs(p):
|
||
|
with open(p, "r") as f:
|
||
|
return f.read().strip()
|
||
|
|
||
|
|
||
|
def mount_line_info(p):
|
||
|
with open("/proc/mounts", "r") as f:
|
||
|
for line in [l.strip().split() for l in f]:
|
||
|
if line[0] == str(p):
|
||
|
du = shutil.disk_usage(line[1])
|
||
|
|
||
|
return {
|
||
|
"total": du.total,
|
||
|
"used": du.used,
|
||
|
"fstype": line[2],
|
||
|
}
|
||
|
|
||
|
return None
|
||
|
|
||
|
|
||
|
def mount_info(p):
|
||
|
# Check if partition is already in /proc/mounts
|
||
|
info = mount_line_info(p)
|
||
|
|
||
|
if info is not None:
|
||
|
return info
|
||
|
|
||
|
# Quickly mount and unmount a partition to populate /proc/mounts
|
||
|
tmpd = tempfile.mkdtemp()
|
||
|
|
||
|
mount = subprocess.run(["mount", p, tmpd], stderr=PIPE)
|
||
|
|
||
|
if mount.returncode == 0:
|
||
|
info = mount_line_info(p)
|
||
|
|
||
|
subprocess.run(["umount", p], check=True)
|
||
|
os.rmdir(tmpd)
|
||
|
|
||
|
return info
|
||
|
else:
|
||
|
mount_stderr = mount.stderr.decode("utf-8")
|
||
|
m = re.search("unknown filesystem type '([^' ]+)'\.", mount_stderr)
|
||
|
|
||
|
if m is None:
|
||
|
return None
|
||
|
else:
|
||
|
return {"fstype": m[1]}
|
||
|
|
||
|
|
||
|
sysfs = "/sys/class/block"
|
||
|
|
||
|
uuid_path = "/dev/disk/by-uuid"
|
||
|
disk_paths = [Path(f"{uuid_path}/{x}") for x in os.listdir(uuid_path)]
|
||
|
disk_paths = list(filter(lambda x: x.is_symlink(), disk_paths))
|
||
|
|
||
|
label_path = "/dev/disk/by-label"
|
||
|
|
||
|
try:
|
||
|
label_paths = [Path(f"{label_path}/{x}") for x in os.listdir(label_path)]
|
||
|
except FileNotFoundError:
|
||
|
label_paths = list()
|
||
|
|
||
|
# Get label paths
|
||
|
device_to_label = dict()
|
||
|
|
||
|
for p in label_paths:
|
||
|
os.chdir(label_path)
|
||
|
device_to_label[str(p.readlink().resolve())] = p.name
|
||
|
|
||
|
# Build disks
|
||
|
disks = list()
|
||
|
|
||
|
for p in disk_paths:
|
||
|
disk = Disk()
|
||
|
|
||
|
os.chdir(uuid_path)
|
||
|
disk.device = p.readlink().resolve()
|
||
|
|
||
|
sysfs_disk = Path(f"{sysfs}/{disk.device.name}")
|
||
|
|
||
|
disk.uuid = p.name
|
||
|
disk.parent_device = Path("/dev") / sysfs_disk.readlink().parent.resolve().name
|
||
|
disk.partition_number = read_sysfs(sysfs_disk / "partition")
|
||
|
disk.label = device_to_label.get(str(disk.device))
|
||
|
|
||
|
info = mount_info(disk.device)
|
||
|
|
||
|
if info is not None:
|
||
|
disk.size = info.get("total")
|
||
|
disk.used_space = info.get("used")
|
||
|
disk.fstype = info.get("fstype")
|
||
|
|
||
|
disks.append(disk)
|
||
|
|
||
|
# Write output to csv
|
||
|
Disk.date = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC")
|
||
|
|
||
|
if str(args.file) == "/dev/stdout" or not os.path.exists(args.file):
|
||
|
with open(args.file, "w") as csvf:
|
||
|
writer = csv.writer(
|
||
|
csvf, dialect="unix", delimiter=",", quoting=csv.QUOTE_MINIMAL
|
||
|
)
|
||
|
writer.writerow(Disk.columns)
|
||
|
|
||
|
with open(args.file, "a") as csvf:
|
||
|
writer = csv.writer(csvf, dialect="unix", delimiter=",", quoting=csv.QUOTE_MINIMAL)
|
||
|
for disk in disks:
|
||
|
writer.writerow(disk.as_csv())
|