#!/usr/bin/perl -T
#
# Copyright (C) 2012-2016 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration
# (NASA). All Rights Reserved.
#
# This software is distributed under the NASA Open Source Agreement
# (NOSA), version 1.3. The NOSA has been approved by the Open Source
# Initiative. See http://www.opensource.org/licenses/nasa1.3.php
# for the complete NOSA document.
#
# THE SUBJECT SOFTWARE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY OF ANY
# KIND, EITHER EXPRESSED, IMPLIED, OR STATUTORY, INCLUDING, BUT NOT
# LIMITED TO, ANY WARRANTY THAT THE SUBJECT SOFTWARE WILL CONFORM TO
# SPECIFICATIONS, ANY IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR
# A PARTICULAR PURPOSE, OR FREEDOM FROM INFRINGEMENT, ANY WARRANTY THAT
# THE SUBJECT SOFTWARE WILL BE ERROR FREE, OR ANY WARRANTY THAT
# DOCUMENTATION, IF PROVIDED, WILL CONFORM TO THE SUBJECT SOFTWARE. THIS
# AGREEMENT DOES NOT, IN ANY MANNER, CONSTITUTE AN ENDORSEMENT BY
# GOVERNMENT AGENCY OR ANY PRIOR RECIPIENT OF ANY RESULTS, RESULTING
# DESIGNS, HARDWARE, SOFTWARE PRODUCTS OR ANY OTHER APPLICATIONS RESULTING
# FROM USE OF THE SUBJECT SOFTWARE. FURTHER, GOVERNMENT AGENCY DISCLAIMS
# ALL WARRANTIES AND LIABILITIES REGARDING THIRD-PARTY SOFTWARE, IF
# PRESENT IN THE ORIGINAL SOFTWARE, AND DISTRIBUTES IT "AS IS".
#
# RECIPIENT AGREES TO WAIVE ANY AND ALL CLAIMS AGAINST THE UNITED STATES
# GOVERNMENT, ITS CONTRACTORS AND SUBCONTRACTORS, AS WELL AS ANY PRIOR
# RECIPIENT. IF RECIPIENT'S USE OF THE SUBJECT SOFTWARE RESULTS IN ANY
# LIABILITIES, DEMANDS, DAMAGES, EXPENSES OR LOSSES ARISING FROM SUCH USE,
# INCLUDING ANY DAMAGES FROM PRODUCTS BASED ON, OR RESULTING FROM,
# RECIPIENT'S USE OF THE SUBJECT SOFTWARE, RECIPIENT SHALL INDEMNIFY AND
# HOLD HARMLESS THE UNITED STATES GOVERNMENT, ITS CONTRACTORS AND
# SUBCONTRACTORS, AS WELL AS ANY PRIOR RECIPIENT, TO THE EXTENT PERMITTED
# BY LAW.
# RECIPIENT'S SOLE REMEDY FOR ANY SUCH MATTER SHALL BE THE
# IMMEDIATE, UNILATERAL TERMINATION OF THIS AGREEMENT.
#
# This program coordinates the tracking of file operations that are
# performed as part of a user-initiated transfer. It provides a way for
# the Shift/Mesh client to add operations, set their state, and retrieve
# operations for processing. It also provides various status output to
# the user upon request.

require 5.009_003;
use strict;
use Compress::Zlib;
use DB_File;
use Fcntl qw(:DEFAULT :flock :mode);
use File::Basename;
use File::Path;
use File::Spec;
use File::Temp qw(tempfile);
use Getopt::Long qw(:config bundling no_ignore_case require_order);
use IO::File;
use IO::Handle;
# use embedded IPC::Open3 since versions prior to perl 5.14.0 are buggy
require IPC::Open3;
use List::Util qw(first max min sum);
use Math::BigInt;
use MIME::Base64;
use Net::Ping;
use POSIX qw(ceil setsid setuid strftime);
use Storable qw(dclone nfreeze nstore retrieve thaw);
use Symbol qw(gensym);
use Text::ParseWords;

our $VERSION = 0.90;

# binary byte string conversions
my %bibytes = (
    ''  => 1,
    K   => 1024,
    M   => 1024**2,
    G   => 1024**3,
    T   => 1024**4,
    P   => 1024**5,
    E   => 1024**6,
);

# byte string conversions
my %bytes = (
    ''  => 1,
    K   => 1000,
    M   => 1000**2,
    G   => 1000**3,
    T   => 1000**4,
    P   => 1000**5,
    E   => 1000**6,
);

# second string conversions
my %seconds = (
    s => 1,
    m => 60,
    h => 60 * 60,
    d => 24 * 60 * 60,
    w => 7 * 24 * 60 * 60,
);

# define default defaults (may be overridden by /etc/shiftrc and ~/.shiftrc)
my %conf = (
    bandwidth_ind => "100m",
    bandwidth_org => "1g",
    bandwidth_xge => "10g",
    data_expire => 604800,
    default_buffer => "4m",
    default_clients => 1,
    default_files => "1k",
    'default_find-files' => "2k",
    default_hosts => 1,
    default_local => "shift,fish,fish-tcp",
    default_preallocate => 0,
    default_remote => "shift",
    default_retry => 2,
    default_size => "4g",
    default_split => 0,
    'default_split-tar' => "500g",
    default_stripe => "1g",
    default_threads => 4,
    latency_lan => 0.001,
    latency_wan => 0.05,
    local_small => "shift,fish,fish-tcp",
    lustre_default_stripe => 1,
    max_streams_lan => 8,
    max_streams_wan => 16,
    min_split => "1g",
    min_streams_lan => 1,
    min_streams_wan => 4,
    min_window_lan => "1m",
    min_window_wan => "4m",
    opts_bbcp => "",
    opts_bbftp => "",
    opts_gridftp => "",
    opts_mcp => "--double-buffer",
    opts_msum => "--double-buffer",
    opts_ssh => "",
    opts_ssh_secure => "",
    org_domains => "com|edu|gov|mil|net|org",
    remote_small => "shift",
    small_size_lan => "256m",
    small_size_local => "1g",
    small_size_wan => "64m",
    status_lines => 20,
);

# file-scoped state shared by the main script and the subs below
my %db;
my $dbgfh;
my $ilockfh;
my $ulockfh;
my %nload;
my $localtime = localtime;
my %meta;
my $time = time;
my %udb;

# files only readable by owner unless explicitly specified
umask 077;

# untaint path
$ENV{PATH} = "/bin:/usr/bin:/usr/local/bin";
# untaint env
delete $ENV{ENV};

# unlock files at program termination
END {
    close $dbgfh if (defined $dbgfh);
    close $ilockfh if (defined $ilockfh);
    close $ulockfh if (defined $ulockfh);
}

# parse options
my %opts;
my $rc = GetOptions(\%opts,
    "alive", "get", "history", "host=s", "id=s", "lock", "meta:1",
    "mounts", "pid=i", "put", "restart", "search=s", "shift-mgr",
    "state=s", "stats", "status:s", "stop", "sync", "user=s",
);
die "Invalid options\n" if (!$rc || scalar(@ARGV) != 0);

# parse configuration (system file first, then per-user file overrides)
foreach my $file ("/etc/shiftrc", (getpwuid($<))[7] . "/.shiftrc") {
    open(my $rcfh, '<', $file) or next;
    my $mline;
    while (my $line = <$rcfh>) {
        # strip whitespace and comments
        $line =~ s/^\s+|\s+$|\s*#.*//g;
        next if (!$line);
        # support line continuation operator
        $mline .= $line;
        next if ($mline =~ s/\s*\\$/ /);
        # values come from regex captures, which also untaints them for -T
        if ($mline =~ /^(\S+)\s+(.*)/) {
            $conf{$1} = $2;
        }
        $mline = undef;
    }
    close $rcfh;
}
die "The user_dir setting must be configured\n" if (!$conf{user_dir});

# opts_bbftp may have fake newlines that must become real newlines
$conf{opts_bbftp} =~ s/\\n/\n/g;

# process --stats immediately before setuid or $conf{user_dir} changes
if ($opts{stats}) {
    stats();
    exit;
}

if (defined $opts{user}) {
    die "Only root can specify user\n" if ($> != 0);
    # untaint user
    $opts{user} = $1 if ($opts{user} =~ /^([\w-]+)$/);
    # perform operations without notifying user
    $opts{quiet} = 1;
    # become user so synchronization will work correctly
    # NOTE(review): only the uid is switched here; group ids inherited
    # from root are left unchanged -- confirm this is intended
    my $uid = getpwnam($opts{user});
    setuid($uid) if (defined $uid);
    die "Unable to setuid to user\n"
        if (!defined $uid || $< != $uid || $> != $uid);
} else {
    $opts{user} = getpwuid($<);
}

if (defined $opts{host}) {
    # untaint host
    $opts{host} = $1 if ($opts{host} =~ /^([\w.-]+)$/);
} elsif ($opts{get} || $opts{put}) {
    die "No host name given\n";
}

if (defined $opts{id}) {
    # untaint id.cid
    ($opts{id}, $opts{cid}) = ($1, $2) if ($opts{id} =~ /^(\d+)(\.\d+)?$/);
}

if (defined $opts{search}) {
    # unescape whitespace and special characters in search string
    $opts{search} = unescape($opts{search});
}

# save user_dir and modify it for globbing across all users
$opts{user_dir} = $conf{user_dir};
$opts{user_dir} =~ s/%u/*/g;

# replace %u with user in config and make directory if necessary
if ($conf{user_dir} =~ s/%u/$opts{user}/g) {
    if (-e $conf{user_dir} && ! -d $conf{user_dir}) {
        die "$conf{user_dir} exists and is not a directory\n";
    } elsif (! -d $conf{user_dir}) {
        # directory should be world readable for load info
        mkdir $conf{user_dir}
            or die "Cannot create user metadata directory: $!\n";
        chmod(0755, $conf{user_dir});
    }
}
$conf{udb_file} = "$conf{user_dir}/$opts{user}.db";

if ($opts{put} && !defined $opts{id}) {
    # lock user info
    open($ulockfh, '>', "$conf{user_dir}/$opts{user}.lock");
    flock($ulockfh, LOCK_EX);

    # new transfer so create identifier and directory
    my @ids;
    my $dir = $conf{user_dir};
    my $cdir;
    while (-d $dir) {
        my @dirs = glob "$dir/$opts{user}.[0-9]*";
        # linux has a compiled in max of 32k subdirs so cap at 30k
        $cdir = $dir if (!$cdir && scalar(@dirs) < 30000);
        push(@ids, @dirs);
        $dir .= "/$opts{user}.more";
    }
    if (!$cdir) {
        mkdir $dir or die "Cannot create overflow metadata directory: $!\n";
        chmod(0700, $dir);
        $cdir = $dir;
    }
    @ids = map {substr($_, rindex($_, '.') + 1)} @ids;
    # next id is one past the largest existing id (undef++ gives 1)
    $opts{id} = max(@ids);
    $opts{id}++;
    # untaint id
    $opts{id} = $1 if ($opts{id} =~ /(\d+)/);
    $opts{base} = "$cdir/$opts{user}.$opts{id}";
    mkdir $opts{base} or die "Cannot create transfer metadata directory: $!\n";
    # restrict the transfer directory that was just created
    chmod(0700, $opts{base});

    # unlock user info
    close $ulockfh;

    # initialize tells
    $meta{$_} = 0 foreach (qw(do tree rtree));
    # initialize log sizes
    $meta{"$_\_size"} = 0 foreach (qw(do done error meta tree));
    # initialize done, error, size, and total counts
    foreach my $op (qw(chattr cksum cp find ln mkdir sum)) {
        $meta{"${_}_$op"} = 0 foreach (qw(d e s t));
    }
    $meta{"e_$_"} = 0 foreach (qw(corruption exception));
    # initialize run counts
    $meta{$_} = 0 foreach (qw(s_run t_run w_run s_error s_total t_split));
    # initialize other items
    $meta{last} = 0;
    $meta{origin} = $opts{host};
    $meta{split_id} = 0;
    $meta{stop} = 0;
    $meta{time0} = $time;

    # store initial metadata to file
    put_meta();
    put_meta(\%meta);

    # return id
    print "$opts{id}\n";
} elsif ($opts{mounts}) {
    # replace mount info in user db
    while (my $line = <STDIN>) {
        $line =~ s/\s*\r?\n$//;
        my %op = split(/[= ]+/, $line);
        # ignore malformed lines with undefined op values
        next if (grep(!/./, values %op));
        if ($op{args} eq 'mount') {
            # eliminate any random double slashes that crept in
            $line =~ s/\/\//\//g;
            $udb{"mount_$op{host}:$op{local}"} = $line;
        } elsif ($op{args} eq 'shell') {
            $udb{"shell_$op{host}"} = 1;
        }
    }
    # store user db to file
    nstore(\%udb, $conf{udb_file});
    # synchronize user db
    sync_local("$opts{user}.db") if ($conf{sync_host});
    exit;
} elsif (defined $opts{meta}) {
    $opts{meta} = 1 if ($opts{meta} <= 0);
    die "Identifier required\n" if (!defined $opts{id});
    meta();
    exit;
} elsif ($opts{history}) {
    history();
    exit;
} elsif (!defined $opts{id} && defined $opts{status}) {
    # NOTE(review): the return value of status() is discarded here, while
    # the --state=none branch below prints it -- confirm this is intended
    status();
    exit;
} elsif ($opts{sync}) {
    sync_remote();
    exit;
} elsif (!defined $opts{id}) {
    die "Invalid options\n";
} else {
    # locate existing transfer directory, following .more overflow dirs
    my $dir = $conf{user_dir};
    while (-d $dir) {
        last if (-d "$dir/$opts{user}.$opts{id}");
        $dir .= "/$opts{user}.more";
    }
    $opts{base} = "$dir/$opts{user}.$opts{id}";
}

if (! -d $opts{base}) {
    if ($opts{get} || $opts{put}) {
        print "args=stop\n";
        # exit with success so old crontabs fail in loop
        exit;
    }
    die "Invalid identifier\n";
}

# prevent other processes from accessing files
lock_id();

if ($opts{lock}) {
    # indicate ok to proceed; flush STDOUT so the client sees "OK" before
    # this process blocks reading STDIN below
    print "OK\n";
    STDOUT->flush;
    $SIG{ALRM} = sub {exit 1};
    alarm 300;
    # block until alarm or connection closed
    <STDIN>;
    exit;
}

# retrieve metadata from file
%meta = %{get_meta()};
$opts{meta_pack} = unpack("H*", pack("Q", $meta{meta_size}));

# perform requested actions that require only metadata read access
if (defined $opts{status} && $opts{state} eq 'none') {
    print status();
    exit;
} elsif (defined $opts{status}) {
    id_status();
    exit;
} elsif ($opts{restart} && !($meta{stop} || $meta{time1} &&
        sum(map {$meta{"e_$_"}} qw(chattr cksum cp find ln mkdir sum)) > 0)) {
    die "Only transfers in stop or error states can be restarted\n";
} elsif ($opts{stop} && ($meta{stop} || $meta{time1})) {
    die "Only running transfers can be stopped\n";
}
next metadata line so can detect interruption put_meta(); # initialize other items for hosts that have never been seen if (defined $opts{host} && !defined $meta{"host_$opts{host}"}) { $meta{"clients_$opts{host}"} = 1; $meta{"host_$opts{host}"} = 1; $meta{ohosts}++; } # create host-specific doing log if doesn't already exist $opts{doing} = "doing_$opts{host}$opts{cid}"; $opts{doing_log} = "$opts{base}/$opts{doing}"; if (! -f $opts{doing_log} && ($opts{get} || $opts{put})) { open(FILE, '>', $opts{doing_log}); close FILE; $meta{$opts{doing}} = 0; } # update last access time $meta{"last_$opts{host}"} = $time if ($opts{alive} || $opts{get} || $opts{put}); # track client pids to prevent inadvertent simultaneous processing if ($opts{pid}) { my $pids = "pids_$opts{host}$opts{cid}"; if ($meta{$pids} !~ /(?:^|,)$opts{pid}(?:,|$)/) { # a new process has taken over the transfer $meta{$pids} .= "," if ($meta{$pids}); $meta{$pids} .= $opts{pid}; } } # perform put separately so it can be combined with other operations put() if ($opts{put} && (!$opts{pid} || # only process puts of most recent client $meta{"pids_$opts{host}$opts{cid}"} =~ /(?:^|,)$opts{pid}$/)); # perform requested actions that require metadata write access if ($opts{stop}) { $meta{stop} = 1; $meta{time1} = $time; } elsif ($opts{restart}) { # clear counts $meta{"e_$_"} = 0 foreach (qw(chattr cksum cp find ln mkdir sum)); $meta{$_} = 0 foreach (qw(stop s_run t_run w_run)); delete $meta{time1}; delete $meta{"t0_$_"} foreach (qw(chattr cksum cp find ln mkdir sum)); delete $meta{"s0_$_"} foreach (qw(chattr cksum cp find ln mkdir sum)); # clear host/client info so clients can be respawned $meta{ohosts} = 0; delete $meta{$_} foreach (grep(/^(clients|email|host|load|os|perl|pids|shell|sleep|version|warn)_/, keys %meta)); # move all failed operations out of error back into do/tree open(DO, '>>', "$opts{base}/do"); open(TREE, '>>', "$opts{base}/tree"); open(ERROR, '+<', "$opts{base}/error"); while () { # reset number of 
attempts s/((^|\s)try=)\d+/${1}0/; if ($meta{'create-tar'} && /(^|\s)args=find/) { # find retries must go in tree during tar creation print TREE $_; } else { # use do for all other cases print DO $_; } } # clear error contents truncate(ERROR, 0); close ERROR; # move all running operations out of doing_* back into do/tree foreach my $file (glob "$opts{base}/doing_*") { my $log = $file; $log =~ s/.*\///; # untaint file $file = $1 if ($file =~ /^(.*)$/); open(FILE, '+<', $file); seek(FILE, $meta{$log}, 0); while (my $line = ) { $line =~ s/\s*\r?\n$//; if ($line !~ /^ /) { # record position and skip processing if already done $meta{$log} = tell FILE; next; } # record operation as done so not retried later my $tell = $meta{$log}; $meta{$log} = tell FILE; seek(FILE, $tell, 0); print FILE $opts{meta_pack}; seek(FILE, $meta{$log}, 0); # put operation in do/tree my %op = split(/[= ]+/, substr($line, 16)); delete $op{$_} foreach (qw(doing rate run time)); # do not delete hash when retrying cksum delete $op{hash} if ($op{args} !~ /^cksum/); $line = join(" ", map {"$_=$op{$_}"} sort(keys %op)) . 
"\n"; if ($meta{'create-tar'} && $op{args} =~ /^find/) { # find retries must go in tree during tar creation print TREE $line; } else { # use do for all other cases print DO $line; } } } close DO; close TREE; } elsif ($opts{get} && ($meta{stop} || $meta{time1} || # stop client if a new process has taken over $meta{"pids_$opts{host}$opts{cid}"} =~ /(?:^|,)$opts{pid},/)) { print "args=stop\n"; } elsif ($opts{get}) { get(); } elsif ($opts{alive}) { # host has functional cron if --alive used $meta{"cron_$opts{host}"} = 1; print "args=stop\n" if ($meta{stop} || $meta{time1}); } #TODO: option to list operations or state of each client host # send email status updates email_status() if ($meta{mail} && !$opts{quiet} && $opts{put} && $conf{email_domain}); # update log sizes foreach my $file (glob "$opts{base}/*") { next if ($file =~ /\/(?:find|lock)$/); my $log = $file; $log =~ s/.*\///; $meta{"$log\_size"} = (stat $file)[7]; } # store metadata to file put_meta(\%meta); # synchronize log files sync_local() if ($conf{sync_host}); # unlock id before cleanup close $ilockfh if (defined $ilockfh); # detach process during cleanup close STDIN; close STDOUT; close STDERR; setsid; open(STDIN, "/dev/null"); open(STDERR, ">/dev/null"); # update global load info after detach to avoid blocking on other transfers if ($opts{get} || $opts{put}) { # lock user info open($ulockfh, '>', "$conf{user_dir}/$opts{user}.lock"); flock($ulockfh, LOCK_EX); my %loaddb = eval {%{retrieve("$conf{user_dir}/$opts{user}.load")}}; if ($meta{time1}) { # remove load info for completed transfers delete $loaddb{$_} foreach (grep(/^(next_)?id_$opts{id}(\.|_)/, keys %loaddb)); } else { my $key = "id_$opts{id}$opts{cid}_$opts{host}"; my %cload = split(/[= ]+/, $meta{"load_$opts{host}$opts{cid}"}); if ($cload{ratio} == -1 && $loaddb{$key}) { # client was throttled so recompute current load my %load = split(/[= ]+/, $loaddb{$key}); my $old = $load{time}; my $new = $old + $cload{time}; delete $load{time}; # scale rates 
by adjusted interval $load{$_} *= $old / $new foreach (keys %load); $load{"cpu_host_$opts{host}"} += $cload{cpu} * $cload{time} / $new; $load{time} = $new; $loaddb{$key} = join(" ", map {"$_=$load{$_}"} keys(%load)); } elsif ($loaddb{"next_$key"}) { my %load = split(/[= ]+/, $loaddb{"next_$key"}); $cload{time} = 1 if (!$cload{time}); # convert sizes to MB/s and scale by actual/estimated ratio $load{$_} = $cload{ratio} * $load{$_} / 1E6 / $cload{time} foreach (keys %load); $load{"cpu_host_$opts{host}"} = $cload{cpu}; $load{time} = $cload{time}; $loaddb{$key} = join(" ", map {"$_=$load{$_}"} keys(%load)); } # update next load with fs/host info collected in get() $loaddb{"next_$key"} = join(" ", map {"$_=$nload{$_}"} keys(%nload)) if (scalar(keys %nload)); } nstore(\%loaddb, "$conf{user_dir}/$opts{user}.load"); chmod(0644, "$conf{user_dir}/$opts{user}.load"); sync_local("$opts{user}.load") if ($conf{sync_host}); # unlock user info close $ulockfh; } # remove status directories older than expiration time my $more; while (-d "$conf{user_dir}/$more") { foreach my $dir (glob "$conf{user_dir}/$more$opts{user}.[0-9]*") { # untaint dir (should be user.id under base+more user directory) $dir = $1 if ($dir =~ /^(\Q$conf{user_dir}\E\/\Q$more\E\Q$opts{user}\E\.\d+)$/); my $id = $dir; $id =~ s/.*\.//; # do not remove directory associated with this manager invocation next if ($id == $opts{id}); my $mtime = (stat("$dir/meta"))[9]; if ($mtime + $conf{data_expire} < $time) { rmtree($dir); # synchronize deleted directory sync_local("$more$opts{user}.$id") if ($conf{sync_host}); } } $more .= "$opts{user}.more/"; } #################### #### build_find #### #################### # build tied db of processed directories from entries in tree log sub build_find { # remove old db unlink "$opts{base}/find"; my %find; tie(%find, 'DB_File', "$opts{base}/find", O_RDWR | O_CREAT, 0600); if (open(TREE, '<', "$opts{base}/tree")) { while () { s/\s*\r?\n$//; my %op = split(/[= ]+/, $_); my @args = 
split(/,/, $op{args}); # only initial finds are used during reconstruction next if ($args[0] ne 'find' || defined $op{try}); $find{unescape($args[1])} = 1; } close TREE; $find{t_find} = $meta{t_find}; } untie %find; #TODO: error handling if cannot tie or open tree } ##################### #### debug_print #### ##################### # print given text from get to stdout and mirror to file if debugging enabled sub debug_print { my $type = shift; if ($conf{debug} || $conf{"debug_$opts{user}"}) { # open user-specific debug file if not already open open($dbgfh, '>>', "$conf{user_dir}/$opts{user}.debug") if (!$dbgfh); print $dbgfh "$localtime $opts{host} $opts{id}$opts{cid} $type "; print $dbgfh $_ foreach (@_); } if ($type eq 'GET') { print $_ foreach (@_); } } ######################## #### default_select #### ######################## # return random host whose sshd is pingable from set of given hosts sub default_select { # choose original host if available my $host = shift; my @hosts = @_; my $np = Net::Ping->new('tcp', 1); $np->port_number(22); do { # pick random host $host = splice(@hosts, rand @hosts, 1); # check availability via tcp ping to ssh port return $host if ($np->ping($host)); } while ($host); return undef; } ###################### #### email_status #### ###################### # send invoking user email with current status sub email_status { # obtain status by parsing status() output my $table = status(); my @rows = split(/\n/, $table); my @cols = split(/\s*\|\s*/, $rows[3]); my $state0 = $cols[1]; my $state = $state0; # ignore warnings when --sync used $state =~ s/\+warn// if ($meta{sync}); $state =~ s/run\+//; # abort if running or have sent this message type before return if ($state eq 'run' || $meta{"email_$state0"}); # show original command so will be correct for user's installation my $ucmd = $meta{command}; # customized escape to allow ' ', ':', '=', and '\' $ucmd =~ s/([^A-Za-z0-9\- :=\\_.!~*'()\/])/sprintf("%%%02X", ord($1))/eg; # limit length of 
command line for performance/usability my $dindex = rindex($ucmd, " "); $ucmd = substr($ucmd, 0, rindex($ucmd, " ", 1024)) . "..." . substr($ucmd, $dindex) if ($dindex > 1024); my $cmd = $ucmd; $cmd =~ s/(^\S*(?:\s+|\/)shiftc?[^\s\/]*)(?:\s|$).*/$1/; # use simple html pre wrapper so will show correctly on html/text clients my $msg = "
\n";
    $msg .= "#" x length($ucmd);
    $msg .= "\n$ucmd\n";
    $msg .= "#" x length($ucmd);

    # status table is always shown
    $msg .= "\n\n$table";
    # record email type to prevent duplicate emails
    $meta{"email_$state0"} = $time;
    if ($state =~ s/throttle/throttled/) {
        $msg .= "\n\nThis transfer is being throttled based on user or admin-specified";
        $msg .= "\nresource limits to preserve the stability of the environment.";
        $msg .= "\nIt will continue at a rate reduced in proportion to the load it is";
        $msg .= "\ngenerating until system load decreases to configured thresholds.";
    }
    if ($state =~ s/warn/warning/) {
        my $stable = id_status('warn');
        if (($stable =~ tr/\n/\n/) == 23) {
            # subset of warnings
            $msg .= "\n\nThe first 10 warnings encountered are shown below.";
            $msg .= "\nTo show the complete set, run the following:\n\n";
            $msg .= "    $cmd --id=$opts{id} --status --state=warn";
        } else {
            # all warnings
            $msg .= "\n\nThe set of the warnings encountered is shown below.";
        }
        $msg .= "\n\nThese operations will be retried automatically and may";
        $msg .= "\nstill complete successfully.  To stop this transfer";
        $msg .= "\nwithout retrying these operations, run the following:\n\n";
        $msg .= "    $cmd --id=$opts{id} --stop\n";
        $msg .= "\n\n" . $stable . "\n\n";
    }
    if ($state =~ /error/) {
        my $stable = id_status('error');
        if (($stable =~ tr/\n/\n/) == 23) {
            # subset of errors
            $msg .= "\n\nThe first 10 errors encountered are shown below.";
            $msg .= "\nTo show the complete set, run the following:\n\n";
            $msg .= "    $cmd --id=$opts{id} --status --state=error";
        } else {
            # all errors
            $msg .= "\n\nThe set of the errors encountered is shown below.";
        }
        if ($state0 =~ /run/) {
            $msg .= "\n\nThis transfer will continue to run until all remaining";
            $msg .= "\noperations have been attempted.  To stop this transfer";
            $msg .= "\nwithout attempting the remainder, run the following:\n\n";
            $msg .= "    $cmd --id=$opts{id} --stop\n";
        } else {
            $msg .= "\n\nTo retry the failed/incomplete portions of this ";
            $msg .= "transfer,\nrun the following on $meta{origin} ";
            $msg .= "(or equivalent):\n\n";
            $msg .= "    $cmd --id=$opts{id} --restart\n";
        }
        $msg .= "\n\n" . $stable . "\n\n";
    }
    $msg .= "
