#!/usr/bin/python
# Copyright (C) 2013 Jeff Epler
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA

"""Wrappers around the ``zfs``/``zpool`` command-line tools, plus a
snapshot-based replicator built on ``zfs send | mbuffer | zfs receive``.

Targets Python 2.7; avoids Python-2-only syntax so it also imports under
Python 3.
"""

import contextlib
import fcntl
import os
import re
import string
import subprocess
import sys
import time
import warnings

try:
    _string_types = basestring  # Python 2
except NameError:  # Python 3
    _string_types = str


# --------------------------------------------------------------------------
# Command-line building helpers
# --------------------------------------------------------------------------

def _addarg(command, flag, value, sentinel=None, transform=str):
    """Append ``flag transform(value)`` to *command* unless value == sentinel."""
    if value == sentinel:
        return
    command.extend([flag, transform(value)])


def _addargs(command, flag, values):
    """Append ``flag v`` to *command* for every v in *values*."""
    for v in values:
        command.extend([flag, v])


def _addflag(command, flag, value):
    """Append the bare *flag* to *command* when *value* is truthy."""
    if value:
        command.append(flag)


def _addsort(command, aflag, dflag, sort):
    """Append sort options to *command*.

    *sort* is a whitespace-separated string or an iterable of property names.
    A leading ``-`` selects *dflag* (descending); a leading ``+`` or no prefix
    selects *aflag* (ascending).
    """
    if not sort:
        return
    if isinstance(sort, _string_types):
        sort = sort.split()
    for key in sort:
        if key.startswith('-'):
            command.extend([dflag, key[1:]])
        elif key.startswith('+'):
            command.extend([aflag, key[1:]])
        else:
            command.extend([aflag, key])


def _addprops(command, flag, props):
    """Append ``flag key=value`` to *command* for every item of *props*."""
    if not props:
        return
    for k, v in props.items():
        # list.extend takes a single iterable argument.
        command.extend([flag, "%s=%s" % (k, v)])


# --------------------------------------------------------------------------
# Errors, warnings and logging
# --------------------------------------------------------------------------

def _text(value):
    """Return *value* as a native ``str`` (decodes bytes under Python 3)."""
    if value is None or isinstance(value, str):
        return value
    return value.decode('utf-8', 'replace')


class ZfsError(RuntimeError):
    """A command run by this module exited with a non-zero status.

    Attributes:
        command: the argument list that was executed.
        retcode: the command's exit status.
        stderr: the stderr text supplied by the raiser (may be empty).
    """

    def __init__(self, command, retcode, stderr):
        msg = "zfs command %s failed with return code %d" % (_sq(command),
                                                             retcode)
        if stderr:
            msg += "\nStderr:\n" + stderr
        RuntimeError.__init__(self, msg)
        self.stderr = stderr
        self.command = command
        self.retcode = retcode


def _zfserror(command, result, stderr, retcode):
    """Raise ZfsError for *command*.

    *result* is accepted for call-site symmetry but unused.  *stderr* may be
    None (e.g. for pipeline stages whose stderr was not captured).
    """
    raise ZfsError(command, retcode, (_text(stderr) or '').rstrip())


class ZfsWarning(UserWarning):
    """Issued when a command writes to stderr but is not treated as failed."""


def _zfswarning(stderr):
    """Emit a command's *stderr* output as a ZfsWarning."""
    warnings.warn(_text(stderr).rstrip(), ZfsWarning)


# Shell-safe quoting, used not for the shell but to print commands as they're
# executed.  ASCII-only so the result does not depend on the process locale.
whitelist = (string.ascii_lowercase + string.ascii_uppercase +
             string.digits + "_+-=@%^/.,:{}")


def _sq(s):
    """Quote *s* (a string, or a sequence of strings) for display as sh."""
    if not isinstance(s, _string_types):
        return " ".join(_sq(i) for i in s)
    if not s:
        return "''"
    rest = s.lstrip(whitelist)
    if not rest:
        return s
    # Leave the safe prefix bare and single-quote the remainder; an embedded
    # quote becomes '\'' (close quote, escaped quote, reopen quote).
    return s[:-len(rest)] + "'" + rest.replace("'", "'\\''") + "'"


def _stderr(*parts):
    """Write *parts*, space-separated, as one line on stderr."""
    sys.stderr.write(" ".join(str(p) for p in parts) + "\n")


def _log(cl):
    """Echo command line *cl* to stderr as a shell comment."""
    _stderr("#", _sq(cl))


