Recipes¶
A collection of standalone, copy-paste solutions to specific problems. Each recipe focuses on a single problem and provides a minimal solution which can be adapted to real-world code. The examples are intentionally short and avoid unnecessary abstractions so that the underlying psutil APIs are easy to understand. Most of them are not meant to be used in production.
Processes¶
Finding processes¶
Find process by name:
import psutil
def find_procs_by_name(name):
ls = []
for p in psutil.process_iter(["name"]):
if p.name() == name:
ls.append(p)
return ls
A bit more advanced, check string against Process.name(),
Process.exe() and Process.cmdline():
import os, psutil
def find_procs_by_name_ex(name):
ls = []
for p in psutil.process_iter(["name", "exe", "cmdline"]):
if (
name == p.name()
or (p.exe() and os.path.basename(p.exe()) == name)
or (p.cmdline() and p.cmdline()[0] == name)
):
ls.append(p)
return ls
Find the process listening on a given TCP port:
import psutil
def find_proc_by_port(port):
for proc in psutil.process_iter():
try:
cons = proc.net_connections(kind="tcp")
except psutil.Error:
pass
else:
for conn in cons:
if conn.laddr.port == port and conn.status == psutil.CONN_LISTEN:
return proc
return None
Find all processes that have an active connection to a given remote IP:
import psutil
def find_procs_by_remote_host(host):
ls = []
for proc in psutil.process_iter():
try:
cons = proc.net_connections(kind="inet")
except psutil.Error:
pass
else:
for conn in cons:
if conn.raddr and conn.raddr.ip == host:
ls.append(proc)
return ls
Find all processes that have a given file open (useful on Windows):
import psutil
def find_procs_using_file(path):
ls = []
for p in psutil.process_iter(["open_files"]):
for f in p.open_files() or []:
if f.path == path:
ls.append(p)
break
return ls
Filtering and sorting processes¶
Processes owned by user:
import getpass, psutil
def procs_by_user(user=None):
if user is None:
user = getpass.getuser()
return [
(p.pid, p.name())
for p in psutil.process_iter(["name", "username"])
if p.username() == user
]
Processes using log files:
>>> for p in psutil.process_iter(["name", "open_files"]):
... for file in p.open_files() or []:
... if file.path.endswith(".log"):
... print("{:<5} {:<10} {}".format(p.pid, p.name()[:10], file.path))
...
1510 upstart /home/giampaolo/.cache/upstart/unity-settings-daemon.log
2174 nautilus /home/giampaolo/.local/share/gvfs-metadata/home-ce08efac.log
2650 chrome /home/giampaolo/.config/google-chrome/Default/data_reduction_proxy_leveldb/000003.log
Processes consuming more than 500M of memory:
import psutil
def procs_by_memory(min_bytes=500 * 1024 * 1024):
return [
(p.pid, p.name(), p.memory_info().rss)
for p in psutil.process_iter(["name", "memory_info"])
if p.memory_info().rss > min_bytes
]
Top N processes by cumulative CPU time:
import psutil
def top_cpu_procs(n=3):
procs = sorted(
psutil.process_iter(["name", "cpu_times"]),
key=lambda p: sum(p.cpu_times()[:2]),
)
return [(p.pid, p.name(), sum(p.cpu_times())) for p in procs[-n:]]
Top N processes by cumulative disk read + write
bytes (similar to iotop):
import psutil
def top_io_procs(n=5):
procs = []
for p in psutil.process_iter(["io_counters"]):
io = p.io_counters()
procs.append((io.read_bytes + io.write_bytes, p))
procs.sort(key=lambda x: x[0], reverse=True)
return procs[:n]
Top N processes by open file descriptors (useful for diagnosing fd leaks):
import psutil
def top_open_files(n=5):
procs = []
for p in psutil.process_iter(["num_fds"]):
procs.append((p.num_fds(), p))
procs.sort(key=lambda x: x[0], reverse=True)
return procs[:n]
Monitoring processes¶
Periodically monitor CPU and memory usage of a process using
Process.oneshot() for efficiency:
import time, psutil
def monitor(pid, interval=1):
p = psutil.Process(pid)
while p.is_running():
with p.oneshot():
cpu = p.cpu_percent()
mem = p.memory_info().rss
print("cpu={:<6} mem={}".format(str(cpu) + "%", mem / 1024 / 1024))
time.sleep(interval)
cpu=4.2% mem=23.4M
cpu=3.1% mem=23.5M
Controlling processes¶
Kill a process tree (including grandchildren):
import os, signal, psutil
def kill_proc_tree(
pid,
sig=signal.SIGTERM,
include_parent=True,
timeout=None,
on_terminate=None,
):
"""Kill a process tree (including grandchildren) with signal
"sig" and return a (gone, still_alive) tuple.
"on_terminate", if specified, is a callback function which is
called as soon as a child terminates.
"""
assert pid != os.getpid(), "I won't kill myself!"
parent = psutil.Process(pid)
children = parent.children(recursive=True)
if include_parent:
children.append(parent)
for p in children:
try:
p.send_signal(sig)
except psutil.NoSuchProcess:
pass
gone, alive = psutil.wait_procs(
children, timeout=timeout, callback=on_terminate
)
return (gone, alive)
Terminate a process gracefully, falling back to SIGKILL if it does not exit
within the timeout:
import psutil
def graceful_kill(pid, timeout=3):
p = psutil.Process(pid)
p.terminate()
try:
p.wait(timeout=timeout)
except psutil.TimeoutExpired:
p.kill()
Temporarily pause and resume a process using a context manager:
import contextlib, psutil
@contextlib.contextmanager
def suspended(pid):
p = psutil.Process(pid)
p.suspend()
try:
yield p
finally:
p.resume()
# usage
with suspended(pid):
pass # process is paused here
CPU throttle: limit a process’s CPU usage to a target percentage by alternating
Process.suspend() and Process.resume():
import time, psutil
def throttle(pid, max_cpu_percent=50, interval=0.1):
"""Slow down a process so it uses at most max_cpu_percent% CPU."""
p = psutil.Process(pid)
while p.is_running():
cpu = p.cpu_percent(interval=interval)
if cpu > max_cpu_percent:
p.suspend()
time.sleep(interval * cpu / max_cpu_percent)
p.resume()
Restart a process automatically if it dies:
import subprocess, time, psutil
def watchdog(cmd, max_restarts=None, interval=1):
"""Run cmd as a persistent process. Restart on failure, optionally
with a max restarts. Logs start, exit, and restart events.
"""
restarts = 0
while True:
proc = subprocess.Popen(cmd)
p = psutil.Process(proc.pid)
print(f"started PID {p.pid}")
proc.wait()
if proc.returncode == 0:
# success
print(f"PID {p.pid} exited cleanly")
break
# failure
restarts += 1
print(f"PID {p.pid} died, restarting ({restarts})")
if max_restarts is not None and restarts > max_restarts:
print("max restarts reached, giving up")
break
time.sleep(interval)
if __name__ == "__main__":
watchdog(["python3", "script.py"])
System¶
All APIs returning amounts (memory, disk, network I/O) express them in bytes.
This bytes2human() utility function used in the examples below converts
them to a human-readable string:
def bytes2human(n):
"""
>>> bytes2human(10000)
'9.8K'
>>> bytes2human(100001221)
'95.4M'
"""
symbols = ("K", "M", "G", "T", "P", "E", "Z", "Y")
prefix = {}
for i, s in enumerate(symbols):
prefix[s] = 1 << (i + 1) * 10
for s in reversed(symbols):
if abs(n) >= prefix[s]:
value = float(n) / prefix[s]
return "{:.1f}{}".format(value, s)
return "{}B".format(n)
CPU¶
Print real-time CPU usage percentage:
import psutil
while True:
print("CPU: {}%".format(psutil.cpu_percent(interval=1)))
CPU: 2.1%
CPU: 1.4%
CPU: 0.9%
Memory¶
Show real-time swap activity (Linux, BSD). sout (swap-out) is the
key metric: a non-zero and growing rate means the OS is moving memory from RAM
to disk because RAM is full. sin (swap-in) alone is not alarming;
it just means the system is moving previously evicted pages back into RAM. High
sin and sout together may indicate heavy swapping (thrashing).
import psutil, time
def swap_activity(interval=1):
before = psutil.swap_memory()
while True:
time.sleep(interval)
after = psutil.swap_memory()
sin = after.sin - before.sin
sout = after.sout - before.sout
print("swap-in={}/s swap-out={}/s used={}%".format(
bytes2human(sin), bytes2human(sout), after.percent))
before = after
swap-in=0.0B/s swap-out=0.0B/s used=23%
swap-in=0.0B/s swap-out=1.2M/s used=24%
Disks¶
Show real-time disk I/O:
import psutil, time
def disk_io():
while True:
before = psutil.disk_io_counters()
time.sleep(1)
after = psutil.disk_io_counters()
r = after.read_bytes - before.read_bytes
w = after.write_bytes - before.write_bytes
print("Read: {}/s, Write: {}/s".format(bytes2human(r), bytes2human(w)))
Read: 1.2M/s, Write: 256.0K/s
Read: 0.0B/s, Write: 128.0K/s
Show real-time disk utilization percentage (Linux, FreeBSD):
import psutil, time
def disk_io_percent(interval=1):
while True:
before = psutil.disk_io_counters()
time.sleep(interval)
after = psutil.disk_io_counters()
busy_ms = after.busy_time - before.busy_time
util = min(busy_ms / (interval * 1000) * 100, 100)
print("Disk: {:.1f}%".format(util))
Disk: 3.2%
Disk: 78.5%
Network¶
Show real-time network I/O per interface:
import psutil, time
def net_io():
while True:
before = psutil.net_io_counters(pernic=True)
time.sleep(1)
after = psutil.net_io_counters(pernic=True)
for iface in after:
s = after[iface].bytes_sent - before[iface].bytes_sent
r = after[iface].bytes_recv - before[iface].bytes_recv
print(
"{:<10} sent={:<10} recv={}".format(
iface, bytes2human(s) + "/s", bytes2human(r) + "/s"
)
)
print()
lo sent=0.0B/s recv=0.0B/s
eth0 sent=12.3K/s recv=45.6K/s
List all active TCP connections with their status:
import psutil
def netstat():
templ = "{:<20} {:<20} {:<13} {:<6}"
print(templ.format("Local", "Remote", "Status", "PID"))
for conn in psutil.net_connections(kind="tcp"):
laddr = "{}:{}".format(conn.laddr.ip, conn.laddr.port)
raddr = (
"{}:{}".format(conn.raddr.ip, conn.raddr.port)
if conn.raddr
else "-"
)
print(templ.format(laddr, raddr, conn.status, conn.pid or ""))
Local Remote Status PID
:::1716 - LISTEN 223441
127.0.0.1:631 - LISTEN
10.0.0.4:45278 20.222.111.74:443 ESTABLISHED 437213
10.0.0.4:40130 172.14.148.135:443 ESTABLISHED
0.0.0.0:22 - LISTEN 723345