\n"; # send message using server on localhost require Mail::Sendmail; Mail::Sendmail::sendmail( Smtp => 'localhost', From => "$opts{user}\@$conf{email_domain}", To => "$opts{user}\@$conf{email_domain}", Subject => "shift transfer $opts{id} $state", Message => $msg, 'Content-Type' => "text/html", ); } ################ #### escape #### ################ # return uri-escaped version of given string sub escape { my $text = shift; $text =~ s/([^A-Za-z0-9\-\._~\/])/sprintf("%%%02X", ord($1))/eg if (defined $text); return $text; } ######################## #### format_seconds #### ######################## # return human-readable time output for given number of seconds sub format_seconds { my $rem = shift; my $secs; foreach my $unit (sort {$seconds{$b} <=> $seconds{$a}} keys(%seconds)) { # keep dividing by largest unit my $div = int($rem / $seconds{$unit}); $rem %= $seconds{$unit}; # concatenate each result if ($opts{status} eq 'pad') { if ($unit eq 'd' || $unit eq 'w') { $secs .= sprintf("%0d$unit", $div); } else { $secs .= sprintf("%02d$unit", $div); } } else { $secs .= "$div$unit" if ($div); } } return $secs ? $secs : ($opts{status} eq 'pad' ? 
"00s" : "0s"); } ###################### #### format_bytes #### ###################### # return human-readable size output for given number of bytes sub format_bytes { my $nbytes = shift; my $empty_zero = shift; return "" if (!$nbytes && $empty_zero); foreach my $unit (sort {$bytes{$b} <=> $bytes{$a}} keys(%bytes)) { if (abs $nbytes >= $bytes{$unit}) { # use 3 significant digits in fixed/scientific notation with unit return sprintf("%.3g$unit\B", $nbytes / $bytes{$unit}); } } # use 1 significant digit for fractional values return sprintf("%.1f\B", $nbytes) if ($nbytes < 1); # use 3 significant digits in fixed/scientific notation without unit return sprintf("%.3g\B", $nbytes); } ############# #### get #### ############# # output a set of operations for the invoking client to process sub get { if ($meta{"host_$opts{host}"} == 1) { $meta{"host_$opts{host}"} = 2; # host retrieving operations so reduce outstanding hosts $meta{ohosts}--; } # retrieve global database from file eval { local $SIG{__WARN__} = sub {die}; %db = %{retrieve($conf{db_file})}; }; if ($@) { # database could not be opened %db = (); } # retrieve user database from file eval { local $SIG{__WARN__} = sub {die}; %udb = %{retrieve($conf{udb_file})}; }; if ($@) { # database could not be opened %udb = (); } if ($meta{"warn_$opts{host}$opts{cid}"} == 1) { # use exponential backoff my $sleep = 1 << $meta{"sleep_$opts{host}$opts{cid}"}; $sleep = 10 + int(rand($sleep)) * 60; # wait for more files or for transfer to be done debug_print('GET', "args=sleep,$sleep\n"); # keep doubling sleep time up to an hour $meta{"sleep_$opts{host}$opts{cid}"}++ if ($meta{"sleep_$opts{host}$opts{cid}"} < 6); return; } elsif ($meta{"warn_$opts{host}$opts{cid}"} == 0) { # progress has been made so reset sleep timer $meta{"sleep_$opts{host}$opts{cid}"} = 0; } # throttle if load beyond given resource limits my $sleep = throttle(); if ($sleep > 0) { debug_print('GET', "args=sleep,$sleep\n"); $meta{"throttle_$opts{host}$opts{cid}"} += 
$sleep; $meta{"throttled_$opts{host}$opts{cid}"} = 1; #TODO: do something with throttle info in stats return; } else { delete $meta{"throttled_$opts{host}$opts{cid}"}; } # send static options first foreach (qw(check create-tar cron dereference exclude extract-tar find-files ignore-times include index-tar newer no-stripe offline older ports preallocate preserve secure stripe sync verify verify-fast)) { debug_print('GET', "args=getopt,$_ text=", escape($meta{$_}), "\n") if (defined $meta{$_}); } foreach (qw(lustre_default_stripe)) { debug_print('GET', "args=getopt,$_ text=", escape($conf{$_}), "\n") if (defined $conf{$_}); } # determine logs to process my @logs = ("tree", $opts{doing}); my %sizes = ($opts{doing} => (stat $opts{doing_log})[7]); # add operations from hosts that have cron and have timed out foreach my $cron (grep(/^cron_/, keys %meta)) { my $host = substr($cron, 5); if ($meta{"last_$host"} + 1800 < $time) { # idle for 30 minutes if ($host ne $opts{host}) { push(@logs, "doing_$host"); # add logs for extra clients foreach my $i (2..$meta{clients}) { push(@logs, "doing_$host.$i"); } } } } # only process other logs during tar creation when tar_creat chattrs done push(@logs, qw(do rtree)) if (!$meta{'create-tar'} || $meta{last} && $meta{d_chattr} >= $meta{tar_creat}); open(DOING, '>>', $opts{doing_log}); open(ERROR, '>>', "$opts{base}/error"); #TODO: need error if cannot be opened my ($size, $files, $ops, $all); my (%diskfs, %localfs, %rtthost); LOG: foreach my $log (@logs) { # process dir attrs last by themselves last if ($log eq 'rtree' && (!$meta{last} || $meta{t_run} || $ops || (!$meta{check} && !$meta{preserve}))); my $fh; if ($log eq 'rtree') { $fh = IO::File->new("$opts{base}/tree"); } elsif ($log =~ /^doing_/) { # need read/write access to doing log $fh = IO::File->new("$opts{base}/$log", 'r+'); } else { $fh = IO::File->new("$opts{base}/$log"); } $fh->seek($meta{$log}, 0); next if (!defined $fh); #TODO: need error if cannot be opened or seeked my 
$line; while ($size < $meta{size} && $all < $meta{files} && ($log ne 'rtree' && defined($line = $fh->getline) || $log eq 'rtree' && defined($line = last_line($fh)))) { $line =~ s/\s*\r?\n$//; # first line of rtree will be blank next if (!$line); # prevent loop of current host doing log next LOG if (defined $sizes{$log} && $fh->tell > $sizes{$log}); if ($log =~ /^doing_/) { if ($line !~ /^ /) { # record position and skip processing if already done $meta{$log} = $fh->tell; next; } $line = substr($line, 16); } my %op = split(/[= ]+/, $line); my @args = split(/,/, $op{args}); my $cmd = shift @args; my $save_arg = $args[-1]; # only mkdirs are used for dir chattrs next if ($cmd ne 'mkdir' && $log eq 'rtree'); if ($log =~ /^doing_/) { # this operation was originally not completed so record failure $meta{s_run} -= $op{size}; $meta{t_run}--; $op{text} = escape("Host or process failure"); if ($op{try} >= $meta{retry}) { # record as error and abort my $tell = $meta{$log}; $meta{$log} = $fh->tell; $fh->seek($tell, 0); # mark operation as done in case dead host reports in $fh->print($opts{meta_pack}); $fh->seek($meta{$log}, 0); $line =~ s/(^|\s)text=\S+//; print ERROR $line, " text=$op{text}\n"; $meta{s_error} += $op{size}; $meta{"e_$cmd"}++; $meta{"e_$op{tool}"}++; $meta{time1} = $time if (($meta{last} || $meta{e_find}) && !run()); next; } else { # record as warning and retry $op{try}++; $meta{w_run}++; $op{state} = "warn"; } } for (my $i = 0; $i < scalar(@args); $i++) { # skip arg 1 of ln/ln chattr since it's a name and not a file next if (!$i && ($cmd eq 'ln' || $cmd eq 'chattr' && $op{ln})); my $ref = {}; # write access needed for last arg of chattr/cp/find/ln/mkdir my $ref->{rw} = $cmd =~ /^(?:chattr|cp|find|ln|mkdir)/ && $i == scalar(@args) - 1 ? 
1 : 0; if ($args[$i] !~ /^\//) { # do not map remote tar dst to prevent size/split corruption next if ($i == 1 && $cmd eq 'find' && $meta{'create-tar'}); $args[$i] = map_remote($opts{host}, $args[$i], $ref); last LOG if (!defined $args[$i]); my $host = $args[$i] =~ /^([^\/:]+)%3A/ ? $1 : "localhost"; if ($host ne 'localhost' && !defined $rtthost{$host}) { # determine if already have round-trip time for domain my $dn = $host; $dn =~ s/^[^.]+.//; # only include hosts in new domains $rtthost{$host} = $meta{"rtt_$dn"} ? -1 : 1; } if ($cmd eq 'cp' && $i == 1 && $ref->{local}) { # store target file systems for disk throttling $diskfs{"$host:$ref->{local}"} = "$ref->{servers}:$ref->{remote}"; } } else { my $new = map_local($op{host}, $args[$i], $opts{host}, $ref); if (defined $new) { $args[$i] = $new; next if (!$ref->{local}); $localfs{$ref->{local}} |= $ref->{rw}; if ($cmd eq 'cp' && $i == 1) { # store target file systems for disk throttling $diskfs{"localhost:$ref->{local}"} = "$ref->{servers}:$ref->{remote}"; } } elsif ($meta{"host_$opts{host}"}) { # host does not have access to appropriate # file system and host name hasn't changed last LOG; } } # store file system type/options my $loc = $i == 0 && scalar(@args) > 1 ? 
"src" : "dst"; $op{"${loc}fs"} = "$ref->{type},$ref->{opts}" if ($ref->{type}); # store amount of data read/written from/to host/file systems if ($cmd eq 'cp' || $cmd eq 'sum' && $i == 0 || $cmd eq 'cksum' && $i == 1) { if ($cmd eq 'cp' && $i == 1) { $nload{"iow_fs_$ref->{servers}:$ref->{remote}"} += $op{size}; $nload{"iow_host_$ref->{host}"} += $op{size}; if ($args[0] !~ /^\// || $args[1] !~ /^\//) { # remote transfer so record network load $nload{"netr_host_$ref->{host}"} += $op{size}; } } else { $nload{"ior_fs_$ref->{servers}:$ref->{remote}"} += $op{size}; $nload{"ior_host_$ref->{host}"} += $op{size}; if ($cmd eq 'cp' && ($args[0] !~ /^\// || $args[1] !~ /^\//)) { # remote transfer so record network load $nload{"netw_host_$ref->{host}"} += $op{size}; } } } } # never count mkdir or chattr on dir against the file total $files++ if (scalar(@args) > 1); $size += $op{size} if ($cmd =~ /^(?:cp|sum|cksum)/); $ops++ if ($log ne 'rtree'); $all++; $meta{s_run} += $op{size} if ($op{size}); $meta{t_run}++; $meta{w_run}-- if ($op{state} eq 'warn'); # record time of first instance of each operation type if (!defined $meta{"t0_$cmd"} && # operations should be timed from end of previous stage ($cmd ne 'sum' || $meta{d_cp} == $meta{t_cp}) && ($cmd ne 'cksum' || $meta{d_sum} == $meta{t_sum}) && ($cmd ne 'chattr' || $meta{d_cksum} == $meta{t_cksum})) { $meta{"t0_$cmd"} = $time; # record size already done $meta{"s0_$cmd"} = $meta{($cmd eq 'chattr' ? "d_" : "s_") . 
$cmd}; if ($cmd eq 'cp' && $meta{'verify-fast'}) { # sum operations begin with copies when --verify-fast enabled $meta{t0_sum} = $time; $meta{s0_sum} = 0; } } my $tell = $meta{$log}; $meta{$log} = $fh->tell; if ($log =~ /^doing_/) { $fh->seek($tell, 0); # mark operation as done in case dead host reports in $fh->print($opts{meta_pack}); $fh->seek($meta{$log}, 0); } # rtree is for preserving directory attributes $cmd = "chattr" if ($log eq 'rtree'); # rejoin mapped arguments $op{args} = join(",", $cmd, @args); $op{host} = $opts{host}; $op{run} = $time; $op{doing} = tell DOING; delete $op{state}; delete $op{tool}; # dynamically insert tar_last for last record of last tar split if ($meta{'create-tar'} && $meta{"tar_last_$save_arg"} > 0) { my ($t1, $t2) = split(/-/, $op{tar_bytes}); if ($meta{"tar_last_$save_arg"} < $t2 + 512) { $op{tar_last} = 1; # only insert very first time (will be propagated) delete $meta{"tar_last_$save_arg"}; } } my $get = join(" ", map {"$_=$op{$_}"} sort(keys %op)) . "\n"; print DOING " " x 16, $get; debug_print('GET', $get); } $fh->close; } my $errs = sum(map {$meta{"e_$_"}} qw(chattr cksum cp find ln mkdir sum)); if (!$errs && $meta{last} && !$all && !$meta{t_run} && $meta{tar_mv}) { foreach my $file (grep(/^tar_split_/, keys %meta)) { next if ($meta{$file} != 1); $file =~ s/^tar_split_//; # dynamically insert tar_mv as final op during single split my $get = "args=chattr,$file-1.tar host=$opts{host} size=0"; $get .= " tar_mv=" . ($meta{"tar_nosum_$file"} ? 1 : 2); $get .= " run=$time doing=" . (tell DOING) . 
"\n"; print DOING " " x 16, $get; debug_print('GET', $get); $all++; $meta{t_run}++; } } elsif (!$all) { if ($meta{last} && !$meta{t_run}) { # no retries and none running so stop debug_print('GET', "args=stop\n"); $meta{"sleep_$opts{host}$opts{cid}"} = 0; } else { # use exponential backoff my $sleep = 1 << $meta{"sleep_$opts{host}$opts{cid}"}; $sleep = 10 + int(rand($sleep)) * 60; # wait for more files or for transfer to be done debug_print('GET', "args=sleep,$sleep\n"); # keep doubling sleep time up to an hour $meta{"sleep_$opts{host}$opts{cid}"}++ if ($meta{"sleep_$opts{host}$opts{cid}"} < 6); } } close DOING; close ERROR; return if (!$all); # send potentially dynamic options last foreach (keys %diskfs) { # send target file systems for disk throttling debug_print('GET', "args=getopt,disk_$_ text=$diskfs{$_}\n"); } # these could potentially be dynamic in the future foreach (qw(buffer threads)) { debug_print('GET', "args=getopt,$_ text=", $meta{$_}, "\n") if ($meta{$_}); } # send individual transport options foreach (qw(bbcp bbftp gridftp mcp msum), $meta{secure} ? "ssh_secure" : "ssh") { my $val = $conf{"opts_$_"}; next if (!$val); debug_print('GET', "args=getopt,opts_$_ text=", escape($val), "\n"); } # attempt to determine type of transfer (i.e. 
local/lan/wan) my ($net_dn, $net_rtt, $net_type) = ($opts{host}, 0, "wan"); $net_dn =~ s/^[^.]+\.//; foreach my $host (keys %rtthost) { # transfer is on lan if domain of invoking host matches target domain $net_type = "lan" if ($net_dn && $host =~ /\Q$net_dn\E$/); # find latency for associated domain my $dn = $host; $dn =~ s/^[^.]+\.//; $net_rtt = $meta{"rtt_$dn"} if ($meta{"rtt_$dn"}); # negative values are for rtt/type calculations only next if ($rtthost{$host} == -1); # send remote hosts for latency measurements debug_print('GET', "args=getopt,rtt_$host\n"); } $net_rtt = parse_bytes($conf{"latency_$net_type"}) if (!$net_rtt); my $net_bw = $meta{bandwidth}; if (!$net_bw) { # set default bandwidth based on xge availability or host domain my $type = $meta{"xge_$opts{host}"} ? "xge" : ($net_dn =~ /\.(?:$conf{org_domains})$/ ? "org" : "ind"); $net_bw = parse_bytes($conf{"bandwidth_$type"}); } my $net_win = $meta{window}; if (!$meta{window}) { # set default window to BDP $net_win = int($net_bw * $net_rtt / 8); # make sure default window is less than max window $net_win = min($net_win, $meta{"tcpwin_$opts{host}"}) if ($meta{"tcpwin_$opts{host}"}); # make sure default window is greater than configured minimum $net_win = max($net_win, parse_bytes($conf{"min_window_$net_type"})); } debug_print('GET', "args=getopt,window text=$net_win\n"); my $net_ns = $meta{streams}; if (!$meta{streams}) { # set default streams to number of max windows needed to consume bw $net_ns = int($net_bw * $net_rtt / 8 / $meta{"tcpwin_$opts{host}"}) if ($meta{"tcpwin_$opts{host}"} && $net_win >= $meta{"tcpwin_$opts{host}"}); $net_ns = int($net_bw * $net_rtt / 8 / $net_win) if ($net_win < $meta{"tcpwin_$opts{host}"}); # make sure default streams is less than configured maximum $net_ns = min($net_ns, $conf{"max_streams_$net_type"}); # make sure default streams is greater than configured minimum $net_ns = max($net_ns, parse_bytes($conf{"min_streams_$net_type"})); } debug_print('GET', 
"args=getopt,streams text=$net_ns\n"); # send local/remote transport selections based on average file size debug_print('GET', "args=getopt,local text=", # use given transport if specified $meta{local} ? $meta{local} : # optimize for small files if avg file size less than defined size ($size / ($files + 1) < parse_bytes($conf{small_size_local}) ? $conf{local_small} : $conf{default_local}), "\n"); debug_print('GET', "args=getopt,remote text=", # use given transport if specified $meta{remote} ? $meta{remote} : # optimize for small files if avg file size less than defined size ($size / ($files + 1) < parse_bytes($conf{"small_size_$net_type"}) ? $conf{remote_small} : $conf{default_remote}), "\n"); if (grep(/^host_/, keys %meta) < $meta{hosts} || $meta{"clients_$opts{host}"} < $meta{clients}) { # run client on other/same hosts if there are enough files my $qfiles = $meta{t_split} - $meta{t_run}; $qfiles += $meta{"t_$_"} - $meta{"d_$_"} - $meta{"e_$_"} foreach (qw(chattr cksum cp find ln mkdir sum)); my $qsize = $meta{s_total} - $meta{s_run} - $meta{s_error}; $qsize += 2 * $meta{s_total} if ($meta{verify}); $qsize -= $meta{"s_$_"} foreach (qw(cksum cp sum)); my $nclients; if ($qsize * $meta{files} > $qfiles * $meta{size}) { # queue avg size per file greater than limit avg size per file # estimate nclients based on queue sizes $nclients = 1.0 * $qsize / $meta{size}; # don't use more hosts than number of files $nclients = $qfiles if ($nclients > $qfiles); } else { # queue avg size per file less than limit avg size per file # estimate nclients based on queue files $nclients = 1.0 * $qfiles / $meta{files}; } # reduce by outstanding hosts $nclients -= $meta{ohosts}; if ($nclients > 0 && scalar(keys %localfs) > 0) { my %hosts; if ($meta{'host-list'}) { # use given host list $hosts{$_} = {} foreach (split(/,/, $meta{'host-list'})); } else { # find accessible hosts based on global/user db $hosts{substr($_, 6)} = {} foreach (grep(/^shell_/, keys %meta)); $hosts{substr($_, 6)} = {} 
foreach (grep(/^shell_/, keys %db));
                $hosts{substr($_, 6)} = {}
                    foreach (grep(/^shell_/, keys %udb));
            }

            # determine potential hosts
            foreach my $fs (keys %localfs) {
                foreach my $host (keys %hosts) {
                    if ($meta{"host_$host"} || $meta{"nohost_$host"} ||
                            !$meta{'host-list'} && !map_local($opts{host},
                            $fs, $host, {rw => $localfs{$fs}})) {
                        # remove hosts without local file system access
                        delete $hosts{$host};
                    }
                }
            }

            while ($nclients > 0 && scalar(keys %hosts) > 0 &&
                    grep(/^host_/, keys %meta) < $meta{hosts}) {
                my $host;
                if (defined $conf{select_hook}) {
                    # select host using configured selection hook
                    my ($fh, $file) = tempfile(UNLINK => 1);
                    close $fh;
                    nstore(\%hosts, $file);
                    # invoke configured selection hook
                    #TODO: remove extra shell spawn
                    $host = open3_get([-1, undef, -1],
                        "$conf{select_hook} $opts{host} $opts{host} $file");
                }
                # select host using random selection policy
                $host = (keys %hosts)[rand(keys %hosts)] if (!$host);
                $host =~ s/\s*\r?\n$//;
                delete $hosts{$host};
                debug_print('GET', "args=host,$host\n");
                $meta{"host_$host"} = 1;
                $meta{"clients_$host"} = 1;
                $meta{ohosts}++;
                $nclients--;
            }
        }

        # spawn extra clients on invoking host if enough work remains
        while ($nclients > 0 &&
                $meta{"clients_$opts{host}"} < $meta{clients}) {
            $nclients--;
            debug_print('GET', "args=client,$opts{id}.",
                ++$meta{"clients_$opts{host}"}, "\n");
        }
    }
}