def _run(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
         stderr=subprocess.PIPE, fail_ok=False):
    """Run *command* to completion and return its captured stdout.

    Raises ZfsError on a non-zero exit status unless *fail_ok* is true;
    otherwise any stderr output is reported as a ZfsWarning.
    """
    _log(command)
    proc = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr)
    out, err = proc.communicate()
    if proc.returncode != 0 and not fail_ok:
        _zfserror(command, out, err, proc.returncode)
    elif err:
        _zfswarning(err)
    return out


def _parse(output, columns=None):
    """Split tab-separated ``-H`` output into rows.

    Each row is a list of fields, or a dict keyed by *columns* when given.
    Empty output yields [].
    """
    output = _text(output)
    if not output:
        return []
    lines = output.rstrip('\n').split('\n')
    if columns:
        return [dict(zip(columns, line.split('\t'))) for line in lines]
    return [line.split('\t') for line in lines]


# --------------------------------------------------------------------------
# zpool / zfs subcommands
# --------------------------------------------------------------------------

def build_pool_list(pools=(), display=None):
    """Return a ``zpool list -H`` command for *pools* with *display* columns."""
    command = ['zpool', 'list', '-H']
    _addarg(command, '-o', display, transform=','.join)
    command.extend(pools)
    return command


def run_pool_list(*args, **kw):
    """Run ``zpool list`` on the pools in *args* and return parsed rows.

    Keyword *fail_ok* suppresses ZfsError; the rest go to build_pool_list.
    """
    fail_ok = kw.pop('fail_ok', False)
    command = build_pool_list(args, **kw)
    result = _run(command, fail_ok=fail_ok)
    return _parse(result, kw.get('display', None))


def build_list(roots=(), display=None, type=None, sort=None,
               recursive=False, depth=None):
    """Return a ``zfs list -H`` command for the datasets in *roots*."""
    command = ['zfs', 'list', '-H']
    _addarg(command, '-o', display, transform=','.join)
    _addarg(command, '-t', type)
    _addsort(command, '-s', '-S', sort)
    _addflag(command, '-r', recursive)
    _addarg(command, '-d', depth)
    command.extend(roots)
    return command


def run_list(*args, **kw):
    """Run ``zfs list`` on the datasets in *args* and return parsed rows.

    Keyword *fail_ok* suppresses ZfsError; the rest go to build_list.
    """
    fail_ok = kw.pop('fail_ok', False)
    command = build_list(args, **kw)
    result = _run(command, fail_ok=fail_ok)
    return _parse(result, kw.get('display', None))


def build_send(snapshot, incremental=None, all_incremental=None,
               replication=False, verbose=False):
    """Return a ``zfs send`` command for *snapshot* (-i/-I/-R/-v optional)."""
    command = ['zfs', 'send']
    _addarg(command, '-i', incremental)
    _addarg(command, '-I', all_incremental)
    _addflag(command, '-R', replication)
    _addflag(command, '-v', verbose)
    command.append(snapshot)
    return command


def run_send(snapshot, stdout=subprocess.PIPE, **kw):
    """Run ``zfs send`` for *snapshot*; returns stdout if it was captured."""
    command = build_send(snapshot, **kw)
    return _run(command, stdout=stdout)


def build_receive(target, use_sent_name=False, unmounted=False,
                  verbose=False, dry_run=False, force_rollback=False):
    """Return a ``zfs receive`` command writing into *target*."""
    command = ['zfs', 'receive']
    _addflag(command, '-v', verbose)
    _addflag(command, '-u', unmounted)
    _addflag(command, '-n', dry_run)
    _addflag(command, '-F', force_rollback)
    _addflag(command, '-d', use_sent_name)
    command.append(target)
    return command


def run_receive(target, stdin=subprocess.PIPE, **args):
    """Run ``zfs receive`` into *target*, reading the stream from *stdin*."""
    command = build_receive(target, **args)
    _run(command, stdin=stdin)


def build_snapshot(snapname, recursive=False, props=None):
    """Return a ``zfs snapshot`` command; *props* maps property -> value."""
    command = ['zfs', 'snapshot']
    _addflag(command, '-r', recursive)
    _addprops(command, '-o', props)
    command.append(snapname)
    return command


def run_snapshot(snapname, fail_ok=False, **kw):
    """Create snapshot *snapname*; raises ZfsError on failure unless fail_ok."""
    command = build_snapshot(snapname, **kw)
    # Pass by keyword: _run's second positional parameter is stdin.
    _run(command, fail_ok=fail_ok)


def build_destroy(snapname, recursive=False):
    """Return a ``zfs destroy`` command for *snapname*."""
    command = ['zfs', 'destroy']
    _addflag(command, '-r', recursive)
    command.append(snapname)
    return command