##################
#### get_meta ####
##################
# return (and possibly revert to) last validated metadata from given meta file
#
# Arguments:
#   $mfile - meta file to read; when omitted, "$opts{base}/meta" is used and
#            recovery of a corrupted tail is enabled (see below)
#   $past  - optional; select the Nth most recent valid line (1 = latest)
#
# A valid line is "[" . base64(compress(nfreeze(...))) . "]".  Lines are
# scanned backwards from the end of the file with last_line().
#
# Returns a hash ref of the thawed metadata, or undef when the file cannot
# be opened or holds no valid line.
sub get_meta {
    my $mfile = shift;
    my $past = shift;
    my $mtell;
    if (!defined $mfile) {
        $mfile = "$opts{base}/meta";
        # defined $mtell enables recovery; it tracks the start of the
        # earliest invalid line seen after the last valid one
        $mtell = 0;
    }

    my $meta;
    my $fh;
    # an unopened handle makes tell() return -1, which last_line() cannot
    # handle, so report "no metadata" instead of scanning
    open($fh, '<', $mfile) or return undef;
    # start just before the final byte (expected to be a trailing newline)
    seek($fh, -1, 2);
    while (1) {
        # find line starting with '[' and ending with ']', indicating valid line
        my $line = last_line($fh);
        last if (!defined $line);
        $meta = substr($line, 1, -1);
        last if (substr($line, 0, 1) eq '[' && substr($line, -1, 1) eq ']' &&
            (!defined $past || !--$past));
        $meta = undef;
        $mtell = $fh->tell + 1 if (defined $mtell);
    }
    close $fh;

    if ($meta) {
        # meta lines are serialized, compressed, and base64 encoded
        my $zmeta64 = decode_base64($meta);
        my $zmeta = uncompress($zmeta64);
        $meta = thaw($zmeta);
    }

    if ($meta && defined $mtell && $mtell > 0) {
        # metadata corrupted so revert to last known good state
        foreach my $file (glob "$opts{base}/*") {
            next if ($file =~ /\/(?:find|lock|meta)$/);
            my $log = $file;
            $log =~ s/.*\///;
            my $size = defined $meta->{"$log\_size"} ? $meta->{"$log\_size"} : 0;
            # untaint file
            $file = $1 if ($file =~ /^(.*)$/);
            # truncate all logs to last known good size
            truncate($file, $size);
            if ($log =~ /^doing_/ && defined $meta->{"$log\_size"}) {
                # undo all operations that were processed after last good state
                open(FILE, '+<', $file);
                my $tell0 = $meta->{"$log\_size"};
                seek(FILE, $tell0, 0);
                while (<FILE>) {
                    my $tell = tell FILE;
                    # first 16 chars hold the hex-packed meta size recorded
                    # when the operation was marked
                    my $msize = substr($_, 0, 16);
                    $msize = unpack("Q", pack("H*", $msize));
                    if ($msize >= $meta->{meta_size}) {
                        # blank the marker so the operation is not done
                        seek(FILE, $tell0, 0);
                        print FILE " " x 16;
                        seek(FILE, $tell, 0);
                    }
                    $tell0 = $tell;
                }
                close FILE;
            }
        }
        # rebuild find db since it may contain reverted operations
        # NOTE(review): this tests the global %meta, not the $meta being
        # returned; confirm that is intended
        build_find() if ($meta{dereference} && !$meta{'extract-tar'});
        # truncate last in case any other operations interrupted
        truncate($mfile, $mtell);
    }
    #TODO: handle errors;
    return $meta;
}

#################
#### history ####
#################
# output table of hosts and commands for invoking user
sub history {
    require Text::FormatTable;
    # configure table headers
    my $t = Text::FormatTable->new('r | l | l');
    $t->head(qw(id origin command));
    $t->rule;

    # sort by modification time of meta file
    my @metas;
    my $dir = $conf{user_dir};
    while (-d $dir) {
        push(@metas, glob "$dir/$opts{user}.*/meta");
        $dir .= "/$opts{user}.more";
    }
    foreach my $file (sort {(stat $a)[9] <=> (stat $b)[9]} @metas) {
        my $id = $file;
        $id =~ s/.*\.|\/meta//g;
        next if (defined $opts{id} && $opts{id} != $id);
        # retrieve metadata from file
        %meta = %{get_meta($file)};
        # ignore rows that do not match optional search
        next if
($opts{search} && join(" ", $meta{origin}, $meta{command}) !~
                qr/$opts{search}/);
        # add row for each transfer
        my $cmd = $meta{command};
        # limit length of command line for performance/usability
        my $dindex = rindex($cmd, " ");
        $cmd = substr($cmd, 0, rindex($cmd, " ", 1024)) . "..." .
            substr($cmd, $dindex) if ($dindex > 1024);
        $t->row($id, "$meta{origin}\n[$meta{cwd}]", $cmd);
    }

    # output final table
    print $t->render;
}

###################
#### id_status ####
###################
# output detailed table of all relevant operations in current transfer or
# return subset of table in given state
#
# Without an argument, operations are filtered by the optional $opts{state}
# (queue, warn, run, error or done) and $opts{search} regex and printed,
# either as CSV ($opts{status} eq 'csv') or as text tables rendered in
# chunks of 10000 rows.  With a $state argument (used by email_status()),
# a section stops collecting once 10 rows have been added and the rendered
# table is returned instead of printed (text mode only).
#
# Sources: queued/warn ops from the do log (from offset $meta{do}), running
# ops from each doing_* log, failed ops from the error log, completed ops
# from the done log.
sub id_status {
    my $state = shift;
    my $nrows = 10000;
    my $once = 0;
    if (defined $state) {
        # this is used in email_status() to send a subset of errors/warnings
        $nrows = 10;
        $once = 1;
    } else {
        # otherwise honor the user's optional state filter
        $state = $opts{state};
    }

    require Text::FormatTable;
    my $t0 = Text::FormatTable->new('l | l | l | r | r | r | r');
    # target is the same for all files during tar creation so use source
    my @row = (qw(state op), $meta{'create-tar'} ? "source" : "target",
        qw(size date length rate));
    my @row2 = ("", "tool", "info", "", "time", "", "");
    if ($opts{status} eq 'csv') {
        print join(",", @row, @row2), "\n";
    } else {
        $t0->head(@row);
        $t0->head(@row2);
        $t0->rule;
    }
    my $rows = 0;
    # $t0 is an empty table with headers that is cloned for each chunk
    my $t = dclone($t0);

    if (!$state || $state =~ /^(?:queue|warn)$/) {
        # queued/warn operations are found in the do log
        open(FILE, '<', "$opts{base}/do");
        seek(FILE, $meta{do}, 0);
        while (<FILE>) {
            chomp;
            # unescape colons in remote paths
            s/%3A/:/g;
            my %op = split(/[= ]+/);
            my @args = split(/,/, $op{args});
            # ignore rows that do not match optional search
            next if ($opts{search} && join(" ", @args) !~ qr/$opts{search}/);
            # target is the same for all files during tar creation so use source
            $args[-1] = $op{tar_name} if ($meta{'create-tar'});
            my $size = "-";
            $size = format_bytes($op{size}) if ($args[0] =~ /^(?:cksum|cp|sum)/);
            if ($op{state} eq 'warn' && (!$state || $state eq 'warn')) {
                # add first row for each operation with bulk of info
                @row = ("warn", $args[0], $args[-1], $size, "-", "-", "-");
                # add second row for each operation with tool and message
                @row2 = ("", $op{tool}, unescape($op{text}), "", "", "", "");
            } elsif ($op{state} ne 'warn' && (!$state || $state eq 'queue')) {
                @row = ("queue", $args[0], $args[-1], $size, "-", "-", "-");
                @row2 = ("", "", "", "", "", "", "");
            } else {
                next;
            }
            if ($opts{status} eq 'csv') {
                # quote the free-form info column for CSV
                $row2[2] =~ s/"/""/g;
                $row2[2] = "\"$row2[2]\"" if ($row2[2] =~ /[,"\n]/);
                print join(",", @row, @row2), "\n";
            } else {
                $t->row(@row);
                $t->row(@row2) if ($row[0] ne 'queue');
                if (++$rows >= $nrows) {
                    last if ($once);
                    # render in multiple parts when large number of rows
                    print $t->render, "\n";
                    $t = dclone($t0);
                    $rows = 0;
                }
            }
        }
        close FILE;
    }

    if (!$state || $state eq 'run') {
        # running operations are found in the doing logs for each host
        foreach my $file (glob "$opts{base}/doing_*") {
            my $doing = basename($file);
            my $host = $doing;
            $host =~ s/^doing_//;
            open(FILE, '<', $file);
            seek(FILE, $meta{$doing}, 0);
            while (<FILE>) {
                # only lines whose 16-char marker is still blank are running
                next if (!/^ /);
                $_ = substr($_, 16);
                chomp;
                # unescape colons in remote paths
                s/%3A/:/g;
                my %op = split(/[= ]+/);
                my @args = split(/,/, $op{args});
                # ignore rows that do not match optional search
                next if ($opts{search} && join(" ", @args) !~ qr/$opts{search}/);
                # target is the same for all files during tar creation so use source
                $args[-1] = $op{tar_name} if ($meta{'create-tar'});
                my $size = "-";
                $size = format_bytes($op{size}) if ($args[0] =~ /^(?:cksum|cp|sum)/);
                # add first row for each operation with bulk of info
                @row = ("run", $args[0], $args[-1], $size,
                    strftime('%m/%d', localtime($op{run})),
                    format_seconds($time - $op{run}), "-");
                # add second row for each operation with tool and message
                @row2 = ("", $op{tool},
                    "\@$host" . ($op{bytes} ? " [$op{bytes})" : ""), "",
                    strftime('%R', localtime($op{run})), "", "");
                if ($opts{status} eq 'csv') {
                    $row2[2] =~ s/"/""/g;
                    $row2[2] = "\"$row2[2]\"" if ($row2[2] =~ /[,"\n]/);
                    print join(",", @row, @row2), "\n";
                } else {
                    $t->row(@row);
                    $t->row(@row2);
                    if (++$rows >= $nrows) {
                        last if ($once);
                        # render in multiple parts when large number of rows
                        print $t->render, "\n";
                        $t = dclone($t0);
                        $rows = 0;
                    }
                }
            }
            close FILE;
        }
    }

    if (!$state || $state eq 'error') {
        # error operations are found in the error log
        open(FILE, '<', "$opts{base}/error");
        while (<FILE>) {
            chomp;
            # unescape colons in remote paths
            s/%3A/:/g;
            my %op = split(/[= ]+/);
            my @args = split(/,/, $op{args});
            # ignore rows that do not match optional search
            next if ($opts{search} && join(" ", @args) !~ qr/$opts{search}/);
            # target is the same for all files during tar creation so use source
            $args[-1] = $op{tar_name} if ($meta{'create-tar'});
            my $size = "-";
            $size = format_bytes($op{size}) if ($args[0] =~ /^(?:cksum|cp|sum)/);
            # add first row for each operation with bulk of info
            @row = ("error", $args[0], $args[-1], $size, "-", "-", "-");
            # add second row for each operation with tool and message
            @row2 = ("", $op{tool}, unescape($op{text}), "", "", "", "");
            if ($opts{status} eq 'csv') {
                $row2[2] =~ s/"/""/g;
                $row2[2] = "\"$row2[2]\"" if ($row2[2] =~ /[,"\n]/);
                print join(",", @row, @row2), "\n";
            } else {
                $t->row(@row);
                $t->row(@row2);
                if (++$rows >= $nrows) {
                    last if ($once);
                    # render in multiple parts when large number of rows
                    print $t->render, "\n";
                    $t = dclone($t0);
                    $rows = 0;
                }
            }
        }
        close FILE;
    }

    if (!$state || $state eq 'done') {
        # done operations are found in the done log
        open(FILE, '<', "$opts{base}/done");
        while (<FILE>) {
            chomp;
            # unescape colons in remote paths
            s/%3A/:/g;
            my %op = split(/[= ]+/);
            my @args = split(/,/, $op{args});
            # ignore rows that do not match optional search
            next if ($opts{search} && join(" ", @args) !~ qr/$opts{search}/);
            # target is the same for all files during tar creation so use source
            $args[-1] = $op{tar_name} if ($meta{'create-tar'});
            # avoid showing a zero duration
            my $secs = $op{time} > 0 ? $op{time} : 1;
            my $size = "-";
            my $rate = "-";
            if ($args[0] =~ /^(?:cksum|cp|sum)/) {
                $size = format_bytes($op{size});
                $rate = format_bytes($op{rate}) . "/s";
            }
            my $info;
            $info = "#$op{hash}" if ($op{hash} && $args[0] =~ /^(?:cp|sum)/);
            $info .= " " if ($info && $op{bytes});
            $info .= "[$op{bytes})" if ($op{bytes});
            $info = "-" if (!$info);
            # add first row for each operation with bulk of info
            @row = ("done", $args[0], $args[-1], $size,
                strftime('%m/%d', localtime($op{run})), format_seconds($secs),
                $rate);
            # add second row for each operation with tool and message
            @row2 = ("", $op{tool}, $info, "",
                strftime('%R', localtime($op{run})), "", "");
            if ($opts{status} eq 'csv') {
                $row2[2] =~ s/"/""/g;
                $row2[2] = "\"$row2[2]\"" if ($row2[2] =~ /[,"\n]/);
                print join(",", @row, @row2), "\n";
            } else {
                $t->row(@row);
                $t->row(@row2);
                if (++$rows >= $nrows) {
                    last if ($once);
                    # render in multiple parts when large number of rows
                    print $t->render, "\n";
                    $t = dclone($t0);
                    $rows = 0;
                }
            }
        }
        close FILE;
    }

    if ($opts{status} ne 'csv') {
        # return/output final table depending on initial given state
        $once ? return $t->render : print $t->render;
    }
}

#################
#### init_id ####
#################
# initialize settings for transfer based on getopt lines and/or defaults
#
# Fills unset %meta options from the matching "default_*" entries in %conf
# and converts size strings (e.g. "4m", "1g") to byte counts via
# parse_bytes().  buffer/split/stripe are parsed as binary units and, when
# given as plain numbers, rounded down to a power of 2.
sub init_id {
    # initialize options with default values
    foreach (qw(clients cpu hosts io ior iow net netr netw ports retry threads)) {
        $meta{$_} = $conf{"default_$_"}
            if (!defined $meta{$_} && $conf{"default_$_"});
    }
    # change files unit from billion to gig (tr/// takes a literal list,
    # so only the letters b and B are mapped)
    $meta{files} =~ tr/bB/gg/ if (defined $meta{files});
    # convert size strings to numbers
    foreach my $key (qw(bandwidth buffer files find-files size split split-tar
            stripe window)) {
        # stripe can be zero
        next if ($key eq 'stripe' && defined $meta{$key} && $meta{$key} == 0);
        # parse some values in binary bytes instead of decimal bytes
        my $bin = $key =~ /^(?:buffer|split|stripe)$/ ? 1 : 0;
        my $new = defined $meta{$key} ? parse_bytes($meta{$key}, $bin) : undef;
        if (!defined $new && defined $conf{"default_$key"}) {
            $new = parse_bytes($conf{"default_$key"}, $bin);
            # indicate that striping was not specified
            $meta{'no-stripe'} = 1 if ($key eq 'stripe');
        }
        if ($key =~ /^(?:buffer|split|stripe)$/) {
            # adjust binary values to power of 2 if string not used
            if ($new && $meta{$key} !~ /\D/) {
                my $tmp = 1;
                $tmp <<= 1 while ($new >>= 1);
                $new = $tmp;
            }
        }
        if ($key =~ /^(?:files|find-files|size)$/) {
            # do not allow zero values
            $new = 1 if (!$new);
        }
        if ($key =~ /^(?:split|split-tar)$/) {
            # do not allow values that would cause metadata overrun
            my $min = parse_bytes($conf{"min_split"}, $bin);
            $new = $min if ($new && $new < $min);
        }
        $meta{$key} = $new if (defined $new);
    }
}

###################
#### last_line ####
###################
# return the line before the current position of a given file handle
#
# Reads backwards in 1024-byte chunks until a newline is found, leaves the
# handle positioned at that newline (so repeated calls walk the file
# backwards) and returns the line text without its leading newline.
# Returns undef at the start of the file or when tell() fails (-1).
sub last_line {
    my $fh = shift;
    my $tell0 = $fh->tell;
    # return nothing when file is at beginning (or the handle is unusable)
    return undef if ($tell0 <= 0);
    my $tell = $tell0;
    my ($buf, $line, $len, $pos);
    do {
        $tell = $tell0 - 1024;
        $tell = 0 if ($tell < 0);
        # seek to earlier position in file
$fh->seek($tell, 0);
        # reuse the sub-level $len/$buf instead of shadowing them
        $len = 1024;
        $len = $tell0 - $tell if ($len > $tell0);
        # read up to initial location or that of last round
        $fh->read($line, $len);
        $buf = $line . $buf;
        # find last newline in buffer
        $pos = rindex($buf, "\n");
        $tell0 = $tell;
        # keep looping while no newline found
    } while ($tell > 0 && $pos < 0);
    $pos = 0 if ($pos < 0);
    # set file handle position for next invocation
    $fh->seek($tell + $pos, 0);
    # return buffer after newline
    $buf = substr($buf, $pos);
    $buf =~ s/\r?\n//;
    return $buf;
}

#################
#### lock_id ####
#################
# lock the current transfer or wait for an unlock
#
# Opens "$opts{base}/lock" on the file-level $ilockfh handle and blocks on
# an exclusive flock.  Returns 0 if the lock file cannot be opened,
# otherwise the result of flock().
sub lock_id {
    open($ilockfh, '>', "$opts{base}/lock") || return 0;
    flock($ilockfh, LOCK_EX);
}

##################
#### fs_mount ####
##################
# return the mount point on the given host holding the given path
#
# Mount entries are "mount_HOST:..." keys in %meta, %udb or %db whose values
# are space-separated key=value pairs (local, servers, remote, ...).  Returns
# a hash ref of the longest entry whose local mount point contains $path, or
# undef when none matches (i.e. a local file system).  Results are cached
# per ancestor directory in %fs_cache.
my %fs_cache;
sub fs_mount {
    my ($host, $path) = @_;
    # check cache first to see if mount already computed
    my $pos = length($path);
    while (($pos = rindex($path, "/", $pos)) > 0) {
        my $mnt = $fs_cache{"$host:" . substr($path, 0, $pos-- || 1)};
        return $mnt if ($mnt);
    }

    # use mount info provided by global/user db
    my @mnts = grep(/^mount_\Q$host\E:/, keys %meta);
    push(@mnts, grep(/^mount_\Q$host\E:/, keys %db));
    push(@mnts, grep(/^mount_\Q$host\E:/, keys %udb));
    my %mnt;
    # sort in descending length order to find greatest prefix
    foreach (sort {length($b) <=> length($a)} @mnts) {
        my $mnt = $meta{$_} ? $meta{$_} : ($udb{$_} ? $udb{$_} : $db{$_});
        %mnt = split(/[= ]+/, $mnt);
        # compare on whole path components so that a mount at /home does not
        # claim /home2/...; trailing slashes are dropped so "/" matches all
        (my $prefix = $mnt{local}) =~ s/\/+$//;
        if ($path =~ /^\Q$prefix\E(?:\/|$)/) {
            # path begins with mount point so stop looking
            last;
        } else {
            %mnt = ();
        }
    }
    if ($mnt{servers}) {
        $pos = length($path);
        while (($pos = rindex($path, "/", $pos)) > 0) {
            # save in cache to speed up future requests
            $fs_cache{"$host:" .
substr($path, 0, $pos-- || 1)} = \%mnt;
            last if ($pos < length($mnt{local}));
        }
        return \%mnt;
    }
    # return undef if local file system
    return undef;
}

######################
#### map_fs_mount ####
######################
# return the mount point on the given host that corresponds to the
# given mount point on another
#
# $mnt1 is a mount hash ref as returned by fs_mount().  A mount on $host2
# matches when it has the same remote path and type and shares at least one
# server.  Returns the matching mount hash ref, or -1 when there is none;
# both outcomes are cached in %map_fs_cache.
my %map_fs_cache;
sub map_fs_mount {
    my ($mnt1, $host2) = @_;
    # check cache first to see if mapping already computed
    my $mnt2 = $map_fs_cache{"$host2:$mnt1"};
    return $mnt2 if ($mnt2);

    my @srv1 = split(/,/, $mnt1->{servers});
    # use mount info provided by global/user db
    my @mnts2 = grep(/^mount_\Q$host2\E:/, keys %meta);
    push(@mnts2, grep(/^mount_\Q$host2\E:/, keys %db));
    push(@mnts2, grep(/^mount_\Q$host2\E:/, keys %udb));
    # sort in descending length order to find greatest prefix
    foreach (sort {length($b) <=> length($a)} @mnts2) {
        $mnt2 = $meta{$_} ? $meta{$_} : ($udb{$_} ? $udb{$_} : $db{$_});
        my %mnt2 = split(/[= ]+/, $mnt2);
        # must have same remote path and type on server
        if ($mnt1->{remote} eq $mnt2{remote} && $mnt1->{type} eq $mnt2{type}) {
            # compute intersection of servers
            my %srv2 = map {$_ => 1} split(/,/, $mnt2{servers});
            if (grep($srv2{$_}, @srv1)) {
                # save in cache to speed up future requests
                $map_fs_cache{"$host2:$mnt1"} = \%mnt2;
                return \%mnt2;
            }
        }
    }
    $map_fs_cache{"$host2:$mnt1"} = -1;
    return -1;
}

###################
#### map_local ####
###################
# return the equivalent of a given path on a given host on another given host
#
# When $host1 eq $host2 the path is returned unchanged.  Otherwise the
# path's mount on $host1 is mapped to $host2 and, if $ref->{rw} is set, the
# target mount must have the "rw" option.  On success the mount fields are
# copied into %$ref; returns undef when no suitable mount exists.
sub map_local {
    my ($host1, $path1, $host2, $ref) = @_;
    # find file system mount of path on original host
    my $mnt1 = fs_mount($host1, $path1);
    if ($host1 eq $host2) {
        if ($mnt1) {
            # store mount info
            $ref->{$_} = $mnt1->{$_} foreach (keys %{$mnt1});
        }
        # return original path
        return $path1;
    } elsif (!$mnt1) {
        # no equivalent mount found on host
        return undef;
    }
    my $mnt2 = map_fs_mount($mnt1, $host2);
    # must have correct mode
    if ($mnt2 != -1 && (!$ref->{rw} || $mnt2->{opts} =~ /(?:^|,)rw(?:$|,)/)) {
        # replace original mount point with new mount point
        $path1 =~ s/^\Q$mnt1->{local}\E/$mnt2->{local}/;
        # store mount info
        $ref->{$_} = $mnt2->{$_} foreach (keys %{$mnt2});
        return $path1;
    }
    return undef;
}

####################
#### map_remote ####
####################
# return the equivalent of a given remote path on a given host
#
# $path1 is an escaped remote path "HOST%3A/path".  Tries, in order:
#   1. a local mount on $lhost (via map_local), returning a local path;
#   2. another shell host sharing the mount, chosen among the least-picked
#      hosts by $conf{select_hook} or default_select(), returning
#      "PICK%3A/mapped/path";
#   3. the original $path1.
# Mount fields of the chosen mount are copied into %$ref.  Picks are recorded
# in %meta as "picks_HOST" => {LHOST => 1} and cached per source mount.
# Returns undef when $path1 is not of the HOST%3A form.
my %map_remote_cache;
sub map_remote {
    my ($lhost, $path1, $ref) = @_;
    # remote paths will still be escaped at this point
    if ($path1 =~ /^([^\/:]+)%3A(\/.*)?/) {
        my ($rhost, $rpath) = ($1, $2);
        # check if remote file system exists on local host
        my $path2 = map_local($rhost, $rpath, $lhost, $ref);
        return $path2 if (defined $path2);

        # find file system mount of path on original host
        my $mnt1 = fs_mount($rhost, $rpath);
        # return original if no mount found
        return $path1 if (!$mnt1);

        # check cache first to see if mapping already computed
        my $mnt2 = $map_remote_cache{"$mnt1->{host}:$mnt1->{local}"};
        if (!defined $mnt2) {
            # find accessible hosts based on global/user db
            my @hosts = grep(/^shell_/, keys %meta);
            push(@hosts, grep(/^shell_/, keys %db));
            push(@hosts, grep(/^shell_/, keys %udb));
            my %fs_hosts;
            # determine potential hosts
            foreach my $host (@hosts) {
                $host =~ s/^shell_//;
                $mnt2 = map_fs_mount($mnt1, $host);
                if ($mnt2 != -1) {
                    # host has access to the file
                    if (!$ref->{rw} || $mnt2->{opts} =~ /(?:^|,)rw(?:$|,)/) {
                        # host has proper read/write access
                        $fs_hosts{$host} = $mnt2;
                    }
                }
            }

            # prune potential hosts based on number currently assigned
            my $min = 1E9;
            my %min_hosts;
            foreach my $host (keys %fs_hosts) {
                my $npicks = scalar(keys %{$meta{"picks_$host"}});
                # don't count previous selection for this host
                $npicks-- if ($meta{"picks_$host"}->{$lhost});
                next if ($npicks > $min);
                $min = $npicks;
                $min_hosts{$npicks} = [] if (!defined $min_hosts{$npicks});
                push(@{$min_hosts{$npicks}}, $host);
            }
            my %picks;
            $picks{$_} = $fs_hosts{$_} foreach (@{$min_hosts{$min}});

            $mnt2 = undef;
            my $pick;
            if (defined $conf{select_hook}) {
                # select host using configured selection hook
                my ($fh, $file) = tempfile(UNLINK => 1);
                close $fh;
                nstore(\%picks, $file);
                # invoke configured selection hook
                $pick = open3_get([-1, undef], "$conf{select_hook} $lhost $rhost $file");
                # strip the trailing newline before testing the result so a
                # blank reply falls back to the default policy below
                $pick =~ s/\s*\r?\n$// if (defined $pick);
            }
            # revert to default selection policy when no selection
            $pick = default_select($rhost, keys %picks) if (!$pick);
            $pick =~ s/\s*\r?\n$//;
            if ($pick) {
                # clear previously picked hosts
                foreach (grep(/^picks_/, keys %meta)) {
                    delete $meta{$_}->{$lhost};
                    delete $meta{$_} if (scalar(keys %{$meta{$_}}) == 0);
                }
                # store that host has already been selected
                $meta{"picks_$pick"}->{$lhost} = 1;
                $mnt2 = $picks{$pick};
                # save in cache to speed up future requests
                $map_remote_cache{"$mnt1->{host}:$mnt1->{local}"} = $mnt2;
            }
        }
        if (!$mnt2) {
            # store mount info
            $ref->{$_} = $mnt1->{$_} foreach (keys %{$mnt1});
            # return original path if can't find suitable mount
            return $path1;
        }
        # replace original mount point with new mount point
        $rpath =~ s/^\Q$mnt1->{local}\E/$mnt2->{local}/;
        # construct remote path using escaped colon after host
        $rpath = "$mnt2->{host}%3A$rpath";
        # store mount info
        $ref->{$_} = $mnt2->{$_} foreach (keys %{$mnt2});
        return $rpath;
    }
    return undef;
}

##############
#### meta ####
##############
# output metadata for transfer specified with id option
#
# Searches $conf{user_dir} and its nested "USER.more" directories for the
# transfer directory, then prints every metadata key (picks_* values are
# hashes and are printed as comma-separated host lists).  $opts{meta} is
# passed to get_meta() to select an older valid snapshot.
sub meta {
    my $dir = $conf{user_dir};
    while (-d $dir) {
        last if (-d "$dir/$opts{user}.$opts{id}");
        $dir .= "/$opts{user}.more";
    }
    if (-d $dir) {
        my $file = "$dir/$opts{user}.$opts{id}/meta";
        print "$file:\n";
        # retrieve metadata from file
        %meta = %{get_meta($file, $opts{meta})};
        foreach my $key (sort keys(%meta)) {
            if ($key =~ /^picks_/) {
                print " $key = ", join(",", keys %{$meta{$key}}), "\n";
            } else {
                print " $key = $meta{$key}\n";
            }
        }
    }
}

###################
#### open3_get ####
###################
# run given command with stdin/stdout/stderr from/to given files
# and return command output when requested
sub open3_get {
    my $files = shift;
    my @args = @_;
    my $fhpid = open3_run($files, @args);
    return undef if (!defined $fhpid);
    my $ifh;
    if
(!defined $files->[1]) {
        # no stdout file given so return what the command writes to stdout
        $ifh = 1;
    } elsif (scalar(@{$files}) == 3 && !defined $files->[2]) {
        # no stderr file given so return what the command writes to stderr
        $ifh = 2;
    }
    my $out;
    if ($ifh) {
        $out .= $_ while (defined ($_ = $fhpid->[$ifh]->getline));
    }
    open3_wait($fhpid);
    return $out;
}

###################
#### open3_run ####
###################
# run given command with stdin/stdout/stderr either from/to given files
# or from/to autocreated pipes and return associated file handles and pid
#
# $files is an array ref (stdin, stdout[, stderr]); each entry is a file
# name, -1 for the null device, or undef/false to have a pipe created.
# With only two entries stderr is merged into stdout.  A single command
# string is split shell-style with quotewords (no shell is spawned).
# Returns [handles..., pid] for open3_wait(), or undef if the command could
# not be started.
sub open3_run {
    my $files = shift;
    my @args = @_;
    if (scalar(@args) == 1) {
        $args[0] =~ s/^\s+|\s+$//g;
        @args = quotewords('\s+', 0, $args[0]);
    }
    my (@fh, @o3);
    foreach my $i (0 .. scalar(@{$files}) - 1) {
        my $dir = $i ? '>' : '<';
        my $file = $files->[$i];
        # -1 is a sentinel for the null device; compare as a string so an
        # ordinary file name is never numified (e.g. "-1x" == -1)
        $file = File::Spec->devnull if (defined $file && $file eq '-1');
        if ($file) {
            open($fh[$i], $dir, $file);
            # hand the already-open descriptor to open3 ("<&N" / ">&N")
            $o3[$i] = $dir . '&' . $fh[$i]->fileno;
        } else {
            $o3[$i] = gensym;
            $fh[$i] = $o3[$i];
        }
    }
    # combine stdout/stderr if nothing given for stderr
    $o3[2] = $o3[1] if (scalar(@{$files}) == 2);
    my $pid;
    eval {$pid = IPC::Open3::open3(@o3, @args)};
    if ($@ || !defined $pid) {
        # no child to wait for, so just release every handle; open3_wait()
        # would treat the last handle as a pid and leave it open
        foreach my $handle (@fh) {
            close $handle if (defined $handle);
        }
        return undef;
    } else {
        $o3[0]->autoflush(1) if (ref $o3[0]);
        return [@fh, $pid];
    }
}

####################
#### open3_wait ####
####################
# wait for processes and clean up handles created by open3_run
#
# $fhpid is the array ref returned by open3_run(): its last element is the
# child pid and the rest are handles to close before reaping the child.
sub open3_wait {
    my $fhpid = shift;
    return if (!defined $fhpid);
    my $pid = pop(@{$fhpid});
    close $_ foreach(@{$fhpid});
    waitpid($pid, 0);
}

#####################
#### parse_bytes ####
#####################
# return decimal/binary equivalent of given string
#
# $text must be a positive integer optionally followed by one unit letter
# (k, m, g, t, p or e; case-insensitive).  When $binary is true, units are
# powers of 1024 (%bibytes), otherwise powers of 1000 (%bytes).  Returns
# undef for anything else, including zero, fractions and undef.
sub parse_bytes {
    my $text = shift;
    my $binary = shift;
    my $tbytes = $binary ? \%bibytes : \%bytes;
    # accept every unit defined in the conversion tables (P and E included)
    if ($text =~ /^([1-9]\d*)([kmgtpe])?$/i) {
        my ($val, $unit) = ($1, $2);
        $unit = "" if (!defined $unit);
        return $val * $tbytes->{uc $unit};
    }
    return undef;
}

#############
#### put ####
#############
# record the state of file operations that were processed by a client
sub put {
    # open/create log files
    my %fhs;
    foreach (qw(do done error tree)) {
        $fhs{$_} = IO::File->new(">>$opts{base}/$_");
        #TODO: need error if cannot be opened
    }
    $fhs{doing} = IO::File->new("+<$opts{doing_log}");
    my $doing_size = ($fhs{doing}->stat)[7];
    my $more_finds = $meta{d_find} + $meta{e_find} == $meta{t_find} ? 0 : 1;
    my %find;
    my %mnts;
    $meta{"warn_$opts{host}$opts{cid}"} = -1;

    $SIG{ALRM} = sub {exit 1};
    alarm 300;
    while (my $line = <STDIN>) {
        debug_print('PUT', $line);
        $line =~ s/\s*\r?\n$//;
        #TODO: size limit? compression?
        my %op = split(/[= ]+/, $line);
        # ignore malformed lines with undefined op values
        next if (grep(!/./, values %op));
        next if (defined $op{doing} && ($op{doing} < $meta{$opts{doing}} ||
            $op{doing} >= $doing_size));
        if (defined $op{doing}) {
            $fhs{doing}->seek($op{doing}, 0);
            my $done;
            $fhs{doing}->read($done, 1);
            # skip processing if this operation has been timed out
            next if ($done ne ' ');
            # indicate that operation has been seen
            $fhs{doing}->seek($op{doing}, 0);
            $fhs{doing}->print($opts{meta_pack});
        }

        my @args = split(/,/, $op{args});
        my $cmd = shift @args;
        my ($sid, $split) = split(/:/, $op{split});

        if ($cmd =~ /^ckattr/ && defined $op{state}) {
            $meta{s_run} -= $op{size};
            $meta{t_run}--;
            if ($op{state} eq 'error') {
                # dst does not exist so next state is cp
                $cmd = "cp";
            } else {
                if (defined $op{split}) {
                    # record all split copies done
                    $meta{"sd_cp_$sid"}->bnot;
                    $meta{"st_cp_$sid"} = 0;
                    if ($meta{verify} && $op{state} eq 'done') {
                        # record all split sums and cksums done
                        $meta{"sd_sum_$sid"}->bnot;
                        $meta{"st_sum_$sid"} = 0;
                        $meta{"sd_cksum_$sid"}->bnot;
                        $meta{"st_cksum_$sid"} = 0;
                    }
                }
                # record copy done
                $meta{s_cp} += $op{size};
                $meta{d_cp}++;
                if ($op{state} eq
'done') { if ($meta{verify}) { # record sum and cksum done $meta{s_sum} += $op{size}; $meta{s_cksum} += $op{size}; $meta{d_sum}++; $meta{d_cksum}++; } # dst exists with same attrs so next state is chattr $cmd = "chattr"; # do not create partial operations for chattr delete $op{split} if (defined $op{split}); } else { # dst exists with diff/ignored attrs so next state is cp/sum $cmd = $meta{verify} ? "sum" : "cp"; } } $op{args} =~ s/^[^,]+/$cmd/; # more work to be done delete $op{$_} foreach (qw(doing rate run state text time)); $line = join(" ", map {"$_=$op{$_}"} sort(keys %op)); if (defined $op{split}) { my $pos = 0; my $split = 0; while ($pos < $op{size}) { # create a partial operation for each split my $end = min($pos + $meta{split}, $op{size}); # adjust size my $size = $end - $pos; $line =~ s/size=\d+/size=$size/; $line =~ s/split=\S+/split=$sid:$split/; $fhs{do}->print("$line bytes=$pos-$end\n"); $split++; $pos += $meta{split}; } } else { $fhs{do}->print("$line\n"); } } elsif ($cmd eq 'latency') { # record domain network latency foreach my $host (keys %op) { next if ($host eq 'args'); my $dn = $host; $dn =~ s/^[^.]+.//; $meta{"rtt_$dn"} = $op{$host}; } } elsif ($cmd eq 'load') { # record host load for throttling $meta{"load_$opts{host}$opts{cid}"} = $line; } elsif ($cmd eq 'network') { # record client network properties $meta{"tcpwin_$opts{host}"} = $op{tcpwin} if ($op{tcpwin}); $meta{"xge_$opts{host}"} = $op{xge} if ($op{xge}); } elsif ($cmd eq 'getopt') { # initialize transfer settings once all getopt lines received init_id() if ($args[0] eq 'end'); # check validity of option next if ($args[0] !~ /^(?:bandwidth|buffer|check|clients|command|cpu|create-tar|cron|cwd|dereference|disk|exception|exclude|extract-tar|files|host-list|hosts|ignore-times|include|index-tar|io[rw]?|local|mail|net[rw]?|newer|offline|older|os|perl|ports|preallocate|preserve|remote|retry|secure|size|split|split-tar|streams|stripe|sync|threads|verify|verify-fast|version|window)$/); if 
($args[0] eq 'exception') { # track exceptions for stats processing $meta{e_exception}++; } $args[0] .= "_$opts{host}$opts{cid}" if ($args[0] =~ /(?:exception|os|perl|version)$/); $meta{$args[0]} = defined $op{text} ? unescape($op{text}) : 1; } elsif ($cmd eq 'host') { # host error so remove host from outstanding hosts $meta{ohosts}--; delete $meta{"host_$args[0]"}; $meta{"nohost_$args[0]"} = 1; } elsif ($op{state} eq 'done') { $meta{"warn_$opts{host}$opts{cid}"} = 0; $fhs{done}->print("$line\n"); $meta{s_run} -= $op{size}; $meta{"s_$cmd"} += $op{size}; $meta{t_run}--; # count operations that check in after transfer stopped against rate $meta{time1} = $time if ($meta{time1}); if (defined $op{split}) { my $test = Math::BigInt->new(1); $test->blsft($split); if ($test->copy->band($meta{"sd_$cmd\_$sid"})->is_zero) { # record that this particular split was done; $meta{"sd_$cmd\_$sid"}->bior($test); # decrement number of splits that need to be done; $meta{"st_$cmd\_$sid"}--; } } if (!defined $op{split} || $meta{"st_$cmd\_$sid"} <= 0) { # only update cmd totals for unsplit files or last split $meta{"d_$cmd"}++; $meta{"d_$op{tool}"}++; } if ($meta{verify} && $cmd eq 'cp') { if ($op{hash}) { # transport already summed so next state is cksum $cmd = "cksum"; $op{args} =~ s/^[^,]+/$cmd/; if (defined $op{split}) { my $test = Math::BigInt->new(1); $test->blsft($split); if ($test->copy->band($meta{"sd_sum_$sid"})->is_zero) { # record that this particular split was done; $meta{"sd_sum_$sid"}->bior($test); # decrement number of splits that need to be done; $meta{"st_sum_$sid"}--; } } if (!defined $op{split} || $meta{"st_sum_$sid"} <= 0) { # only update sum totals for unsplit files or last split $meta{d_sum}++; } $meta{s_sum} += $op{size}; } else { # next state is sum $cmd = "sum"; $op{args} =~ s/^[^,]+/$cmd/; } } elsif ($meta{verify} && $cmd eq 'sum') { # next state is cksum $cmd = "cksum"; $op{args} =~ s/^[^,]+/$cmd/; } elsif (($meta{check} || $meta{preserve}) && ($cmd =~ 
/^(?:cksum|cp|ln)/ || # tar mkdirs are not put in tree so are not handled by rtree $cmd eq 'mkdir' && $meta{'create-tar'})) { if (!defined $op{split} || $cmd eq 'cksum' && $meta{"st_cksum_$sid"} <= 0 || $cmd eq 'cp' && $meta{"st_cp_$sid"} <= 0) { # indicate operation was ln so can handle differently $op{ln} = 1 if ($cmd eq 'ln'); # only chattr unsplit files or last split # next state is chattr $cmd = "chattr"; $op{args} =~ s/^[^,]+/$cmd/; delete $op{bytes}; delete $op{hash}; delete $op{split}; } else { # ignore splits before last split next; } } else { $meta{time1} = $time if (($meta{last} || $meta{e_find}) && !run()); next; } # more work to be done delete $op{$_} foreach (qw(doing rate run state text time)); $line = join(" ", map {"$_=$op{$_}"} sort(keys %op)); $fhs{do}->print("$line\n"); } elsif ($op{state} && $op{try} >= $meta{retry}) { $meta{"warn_$opts{host}$opts{cid}"} = 0; $fhs{error}->print("$line\n"); $meta{t_run}--; $meta{s_run} -= $op{size}; $meta{s_error} += $op{size}; $meta{"e_$cmd"}++; $meta{"e_$op{tool}"}++; $meta{time1} = $time if (($meta{last} || $meta{e_find}) && !run()); if ($cmd eq 'chattr' && unescape($op{text}) =~ /file sizes differ$/ || $cmd eq 'cksum' && unescape($op{text}) =~ /^Corruption/) { # track corruption for stats processing $meta{e_corruption}++; } } elsif ($op{state}) { $meta{s_run} -= $op{size}; $meta{t_run}--; $op{try}++; # count operations that check in after transfer stopped against rate $meta{time1} = $time if ($meta{time1}); if ($cmd eq 'chattr' && unescape($op{text}) =~ /file sizes differ$/) { # track corruption for stats processing $meta{e_corruption}++; # reset size since may have changed during chattr split join $op{size} = (split(/,/, $op{attrs}))[7]; # file corrupted so next state is cp $cmd = "cp"; $op{args} =~ s/^[^,]+/$cmd/; # mark operations as not done $meta{d_cp}--; $meta{s_cp} -= $op{size}; if ($meta{verify}) { $meta{d_sum}--; $meta{d_cksum}--; $meta{s_sum} -= $op{size}; $meta{s_cksum} -= $op{size}; } if 
($meta{split} > 0 && $op{size} > $meta{split}) { $op{state} = "warn"; # more work to be done delete $op{$_} foreach (qw(doing rate run time)); $line = join(" ", map {"$_=$op{$_}"} sort(keys %op)); my ($x1, $x2) = (0, $op{size}); # bytes must be subset of existing tar bytes range ($x1, $x2) = ($1, $2) if ($op{tar_bytes} =~ /(\d+)-(\d+)/); my $split = 0; while ($x1 < $x2) { $meta{w_run}++; # create a partial copy operation for each split my $end = min($x1 + $meta{split}, $x2); # adjust size my $size = $end - $x1; $line =~ s/size=\d+/size=$size/; $fhs{do}->print("$line split=$meta{split_id}:$split bytes=$x1-$end\n"); $split++; $x1 += $meta{split}; } # use new split id (old one lost during chattr stage) foreach ($meta{verify} ? qw(cp sum cksum) : qw(cp)) { $meta{t_split} += $split; $meta{"st_$_\_$meta{split_id}"} = $split; $meta{"sd_$_\_$meta{split_id}"} = Math::BigInt->new(0); } $meta{split_id}++; next; } elsif ($op{tar_bytes}) { # tar operations expect bytes field to exist $op{bytes} = $op{tar_bytes}; } } elsif ($cmd eq 'cksum' && unescape($op{text}) =~ /^Corruption,?(.*\d)?/) { my $bytes = $1; # track corruption for stats processing $meta{e_corruption}++; $meta{s_cksum} += $op{size}; if ($bytes) { my $end = (split(/,/, $op{attrs}))[7]; # bytes must be subset of existing tar bytes range $end = $1 if ($op{tar_bytes} =~ /\d+-(\d+)/); if (defined $end) { # adjust ranges to sane values my @ranges = split(/,/, $bytes); foreach my $range (@ranges) { my ($x1, $x2) = split(/-/, $range); if ($x1 >= $end) { # remove range if min beyond end offset $range = undef; } elsif ($x2 > $end) { # truncate dst if max beyond end offset $range = $x1 . "-" . 
$end } } $bytes = join(",", @ranges); # remove empty ranges $bytes =~ s/^,+|,+$//g; $bytes =~ s/,,+/,/g; } if ($bytes) { # reduce tries if progress being made $op{try}-- if ($op{bytes} ne $bytes); $op{bytes} = $bytes; # adjust size of remaining operations $op{size} = 0; foreach (split(/,/, $bytes)) { $op{size} += $2 - $1 if (/(\d+)-(\d+)/); } } } if (!$bytes && defined $bytes) { # remaining operations empty so done $meta{"warn_$opts{host}$opts{cid}"} = 0; $fhs{done}->print("$line\n"); $meta{"s_$cmd"} += $op{size}; if (defined $op{split}) { my $test = Math::BigInt->new(1); $test->blsft($split); if ($test->copy->band($meta{"sd_$cmd\_$sid"})->is_zero) { # record that this particular split was done; $meta{"sd_$cmd\_$sid"}->bior($test); # decrement number of splits that need to be done; $meta{"st_$cmd\_$sid"}--; } } if (!defined $op{split} || $meta{"st_$cmd\_$sid"} <= 0) { # only update cmd totals for unsplit files or last split $meta{"d_$cmd"}++; $meta{"d_$op{tool}"}++; if ($meta{check} || $meta{preserve}) { # only chattr unsplit files or last split # next state is chattr $cmd = "chattr"; $op{args} =~ s/^[^,]+/$cmd/; } else { # ignore splits before last split next; } } else { $meta{time1} = $time if (($meta{last} || $meta{e_find}) && !run()); next; } # more work to be done delete $op{$_} foreach (qw(bytes doing hash rate run split state text time)); $line = join(" ", map {"$_=$op{$_}"} sort(keys %op)); $fhs{do}->print("$line\n"); next; } if (defined $op{split}) { my $test = Math::BigInt->new(1); $test->blsft($split); $test->bnot; foreach (qw(cp sum)) { # record that this particular split was not done; $meta{"sd_$_\_$sid"}->band($test); # increment number of splits that need to be done; $meta{"st_$_\_$sid"}++; $meta{"d_$_"}-- if ($meta{"st_$_\_$sid"} == 1); } } else { $meta{d_cp}--; $meta{d_sum}--; } # file corrupted so next state is cp $cmd = "cp"; $op{args} =~ s/^[^,]+/$cmd/; # reduce sizes by amount of file that was corrupt $meta{s_cp} -= $op{size}; $meta{s_sum} -= 
$op{size}; $meta{s_cksum} -= $op{size}; } elsif ($meta{"warn_$opts{host}$opts{cid}"}) { $meta{"warn_$opts{host}$opts{cid}"} = 1; } $op{state} = "warn"; $meta{w_run}++; # more work to be done delete $op{$_} foreach (qw(doing rate run time)); # do not delete hash when retrying cksum delete $op{hash} if ($op{args} !~ /^cksum/); $line = join(" ", map {"$_=$op{$_}"} sort(keys %op)); # find retries must go in tree during tar creation my $log = $cmd eq 'find' && $meta{'create-tar'} ? "tree" : "do"; $fhs{$log}->print("$line\n"); } elsif (defined $op{size}) { $meta{"t_$cmd"}++; $meta{t_chattr}++ if ($meta{check} || $meta{preserve}); if ($meta{'create-tar'} && $cmd =~ /^(?:cp|ln|mkdir)/) { if (!defined $meta{"tar_size_$args[-1]"}) { # initialize tar metadata for this file $meta{"tar_size_$args[-1]"} = 0; $meta{"tar_split_$args[-1]"} = 1; $meta{"tar_index_$args[-1]"} = 0 if ($meta{'index-tar'}); $meta{"tar_nosum_$args[-1]"} = 1 if ($meta{verify}); } elsif ($meta{"tar_size_$args[-1]"} < 0) { # a negative size indicates the final size of the last split $meta{"tar_size_$args[-1]"} = 0; $meta{"tar_split_$args[-1]"}++; $meta{"tar_index_$args[-1]"} = 0 if ($meta{'index-tar'}); } # need .sum mv for reg files / no tracking needed for 2+ splits delete $meta{"tar_nosum_$args[-1]"} if ($cmd eq 'cp' || $meta{"tar_split_$args[-1]"} > 1); $op{tar_start} = $meta{"tar_size_$args[-1]"}; if ($cmd eq 'ln') { my $llen = length(unescape($args[0])); if ($llen > 100) { # add size of long link plus extra record my $asize = $llen + 512; $asize += (512 - ($asize % 512)) if ($asize % 512 > 0); $meta{"tar_size_$args[-1]"} += $asize; } } my $tar_name = unescape($op{tar_name}); if (length($tar_name) > 100) { my $pos = index($tar_name, "/", length($tar_name) - 100); if ($pos == -1 || $pos > 155 || length($tar_name) > 255) { # add size of long name plus extra record my $asize = length($tar_name) + 512; $asize += (512 - ($asize % 512)) if ($asize % 512 > 0); $meta{"tar_size_$args[-1]"} += $asize; } } my 
$size = $cmd ne 'cp' ? 0 : $op{size}; # tar entries contain 512 byte header plus file plus padding $meta{"tar_size_$args[-1]"} += 512; # file contents are written after the header $op{bytes} = $meta{"tar_size_$args[-1]"} . "-"; $meta{"tar_size_$args[-1]"} += $size; $op{bytes} .= $meta{"tar_size_$args[-1]"}; $op{tar_bytes} = $op{bytes}; # pad entry to 512 byte boundary $meta{"tar_size_$args[-1]"} += (512 - ($size % 512)) if ($size > 0 && $size % 512 > 0); # use appropriate split as target $op{args} .= "-" . $meta{"tar_split_$args[-1]"} . ".tar"; if ($meta{'index-tar'}) { # designate position of entry in index file $meta{"tar_index_$args[-1]"} += $op{tar_index}; $op{tar_index} = $meta{"tar_index_$args[-1]"} - $op{tar_index}; } if ($meta{'split-tar'} && $meta{"tar_size_$args[-1]"} >= $meta{'split-tar'}) { # indicate last tar entry so final padding can be added $op{tar_last} = 1; # insert chattr op in tree to preallocate and stripe $fhs{tree}->print("args=chattr,$args[-1]-" . $meta{"tar_split_$args[-1]"} . ".tar host=$opts{host}" . " tar_creat=" . $meta{"tar_size_$args[-1]"} . "\n"); $meta{t_chattr}++; $meta{tar_creat}++; # move to next split by inverting size to save final value $meta{"tar_size_$args[-1]"} = -$meta{"tar_size_$args[-1]"}; $meta{"tar_index_$args[-1]"} = 0 if ($meta{'index-size'}); } $line = join(" ", map {"$_=$op{$_}"} sort(keys %op)); } if ($cmd eq 'mkdir') { my $log = $meta{'create-tar'} ? 
"do" : "tree"; $fhs{$log}->print("$line\n"); } elsif ($cmd eq 'cp') { $meta{s_total} += $op{size}; if ($meta{verify}) { $meta{t_sum}++; $meta{t_cksum}++; } if ($meta{split} > 0 && $op{size} > $meta{split}) { my ($x1, $x2) = (0, $op{size}); # bytes must be subset of existing tar bytes range ($x1, $x2) = ($1, $2) if ($op{bytes} =~ /(\d+)-(\d+)/); my $split = 0; while ($x1 < $x2) { # create a partial copy operation for each split my $end = min($x1 + $meta{split}, $x2); # adjust size my $size = $end - $x1; $line =~ s/size=\d+/size=$size/; $line =~ s/ bytes=\S+//; $fhs{do}->print("$line split=$meta{split_id}:$split bytes=$x1-$end\n"); $split++; $x1 += $meta{split}; } foreach ($meta{verify} ? qw(cp sum cksum) : qw(cp)) { $meta{t_split} += $split; $meta{"st_$_\_$meta{split_id}"} = $split; $meta{"sd_$_\_$meta{split_id}"} = Math::BigInt->new(0); } $meta{split_id}++; } else { $fhs{do}->print("$line\n"); } } else { if ($cmd =~ /^ckattr/) { # create additional operations without adding to logs my @ops = qw(cp); push(@ops, qw(sum cksum)) if ($meta{verify}); $meta{"t_$_"}++ foreach (@ops); $meta{s_total} += $op{size}; if ($meta{split} > 0 && $op{size} > $meta{split}) { my $split = ceil($op{size} / $meta{split}); foreach (@ops) { $meta{t_split} += $split; $meta{"st_$_\_$meta{split_id}"} = $split; $meta{"sd_$_\_$meta{split_id}"} = Math::BigInt->new(0); } # record split info for result processing $line .= " split=$meta{split_id}:$split"; $meta{split_id}++; } } $fhs{do}->print("$line\n"); } } elsif ($cmd eq 'find') { if ($meta{dereference} && !$meta{'extract-tar'}) { # these conditions are only valid after getopt lines processed if (!defined $find{t_find}) { tie(%find, 'DB_File', "$opts{base}/find", O_RDWR, 0600); if (!defined $find{t_find} || $find{t_find} != $meta{t_find}) { # this can happen when mgr fails over as find not sync'd untie %find; build_find(); tie(%find, 'DB_File', "$opts{base}/find", O_RDWR, 0600); } #TODO: need error if cannot be tied } # skip src directories 
already processed due to symlinks next if ($find{$args[0]}); $find{$args[0]} = 1; $find{t_find}++; } $meta{"t_$cmd"}++; $fhs{tree}->print("$line\n"); } elsif ($cmd eq 'mount') { $mnts{"mount_$op{host}:$op{local}"} = $line; } elsif ($cmd eq 'shell') { $mnts{"shell_$op{host}"} = $op{pbs} ? "pbs" : 1; } } alarm 0; if ($more_finds && $meta{d_find} == $meta{t_find} && $meta{'create-tar'}) { # tar transition from finds outstanding to no finds outstanding foreach my $file (grep(/^tar_size_/, keys %meta)) { my $size = abs $meta{$file}; $file =~ s/^tar_size_//; my $split = $meta{"tar_split_$file"}; # store file and size so final cp op can insert tar eof padding $meta{"tar_last_$file-$split.tar"} = $size; if ($split == 1) { # rename first split if there is only one split $meta{tar_mv}++; # use chattr to track additional move $meta{t_chattr}++; } # insert chattr op in tree to preallocate and stripe $fhs{tree}->print("args=chattr,$file-$split.tar host=$opts{host} ", "tar_creat=$size\n"); $meta{t_chattr}++; $meta{tar_creat}++; } } # close log files $fhs{$_}->close foreach (keys %fhs); untie %find if (defined $find{t_find}); if ($more_finds && $meta{d_find} + $meta{e_find} == $meta{t_find}) { # non-tar transition from finds outstanding to no finds outstanding # mark transfers complete if no files after find $meta{time1} = $time if (!run()); if ($meta{e_find} + $meta{t_cp} + $meta{t_ln} + $meta{t_mkdir} == 0) { # force error if no files (e.g. non-matching --include) my $line; if (open(TREE, '<', "$opts{base}/tree")) { # use first tree line (should be find) for error line $line = ; close TREE; chomp $line; } # this should never happen if client/manager versions match $line = "args=find,no_src,no_dst host=no_host" if (!$line); $line .= " run=$time state=error tool=shift-mgr text=" . 
escape("No files found - this transfer cannot be restarted"); if (open(ERROR, '>>', "$opts{base}/error")) { print ERROR $line, "\n"; close ERROR; } } elsif (!$meta{e_find}) { # mark initialization done $meta{last} = 1; # initialize rtree size (log files must be closed before this) $meta{rtree} = (stat "$opts{base}/tree")[7]; } } # update user db if (scalar(keys %mnts) > 0) { # retrieve global database from file eval { local $SIG{__WARN__} = sub {die}; %db = %{retrieve($conf{db_file})}; }; if ($@) { # database could not be opened %db = (); } # retrieve user database from file eval { local $SIG{__WARN__} = sub {die}; %udb = %{retrieve($conf{udb_file})}; }; if ($@) { # database could not be opened %udb = (); } my $store; while (my ($key, $val) = each %mnts) { if ($key =~ /^shell_/) { # only add hosts that are not in global db next if ($db{$key}); if ($val eq 'pbs') { $meta{$key} = 1; } else { $udb{$key} = 1; $store = 1; } next; } # eliminate any random double slashes that crept in $val =~ s/\/\//\//g; my %mnt = split(/[= ]+/, $val); # only add hosts that are not in global db next if ($db{"shell_$mnt{host}"}); if ($mnts{"shell_$mnt{host}"} eq 'pbs') { $meta{$key} = $val; } else { $udb{$key} = $val; $store = 1; } # add implicit mounts foreach my $srv (split(/,/, $mnt{servers})) { my %imnt = %mnt; $imnt{local} = $imnt{remote}; $imnt{host} = $srv; $imnt{servers} = $srv; $udb{"mount_$srv:$imnt{remote}"} = join(" ", map {"$_=$imnt{$_}"} sort(keys %imnt)); $store = 1; } } if ($store) { # store user db to file nstore(\%udb, $conf{udb_file}); # synchronize user db sync_local("$opts{user}.db") if ($conf{sync_host}); } } } ################## #### put_meta #### ################## # begin metadata line or save given metadata and end line sub put_meta { my $meta = shift; my $file = "$opts{base}/meta"; open(FILE, '>>', $file); if (defined $meta) { print FILE encode_base64(compress(nfreeze($meta)), ""), "]\n"; } else { print FILE "["; } close FILE; #TODO: handle errors; } 
############# #### run #### ############# # return whether or not the current transfer is running sub run { my $expect = $meta{t_find} + $meta{tar_creat}; if (!$meta{'create-tar'} || $meta{last} && $meta{d_chattr} >= $meta{tar_creat}) { $expect += $meta{"t_$_"} foreach (qw(cp ln mkdir)); } if ($meta{verify} && ($meta{check} || $meta{preserve})) { # expect sums for done cps and cksums for done sums $expect += $meta{"d_$_"} foreach (qw(cp sum)); # expect file chattrs for done cksums and done lns $expect += $meta{d_cksum} + $meta{d_ln}; # expect dir chattrs only when no other errors my $errs = sum(map {$meta{"e_$_"}} qw(chattr cksum cp find ln mkdir sum)); $expect += $errs ? 0 : $meta{t_mkdir}; } elsif ($meta{verify}) { # expect sums for done cps and cksums for done sums $expect += $meta{"d_$_"} foreach (qw(cp sum)); } elsif ($meta{check} || $meta{preserve}) { # expect file chattrs for done cps and done lns $expect += $meta{d_cp} + $meta{d_ln}; # expect dir chattrs only when no other errors my $errs = sum(map {$meta{"e_$_"}} qw(chattr cp find ln mkdir)); # when errs > 0 and rtree == 0, any chattr errors are from dirs # and not files, so should still expect t_mkdir dir chattrs $expect += $errs && $meta{rtree} ? 0 : $meta{t_mkdir}; } my $actual = sum(map {$meta{"d_$_"}} qw(chattr cksum cp find ln mkdir sum)); my $errs = sum(map {$meta{"e_$_"}} qw(chattr cksum cp find ln mkdir sum)); $actual += $errs; # expect tar_mv chattrs only when no other errors $expect += $errs ? 
0 : $meta{tar_mv}; # running if actual operations differ from expected operations return ($expect != $actual); } ############### #### stats #### ############### # output table of consolidated stats across all transfers of invoking # user or all users if invoked as root sub stats { my $all; my %heads; my %types; my %users; # define headers for each table type $heads{Transfers} = [qw(xfers local lan wan dirs files size sums ssize attrs hosts)]; $heads{Rates} = [qw(local_min local_max local_avg lan_min lan_max lan_avg wan_min wan_max wan_avg all_min all_max all_avg)]; $heads{Tools} = [qw(bbcp bbftp fish fish-tcp gridftp mcp msum rsync shiftc shift-aux)]; $heads{Options_1} = [qw(bandwidth buffer clients cpu create-tar exclude extract-tar files host-list hosts include index-tar)]; $heads{Options_2} = [qw(io ior iow local net netr netw newer no-check no-cron no-offline no-mail no-preserve no-verify older)]; $heads{Options_3} = [qw(ports preallocate remote retry secure size split split-tar streams stripe sync threads verify-fast window)]; $heads{Errors} = [qw(corruption exception chattr cksum cp host ln mkdir sum)]; # define order in output my @order = qw(Transfers Rates Tools Options_1 Options_2 Options_3 Errors); # add tool errors push(@{$heads{Errors}}, @{$heads{Tools}}); $_ = "e_$_" foreach (@{$heads{Errors}}); $_ = "o_$_" foreach ( @{$heads{Options_1}}, @{$heads{Options_2}}, @{$heads{Options_3}}); if (!$opts{user} && $> == 0) { # replace %u with * to get stats from all users $conf{user_dir} =~ s/%u/*/g; } else { $opts{user} = getpwuid($<) if (!$opts{user}); $conf{user_dir} =~ s/%u/$opts{user}/g; } # compute totals over all transfers my @metas; my $dir = $conf{user_dir}; while (1) { my @files = glob "$dir/*/meta"; last if (scalar(@files) == 0); push(@metas, @files); $dir .= "/*.more"; } foreach my $file (@metas) { # skip transfers that have expired my $mtime = (stat($file))[9]; next if ($mtime + $conf{data_expire} < $time); # retrieve metadata from file my %meta = 
%{get_meta($file)}; # derive transfer type my $type = "local"; my @args = split(/\s+/, $meta{command}); if ($meta{origin} =~ /\Q$conf{email_domain}\E$/ && grep(/^picks_.*\Q$conf{email_domain}\E$/, keys %meta)) { # original client host is in local domain and remote host picked $type = "lan"; } elsif ($meta{origin} !~ /\Q$conf{email_domain}\E$/) { # original client host is not in local domain $type = "wan"; } # derive user from meta file my $user = $file; $user =~ s/.*\/(\w+)\.\d+\/meta/$1/; foreach (qw(e_corruption e_exception)) { # add corruption/exception totals even if transfer not completed $all->{$_} += $meta{$_}; $users{$user}->{$_} += $meta{$_}; $types{$type}->{$_} += $meta{$_}; } # skip transfers that have not completed next if (!$meta{time1}); my %totals; # transfer totals $totals{attrs} = $meta{d_chattr}; $totals{hosts} = grep(/^last_/, keys %meta); $totals{dirs} = $meta{d_mkdir}; $totals{files} = $meta{d_cp} + $meta{d_ln}; $totals{size} = $meta{s_cp}; $totals{ssize} = $meta{s_sum} + $meta{s_cksum}; $totals{sums} = $meta{d_sum} + $meta{d_cksum}; $totals{xfers} = 1; $totals{$type} = 1; # tool operation totals and tool error totals foreach (@{$heads{Tools}}) { $totals{$_} = $meta{"d_$_"}; $totals{"e_$_"} = $meta{"e_$_"}; } # option totals # options that must differ from configured default foreach my $key (qw(buffer clients cpu files hosts io ior iow net netr netw ports retry size split split-tar streams stripe threads window)) { # parse some values in binary bytes instead of decimal bytes my $bin = $key =~ /^(?:buffer|split|stripe)$/ ? 1 : 0; my $default = parse_bytes($conf{"default_$key"}, $bin); $totals{"o_$key"} = defined $meta{$key} && $meta{$key} ne $default ? 1 : 0; } # options that must be inverted $totals{"o_no-offline"} = !$meta{offline} && !$meta{'create-tar'} && !$meta{'extract-tar'} ? 1 : 0; foreach (qw(check cron mail preserve verify)) { $totals{"o_no-$_"} = !$meta{$_} ? 
1 : 0; } # normal options foreach (qw(create-tar exclude extract-tar host-list include index-tar local newer older remote secure sync verify-fast wait)) { $totals{"o_$_"} = $meta{$_} ? 1 : 0; } # error totals (corruption and exception handled earlier) foreach (qw(chattr cksum cp find ln mkdir sum)) { $totals{"e_$_"} = $meta{"e_$_"}; } $totals{e_host} = grep(/^nohost_/, keys %meta); # add transfer stats to totals per user, per type, and overall foreach my $head (keys %heads) { # rates must be processed differently next if ($head eq 'Rates'); foreach my $key (@{$heads{$head}}) { $all->{$key} += $totals{$key}; $users{$user}->{$key} += $totals{$key}; $types{$type}->{$key} += $totals{$key}; } } # compute rate for this transfer my $dtime = $meta{time1} - $meta{time0}; $dtime = 1 if ($dtime <= 0); my $rate = $meta{s_cp} / $dtime; # ignore rates of zero next if (!$rate); # compute rates per user, per type, and overall foreach my $ref ($users{$user}, $types{$type}, $all) { $ref->{"$type\_max"} = max($rate, $ref->{"$type\_max"}); $ref->{"$type\_min"} = $ref->{"$type\_min"} ? min($rate, $ref->{"$type\_min"}) : $rate; $ref->{all_max} = max($rate, $ref->{all_max}); $ref->{all_min} = $ref->{all_min} ? min($rate, $ref->{all_min}) : $rate; # cumulative moving averages $ref->{"$type\_avg"} += (($rate - $ref->{"$type\_avg"}) / $ref->{$type}); $ref->{all_avg} += (($rate - $ref->{all_avg}) / $ref->{xfers}); } } # convert rates to human readable format foreach my $rate (@{$heads{Rates}}) { $all->{$rate} = format_bytes($all->{$rate}) . "/s" if ($all->{$rate}); foreach my $user (keys %users) { $users{$user}->{$rate} = format_bytes($users{$user}->{$rate}) . "/s" if ($users{$user}->{$rate}); } foreach my $type (keys %types) { $types{$type}->{$rate} = format_bytes($types{$type}->{$rate}) . 
"/s" if ($types{$type}->{$rate}); } } # convert sizes to human readable format foreach my $size (qw(size ssize)) { $all->{$size} = format_bytes($all->{$size}, 1); foreach my $user (keys %users) { $users{$user}->{$size} = format_bytes($users{$user}->{$size}, 1); } foreach my $type (keys %types) { $types{$type}->{$size} = format_bytes($types{$type}->{$size}, 1); } } # compute start and end dates my $date1 = strftime('%m/%d/%y', localtime($time - $conf{data_expire})); my $date2 = strftime('%m/%d/%y', localtime); # print tables require Text::FormatTable; foreach my $head (@order) { my @heads = @{$heads{$head}}; print "$head per user ($date1 - $date2)\n\n"; # configure table headers my $t = Text::FormatTable->new("r" . " | r" x scalar(@heads)); $t->head("user", map {/^\w_/ ? substr($_, 2) : $_} @heads); $t->rule; # add row for each user my $rows = 0; foreach my $user (sort keys(%users)) { my @row = map {$users{$user}->{$_} || ""} @heads; # only print row if there is an actual non-empty value next if (!first {$_} @row); $t->row($user, @row); $rows++; } # add separator between user and type rows $t->rule; # add row for each transfer type foreach my $type (qw(local lan wan)) { $t->row($type, map {$types{$type}->{$_} || ""} @heads); } # add overall totals $t->row("all ($rows)", map {$all->{$_} || ""} @heads); # output final table print $t->render, "\n\n"; } # print error message table print "Error messages per user ($date1 - $date2)\n\n"; # configure table headers my $t = Text::FormatTable->new("r | r | l | l"); $t->head(qw(user id op target)); $t->head("", "", "tool", "message"); $t->rule; my $ulast; foreach my $file # sort by user.id (sort {(split(/\//, $a))[-2] cmp (split(/\//, $b))[-2]} @metas) { # skip transfers that have expired my $mtime = (stat($file))[9]; next if ($mtime + $conf{data_expire} < $time); # retrieve metadata from file my %meta = %{get_meta($file)}; # skip transfers without errors next if (!$meta{error_size} && !$meta{e_exception}); # derive user and id 
# from meta file
        my ($user, $id);
        if ($file =~ /.*\/(\w+)\.(\d+)\/meta/) {
            ($user, $id) = ($1, $2);
        } else {
            next;
        }
        my $count;
        # add all exceptions stored in metadata
        if ($meta{e_exception}) {
            foreach my $ex (grep(/^exception_/, keys %meta)) {
                # separate different users with line
                $t->rule if ($ulast && $user ne $ulast);
                # only print user and id once to reduce clutter
                $t->row($user ne $ulast ? $user : "", !$count ? $id : "",
                    "-", $ex);
                $t->row("", "", "shiftc", unescape($meta{$ex}));
                $count++;
                $ulast = $user;
            }
        }
        # add up to three error messages stored in error file
        $file =~ s/meta$/error/;
        if (open(FILE, '<', $file)) {
            # separate different users with line
            $t->rule if ($ulast && $user ne $ulast);
            foreach (1..3) {
                my $line = ;
                last if (!$line);
                $line =~ s/\s*\r?\n$//;
                my %op = split(/[= ]+/, $line);
                my @args = split(/,/, $op{args});
                # only print user and id once to reduce clutter
                $t->row($user ne $ulast ? $user : "", !$count ? $id : "",
                    $args[0], unescape($args[-1]));
                $t->row("", "", $op{tool}, unescape($op{text}));
                $count++;
                $ulast = $user;
            }
            close FILE;
        }
    }
    # output final table
    print $t->render;
}

################
#### status ####
################
# Output a table of all transfers with their state and statistics, or
# return the rendered single-transfer table when invoked with the id option.
#
# Reads the globals %opts, %conf, %meta and $time.
#   - $opts{status} eq 'csv': rows are printed as CSV and nothing is returned.
#   - $opts{id} set: only that transfer is reported and the rendered table
#     is returned instead of printed. In this mode %meta is not reloaded
#     here (NOTE(review): presumably the caller has already loaded it).
#   - otherwise the table is printed, possibly omitting older completed
#     transfers beyond $conf{status_lines}.
sub status {
    require Text::FormatTable;

    # configure table headers (two header rows per transfer)
    my $t = Text::FormatTable->new('r | l | r | r | r | r | r | r');
    my @row = (qw(id state dirs files), "file size", qw(date run rate));
    my @row2 = ("", "", "sums", "attrs", "sum size", "time", "left", "");
    if ($opts{status} eq 'csv') {
        print join(",", @row, @row2), "\n";
    } else {
        $t->head(@row);
        $t->head(@row2);
        $t->rule;
    }

    my @metas;
    my @rows;
    my $dones;
    my $dir = $> != 0 ? $conf{user_dir} : $opts{user_dir};
    my $user = $> != 0 ? $opts{user} : "*";
    # collect meta files, descending into nested *.more levels until none match
    while (1) {
        my @dirs = glob "$dir/$user.[0-9]*/meta";
        last if (!scalar(@dirs));
        push(@metas, @dirs);
        $dir .= "/*.more";
    }

    # non-root: order by modification time of the meta file;
    # root: order by the "user.id" directory name so transfers group by user
    # (the previous numeric <=> on path strings compared everything as equal)
    my @sorted = $> != 0
        ? (sort {(stat $a)[9] <=> (stat $b)[9]} @metas)
        : (sort {(split(/\//, $a))[-2] cmp (split(/\//, $b))[-2]} @metas);

    my @ops = qw(chattr cksum cp find ln mkdir sum);
    foreach my $file (@sorted) {
        my $id = $file;
        if ($> != 0) {
            $id =~ s/.*\.|\/meta//g;
        } else {
            # ignore old transfers
            next if ((stat $file)[9] + $conf{data_expire} < $time);
            # leave user name in id
            $id =~ s/.*\/([\w-]+\.\d+)\/meta/$1/g;
        }
        if ($opts{id}) {
            # ignore other ids when id is defined
            next if ($id != $opts{id});
        } else {
            # retrieve metadata from file
            %meta = %{get_meta($file)};
        }
        my $time1 = defined $meta{time1} ? $meta{time1} : $time;
        my $state = "run";

        # compute number of operations in various states
        my $done = sum(map {$meta{"d_$_"}} @ops);
        my $error = sum(map {$meta{"e_$_"}} @ops);
        my $total = sum(map {$meta{"t_$_"}} @ops);

        # determine transfer state
        if ($meta{last} && defined $meta{time1} && $done == $total) {
            $state = "done";
            $dones++;
        } elsif ($meta{stop}) {
            $state = "stop";
        } elsif ($meta{time1}) {
            $state = "error";
        } else {
            $state .= "+warn" if ($meta{w_run} > 0);
            $state .= "+error" if ($error > 0);
            $state .= "+throttle" if (grep(/^throttled_/, keys(%meta)));
        }

        # skip transfers that do not match the given state
        next if (!$opts{id} && $opts{state} &&
            $state !~ /(?:^|\+)\Q$opts{state}\E(?:$|\+)/);

        # "+" marks totals that may still grow (transfer not finalized)
        my $plus = $meta{last} ? "" : "+";

        # add first row for each transfer with bulk of info
        my $rate = $time1 - $meta{time0} ?
            $meta{s_cp} / ($time1 - $meta{time0}) : $meta{s_cp};
        my @row = ($id, $state,
            "$meta{d_mkdir}/$meta{t_mkdir}" . $plus,
            ($meta{d_cp} + $meta{d_ln}) . "/" .
                ($meta{t_cp} + $meta{t_ln}) . $plus,
            format_bytes($meta{s_cp}) . "/" .
                format_bytes($meta{s_total}) . $plus,
            strftime('%m/%d', localtime($meta{time0})),
            format_seconds($time1 - $meta{time0}),
            format_bytes($rate) . "/s");

        my $left;
        if ($rate && $meta{last} && !$meta{time1}) {
            # add estimated time to completion
            my $rate1 = $rate;
            # add time for cps, sums, cksums
            foreach my $cmd (qw(cp sum cksum)) {
                # skip if no operations of this type needed
                next if ($meta{"t_$cmd"} == 0);
                if (defined $meta{"t0_$cmd"} && $time1 > $meta{"t0_$cmd"} &&
                        $meta{"s_$cmd"} > $meta{"s0_$cmd"}) {
                    # use previous rate for same operation type if available
                    $rate1 = ($meta{"s_$cmd"} - $meta{"s0_$cmd"}) /
                        ($time1 - $meta{"t0_$cmd"});
                }
                # use rate for previous operation type otherwise
                $left += ($meta{s_total} - $meta{"s_$cmd"}) / $rate1;
            }
            my $ncli = sum(map {$meta{$_}} (grep(/^clients_/, keys %meta)));
            $ncli = 1 if (!$ncli);
            if (defined $meta{t0_chattr} && $time1 > $meta{t0_chattr} &&
                    $meta{d_chattr} > $meta{s0_chattr}) {
                # use previous rate for chattrs when available
                $rate1 = ($meta{d_chattr} - $meta{s0_chattr}) /
                    ($time1 - $meta{t0_chattr});
                $left += ($meta{t_chattr} - $meta{d_chattr}) / $rate1;
            } else {
                # add time for chattrs assuming 100/s rate otherwise
                $left += ($meta{t_chattr} - $meta{d_chattr}) / 100 / $ncli;
            }
            # guard against a missing/zero $meta{files}, which previously
            # aborted the whole status output with a division by zero
            my $files = $meta{files} || 1;
            # add time for non-cp manager calls assuming 1/s rate
            foreach my $cmd (qw(chattr cksum sum)) {
                $left += ($meta{"t_$cmd"} - $meta{"d_$cmd"} -
                    $meta{"e_$cmd"}) / $files / $ncli;
            }
            $left = format_seconds($left);
        }

        my $s_total = $meta{verify} ? 2 * $meta{s_total} : 0;
        # add second row for each transfer with sums, attrs and sum size
        my @row2 = ("", "",
            ($meta{d_sum} + $meta{d_cksum}) . "/" .
                ($meta{t_sum} + $meta{t_cksum}) . $plus,
            "$meta{d_chattr}/$meta{t_chattr}" . $plus,
            format_bytes($meta{s_sum} + $meta{s_cksum}) . "/" .
                format_bytes($s_total) . $plus,
            strftime('%R', localtime($meta{time0})),
            $left, "");
        if ($opts{status} eq 'csv') {
            print join(",", @row, @row2), "\n";
        } else {
            push(@rows, \@row, \@row2);
        }
    }

    # csv output has already been printed by this point
    return if ($opts{status} eq 'csv');

    my $excess = scalar(@metas) - $conf{status_lines};
    if ($excess > 0) {
        if ($dones && $dones < $excess) {
            # leave at least one completed transfer in output
            $dones--;
        } elsif ($dones > $excess) {
            # skip older completed transfers beyond configured output limit
            $dones = $excess;
        }
    }
    my $skip = $> != 0 && $dones && !$opts{id} && !$opts{state} &&
        $excess > 0 ? $dones : 0;
    for (my $i = 0; $i < scalar(@rows); $i += 2) {
        next if ($skip && $rows[$i]->[1] eq 'done' && $dones-- > 0);
        # add saved rows into table
        $t->row(@{$rows[$i]});
        $t->row(@{$rows[$i + 1]});
    }

    # return/output final table depending on id option
    return $t->render if ($opts{id});
    print $t->render;

    # notify user when completed transfers not shown
    print "\n" . ucfirst("$skip completed transfer(s) omitted ") .
        "(show using \"--status --state=done\")\n" if ($skip);
}

####################
#### sync_local ####
####################
# Synchronize the given file, or all files and metadata of the current
# transfer, to the configured sync host.
#
# $file: optional path relative to $conf{user_dir}. When it no longer
#   exists locally, the remote copy is removed (length -1 is sent). When
#   omitted, every file under $opts{base} except "find" is synchronized.
# Runs "ssh $conf{sync_host} shift-mgr --sync" and speaks the line protocol
# implemented by sync_remote/sync_remote_io on the other end. Returns
# whatever open3_wait returns.
sub sync_local {
    my $file = shift;
    my $fhpid = open3_run([undef, undef, -1],
        "ssh $conf{sync_host} shift-mgr --sync");
    my ($out, $in) = ($fhpid->[0], $fhpid->[1]);
    # remote side announces itself with "### 200" when it is running
    my $rc0 = sync_return($in);
    if (ref $rc0) {
        # remote never came up, so there is no peer to exchange data with;
        # writing to its (likely closed) stdin would only fail or SIGPIPE
        return open3_wait($fhpid);
    }

    if ($file && ! -e "$conf{user_dir}/$file") {
        $out->write("#" . escape($file) . " -1\n### 200\n");
        sync_return($in);
    } else {
        my ($meta1, $meta2);
        my @files;
        if ($file) {
            @files = ($file);
        } else {
            @files = glob "$opts{base}/*";
            # make paths relative to user_dir (quoted: the dir is not a regex)
            s/^\Q$conf{user_dir}\E\/// foreach (@files);
            my $mfile = "$opts{base}/meta";
            $meta1 = get_meta($mfile, 1);
            $meta2 = get_meta($mfile, 2);
        }
        foreach my $path (@files) {
            my $base = basename($path);
            # find is not log-structured so is rebuilt instead of sync'd
            next if ($base eq 'find');
            my $doing;
            if ($base =~ /^doing_/ && $meta1 && $meta2) {
                $doing = min($meta1->{$base}, $meta2->{$base});
            }
            # retry on failure
            foreach (1 .. $conf{default_retry}) {
                my @stat = stat "$conf{user_dir}/$path";
                $out->write("#" . escape($path) .
                    " $stat[7] $stat[2] $doing\n");
                my $rc = sync_local_io($in, $out, $path, $stat[7]);
                last if (!ref $rc);
            }
        }
    }
    $out->write("#exit\n");
    open3_wait($fhpid);
}

#######################
#### sync_local_io ####
#######################
# Perform the local (sending) side of a single file sync.
#
# ($in, $out): read/write handles connected to the remote sync process.
# $file: path relative to $conf{user_dir}; $len: its local size in bytes.
# Exchange: send "### 100" (or "### 500 msg"), receive the remote offset,
# then for each chunk send "### 200\n" followed by raw data, and finally
# exchange "### 200"/"### 500 msg" with the remote.
# Returns the remote's final reply text on success, or {error => msg}.
sub sync_local_io {
    my ($in, $out, $file, $len) = @_;
    my $fh = IO::File->new("$conf{user_dir}/$file", O_RDONLY);
    my $err;
    if (!defined $fh) {
        $err = {error => "Error opening $file: $!"};
        # remove newlines so doesn't interfere with protocol
        $err->{error} =~ s/\n//g;
        # message already contains $!, so do not append it a second time
        $out->write("### 500 $err->{error}\n");
    } else {
        $out->write("### 100\n");
    }
    my $rc = sync_return($in);
    return (ref $err ? $err : $rc) if (ref $err || ref $rc);

    # remote replied with the offset it wants data from
    my $off = $rc;
    $len -= $off;
    # assume seek works
    $fh->seek($off, 0);
    $rc = undef;
    my $size = 4 * 1048576;
    while ($len > 0) {
        $size = $len if ($len < $size);
        my $buf;
        my $n = $fh->sysread($buf, $size);
        last if ($n < $size);
        $out->write("### 200\n");
        $out->write($buf);
        $len -= $n;
    }
    $fh->close;

    if ($len > 0) {
        $rc = {error => "Error reading $file: $!"};
        # remove newlines so doesn't interfere with protocol
        $rc->{error} =~ s/\n//g;
        # the remote receives this in place of the next chunk header and
        # returns without replying, so do not wait for a response here
        # (waiting previously deadlocked both sides)
        $out->write("### 500 $rc->{error}\n");
    } else {
        $out->write("### 200\n");
        $rc = sync_return($in);
    }
    return $rc;
}

#####################
#### sync_remote ####
#####################
# Initiate the fish-style protocol and perform each transfer given on STDIN.
# Each request line has the form "#<escaped args...>"; "#exit" terminates
# the process. Replies are written to STDOUT.
sub sync_remote {
    $SIG{'CHLD'} = 'IGNORE';
    my $in = \*STDIN;
    my $out = \*STDOUT;
    $out->autoflush(1);
    # indicate running
    $out->write("### 200\n");
    while (defined(my $line = $in->getline)) {
        $line =~ s/^\s+|\s+$//g;
        # only lines starting with "#" are requests
        next if ($line !~ s/^#//);
        my @args = map {unescape($_)} split(/\s+/, $line);
        exit if (scalar(@args) == 1 && $args[0] eq 'exit');
        sync_remote_io($in, $out, @args);
    }
}

########################
#### sync_remote_io ####
########################
# Perform the remote (receiving) side of a single file sync.
#
# $file: path relative to $conf{user_dir}; $len: client file size, where a
#   negative value means the file no longer exists on the client and is
#   removed here; $mode: client st_mode (permission bits are applied);
#   $doing: offset to resume from for doing_* files.
# Returns the client's final reply text on success, or {error => msg}.
sub sync_remote_io {
    my ($in, $out, $file, $len, $mode, $doing) = @_;
    # untaint file
    $file = $1 if ($file =~ /(.*)/);
    # untaint mode
    $mode = $1 if ($mode =~ /(.*)/);
    $file = "$conf{user_dir}/$file";
    if ($len < 0) {
        # file/dir does not exist on client so remove
        rmtree($file);
        $out->write("### 200\n");
        return sync_return($in);
    }

    my $off = (stat $file)[7];
    if ($off > $len || $file =~ /\.(db|load)$/) {
        # server file bigger than client file or db/load file - copy whole
        truncate($file, 0);
        $off = 0;
    } elsif ($file =~ /doing_/) {
        $off = $doing;
    } elsif ($off && $off < $len) {
        # back up one byte before resuming an append
        $off--;
    }

    # create implicit directories
    eval {mkpath(dirname($file), {mode => 0755})};
    my $fh = IO::File->new($file, O_WRONLY | O_CREAT);
    my $err;
    if (!defined $fh) {
        $err = {error => "Error opening $file: $!"};
    } elsif (defined $off && !$fh->seek($off, 0)) {
        $fh->close;
        $err = {error => "Error seeking $file: $!"};
    }
    if ($err) {
        # remove newlines so doesn't interfere with protocol
        $err->{error} =~ s/\n//g;
        $out->write("### 500 $err->{error}\n");
    } else {
        $out->write("$off\n");
        $out->write("### 100\n");
    }
    my $rc = sync_return($in);
    return (ref $err ? $err : $rc) if (ref $err || ref $rc);

    $rc = undef;
    $len -= $off;
    my $size = 4 * 1048576;
    my $werr;
    while ($len > 0) {
        $size = $len if ($len < $size);
        # each chunk is preceded by a status line from the client
        $rc = sync_return($in);
        if (ref $rc) {
            # client aborted mid-transfer; it does not wait for a reply
            $fh->close;
            return $rc;
        }
        my $buf;
        my $n = $in->read($buf, $size);
        last if ($n < $size);
        # keep consuming client data after a write failure so both sides
        # stay in step; the failure is reported at the end
        my $w = $fh->syswrite($buf);
        $werr = "Error writing $file: $!"
            if (!defined $werr && (!defined $w || $w != $n));
        $len -= $n;
    }
    $fh->close;
    chmod($mode & 07777, $file);

    if ($len > 0 || defined $werr) {
        $rc = {error => defined $werr ? $werr : "Error reading $file: $!"};
        # remove newlines so doesn't interfere with protocol
        $rc->{error} =~ s/\n//g;
        $out->write("### 500 $rc->{error}\n");
        sync_return($in);
        # revert to original size
        truncate($file, $off);
    } else {
        $out->write("### 200\n");
        $rc = sync_return($in);
    }
    return $rc;
}

#####################
#### sync_return ####
#####################
# Read lines from $in up to the next "### <code>" status line.
# Returns the accumulated preceding text (trailing whitespace removed; undef
# if there was none) for codes 100 and 200, {error => rest of status line}
# for any other code, or {error => "Invalid protocol return"} at EOF.
sub sync_return {
    my $in = shift;
    my $text;
    while (defined(my $line = $in->getline)) {
        if ($line =~ /^###\s+(\d+)(.*)/) {
            return {error => $2} if ($1 != 200 && $1 != 100);
            $text =~ s/\s+$// if (defined $text);
            return $text;
        }
        $text .= $line;
    }
    return {error => "Invalid protocol return"};
}

##################
#### throttle ####
##################
# Return the number of seconds (rounded to an integer) this transfer should
# sleep based on configured disk, client, user and fs/host limits.
# Side effect: records or clears per-filesystem disk throttle state in %meta
# (keys "disk_*" hold the lower threshold while throttled).
sub throttle {
    my %cli_load = split(/[= ]+/, $meta{"load_$opts{host}$opts{cid}"});
    my $sleep = 0;

    # disk throttling
    foreach my $fs (grep(/^disk_/, keys %cli_load)) {
        if (defined $meta{$fs} && $cli_load{$fs} <= $meta{$fs}) {
            # load has become less than lower threshold
            delete $meta{$fs};
        } elsif (defined $meta{$fs}) {
            # load still higher than lower threshold
            $sleep = 300;
        }
        foreach my $hl ($meta{disk}, $conf{"throttle_$fs"}) {
            next if ($hl !~ /^(\d+):(\d+)$/);
            my ($high, $low) = split(/:/, $hl);
            if ($cli_load{$fs} >= $high) {
                # load has become higher than upper threshold
                $meta{$fs} = $low;
                $sleep = 300;
            }
        }
    }

    # only throttle further when there was some load generated
    return $sleep if ($cli_load{ratio} <= 0);

    my @cli_keys = grep(/^(cpu|io[rw]?|net[rw]?)$/, keys %meta);
    my @user_keys = grep(/^throttle_\w+_user/, keys %conf);
    my @fshost_keys = grep(/^throttle_\w+_(fs|host)/, keys %conf);
    # only throttle further when configured
    return $sleep if (!scalar(@cli_keys) && !scalar(@user_keys) &&
        !scalar(@fshost_keys));

    # compute new load for this transfer since its global data not updated yet
    my %my_loaddb = eval {%{retrieve("$conf{user_dir}/$opts{user}.load")}};
    my %my_load = split(/[= ]+/,
        $my_loaddb{"next_id_$opts{id}$opts{cid}_$opts{host}"});
    $cli_load{time} = 1 if (!$cli_load{time});
    # convert sizes to MB/s and scale by actual/estimated ratio
    $my_load{$_} = $cli_load{ratio} * $my_load{$_} / 1E6 / $cli_load{time}
        foreach (keys %my_load);
    $my_load{time} = $cli_load{time};

    # client throttling
    foreach my $key (@cli_keys) {
        next if ($meta{$key} <= 0);
        my $metric = $key;
        # count both r/w cases when r/w not specified
        $metric .= "." if ($metric eq 'io' || $metric eq 'net');
        my $total;
        $total += $my_load{$_}
            foreach (grep(/^$metric\_host_/, keys %my_load));
        # sleep amount necessary to bring average to specified limit
        my $tmp = ($total / $meta{$key} - 1) * $my_load{time};
        $sleep = $tmp if ($tmp > $sleep);
    }

    # user throttling
    my $my_key = "id_$opts{id}$opts{cid}_$opts{host}";
    foreach my $key (@user_keys) {
        next if ($conf{$key} <= 0);
        next if ($key !~ /^throttle_([a-z]+)_user(?:_(\S+))?$/);
        my ($metric, $user) = ($1, $2);
        # only throttle if limit relevant to this user
        next if ($user && $user ne $opts{user});
        # count both r/w cases when r/w not specified
        $metric .= "." if ($metric eq 'io' || $metric eq 'net');
        my @id_vals;
        my $id_load;
        my $my_index;
        # compute relevant load for all transfers of user
        foreach my $id_key (grep(/^id_/, keys %my_loaddb)) {
            if ($id_key eq $my_key) {
                # use current computed load for this transfer
                $my_index = scalar(@id_vals);
                $id_load = \%my_load;
            } else {
                # all other transfers based on global load data
                $id_load = {split(/[= ]+/, $my_loaddb{$id_key})};
            }
            my $val;
            # value may be based on multiple items when r/w not given
            $val += $id_load->{$_}
                foreach (grep(/^$metric\_host_/, keys %{$id_load}));
            push(@id_vals, $val);
        }
        # this transfer has no load entry yet, so its share cannot be
        # located (an undef index would silently pick another transfer)
        next if (!defined $my_index);
        # only throttle if combined load of all transfers is above limit
        next if (!scalar(@id_vals) || sum(@id_vals) <= $conf{$key});
        # each transfer initially gets an equal share of the load limit
        my $per_id = $conf{$key} / scalar(@id_vals);
        my ($extra, $n_extra);
        # determine if any transfers are not using their entire share
        foreach (@id_vals) {
            my $tmp = $per_id - $_;
            if ($tmp > 0) {
                $extra += $tmp;
                $n_extra++;
            }
        }
        # adjust per transfer limit by dividing up unused shares
        $per_id += $extra / (scalar(@id_vals) - $n_extra);
        # sleep amount necessary to bring average to specified limit
        my $tmp = ($id_vals[$my_index] / $per_id - 1) * $cli_load{time};
        $sleep = $tmp if ($tmp > $sleep);
    }

    # fs/host throttling
    my %all_loaddb;
    if (scalar(@fshost_keys)) {
        # consolidate the load info from all users
        foreach my $file (glob "$opts{user_dir}/*.load") {
            my $user = $file;
            $user =~ s/.*\/|\.load$//g;
            my %loaddb = eval {%{retrieve($file)}};
            # ignore the ^next_ load fields
            $all_loaddb{"$user\_$_"} = $loaddb{$_}
                foreach (grep(/^id_/, keys %loaddb));
        }
    }
    $my_key = "$opts{user}_$my_key";
    foreach my $key (@fshost_keys) {
        next if ($conf{$key} <= 0);
        next if ($key !~ /^throttle_([a-z]+)_(fs|host)(?:_(\S+))?$/);
        my ($metric, $type, $type_val) = ($1, $2, $3);
        # count both r/w cases when r/w not specified
        $metric .= "." if ($metric eq 'io' || $metric eq 'net');
        # only throttle if limit relevant to this transfer
        next if ($type_val &&
            !grep(/^$metric\_$type\_$type_val$/, keys %my_load));

        # compute the fs/host values applicable to this transfer
        my %my_type_vals;
        if ($type_val) {
            # use specified value when given
            $my_type_vals{$type_val} = 1;
        } else {
            # use all fs/host values in this transfer when no value given
            foreach (grep(/^$metric\_$type\_/, keys %my_load)) {
                my $val = $_;
                $val =~ s/^$metric\_$type\_//;
                $my_type_vals{$val} = 1;
            }
        }

        foreach my $my_type_val (keys %my_type_vals) {
            my @all_users;
            my @all_vals;
            my $all_load;
            my ($my_index, $my_user_index1, $my_user_index2, $prev_user);
            # compute relevant load for all transfers (sorted so each
            # user's transfers are contiguous)
            foreach my $all_key (sort(keys %all_loaddb)) {
                if ($all_key eq $my_key) {
                    # use current computed load for this transfer
                    $my_index = scalar(@all_vals);
                    $all_load = \%my_load;
                } else {
                    # all other transfers based on global load data
                    $all_load = {split(/[= ]+/, $all_loaddb{$all_key})};
                }
                my $user = $all_key;
                $user =~ s/_id_.*//g;
                # store where each user's transfer begin in load list
                if ($prev_user ne $user) {
                    $my_user_index1 = scalar(@all_vals)
                        if ($user eq $opts{user});
                    $my_user_index2 = scalar(@all_vals)
                        if ($prev_user eq $opts{user});
                    push(@all_users, scalar(@all_vals));
                    $prev_user = $user;
                }
                my $val;
                # value may be based on multiple items when r/w not given
                $val += $all_load->{$_} foreach (grep(
                    /^$metric\_$type\_$my_type_val$/, keys %{$all_load}));
                push(@all_vals, $val);
            }
            # this transfer has no load entry yet, so its share cannot be
            # located (an undef index would silently pick another transfer)
            next if (!defined $my_index);
            # only throttle if combined load of all transfers is above limit
            next if (!scalar(@all_vals) || sum(@all_vals) <= $conf{$key});
            # each user initially gets an equal share of the load limit
            my $per_user = $conf{$key} / scalar(@all_users);
            # this user's transfers run to the end of the list when it is
            # the last user (assign the outer variable; a conditional "my"
            # here previously shadowed it and broke non-last users)
            $my_user_index2 = scalar(@all_vals) if (!defined $my_user_index2);
            # no throttling needed if this user is under per_user limit
            next if (sum(@all_vals[$my_user_index1 .. $my_user_index2 - 1])
                <= $per_user);
            # add extra index for processing of last user
            push(@all_users, scalar(@all_vals));
            my $index1 = shift @all_users;
            my ($extra, $n_extra);
            # determine if any users are not using their entire share
            foreach my $index2 (@all_users) {
                my $tmp = $per_user - sum(@all_vals[$index1 .. $index2 - 1]);
                if ($tmp > 0) {
                    $extra += $tmp;
                    $n_extra++;
                }
                $index1 = $index2;
            }
            # adjust per user limit by dividing up unused shares among the
            # users (not transfers) that are using their full share
            $per_user += $extra / (scalar(@all_users) - $n_extra);
            # each transfer initially gets an equal share of the user limit
            my $n_mine = $my_user_index2 - $my_user_index1;
            my $per_id = $per_user / $n_mine;
            ($extra, $n_extra) = (0, 0);
            # determine if any transfers are not using their entire share
            foreach (@all_vals[$my_user_index1 .. $my_user_index2 - 1]) {
                my $tmp = $per_id - $_;
                if ($tmp > 0) {
                    $extra += $tmp;
                    $n_extra++;
                }
            }
            # every transfer of this user (including this one) is under its
            # share, so no sleep is needed and the division below would be
            # by zero
            next if ($n_extra >= $n_mine);
            # adjust per transfer limit by dividing up unused shares
            $per_id += $extra / ($n_mine - $n_extra);
            # sleep amount necessary to bring average to specified limit
            my $tmp = ($all_vals[$my_index] / $per_id - 1) * $cli_load{time};
            $sleep = $tmp if ($tmp > $sleep);
        }
    }

    # eliminate fractions
    $sleep = int($sleep + 0.5) if ($sleep);
    return $sleep;
}

##################
#### unescape ####
##################
# Return the uri-unescaped version of the given string (%XX hex sequences
# decoded); undef is returned unchanged.
sub unescape {
    my $text = shift;
    $text =~ s/%([0-9A-Fa-f]{2})/chr(hex($1))/eg if (defined $text);
    return $text;
}

# This chunk of stuff was generated by App::FatPacker. To find the original
# file's code, look for the end of this BEGIN block or the string 'FATPACK'
BEGIN { my %fatpacked; $fatpacked{"IPC/Open3.pm"} = '#line '.(1+__LINE__).' 
"'.__FILE__."\"\n".<<'IPC_OPEN3'; package IPC::Open3;use strict;no strict 'refs';our ($VERSION,@ISA,@EXPORT);require Exporter;use Carp;use Symbol qw(gensym qualify);$VERSION='1.18';@ISA=qw(Exporter);@EXPORT=qw(open3);our$Me='open3 (bug)';sub xpipe {pipe $_[0],$_[1]or croak "$Me: pipe($_[0], $_[1]) failed: $!"}sub xopen {open $_[0],$_[1],@_[2..$#_]and return;local $"=', ';carp "$Me: open(@_) failed: $!"}sub xclose {$_[0]=~ /\A=?(\d+)\z/ ? do {my$fh;open($fh,$_[1].'&=' .$1)and close($fh)}: close $_[0]or croak "$Me: close($_[0]) failed: $!"}sub xfileno {return $1 if $_[0]=~ /\A=?(\d+)\z/;return fileno $_[0]}use constant FORCE_DEBUG_SPAWN=>0;use constant DO_SPAWN=>$^O eq 'os2' || $^O eq 'MSWin32' || FORCE_DEBUG_SPAWN;sub _open3 {local$Me=shift;splice @_,0,1,undef if \$_[0]==\undef;splice @_,1,1,undef if \$_[1]==\undef;unless (eval {$_[0]=gensym unless defined $_[0]&& length $_[0];$_[1]=gensym unless defined $_[1]&& length $_[1];1}){$@ =~ s/(?<=value attempted) at .*//s;croak "$Me: $@"}my@handles=({mode=>'<',handle=>\*STDIN },{mode=>'>',handle=>\*STDOUT },{mode=>'>',handle=>\*STDERR },);for (@handles){$_->{parent}=shift;$_->{open_as}=gensym}if (@_ > 1 and $_[0]eq '-'){croak "Arguments don't make sense when the command is '-'"}$handles[2]{parent}||=$handles[1]{parent};$handles[2]{dup_of_out}=$handles[1]{parent}eq $handles[2]{parent};my$package;for (@handles){$_->{dup}=($_->{parent}=~ s/^[<>]&//);if ($_->{parent}!~ /\A=?(\d+)\z/){$package //= caller 1;$_->{parent}=qualify $_->{parent},$package}next if $_->{dup}or $_->{dup_of_out};if ($_->{mode}eq '<'){xpipe $_->{open_as},$_->{parent}}else {xpipe $_->{parent},$_->{open_as}}}my$kidpid;if (!DO_SPAWN){xpipe my$stat_r,my$stat_w;$kidpid=fork;croak "$Me: fork failed: $!" unless defined$kidpid;if ($kidpid==0){eval {untie*STDIN;untie*STDOUT;untie*STDERR;close$stat_r;require Fcntl;my$flags=fcntl$stat_w,&Fcntl::F_GETFD,0;croak "$Me: fcntl failed: $!" 
unless$flags;fcntl$stat_w,&Fcntl::F_SETFD,$flags|&Fcntl::FD_CLOEXEC or croak "$Me: fcntl failed: $!";if (!$handles[2]{dup_of_out}&& $handles[2]{dup}&& xfileno($handles[2]{parent})==fileno \*STDOUT){my$tmp=gensym;xopen($tmp,'>&',$handles[2]{parent});$handles[2]{parent}=$tmp}for (@handles){if ($_->{dup_of_out}){xopen \*STDERR,">&STDOUT" if defined fileno STDERR && fileno STDERR!=fileno STDOUT}elsif ($_->{dup}){xopen $_->{handle},$_->{mode}.'&',$_->{parent}if fileno $_->{handle}!=xfileno($_->{parent})}else {xclose $_->{parent},$_->{mode};xopen $_->{handle},$_->{mode}.'&=',fileno $_->{open_as}}}return 1 if ($_[0]eq '-');exec @_ or do {local($")=(" ");croak "$Me: exec of @_ failed"}}and do {close$stat_w;return 0};my$bang=0+$!;my$err=$@;utf8::encode$err if $] >= 5.008;print$stat_w pack('IIa*',$bang,length($err),$err);close$stat_w;eval {require POSIX;POSIX::_exit(255)};exit 255}else {close$stat_w;my$to_read=length(pack('I',0))* 2;my$bytes_read=read($stat_r,my$buf='',$to_read);if ($bytes_read){(my$bang,$to_read)=unpack('II',$buf);read($stat_r,my$err='',$to_read);waitpid$kidpid,0;if ($err){utf8::decode$err if $] >= 5.008}else {$err="$Me: " .($!=$bang)}$!=$bang;die($err)}}}else {my@close;for (@handles){if ($_->{dup_of_out}){$_->{open_as}=$handles[1]{open_as}}elsif ($_->{dup}){$_->{open_as}=$_->{parent}=~ /\A[0-9]+\z/ ? 
$_->{parent}: \*{$_->{parent}};push@close,$_->{open_as}}else {push@close,\*{$_->{parent}},$_->{open_as}}}require IO::Pipe;$kidpid=eval {spawn_with_handles(\@handles,\@close,@_)};die "$Me: $@" if $@}for (@handles){next if $_->{dup}or $_->{dup_of_out};xclose $_->{open_as},$_->{mode}}xclose$handles[0]{parent},$handles[0]{mode}if$handles[0]{dup};select((select($handles[0]{parent}),$|=1)[0]);$kidpid}sub open3 {if (@_ < 4){local $"=', ';croak "open3(@_): not enough arguments"}return _open3 'open3',@_}sub spawn_with_handles {my$fds=shift;my$close_in_child=shift;my ($fd,%saved,@errs);for$fd (@$fds){$fd->{tmp_copy}=IO::Handle->new_from_fd($fd->{handle},$fd->{mode});$saved{fileno$fd->{handle}}=$fd->{tmp_copy}if$fd->{tmp_copy}}for$fd (@$fds){bless$fd->{handle},'IO::Handle' unless eval {$fd->{handle}->isa('IO::Handle')};my$open_as=$fd->{open_as};my$fileno=fileno($open_as);$fd->{handle}->fdopen(defined($fileno)? $saved{$fileno}|| $open_as : $open_as,$fd->{mode})}unless ($^O eq 'MSWin32'){require Fcntl;for$fd (@$close_in_child){next unless fileno$fd;fcntl($fd,Fcntl::F_SETFD(),1)or push@errs,"fcntl $fd: $!" unless$saved{fileno$fd}}}my$pid;unless (@errs){if (FORCE_DEBUG_SPAWN){pipe my$r,my$w or die "Pipe failed: $!";$pid=fork;die "Fork failed: $!" unless defined$pid;if (!$pid){{no warnings;exec @_}print$w 0 + $!;close$w;require POSIX;POSIX::_exit(255)}close$w;my$bad=<$r>;if (defined$bad){$!=$bad;undef$pid}}else {$pid=eval {system 1,@_}}push@errs,"IO::Pipe: Can't spawn-NOWAIT: $!" if!$pid || $pid < 0}for$fd (reverse @$fds){$fd->{handle}->fdopen($fd->{tmp_copy},$fd->{mode})}for (values%saved){$_->close or croak "Can't close: $!"}croak join "\n",@errs if@errs;return$pid}1; IPC_OPEN3 $fatpacked{"Mail/Sendmail.pm"} = '#line '.(1+__LINE__).' 
"'.__FILE__."\"\n".<<'MAIL_SENDMAIL'; package Mail::Sendmail;$VERSION='0.79';%mailcfg=('smtp'=>[qw(localhost) ],'from'=>'','mime'=>1,'retries'=>1,'delay'=>1,'tz'=>'','port'=>25,'debug'=>0);require Exporter;use strict;use vars qw($VERSION @ISA @EXPORT @EXPORT_OK %mailcfg $address_rx $debug $log $error $retry_delay $connect_retries);use Socket;use Time::Local;use Sys::Hostname;eval("use MIME::QuotedPrint");$mailcfg{'mime'}&&=(!$@);@ISA=qw(Exporter);@EXPORT=qw(&sendmail);@EXPORT_OK=qw(%mailcfg time_to_date $address_rx $debug $log $error);my$word_rx='[\x21\x23-\x27\x2A-\x2B\x2D\x2F\w\x3D\x3F]+';my$user_rx=$word_rx .'(?:\.' .$word_rx .')*' ;my$dom_rx='\w[-\w]*(?:\.\w[-\w]*)*';my$ip_rx='\[\d{1,3}(?:\.\d{1,3}){3}\]';$address_rx='((' .$user_rx .')\@(' .$dom_rx .'|' .$ip_rx .'))';;sub time_to_date {my$time=$_[0]|| time();my@months=qw(Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec);my@wdays=qw(Sun Mon Tue Wed Thu Fri Sat);my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst)=localtime($time);my$TZ=$mailcfg{'tz'};if ($TZ eq ""){my$offset=sprintf "%.1f",(timegm(localtime)- time)/ 3600;my$minutes=sprintf "%02d",abs($offset - int($offset))* 60;$TZ=sprintf("%+03d",int($offset)).$minutes}return join(" ",($wdays[$wday].','),$mday,$months[$mon],$year+1900,sprintf("%02d:%02d:%02d",$hour,$min,$sec),$TZ)}sub sendmail {$error='';$log="Mail::Sendmail v. $VERSION - " .scalar(localtime())."\n";my$CRLF="\015\012";local $/=$CRLF;local $\='';local $_;my (%mail,$k,$smtp,$server,$port,$connected,$localhost,$fromaddr,$recip,@recipients,$to,$header,);sub fail {print STDERR @_ if $^W;$error .= join(" ",@_)."\n";close S;return 0}sub socket_write {my$i;for$i (0..$#_){my$data=ref($_[$i])? 
$_[$i]: \$_[$i];if ($mailcfg{'debug'}> 5){if (length($$data)< 500){print ">",$$data}else {print "> [...",length($$data)," bytes sent ...]\n"}}print(S $$data)|| return 0}1}sub socket_read {my$response;do {chomp($_=);print "<$_\n" if$mailcfg{'debug'}> 5;if (/^[45]/ or!$_){return}$response .= $_}while (/^[\d]+-/);return$response}for$k (keys%mailcfg){if ($k =~ /[A-Z]/){$mailcfg{lc($k)}=$mailcfg{$k}}}while (@_){$k=shift @_;if (!$k and $^W){warn "Received false mail hash key: \'$k\'. Did you forget to put it in quotes?\n"}$k=ucfirst lc($k);$k =~ s/\s*:\s*$//o;$k =~ s/-(.)/"-" . uc($1)/ge;$mail{$k}=shift @_}$smtp=$mail{'Smtp'}|| $mail{'Server'};unshift @{$mailcfg{'smtp'}},$smtp if ($smtp and $mailcfg{'smtp'}->[0]ne $smtp);delete$mail{'Smtp'};delete$mail{'Server'};$mailcfg{'port'}=$mail{'Port'}|| $mailcfg{'port'}|| 25;delete$mail{'Port'};{local $^W=0;$mail{'Message'}=join("",$mail{'Message'},$mail{'Body'},$mail{'Text'})}delete$mail{'Body'};delete$mail{'Text'};$fromaddr=$mail{'Sender'}|| $mail{'From'}|| $mailcfg{'from'};delete$mail{'Sender'};unless ($fromaddr =~ /$address_rx/){return fail("Bad or missing From address: \'$fromaddr\'")}$fromaddr=$1;$mail{Date}||=time_to_date();$log .= "Date: $mail{Date}\n";$mail{'Message'}=~ s/\r\n/\n/go;$mail{'Mime-Version'}||='1.0';$mail{'Content-Type'}||='text/plain; charset="iso-8859-1"';unless ($mail{'Content-Transfer-Encoding'}|| $mail{'Content-Type'}=~ /multipart/io){if ($mailcfg{'mime'}){$mail{'Content-Transfer-Encoding'}='quoted-printable';$mail{'Message'}=encode_qp($mail{'Message'})}else {$mail{'Content-Transfer-Encoding'}='8bit';if ($mail{'Message'}=~ /[\x80-\xFF]/o){$error .= "MIME::QuotedPrint not present!\nSending 8bit characters, hoping it will come across OK.\n";warn "MIME::QuotedPrint not present!\n","Sending 8bit characters without encoding, hoping it will come across OK.\n" if $^W}}}$mail{'Message'}=~ s/^\./\.\./gom;$mail{'Message'}=~ s/\n/$CRLF/go;{local $^W=0;$recip=join(", 
",$mail{To},$mail{Cc},$mail{Bcc})}delete$mail{'Bcc'};@recipients=();while ($recip =~ /$address_rx/go){push@recipients,$1}unless (@recipients){return fail("No recipient!")}$localhost=hostname()|| 'localhost';for$server (@{$mailcfg{'smtp'}}){unless (socket S,AF_INET,SOCK_STREAM,scalar(getprotobyname 'tcp')){return fail("socket failed ($!)")}print "- trying $server\n" if$mailcfg{'debug'}> 1;$server =~ s/\s+//go;$port=($server =~ s/:(\d+)$//o)? $1 : $mailcfg{'port'};$smtp=$server;my$smtpaddr=inet_aton$server;unless ($smtpaddr){$error .= "$server not found\n";next}my$retried=0;while ((not $connected=connect S,pack_sockaddr_in($port,$smtpaddr))and ($retried < $mailcfg{'retries'})){$retried++;$error .= "connect to $server failed ($!)\n";print "- connect to $server failed ($!)\n" if$mailcfg{'debug'}> 1;print "retrying in $mailcfg{'delay'} seconds...\n" if$mailcfg{'debug'}> 1;sleep$mailcfg{'delay'}}if ($connected){print "- connected to $server\n" if$mailcfg{'debug'}> 3;last}else {$error .= "connect to $server failed\n";print "- connect to $server failed, next server...\n" if$mailcfg{'debug'}> 1;next}}unless ($connected){return fail("connect to $smtp failed ($!) 
no (more) retries!")};{local $^W=0;$log .= "Server: $smtp Port: $port\n" ."From: $fromaddr\n" ."Subject: $mail{Subject}\n" ."To: "}my($oldfh)=select(S);$|=1;select($oldfh);socket_read()|| return fail("Connection error from $smtp on port $port ($_)");socket_write("HELO $localhost$CRLF")|| return fail("send HELO error");socket_read()|| return fail("HELO error ($_)");socket_write("MAIL FROM: <$fromaddr>$CRLF")|| return fail("send MAIL FROM: error");socket_read()|| return fail("MAIL FROM: error ($_)");for$to (@recipients){socket_write("RCPT TO: <$to>$CRLF")|| return fail("send RCPT TO: error");socket_read()|| return fail("RCPT TO: error ($_)");$log .= "$to\n "}socket_write("DATA$CRLF")|| return fail("send DATA error");socket_read()|| return fail("DATA error ($_)");for$header (keys%mail){next if$header eq "Message";$mail{$header}=~ s/\s+$//o;socket_write("$header: $mail{$header}$CRLF")|| return fail("send $header: error")};socket_write($CRLF,\$mail{'Message'},"$CRLF.$CRLF")|| return fail("send message error");socket_read()|| return fail("message transmission error ($_)");$log .= "\nResult: $_";socket_write("QUIT$CRLF")|| return fail("send QUIT error");socket_read();close S;return 1}1; MAIL_SENDMAIL $fatpacked{"Text/FormatTable.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'TEXT_FORMATTABLE'; package Text::FormatTable;use Carp;use strict;use warnings;use vars qw($VERSION);$VERSION='1.03';sub _uncolorized_length($) {my$str=shift;$str =~ s/\e \[ [^m]* m//xmsg;return length$str}sub _min_width($) {my$str=shift;my$min;for my$s (split(/\s+/,$str)){my$l=_uncolorized_length$s;$min=$l if not defined$min or $l > $min}return$min ? $min : 1}sub _max_width($) {my$str=shift;my$len=_uncolorized_length$str;return$len ? 
$len : 1}sub _max($$) {my ($a,$b)=@_;return$a if defined$a and (not defined$b or $a >= $b);return$b}sub _wrap($$) {my ($width,$text)=@_;my@lines=split(/\n/,$text);my@w=();for my$l (@lines){push@w,@{_wrap_line($width,$l)}}return \@w}sub _wrap_line($$) {my ($width,$text)=@_;my$width_m1=$width-1;my@t=($text);while(1){my$t=pop@t;my$l=_uncolorized_length$t;if($l <= $width){push@t,$t;return \@t}elsif($t =~ /^(.{0,$width_m1}\S)\s+(\S.*?)$/){push@t,$1;push@t,$2}elsif($t =~ /(.{$width,}?\S)\s+(\S.*?)$/){if (_uncolorized_length $1 > $width_m1){my$left=substr($1,0,$width);my$right=substr($1,$width);push@t,$left;push@t,$right;push@t,$2}else {push@t,$1;push@t,$2}}else {my$left=substr($t,0,$width);my$right=substr($t,$width);push@t,$left;push@t,$right;return \@t}}return \@t}sub _l_box($$) {my ($width,$text)=@_;my$lines=_wrap($width,$text);map {$_ .= ' 'x($width-_uncolorized_length($_))}@$lines;return$lines}sub _r_box($$) {my ($width,$text)=@_;my$lines=_wrap($width,$text);map {$_=(' 'x($width-_uncolorized_length($_)).$_)}@$lines;return$lines}sub _distribution_f($) {my$max_width=shift;return log($max_width)}sub _calculate_widths($$) {my ($self,$width)=@_;my@widths=();for my$r (@{$self->{data}}){$r->[0]eq 'data' or $r->[0]eq 'head' or next;my$cn=0;my ($max,$min)=(0,0);for my$c (@{$r->[1]}){if ($self->{fixed_widths}[$cn]){$widths[$cn][0]=$self->{fixed_widths}[$cn];$widths[$cn][1]=$self->{fixed_widths}[$cn]}else {$widths[$cn][0]=_max($widths[$cn][0],_min_width$c);$widths[$cn][1]=_max($widths[$cn][1],_max_width$c)}$cn++}}my ($total_min,$total_max)=(0,0);for my$c (@widths){$total_min += $c->[0];$total_max += $c->[1]}my$extra_width += scalar grep {$_->[0]eq '|' or $_->[0]eq ' '}(@{$self->{format}});$total_min += $extra_width;$total_max += $extra_width;if($total_max <= $width){my$cn=0;for my$c (@widths){$self->{widths}[$cn]=$c->[1];$cn++}$self->{total_width}=$total_max}else {my@dist_width;ITERATION: while(1){my$total_f=0.0;my$fixed_width=0;my$remaining=0;for my$c 
(@widths){if(defined$c->[2]){$fixed_width += $c->[2]}else {$total_f += _distribution_f($c->[1]);$remaining++}}my$available_width=$width-$extra_width-$fixed_width;if($available_width < $remaining*5){$available_width=$remaining*5;$width=$extra_width+$fixed_width+$available_width}my$cn=-1;COLUMN: for my$c (@widths){$cn++;next COLUMN if defined$c->[2];my$w=_distribution_f($c->[1])* $available_width / $total_f;if($c->[0]> $w){$c->[2]=$c->[0];next ITERATION}if($c->[1]< $w){$c->[2]=$c->[1];next ITERATION}$dist_width[$cn]=int($w)}last}my$cn=0;for my$c (@widths){$self->{widths}[$cn]=defined$c->[2]? $c->[2]: $dist_width[$cn];$cn++}}}sub _render_rule($$) {my ($self,$char)=@_;my$out='';my ($col,$data_col)=(0,0);for my$c (@{$self->{format}}){if($c->[0]eq '|'){if ($char eq '-'){$out .= '+'}elsif($char eq ' '){$out .= '|'}else {$out .= $char}}elsif($c->[0]eq ' '){$out .= $char}elsif($c->[0]eq 'l' or $c->[0]eq 'L' or $c->[0]eq 'r' or $c->[0]eq 'R'){$out .= ($char)x($self->{widths}[$data_col]);$data_col++}$col++}return$out."\n"}sub _render_data($$) {my ($self,$data)=@_;my@rdata;my ($col,$data_col)=(0,0);my$lines=0;my@rows_in_column;for my$c (@{$self->{format}}){if(($c->[0]eq 'l')or ($c->[0]eq 'L')){my$lb=_l_box($self->{widths}[$data_col],$data->[$data_col]);$rdata[$data_col]=$lb;my$l=scalar @$lb ;$lines=$l if$lines < $l;$rows_in_column[$data_col]=$l;$data_col++}elsif(($c->[0]eq 'r')or ($c->[0]eq 'R')){my$rb=_r_box($self->{widths}[$data_col],$data->[$data_col]);$rdata[$data_col]=$rb;my$l=scalar @$rb ;$lines=$l if$lines < $l;$rows_in_column[$data_col]=$l ;$data_col++}$col++}my$out='';for my$l (0..($lines-1)){my ($col,$data_col)=(0,0);for my$c (@{$self->{format}}){if($c->[0]eq '|'){$out .= '|'}elsif($c->[0]eq ' '){$out .= ' '}elsif($c->[0]eq 'L' or $c->[0]eq 'R'){my$start_print=$lines - $rows_in_column[$data_col];if (defined$rdata[$data_col][$l-$start_print]and $l >= $start_print){$out .= $rdata[$data_col][$l-$start_print]}else {$out .= ' 
'x($self->{widths}[$data_col])}$data_col++}elsif($c->[0]eq 'l' or $c->[0]eq 'r'){if(defined$rdata[$data_col][$l]){$out .= $rdata[$data_col][$l]}else {$out .= ' 'x($self->{widths}[$data_col])}$data_col++}$col++}$out .= "\n"}return$out}sub _parse_format($$) {my ($self,$format)=@_;my@f=split(//,$format);my@format=();my@width=();my ($col,$data_col)=(0,0);my$wid;for my$f (@f){if ($f =~ /(\d+)/){$wid .= $f;next}if($f eq 'l' or $f eq 'L' or $f eq 'r' or $f eq 'R'){$format[$col]=[$f,$data_col];$width[$data_col]=$wid;$wid=undef;$data_col++}elsif($f eq '|' or $f eq ' '){$format[$col]=[$f]}else {croak "unknown column format: $f"}$col++}$self->{format}=\@format;$self->{fixed_widths}=\@width;$self->{col}=$col;$self->{data_col}=$data_col}sub new($$) {my ($class,$format)=@_;croak "new() requires one argument: format" unless defined$format;my$self={col=>'0',row=>'0',data=>[]};bless$self,$class;$self->_parse_format($format);return$self}sub _preprocess_row_data($$) {my ($self,$data)=@_;my$cn=0;for my$c (0..($#$data)){$data->[$c]=~ s/^\s+//m;$data->[$c]=~ s/\s+$//m}}sub head($@) {my ($self,@data)=@_;scalar@data==$self->{data_col}or croak "number of columns must be $self->{data_col}";$self->_preprocess_row_data(\@data);$self->{data}[$self->{row}++]=['head',\@data]}sub row($@) {my ($self,@data)=@_;scalar@data==$self->{data_col}or croak "number of columns must be $self->{data_col}";@data=map {defined $_ ? 
$_ : ""}@data;$self->_preprocess_row_data(\@data);$self->{data}[$self->{row}++]=['data',\@data]}sub rule($$) {my ($self,$char)=@_;$char='-' unless defined$char;$self->{data}[$self->{row}++]=['rule',$char]}sub render($$) {my ($self,$width)=@_;$width=79 unless defined$width;$self->_calculate_widths($width);my$out='';for my$r (@{$self->{data}}){if($r->[0]eq 'rule'){$out .= $self->_render_rule($r->[1])}elsif($r->[0]eq 'head'){$out .= $self->_render_data($r->[1])}elsif($r->[0]eq 'data'){$out .= $self->_render_data($r->[1])}}return$out}1; TEXT_FORMATTABLE s/^ //mg for values %fatpacked; my $class = 'FatPacked::'.(0+\%fatpacked); no strict 'refs'; *{"${class}::files"} = sub { keys %{$_[0]} }; if ($] < 5.008) { *{"${class}::INC"} = sub { if (my $fat = $_[0]{$_[1]}) { return sub { return 0 unless length $fat; $fat =~ s/^([^\n]*\n?)//; $_ = $1; return 1; }; } return; }; } else { *{"${class}::INC"} = sub { if (my $fat = $_[0]{$_[1]}) { open my $fh, '<', \$fat or die "FatPacker error loading $_[1] (could be a perl installation issue?)"; return $fh; } return; }; } unshift @INC, bless \%fatpacked, $class; } # END OF FATPACK CODE