def run_destroy(snapname, recursive=False, fail_ok=False):
    """Destroy *snapname*; raises ZfsError on failure unless *fail_ok*."""
    command = build_destroy(snapname, recursive=recursive)
    _run(command, fail_ok=fail_ok)


def build_rename(src, dest, recursive=False):
    """Return a ``zfs rename`` command renaming *src* to *dest*."""
    command = ['zfs', 'rename']
    _addflag(command, '-r', recursive)
    command.append(src)
    command.append(dest)
    return command


def run_rename(src, dest, recursive=False):
    """Rename *src* to *dest*; raises ZfsError on failure."""
    command = build_rename(src, dest, recursive=recursive)
    _run(command)


def age_snapshot(src, dest, tmp, recursive=False):
    """Rename *src* to *dest*, replacing any existing *dest*.

    An existing *dest* is first moved aside to *tmp* (any stale *tmp* is
    destroyed beforehand, best-effort).  If the final rename fails, *tmp* is
    renamed back to *dest* and the error is re-raised; on success *tmp* is
    destroyed.
    """
    _stderr("# age_snapshot", src, dest, tmp)
    if not exists(dest):
        run_rename(src, dest, recursive)
        return
    run_destroy(tmp, recursive, fail_ok=True)
    run_rename(dest, tmp, recursive)
    try:
        run_rename(src, dest, recursive)
    except BaseException:  # includes KeyboardInterrupt: always roll back
        # In case of failure, put back old destination
        run_rename(tmp, dest, recursive)  # if this fails we're hosed
        raise
    else:
        run_destroy(tmp, recursive)


def transceive(source, target, all_verbose=False, incremental=None,
               all_incremental=None, replication=False, send_verbose=False,
               use_sent_name=False, unmounted=False, receive_verbose=False,
               dry_run=False, force_rollback=False, buffer_size="2G"):
    """Pipe ``zfs send source`` through ``mbuffer`` into ``zfs receive target``.

    *buffer_size* is passed to ``mbuffer -m``.  After all three processes
    exit, ZfsError is raised for the first failing stage, checked in the
    order receive, mbuffer, send.
    """
    send_command = build_send(source, incremental=incremental,
                              all_incremental=all_incremental,
                              replication=replication,
                              verbose=all_verbose or send_verbose)
    recv_command = build_receive(target, use_sent_name=use_sent_name,
                                 unmounted=unmounted,
                                 verbose=all_verbose or receive_verbose,
                                 dry_run=dry_run,
                                 force_rollback=force_rollback)
    mbuffer_command = ["mbuffer", "-m", buffer_size]

    _log(send_command)
    send_process = subprocess.Popen(send_command, stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE, stderr=sys.stderr)
    _log(mbuffer_command)
    mbuffer_process = subprocess.Popen(mbuffer_command,
                                       stdin=send_process.stdout,
                                       stdout=subprocess.PIPE,
                                       stderr=sys.stderr)
    _log(recv_command)
    recv_process = subprocess.Popen(recv_command,
                                    stdin=mbuffer_process.stdout,
                                    stdout=sys.stdout, stderr=sys.stderr)

    # Drop this process's copies of the pipe ends so each stage sees EOF /
    # SIGPIPE from its neighbours rather than from us.
    send_process.stdin.close()
    send_process.stdout.close()
    send_process.stdout = None
    mbuffer_process.stdout.close()
    mbuffer_process.stdout = None

    recv_result = recv_process.wait()
    mbuffer_result = mbuffer_process.wait()
    send_result = send_process.wait()

    if recv_result:
        _zfserror(recv_command, None, None, recv_result)
    if mbuffer_result:
        _zfserror(mbuffer_command, None, None, mbuffer_result)
    if send_result:
        _zfserror(send_command, None, None, send_result)


# --------------------------------------------------------------------------
# Replication
# --------------------------------------------------------------------------

def ze(m):
    """Escape one regex-matched character for zq: ':' -> ':.', '/' -> ':-',
    anything else -> ':' plus two lowercase hex digits of its code point."""
    ch = m.group(0)
    if ch == ':':
        return ':.'
    if ch == '/':
        return ':-'
    return ":%02x" % ord(ch)


# Characters outside [A-Za-z0-9._-] are escaped by zq.
wre = re.compile(r'[^a-zA-Z0-9._-]')


def zq(name):
    """Escape *name* so it contains only [A-Za-z0-9._-] plus ':' escapes."""
    return wre.sub(ze, name)


@contextlib.contextmanager
def ReplicatorLock(name):
    """Hold an exclusive ``lockf`` lock on a /var/lock file derived from *name*.

    '%' and '/' in *name* are percent-escaped to form the file name.  The
    file descriptor is closed even if acquiring the lock fails.
    """
    path = name.replace("%", "%25").replace("/", "%2f")
    path = os.path.join("/var/lock", path)
    fd = os.open(path, os.O_RDWR | os.O_CREAT)
    locked = False
    try:
        fcntl.lockf(fd, fcntl.LOCK_EX)
        locked = True
        _stderr("# Locked", _sq(name))
        yield
    finally:
        os.close(fd)  # closing the descriptor releases the lockf lock
        if locked:
            _stderr("# Unlocked", _sq(name))


def get_oldest_snapshot(fs):
    """Return the short name (after '@') of *fs*'s oldest snapshot, or None."""
    snapshots = run_list(fs, type="snapshot", depth=1,
                         display=['name', 'creation'], sort=['creation'])
    if not snapshots:
        return None
    return snapshots[0]['name'].split("@")[1]


def exists(name):
    """Return the (truthy if present) ``zfs list`` rows for *name*."""
    return run_list(name, fail_ok=True)


def _list_replicator_snapshots(fs, prefixes):
    """Return short names of *fs*'s snapshots, sorted ascending by creation,
    that start with any of the strings in the tuple *prefixes*."""
    rows = run_list(fs, depth=1, type="snapshot", display=['name'],
                    sort=['creation'], fail_ok=True)
    names = [row['name'].split("@")[1] for row in rows]
    return [n for n in names if n.startswith(prefixes)]


def _destroy_after_failure(name):
    """Best-effort destroy of *name*; failures are reported as ZfsWarning so
    the caller can re-raise its original error."""
    try:
        run_destroy(name, recursive=False)
    except Exception as e:
        warnings.warn("cleanup of %s failed: %s" % (name, e), ZfsWarning)


def replicate(source, target, replication=True, all_verbose=False,
              send_verbose=False, receive_verbose=False, dry_run=False,
              tag=''):
    """Replicate *source* to *target* using replicator-owned snapshots.

    While holding ReplicatorLock on both source and target:
      * create a new replicator snapshot on *source*;
      * find the newest replicator snapshot present on both sides;
      * if one exists, send an incremental (-I) stream from it;
      * otherwise destroy *target* if it exists, send a full stream of the
        oldest source snapshot, then an incremental stream up to the new
        snapshot (skipped when the new snapshot is itself the oldest);
      * on success, destroy the previously existing replicator snapshots on
        source and (best-effort) on target.
    If sending fails, the new snapshot is destroyed (best-effort) and the
    original error is re-raised.
    """
    alt_snap_name_prefix = "net.unpy.zreplicator:%s:" % zq(target)
    snap_name_prefix = "net.unpy.zreplicator:%s" % zq(tag)
    prefixes = (snap_name_prefix, alt_snap_name_prefix)
    new_snap_name = "%s:%s" % (snap_name_prefix, time.time())
    new_snap = "%s@%s" % (source, new_snap_name)

    with ReplicatorLock(source), ReplicatorLock(target):
        existing_source_snaps = _list_replicator_snapshots(source, prefixes)
        existing_target_snaps = _list_replicator_snapshots(target, prefixes)

        # Source snaps are oldest-first, so the last match is the newest
        # snapshot common to both sides.
        on_target = set(existing_target_snaps)
        old_snap_name = None
        for name in existing_source_snaps:
            if name in on_target:
                old_snap_name = name

        run_snapshot(new_snap, recursive=False, fail_ok=False)

        stream_opts = dict(replication=replication, all_verbose=all_verbose,
                           send_verbose=send_verbose,
                           receive_verbose=receive_verbose, dry_run=dry_run,
                           force_rollback=True)
        try:
            if old_snap_name:
                transceive(new_snap, target, all_incremental=old_snap_name,
                           **stream_opts)
            else:
                if exists(target):
                    # "cannot receive new filesystem stream:"
                    # "destination has snapshots"
                    run_destroy(target, recursive=True)
                oldest_snapshot = get_oldest_snapshot(source)
                _stderr("Oldest snapshot", oldest_snapshot)
                transceive("%s@%s" % (source, oldest_snapshot), target,
                           **stream_opts)
                # When the new snapshot is the oldest, the full stream above
                # already delivered it; an incremental from itself is invalid.
                if oldest_snapshot != new_snap_name:
                    transceive(new_snap, target,
                               all_incremental=oldest_snapshot,
                               **stream_opts)
        except BaseException:
            _destroy_after_failure(new_snap)
            raise

        for name in existing_source_snaps:
            run_destroy("%s@%s" % (source, name), recursive=False)
        for name in existing_target_snaps:
            run_destroy("%s@%s" % (target, name), recursive=False,
                        fail_ok=True)