#!/usr/bin/perl -T
#
# Copyright (C) 2012-2021 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration
# (NASA). All Rights Reserved.
#
# This software is distributed under the NASA Open Source Agreement
# (NOSA), version 1.3. The NOSA has been approved by the Open Source
# Initiative. See http://www.opensource.org/licenses/nasa1.3.php
# for the complete NOSA document.
#
# THE SUBJECT SOFTWARE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY OF ANY
# KIND, EITHER EXPRESSED, IMPLIED, OR STATUTORY, INCLUDING, BUT NOT
# LIMITED TO, ANY WARRANTY THAT THE SUBJECT SOFTWARE WILL CONFORM TO
# SPECIFICATIONS, ANY IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR
# A PARTICULAR PURPOSE, OR FREEDOM FROM INFRINGEMENT, ANY WARRANTY THAT
# THE SUBJECT SOFTWARE WILL BE ERROR FREE, OR ANY WARRANTY THAT
# DOCUMENTATION, IF PROVIDED, WILL CONFORM TO THE SUBJECT SOFTWARE. THIS
# AGREEMENT DOES NOT, IN ANY MANNER, CONSTITUTE AN ENDORSEMENT BY
# GOVERNMENT AGENCY OR ANY PRIOR RECIPIENT OF ANY RESULTS, RESULTING
# DESIGNS, HARDWARE, SOFTWARE PRODUCTS OR ANY OTHER APPLICATIONS RESULTING
# FROM USE OF THE SUBJECT SOFTWARE. FURTHER, GOVERNMENT AGENCY DISCLAIMS
# ALL WARRANTIES AND LIABILITIES REGARDING THIRD-PARTY SOFTWARE, IF
# PRESENT IN THE ORIGINAL SOFTWARE, AND DISTRIBUTES IT "AS IS".
#
# RECIPIENT AGREES TO WAIVE ANY AND ALL CLAIMS AGAINST THE UNITED STATES
# GOVERNMENT, ITS CONTRACTORS AND SUBCONTRACTORS, AS WELL AS ANY PRIOR
# RECIPIENT. IF RECIPIENT'S USE OF THE SUBJECT SOFTWARE RESULTS IN ANY
# LIABILITIES, DEMANDS, DAMAGES, EXPENSES OR LOSSES ARISING FROM SUCH USE,
# INCLUDING ANY DAMAGES FROM PRODUCTS BASED ON, OR RESULTING FROM,
# RECIPIENT'S USE OF THE SUBJECT SOFTWARE, RECIPIENT SHALL INDEMNIFY AND
# HOLD HARMLESS THE UNITED STATES GOVERNMENT, ITS CONTRACTORS AND
# SUBCONTRACTORS, AS WELL AS ANY PRIOR RECIPIENT, TO THE EXTENT PERMITTED
# BY LAW. RECIPIENT'S SOLE REMEDY FOR ANY SUCH MATTER SHALL BE THE
# IMMEDIATE, UNILATERAL TERMINATION OF THIS AGREEMENT.
#

# This program coordinates the tracking of file operations that are
# performed as part of a user-initiated transfer. It provides a way for
# the Shift/Mesh client to add operations, set their state, and retrieve
# operations for processing. It also provides various status output to
# the user upon request.

# min for IO::Compress::RawDeflate is 5.9.4
require 5.009_004;
use strict;
require Compress::BGZF::Reader;
require Compress::BGZF::Writer;
use Compress::Zlib;
use Data::Dumper;
eval {
    # force use of XS variant in case XS version differs from embedded version
    require XSLoader;
    XSLoader::load('Data::MessagePack');
};
require Data::MessagePack;
use DB_File;
use Fcntl qw(:DEFAULT :flock :mode);
use File::Basename;
require File::NFSLock;
use File::Path;
use File::Spec;
use Getopt::Long qw(:config bundling no_auto_abbrev no_ignore_case require_order);
use IO::File;
use IO::Handle;
# use embedded IPC::Open3 since versions prior to perl 5.14.0 are buggy
require IPC::Open3;
use List::Util qw(first max min sum);
require Mail::Sendmail;
use Math::BigInt;
use Net::Ping;
use POSIX qw(ceil setsid setuid strftime);
use Storable qw(dclone);
use Symbol qw(gensym);
use Sys::Hostname;
use Term::ANSIColor;
use Text::ParseWords;
require Tie::DB_FileLock;
require Text::FormatTable;

our $VERSION = 8.1;

# module-level output/locking settings
$Data::Dumper::Pair = " = ";
$Data::Dumper::Sortkeys = 1;
$Data::Dumper::Terse = 1;
$File::NFSLock::LOCK_EXTENSION = ".nfs";
$Term::ANSIColor::EACHLINE = "\n";

# binary byte string conversions
my %bibytes = (
    '' => 1,
    K => 1024,
    M => 1024**2,
    G => 1024**3,
    T => 1024**4,
    P => 1024**5,
    E => 1024**6,
);

# byte string conversions
my %bytes = (
    '' => 1,
    K => 1000,
    M => 1000**2,
    G => 1000**3,
    T => 1000**4,
    P => 1000**5,
    E => 1000**6,
);

# second string conversions
my %seconds = (
    s => 1,
    m => 60,
    h => 60 * 60,
    d => 24 * 60 * 60,
    w => 7 * 24 * 60 * 60,
);

# define default defaults
# built-in configuration defaults; entries may be overridden by the
# /etc/shiftrc and ~/.shiftrc settings parsed after option processing
my %conf = (
    bandwidth_ind => "100m",
    bandwidth_org => "1g",
    bandwidth_xge => "10g",
    data_expire => 604800,
    default_buffer => "4m",
    default_clients => 1,
    default_cron => 1,
    default_files => "1k",
    'default_find-files' => "2k",
    default_hosts => 1,
    default_interval => 30,
    default_local => "shift,fish,fish-tcp",
    default_preallocate => 0,
    default_remote => "shift",
    default_retry => 2,
    default_size => "4g",
    default_split => 0,
    'default_split-tar' => "500g",
    default_stripe => "1g",
    default_threads => 4,
    latency_lan => 0.001,
    latency_wait => 600,
    latency_wan => 0.05,
    local_small => "shift,fish,fish-tcp",
    max_files => "100k",
    max_streams_lan => 8,
    max_streams_wan => 16,
    min_streams_lan => 1,
    min_streams_wan => 4,
    min_window_lan => "1m",
    min_window_wan => "4m",
    nfs_lock => 0,
    opts_bbftp => "",
    opts_mcp => "--double-buffer",
    opts_msum => "--double-buffer",
    opts_ssh => "",
    opts_ssh_secure => "",
    org_domains => "com|edu|gov|mil|net|org",
    remote_small => "shift",
    small_size_lan => "256m",
    small_size_local => "1g",
    small_size_wan => "64m",
    status_lines => 20,
    sum_type => "md5",
    sum_split => "1g",
);

# file-scoped state shared by the main flow and the subs below
# handle of user-specific debug file (opened lazily by debug_print)
my $dbgfh;
# tie object of checksum db (set by detect_silent, unlocked by main flow)
my $dblock;
# contents of the invoking client's doing log (loaded via get_doing)
my $doing;
my %ioall;
# invocation time in string form (used as prefix of debug lines)
my $localtime = localtime;
my @locks;
# metadata of the transfer being processed
my %meta;
my $monfile;
# mount db (loaded from $conf{mount_db} by last_sum)
my %mounts;
# next/io load info merged into the user load db at the end of the main flow
my %nload;
# canonical name of this host ("localhost" when it cannot be resolved)
my $self = (gethostbyname(hostname()))[0];
$self = "localhost" if (!$self);
# operation types that have their own logs and counters
my @stages = qw(chattr cksum cp find ln mkdir sum);
# invocation time in epoch seconds
my $time = time;
# per-user mount db stored in $conf{umount_db}
my %umounts;
# true when %umounts must be written back before exit
my $ustore;

# files only readable by owner unless explicitly specified
umask 077;

# untaint path
$ENV{PATH} = "/bin:/usr/bin:/usr/local/bin";
# untaint insecure environment variables
delete $ENV{$_} foreach (qw(BASH_ENV CDPATH ENV IFS));

# clean up on exit
END {exit_clean()};
use sigtrap qw(handler exit_clean normal-signals);

# parse options
my %opts;
my $rc = GetOptions(\%opts,
    "alive",
    "doing:1",
    "get",
    "history:s",
    "host=s",
    "id=s",
    "last-sum:1",
    "lock",
    "meta:1",
    "monitor:s",
    "mounts",
    "pid=i",
    "plot:s",
    "put:s",
    "restart:s",
    "search=s",
    "state=s",
    "stats:s",
    "status:s",
    "stop",
    "sync",
    "user=s",
);
# Main program flow: validate options, load configuration, switch to the
# target user, dispatch the requested action, update transfer metadata
# under lock, then detach to perform load accounting and expiration.
die "Invalid options\n" if (!$rc || scalar(@ARGV) != 0);

# parse configuration
foreach my $file ("/etc/shiftrc", (getpwuid($<))[7] . "/.shiftrc") {
    open(FILE, '<', $file) or next;
    my $mline;
    while (my $line = <FILE>) {
        # strip whitespace and comments
        $line =~ s/^\s+|\s+$|\s*#.*//g;
        next if (!$line);
        # support line continuation operator
        $mline .= $line;
        next if ($mline =~ s/\s*\\$/ /);
        if ($mline =~ /^(\S+)\s+(.*)/) {
            $conf{$1} = $2;
        }
        $mline = undef;
    }
    close FILE;
}
die "The user_dir setting must be configured\n" if (!$conf{user_dir});

# opts_bbftp may have fake newlines that must become real newlines
$conf{opts_bbftp} =~ s/\\n/\n/g;

$conf{sum_split} = parse_bytes($conf{sum_split}, 2);
# msum cannot split less than 1M
$conf{sum_split} = $bibytes{M} if ($conf{sum_split} < $bibytes{M});

# process --stats immediately before setuid or $conf{user_dir} changes
if (defined $opts{stats}) {
    stats();
    exit;
}

if (defined $opts{user}) {
    die "Only root can specify user\n" if ($> != 0);
    # untaint user
    $opts{user} = $1 if ($opts{user} =~ /^([\w-]+)$/);
    # perform operations without notifying user
    $opts{quiet} = 1;
    # become user so synchronization will work correctly
    my $uid = getpwnam($opts{user});
    setuid($uid) if (defined $uid);
    die "Unable to setuid to user\n"
        if (!defined $uid || $< != $uid || $> != $uid);
} else {
    $opts{user} = getpwuid($<);
}

if (defined $opts{host}) {
    # untaint host
    $opts{host} = $1 if ($opts{host} =~ /^([\w.-]+)$/);
} elsif ($opts{get} || defined $opts{put}) {
    die "No host name given\n";
}
if (defined $opts{id}) {
    # untaint id.cid
    ($opts{id}, $opts{cid}) = ($1, $2) if ($opts{id} =~ /^(\d+)(\.\d+)?$/);
}
if (defined $opts{search}) {
    # unescape whitespace and special characters in search string
    $opts{search} = unescape($opts{search});
}

# save user_dir and modify it for globbing across all users
$opts{user_dir} = $conf{user_dir};
$opts{user_dir} =~ s/%u/*/g;
# replace %u with user in config and make directory if necessary;
if ($conf{user_dir} =~ s/%u/$opts{user}/g) {
    if (-e $conf{user_dir} && ! -d $conf{user_dir}) {
        die "$conf{user_dir} exists and is not a directory\n";
    } elsif (! -d $conf{user_dir}) {
        # directory should be world readable for load info
        # (mkdir is a list operator so its argument must be parenthesized
        #  or the -d fallback becomes part of the directory name expression)
        if (!mkdir($conf{user_dir})) {
            # save error before -d overwrites it
            my $err = $!;
            # another process may have created directory in the meantime
            die "Cannot create user metadata directory: $err\n"
                if (! -d $conf{user_dir});
        }
        chmod(0755, $conf{user_dir});
    }
}
$conf{umount_db} = "$conf{user_dir}/$opts{user}.mounts";

if (defined $opts{put} && !defined $opts{id}) {
    # lock user info
    lock_dir(0);

    # new transfer so create identifier and directory
    my @ids;
    my $dir = $conf{user_dir};
    my $cdir;
    while (-d $dir) {
        my @dirs = glob "$dir/$opts{user}.[0-9]*";
        # linux has a compiled in max of 32k subdirs so cap at 30k
        $cdir = $dir if (!$cdir && scalar(@dirs) < 30000);
        push(@ids, @dirs);
        $dir .= "/$opts{user}.more";
    }
    if (!$cdir) {
        mkdir $dir or
            die "Cannot create overflow metadata directory: $!\n";
        chmod(0700, $dir);
        $cdir = $dir;
    }
    @ids = map {substr($_, rindex($_, '.') + 1)} @ids;
    $opts{id} = (sort {$b <=> $a} @ids)[0];
    $opts{id}++;
    # untaint id
    $opts{id} = $1 if ($opts{id} =~ /(\d+)/);
    $opts{base} = "$cdir/$opts{user}.$opts{id}";
    mkdir $opts{base} or
        die "Cannot create transfer metadata directory: $!\n";
    # restrict the directory just created (path is in %opts, not %conf)
    chmod(0700, $opts{base});

    # unlock user info
    lock_dir(0, 1);

    # initialize tells
    $meta{$_} = 0 foreach (@stages, 'rmkdir');
    # initialize log sizes
    $meta{"$_\_size"} = 0 foreach (@stages, qw(alert done error meta));
    # initialize done, error, size, and total counts
    foreach (@stages) {
        $meta{"d_$_"} = 0;
        $meta{"e_$_"} = 0;
        $meta{"s_$_"} = 0;
        $meta{"t_$_"} = 0;
    }
    $meta{"e_$_"} = 0 foreach (qw(corruption exception));
    # initialize run counts
    $meta{s_run} = 0;
    $meta{t_run} = 0;
    $meta{w_run} = 0;
    $meta{s_error} = 0;
    $meta{s_total} = 0;
    $meta{t_split} = 0;
    # initialize other items
    $meta{last} = 0;
    $meta{origin} = $opts{host};
    $meta{origin_ip} = (split(/\s+/, $ENV{SSH_CONNECTION}))
        # tty indicates local invocation
        [defined $ENV{SSH_TTY} ? 2 : 0];
    $meta{split_id} = 0;
    $meta{stop} = 0;
    # time0 must be set before sync_id() called
    $meta{time0} = $time;
    $meta{sync_id} = sync_id() if ($conf{sync_host});

    # store initial metadata to file
    put_meta();
    put_meta(\%meta);

    # return id
    print "$opts{id}\n";
} elsif ($opts{mounts}) {
    # replace mount info in user db
    while (my $line = <STDIN>) {
        $line =~ s/\s*\r?\n$//;
        # eliminate any random double slashes that crept in
        $line =~ s/\/\//\//g;
        my %op = split(/[= ]+/, $line);
        # ignore malformed lines with undefined op values
        next if (grep(!/./, values %op));
        if ($op{args} eq 'mount') {
            $umounts{"mounth_$op{servers}:$op{remote}"}->{$op{host}} = 1;
            $umounts{"mountl_$op{host}:$op{servers}:$op{remote}"} = $op{local};
            $umounts{"mountr_$op{host}:$op{local}"} =
                "$op{servers}:$op{remote}";
            $umounts{"mounto_$op{host}:$op{local}"} = $op{opts};
        } elsif ($op{args} eq 'shell') {
            $umounts{shells}->{$op{host}} = 1;
        }
    }
    # lock user info
    lock_dir(0);
    # store user db to file
    mp_store(\%umounts, $conf{umount_db});
    # unlock user info
    lock_dir(0, 1);
    # synchronize user db
    sync_queue("$opts{user}.mounts") if ($conf{sync_host});
    exit;
} elsif (defined $opts{meta}) {
    $opts{meta} = 1 if ($opts{meta} <= 0);
    die "Identifier required\n" if (!defined $opts{id});
    meta();
    exit;
} elsif (defined $opts{doing}) {
    $opts{doing} = 1 if ($opts{doing} <= 0);
    die "Identifier required\n" if (!defined $opts{id});
    doing();
    exit;
} elsif (defined $opts{history}) {
    history();
    exit;
} elsif (!defined $opts{id} && defined $opts{monitor}) {
    monitor();
    exit;
} elsif (!defined $opts{id} && defined $opts{status}) {
    # status() returns its table as text (it is printed/parsed by its
    # other callers) so the result must be printed here as well
    print status();
    exit;
} elsif (defined $opts{'last-sum'}) {
    last_sum();
    exit;
} elsif (defined $opts{plot}) {
    plot();
    exit;
} elsif ($opts{sync}) {
    sync_remote();
    exit;
} elsif (!defined $opts{id}) {
    die "Invalid options\n";
} else {
    # locate directory of existing transfer across overflow directories
    my $dir = $conf{user_dir};
    while (-d $dir) {
        last if (-d "$dir/$opts{user}.$opts{id}");
        $dir .= "/$opts{user}.more";
    }
    $opts{base} = "$dir/$opts{user}.$opts{id}";
}

if (! -d $opts{base}) {
    if ($opts{get} || defined $opts{put}) {
        print "args=stop\n";
        # exit with success so old crontabs fail in loop
        exit;
    }
    die "Invalid identifier\n";
}

if (defined $opts{monitor}) {
    monitor();
    exit;
}

# prevent other processes from accessing files
lock_dir(1);

if ($opts{lock}) {
    # indicate ok to proceed
    print "OK\n";
    # peer must see OK before this process blocks on its input
    STDOUT->flush;
    # block until connection closed
    <STDIN>;
    exit;
}

# retrieve metadata from file after possibly (if needed) reverting
%meta = %{get_meta()};

# perform requested actions that require only metadata read access
if (defined $opts{status} && $opts{state} eq 'none') {
    delete $opts{state};
    print status();
    exit;
} elsif (defined $opts{status}) {
    id_status();
    exit;
} elsif (defined $opts{restart} && !($meta{stop} ||
        $meta{time1} && sum(map {$meta{"e_$_"}} @stages) > 0)) {
    die "Only transfers in stop or error states can be restarted\n";
} elsif (defined $opts{restart} && -e "$opts{base}/no_restart") {
    open(FILE, '<', "$opts{base}/no_restart");
    my $msg;
    $msg .= $_ while (<FILE>);
    $msg =~ s/\n/ /g;
    close FILE;
    die "Restarts of this transfer have been blocked by the administrator: $msg\n";
} elsif ($opts{stop} && ($meta{stop} || $meta{time1})) {
    die "Only running transfers can be stopped\n";
}

# use catchall exception handler to report manager failures
$SIG{__DIE__} = sub {
    our @exception = @_ if (defined $^S && !$^S);
};

# initialize next metadata line so can detect interruption
put_meta();

$meta{update_id} = "$opts{host}$opts{cid}";
$meta{sync_id} = sync_id() if ($conf{sync_host});

# initialize other items for hosts that have never been seen
if (defined $opts{host} && !defined $meta{"env_$opts{host}"}) {
    $meta{"env_$opts{host}"} = -1;
    $meta{ohosts}++;
}

# create host-specific doing log if doesn't already exist
$opts{doing} = "doing_$opts{host}$opts{cid}";
$opts{doing_log} = "$opts{base}/$opts{doing}";
log_print($opts{doing})
    if (! -f $opts{doing_log} && ($opts{get} || defined $opts{put}));

# update last access time
$meta{"last_$opts{host}"} = $time
    if ($opts{alive} || $opts{get} || defined $opts{put});

# track client pids to prevent inadvertent simultaneous processing
if ($opts{pid}) {
    my $pids = "pids_$opts{host}$opts{cid}";
    if ($meta{$pids} !~ /(?:^|,)$opts{pid}(?:,|$)/) {
        # a new process has taken over the transfer
        $meta{$pids} .= "," if ($meta{$pids});
        $meta{$pids} .= $opts{pid};
    }
}

# perform requested actions that require metadata write access
if ($opts{alive}) {
    # host has functional cron if --alive used
    my $key = "env_$opts{host}";
    $meta{$key} .= ":cron" if ($meta{$key} != -1 && $meta{$key} !~ /:cron/);
    print "args=stop\n" if ($meta{stop} || $meta{time1});
} elsif (defined $opts{restart}) {
    # clear counts
    $meta{"e_$_"} = 0 foreach (@stages);
    $meta{$_} = 0 foreach (qw(stop s_run t_run w_run));
    delete $meta{time1};
    # clear host/client info so clients can be respawned
    $meta{ohosts} = 0;
    delete $meta{$_} foreach (grep(/^(clients|email|env|last|load|pids|shells|sleep|warn)_/, keys %meta));

    # move all failed operations out of error back into queues
    my $gzs = {};
    my $line;
    while (defined($line = log_getline('error', $gzs))) {
        # reset number of attempts
        $line =~ s/((^|\s)try=)\d+/${1}0/;
        $line =~ s/\s*\r?\n$//;
        my %op = split(/[= ]+/, $line);
        my @args = split(/,/, $op{args});
        my $cmd = shift @args;
        if ($opts{restart} eq 'ignore') {
            # decrease total counts to pretend file was done
            $meta{t_chattr}-- if ($cmd !~ /^(?:chattr|find)$/ &&
                ($meta{sanity} || $meta{preserve}));
            if ($cmd =~ /^(?:cp|cksum|sum)$/) {
                # decrease sizes to keep status consistent
                $meta{s_total} -= $op{size};
                $meta{t_cp}--;
                if ($meta{verify}) {
                    $meta{t_sum}--;
                    $meta{t_cksum}--;
                }
                # decrease done ops to keep run() actual/expected consistent
                if ($cmd ne 'cp') {
                    $meta{s_cp} -= $op{size};
                    $meta{d_cp}--;
                }
                if ($cmd eq 'cksum') {
                    $meta{s_sum} -= $op{size};
                    $meta{d_sum}--;
                }
            } elsif ($cmd eq 'chattr' && !$op{tar_mv}) {
                # add instead of subtract due to way run() expected computed
                $meta{"d_$cmd"}++;
            } else {
                $meta{"t_$cmd"}--;
                $meta{tar_mv}-- if ($op{tar_mv});
                # record ignored finds so put() can terminate initialization
                $opts{more_finds}++ if ($cmd eq 'find');
            }
            # add errors as alerts so user has a record of failures
            $op{text} = escape('Ignored error "' . unescape($op{text}) . '"');
            $line = join(" ", map {"$_=$op{$_}"} sort(keys %op));
            $cmd = "alert";
            $meta{e_alert}++;
        }
        # use corresponding queue for all other cases
        log_print($cmd, $gzs, $line . "\n");
    }
    # clear error contents
    log_close('error', $gzs);
    log_print('error');

    # move all running operations out of doing_* back into queues
    foreach my $file (glob "$opts{base}/doing_*") {
        next if ($file =~ /\.gzi$/);
        # untaint file
        $file = $1 if ($file =~ /^(.*)$/);
        my $fdoing = get_doing($file);
        while (my ($key, $line) = each %{$fdoing}) {
            # reset number of attempts
            $line =~ s/((^|\s)try=)\d+/${1}0/;
            $line =~ s/\s*\r?\n$//;
            my %op = split(/[= ]+/, $line);
            my @args = split(/,/, $op{args});
            my $cmd = shift @args;
            delete $op{$_} foreach (qw(doing rate run time));
            # do not delete hash when retrying cksum
            delete $op{hash} if ($cmd ne 'cksum');
            $line = join(" ", map {"$_=$op{$_}"} sort(keys %op));
            # use corresponding queue for all other cases
            log_print($cmd, $gzs, $line . "\n");
        }
        $file =~ s/.*\///;
        # replace doing log contents with an empty set of operations
        log_print($file, $gzs,
            yenc_encode(Data::MessagePack->pack({})) . "\n");
    }
    log_close($_, $gzs) foreach (keys %{$gzs});

    # transfer may have finished after --restart=ignore
    $meta{time1} = $time if (($meta{last} || $meta{e_find}) && !run());
} elsif ($opts{stop}) {
    $meta{stop} = 1;
    $meta{time1} = $time;
}

$doing = get_doing($opts{doing_log});

# perform put separately so it can be combined with other operations
put() if (defined $opts{put} && (!$opts{pid} ||
    # only process puts of most recent client
    $meta{"pids_$opts{host}$opts{cid}"} =~ /(?:^|,)$opts{pid}$/));

# let other clients update silent corruption db
eval {$dblock->unlockDB} if (defined $dblock);

if ($opts{get} && ($meta{stop} || $meta{time1} ||
        # stop client if a new process has taken over
        $meta{"pids_$opts{host}$opts{cid}"} =~ /(?:^|,)$opts{pid},/)) {
    print "args=stop\n";
} elsif ($opts{get}) {
    get();
}

# send email status updates
email_status() if ($meta{mail} && !$opts{quiet} && defined $opts{put} &&
    $conf{email_domain});

if ($opts{get} || defined $opts{put}) {
    # store doing to file
    my $gzs = {};
    log_print($opts{doing}, $gzs,
        yenc_encode(Data::MessagePack->pack($doing)) . "\n");
    log_close($opts{doing}, $gzs);

    # update running time average for manager get/put invocations
    $meta{rc_mgr}++;
    $meta{ra_mgr} *= ($meta{rc_mgr} - 1) / $meta{rc_mgr};
    $meta{ra_mgr} += (time - $time) / $meta{rc_mgr};
}

# update log sizes
foreach my $file (glob "$opts{base}/*") {
    next if ($file =~ /\/(?:links|lock(\.nfs)?|mon_\S+|\S+\.gzi)$/);
    my $log = $file;
    $log =~ s/.*\///;
    $meta{"$log\_size"} = (stat $file)[7];
}

# store metadata to file
put_meta(\%meta);

# unlock id before cleanup
lock_dir(1, 1);

if ($ustore) {
    # lock user info
    lock_dir(0);
    # store user db to file
    mp_store(\%umounts, $conf{umount_db});
    # unlock user info
    lock_dir(0, 1);
    # synchronize user db
    sync_queue("$opts{user}.mounts") if ($conf{sync_host});
}

# synchronize log files
sync_queue() if ($conf{sync_host});

# detach process during cleanup
exit if (fork);
close STDIN;
close STDOUT;
close STDERR;
setsid;
# reopen all standard handles so later opens cannot be assigned fds 0-2
open(STDIN, "</dev/null");
open(STDOUT, ">/dev/null");
open(STDERR, ">/dev/null");

# signal any monitors
monitor(1) if (defined $opts{put} || defined $opts{restart} || $opts{stop});

# update global load info after detach to avoid blocking on other transfers
if ($opts{get} || defined $opts{put}) {
    # lock user info
    lock_dir(0);
    my %loaddb = %{mp_retrieve("$conf{user_dir}/$opts{user}.load")};
    if ($meta{time1}) {
        # remove load info for completed transfers
        delete $loaddb{$_} foreach (grep(/^(next_)?id_$opts{id}(\.|_)/, keys %loaddb));
    } else {
        my $key = "id_$opts{id}$opts{cid}_$opts{host}";
        my %cload = split(/[= ]+/, $meta{"load_$opts{host}$opts{cid}"});
        if ($cload{ratio} == -1 && $loaddb{$key}) {
            # client was throttled so recompute current load
            my %load = split(/[= ]+/, $loaddb{$key});
            my $old = $load{time};
            my $new = $old + $cload{time};
            delete $load{time};
            # scale rates by adjusted interval
            $load{$_} *= $old / $new foreach (grep(!/^io_/, keys %load));
            $load{"cpu_host_$opts{host}"} += $cload{cpu} * $cload{time} / $new;
            $load{time} = $new;
            $loaddb{$key} = join(" ", map {"$_=$load{$_}"} keys(%load));
        } elsif ($loaddb{"next_$key"}) {
            my %load = split(/[= ]+/, $loaddb{"next_$key"});
            $cload{time} = 1 if (!$cload{time});
            # convert sizes to MB/s and scale by actual/estimated ratio
            $load{$_} = $cload{ratio} * $load{$_} / 1E6 / $cload{time}
                foreach (keys %load);
            $load{"cpu_host_$opts{host}"} = $cload{cpu};
            $load{time} = $cload{time};
            $loaddb{$key} = join(" ", map {"$_=$load{$_}"} keys(%load));
        }
        # update next/io load with fs/host info collected in get()/put()
        if (scalar(keys %nload)) {
            $loaddb{"next_$key"} = join(" ",
                map {"$_=$nload{$_}"} grep(!/^io_/, keys %nload));
            $loaddb{$_} = $nload{$_} foreach (grep(/^io_/, keys %nload));
        }
    }
    mp_store(\%loaddb, "$conf{user_dir}/$opts{user}.load");
    chmod(0644, "$conf{user_dir}/$opts{user}.load");
    # unlock user info
    lock_dir(0, 1);
    sync_queue("$opts{user}.load") if ($conf{sync_host});
    sync_local() if ($conf{sync_host});
}

# remove status directories older than expiration time
my $more;
while (-d "$conf{user_dir}/$more") {
    foreach my $dir (glob "$conf{user_dir}/$more$opts{user}.[0-9]*") {
        # untaint dir (should be user.id under base+more user directory)
        $dir = $1 if ($dir =~ /^(\Q$conf{user_dir}\E\/\Q$more\E\Q$opts{user}\E\.\d+)$/);
        my $id = $dir;
        $id =~ s/.*\.//;
        # do not remove directory associated with this manager invocation
        next if ($id == $opts{id});
        my $mtime = (stat("$dir/meta"))[9];
        if ($mtime + $conf{data_expire} < $time) {
            rmtree($dir);
            # synchronize deleted directory
            sync_queue("$more$opts{user}.$id") if ($conf{sync_host});
        }
    }
    $more .= "$opts{user}.more/";
}

# remove monitor files older than expiration time
foreach my $file (glob "$conf{user_dir}/mon_*") {
    # untaint file
    $file = $1 if ($file =~ /^(\Q$conf{user_dir}\E\/mon_\d+_[\w.-]+)$/);
    my $mtime = (stat($file))[9];
    if ($mtime + $conf{data_expire} < $time) {
        unlink $file;
        # synchronize deleted file
        sync_queue(basename($file)) if ($conf{sync_host});
    }
}

#####################
#### build_links ####
#####################
# build tied db of processed directories from entries in
# find log
sub build_links {
    # remove old db
    unlink "$opts{base}/links";

    my %links;
    tie(%links, 'DB_File', "$opts{base}/links", O_RDWR | O_CREAT, 0600);

    my $gzs = {};
    while (defined($_ = log_getline('find', $gzs))) {
        s/\s*\r?\n$//;
        my %op = split(/[= ]+/, $_);
        my @args = split(/,/, $op{args});
        # only initial finds are used during reconstruction
        next if ($args[0] ne 'find' || defined $op{try});
        $links{unescape($args[1])} = 1;
    }
    log_close('find', $gzs);
    $links{t_find} = $meta{t_find};
    untie %links;
    #TODO: error handling if cannot tie or open find
}

#####################
#### debug_print ####
#####################
# print given text from get to stdout and mirror to file if debugging enabled
# (first argument is the line type, remaining arguments are the text;
#  type GET is echoed to stdout, type DBG is always logged with a stack trace)
sub debug_print {
    my $type = shift;
    if ($conf{debug} || $conf{"debug_$opts{user}"} || $type eq 'DBG') {
        # open user-specific debug file if not already open
        open($dbgfh, '>>', "$conf{user_dir}/$opts{user}.debug") if (!$dbgfh);
        print $dbgfh "$localtime $opts{host} $opts{id}$opts{cid} $type ";
        if ($type eq 'DBG') {
            # print stack trace when DBG lines added in code
            my $i = 1;
            my @stack;
            while ((my @cd = (caller($i++)))) {
                unshift(@stack, "$cd[3]:$cd[2]");
            }
            print $dbgfh "(", join(" -> ", @stack), ") ";
        }
        print $dbgfh $_ foreach (@_);
        $dbgfh->flush;
    }
    if ($type eq 'GET') {
        print $_ foreach (@_);
    }
}

########################
#### default_select ####
########################
# return random host whose sshd is pingable from set of given hosts
# (first argument is the preferred host, remaining arguments are the
#  alternatives; returns undef when no host answers on the ssh port)
sub default_select {
    # choose original host if available
    my $host = shift;
    my @hosts = @_;
    my $np = Net::Ping->new('tcp', 1);
    $np->port_number(22);
    do {
        # check availability via tcp ping to ssh port
        # (skipped when there is no candidate so ping never sees undef)
        return $host if ($host && $np->ping($host));
        # pick random host (removed from list so each is tried at most once)
        $host = splice(@hosts, rand @hosts, 1);
    } while ($host);
    return undef;
}

##################
#### last_sum ####
##################
# print last known checksum and its timestamp for each file name read from
# stdin, or for all stored keys matching --search when it is given
sub last_sum {
    my $is_tar = $opts{'last-sum'} ? 0 : 1;

    # load dbs for fs mappings and sum lookups
    %mounts = %{mp_retrieve($conf{mount_db})};
    %umounts = %{mp_retrieve($conf{umount_db})};
    my %sums;
    tie(%sums, 'DB_File', "$conf{user_dir}/$opts{user}.sums", O_RDONLY, 0600);

    if ($opts{search}) {
        while (my ($key, $pack) = each %sums) {
            if ($key =~ qr/$opts{search}/) {
                my ($dbtime, $dbhash) = unpack("LH*", $pack);
                print $dbhash, " ", strftime("%D-%T", localtime $dbtime), " ",
                    $key, "\n";
            }
        }
        return;
    }

    my $tar;
    while (<STDIN>) {
        chomp;
        my ($file, $key);
        if (!$is_tar || !$tar) {
            my $ref = {};
            $file = map_local($opts{host}, $_, $opts{host}, $ref);
            my ($host, $path) = ($opts{host}, $file);
            # find canonical form on server
            if ($ref->{remote} && $ref->{local}) {
                ($host, my $spath) = split(/:/, $ref->{remote});
                # replace original mount point with server mount point
                $path =~ s/^\Q$ref->{local}\E/$spath/;
            }
            $key = "$host:$path";
            if ($is_tar) {
                # first line names the tar file and later lines its entries
                $tar = $key . "\0";
                next;
            }
        } else {
            $file = $_;
            $key = $tar . $file;
        }
        my $pack = $sums{$key};
        if ($pack) {
            my ($dbtime, $dbhash) = unpack("LH*", $pack);
            print $dbhash, " ", strftime("%D-%T", localtime $dbtime), " ";
        } else {
            print "- ";
        }
        print $file;
        print "\n";
    }
}

#######################
#### detect_silent ####
#######################
# compare checksum of given operation against checksum stored for its
# source in the user sums db, store the new checksum for both source and
# destination, and return 1 (with $op->{text} set to an escaped message)
# when the source hash differs while its stored time matches $attrs[4]
my %detect_silent_db;
sub detect_silent {
    my ($op, $src, $dst) = @_;
    if (!defined $dblock) {
        # tie failure is tolerated so transfer proceeds without detection
        eval {
            $dblock = tie(%detect_silent_db, 'Tie::DB_FileLock',
                "$conf{user_dir}/$opts{user}.sums", O_RDWR | O_CREAT, 0600);
        };
    }

    my @attrs = split(/,/, $op->{attrs});
    my ($hsize, $ophash);
    if ($op->{hash0}) {
        # replace subsets of full hash with new values
        $ophash = $op->{hash0};
        my $start = ($op->{tar_bytes} =~ /(\d+)-\d+/) ? $1 : 0;
        my $rsrc = $src =~ /^[^\/]+%3A/ ? 1 : 0;
        if ($meta{'create-tar'} && !$rsrc || $meta{'extract-tar'} && $rsrc) {
            # create offset only non-zero when remote source
            # extract offset only zero when remote source
            $start = 0;
        }
        foreach my $hash (split(/,/, $op->{hash})) {
            if ($hash =~ /^#mutil#(\d+)-(\d+)#\\?(\S+)/) {
                my ($x1, $x2, $h) = ($1, $2, $3);
                # find hash size based on how many hashes needed in byte range
                $hsize = int(length($h) / max(1, ceil(($x2 - $x1) / $conf{sum_split})));
                # adjust offset by start of tar byte range
                my $hoff = $hsize * ($x1 - $start) / $conf{sum_split};
                if (length($ophash) < $hsize) {
                    # hash0 too small (file size changed?) - pad with zeros
                    $ophash .= "0" x ($hsize - length($ophash));
                }
                substr($ophash, $hoff, length $h) = $h;
            } else {
                # normal hash without mutil prefix
                $hsize = length $hash;
                substr($ophash, 0, $hsize) = $hash;
            }
        }
    } else {
        $ophash = $op->{hash};
        # eliminate possible backslash (when file name contains backslash)
        $ophash =~ s/^\\//;
        my $size = $attrs[7];
        ($size, $ophash) = ($2 - $1, $3)
            if ($ophash =~ /^#mutil#(\d+)-(\d+)#\\?(\S+)/);
        # find hash size based on how many hashes needed in byte range
        # NOTE(review): computed before the non-offset prefix is removed
        # below, so the prefix length is included when present - confirm
        $hsize = int(length($ophash) / max(1, ceil($size / $conf{sum_split})));
        # eliminate non-offset mutil prefix (if any)
        $ophash =~ s/^#mutil##\\?//;
    }

    my ($sid, $split) = split(/:/, $op->{split});
    my $rc;
    foreach my $file ([$src, "srcfs"], [$dst, "dstfs"]) {
        my ($host, $path) = $file->[0] =~ /^([^\/]+)%3A(\/.*)?/ ?
            ($1, $2) : ($op->{host}, $file->[0]);
        # host may have user@ attached
        $host =~ s/^(.+%40)//;
        # find canonical form on server
        if ($op->{$file->[1]}) {
            my $srv = (split(/,/, $op->{$file->[1]}))[-1];
            my $local;
            foreach my $db (\%mounts, \%umounts, \%meta) {
                $local = $db->{"mountl_$host:$srv"};
                last if ($local);
            }
            if ($local) {
                ($host, my $spath) = split(/:/, $srv);
                # replace original mount point with server mount point
                $path =~ s/^\Q$local\E/$spath/;
            }
        }
        my $key = "$host:$path";
        if ($meta{tar_mv} && $file->[1] eq 'dstfs' && $meta{'create-tar'}) {
            # only one split so file will be renamed
            $key =~ s/-1\.tar$//;
        }
        if ($file->[1] eq 'srcfs' && $meta{'extract-tar'} ||
                $file->[1] eq 'dstfs' && $meta{'create-tar'}) {
            # separate tar entries with NUL so cannot match any existing file
            $key .= "\0" . $op->{tar_name};
        }
        $key = "-" . $key if (defined $sid && $file->[1] eq 'srcfs');
        my $pack = $detect_silent_db{$key};
        my ($dbtime, $dbhash) = unpack("LH*", $pack);
        if (defined $sid && $file->[1] eq 'srcfs') {
            # insert split hash into partial hash stored under -key
            my $hoff = $hsize * $split * $meta{split} / $conf{sum_split};
            my @s = split(//, $dbhash);
            foreach (0 .. $hoff - 1) {
                $s[$_] = '0' if (!defined $s[$_]);
            }
            splice(@s, $hoff, length $ophash, split(//, $ophash));
            $ophash = join("", @s);
            if ($meta{"st_cksum_$sid"} <= 0) {
                # remove temporary entry and retrieve actual entry
                delete $detect_silent_db{$key};
                $key = substr($key, 1);
                $pack = $detect_silent_db{$key};
                ($dbtime, $dbhash) = unpack("LH*", $pack);
            }
        }
        if ($hsize && $file->[1] eq 'srcfs' && $dbtime == $attrs[4] &&
                (!defined $sid || $meta{"st_cksum_$sid"} <= 0) &&
                $dbhash && $dbhash ne $ophash) {
            # report each sum_split sized range whose hash differs
            my @diffs;
            for (my ($i, $o) = (0, 0); $i < length $dbhash;
                    $i += $hsize, $o += $conf{sum_split}) {
                push(@diffs, $o . "-" . min($attrs[7], $o + $conf{sum_split}))
                    if (substr($dbhash, $i, $hsize) ne
                        substr($ophash, $i, $hsize));
            }
            $op->{text} = "Possible silent corruption of source in byte range(s) ";
            $op->{text} .= join(",", @diffs) . ". Last known checksum: $dbhash. ";
            $op->{text} .= "Current checksum: $ophash.";
            $op->{text} = escape($op->{text});
            $rc = 1;
        }
        # store data as packed binary for efficiency
        $detect_silent_db{$key} = pack("LH*", $attrs[4], $ophash);
    }
    return $rc;
}

###############
#### doing ####
###############
# output doing log for transfer/host(s) specified with --id/--host option
sub doing {
    my $dir = $conf{user_dir};
    while (-d $dir) {
        last if (-d "$dir/$opts{user}.$opts{id}");
        $dir .= "/$opts{user}.more";
    }
    return if (! -d $dir);

    # retrieve metadata from file
    %meta = %{get_meta("$dir/$opts{user}.$opts{id}/meta")};

    # show logs of all hosts when none given
    $opts{host} = "*" if (!$opts{host});
    foreach my $file (glob("$dir/$opts{user}.$opts{id}/doing_$opts{host}")) {
        next if ($file =~ /\.gzi$/);
        # untaint file
        $file = $1 if ($file =~ /^(.*)$/);
        my $doing = get_doing($file, $opts{doing});
        $file =~ s/.*\///;
        print "$file = ", Dumper($doing);
    }
}

######################
#### email_status ####
######################
# send invoking user email with current status
sub email_status {
    # obtain status by parsing status() output
    my $table = status();
    my @rows = split(/\n/, $table);
    my @cols = split(/\s*\|\s*/, $rows[3]);
    my $state0 = $cols[1];
    my $state = $state0;
    # abort if user excluded current state
    if ($meta{mail} != 1) {
        foreach (qw(alert done error run stop throttle warn)) {
            return if ($state =~ /$_/ && $meta{mail} !~ /$_/);
        }
    }
    # ignore warnings when --sync used
    $state =~ s/\+warn// if ($meta{sync});
    $state =~ s/run\+//;
    # abort if running or have sent this message type before
    return if ($state eq 'run' || $meta{"email_$state0"});

    # show original command so will be correct for user's installation
    my $ucmd = $meta{command};
    # customized escape to allow ' ', ':', '=', ',', and '\'
    $ucmd =~ s/([^A-Za-z0-9\- :=\\_.,!~*'()\/])/sprintf("%%%02X", ord($1))/eg;
    # limit length of command line for performance/usability
    my $dindex = rindex($ucmd, " ");
    $ucmd = substr($ucmd, 0, rindex($ucmd, " ", 1024)) . "..." .
substr($ucmd, $dindex) if ($dindex > 1024); my $cmd = $ucmd; $cmd =~ s/(^\S*(?:\s+|\/)shiftc?[^\s\/]*)(?:\s|$).*/$1/; # use simple html pre wrapper so will show correctly on html/text clients my $msg = "
\n";
    $msg .= "#" x length($ucmd);
    $msg .= "\n$ucmd\n";
    $msg .= "#" x length($ucmd);

    # status table is always shown
    $msg .= "\n\n$table";
    # record email type to prevent duplicate emails
    $meta{"email_$state0"} = $time;
    if ($state =~ s/throttle/throttled/) {
        $msg .= "\n\nThis transfer is being throttled based on user or admin-specified";
        $msg .= "\nresource limits to preserve the stability of the environment.";
        $msg .= "\nIt will continue at a rate reduced in proportion to the load it is";
        $msg .= "\ngenerating until system load decreases to configured thresholds.";
    }
    if ($state =~ s/warn/warning/) {
        my $stable = id_status('warn');
        if (($stable =~ tr/\n/\n/) == 23) {
            # subset of warnings
            $msg .= "\n\nThe first 10 warnings encountered are shown below.";
            $msg .= "\nTo show the complete set, run the following:\n\n";
            $msg .= "    $cmd --id=$opts{id} --status --state=warn";
        } else {
            # all warnings
            $msg .= "\n\nThe set of the warnings encountered is shown below.";
        }
        $msg .= "\n\nThese operations will be retried automatically and may";
        $msg .= "\nstill complete successfully.  To stop this transfer";
        $msg .= "\nwithout retrying these operations, run the following:\n\n";
        $msg .= "    $cmd --id=$opts{id} --stop\n";
        $msg .= "\n\n" . $stable . "\n\n";
    }
    if ($state =~ /error/) {
        my $stable = id_status('error');
        if (($stable =~ tr/\n/\n/) == 23) {
            # subset of errors
            $msg .= "\n\nThe first 10 errors encountered are shown below.";
            $msg .= "\nTo show the complete set, run the following:\n\n";
            $msg .= "    $cmd --id=$opts{id} --status --state=error";
        } else {
            # all errors
            $msg .= "\n\nThe set of the errors encountered is shown below.";
        }
        if ($state0 =~ /run/) {
            $msg .= "\n\nThis transfer will continue to run until all remaining";
            $msg .= "\noperations have been attempted.  To stop this transfer";
            $msg .= "\nwithout attempting the remainder, run the following:\n\n";
            $msg .= "    $cmd --id=$opts{id} --stop\n";
        } else {
            $msg .= "\n\nTo retry the failed/incomplete portions of this ";
            $msg .= "transfer,\nrun the following on $meta{origin} ";
            $msg .= "(or equivalent):\n\n";
            $msg .= "    $cmd --id=$opts{id} --restart\n";
            $msg .= "\n\nTo ignore the failed/incomplete portions of this ";
            $msg .= "transfer,\nrun the following on $meta{origin} ";
            $msg .= "(or equivalent):\n\n";
            $msg .= "    $cmd --id=$opts{id} --restart=ignore\n";
        }
        $msg .= "\n\n" . $stable . "\n\n";
    }
    if ($state =~ /alert/) {
        my $stable = id_status('alert');
        if (($stable =~ tr/\n/\n/) == 23) {
            # subset of errors
            $msg .= "\n\nThe first 10 alerts encountered are shown below.";
            $msg .= "\nTo show the complete set, run the following:\n\n";
            $msg .= "    $cmd --id=$opts{id} --status --state=alert";
        } else {
            # all errors
            $msg .= "\n\nThe set of the alerts encountered is shown below.";
        }
        $msg .= "\n\n" . $stable . "\n\n";
    }
    $msg .= "
\n"; # send message using server on localhost Mail::Sendmail::sendmail( Smtp => 'localhost', From => "$opts{user}\@$conf{email_domain}", To => "$opts{user}\@$conf{email_domain}", Subject => "shift transfer $opts{id} $state", Message => $msg, 'Content-Type' => "text/html", ); } ################ #### escape #### ################ # return uri-escaped version of given string sub escape { my $text = shift; $text =~ s/([^A-Za-z0-9\-\._~\/])/sprintf("%%%02X", ord($1))/eg if (defined $text); return $text; } #################### #### exit_clean #### #################### # store exceptions, remove locks, and remove monitor files sub exit_clean { my $rc = $?; # save exceptions to user's debug file our @exception; debug_print('DBG', "@exception") if (@exception); close $dbgfh if (defined $dbgfh); # unlock user and transfer directories at program termination lock_dir($_, 1) foreach (0, 1); # remove monitor file in case user aborts if ($monfile) { unlink $monfile; if ($conf{sync_host}) { my $file = basename($monfile); $file = "$opts{user}.$opts{id}/$file" if ($opts{id}); sync_queue($file); } } exit $rc; } ######################## #### format_seconds #### ######################## # return human-readable time output for given number of seconds sub format_seconds { my $rem = shift; my $secs; foreach my $unit (sort {$seconds{$b} <=> $seconds{$a}} keys(%seconds)) { # keep dividing by largest unit my $div = int($rem / $seconds{$unit}); $rem %= $seconds{$unit}; # concatenate each result if ($opts{status} eq 'pad') { if ($unit eq 'd' || $unit eq 'w') { $secs .= sprintf("%0d$unit", $div); } else { $secs .= sprintf("%02d$unit", $div); } } else { $secs .= "$div$unit" if ($div); } } return $secs ? $secs : ($opts{status} eq 'pad' ? 
"00s" : "0s"); } ###################### #### format_bytes #### ###################### # return human-readable size output for given number of bytes sub format_bytes { my $nbytes = shift; my $empty_zero = shift; return "" if (!$nbytes && $empty_zero); foreach my $unit (sort {$bytes{$b} <=> $bytes{$a}} keys(%bytes)) { if (abs $nbytes >= $bytes{$unit}) { # use 3 significant digits in fixed/scientific notation with unit return sprintf("%.3g$unit\B", $nbytes / $bytes{$unit}); } } # use 1 significant digit for fractional values return sprintf("%.1f\B", $nbytes) if ($nbytes < 1); # use 3 significant digits in fixed/scientific notation without unit return sprintf("%.3g\B", $nbytes); } ############# #### get #### ############# # output a set of operations for the invoking client to process sub get { # retrieve global and user database from file %mounts = %{mp_retrieve($conf{mount_db})}; # do not reload umounts if already done in put() as may have ustore entries %umounts = %{mp_retrieve($conf{umount_db})} if (!scalar(keys %umounts)); my $warn = delete $meta{"warn_$opts{host}$opts{cid}"}; if ($warn > 0) { # use exponential backoff my $sleep = 1 << $meta{"sleep_$opts{host}$opts{cid}"}; $sleep = max(10, int(rand($sleep)) * 60); # wait for more files or for transfer to be done debug_print('GET', "args=sleep,$sleep\n"); # keep doubling sleep time up to an hour $meta{"sleep_$opts{host}$opts{cid}"}++ if ($meta{"sleep_$opts{host}$opts{cid}"} < 6); return; } elsif ($warn == 0) { # progress has been made so reset sleep timer delete $meta{"sleep_$opts{host}$opts{cid}"}; } # throttle if load beyond given resource limits my $sleep = throttle(); if ($sleep > 0) { debug_print('GET', "args=sleep,$sleep\n"); $meta{e_throttle} += $sleep; $meta{"throttled_$opts{host}$opts{cid}"} = 1; return; } else { delete $meta{"throttled_$opts{host}$opts{cid}"}; } # ignore client cron value when globally disabled $meta{cron} = 0 if (!$conf{default_cron}); # send static options first debug_print('GET', 
"args=getopt,get_host text=$self\n"); debug_print('GET', "args=getopt,sum_type text=$conf{sum_type}\n"); debug_print('GET', "args=getopt,sum_split text=$conf{sum_split}\n"); foreach (qw(create-tar cron dereference exclude extract-tar find-files force ignore-times include index-tar newer offline older ports preallocate preserve recall sanity secure stripe stripe-pool stripe-size sync verify verify-fast)) { debug_print('GET', "args=getopt,$_ text=", escape($meta{$_}), "\n") if (defined $meta{$_}); } # determine logs to process my @logs = ($opts{doing}); # add operations from hosts that have cron and have timed out foreach my $env (grep(/^env_/, keys %meta)) { next if ($meta{$env} !~ /:cron/); my $host = substr($env, 4); if ($meta{"last_$host"} + 1800 < $time) { # idle for 30 minutes if ($host ne $opts{host}) { push(@logs, "doing_$host"); # add logs for extra clients foreach my $i (2..$meta{clients}) { push(@logs, "doing_$host.$i"); } } } } # only process other logs during tar creation when tar_creat chattrs done if (!$meta{'create-tar'} || $meta{last} && $meta{d_chattr} >= $meta{tar_creat}) { if ($meta{pipeline}) { # find index of invoking host among participating hosts my @hosts = grep(/^env_/, keys %meta); my $ihost = first {$hosts[$_] eq $opts{host}} 0 .. $#hosts; # find index of invoking client among participating clients my $icli = $opts{cid} ? 
10 * $opts{cid} - 1 : 0;
            # ensure different hosts/clients process different stages first
            my @order = qw(cksum sum cp);
            push(@order, splice(@order, 0,
                ($ihost * $meta{clients} + $icli) % scalar(@order)));
            # always process chattrs first and dir chattrs last
            push(@logs, qw(chattr ln), @order, qw(mkdir find rmkdir));
        } else {
            # process all copies before all sums before all cksums
            push(@logs, qw(find mkdir cp sum cksum ln chattr rmkdir))
        }
    } else {
        unshift(@logs, "find");
    }

    # keep copy of original doing so changes don't affect its own processing
    my $doing0 = dclone($doing);
    # reverse size so won't end in 0 (so .N0 not reduced to .N after ++)
    my $ndoing = "0." . reverse($meta{"$opts{doing}_size"});

    my $gzs = {};
    my ($ldoing, $kdoing, $size, $files, $ops, $all, $secs, $skip, $gettry, $getcmd);
    my $max_files = parse_bytes($conf{max_files});
    my (%diskfs, %localfs, %rtthost);
    my %cli_load = split(/[= ]+/, $meta{"load_$opts{host}$opts{cid}"});
    my $ncli = grep(/^doing_/, keys %meta);
    # adjust interval so average waiting clients is no more than 1
    $opts{interval} = max($meta{interval}, $ncli * $meta{ra_mgr});
    LOG: foreach my $log (@logs) {
        # process dir attrs last by themselves
        last if ($log eq 'rmkdir' && (!$meta{last} || $meta{t_run} || $ops ||
            $skip || (!$meta{sanity} && !$meta{preserve})));
        my $line;
        if ($log eq $opts{doing}) {
            $ldoing = $doing0;
        } elsif ($log =~ /^doing_/) {
            $ldoing = get_doing("$opts{base}/$log");
        } else {
            log_getline($log, $gzs, 1);
            next if (!defined $gzs->{$log});
            #TODO: need error if cannot be opened or seeked
            # seek from end for first rmkdir since don't have real size
            my $whence = $log eq 'rmkdir' && $meta{$log} == -1 ? 2 : 0;
            $gzs->{$log}->seek($meta{$log}, $whence);
        }
        # pull operations until the batch limits (count, estimated time,
        # size) are reached or the current log is exhausted
        while ($all < $max_files &&
                ($secs < $opts{interval} ||
                    $size < $meta{size} && $all < $meta{files}) &&
                ($log =~ /^doing_/ && (($kdoing, $line) = each %{$ldoing}) ||
                 $log eq 'rmkdir' &&
                    defined($line = last_line($gzs->{rmkdir})) ||
                 $log !~ /^(?:doing_|rmkdir)/ &&
                    defined($line = log_getline($log, $gzs)))) {
            $line =~ s/\s*\r?\n$//;
            # first line of rmkdir will be blank
            next if (!$line);
            my %op = split(/[= ]+/, $line);
            my @args = split(/,/, $op{args});
            my $cmd = shift @args;
            my $save_arg = $args[-1];

            # ignore retried entries in rmkdir to avoid duplicate chattrs
            next if ($log eq 'rmkdir' && defined $op{try});

            # never propagate suspend
            delete $op{suspend};

            if ($log eq $opts{doing}) {
                delete $doing->{$kdoing};
            } elsif ($log =~ /^doing_/) {
                delete $ldoing->{$kdoing};
            }

            if ($log =~ /^doing_/ && $op{try} >= $meta{retry}) {
                # this operation was originally not completed so record failure
                $meta{s_run} -= $op{size};
                $meta{t_run}--;
                $op{text} = escape("Host or process failure");
                # record as error and abort
                $line =~ s/(^|\s)text=\S+//;
                log_print('error', $gzs, $line . " text=$op{text}\n");
                $meta{s_error} += $op{size};
                $meta{"e_$cmd"}++;
                $meta{"e_$op{tool}"}++;
                $meta{time1} = $time
                    if (($meta{last} || $meta{e_find}) && !run());
                next;
            }

            # map paths w/o writing %meta or other hashes so can be backed out
            my @refs = ({}, {});
            for (my $i = 0; $i < scalar(@args); $i++) {
                # skip arg 1 of ln/ln chattr since it's a name and not a file
                next if (!$i && ($cmd eq 'ln' || $cmd eq 'chattr' && $op{ln}));
                my $ref = $refs[$i];
                # write access needed for last arg of chattr/cp/find/ln/mkdir
                $ref->{rw} = $cmd =~ /^(?:chattr|cp|find|ln|mkdir)/ &&
                    $i == scalar(@args) - 1 ? 1 : 0;
                if ($args[$i] !~ /^\//) {
                    # do not map remote tar dst to prevent size/split corruption
                    next if ($i == 1 && $cmd eq 'find' && $meta{'create-tar'});
                    # help map_remote avoid last host's caching
                    if ($cmd eq 'cksum' && $op{cache_rclient}) {
                        $ref->{last} = $op{cache_rclient};
                    } elsif ($op{cache_client}) {
                        $ref->{last} = $op{cache_client};
                    }
                    # keep unmapped path so failure message can report it
                    my $unmapped = $args[$i];
                    $args[$i] = map_remote($opts{host}, $unmapped, $ref);
                    if (!defined $args[$i]) {
                        debug_print('DBG', "map_remote($opts{host}, $unmapped) undefined\n");
                        last LOG;
                    }
                    # record if cksum arg has flipped from remote to local
                    $op{"map$i"} = 1 if ($cmd eq 'cksum' && $args[$i] =~ /^\//);
                } else {
                    my $new = map_local($op{host}, $args[$i], $opts{host}, $ref);
                    if (defined $new) {
                        $args[$i] = $new;
                    } elsif ($meta{"env_$opts{host}"}) {
                        # host does not have access to appropriate
                        # file system and host name hasn't changed
                        debug_print('DBG', "map_local($op{host}, $args[$i], $opts{host}) undefined\n");
                        last LOG;
                    }
                }
            }

            # stop processing log if file likely still in cache
            #TODO: store remote renv so can use for fadvise/mounts?
            if ($op{cache_time}) {
                my $client = $cmd eq 'cksum' && $op{cache_rclient} ?
                    $op{cache_rclient} : $op{cache_client};
                my $server = $cmd eq 'cksum' && $op{cache_rserver} ?
                    $op{cache_rserver} : $op{cache_server};
                # amount of time left before file likely out of server cache
                my $dts = $conf{cache_time_server} - ($time - $op{cache_time});
                # amount of time left before file likely out of client cache
                my $dtc = $conf{cache_time_client} - ($time - $op{cache_time});
                $dtc = 0 if (
                    # no wait needed when client supports fadvise via shift-bin
                    $meta{"env_$client"} =~ /:bin/ ||
                    # no wait needed when processing remote files on diff client
                    $client ne $op{cache_client} &&
                        $args[0] !~ /^$client%3A/ && $args[1] !~ /^$client%3A/ ||
                    # no wait needed when processing local files on diff client
                    $client eq $op{cache_client} &&
                        $opts{host} ne $op{cache_client}
                );
                if ($dts > 0 && parse_bytes($conf{cache_size_server}) >
                        $meta{"io_$server"} - $op{"${server}server_io"} ||
                    $dtc > 0 && parse_bytes($conf{cache_size_client}) >
                        $meta{"io_$client"} - $op{"${client}_io"}) {
                    my $newskip = max($dts, $dtc);
                    # skip stores max sleep time
                    $skip = $skip ? min($skip, $newskip) : $newskip;
                    next LOG;
                }
            }

            # ensure only one tool will be used per batch
            if (!defined $getcmd) {
                $getcmd = $cmd;
                $gettry = $op{try};
            }
            next LOG if ($cmd ne $getcmd || $op{try} ne $gettry);

            # process paths again while allowing hash writes
            my $ir = $args[0] =~ /^\// ? 1 : 0;
            for (my $i = 0; $i < scalar(@args); $i++) {
                # skip arg 1 of ln/ln chattr since it's a name and not a file
                next if (!$i && ($cmd eq 'ln' || $cmd eq 'chattr' && $op{ln}));
                my $ref = $refs[$i];
                if ($args[$i] !~ /^\//) {
                    my $host = $args[$i] =~ /^([^\/]+)%3A/ ? $1 : "localhost";
                    # remove user name (if applicable)
                    $host = (split(/@/, $host))[-1];
                    if ($host ne 'localhost' && !defined $rtthost{$host}) {
                        # determine if domain needs another latency measurement
                        my $dn = $host;
                        # strip host label up to the first literal dot so the
                        # key matches the rtt_<domain> lookups done later
                        $dn =~ s/^[^.]+\.//;
                        # measure when enough time elapsed and not all errors
                        if ($time - $meta{"lastrtt_$dn"} > $conf{"latency_wait"} &&
                                ($meta{"rtt_$dn"} || $meta{"e_rtt_$dn"} < 5)) {
                            $rtthost{$host} = 1;
                            $meta{"lastrtt_$dn"} = $time;
                        } else {
                            # store host for use by later non-rtt processing
                            $rtthost{$host} = -1;
                        }
                    }
                    if ($cmd =~ /^(?:cp|find)/ && $i == 1 && $ref->{local}) {
                        # store target file systems for disk throttling
                        $diskfs{"$host:$ref->{local}"} = $ref->{remote};
                    }
                } else {
                    next if (!$ref->{local});
                    $localfs{$ref->{local}} |= $ref->{rw};
                    if ($cmd =~ /^(?:cp|find)/ && $i == 1) {
                        # store target file systems for disk throttling
                        $diskfs{"localhost:$ref->{local}"} = $ref->{remote};
                    }
                }
                # store file system type/options
                my $loc = $i == 0 && scalar(@args) > 1 ? "src" : "dst";
                # mount opts may differ per host so this can't be optimized away
                $op{"${loc}fs"} = "$ref->{opts},$ref->{remote}" if ($ref->{opts});

                # store amount of data read/written from/to host/file systems
                if ($cmd eq 'cp' || $cmd eq 'cksum' && $i == $ir ||
                        $cmd eq 'sum' && $i != $ir) {
                    if ($cmd eq 'cp' && $i == 1) {
                        my $dpath = $ref->{remote};
                        $nload{"iow_fs_$dpath"} += $op{size};
                        $nload{"iow_host_$ref->{host}"} += $op{size};
                        if ($args[0] !~ /^\// || $args[1] !~ /^\//) {
                            # remote transfer so record network load
                            $nload{"netr_host_$ref->{host}"} += $op{size};
                        }
                        # disk overrun prevention
                        foreach my $hl ($meta{disk}, $conf{"throttle_disk_$dpath"}) {
                            next if ($hl !~ /^(\d+):(\d+)$/);
                            my ($high, $low) = split(/:/, $hl);
                            my $use = $cli_load{"used_$dpath"} +
                                ($nload{"iow_fs_$dpath"} >> 10);
                            my $tot = $cli_load{"used_$dpath"} +
                                $cli_load{"left_$dpath"};
                            if ($tot && 100 * $use / $tot >= $high) {
                                $op{suspend} = $low / 100 * $tot;
                                $meta{"throttled_$opts{host}$opts{cid}"} = 1;
                                last;
                            }
                        }
                    } else {
                        $nload{"ior_fs_$ref->{remote}"} += $op{size};
                        $nload{"ior_host_$ref->{host}"} += $op{size};
                        if ($cmd eq 'cp' &&
                                ($args[0] !~ /^\// || $args[1] !~ /^\//)) {
                            # remote transfer so record network load
                            $nload{"netw_host_$ref->{host}"} += $op{size};
                        }
                    }
                }
            }

            if ($log =~ /^doing_/) {
                # this operation was originally not completed so record failure
                $meta{s_run} -= $op{size};
                $meta{t_run}--;
                $op{text} = escape("Host or process failure");
                # record as warning and retry
                $op{try}++;
                $meta{w_run}++;
                $op{state} = "warn";
            }

            if ($cmd eq 'cp' && $args[0] eq $args[1]) {
                $op{text} = escape("$args[0] and $args[1] are the same file");
                $line =~ s/(^|\s)text=\S+//;
                log_print('error', $gzs, $line . " text=$op{text}\n");
                $meta{s_error} += $op{size};
                $meta{"e_$cmd"}++;
                $meta{"e_$op{tool}"}++;
                $meta{time1} = $time
                    if (($meta{last} || $meta{e_find}) && !run());
                next;
            }

            # never count mkdir or chattr on dir against the file total
            $files++ if (scalar(@args) > 1);
            $size += $op{size} if ($cmd =~ /^(?:cp|sum|cksum)/);
            $ops++ if ($log ne 'rmkdir');
            $all++;
            if ($meta{"ra_$cmd"} > 0) {
                $secs += ($cmd =~ /^(?:cp|sum|cksum)/ ? $op{size} : 1) /
                    $meta{"ra_$cmd"};
            } else {
                # default to files/size setting until rates available
                $secs += $opts{interval};
            }
            $meta{s_run} += $op{size} if ($op{size});
            $meta{t_run}++;
            $meta{w_run}-- if ($op{state} eq 'warn');
            if ($log !~ /^doing_/) {
                # store last position of log
                $meta{$log} = tell $gzs->{$log};
            }

            # rmkdir is for preserving directory attributes
            $cmd = "chattr" if ($log eq 'rmkdir');
            # rejoin mapped arguments
            $op{args} = join(",", $cmd, @args);
            $op{host} = $opts{host};
            $op{run} = $time;
            $op{doing} = $ndoing++;
            if ($cmd ne 'sum' && $cmd ne 'cksum') {
                delete $op{$_} foreach (grep(/^cache_/, keys %op));
            }
            delete $op{$_} foreach (qw(state tool));

            # dynamically insert tar_creat for first record of each tar split
            if ($meta{'create-tar'} && defined $op{tar_start} &&
                    $op{tar_start} == 0) {
                if ($cmd =~ /^(?:cp|ln|mkdir)$/) {
                    # use tar_last instead of tar_size since tar name may differ
                    $op{tar_creat} = abs $meta{"tar_last_$save_arg"};
                } else {
                    # prevent tar_creat from propagating
                    delete $op{tar_creat};
                }
            }
            # dynamically insert tar_last for last record of last tar split
            if ($meta{'create-tar'} && $meta{"tar_last_$save_arg"} > 0) {
                my ($t1, $t2) = split(/-/, $op{tar_bytes});
                if ($meta{"tar_last_$save_arg"} < $t2 + 512) {
                    $op{tar_last} = 1;
                    # only insert very first time (will be propagated)
                    delete $meta{"tar_last_$save_arg"};
                }
            }

            my $get = join(" ", map {"$_=$op{$_}"} sort(keys %op)) . "\n";
            $doing->{$op{doing}} = $get;
            debug_print('GET', $get);
        }
        log_close($log, $gzs) if ($log !~ /^doing_/);
        if ($log ne $opts{doing} && $log =~ /^doing_/) {
            # write back what remains of the other host's doing log
            log_print($log, $gzs,
                yenc_encode(Data::MessagePack->pack($ldoing)) . "\n");
            log_close($log, $gzs);
        }
    }

    my $errs = sum(map {$meta{"e_$_"}} @stages);
    if (!$errs && $meta{last} && !$all && !$meta{t_run} && $meta{tar_mv}) {
        foreach my $file (grep(/^tar_split_/, keys %meta)) {
            next if ($meta{$file} != 1);
            $file =~ s/^tar_split_//;
            # dynamically insert tar_mv as final op during single split
            my $key = $ndoing++;
            my $get = "args=chattr,$file-1.tar host=$opts{host} size=0";
            $get .= " tar_mv=" . ($meta{"tar_nosum_$file"} ? 1 : 2);
            $get .= " run=$time doing=$key\n";
            $doing->{$key} = $get;
            debug_print('GET', $get);
            $all++;
            $meta{t_run}++;
        }
    } elsif (!$all) {
        if ($meta{last} && !$meta{t_run} && !$skip) {
            # no retries and none running so stop
            debug_print('GET', "args=stop\n");
            delete $meta{"sleep_$opts{host}$opts{cid}"};
        } else {
            # use exponential backoff
            my $sleep = 1 << $meta{"sleep_$opts{host}$opts{cid}"};
            $sleep = max(10, int(rand($sleep)) * 60);
            # do not sleep more than time needed to clear client/server cache
            $sleep = min($sleep, $skip) if ($skip);
            # wait for more files or for transfer to be done
            debug_print('GET', "args=sleep,$sleep\n");
            # keep doubling sleep time up to an hour
            $meta{"sleep_$opts{host}$opts{cid}"}++
                if ($meta{"sleep_$opts{host}$opts{cid}"} < 6);
        }
    }
    log_close($_, $gzs) foreach (keys %{$gzs});
    return if (!$all);

    # send potentially dynamic options last
    foreach (keys %diskfs) {
        # send target file systems for disk throttling
        debug_print('GET', "args=getopt,disk_$_ text=$diskfs{$_}\n");
    }
    # these could potentially be dynamic in the future
    foreach (qw(buffer threads)) {
        debug_print('GET', "args=getopt,$_ text=", $meta{$_}, "\n")
            if ($meta{$_});
    }
    # send individual transport options
    foreach (qw(bbftp mcp msum), $meta{secure} ? "ssh_secure" : "ssh") {
        my $val = $conf{"opts_$_"};
        next if (!$val);
        debug_print('GET', "args=getopt,opts_$_ text=", escape($val), "\n");
    }

    # attempt to determine type of transfer (i.e. local/lan/wan)
    my ($net_dn, $net_rtt, $net_type) = ($opts{host}, 0, "wan");
    $net_dn =~ s/^[^.]+\.//;
    foreach my $host (keys %rtthost) {
        # transfer is on lan if domain of invoking host matches target domain
        $net_type = "lan" if ($net_dn && $host =~ /\Q$net_dn\E$/);
        # find latency for associated domain
        my $dn = $host;
        $dn =~ s/^[^.]+\.//;
        $net_rtt = $meta{"rtt_$dn"} if ($meta{"rtt_$dn"});
        # negative values are for rtt/type calculations only
        next if ($rtthost{$host} == -1);
        # send remote hosts for latency measurements
        debug_print('GET', "args=getopt,rtt_$host\n");
    }
    $net_rtt = parse_bytes($conf{"latency_$net_type"}) if (!$net_rtt);
    my $net_bw = $meta{bandwidth};
    if (!$net_bw) {
        # set default bandwidth based on xge availability or host domain
        my $type = $meta{"env_$opts{host}"} =~ /:xge/ ? "xge" :
            ($net_dn =~ /\.(?:$conf{org_domains})$/ ? "org" : "ind");
        $net_bw = parse_bytes($conf{"bandwidth_$type"});
    }
    my $net_win = $meta{window};
    my $env_win = $meta{"env_$opts{host}"} =~ /tcpwin_(\d+)/ ? $1 : undef;
    if (!$meta{window}) {
        # set default window to BDP
        $net_win = int($net_bw * $net_rtt / 8);
        # make sure default window is less than max window
        $net_win = min($net_win, $env_win) if ($env_win);
        # make sure default window is greater than configured minimum
        $net_win = max($net_win, parse_bytes($conf{"min_window_$net_type"}));
    }
    debug_print('GET', "args=getopt,window text=$net_win\n");
    my $net_ns = $meta{streams};
    if (!$meta{streams}) {
        # set default streams to number of max windows needed to consume bw
        $net_ns = int($net_bw * $net_rtt / 8 / $env_win)
            if ($env_win && $net_win >= $env_win);
        $net_ns = int($net_bw * $net_rtt / 8 / $net_win)
            if ($net_win < $env_win);
        # make sure default streams is less than configured maximum
        $net_ns = min($net_ns, $conf{"max_streams_$net_type"});
        # make sure default streams is greater than configured minimum
        $net_ns = max($net_ns, parse_bytes($conf{"min_streams_$net_type"}));
    }
    debug_print('GET', "args=getopt,streams text=$net_ns\n");

    # send local/remote transport selections based on average file size
    debug_print('GET', "args=getopt,local text=",
        # use given transport if specified
        $meta{local} ? $meta{local} :
        # optimize for small files if avg file size less than defined size
        ($size / ($files + 1) < parse_bytes($conf{small_size_local}) ?
            $conf{local_small} : $conf{default_local}), "\n");
    debug_print('GET', "args=getopt,remote text=",
        # use given transport if specified
        $meta{remote} ? $meta{remote} :
        # optimize for small files if avg file size less than defined size
        ($size / ($files + 1) < parse_bytes($conf{"small_size_$net_type"}) ?
            $conf{remote_small} : $conf{default_remote}), "\n");

    if (grep(/^env_/, keys %meta) < $meta{hosts} ||
            $meta{"clients_$opts{host}"}) {
        # run client on other/same hosts if there are enough files
        my $qfiles = $meta{t_split} - $meta{t_run};
        $qfiles += $meta{"t_$_"} - $meta{"d_$_"} - $meta{"e_$_"}
            foreach (@stages);
        my $qsize = $meta{s_total} - $meta{s_run} - $meta{s_error};
        $qsize += 2 * $meta{s_total} if ($meta{verify});
        $qsize -= $meta{"s_$_"} foreach (qw(cksum cp sum));
        my $nclients;
        if ($qsize * $meta{files} > $qfiles * $meta{size}) {
            # queue avg size per file greater than limit avg size per file
            # estimate nclients based on queue sizes
            $nclients = 1.0 * $qsize / $meta{size};
            # don't use more hosts than number of files
            $nclients = $qfiles if ($nclients > $qfiles);
        } else {
            # queue avg size per file less than limit avg size per file
            # estimate nclients based on queue files
            $nclients = 1.0 * $qfiles / $meta{files};
        }
        # reduce by outstanding hosts
        $nclients -= $meta{ohosts};

        if ($nclients > 0 && scalar(keys %localfs) > 0) {
            my %hosts = $meta{'host-list'} ?
                # use given host list
                map {$_ => 1} split(/,/, $meta{'host-list'}) :
                # find accessible hosts based on global/user db
                map {$_->{shells} ? %{$_->{shells}} : ()}
                    (\%meta, \%mounts, \%umounts);
            # determine potential hosts
            foreach my $fs (keys %localfs) {
                foreach my $host (keys %hosts) {
                    if ($meta{"env_$host"} || $meta{"nohost_$host"} ||
                            !$meta{'host-list'} && !map_local($opts{host}, $fs,
                                $host, {rw => $localfs{$fs}})) {
                        # remove hosts without local file system access
                        delete $hosts{$host};
                    }
                }
            }
            while ($nclients > 0 && scalar(keys %hosts) > 0 &&
                    grep(/^env_/, keys %meta) < $meta{hosts}) {
                my $host;
                if (defined $conf{select_hook}) {
                    # select host using configured selection hook
                    $host = open3_get([-1, undef, -1],
                        "$conf{select_hook} $opts{host} $opts{host} " .
join(",", keys %hosts));
                }
                # select host using random selection policy
                $host = (keys %hosts)[rand(keys %hosts)] if (!$host);
                $host =~ s/\s*\r?\n$//;
                delete $hosts{$host};
                debug_print('GET', "args=host,$host\n");
                $meta{"env_$host"} = -1;
                $meta{ohosts}++;
                $nclients--;
            }
        }
        # spawn extra clients on invoking host if enough work remains
        while ($nclients > 0 && $meta{"clients_$opts{host}"}) {
            $nclients--;
            debug_print('GET', "args=client,$opts{id}.",
                $meta{clients} - $meta{"clients_$opts{host}"}--, "\n");
        }
        # no more clients available on host
        delete $meta{"clients_$opts{host}"} if (!$meta{"clients_$opts{host}"});
    }
}

###################
#### get_doing ####
###################
# return past list of operations in progress on given host
# (first argument is the path of a doing log; second is how many records
# back from the end to read, defaulting to 1, i.e. the last record; an
# empty hash ref is returned when the log is missing, unreadable, or empty)
sub get_doing {
    my $arg = shift;
    my $past = shift;
    $past = 1 if (!defined $past);
    # untaint file
    $arg = $1 if ($arg =~ /^(.*)$/);
    return {} if (! -e $arg);
    my $gz = Compress::BGZF::Reader->new_filehandle($arg);
    return {} if (!$gz);
    # start at end of log and step backward one line per requested record
    $gz->seek(-1, 2);
    my $line;
    $line = last_line($gz) while ($past-- > 0);
    my $log = basename($arg);
    log_close($log, {$log => $gz});
    return Data::MessagePack->unpack(yenc_decode($line)) if ($line);
    return {};
}

##################
#### get_meta ####
##################
# return (and possibly revert to) last validated metadata from given meta file
# (first argument is the meta file, defaulting to the one in $opts{base};
# optional second argument selects the Nth valid record from the end;
# reverting of logs only occurs when the default meta file is used)
sub get_meta {
    my $mfile = shift;
    my $past = shift;
    # only defined (thus only tracked) when using the default meta file
    my $mtell;
    if (!defined $mfile) {
        $mfile = "$opts{base}/meta";
        $mtell = 0;
    }
    my $meta;
    my $fh;
    open($fh, '<', $mfile) or return {};
    seek($fh, -1, 2);
    while (1) {
        # find line starting with '[' and ending with ']', indicating valid line
        my $line = last_line($fh);
        last if (!defined $line);
        $meta = substr($line, 1, -1);
        last if (substr($line, 0, 1) eq '[' && substr($line, -1, 1) eq ']' &&
            (!defined $past || !--$past));
        $meta = undef;
        # remember where the rejected line began so file can be cut there
        $mtell = $fh->tell + 1 if (defined $mtell);
    }
    close $fh;
    if ($meta) {
        # meta lines are serialized, compressed, and yEnc encoded
        my $zmeta = uncompress(yenc_decode($meta));
        $meta = undef;
        if ($zmeta) {
            $meta = Data::MessagePack->unpack($zmeta);
            # convert strings back to Math::BigInt
            $meta->{$_} = Math::BigInt->new($meta->{$_})
                foreach (grep(/^sd_/, keys %{$meta}));
        }
    }
    if ($meta && defined $mtell && $mtell > 0) {
        # metadata corrupted so revert to last known good state
        foreach my $file (glob "$opts{base}/*") {
            next if ($file =~ /\/(?:links|lock(\.nfs)?|meta|mon_\S+|\S+\.gzi)$/);
            my $log = $file;
            $log =~ s/.*\///;
            my $size = defined $meta->{"$log\_size"} ? $meta->{"$log\_size"} : 0;
            # untaint file
            $file = $1 if ($file =~ /^(.*)$/);
            # truncate all logs to last known good size
            truncate($file, $size);
            # remove associated index file
            unlink("$file.gzi");
        }
        # rebuild link db since it may contain reverted operations
        # NOTE(review): this tests the global %meta rather than the $meta
        # just loaded above -- confirm which one is intended
        build_links() if ($meta{dereference} && !$meta{'extract-tar'});
        # truncate last in case any other operations interrupted
        truncate($mfile, $mtell);
    }
    return $meta ? $meta : {};
}

#################
#### history ####
#################
# output table of hosts and commands for invoking user
# (all users are shown when invoked as root; output is csv when
# $opts{history} is 'csv' and a formatted table otherwise)
sub history {
    # configure table headers
    my $t = Text::FormatTable->new('r | l | l');
    if ($opts{history} eq 'csv') {
        print join(",", qw(id origin cwd command)), "\n";
    } else {
        $t->head(qw(id origin command));
        $t->rule;
    }

    # collect meta files from user directory and nested .more directories
    my @metas;
    my $dir = $> != 0 ? $conf{user_dir} : $opts{user_dir};
    my $user = $> != 0 ? $opts{user} : "*";
    my $idglob = defined $opts{id} ? $opts{id} : "[0-9]*";
    my @more;
    do {
        push(@metas, glob "$dir/$user.$idglob/meta");
        # glob still returns pattern when no wildcards and no files match
        pop(@metas) if (defined $opts{id} && ! -f $metas[-1]);
        $dir .= "/*.more";
        # expand in list context since scalar glob would keep iterating over
        # the previous pattern's results instead of evaluating the new one
        @more = glob $dir;
    } while (scalar(@more));
    foreach my $file (sort {$> != 0 ?
            # sort by modification time of meta file when not root
            (stat $a)[9] <=> (stat $b)[9] :
            # sort by user name when root (paths are strings so use cmp)
            $a cmp $b} @metas) {
        my $id = $file;
        if ($> != 0) {
            $id =~ s/.*\.|\/meta//g;
        } else {
            # ignore old transfers
            next if ((stat $file)[9] + $conf{data_expire} < $time);
            # leave user name in id
            $id =~ s/.*\/([\w-]+\.\d+)\/meta/$1/g;
        }
        # compare numeric portion only since root ids retain the user name
        next if (defined $opts{id} && $id =~ /(\d+)$/ && $opts{id} != $1);
        # retrieve metadata from file
        %meta = %{get_meta($file)};
        # ignore rows that do not match optional search
        next if ($opts{search} &&
            join(" ", $meta{origin}, $meta{command}) !~ qr/$opts{search}/);
        # add row for each transfer
        my $cmd = $meta{command};
        # limit length of command line for performance/usability
        my $dindex = rindex($cmd, " ");
        $cmd = substr($cmd, 0, rindex($cmd, " ", 1024)) . "..." .
            substr($cmd, $dindex) if ($dindex > 1024);
        if ($opts{history} eq 'csv') {
            print join(",", $id, $meta{origin}, $meta{cwd}, $cmd), "\n";
        } else {
            $t->row($id, "$meta{origin}\n[$meta{cwd}]", $cmd);
        }
    }
    # output final table
    print $t->render if ($opts{history} ne 'csv');
}

###################
#### id_status ####
###################
# output detailed table of all relevant operations in current transfer or
# return subset of table in given state
# (when a state argument is given, at most about 10 rows are collected and
# the rendered table is returned instead of printed)
sub id_status {
    my $state = shift;
    my $nrows = 10000;
    my $once = 0;
    if (defined $state) {
        # this is used in email_status() to send a subset of errors/warnings
        $nrows = 10;
        $once = 1;
    } else {
        # user only wants items in a particular state
        $state = $opts{state};
    }
    my $t0 = Text::FormatTable->new('l | l | l | r | r | r | r');
    # target is the same for all files during tar creation so use source
    my @row = (qw(state op),
        $meta{'create-tar'} || $state eq 'alert' && !$meta{'extract-tar'} ?
            "source" : "target", qw(size date length rate));
    my @row2 = ("", "tool", "info", "", "time", "", "");
    if ($opts{status} eq 'csv') {
        print join(",", @row, @row2), "\n";
    } else {
        $t0->head(@row);
        $t0->head(@row2);
        $t0->rule;
    }
    my $rows = 0;
    # keep pristine header-only table so it can be cloned for each part
    my $t = dclone($t0);

    # logs to read for each state
    my %files = (
        queue => [@stages],
        warn => [@stages],
        run => [map {basename $_} glob("$opts{base}/doing_*")],
        error => ["error"],
        done => ["done"],
        alert => ["alert"],
    );
    my %colors = (
        queue => "cyan",
        warn => "yellow",
        run => "green",
        error => "red",
        done => "reset",
        alert => "magenta",
    );
    foreach my $state0 (qw(queue warn run error done alert)) {
        next if ($state && $state !~ /^\Q$state0\E$/);
        foreach my $log (@{$files{$state0}}) {
            my ($host, $ldoing, $kdoing);
            my $gzs = {};
            if ($state0 eq 'run') {
                next if ($log =~ /\.gzi$/);
                $host = $log;
                $host =~ s/^doing_//;
                $ldoing = get_doing("$opts{base}/$log");
            } else {
                log_getline($log, $gzs, 1);
                # handle may be undefined if log could not be opened
                $gzs->{$log}->seek($meta{$log}, 0)
                    if (defined $meta{$log} && defined $gzs->{$log});
            }
            while ($state0 eq 'run' && (($kdoing, $_) = each %{$ldoing}) ||
                    $state0 ne 'run' && defined($_ = log_getline($log, $gzs))) {
                chomp;
                # unescape colons and ats in remote paths
                s/%3A/:/g;
                s/%40/@/g;
                my %op = split(/[= ]+/);
                my @args = split(/,/, $op{args});
                # ignore rows that do not match optional search
                next if ($opts{search} && join(" ", @args) !~ qr/$opts{search}/);
                next if ($state0 eq 'warn' && $op{state} ne 'warn');
                # dst is the same for all files during tar creation so use src
                $args[-1] = $op{tar_name} if ($meta{'create-tar'} && !$op{tar_mv});
                # messages are about source in non-extraction alerts
                $args[-1] = $args[1] if ($state eq 'alert' && !$meta{'extract-tar'});
                # add first row for each operation with bulk of info
                @row = ($state0, $args[0], $args[-1], "-", "-", "-", "-");
                $row[3] = format_bytes($op{size})
                    if ($args[0] =~ /^(?:cksum|cp|sum)/);
                $row[4] = strftime('%m/%d', localtime($op{run}))
                    if ($op{run} && $state0 =~ /^(?:alert|done|error|run|warn)$/);
                if ($state0 eq 'run') {
                    # show amount of time operation has been running
                    $row[5] = format_seconds($time - $op{run});
                } elsif ($state0 eq 'done') {
                    # show total time and rate
                    $row[5] = format_seconds($op{time} > 0 ? $op{time} : 1);
                    $row[6] = format_bytes($op{rate}) . "/s"
                        if ($args[0] =~ /^(?:cksum|cp|sum)/);
                }
                # add second row for each operation with tool and message
                @row2 = ("", "", "", "", "", "", "");
                $row2[1] = $op{tool} if ($state0 =~ /^(?:done|error|warn)$/);
                if ($state0 =~ /^(?:error|alert|warn)$/) {
                    # show message associated with errors and warnings
                    $row2[2] = unescape($op{text});
                } elsif ($state0 eq 'run') {
                    # show host that is processing run operations
                    $row2[2] = "\@$host";
                } elsif ($state0 eq 'done' && $args[0] =~ /^(?:cp|sum)/) {
                    # show hash that was computed for file
                    $row2[2] = "#$op{hash}" if ($op{hash});
                }
                $row2[2] .= " [$op{bytes})"
                    if ($op{bytes} && $state0 =~ /^(?:done|run)$/);
                $row2[4] = strftime('%R', localtime($op{run}))
                    if ($op{run} && $state0 =~ /^(?:alert|done|error|run|warn)$/);
                if ($opts{status} eq 'csv') {
                    # quote info column when it contains csv metacharacters
                    $row2[2] =~ s/"/""/g;
                    $row2[2] = "\"$row2[2]\"" if ($row2[2] =~ /[,"\n]/);
                    print join(",", @row, @row2), "\n";
                } else {
                    if ($opts{status} eq 'color') {
                        # prevent warnings due to empty columns
                        local $SIG{__WARN__} = sub {};
                        @row = map {colored($_, $colors{$state0})} @row;
                        @row2 = map {colored($_, $colors{$state0})} @row2;
                    }
                    $t->row(@row);
                    $t->row(@row2) if ($state0 ne 'queue');
                    if (++$rows >= $nrows) {
                        last if ($once);
                        # render in multiple parts when large number of rows
                        print $t->render, "\n";
                        $t = dclone($t0);
                        $rows = 0;
                    }
                }
            }
            log_close($log, $gzs) if ($state0 ne 'run');
        }
    }
    if ($opts{status} ne 'csv') {
        # return/output final table depending on initial given state
        $once ? return $t->render : print $t->render;
    }
}

#################
#### init_id ####
#################
# initialize settings for transfer based on getopt lines and/or defaults
sub init_id {
    # initialize log files
    if (!defined $opts{restart}) {
        log_print($_) foreach (@stages, qw(alert done error));
    }

    # initialize options with default values
    foreach (qw(clients cpu hosts interval io ior iow net netr netw ports
                retry stripe threads)) {
        $meta{$_} = $conf{"default_$_"}
            if (!defined $meta{$_} && $conf{"default_$_"});
    }

    # change files unit from billion to gig
    $meta{files} =~ tr/[bB]/g/ if (defined $meta{files});

    # stripe can include additional specifiers using double colons
    if ($meta{stripe} =~ /::/) {
        ($meta{stripe}, $meta{'stripe-size'}, $meta{'stripe-pool'}) =
            split(/::/, $meta{stripe});
        foreach (qw(stripe stripe-size stripe-pool)) {
            delete $meta{$_} if ($meta{$_} eq '');
        }
    }

    # convert size strings to numbers
    foreach my $key (qw(bandwidth buffer files find-files size split
                        split-tar stripe stripe-size window)) {
        # stripe can be zero
        next if ($key eq 'stripe' && defined $meta{$key} && $meta{$key} eq '0');
        # parse some values in binary bytes instead of decimal bytes
        my $bin = $key =~ /^(?:buffer|split|stripe-size)$/ ? 2 : 0;
        $bin = 1 if ($key eq 'stripe');
        my $new = defined $meta{$key} ?
parse_bytes($meta{$key}, $bin) : undef; $new = parse_bytes($conf{"default_$key"}, $bin) if (!defined $new && defined $conf{"default_$key"}); if ($key =~ /^(?:files|find-files|size)$/) { # do not allow zero values $new = 1 if (!$new); } elsif ($key =~ /^(?:split|split-tar)$/) { # size under sum split will cause silent corruption false positives $new = $conf{sum_split} if ($new && $new < $conf{sum_split}); } elsif ($key eq 'buffer') { # buffer over sum split will cause msum to adjust split to buffer $new = $conf{sum_split} if ($new && $new > $conf{sum_split}); } $meta{$key} = $new if (defined $new); } }

###################
#### last_line ####
###################
# return the text between the current position of a given file handle and
# the newline preceding it (undef at start of file), and leave the handle
# positioned on that newline so the next call yields the previous line
sub last_line {
    my $fh = shift;
    my $end = $fh->tell;
    # nothing precedes the beginning of the file
    return undef if ($end == 0);
    my $chunk = 4194304;
    my $text = "";
    my ($start, $nl);
    while (1) {
        # step back one chunk without going past the start of the file
        $start = $end - $chunk;
        $start = 0 if ($start < 0);
        $fh->seek($start, 0);
        # read up to the initial location or the start of the last round
        my $want = $chunk > $end ? $end - $start : $chunk;
        my $part;
        $fh->read($part, $want);
        $text = $part . $text;
        # locate final newline in everything gathered so far
        $nl = rindex($text, "\n");
        $end = $start;
        # done once a newline was seen or the start of file was reached
        last if ($start <= 0 || $nl >= 0);
    }
    $nl = 0 if ($nl < 0);
    # park the handle where the next invocation should continue from
    $fh->seek($start + $nl, 0);
    # strip the newline that introduces the returned text
    my $found = substr($text, $nl);
    $found =~ s/\r?\n//;
    return $found;
}

##################
#### lock_dir ####
##################
# acquire an exclusive lock (0 = user lock, 1 = transfer id lock), waiting
# as needed, or release a previously acquired lock when unlock is given
sub lock_dir {
    my ($id, $unlock) = @_;
    if (!$unlock) {
        my $path = "$conf{user_dir}/$opts{user}.lock";
        $path = "$opts{base}/lock" if ($id);
        if ($conf{nfs_lock}) {
            # retry once a second until the nfs lock object is obtained
            my $held;
            until ($held = File::NFSLock->new({
                file => $path,
                lock_type => LOCK_EX,
            })) {
                sleep 1;
            }
            $locks[$id] = $held;
        } else {
            # give up silently when lock file cannot be opened
            return if (!open($locks[$id], '>>', $path));
            flock($locks[$id], LOCK_EX);
        }
    } elsif (defined $locks[$id]) {
        # release using the mechanism with which the lock was taken
        if ($conf{nfs_lock}) {
            $locks[$id]->unlock;
        } else {
            close $locks[$id];
        }
        $locks[$id] = undef;
    }
}

###################
#### log_close ####
###################
# close the Compress::BGZF handle stored for given log in given hash and
# remove it from the hash, refreshing the log's .gzi index beforehand
# when the handle is a reader that has index entries
sub log_close {
    my ($log, $gzs) = @_;
    if (defined $gzs->{$log}) {
        my $reader = tied *{$gzs->{$log}};
        # only readers carry an index worth saving
        my $last = ref $reader eq 'Compress::BGZF::Reader' ?
            $reader->{idx}->[-1] : undef;
        if (defined $last) {
            # second element of final index entry is the uncompressed offset
            my (undef, $uoff) = @{$last};
            # rmkdir is just mkdir in reverse
            my $index = "$opts{base}/" .
                ($log eq 'rmkdir' ? 'mkdir' : $log) . ".gzi";
            local $SIG{__WARN__} = sub {};
            if ($uoff && -s $index < int($uoff / 4096)) {
                # ignore exceptions since gzi is only to increase performance
                eval {$reader->write_index($index)};
            }
            # a corrupt/incomplete index will throw exceptions on read
            # NOTE(review): $@ is also tested when the index was not
            # rewritten above, so an error left by an earlier eval removes
            # the index as well - confirm this is intended
            unlink $index if ($@);
        }
        close $gzs->{$log};
        delete $gzs->{$log};
    }
    # error conditions will trigger exception, which will not finalize metadata
}

##################### #### log_getline #### ##################### # open Compress::BGZF::Reader for given log and store in given hash sub log_getline { my ($log, $gzs, $noread) = @_; # untaint $log $log = $1 if ($log =~ /^(\w+)$/); if (!defined $gzs->{$log}) { # rmkdir is just mkdir in reverse my $f = $log eq 'rmkdir' ?
'mkdir' : $log; # open new reader and store in given hash $gzs->{$log} = Compress::BGZF::Reader->new_filehandle("$opts{base}/$f"); # error conditions will trigger exception } return if ($noread); return $gzs->{$log}->getline; }

###################
#### log_print ####
###################
# print given text to open/new Compress::BGZF::Writer for given log
# obtained from/stored to given hash
#
# when no text is given, any open handle for the log is closed, its .gzi
# index is removed, and the log is recreated via a fresh writer
sub log_print {
    my ($log, $gzs, $line) = @_;
    # untaint $log
    $log = $1 if ($log =~ /^(\w+)$/);
    if (!defined $line) {
        log_close($log, $gzs);
        # index no longer matches the log that is about to be recreated
        unlink("$opts{base}/$log.gzi");
        # force new file
        my $fh = Compress::BGZF::Writer->new_filehandle("$opts{base}/$log");
        close $fh;
    } else {
        if (!defined $gzs->{$log}) {
            # open new writer and store in given hash
            $gzs->{$log} =
                Compress::BGZF::Writer->new_filehandle(">$opts{base}/$log");
        }
        # note data may be buffered until close so error may be in log_close
        $gzs->{$log}->print($line);
    }
    # error conditions will trigger exception, which will not finalize metadata
}

##################
#### fs_mount ####
##################
# return the mount point on the given host holding the given path
#
# the longest prefix of the path with a "mountr_<host>:<prefix>" entry wins;
# databases are consulted in the same fixed order as map_fs_mount (global,
# user, transfer metadata) so the result does not depend on hash ordering
#
# returns a hash reference with keys remote (value of the mountr entry),
# local (matching path prefix), and opts (value of the mounto entry), or
# undef when no prefix of the path has an entry
sub fs_mount {
    my ($host, $path) = @_;
    my @dirs = File::Spec->splitdir($path);
    while (scalar(@dirs)) {
        my $base = File::Spec->catdir(@dirs);
        # check each database for mount point
        foreach my $db (\%mounts, \%umounts, \%meta) {
            my $remote = $db->{"mountr_$host:$base"};
            next if (!$remote);
            # database has mount point information
            return {
                remote => $remote,
                local => $base,
                opts => $db->{"mounto_$host:$base"},
            };
        }
        # check increasingly shorter path prefix
        pop @dirs;
    }
    # no remote mount point found
    return undef;
}

###################### #### map_fs_mount #### ###################### # return the mount point on the given host that corresponds to the # given mount point on another with given read/write access sub map_fs_mount { my ($mnt1, $host2, $rw) = @_; foreach my $db (\%mounts, \%umounts, \%meta) { my %mnt2 = (local =>
$db->{"mountl_$host2:$mnt1->{remote}"}); if ($mnt2{local}) { $mnt2{opts} = $db->{"mounto_$host2:$mnt2{local}"}; # must have read/write access if specified if (!$rw || $mnt2{opts} =~ /(?:^|,)rw(?:$|,)/) { $mnt2{remote} = $mnt1->{remote}; return \%mnt2; } } } # no equivalent mount point found return undef; }

###################
#### map_local ####
###################
# translate a path on one host into the equivalent path on another host
# using known mount points; mount details of the resulting path are copied
# into the given hash reference, whose rw entry requests read/write access
# returns undef when the path has no equivalent on the second host
sub map_local {
    my ($host1, $path1, $host2, $ref) = @_;
    # find file system mount of path on original host
    my $src = fs_mount($host1, $path1);
    if ($host1 eq $host2) {
        # same host so path is unchanged, but record mount info if known
        @{$ref}{keys %{$src}} = values %{$src} if ($src);
        return $path1;
    }
    # path is not on a known mount so nothing to map
    return undef if (!$src);
    my $dst = map_fs_mount($src, $host2, $ref->{rw});
    # second host does not mount the file system (with required access)
    return undef if (!defined $dst);
    # swap the mount point prefix of the first host for that of the second
    $path1 =~ s/^\Q$src->{local}\E/$dst->{local}/;
    @{$ref}{keys %{$dst}} = values %{$dst};
    return $path1;
}

####################
#### map_remote ####
####################
# return the equivalent of a given remote path on a given host
#
# the (still escaped) path must have the form [user%40]host%3A/path or
# undef is returned; a local equivalent on the given host is preferred
# when no user is attached, otherwise a remote host that mounts the same
# file system is picked and a remote path on that host is returned; the
# original path is returned when no mount or suitable host is known
sub map_remote {
    my ($lhost, $path1, $ref) = @_;
    # remote paths will still be escaped at this point
    if ($path1 =~ /^([^\/]+)%3A(\/.*)?/) {
        my ($rhost, $rpath) = ($1, $2);
        # host may have user@ attached
        my $user;
        ($user, $rhost) = ($1, $2) if ($rhost =~ /(.+%40)(.*)/);
        if (!$user) {
            # check if remote file system exists on local host
            my $path2 = map_local($rhost, $rpath, $lhost, $ref);
            return $path2 if (defined $path2);
        }
        # find file system mount of path on original host
        my $mnt1 = fs_mount($rhost, $rpath);
        # return original if no mount found
        return $path1 if (!$mnt1);
        # reuse host already picked for this file system when there is one
        my $pick = $opts{"pick_$mnt1->{remote}"};
        if (!defined $pick) {
            # find hosts that mount file system based on global/user db
            my $key = "mounth_" . $mnt1->{remote};
            my %hosts = map {$_->{$key} ? %{$_->{$key}} : ()}
                (\%meta, \%mounts, \%umounts);
            my %shells = map {$_->{shells} ? %{$_->{shells}} : ()}
                (\%meta, \%mounts, \%umounts);
            # determine potential hosts
            foreach my $host (keys %hosts) {
                delete $hosts{$host}
                    if (!map_fs_mount($mnt1, $host, $ref->{rw}) ||
                        !$shells{$host});
            }
            # prune last remote host to avoid checksum caching effects
            delete $hosts{$ref->{last}}
                if ($ref->{last} && scalar(keys %hosts) > 1);
            # prune potential hosts based on number currently assigned
            my $min = 1E9;
            my %min_hosts;
            foreach my $host (keys %hosts) {
                my $npicks = scalar(keys %{$meta{"picks_$host"}});
                # don't count previous selection for this host
                $npicks-- if ($meta{"picks_$host"}->{$lhost});
                next if ($npicks > $min);
                $min = $npicks;
                $min_hosts{$npicks} = [] if (!defined $min_hosts{$npicks});
                push(@{$min_hosts{$npicks}}, $host);
            }
            # candidates are the hosts with the fewest current assignments
            my %picks = map {$_ => 1} @{$min_hosts{$min}};
            if (defined $conf{select_hook}) {
                # select host using configured selection hook
                $pick = open3_get([-1, undef, -1],
                    "$conf{select_hook} $lhost $rhost " .
                    join(",", keys %picks));
            }
            # revert to default selection policy when no selection
            $pick = default_select($rhost, keys %picks) if (!$pick);
            $pick =~ s/\s*\r?\n$//;
            if (!$pick) {
                # store original mount info
                $ref->{$_} = $mnt1->{$_} foreach (keys %{$mnt1});
                # return original path if can't find suitable mount
                return $path1
            }
            # clear previously picked hosts
            foreach (grep(/^picks_/, keys %meta)) {
                delete $meta{$_}->{$lhost};
                delete $meta{$_} if (scalar(keys %{$meta{$_}}) == 0);
            }
            # store that host has already been selected
            $meta{"picks_$pick"}->{$lhost} = 1;
            $opts{"pick_$mnt1->{remote}"} = $pick;
        }
        my $mnt2 = map_fs_mount($mnt1, $pick, $ref->{rw});
        # mnt2 is known to be defined due to previous %hosts map_fs_mount calls
        # replace original mount point with new mount point
        $rpath =~ s/^\Q$mnt1->{local}\E/$mnt2->{local}/;
        # construct remote path using escaped colon after host
        $rpath = "$user$pick%3A$rpath";
        # store mount info
        $ref->{$_} = $mnt2->{$_} foreach (keys %{$mnt2});
        return $rpath;
    }
    return undef;
}

##############
#### meta ####
##############
# dump the metadata of the transfer selected by the id option to stdout,
# prefixed by the path of the metadata file that was read
sub meta {
    my $xfer = "$opts{user}.$opts{id}";
    my $dir = $conf{user_dir};
    # walk down nested .more directories until one contains the transfer
    $dir .= "/$opts{user}.more" while (-d $dir && !-d "$dir/$xfer");
    # directory only exists at this point when the transfer was found
    if (-d $dir) {
        my $file = "$dir/$xfer/meta";
        print "$file = ";
        # retrieve metadata from file
        %meta = %{get_meta($file, $opts{meta})};
        print Dumper(\%meta);
    }
}

################# #### monitor #### ################# # print status updates of selected running transfers to stdout # or notify monitor processes when given parameter sub monitor { if ($_[0]) { # find monitor processes foreach my $ph (glob "$conf{user_dir}/mon_* $opts{base}/mon_*") { my $base = basename($ph); my ($pid, $host); # untaint pid and host if ($base =~ /^mon_(\d+)_([\w.-]+)$/) { ($pid, $host) = ($1, $2); } # notify processes using SIGCHLD open3_get([-1, undef, -1], "ssh $host kill -s CHLD $pid"); } return; } # indicate monitoring for other processes $monfile =
$opts{id} ? $opts{base} : $conf{user_dir}; my $base = join("_", "mon", $$, $self); $monfile .= "/" . $base; # a failure to create the file will revert to once a minute monitoring open(FILE, '>', $monfile); close FILE; sync_queue($base) if ($conf{sync_host} && !$opts{id}); # set parameters for status() calls $opts{status} = $opts{monitor}; $opts{state} = "run"; my $lines; while (1) { # move cursor up and erase all previously printed lines print "\e[1A\e[K" foreach (1 .. $lines); if ($opts{id}) { # meta must be loaded manually when invoking status() on specfic id %meta = %{get_meta("$opts{base}/meta")}; my $out = status(); # manually count lines in output $lines = $out =~ tr/\n//; print $out; } else { $lines = status(); } # terminate when only headers in status last if ($lines <= 3); eval { # refresh status on timeout or signal from external put() calls local $SIG{ALRM} = sub {}; local $SIG{CHLD} = sub {}; alarm 60; sleep; }; alarm 0; # time is usually only set once upon invocation $time = time; } # monitor file will be cleaned up by END }

#####################
#### mp_retrieve ####
#####################
# return data structure stored in MessagePack format from given file
#
# an empty hash reference is returned when the file cannot be opened or
# its contents cannot be unpacked
sub mp_retrieve {
    my $file = shift;
    my $return;
    if (open(my $fh, '<', $file)) {
        # contents are binary so read them untranslated in a single slurp
        binmode $fh;
        my $data = do {local $/; <$fh>};
        # ignore exceptions since load/umount dbs not critical for completion
        $return = eval {Data::MessagePack->unpack($data)};
        close $fh;
    }
    return defined $return ? $return : {};
}

##################
#### mp_store ####
##################
# store given data structure to given file in MessagePack format
#
# nothing is done when the file cannot be opened for writing
sub mp_store {
    my ($data, $file) = @_;
    if (open(my $fh, '>', $file)) {
        binmode $fh;
        # ignore exceptions since load/umount dbs not critical for completion
        print {$fh} eval {Data::MessagePack->pack($data)};
        close $fh;
    }
}

###################
#### open3_get ####
###################
# run given command with stdin/stdout/stderr from/to given files
# and return command output when requested
#
# output is requested by leaving the stdout entry of the file list
# undefined or, failing that, the stderr entry of a three element list;
# returns undef when the command could not be started
sub open3_get {
    my $files = shift;
    my @args = @_;
    my $fhpid = open3_run($files, @args);
    return undef if (!defined $fhpid);
    # determine which of the returned handles (if any) should be read
    my $ifh;
    if (!defined $files->[1]) {
        $ifh = 1;
    } elsif (scalar(@{$files}) == 3 && !defined $files->[2]) {
        $ifh = 2;
    }
    my $out;
    if ($ifh) {
        # collect everything the command writes until end of file
        $out .= $_ while (defined ($_ = $fhpid->[$ifh]->getline));
    }
    open3_wait($fhpid);
    return $out;
}

################### #### open3_run #### ################### # run given command with stdin/stdout/stderr either from/to given files # or from/to autocreated pipes and return associated file handles and pid sub open3_run { my $files = shift; my @args = @_; if (scalar(@args) == 1) { $args[0] =~ s/^\s+|\s+$//g; @args = quotewords('\s+', 0, $args[0]); } my (@fh, @o3); foreach my $i (0 .. scalar(@{$files}) - 1) { my $dir = $i ? '>' : '<'; my $file = $files->[$i]; $file = File::Spec->devnull if ($file == -1); if ($file) { open($fh[$i], $dir, $file); $o3[$i] = $dir . '&' .
$fh[$i]->fileno; } else { $o3[$i] = gensym; $fh[$i] = $o3[$i]; } } # combine stdout/stderr if nothing given for stderr $o3[2] = $o3[1] if (scalar(@{$files}) == 2); my $pid; eval {$pid = IPC::Open3::open3(@o3, @args)}; if ($@ || !defined $pid) { open3_wait([@fh]); return undef; } else { $o3[0]->autoflush(1) if (ref $o3[0]); return [@fh, $pid]; } }

####################
#### open3_wait ####
####################
# close the file handles in a list produced by open3_run and wait for the
# process whose id is stored as the final element of that list
sub open3_wait {
    my ($fhpid) = @_;
    return if (!defined $fhpid);
    # process id follows the file handles (note this shortens given list)
    my $pid = pop(@{$fhpid});
    foreach my $fh (@{$fhpid}) {
        close $fh;
    }
    waitpid($pid, 0);
}

#####################
#### parse_bytes ####
#####################
# return given string with each number and optional k/m/g/t suffix replaced
# by its value in bytes, using decimal units when binary is false and
# binary units otherwise; when binary is 2, a non-zero result is also
# rounded down to a power of 2
sub parse_bytes {
    my ($text, $binary) = @_;
    my $units = $binary ? \%bibytes : \%bytes;
    # multiply every number by the unit given in its (optional) suffix
    $text =~ s/([1-9]\d*)([kmgt])?/$1*$units->{uc $2}/eg;
    return $text if (!$text || $binary != 2);
    # adjust binary values to power of 2
    my $pow = 1;
    for (my $rest = $text >> 1; $rest; $rest >>= 1) {
        $pow <<= 1;
    }
    return $pow;
}

############## #### plot #### ############## # print output suitable for gnuplot to stdout for selected transfers sub plot { #TODO: fail={errors, warnings, corruptions, exceptions} my ($by, $batch); if ($opts{plot} =~ /^(?:client|fs|host|id|net|user)$/) { ($by, $opts{plot}) = ($opts{plot}, "io"); } elsif ($opts{plot} =~ /(\S+)([:\/])(\S+)/) { ($by, $batch, $opts{plot}) = ($1, $2, $3); $batch = $batch eq ':' ?
0 : 1; die "Can only plot by client, fs, host, id, net, or user\n" if ($by !~ /^(?:client|fs|host|id|net|user)$/); } elsif (!$opts{plot}) { $opts{plot} = "io,meta"; } $opts{plot} =~ s/io/join(",",qw(cp sum cksum))/eg; $opts{plot} =~ s/meta/join(",",qw(find mkdir ln chattr))/eg; $opts{plot} =~ s/tool/join(",",qw(bbftp fish fish-tcp mcp msum rsync shift-cp shift-sum))/eg; die "Unsupported term found in plot expression\n" if ($opts{plot} !~ /^((?:chattr|cksum|cp|find|ln|mkdir|sum|bbftp|fish|fish-tcp|mcp|msum|rsync|shift-cp|shift-sum)(,|$))+$/); # remove trailing commas since above regex doesn't catch $opts{plot} =~ s/,+$//; my %times; my @metas; my %items; my $dir = $> != 0 ? $conf{user_dir} : $opts{user_dir}; my $user = $> != 0 ? $opts{user} : "*"; do { push(@metas, glob "$dir/$user.[0-9]*/meta"); $dir .= "/*.more"; } while (scalar(glob $dir)); foreach my $file (@metas) { my $id = $file; if ($> != 0) { $id =~ s/.*\.|\/meta//g; } else { # ignore old transfers next if ((stat $file)[9] + $conf{data_expire} < $time); # leave user name in id $id =~ s/.*\/([\w-]+\.\d+)\/meta/$1/g; } # ignore other ids when id is defined next if ($opts{id} && $id != $opts{id}); # retrieve metadata from file %meta = %{get_meta($file)}; # skip transfers that use --sync next if ($meta{sync}); my $state = state(); # skip transfers that do not match the given state next if (!$opts{id} && $opts{state} && $state !~ /(?:^|\+)\Q$opts{state}\E(?:$|\+)/); my $group; if ($by eq 'id') { $group = $id; } elsif ($by eq 'user') { $group = $file; $group =~ s/.*\/([\w-]+)\.\d+\/meta/$1/g; } my %gets; my ($meta0, $meta); my ($time0, $time1); my $fh; open($fh, '<', $file) or die; while (my $line = <$fh>) { last if (!defined $line); last if (substr($line, 0, 1) != '[' || substr($line, -1, 1) != ']'); $meta0 = $meta; # note that this doesn't handle Math::BigInt as is normally done $meta = eval {Data::MessagePack->unpack(uncompress( yenc_decode(substr($line, 1, -1))))}; next if (!$meta); my $client = 
$meta->{update_id}; my $host = $client; $host =~ s/\.\d+$//; my $last = "last_$host"; my $ltime = $meta->{$last}; if (!defined $time0) { # skip times that were too far in the past and will clutter plot next if ($> == 0 && $ltime + $conf{data_expire} < $time); # ensure start time is added for difference to first operation $time0 = $ltime; $times{$time0} = [] if (!defined $times{$time0}); } if (!defined $time1 && $meta->{time1}) { # ensure start time is added for difference to last operation $time1 = $meta->{time1}; $times{$time1} = [] if (!defined $times{$time1}); } # record rates when hosts have completed operations (put) if ($gets{$client} && sum(map {$meta->{"$_\_size"}} (@stages, qw(done error))) > sum(map {$meta0->{"$_\_size"}} (@stages, qw(done error)))) { my %fs; if ($by eq 'fs') { # find all file systems in use foreach (grep(/^[ds]_(?:chattr|cksum|find|get|ln|mkdir|put|sum)_/, keys %{$meta})) { s/^\w+_\w+_//; $fs{$_} = 1; } } else { $fs{""} = 1; } if ($by eq 'client') { $group = $client; } elsif ($by eq 'host') { $group = $host; } elsif ($by eq 'net') { $group = "unknown"; if ($meta->{origin_ip}) { # change origin to subnet $group = $meta{origin_ip}; $group =~ s/\.\d+$/\.0/; } } foreach my $fs (keys %fs) { if ($by eq 'fs') { # remove all but server mount point my $f = $fs; $f =~ s/^[^\/]*//; $group = $fs ? 
$f : "unknown"; } my $data; foreach my $cmd (split(/,/, $opts{plot})) { my ($count, $rate); if ($cmd eq 'find') { # use number of operations generated as rate basis # (this is inaccurate when #fs>1 in a find batch) $rate = sum(map {$meta->{"t_$_"} - $meta0->{"t_$_"}} qw(cp ln mkdir)); } elsif ($cmd =~ /^(?:chattr|cksum|ln|mkdir|sum)$/) { my $fscmd = "d_$cmd"; if ($cmd =~ /sum$/) { $count = $meta->{$fscmd} - $meta0->{$fscmd}; $fscmd = "s_$cmd"; } $fscmd .= "_$fs" if ($fs); $rate = $meta->{$fscmd} - $meta0->{$fscmd}; # io operations are shown in MB/s $rate /= 1E6 if ($cmd =~ /sum$/); } else { $count = $meta->{"d_$cmd"} - $meta0->{"d_$cmd"}; next if ($count <= 0); # this assumes one tool per batch (get() ensures) my @fscmds = $cmd =~ /sum$/ ? qw(cksum sum) : ($fs ? qw(get put) : qw(cp)); # msum and shift-sum foreach my $fscmd (@fscmds) { $fscmd = "s_$fscmd"; $fscmd .= "_$fs" if ($fs); $rate += $meta->{$fscmd} - $meta0->{$fscmd}; } # io operations are shown in MB/s $rate /= 1E6; } # throw out negative results due to retries next if ($rate <= 0); my $tdiff = $ltime - $gets{$client}; $tdiff = 1 if (!$tdiff); $data->{$cmd} = ceil($rate / $tdiff); # add batch size as negligible fraction $data->{$cmd} .= ".00$count" if ($batch && $count > 0); } next if (!defined $data); # add to operations if something done (i.e. 
not --alive) $times{$gets{$client}} = [] if (!defined $times{$gets{$client}}); $times{$ltime} = [] if (!defined $times{$ltime}); foreach (keys %{$data}) { my $key = $_; $key = "$group [$key]" if ($by); $items{$key} = 1; } $data->{group} = $group if ($by); if ($batch) { $data->{time} = $ltime; } elsif (!$batch) { # computed rates end when the put occurred my $data2 = dclone($data); foreach (split(/,/, $opts{plot})) { $data2->{$_} *= -1; } push(@{$times{$ltime}}, $data2); } # computed rates begin when the get occurred push(@{$times{$gets{$client}}}, $data); } # no more outstanding operations by client delete $gets{$client}; } # record when clients have operations in progress (get) $gets{$client} = $ltime if ($meta->{"doing_$client\_size"} > $meta0->{"doing_$client\_size"}); } close $fh; } my @items = sort(keys %items); die "No suitable transfers to plot\n" if (!scalar(@items)); # embed data inline so output can be piped directly into gnuplot print "# This output requires gnuplot version 5 or higher\n"; print '$data << EOD', "\n"; my $time1; my %vals; my %stats; my $sum; my %sums; foreach my $ltime (sort {$a <=> $b} keys(%times)) { if (!$batch && $time1) { foreach my $group (keys %vals) { while (my ($k, $v) = each %{$vals{$group}}) { next if ($v <= 0); $k = "$group [$k]" if ($group); print join(" ", $time1, $ltime, "\"$k\"", $v), "\n"; $stats{n}++; $stats{s} += $v; $stats{sq} += $v * $v; } } } foreach my $val (@{$times{$ltime}}) { my ($group, $t) = delete @{$val}{qw(group time)}; while (my ($k, $v) = each %{$val}) { if ($batch) { $k = "$group [$k]" if ($group); print join(" ", $ltime, $t, "\"$k\"", $v), "\n"; $stats{n}++; $stats{s} += $v; $stats{sq} += $v * $v; } else { $vals{$group}->{$k} += $v; $sum->{$k} += $v; } } } $sums{$ltime} = dclone($sum) if (!$batch && ref $sum); # record last time witnessed $time1 = $ltime; } print "EOD\n"; if (!$batch) { print '$max << EOD', "\n"; my @sums = sort {$a <=> $b} keys(%sums); my $t0 = $sums[0]; my $tdiff = int(($time1 - $t0) / 
100); $tdiff = 1 if (!$tdiff); my @max = (0, 0); foreach my $ltime (@sums) { if ($ltime >= $t0 + $tdiff && $max[0] + $max[1] > 0) { print "$t0 $max[0] $max[1]\n"; $t0 = $t0 + $tdiff; @max = (0, 0); } my @max1; while (my ($k, $v) = each %{$sums{$ltime}}) { my $io = $k =~ /^(?:find|mkdir|ln|chattr)$/ ? 1 : 0; $max1[$io] += $v; } foreach (0, 1) { $max[$_] = $max1[$_] if ($max1[$_] > $max[$_]); } } print "EOD\n"; } if ($stats{n}) { $stats{m} = $stats{s} / $stats{n}; $stats{dev} = sqrt($stats{sq} / $stats{n} - ($stats{s} / $stats{n})**2); } my $lw = "lw " . min(10, max(.75, (50 / scalar(@items)))); my $font = "font '," . max(2, min(10, ceil(300 / scalar(@items)))) . "'"; my $ps = "ps " . min(1, max(.075, (5 / scalar(@items)))); my $td = 3600 * ((24 + (gmtime($time))[2] - (localtime($time))[2]) % 24); # set basic plot characteristics print "set terminal pdf enhanced\n"; print "set border linecolor rgb 'white'\n"; print "set format x '%m/%d %H:%M:%S'\n"; print "set lmargin at screen .15\n"; print "set obj 1 rect behind from screen 0,0 to screen 1,1 fillcolor rgb 'black' behind\n"; print "set palette defined ( 0 '#2f0087', 1 '#6200a4', 2 '#9200a6', 3 '#ba2f8a', 4 '#d85b69', 5 '#ee8949', 6 '#f6bd27', 7 '#e4fa15' )\n"; print "set style arrow 1 nohead lc palette $lw\n"; print "set timefmt '%s'\n"; print "set xdata time\n"; print "set xlabel 'Time' textcolor rgb 'white' offset screen 0,.05\n"; print "set xtics nomirror rotate by 60 right font ',6'\n"; print "set ytics nomirror $font\n"; print "unset key\n"; print "set ylabel 'Rate (I/O: MB/s, Meta: ops/s)' textcolor rgb 'white'\n"; print "set y2tics nomirror\n" if (!$batch); my $lc = $stats{m}+2 * $stats{dev}; print "lc(x) = x > $lc ? $lc : x\n"; print "set yrange [.5:", scalar(@items) + .5, "]\n"; print "item(i) = ", join(" : ", map {"(i eq \"$items[-$_]\" ? 
$_"} (1..scalar(@items))), " : -1", ")" x scalar(@items), "\n"; print "plot \\\n"; if ($batch) { print " \$data using (\$1-$td):(item(strcol(3))):(lc(\$4)):ytic(3) with points lc palette pt 7 $ps\n"; } else { print " \$data using (\$1-$td):(item(strcol(3))):(\$2-\$1):(0):(lc(\$4)):ytic(3) with vector as 1, \\\n"; print " \$max using (\$1-$td):2 axes x1y2 with points lc \"green\" pt 7 ps .075, \\\n"; print " \$max using (\$1-$td):3 axes x1y2 with points lc \"blue\" pt 7 ps .075\n"; } print "\n"; } ############# #### put #### ############# # record the state of file operations that were processed by a client sub put { # only process a put when the corresponding get was from this host return if ($opts{put} && $opts{put} ne $self && $conf{sync_host}); # retrieve global and user database from file %mounts = %{mp_retrieve($conf{mount_db})}; %umounts = %{mp_retrieve($conf{umount_db})}; my $gzs = {}; my $more_finds = $opts{more_finds} + $meta{d_find} + $meta{e_find} == $meta{t_find} ? 0 : 1; my %links; my %mnts; my %rates; $meta{"warn_$opts{host}$opts{cid}"} = -1; while (my $line = ) { debug_print('PUT', $line); $line =~ s/\s*\r?\n$//; # eliminate any random double slashes that crept in $line =~ s/\/\//\//g; #TODO: size limit? compression? my %op = split(/[= ]+/, $line); # ignore malformed lines with undefined op values next if (grep(!/./, values %op)); if (defined $op{doing}) { # ignore operations that have been processed by another client next if (!defined $doing->{$op{doing}}); delete $doing->{$op{doing}}; } my @args = split(/,/, $op{args}); my $cmd = shift @args; my ($sid, $split) = split(/:/, $op{split}); $rates{$cmd} = $op{rate} if ($op{rate}); if ($op{state} eq 'alert') { log_print('alert', $gzs, $line . 
"\n"); $meta{e_alert}++; } elsif ($cmd =~ /^ckattr/ && defined $op{state}) { $meta{s_run} -= $op{size}; $meta{t_run}--; if ($op{state} eq 'error') { # dst does not exist so next state is cp $cmd = "cp"; } else { if (defined $op{split}) { # record all split copies done $meta{"sd_cp_$sid"}->bnot; $meta{"st_cp_$sid"} = 0; if ($meta{verify} && $op{state} eq 'done') { # record all split sums and cksums done $meta{"sd_sum_$sid"}->bnot; $meta{"st_sum_$sid"} = 0; $meta{"sd_cksum_$sid"}->bnot; $meta{"st_cksum_$sid"} = 0; } } # record copy done $meta{s_cp} += $op{size}; $meta{d_cp}++; if ($op{state} eq 'done') { if ($meta{verify}) { # record sum and cksum done $meta{s_sum} += $op{size}; $meta{s_cksum} += $op{size}; $meta{d_sum}++; $meta{d_cksum}++; } # dst exists with same attrs so next state is chattr $cmd = "chattr"; # do not create partial operations for chattr delete $op{split} if (defined $op{split}); } else { # dst exists with diff/ignored attrs so next state is cp/sum $cmd = $meta{verify} ? 
"sum" : "cp"; } } $op{args} =~ s/^[^,]+/$cmd/; # more work to be done delete $op{$_} foreach (qw(doing rate run state text time)); $line = join(" ", map {"$_=$op{$_}"} sort(keys %op)); if (defined $op{split}) { my $pos = 0; my $split = 0; while ($pos < $op{size}) { # create a partial operation for each split my $end = min($pos + $meta{split}, $op{size}); # adjust size my $size = $end - $pos; $line =~ s/size=\d+/size=$size/; $line =~ s/split=\S+/split=$sid:$split/; log_print($cmd, $gzs, "$line bytes=$pos-$end\n"); $split++; $pos += $meta{split}; } } else { log_print($cmd, $gzs, "$line\n"); } } elsif ($cmd eq 'env') { if ($meta{"env_$opts{host}"} == -1) { # reduce outstanding hosts on first contact $meta{ohosts}--; $meta{"clients_$opts{host}"} = $meta{clients} - 1 if ($meta{clients} > 1); } $meta{"env_$opts{host}"} = $op{text}; } elsif ($cmd eq 'exception') { # track exceptions for stats processing $meta{e_exception}++; $meta{"exception_$opts{host}$opts{cid}"} = unescape($op{text}); } elsif ($cmd eq 'latency') { # record domain network latency foreach my $host (keys %op) { next if ($host eq 'args'); my $dn = $host; $dn =~ s/^[^.]+.//; $meta{"lastrtt_$dn"} = $time; if ($op{$host} < 0) { $meta{"e_rtt_$dn"}++; } else { $meta{"rc_rtt_$dn"}++; my $m = $op{$host} - $meta{"rtt_$dn"}; $meta{"rtt_$dn"} += $m / $meta{"rc_rtt_$dn"}; $meta{"r2_rtt_$dn"} += $m * ($op{$host} - $meta{"rtt_$dn"}); } } } elsif ($cmd eq 'load') { # record host load for throttling $meta{"load_$opts{host}$opts{cid}"} = $line; } elsif ($cmd eq 'getopt') { # initialize transfer settings once all getopt lines received init_id() if ($args[0] eq 'end'); # check validity of option next if ($args[0] !~ 
/^(?:bandwidth|buffer|clients|command|cpu|create-tar|cron|cwd|dereference|disk|exception|exclude|extract-tar|files|force|host-list|hosts|ignore-times|include|index-tar|interval|io[rw]?|local|mail|net[rw]?|newer|offline|older|pipeline|ports|preallocate|preserve|recall|remote|retry|sanity|secure|silent|size|split|split-tar|streams|stripe|sync|threads|verify|verify-fast|window)$/); $meta{$args[0]} = defined $op{text} ? unescape($op{text}) : 1; } elsif ($cmd eq 'host') { # host error so remove host from outstanding hosts $meta{ohosts}--; delete $meta{"env_$args[0]"}; $meta{"nohost_$args[0]"} = 1; } elsif ($op{state} eq 'done') { track_cache(\%op, $cmd, \@args); $meta{"warn_$opts{host}$opts{cid}"} = 0; log_print('done', $gzs, "$line\n"); $meta{s_run} -= $op{size}; $meta{"s_$cmd"} += $op{size}; $meta{t_run}--; # count operations that check in after transfer stopped against rate $meta{time1} = $time if ($meta{time1}); if (defined $op{split}) { my $test = Math::BigInt->new(1); $test->blsft($split); if ($test->copy->band($meta{"sd_$cmd\_$sid"})->is_zero) { # record that this particular split was done; $meta{"sd_$cmd\_$sid"}->bior($test); # decrement number of splits that need to be done; $meta{"st_$cmd\_$sid"}--; } } if (!defined $op{split} || $meta{"st_$cmd\_$sid"} <= 0) { # only update cmd totals for unsplit files or last split $meta{"d_$cmd"}++; $meta{"d_$op{tool}"}++; } if ($meta{verify} && $cmd eq 'cp') { if ($op{hash}) { # transport already summed so next state is cksum $cmd = "cksum"; $op{args} =~ s/^[^,]+/$cmd/; if (defined $op{split}) { my $test = Math::BigInt->new(1); $test->blsft($split); if ($test->copy->band($meta{"sd_sum_$sid"})->is_zero) { # record that this particular split was done; $meta{"sd_sum_$sid"}->bior($test); # decrement number of splits that need to be done; $meta{"st_sum_$sid"}--; } } if (!defined $op{split} || $meta{"st_sum_$sid"} <= 0) { # only update sum totals for unsplit files or last split $meta{d_sum}++; } $meta{s_sum} += $op{size}; } else 
{ # next state is sum $cmd = "sum"; $op{args} =~ s/^[^,]+/$cmd/; } } elsif ($meta{verify} && $cmd eq 'sum') { # next state is cksum $cmd = "cksum"; $op{args} =~ s/^[^,]+/$cmd/; } elsif (($meta{sanity} || $meta{preserve}) && $cmd =~ /^(?:cksum|cp|ln)/) { if ($meta{silent} && $cmd eq 'cksum' && detect_silent(\%op, $args[0], $args[1])) { $line =~ s/(?:text|tool)=\S+//g; log_print('alert', $gzs, "$line tool=shift-mgr text=$op{text}\n"); $meta{e_silent}++; } if (!defined $op{split} || $cmd eq 'cksum' && $meta{"st_cksum_$sid"} <= 0 || $cmd eq 'cp' && $meta{"st_cp_$sid"} <= 0) { # indicate operation was ln so can handle differently $op{ln} = 1 if ($cmd eq 'ln'); # only chattr unsplit files or last split # next state is chattr $cmd = "chattr"; $op{args} =~ s/^[^,]+/$cmd/; delete $op{$_} foreach (qw(bytes hash split)); } else { # ignore splits before last split next; } } else { if ($meta{silent} && $cmd eq 'cksum' && detect_silent(\%op, $args[0], $args[1])) { $line =~ s/(?:text|tool)=\S+//g; log_print('alert', $gzs, "$line tool=shift-mgr text=$op{text}\n"); $meta{e_silent}++; } $meta{time1} = $time if (($meta{last} || $meta{e_find}) && !run()); next; } # more work to be done delete $op{$_} foreach (qw(doing rate run state text time)); $line = join(" ", map {"$_=$op{$_}"} sort(keys %op)); log_print($cmd, $gzs, "$line\n"); } elsif ($op{state} && $op{try} >= $meta{retry}) { $meta{"warn_$opts{host}$opts{cid}"} = 0; log_print('error', $gzs, $line . 
"\n"); $meta{t_run}--; $meta{s_run} -= $op{size}; $meta{s_error} += $op{size}; $meta{"e_$cmd"}++; $meta{"e_$op{tool}"}++; $meta{time1} = $time if (($meta{last} || $meta{e_find}) && !run()); # track corruption for stats processing $meta{e_corruption}++ if ($cmd eq 'cksum' && unescape($op{text}) =~ /^Corruption/ && (!$meta{sync} || $op{try} > 1)); } elsif ($op{state}) { $meta{s_run} -= $op{size}; $meta{t_run}--; $op{try}++; # count operations that check in after transfer stopped against rate $meta{time1} = $time if ($meta{time1}); if ($cmd eq 'chattr' && unescape($op{text}) =~ /file sizes differ$/) { # reset size since may have changed during chattr split join $op{size} = (split(/,/, $op{attrs}))[7]; # file corrupted so next state is cp $cmd = "cp"; $op{args} =~ s/^[^,]+/$cmd/; # mark operations as not done $meta{d_cp}--; $meta{s_cp} -= $op{size}; if ($meta{verify}) { $meta{d_sum}--; $meta{d_cksum}--; $meta{s_sum} -= $op{size}; $meta{s_cksum} -= $op{size}; } if ($meta{split} > 0 && $op{size} > $meta{split}) { $op{state} = "warn"; # more work to be done delete $op{$_} foreach (qw(doing rate run time)); $line = join(" ", map {"$_=$op{$_}"} sort(keys %op)); my ($x1, $x2) = (0, $op{size}); # bytes must be subset of existing tar bytes range ($x1, $x2) = ($1, $2) if ($op{tar_bytes} =~ /(\d+)-(\d+)/); my $split = 0; while ($x1 < $x2) { $meta{w_run}++; # create a partial copy operation for each split my $end = min($x1 + $meta{split}, $x2); # adjust size my $size = $end - $x1; $line =~ s/size=\d+/size=$size/; log_print($cmd, $gzs, "$line split=$meta{split_id}:$split bytes=$x1-$end\n"); $split++; $x1 += $meta{split}; } # use new split id (old one lost during chattr stage) foreach ($meta{verify} ? 
qw(cp sum cksum) : qw(cp)) { $meta{t_split} += $split; $meta{"st_$_\_$meta{split_id}"} = $split; $meta{"sd_$_\_$meta{split_id}"} = Math::BigInt->new(0); } $meta{split_id}++; next; } elsif ($op{tar_bytes}) { # tar operations expect bytes field to exist $op{bytes} = $op{tar_bytes}; } } elsif ($cmd eq 'cksum' && unescape($op{text}) =~ /^Corruption,?(.*\d)?/) { my $bytes = $1; track_cache(\%op, $cmd, \@args); # track corruption for stats processing $meta{e_corruption}++ if (!$meta{sync} || $op{try} > 1); $meta{s_cksum} += $op{size}; if ($bytes) { my $end = (split(/,/, $op{attrs}))[7]; if ($op{tar_bytes} =~ /\d+-(\d+)/) { # bytes must be subset of existing tar bytes range $end = $1; } elsif ($end <= $conf{sum_split}) { # process whole file when src fits in one split $bytes = undef; $end = undef; } if (defined $end) { # adjust ranges to sane values my @ranges = split(/,/, $bytes); foreach my $range (@ranges) { my ($x1, $x2) = split(/-/, $range); if ($x1 >= $end) { # remove range if min beyond end offset $range = undef; } elsif ($x2 > $end) { # truncate dst if max beyond end offset $range = $x1 . "-" . $end } } $bytes = join(",", @ranges); # remove empty ranges $bytes =~ s/^,+|,+$//g; $bytes =~ s/,,+/,/g; } if ($bytes) { # reduce tries if progress being made $op{try}-- if (join(",", sort {$a <=> $b} split(/,/, $op{bytes})) ne join(",", sort {$a <=> $b} split(/,/, $bytes))); $op{bytes} = $bytes; # adjust size of remaining operations $op{size} = 0; if (!$op{hash0}) { # keep full hash for silent corruption detection $op{hash0} = $op{hash}; # eliminate mutil prefix (if any) $op{hash0} =~ s/^#mutil#(\d+-\d+)?#\\?//; } else { my $start = ($op{tar_bytes} =~ /(\d+)-\d+/) ? $1 : 0; my $rsrc = $args[0] =~ /^[^\/]+%3A/ ? 
1 : 0; if ($meta{'create-tar'} && !$rsrc || $meta{'extract-tar'} && $rsrc) { # create offset only non-zero when remote source # extract offset only zero when remote source $start = 0; } # replace subsets of full hash with new values foreach my $hash (split(/,/, $op{hash})) { if ($hash =~ /^#mutil#(\d+)-(\d+)#\\?(\S+)/) { my ($x1, $x2, $h) = ($1, $2, $3); # adjust offset by start of tar byte range my $hoff = ($x1 - $start) / $conf{sum_split}; substr($op{hash0}, $hoff, length $h) = $h; } } } foreach (split(/,/, $bytes)) { $op{size} += $2 - $1 if (/(\d+)-(\d+)/); } } } if (!$bytes && defined $bytes) { # remaining operations empty so done $meta{"warn_$opts{host}$opts{cid}"} = 0; log_print('done', $gzs, "$line\n"); $meta{"s_$cmd"} += $op{size}; if (defined $op{split}) { my $test = Math::BigInt->new(1); $test->blsft($split); if ($test->copy->band($meta{"sd_$cmd\_$sid"})->is_zero) { # record that this particular split was done; $meta{"sd_$cmd\_$sid"}->bior($test); # decrement number of splits that need to be done; $meta{"st_$cmd\_$sid"}--; } } if (!defined $op{split} || $meta{"st_$cmd\_$sid"} <= 0) { # only update cmd totals for unsplit files or last split $meta{"d_$cmd"}++; $meta{"d_$op{tool}"}++; if ($meta{sanity} || $meta{preserve}) { # only chattr unsplit files or last split # next state is chattr $cmd = "chattr"; $op{args} =~ s/^[^,]+/$cmd/; } else { # ignore splits before last split next; } } else { $meta{time1} = $time if (($meta{last} || $meta{e_find}) && !run()); next; } # more work to be done delete $op{$_} foreach (qw(bytes doing hash rate run split state text time)); $line = join(" ", map {"$_=$op{$_}"} sort(keys %op)); log_print($cmd, $gzs, "$line\n"); next; } if (defined $op{split}) { my $test = Math::BigInt->new(1); $test->blsft($split); $test->bnot; foreach (qw(cp sum)) { # record that this particular split was not done; $meta{"sd_$_\_$sid"}->band($test); # increment number of splits that need to be done; $meta{"st_$_\_$sid"}++; $meta{"d_$_"}-- if 
($meta{"st_$_\_$sid"} == 1); } } else { $meta{d_cp}--; $meta{d_sum}--; } # file corrupted so next state is cp $cmd = "cp"; $op{args} =~ s/^[^,]+/$cmd/; # reduce sizes by amount of file that was corrupt $meta{s_cp} -= $op{size}; $meta{s_sum} -= $op{size}; $meta{s_cksum} -= $op{size}; } elsif ($meta{"warn_$opts{host}$opts{cid}"}) { $meta{"warn_$opts{host}$opts{cid}"} = 1; } $op{state} = "warn"; $meta{w_run}++; # more work to be done delete $op{$_} foreach (qw(doing rate run time)); # do not delete hash when retrying cksum delete $op{hash} if ($op{args} !~ /^cksum/); $line = join(" ", map {"$_=$op{$_}"} sort(keys %op)); log_print($cmd, $gzs, "$line\n"); } elsif (defined $op{size}) { $meta{"t_$cmd"}++; $meta{t_chattr}++ if ($meta{sanity} || $meta{preserve}); if ($meta{'create-tar'} && $cmd =~ /^(?:cp|ln|mkdir)/) { # map tar file to origin system to keep tar metadata consistent my $tar; $tar = map_local($opts{host}, $args[-1], $meta{origin}, {rw => 1}) if ($args[-1] =~ /^\//); # back out to original if unable to map #TODO: should this be an error? 
$tar = $args[-1] if (!$tar); if (!defined $meta{"tar_size_$tar"}) { # initialize tar metadata for this file $meta{"tar_size_$tar"} = 0; $meta{"tar_split_$tar"} = 1; $meta{"tar_index_$tar"} = 0 if ($meta{'index-tar'}); $meta{"tar_nosum_$tar"} = 1 if ($meta{verify}); } elsif ($meta{"tar_size_$tar"} < 0) { # a negative size indicates the final size of the last split $meta{"tar_size_$tar"} = 0; $meta{"tar_split_$tar"}++; $meta{"tar_index_$tar"} = 0 if ($meta{'index-tar'}); } # need .sum mv for reg files / no tracking needed for 2+ splits delete $meta{"tar_nosum_$tar"} if ($cmd eq 'cp' || $meta{"tar_split_$tar"} > 1); $op{tar_start} = $meta{"tar_size_$tar"}; if ($cmd eq 'ln') { my $llen = length(unescape($args[0])); if ($llen > 100) { # add size of long link plus extra record my $asize = $llen + 512; $asize += (512 - ($asize % 512)) if ($asize % 512 > 0); $meta{"tar_size_$tar"} += $asize; } } my $tar_name = unescape($op{tar_name}); # per ustar spec, must append / to dirs $tar_name .= "/" if ($cmd eq 'mkdir'); if (length($tar_name) > 100) { my $pos = index($tar_name, "/", length($tar_name) - 100); if ($pos == -1 || $pos > 155 || length($tar_name) > 255) { # add size of long name plus extra record my $asize = length($tar_name) + 512; $asize += (512 - ($asize % 512)) if ($asize % 512 > 0); $meta{"tar_size_$tar"} += $asize; } } my $size = $cmd ne 'cp' ? 0 : $op{size}; # tar entries contain 512 byte header plus file plus padding $meta{"tar_size_$tar"} += 512; # file contents are written after the header $op{bytes} = $meta{"tar_size_$tar"} . "-"; $meta{"tar_size_$tar"} += $size; $op{bytes} .= $meta{"tar_size_$tar"}; $op{tar_bytes} = $op{bytes}; # pad entry to 512 byte boundary $meta{"tar_size_$tar"} += (512 - ($size % 512)) if ($size > 0 && $size % 512 > 0); # use appropriate split as target $op{args} .= "-" . $meta{"tar_split_$tar"} . 
".tar"; if ($meta{'index-tar'}) { # designate position of entry in index file $meta{"tar_index_$tar"} += $op{tar_index}; $op{tar_index} = $meta{"tar_index_$tar"} - $op{tar_index}; } if ($meta{'split-tar'} && $meta{"tar_size_$tar"} >= $meta{'split-tar'}) { # indicate last tar entry so final padding can be added $op{tar_last} = 1; # insert chattr op in find to preallocate and stripe log_print('find', $gzs, "args=chattr,$tar-" . $meta{"tar_split_$tar"} . ".tar host=$opts{host}" . " tar_creat=" . $meta{"tar_size_$tar"} . "\n"); $meta{t_chattr}++; $meta{tar_creat}++; # move to next split by inverting size to save final value $meta{"tar_size_$tar"} = -$meta{"tar_size_$tar"}; $meta{"tar_index_$tar"} = 0 if ($meta{'index-size'}); } $line = join(" ", map {"$_=$op{$_}"} sort(keys %op)); } if ($cmd eq 'ln' || $cmd eq 'mkdir') { log_print($cmd, $gzs, "$line\n"); } elsif ($cmd eq 'cp') { $meta{s_total} += $op{size}; if ($meta{verify}) { $meta{t_sum}++; $meta{t_cksum}++; } if ($meta{split} > 0 && $op{size} > $meta{split}) { my ($x1, $x2) = (0, $op{size}); # bytes must be subset of existing tar bytes range ($x1, $x2) = ($1, $2) if ($op{bytes} =~ /(\d+)-(\d+)/); my $split = 0; while ($x1 < $x2) { # create a partial copy operation for each split my $end = min($x1 + $meta{split}, $x2); # adjust size my $size = $end - $x1; $line =~ s/size=\d+/size=$size/; $line =~ s/ bytes=\S+//; log_print($cmd, $gzs, "$line split=$meta{split_id}:$split bytes=$x1-$end\n"); $split++; $x1 += $meta{split}; } foreach ($meta{verify} ? 
qw(cp sum cksum) : qw(cp)) { $meta{t_split} += $split; $meta{"st_$_\_$meta{split_id}"} = $split; $meta{"sd_$_\_$meta{split_id}"} = Math::BigInt->new(0); } $meta{split_id}++; } else { log_print($cmd, $gzs, "$line\n"); } } else { if ($cmd =~ /^ckattr/) { # create additional operations without adding to logs my @ops = qw(cp); push(@ops, qw(sum cksum)) if ($meta{verify}); $meta{"t_$_"}++ foreach (@ops); $meta{s_total} += $op{size}; if ($meta{split} > 0 && $op{size} > $meta{split}) { my $split = ceil($op{size} / $meta{split}); foreach (@ops) { $meta{t_split} += $split; $meta{"st_$_\_$meta{split_id}"} = $split; $meta{"sd_$_\_$meta{split_id}"} = Math::BigInt->new(0); } # record split info for result processing $line .= " split=$meta{split_id}:$split"; $meta{split_id}++; } } # use chattr instead of $cmd as there is no ckattr log log_print('chattr', $gzs, "$line\n"); } } elsif ($cmd eq 'find') { if ($meta{dereference} && !$meta{'extract-tar'}) { # these conditions are only valid after getopt lines processed if (!defined $links{t_find}) { tie(%links, 'DB_File', "$opts{base}/links", O_RDWR, 0600); if (!defined $links{t_find} || $links{t_find} != $meta{t_find}) { # this can happen when mgr fails over as find not sync'd untie %links; build_links(); tie(%links, 'DB_File', "$opts{base}/links", O_RDWR, 0600); } #TODO: need error if cannot be tied } # skip src directories already processed due to symlinks next if ($links{$args[0]}); $links{$args[0]} = 1; $links{t_find}++; } $meta{"t_$cmd"}++; log_print('find', $gzs, "$line\n"); } elsif ($cmd eq 'mount') { $mnts{$line} = \%op; } elsif ($cmd eq 'shell') { $mnts{$line} = \%op; $mnts{"pbs_$op{host}"} = {} if ($op{pbs}); } } if ($more_finds && $meta{d_find} == $meta{t_find} && $meta{'create-tar'}) { # tar transition from finds outstanding to no finds outstanding foreach my $file (grep(/^tar_size_/, keys %meta)) { my $size = abs $meta{$file}; $file =~ s/^tar_size_//; my $split = $meta{"tar_split_$file"}; # store file and size so final cp 
op can insert tar eof padding $meta{"tar_last_$file-$split.tar"} = $size; if ($split == 1) { # rename first split if there is only one split $meta{tar_mv}++; # use chattr to track additional move $meta{t_chattr}++; } # insert chattr op in find to preallocate and stripe log_print('find', $gzs, "args=chattr,$file-$split.tar " . "host=$opts{host} tar_creat=$size\n"); $meta{t_chattr}++; $meta{tar_creat}++; } } # close log files log_close($_, $gzs) foreach (keys %{$gzs}); untie %links if (defined $links{t_find}); if ($more_finds && $meta{d_find} + $meta{e_find} == $meta{t_find}) { # non-tar transition from finds outstanding to no finds outstanding if ($meta{e_find} + $meta{t_cp} + $meta{t_ln} + $meta{t_mkdir} == 0) { # force error if no files (e.g. non-matching --include) # use first find line for error line my $line = log_getline('find', $gzs); log_close('find', $gzs); chomp $line; # this should never happen if client/manager versions match $line = "args=find,no_src,no_dst host=no_host" if (!$line); $line .= " run=$time state=error tool=shift-mgr text=" . escape("No files found - this transfer cannot be restarted"); log_print('error', $gzs, $line . "\n"); log_close('error', $gzs); } elsif (!$meta{e_find}) { # mark initialization done $meta{last} = 1; # initialize rmkdir size (use -1 for later special seek) $meta{rmkdir} = -1; } # mark transfers complete if no files after find $meta{time1} = $time if (!run()); } # update user db if (scalar(keys %mnts)) { while (my ($line, $op) = each %mnts) { # only add hosts that are not in global db next if (!$op->{host} || $mounts{shells}->{$op->{host}}); my $db = $mnts{"pbs_$op->{host}"} ? 
\%meta : \%umounts;
            $ustore = 1 if (!$mnts{"pbs_$op->{host}"});
            if ($op->{args} eq 'shell') {
                $db->{shells}->{$op->{host}} = 1;
            } elsif ($op->{args} eq 'mount') {
                my $srv = "$op->{servers}:$op->{remote}";
                $db->{"mounth_$srv"}->{$op->{host}} = 1;
                $db->{"mountl_$op->{host}:$srv"} = $op->{local};
                $db->{"mountr_$op->{host}:$op->{local}"} = $srv;
                $db->{"mounto_$op->{host}:$op->{local}"} = $op->{opts};
            }
        }
    }

    # update running rate averages for estimated completion
    while (my ($cmd, $rate) = each %rates) {
        $meta{"rc_$cmd"}++;
        $meta{"ra_$cmd"} *= ($meta{"rc_$cmd"} - 1) / $meta{"rc_$cmd"};
        $meta{"ra_$cmd"} += $rate / $meta{"rc_$cmd"};
    }
}

##################
#### put_meta ####
##################
# begin metadata line or save given metadata and end line
#   - with no argument, appends "[" to "$opts{base}/meta"
#   - with a hash ref, appends the packed/compressed/encoded metadata
#     followed by "]\n"
# returns the result of close, or nothing when the file cannot be opened
sub put_meta {
    my $meta = shift;
    my $file = "$opts{base}/meta";
    # use lexical handle so handle cannot collide with other FILE users
    # and bail out explicitly instead of printing to an unopened handle
    open(my $fh, '>>', $file) or return;
    if (defined $meta) {
        my $mpmeta = dclone($meta);
        # convert Math::BigInt values to strings for storage
        # (values that are not objects are stored as is)
        foreach my $key (grep(/^sd_/, keys %{$meta})) {
            $mpmeta->{$key} = $meta->{$key}->bstr if (ref $meta->{$key});
        }
        print $fh yenc_encode(compress(Data::MessagePack->pack($mpmeta)), ""),
            "]\n";
    } else {
        print $fh "[";
    }
    #TODO: handle errors;
    return close($fh);
}

#############
#### run ####
#############
# return whether or not the current transfer is running
#   - computes the number of operations expected to finish from the
#     t_/d_/e_ counters in %meta and compares it against the number of
#     operations that are actually done or in error across @stages
sub run {
    my $expect = $meta{t_find} + $meta{tar_creat};
    if (!$meta{'create-tar'} ||
            $meta{last} && $meta{d_chattr} >= $meta{tar_creat}) {
        $expect += $meta{"t_$_"} foreach (qw(cp ln mkdir));
    }
    # only count chattr errors when not tar_mv errors
    my $echattr = $meta{t_chattr} - $meta{d_chattr} <= $meta{tar_mv} ?
        0 : $meta{e_chattr};
    if ($meta{verify} && ($meta{sanity} || $meta{preserve})) {
        # expect sums for done cps and cksums for done sums
        $expect += $meta{"d_$_"} foreach (qw(cp sum));
        # expect file chattrs for done cksums and done lns
        $expect += $meta{"d_$_"} foreach (qw(cksum ln));
        # expect dir chattrs only when tar create or no other errors
        my $errs = sum(map {$meta{"e_$_"}} qw(cksum cp find ln mkdir sum));
        $expect += $echattr + $errs &&
            (!$meta{'create-tar'} || !$meta{last}) ? 0 : $meta{t_mkdir};
    } elsif ($meta{verify}) {
        # expect sums for done cps and cksums for done sums
        $expect += $meta{"d_$_"} foreach (qw(cp sum));
    } elsif ($meta{sanity} || $meta{preserve}) {
        # expect file chattrs for done cps and done lns
        $expect += $meta{"d_$_"} foreach (qw(cp ln));
        # expect dir chattrs only when tar create or no other errors
        my $errs = sum(map {$meta{"e_$_"}} qw(cp find ln mkdir));
        # when errs > 0 and rmkdir == 0, any chattr errors are from dirs
        # and not files, so should still expect t_mkdir dir chattrs
        $expect += $echattr + $errs && $meta{rmkdir} &&
            !$meta{'create-tar'} ? 0 : $meta{t_mkdir};
    }
    my $actual = sum(map {$meta{"d_$_"}} @stages);
    my $errs = sum(map {$meta{"e_$_"}} @stages);
    $actual += $errs;
    # expect tar_mv chattrs only when no other errors
    $expect += $echattr + $errs - $meta{e_chattr} ?
        0 : $meta{tar_mv};
    # running if actual operations differ from expected operations
    return ($expect != $actual);
}

###############
#### state ####
###############
# return state of current/given transfer
#   - optional argument is a reference to the metadata hash to examine
#     (defaults to the current transfer in %meta)
#   - result is "done", "stop", "error", or "run" optionally followed by
#     "+warn", "+error", and/or "+throttle"; "+alert" is appended to any
#     state when alerts or silent corruption have been recorded
sub state {
    my $meta0 = shift;
    $meta0 = \%meta if (!defined $meta0);
    my $state = "run";

    # compute number of operations in various states
    # (all lookups go through $meta0 so a given transfer is honored)
    my $done = sum(map {$meta0->{"d_$_"}} @stages);
    my $error = sum(map {$meta0->{"e_$_"}} @stages);
    my $total = sum(map {$meta0->{"t_$_"}} @stages);

    # determine transfer state
    if ($meta0->{last} && defined $meta0->{time1} && $done == $total) {
        $state = "done";
    } elsif ($meta0->{stop}) {
        $state = "stop";
    } elsif ($meta0->{time1}) {
        $state = "error";
    } else {
        if ($meta0->{w_run} > 0) {
            $state .= "+warn";
        }
        if ($error > 0) {
            $state .= "+error";
        }
        if (grep(/^throttled_/, keys(%{$meta0}))) {
            $state .= "+throttle";
        }
    }
    $state .= "+alert" if ($meta0->{e_alert} || $meta0->{e_silent});
    return $state;
}

###############
#### stats ####
###############
# output table of consolidated stats across all transfers of invoking
# user or all users if invoked as root
sub stats {
    my $all;
    my %types;
    my %users;
    # define headers for each table type
    my %heads = (
        Transfers =>
            [qw(xfers local lan wan dirs files size sums ssize attrs hosts)],
        Rates =>
            [qw(local_min local_max local_avg lan_min lan_max lan_avg
                wan_min wan_max wan_avg all_min all_max all_avg)],
        Tools =>
            [qw(bbftp fish fish-tcp mcp msum rsync shift-chattr shift-cp
                shift-find shift-sum)],
        Options_1 =>
            [qw(bandwidth buffer clients cpu create-tar exclude extract-tar
                files force host-list hosts include index-tar interval)],
        Options_2 =>
            [qw(io ior iow local net netr netw newer no-cron no-mail
                no-offline no-preserve no-recall no-sanity no-silent
                no-verify)],
        Options_3 =>
            [qw(older pipeline ports preallocate remote retry secure size
                split split-tar streams stripe sync threads verify-fast
                window)],
        Errors =>
            [qw(corruption exception silent throttle chattr cksum cp host ln
                mkdir sum)],
    );
    # define order in output
    my @order = qw(Transfers
Rates Tools Options_1 Options_2 Options_3 Errors);
    # add tool errors
    push(@{$heads{Errors}}, @{$heads{Tools}});
    # error and option columns are stored with e_/o_ prefixes
    $_ = "e_$_" foreach (@{$heads{Errors}});
    $_ = "o_$_" foreach (
        @{$heads{Options_1}}, @{$heads{Options_2}}, @{$heads{Options_3}});

    if (!$opts{user} && $> == 0) {
        # replace %u with * to get stats from all users
        $conf{user_dir} =~ s/%u/*/g;
    } else {
        $opts{user} = getpwuid($<) if (!$opts{user});
        $conf{user_dir} =~ s/%u/$opts{user}/g;
    }

    # compute totals over all transfers
    my @metas;
    my $dir = $conf{user_dir};
    my @more;
    do {
        push(@metas, glob "$dir/*/meta");
        $dir .= "/*.more";
        # glob must be evaluated in list context here: a scalar glob is an
        # iterator that keeps returning results of the previous pattern
        # until exhausted, which ends the descent after one .more level
        @more = glob $dir;
    } while (scalar(@more));
    foreach my $file (@metas) {
        # skip transfers that have expired
        my $mtime = (stat($file))[9];
        next if ($mtime + $conf{data_expire} < $time);
        # retrieve metadata from file
        my %meta = %{get_meta($file)};

        # derive transfer type
        my $type = "local";
        if ($meta{origin} =~ /\Q$conf{email_domain}\E$/ &&
                grep(/^picks_.*\Q$conf{email_domain}\E$/, keys %meta)) {
            # original client host is in local domain and remote host picked
            $type = "lan";
        } elsif ($meta{origin} !~ /\Q$conf{email_domain}\E$/) {
            # original client host is not in local domain
            $type = "wan";
        }

        # derive user from meta file
        my $user = $file;
        $user =~ s/.*\/(\w+)\.\d+\/meta/$1/;

        foreach (qw(e_corruption e_exception e_silent e_throttle)) {
            # add corruption/exception/silent totals even if transfer not done
            $all->{$_} += $meta{$_};
            $users{$user}->{$_} += $meta{$_};
            $types{$type}->{$_} += $meta{$_};
        }

        # skip transfers that have not completed
        next if (!$meta{time1});

        # transfer totals
        my %totals = (
            attrs => $meta{d_chattr},
            hosts => scalar(grep(/^last_/, keys %meta)),
            dirs => $meta{d_mkdir},
            files => $meta{d_cp} + $meta{d_ln},
            size => $meta{s_cp},
            ssize => $meta{s_sum} + $meta{s_cksum},
            sums => $meta{d_sum} + $meta{d_cksum},
            xfers => 1,
            $type => 1,
        );

        # tool operation totals and tool error totals
        foreach (@{$heads{Tools}}) {
            $totals{$_} = $meta{"d_$_"};
            $totals{"e_$_"} = $meta{"e_$_"};
        }

        # option totals
        # options that must differ from configured default
        foreach my $key (qw(buffer clients cpu files hosts interval io ior iow
                net netr netw ports retry size split split-tar streams stripe
                threads window)) {
            # parse some values in binary bytes instead of decimal bytes
            my $bin = $key =~ /^(?:buffer|split)$/ ? 2 : 0;
            $bin = 1 if ($key eq 'stripe');
            my $default = parse_bytes($conf{"default_$key"}, $bin);
            $totals{"o_$key"} =
                defined $meta{$key} && $meta{$key} ne $default ? 1 : 0;
        }
        # options that must be inverted
        $totals{"o_no-offline"} = !$meta{offline} && !$meta{'create-tar'} &&
            !$meta{'extract-tar'} ? 1 : 0;
        foreach (qw(cron mail preserve recall sanity silent verify)) {
            $totals{"o_no-$_"} = !$meta{$_} ? 1 : 0;
        }
        # normal options
        foreach (qw(create-tar exclude extract-tar host-list include index-tar
                local newer older pipeline remote secure sync verify-fast
                wait)) {
            $totals{"o_$_"} = $meta{$_} ? 1 : 0;
        }

        # error totals (corruption/exception/silent handled earlier)
        $totals{"e_$_"} = $meta{"e_$_"} foreach (@stages);
        $totals{e_host} = grep(/^nohost_/, keys %meta);

        # add transfer stats to totals per user, per type, and overall
        foreach my $head (keys %heads) {
            # rates must be processed differently
            next if ($head eq 'Rates');
            foreach my $key (@{$heads{$head}}) {
                $all->{$key} += $totals{$key};
                $users{$user}->{$key} += $totals{$key};
                $types{$type}->{$key} += $totals{$key};
            }
        }

        # compute rate for this transfer
        my $dtime = $meta{time1} - $meta{time0};
        $dtime = 1 if ($dtime <= 0);
        my $rate = $meta{s_cp} / $dtime;
        # ignore rates of zero or rates using --sync
        next if (!$rate || $meta{sync});

        # compute rates per user, per type, and overall
        foreach my $ref ($users{$user}, $types{$type}, $all) {
            $ref->{"$type\_max"} = max($rate, $ref->{"$type\_max"});
            $ref->{"$type\_min"} = $ref->{"$type\_min"} ?
                min($rate, $ref->{"$type\_min"}) : $rate;
            $ref->{all_max} = max($rate, $ref->{all_max});
            $ref->{all_min} = $ref->{all_min} ?
                min($rate, $ref->{all_min}) : $rate;
            # cumulative moving averages
            $ref->{"$type\_avg"} +=
                (($rate - $ref->{"$type\_avg"}) / $ref->{$type});
            $ref->{all_avg} += (($rate - $ref->{all_avg}) / $ref->{xfers});
        }
    }

    # convert rates to human readable format
    foreach my $rate (@{$heads{Rates}}) {
        $all->{$rate} = format_bytes($all->{$rate}) . "/s" if ($all->{$rate});
        foreach my $user (keys %users) {
            $users{$user}->{$rate} = format_bytes($users{$user}->{$rate}) . "/s"
                if ($users{$user}->{$rate});
        }
        foreach my $type (keys %types) {
            $types{$type}->{$rate} = format_bytes($types{$type}->{$rate}) . "/s"
                if ($types{$type}->{$rate});
        }
    }

    # convert sizes to human readable format
    foreach my $size (qw(size ssize)) {
        $all->{$size} = format_bytes($all->{$size}, 1);
        foreach my $user (keys %users) {
            $users{$user}->{$size} = format_bytes($users{$user}->{$size}, 1);
        }
        foreach my $type (keys %types) {
            $types{$type}->{$size} = format_bytes($types{$type}->{$size}, 1);
        }
    }

    # compute start and end dates
    my $date1 = strftime('%m/%d/%y', localtime($time - $conf{data_expire}));
    my $date2 = strftime('%m/%d/%y', localtime);

    # produce csv when requested (detailed error table not included)
    if ($opts{stats} eq 'csv') {
        my @heads = map {@{$heads{$_}}} @order;
        print join(",", "user", @heads), "\n";
        # add row for each user
        foreach my $user (sort keys(%users)) {
            my @row = map {$users{$user}->{$_} || ""} @heads;
            # only print row if there is an actual non-empty value
            next if (!first {$_} @row);
            print join(",", $user, @row), "\n";
        }
        # add row for each transfer type
        foreach my $type (qw(local lan wan)) {
            print join(",", $type, map {$types{$type}->{$_} || ""} @heads), "\n";
        }
        # add overall totals
        print join(",", "all", map {$all->{$_} || ""} @heads), "\n";
        return;
    }

    # print tables
    foreach my $head (@order) {
        my @heads = @{$heads{$head}};
        print "$head per user ($date1 - $date2)\n\n";
        # configure table headers
        my $t = Text::FormatTable->new("r" . " | r" x scalar(@heads));
        # strip single letter e_/o_ prefixes from displayed column names
        $t->head("user", map {/^\w_/ ?
            substr($_, 2) : $_} @heads);
        $t->rule;
        # add row for each user
        my $rows = 0;
        foreach my $user (sort keys(%users)) {
            my @row = map {$users{$user}->{$_} || ""} @heads;
            # only print row if there is an actual non-empty value
            next if (!first {$_} @row);
            $t->row($user, @row);
            $rows++;
        }
        # add separator between user and type rows
        $t->rule;
        # add row for each transfer type
        foreach my $type (qw(local lan wan)) {
            $t->row($type, map {$types{$type}->{$_} || ""} @heads);
        }
        # add overall totals
        $t->row("all ($rows)", map {$all->{$_} || ""} @heads);
        # output final table
        print $t->render, "\n\n";
    }

    # print error message table
    print "Error messages per user ($date1 - $date2)\n\n";
    # configure table headers
    my $t = Text::FormatTable->new("r | r | l | l");
    $t->head(qw(user id op target));
    $t->head("", "", "tool", "message");
    $t->rule;
    my $ulast;
    foreach my $file
            # sort by user.id
            (sort {(split(/\//, $a))[-2] cmp (split(/\//, $b))[-2]} @metas) {
        # untaint file
        $file = $1 if ($file =~ /^(.*)$/);
        # skip transfers that have expired
        my $mtime = (stat($file))[9];
        next if ($mtime + $conf{data_expire} < $time);
        # retrieve metadata from file
        my %meta = %{get_meta($file)};
        # skip transfers without errors
        next if (!$meta{error_size} && !$meta{e_exception});
        # derive user and id from meta file
        my ($user, $id);
        if ($file =~ /.*\/(\w+)\.(\d+)\/meta/) {
            ($user, $id) = ($1, $2);
        } else {
            next;
        }
        my $count;
        # add all exceptions stored in metadata
        if ($meta{e_exception}) {
            foreach my $ex (grep(/^exception_/, keys %meta)) {
                # separate different users with line
                $t->rule if ($ulast && $user ne $ulast);
                # only print user and id once to reduce clutter
                $t->row($user ne $ulast ? $user : "", !$count ?
                    $id : "", "-", $ex);
                $t->row("", "", "shiftc", unescape($meta{$ex}));
                $count++;
                $ulast = $user;
            }
        }
        # add up to three error messages stored in error file
        $file =~ s/meta$/error/;
        my $gz = Compress::BGZF::Reader->new_filehandle($file);
        next if (!$gz);
        # separate different users with line
        $t->rule if ($ulast && $user ne $ulast);
        foreach (1..3) {
            my $line;
            last if (!defined($line = $gz->getline));
            $line =~ s/\s*\r?\n$//;
            my %op = split(/[= ]+/, $line);
            my @args = split(/,/, $op{args});
            # only print user and id once to reduce clutter
            $t->row($user ne $ulast ? $user : "", !$count ? $id : "",
                $args[0], unescape($args[-1]));
            $t->row("", "", $op{tool}, unescape($op{text}));
            $count++;
            $ulast = $user;
        }
        my $log = basename($file);
        log_close($log, {$log => $gz});
    }
    # output final table
    print $t->render;
}

################
#### status ####
################
# output table of all transfers with status and statistics or
# return single row when manager invoked with id option
sub status {
    # configure table headers
    my $t = Text::FormatTable->new('r | l | r | r | r | r | r | r');
    my @row = (qw(id state dirs files), "file size", qw(date run rate));
    my @row2 = ("", "", "sums", "attrs", "sum size", "time", "left", "");
    if ($opts{status} eq 'csv') {
        print join(",", @row, @row2), "\n";
    } else {
        $t->head(@row);
        $t->head(@row2);
        $t->rule;
    }

    # sort by modification time of meta file
    my @metas;
    my @rows;
    my $dones;
    if ($opts{id}) {
        @metas = ("$opts{base}/meta");
    } else {
        my $dir = $> != 0 ? $conf{user_dir} : $opts{user_dir};
        my $user = $> != 0 ? $opts{user} : "*";
        my @more;
        do {
            push(@metas, glob "$dir/$user.[0-9]*/meta");
            $dir .= "/*.more";
            # list context glob so each pass tests the current pattern
            # (a scalar glob would keep iterating over the previous one)
            @more = glob $dir;
        } while (scalar(@more));
    }
    foreach my $file (sort {$> != 0 && !defined $opts{monitor} ?
# sort by user name when root or --monitor invocation (stat $a)[9] <=> (stat $b)[9] : $a <=> $b} @metas) { my $id = $file; if ($> != 0) { $id =~ s/.*\.|\/meta//g; } else { # ignore old transfers next if ((stat $file)[9] + $conf{data_expire} < $time); # leave user name in id $id =~ s/.*\/([\w-]+\.\d+)\/meta/$1/g; } # retrieve metadata from file %meta = %{get_meta($file)} if (!$opts{id} || $opts{monitor}); my $state = state(); my $color = "green"; if ($state =~ /done/) { $color = $state eq 'done+alert' ? "magenta" : "reset"; $dones++; } elsif ($state eq 'error') { $color = "red"; } elsif ($state eq 'stop') { my $base = $file; $base =~ s/(\/[^\/]+.more|meta)//g; $color = (-e "${base}no_restart" ? "bold " : "") . "cyan"; } elsif ($state =~ /warn|error/) { $color = "yellow"; } elsif ($state =~ /throttle/) { $color = "blue"; } # skip transfers that do not match the given state next if ($opts{state} && $state !~ /(?:^|\+)\Q$opts{state}\E(?:$|\+)/); my $time1 = defined $meta{time1} ? $meta{time1} : $time; # add first row for each transfer with bulk of info my $rate = $time1 - $meta{time0} ? $meta{s_cp} / ($time1 - $meta{time0}) : $meta{s_cp}; my @row = ($id, $state, "$meta{d_mkdir}/$meta{t_mkdir}" . ($meta{last} ? "" : "+"), ($meta{d_cp} + $meta{d_ln}) . "/" . ($meta{t_cp} + $meta{t_ln}) . ($meta{last} ? "" : "+"), format_bytes($meta{s_cp}) . "/" . format_bytes($meta{s_total}) . ($meta{last} ? "" : "+"), strftime('%m/%d', localtime($meta{time0})), format_seconds($time1 - $meta{time0}), format_bytes($rate) . "/s"); my $left; if ($rate && $meta{last} && !$meta{time1}) { # add estimated time to completion my $ncli = max(1, scalar(grep(/^doing_/, keys %meta))); my $rate1 = $meta{ra_cp} ? 
$meta{ra_cp} : $rate * $ncli; # add time for cps, sums, cksums foreach my $cmd (qw(cp sum cksum)) { # skip if no operations of this type needed next if ($meta{"t_$cmd"} == 0); # use previous rate for same operation type if available $rate1 = $meta{"ra_$cmd"} if ($meta{"ra_$cmd"}); # use rate for previous operation type otherwise $left += ($meta{s_total} - $meta{"s_$cmd"}) / $rate1 / $ncli; } foreach my $cmd (qw(mkdir chattr)) { # use previous rate for chattrs (or 100/s when not available) $rate1 = $meta{"ra_$cmd"} ? $meta{"ra_$cmd"} : 100; $left += ($meta{"t_$cmd"} - $meta{"d_$cmd"}) / $rate1 / $ncli; } # add time for non-cp manager calls # use previous rate for mgr calls (or 1/s when not available) $rate1 = $meta{ra_mgr} ? 1 / $meta{ra_mgr} : 1; foreach (qw(chattr cksum sum)) { $left += ($meta{"t_$_"} - $meta{"d_$_"} - $meta{"e_$_"}) / $meta{files} / $rate1 / $ncli; } $left = format_seconds($left); } my $s_total = $meta{verify} ? 2 * $meta{s_total} : 0; # add second row for each transfer with sums, attrs and sum size my @row2 = ("", "", ($meta{d_sum} + $meta{d_cksum}) . "/" . ($meta{t_sum} + $meta{t_cksum}) . ($meta{last} ? "" : "+"), "$meta{d_chattr}/$meta{t_chattr}" . ($meta{last} ? "" : "+"), format_bytes($meta{s_sum} + $meta{s_cksum}) . "/" . format_bytes($s_total) . ($meta{last} ? 
#################
#### sync_id ####
#################
# return id containing sequence of updates from each manager
#   reads  $self, $conf{sync_host}, $meta{sync_id}, $meta{time0}
#   returns the current id with this manager's trailing counter bumped,
#   or with a new "<bit>1" component appended
sub sync_id {
    # pick the letter that tags updates made by this manager
    # NOTE(review): the first sorted name is compared against the literal
    # string 'self' rather than $self, so 'z' is chosen only when that name
    # is literally "self" -- confirm this is intended
    my ($first) = sort($self, $conf{sync_host});
    my $bit = 'o';
    $bit = 'z' if ($first eq 'self');

    # start from the stored id, falling back to the transfer start time
    my $id = defined $meta{sync_id} ? $meta{sync_id} : $meta{time0};

    if ($id =~ /$bit(\d+)$/) {
        # id already ends with our letter so increment its counter in place
        my $next = $1 + 1;
        $id =~ s/\d+$/$next/;
        return $id;
    }

    # otherwise begin a new run of updates for our letter
    return $id . $bit . "1";
}
$1 : undef; # find trailing updates after trailing digits my $tail = $meta1->{sync_id} =~ /\Q$lcp$l\E(.*)/ ? $1 : undef; # number of meta updates needed is sum of values in tail and # any positive difference between local and remote trailing values my $last = sum(split(/[zo]/, $tail)) + 1; $last += (($n.$l) - ($n.$r)) if ($n.$l > $n.$r); my ($fmeta, $mmeta); if ($last > 1) { $fmeta = get_meta($mfile, $last); $mmeta = get_meta($mfile, $last - 1); } lock_dir(1, 1); next if ($last == 1); # sync [ in meta to begin update my $moff = $mmeta->{meta_size}; $moff-- if ($moff); $moff = 0 if (!defined $moff); $out->write("#" . escape("$dir/meta") . " $moff 1 $files{$mfile}->[0]\n"); my $rc = sync_local_io($in, $out, "$dir/meta", $moff, 1); # abort on error (remote meta will be reverted on next sync/get) next if (ref $rc); #TODO: add meta to end of this and remove final meta part foreach my $file (keys %files) { # links must be rebuilt and meta is done separately next if ($file =~ /(?:links|meta|mon_\S+)$/); my $off = $fmeta->{basename($file) . "_size"}; $off = 0 if (!defined $off); my ($mode, $size) = @{$files{$file}}; # don't sync if file has not changed my $len = $size - $off; next if (!$len); $file =~ s/^$conf{user_dir}\///; $out->write("#" . escape($file) . " $off $len $mode\n"); $rc = sync_local_io($in, $out, $file, $off, $len); last if (ref $rc); } # abort on error (remote meta will be reverted on next sync/get) next if (ref $rc); # sync rest of meta including trailing ] to finish update my $len = $files{$mfile}->[1] - $mmeta->{meta_size} - 1; $moff++; $out->write("#" . escape("$dir/meta") . " $moff $len $files{$mfile}->[0]\n"); $rc = sync_local_io($in, $out, "$dir/meta", $moff, $len); } else { $out->write("#" . $cmd . 
#######################
#### sync_local_io ####
#######################
# perform local side of sync and return result or return error message in hash
#   $in, $out - handles used to read replies from/write data to the peer
#   $file     - path of file to send, relative to $conf{user_dir}
#   $off      - byte offset within file at which to start sending
#   $len      - number of bytes to send
# returns the value of the final sync_return() on success, or a hash ref
# of the form {error => text} when the open, the read, or the peer fails
sub sync_local_io {
    my ($in, $out, $file, $off, $len) = @_;
    my $fh = IO::File->new("$conf{user_dir}/$file", O_RDONLY);
    my $err;
    if (!defined $fh) {
        $err = {error => "Error opening $file: $!"};
        # remove newlines so doesn't interfere with protocol
        $err->{error} =~ s/\n//g;
        # message already contains $! so do not append it a second time
        $out->write("### 500 $err->{error}\n");
    } else {
        $out->write("### 100\n");
    }
    my $rc = sync_return($in);
    return (ref $err ? $err : $rc) if (ref $err || ref $rc);

    # data is read with sysread, which bypasses buffered i/o, so position
    # with the matching unbuffered call (assume seek works)
    $fh->sysseek($off, 0);
    my $size = 4 * 1048576;
    my $errno;
    while ($len > 0) {
        $size = $len if ($len < $size);
        # every chunk sent is exactly $size bytes, so keep reading until
        # the chunk is full instead of failing on the first short read
        my $buf = "";
        while (length($buf) < $size) {
            my $n = $fh->sysread($buf, $size - length($buf), length($buf));
            if (!defined $n) {
                # retry reads interrupted by a signal
                next if ($!{EINTR});
                # save reason now as close() below may overwrite $!
                $errno = "$!";
                last;
            }
            # zero bytes indicates end of file before $len was satisfied
            last if ($n == 0);
        }
        last if (length($buf) < $size);
        $out->write("### 200\n");
        $out->write($buf);
        $len -= $size;
    }
    $fh->close;

    if ($len > 0) {
        $rc = {error => "Error reading $file: " .
            (defined $errno ? $errno : "unexpected end of file")};
        # remove newlines so doesn't interfere with protocol
        $rc->{error} =~ s/\n//g;
        $out->write("### 500 $rc->{error}\n");
        sync_return($in);
    } else {
        $out->write("### 200\n");
        $rc = sync_return($in);
    }
    return $rc;
}
#####################
#### sync_remote ####
#####################
# initiate fish protocol and perform each transfer given on STDIN
#   requests are lines beginning with "#" followed by whitespace-separated,
#   uri-escaped fields; the first field is a path relative to
#   $conf{user_dir} and the second selects the action (-1 removes the path,
#   -2 reports the transfer's sync id, anything else is handed to
#   sync_remote_io); a lone "exit" field terminates the process
sub sync_remote {
    $SIG{'CHLD'} = 'IGNORE';
    my ($in, $out) = (\*STDIN, \*STDOUT);
    $out->autoflush(1);

    # indicate running
    $out->write("### 200\n");

    # note that the loop deliberately works on the global $_
    while (defined($_ = $in->getline)) {
        s/^\s+|\s+$//g;
        # anything not introduced by "#" is not a request
        next if (!s/^#//);

        my @req = map {unescape($_)} split(/\s+/);
        exit if (scalar(@req) == 1 && $req[0] eq 'exit');

        # untaint arguments
        foreach my $arg (@req) {
            $arg = $1 if ($arg =~ /(.*)/);
        }
        $req[0] = "$conf{user_dir}/$req[0]";

        if ($req[1] == -1) {
            # file/dir does not exist on client so remove
            rmtree($req[0]);
            $out->write("### 200\n");
            sync_return($in);
            next;
        }

        if ($req[1] == -2) {
            $opts{base} = $req[0];
            lock_dir(1);
            # return sync id after possibly (if needed) reverting meta
            my $current = get_meta();
            lock_dir(1, 1);
            $out->write("$current->{sync_id}\n### 200\n");
            sync_return($in);
            next;
        }

        # everything else is a request to receive file data
        sync_remote_io($in, $out, @req);
    }
}
#####################
#### sync_return ####
#####################
# parse fish return values and return text or return error message in hash
#   $in - handle supporting getline from which the reply is read
# lines are accumulated until a "### <code>" status line is seen:
#   - code 100 or 200 returns the accumulated text with trailing whitespace
#     removed (undef when no text preceded the status line)
#   - any other code returns {error => <rest of status line>}
# when input ends before a status line, {error => "Invalid protocol return"}
# is returned
sub sync_return {
    my $in = shift;
    my $text;
    # read into a lexical so the caller's $_ is never overwritten
    while (defined(my $line = $in->getline)) {
        if ($line =~ /^###\s+(\d+)(.*)/) {
            my ($code, $msg) = ($1, $2);
            return {error => $msg} if ($code != 200 && $code != 100);
            $text =~ s/\s+$// if (defined $text);
            return $text;
        }
        $text .= $line;
    }
    return {error => "Invalid protocol return"};
}
has become higher than upper threshold $meta{$disk} = $low; $sleep = 300; } } } # only throttle further when there was some load generated return $sleep if ($cli_load{ratio} <= 0); my @cli_keys = grep(/^(cpu|io[rw]?|net[rw]?)$/, keys %meta); my @user_keys = grep(/^throttle_\w+_user/, keys %conf); my @fshost_keys = grep(/^throttle_\w+_(fs|host)/, keys %conf); # only throttle further when configured return $sleep if (!scalar(@cli_keys) && !scalar(@user_keys) && !scalar(@fshost_keys)); # compute new load for this transfer since its global data not updated yet my %my_loaddb = %{mp_retrieve("$conf{user_dir}/$opts{user}.load")}; my %my_load = split(/[= ]+/, $my_loaddb{"next_id_$opts{id}$opts{cid}_$opts{host}"}); $cli_load{time} = 1 if (!$cli_load{time}); # convert sizes to MB/s and scale by actual/estimated ratio $my_load{$_} = $cli_load{ratio} * $my_load{$_} / 1E6 / $cli_load{time} foreach (keys %my_load); $my_load{time} = $cli_load{time}; # client throttling foreach my $key (@cli_keys) { next if ($meta{$key} <= 0); my $metric = $key; # count both r/w cases when r/w not specified $metric .= "." if ($metric eq 'io' || $metric eq 'net'); my $total; $total += $my_load{$_} foreach (grep(/^$metric\_host_/, keys %my_load)); # sleep amount necessary to bring average to specified limit my $tmp = ($total / $meta{$key} - 1) * $my_load{time}; $sleep = $tmp if ($tmp > $sleep); } # user throttling my $my_key = "id_$opts{id}$opts{cid}_$opts{host}"; foreach my $key (@user_keys) { next if ($conf{$key} <= 0); if ($key =~ /^throttle_([a-z]+)_user(?:_(\S+))?$/) { my ($metric, $user) = ($1, $2); # only throttle if limit relevant to this user next if ($user && $user ne $opts{user}); # count both r/w cases when r/w not specified $metric .= "." 
if ($metric eq 'io' || $metric eq 'net'); my @id_vals; my $id_load; my $my_index; # compute relevant load for all transfers of user foreach my $id_key (grep(/^id_/, keys %my_loaddb)) { if ($id_key eq $my_key) { # use current computed load for this transfer $my_index = scalar(@id_vals); $id_load = \%my_load; } else { # all other transfers based on global load data $id_load = {split(/[= ]+/, $my_loaddb{$id_key})}; } my $val; # value may be based on multiple items when r/w not given $val += $id_load->{$_} foreach (grep(/^$metric\_host_/, keys %{$id_load})); push(@id_vals, $val); } # only throttle if combined load of all transfers is above limit next if (!scalar(@id_vals) || sum(@id_vals) <= $conf{$key}); # each transfer initially gets an equal share of the load limit my $per_id = $conf{$key} / scalar(@id_vals); my ($extra, $n_extra); # determine if any transfers are not using their entire share foreach (@id_vals) { my $tmp = $per_id - $_; if ($tmp > 0) { $extra += $tmp; $n_extra++; } } # adjust per transfer limit by dividing up unused shares $per_id += $extra / (scalar(@id_vals) - $n_extra); # sleep amount necessary to bring average to specified limit my $tmp = ($id_vals[$my_index] / $per_id - 1) * $cli_load{time}; $sleep = $tmp if ($tmp > $sleep); } } # fs/host throttling my %all_loaddb; if (scalar(@fshost_keys)) { # consolidate the load info from all users foreach my $file (glob "$opts{user_dir}/*.load") { my $user = $file; $user =~ s/.*\/|\.load$//g; my %loaddb = %{mp_retrieve($file)}; # ignore the ^next_ load fields $all_loaddb{"$user\_$_"} = $loaddb{$_} foreach (grep(/^id_/, keys %loaddb)); } } $my_key = "$opts{user}_$my_key"; foreach my $key (@fshost_keys) { next if ($conf{$key} <= 0); if ($key =~ /^throttle_([a-z]+)_(fs|host)(?:_(\S+))?$/) { my ($metric, $type, $type_val) = ($1, $2, $3); # count both r/w cases when r/w not specified $metric .= "." 
if ($metric eq 'io' || $metric eq 'net'); # only throttle if limit relevant to this transfer next if ($type_val && !grep(/^$metric\_$type\_$type_val$/, keys %my_load)); # compute the fs/host values applicable to this transfer my %my_type_vals; if ($type_val) { # use specified value when given $my_type_vals{$type_val} = 1; } else { # use all fs/host values in this transfer when no value given foreach (grep(/^$metric\_$type\_/, keys %my_load)) { my $val = $_; $val =~ s/^$metric\_$type\_//; $my_type_vals{$val} = 1; } } foreach my $my_type_val (keys %my_type_vals) { my @all_users; my @all_vals; my $all_load; my ($my_index, $my_user_index1, $my_user_index2, $prev_user); # compute relevant load for all transfers foreach my $all_key (sort(keys %all_loaddb)) { if ($all_key eq $my_key) { # use current computed load for this transfer $my_index = scalar(@all_vals); $all_load = \%my_load; } else { # all other transfers based on global load data $all_load = {split(/[= ]+/, $all_loaddb{$all_key})}; } my $user = $all_key; $user =~ s/_id_.*//g; # store where each user's transfer begin in load list if ($prev_user ne $user) { $my_user_index1 = scalar(@all_vals) if ($user eq $opts{user}); $my_user_index2 = scalar(@all_vals) if ($prev_user eq $opts{user}); push(@all_users, scalar(@all_vals)); $prev_user = $user; } my $val; # value may be based on multiple items when r/w not given $val += $all_load->{$_} foreach (grep(/^$metric\_$type\_$my_type_val$/, keys %{$all_load})); push(@all_vals, $val); } # only throttle if combined load of all transfers is above limit next if (!scalar(@all_vals) || sum(@all_vals) <= $conf{$key}); # each user initially gets an equal share of the load limit my $per_user = $conf{$key} / scalar(@all_users); $my_user_index2 = scalar(@all_vals) if (!defined $my_user_index2); # no throttling needed if this user is under per_user limit next if (sum(@all_vals[ $my_user_index1 .. 
$my_user_index2 - 1]) <= $per_user); # add extra index for processing of last user push(@all_users, scalar(@all_vals)); my $index1 = shift @all_users; my ($extra, $n_extra); # determine if any users are not using their entire share foreach my $index2 (@all_users) { my $tmp = $per_user - sum(@all_vals[$index1 .. $index2 - 1]); if ($tmp > 0) { $extra += $tmp; $n_extra++; } $index1 = $index2; } # adjust per user limit by dividing up unused shares $per_user += $extra / (scalar(@all_vals) - $n_extra); # each transfer initially gets an equal share of the user limit my $per_id = $per_user / ($my_user_index2 - $my_user_index1); ($extra, $n_extra) = (0, 0); # determine if any transfers are not using their entire share foreach (@all_vals[$my_user_index1 .. $my_user_index2 - 1]) { my $tmp = $per_id - $_; if ($tmp > 0) { $extra += $tmp; $n_extra++; } } # adjust per transfer limit by dividing up unused shares $per_id += $extra / ($my_user_index2 - $my_user_index1 - $n_extra); # sleep amount necessary to bring average to specified limit my $tmp = ($all_vals[$my_index] / $per_id - 1) * $cli_load{time}; $sleep = $tmp if ($tmp > $sleep); } } } # eliminate fractions $sleep = int($sleep + 0.5) if ($sleep); return $sleep; } ##################### #### track_cache #### ##################### # update i/o stats for clients and servers used to mitigate cache effects sub track_cache { my ($op, $cmd, $args) = @_; my @srv = ((split(/,/, $op->{srcfs}))[-1], (split(/,/, $op->{dstfs}))[-1]); # keep track of file system stats for plot by file system if ($cmd =~ /(?:find|mkdir|ln|chattr)/) { $meta{"d_${cmd}_$srv[-1]"}++; } elsif ($cmd eq 'cp') { $meta{"s_get_$srv[0]"} += $op->{size}; $meta{"s_put_$srv[1]"} += $op->{size}; } # file system stats for sum/cksum done later return if ($cmd !~ /(?:cksum|cp|sum)/); my $host = $opts{host}; if ($args->[0] =~ /^([^\/]+)%3A/) { # remote src so sum done on dst and cksum done on src $host = $1; @srv = reverse @srv; } elsif ($args->[1] =~ /^([^\/]+)%3A/) { # 
##################
#### unescape ####
##################
# return uri-unescaped version of given string
#   each "%XX" sequence of two hex digits (either case) becomes the
#   character with that code; an undefined argument is returned as is
sub unescape {
    my ($encoded) = @_;
    # nothing to decode when no string was given
    return $encoded if (!defined $encoded);
    $encoded =~ s/%([0-9A-Fa-f]{2})/chr(hex($1))/eg;
    return $encoded;
}
#####################
#### yenc_decode ####
#####################
# based on Convert::BulkDecoder by Johan Vromans (artistic license)
# return decoded version of given yenc-encoded string
#   works on a private copy so the caller's $_ (which may alias a list
#   element inside for/map) is never modified
sub yenc_decode {
    my $text = shift;
    # undo escapes: "=X" stands for the byte (ord(X) - 64) mod 256
    $text =~ s/=(.)/chr(ord($1) + (256 - 64) & 255)/ge;
    # rotate every byte value back by 42 (mod 256)
    $text =~ tr{\000-\377}{\326-\377\000-\325};
    return $text;
}

#####################
#### yenc_encode ####
#####################
# based on Convert::BulkDecoder by Johan Vromans (artistic license)
# return yenc-encoded version of given string
#   works on a private copy so the caller's $_ (which may alias a list
#   element inside for/map) is never modified
sub yenc_encode {
    my $text = shift;
    # rotate every byte value forward by 42 (mod 256)
    $text =~ tr{\326-\377\000-\325}{\000-\377};
    # escape nul, lf, cr, "=", "[" and "]" as "=" followed by (byte + 64)
    $text =~ s/([\x00\x0A\x0D\x3D\x5B\x5D])/"=" . chr(ord($1) + 64 & 255)/ge;
    return $text;
}
file";$self->{fh}=$fh;$self->{buffer}='';$self->{buffer_len}=0;$self->{block_offset}=0;$self->{buffer_offset}=0;$self->{block_size}=0;$self->{file_size}=-s $fn_in;$self->{u_offset}=0;$self->{u_file_size}=0;if (-e "$fn_in.gzi"){$self->_load_index("$fn_in.gzi")}else {$self->_generate_index()}$self->_load_block();return$self}sub _load_block {my ($self,$block_offset)=@_;$self->{buffer_offset}=0;return if (defined$block_offset && $block_offset==$self->{block_offset});if (!defined$block_offset){$block_offset=$self->{block_offset}+ $self->{block_size}}$self->{block_offset}=$block_offset;croak "Read past file end (perhaps corrupted/truncated input?)" if ($self->{block_offset}> $self->{file_size});if ($self->{block_offset}==$self->{file_size}){$self->{buffer}='';$self->{buffer_len}=-1;return}sysseek$self->{fh},$self->{block_offset},0;my ($block_size,$uncompressed_size,$content)=$self->_unpack_block(1);$self->{block_size}=$block_size;$self->{buffer_len}=$uncompressed_size;$self->{buffer}=$content;return}sub _unpack_block {my ($self,$do_unpack)=@_;my@return_values;my ($magic,$mod,$flags,$os,$len_extra)=unpack 'A4A4CCv',_safe_sysread($self->{fh},HEAD_BYTES);my$t=sysseek$self->{fh},0,1;croak "invalid header at $t (corrupt file or not BGZF?)" if ($magic ne BGZF_MAGIC);my$block_size;my$l=0;while ($l < $len_extra){my ($field_id,$field_len)=unpack 'A2v',_safe_sysread($self->{fh},4);if ($field_id eq 'BC'){croak "invalid BC length" if ($field_len!=2);croak "multiple BC fields" if (defined$block_size);$block_size=unpack 'v',_safe_sysread($self->{fh}=>$field_len);$block_size += 1}$l += 4 + $field_len}croak "invalid extra field length" if ($l!=$len_extra);croak "failed to read block size" if (!defined$block_size);push@return_values,$block_size;my$payload_len=$block_size - HEAD_BYTES - FOOT_BYTES - $len_extra;my$content;if ($do_unpack){my$payload=_safe_sysread($self->{fh},$payload_len);my ($i,$status)=inflateInit(-WindowBits=>-&MAX_WBITS());croak "Error during inflate init\n" if 
($status!=Z_OK);($content,$status)=$i->inflate($payload);croak "Error during inflate run\n" if ($status!=Z_STREAM_END);my$crc_given=unpack 'V',_safe_sysread($self->{fh}=>4);croak "content CRC32 mismatch" if ($crc_given!=crc32($content))}else {sysseek$self->{fh},$payload_len + 4,1}my$size_given=unpack 'V',_safe_sysread($self->{fh}=>4);croak "content length mismatch" if (defined$content && $size_given!=length($content));push@return_values,$size_given;push@return_values,$content if (defined$content);return@return_values}sub read_data {my ($self,$bytes)=@_;my$r=$self->_read(my$buffer,$bytes);carp "received fewer bytes than requested" if ($r < $bytes && $self->{buffer_len}> -1);$buffer=undef if ($r < 1);return$buffer}sub _read {my$self=shift;my$buf=\shift;my$bytes=shift;my$offset=shift;my$prefix='';if (defined$offset && $offset!=0){$prefix=substr $$buf,0,$offset;$prefix .= "\0" x ($offset - length($$buf))if ($offset > length($$buf))}$$buf='';ITER: while (length($$buf)< $bytes){my$l=length($$buf);my$remaining=$bytes - $l;if ($self->{buffer_offset}+ $remaining <= $self->{buffer_len}){$$buf .= substr$self->{buffer},$self->{buffer_offset},$remaining;$self->{buffer_offset}+= $remaining;$self->_load_block()if ($self->{buffer_offset}==$self->{buffer_len})}else {last ITER if ($self->{buffer_len}< 0);$$buf .= substr$self->{buffer},$self->{buffer_offset};$self->_load_block()}}my$l=length($$buf);$self->{u_offset}+= $l;$$buf=$prefix .$$buf;return$l}sub getline {my ($self)=@_;my$data='';while (1){last if ($self->{buffer_len}< 0);pos($self->{buffer})=$self->{buffer_offset};if ($self->{buffer}=~ m|$/|g){my$pos=pos$self->{buffer};$data .= substr$self->{buffer},$self->{buffer_offset},$pos - $self->{buffer_offset};$self->{buffer_offset}=$pos;$self->_load_block if ($pos==$self->{buffer_len});$self->{u_offset}+= length($data);last}$data .= substr$self->{buffer},$self->{buffer_offset};$self->_load_block}return length($data)> 0 ? 
$data : undef}sub write_index {my ($self,$fn_out)=@_;croak "missing index output filename" if (!defined$fn_out);$self->_generate_index()if (!defined$self->{idx});my@offsets=@{$self->{idx}};shift@offsets;open my$fh_out,'>:raw',$fn_out;print {$fh_out}pack('Q<',scalar(@offsets));for (@offsets){print {$fh_out}pack('Q<',$_->[0]);print {$fh_out}pack('Q<',$_->[1])}close$fh_out;return}sub _load_index {my ($self,$fn_in)=@_;croak "missing index input filename" if (!defined$fn_in);open my$fh_in,'<:raw',$fn_in or croak "error opening index";read($fh_in,my$n_offsets,8)or croak "failed to read first quad";$n_offsets=unpack 'Q<',$n_offsets;my@idx;for (0..$n_offsets-1){read($fh_in,my$buff,16)or croak "error reading index";$idx[$_]=[unpack 'Q{u_file_size}=$idx[-1]->[1];my$c_size=$idx[-1]->[0];sysseek$self->{fh},$idx[-1]->[0],0;my ($c,$u)=$self->_unpack_block(0);$self->{u_file_size}+= $u;$c_size += $c;while ($c_size < $self->{file_size}){push@idx,[$idx[-1]->[0]+$c,$idx[-1]->[1]+$u];sysseek$self->{fh},$idx[-1]->[0],0;($c,$u)=$self->_unpack_block(0);$self->{u_file_size}+= $u;$c_size += $c}croak "Unexpected file size/last index mismatch ($c_size v $self->{file_size})" if ($c_size!=$self->{file_size});$self->{idx}=[@idx];$self->{ridx}->{$_->[0]}=$_->[1]for (@idx);sysseek$self->{fh},$self->{block_offset},0;return}sub _generate_index {my ($self)=@_;my$uncmp_offset=0;my$cmp_offset=0;my$i=0;$self->{u_file_size}=0;$self->{idx}=[];$self->{ridx}={};sysseek$self->{fh},0,0;while ($cmp_offset < $self->{file_size}){push @{$self->{idx}},[$cmp_offset,$uncmp_offset];$self->{ridx}->{$cmp_offset}=$uncmp_offset;my ($block_size,$uncompressed_size)=$self->_unpack_block(0);$cmp_offset += $block_size;$uncmp_offset += $uncompressed_size;$self->{u_file_size}+= $uncompressed_size}sysseek$self->{fh},$self->{block_offset},0;return}sub move_to {my ($self,@args)=@_;$self->_seek(@args);return}sub _seek {my ($self,$pos,$whence)=@_;$pos += $self->{u_offset}if ($whence==1);$pos=$self->{u_file_size}+ $pos if 
($whence==2);return if ($pos < 0);if ($pos >= $self->{u_file_size}){$self->{buffer_len}=-1;$self->{u_offset}=$pos;$self->{block_offset}=$pos;return 1}my$s=scalar @{$self->{idx}};my$idx=int($pos/($self->{u_file_size})* $s);while (1){if ($pos < $self->{idx}->[$idx]->[1]){--$idx;next}if ($idx+1 < $s && $pos >= $self->{idx}->[$idx+1]->[1]){++$idx;next}last}my$block_o=$self->{idx}->[$idx]->[0];my$block_o_u=$self->{idx}->[$idx]->[1];my$buff_o=$pos - $block_o_u;$self->_load_block($block_o);$self->{buffer_offset}=$buff_o;$self->{u_offset}=$block_o_u + $buff_o;return 1}sub get_vo {my ($self)=@_;return ($self->{block_offset}<< 16)| $self->{buffer_offset}}sub move_to_vo {my ($self,$vo)=@_;my$block_o=$vo >> 16;my$buff_o=$vo ^ ($block_o << 16);$self->_load_block($block_o);$self->{buffer_offset}=$buff_o;croak "invalid block offset" if (!defined$self->{ridx}->{$block_o});$self->{u_offset}=$self->{ridx}->{$block_o}+ $buff_o;return}sub _safe_sysread {my ($fh,$len)=@_;my$buf='';my$r=sysread$fh,$buf,$len;croak "returned unexpected byte count" if ($r!=$len);return$buf}1; COMPRESS_BGZF_READER $fatpacked{"Compress/BGZF/Writer.pm"} = '#line '.(1+__LINE__).' 
"'.__FILE__."\"\n".<<'COMPRESS_BGZF_WRITER'; package Compress::BGZF::Writer;use strict;use warnings;use Carp;use Compress::Zlib;use IO::Compress::RawDeflate qw/rawdeflate $RawDeflateError/;use constant HEAD_BYTES=>18;use constant FOOT_BYTES=>8;use constant FLUSH_SIZE=>2**16 - HEAD_BYTES - FOOT_BYTES - 1;use constant BGZF_HEADER=>pack "H*",'1f8b08040000000000ff060042430200';sub TIEHANDLE {Compress::BGZF::Writer::new(@_)}sub PRINT {Compress::BGZF::Writer::_queue(@_)}sub CLOSE {Compress::BGZF::Writer::finalize(@_)}sub new_filehandle {my ($class,$fn_out)=@_;open my$fh,'<',undef;tie *$fh,$class,$fn_out or croak "failed to tie filehandle";return$fh}sub new {my ($class,$fn_out)=@_;my$self=bless {},$class;if (defined$fn_out){open$self->{fh},">$fn_out" or croak "Error opening file for writing"}else {$self->{fh}=\*STDOUT}binmode$self->{fh};$self->{c_level}=Z_DEFAULT_COMPRESSION;$self->{buffer}='';$self->{block_offset}=0;$self->{buffer_offset}=0;$self->{u_offset}=0;$self->{idx}=[];return$self}sub set_level {my ($self,$level)=@_;croak "Invalid compression level (allowed 0-9)" if ($level !~ /^\d$/);$self->{c_level}=$level;return}sub add_data {my ($self,$content)=@_;my$vo=($self->{block_offset}<< 16)| $self->{buffer_offset};$self->_queue($content);return$vo}sub _queue {my ($self,$content)=@_;$self->{buffer}.= $content;while (length($self->{buffer})>= FLUSH_SIZE){my$chunk=substr$self->{buffer},0,FLUSH_SIZE,'';my$unwritten=$self->_write_block($chunk);$self->{buffer}=$unwritten .$self->{buffer}if (length($unwritten))}$self->{buffer_offset}=length$self->{buffer};return}sub _write_block {my ($self,$chunk)=@_;my$chunk_len=length($chunk);rawdeflate(\$chunk,\my$payload,-Level=>$self->{c_level})or croak "deflate failed: $RawDeflateError\n";my$trimmed='';while (length($payload)> FLUSH_SIZE){my$trim_len=int($chunk_len * 0.05);$trimmed=substr($chunk,-$trim_len,$trim_len,'').$trimmed;rawdeflate(\$chunk,\$payload,-Level=>$self->{c_level})or croak "deflate failed: 
$RawDeflateError\n";$chunk_len=length($chunk)}my$block_size=length($payload)+ HEAD_BYTES + FOOT_BYTES;croak "Internal error: block size > 65536" if ($block_size > 2**16);print {$self->{fh}}pack("a*va*VV",BGZF_HEADER,$block_size - 1,$payload,crc32($chunk),$chunk_len,)or croak "Error writing compressed block";$self->{block_offset}+= $block_size;$self->{u_offset}+= $chunk_len;push @{$self->{idx}},[$self->{block_offset},$self->{u_offset}];return$trimmed}

  # Write out everything still held in the buffer as compressed blocks and
  # close the output handle if it is still open.  Croaks when data remains
  # buffered but the handle has already been closed, or when the close fails.
  sub finalize {
      my ($self) = @_;
      while (length($self->{buffer}) > 0) {
          croak "file closed but buffer not empty"
              if (!defined fileno($self->{fh}));
          my $chunk = substr $self->{buffer}, 0, FLUSH_SIZE, '';
          # _write_block hands back any input it had to trim off so that the
          # compressed block fits; that remainder goes back to the front.
          my $unwritten = $self->_write_block($chunk);
          $self->{buffer} = $unwritten . $self->{buffer}
              if (length($unwritten));
      }
      if (defined fileno($self->{fh})) {
          close $self->{fh} or croak "Error closing compressed file";
      }
      return;
  }

  # Write the block index to the file named $fn_out.
  #
  # The output is finalized first so that the index covers every block.  The
  # index is an unsigned 64-bit little-endian entry count followed by one
  # (compressed offset, uncompressed offset) pair per entry, each also an
  # unsigned 64-bit little-endian value.
  #
  # Croaks if $fn_out is undefined or if any open/print/close fails.
  #
  # NOTE(review): everything from the second pack template up to the DESTROY
  # body had been lost from this copy (the text between a '<' and the next
  # '>' was stripped).  It is restored here from the upstream module; the
  # wording of the close error message should be confirmed against upstream.
  sub write_index {
      my ($self, $fn_out) = @_;
      $self->finalize();
      croak "missing index output filename" if (!defined $fn_out);
      open my $fh_out, '>:raw', $fn_out
          or croak "Error opening index file for writing";
      my @offsets = @{ $self->{idx} };
      # The final entry holds the offsets reached after the last block, i.e.
      # the end of the data rather than the start of a block, so drop it.
      pop @offsets;
      print {$fh_out} pack('Q<', scalar(@offsets))
          or croak "Error printing to index file";
      for (@offsets) {
          print {$fh_out} pack('Q<Q<', @{$_})
              or croak "Error printing to index file";
      }
      close $fh_out
          or croak "Error closing index file after writing";
      return;
  }

  # Make sure buffered data is flushed when the writer goes out of scope.
  sub DESTROY {
      my ($self) = @_;
      $self->finalize();
      return;
  }

  1;
COMPRESS_BGZF_WRITER
$fatpacked{"Data/MessagePack.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'DATA_MESSAGEPACK';
  package Data::MessagePack;
  use strict;
  use warnings;
  use 5.008001;
  our $VERSION = '1.01';

  # Shared boolean singletons; the Boolean module is loaded on first use.
  sub true () {
      require Data::MessagePack::Boolean;
      no warnings 'once';
      return $Data::MessagePack::Boolean::true;
  }

  sub false () {
      require Data::MessagePack::Boolean;
      no warnings 'once';
      return $Data::MessagePack::Boolean::false;
  }

  # Pick a backend unless 'pack' is already available.  The XS backend is
  # tried unless the environment asks for "pp"; an XS load failure is fatal
  # only when "xs" was requested explicitly.  The pure-Perl backend is the
  # fallback whenever 'pack' is still missing afterwards.
  if (!__PACKAGE__->can('pack')) {
      my $backend = $ENV{PERL_DATA_MESSAGEPACK} || ($ENV{PERL_ONLY} ? 'pp' : '');
      if ($backend !~ /\b pp \b/xms) {
          eval {
              require XSLoader;
              XSLoader::load(__PACKAGE__, $VERSION);
          };
          die $@ if $@ && $backend =~ /\b xs \b/xms;
      }
      if (!__PACKAGE__->can('pack')) {
          require 'Data/MessagePack/PP.pm';
      }
  }

  # Constructor: the supplied key/value options become the object itself.
  sub new {
      my ($class, %args) = @_;
      return bless \%args, $class;
  }

  # Generate a chainable setter NAME(value) (an undefined value means 1) and
  # a getter get_NAME() for each boolean option.
  for my $name (qw(canonical prefer_integer utf8)) {
      my $setter = sub {
          my ($self, $value) = @_;
          $self->{$name} = defined($value) ? $value : 1;
          return $self;
      };
      my $getter = sub {
          my ($self) = @_;
          return $self->{$name};
      };
      no strict 'refs';
      *{$name} = $setter;
      *{'get_' . $name} = $getter;
  }

  # encode/decode are aliases for whichever pack/unpack the backend provides.
  sub encode;
  *encode = __PACKAGE__->can('pack');
  sub decode;
  *decode = __PACKAGE__->can('unpack');
  1;
DATA_MESSAGEPACK
$fatpacked{"Data/MessagePack/Boolean.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'DATA_MESSAGEPACK_BOOLEAN';
  package Data::MessagePack::Boolean;
  use strict;
  use warnings;

  # A boolean object is a blessed reference to 1 or 0.  It yields that value
  # in boolean and numeric context and 'true'/'false' when stringified.
  use overload
      'bool'   => sub { ${ $_[0] } },
      '0+'     => sub { ${ $_[0] } },
      '""'     => sub { ${ $_[0] } ? 'true' : 'false' },
      fallback => 1,
      ;

  our $true  = do { bless \(my $dummy = 1) };
  our $false = do { bless \(my $dummy = 0) };
  1;
DATA_MESSAGEPACK_BOOLEAN
$fatpacked{"Data/MessagePack/PP.pm"} = '#line '.(1+__LINE__).' 
"'.__FILE__."\"\n".<<'DATA_MESSAGEPACK_PP'; package Data::MessagePack::PP;use 5.008001;use strict;use warnings;no warnings 'recursion';use Carp ();use B ();use Config;BEGIN {my$unpack_int64_slow;my$unpack_uint64_slow;if(!eval {pack 'Q',1}){$unpack_int64_slow=sub {require Math::BigInt;my$high=unpack_uint32($_[0],$_[1]);my$low=unpack_uint32($_[0],$_[1]+ 4);if($high < 0xF0000000){$high=Math::BigInt->new($high);$low=Math::BigInt->new($low);return +($high << 32 | $low)->bstr}else {$high=Math::BigInt->new(~$high);$low=Math::BigInt->new(~$low);return +(-($high << 32 | $low + 1))->bstr}};$unpack_uint64_slow=sub {require Math::BigInt;my$high=Math::BigInt->new(unpack_uint32($_[0],$_[1]));my$low=Math::BigInt->new(unpack_uint32($_[0],$_[1]+ 4));return +($high << 32 | $low)->bstr}}*unpack_uint16=sub {return unpack 'n',substr($_[0],$_[1],2)};*unpack_uint32=sub {return unpack 'N',substr($_[0],$_[1],4)};my$bo_is_me=unpack ('d',"\x00\x00\xf0\x3f\x00\x00\x00\x00")==1;my$pack_double_oabi;my$unpack_double_oabi;if ($] < 5.010){my$bo_is_le=($Config{byteorder}=~ /^1234/);if ($bo_is_me){$pack_double_oabi=sub {my@v=unpack('V2',pack('d',$_[0]));return pack 'CN2',0xcb,@v[0,1]};$unpack_double_oabi=sub {my@v=unpack('V2',substr($_[0],$_[1],8));return unpack('d',pack('N2',@v[0,1]))}}*unpack_int16=sub {my$v=unpack 'n',substr($_[0],$_[1],2);return$v ? $v - 0x10000 : 0};*unpack_int32=sub {no warnings;my$v=unpack 'N',substr($_[0],$_[1],4);return$v ? 
$v - 0x100000000 : 0};if($bo_is_le){*pack_uint64=sub {my@v=unpack('V2',pack('Q',$_[0]));return pack 'CN2',0xcf,@v[1,0]};*pack_int64=sub {my@v=unpack('V2',pack('q',$_[0]));return pack 'CN2',0xd3,@v[1,0]};*pack_double=$pack_double_oabi || sub {my@v=unpack('V2',pack('d',$_[0]));return pack 'CN2',0xcb,@v[1,0]};*unpack_float=sub {my@v=unpack('v2',substr($_[0],$_[1],4));return unpack('f',pack('n2',@v[1,0]))};*unpack_double=$unpack_double_oabi || sub {my@v=unpack('V2',substr($_[0],$_[1],8));return unpack('d',pack('N2',@v[1,0]))};*unpack_int64=$unpack_int64_slow || sub {my@v=unpack('V*',substr($_[0],$_[1],8));return unpack('q',pack('N2',@v[1,0]))};*unpack_uint64=$unpack_uint64_slow || sub {my@v=unpack('V*',substr($_[0],$_[1],8));return unpack('Q',pack('N2',@v[1,0]))}}else {*pack_uint64=sub {return pack 'CQ',0xcf,$_[0]};*pack_int64=sub {return pack 'Cq',0xd3,$_[0]};*pack_double=$pack_double_oabi || sub {return pack 'Cd',0xcb,$_[0]};*unpack_float=sub {return unpack('f',substr($_[0],$_[1],4))};*unpack_double=$unpack_double_oabi || sub {return unpack('d',substr($_[0],$_[1],8))};*unpack_int64=$unpack_int64_slow || sub {unpack 'q',substr($_[0],$_[1],8)};*unpack_uint64=$unpack_uint64_slow || sub {unpack 'Q',substr($_[0],$_[1],8)}}}else {if ($bo_is_me){$pack_double_oabi=sub {my@v=unpack('V2',pack('d',$_[0]));my$d=unpack('d',pack('V2',@v[1,0]));return pack 'Cd>',0xcb,$d};$unpack_double_oabi=sub {my$first_word=substr($_[0],$_[1],4);my$second_word=substr($_[0],$_[1]+ 4,4);my$d_bin=$second_word .$first_word;return unpack('d>',$d_bin)}}*pack_uint64=sub {return pack 'CQ>',0xcf,$_[0]};*pack_int64=sub {return pack 'Cq>',0xd3,$_[0]};*pack_double=$pack_double_oabi || sub {return pack 'Cd>',0xcb,$_[0]};*unpack_float=sub {return unpack('f>',substr($_[0],$_[1],4))};*unpack_double=$unpack_double_oabi || sub {return unpack('d>',substr($_[0],$_[1],8))};*unpack_int16=sub {return unpack('n!',substr($_[0],$_[1],2))};*unpack_int32=sub {return 
unpack('N!',substr($_[0],$_[1],4))};*unpack_int64=$unpack_int64_slow || sub {return unpack('q>',substr($_[0],$_[1],8))};*unpack_uint64=$unpack_uint64_slow || sub {return unpack('Q>',substr($_[0],$_[1],8))}}no warnings 'once';@Data::MessagePack::ISA=qw(Data::MessagePack::PP);@Data::MessagePack::Unpacker::ISA=qw(Data::MessagePack::PP::Unpacker);*true=\&Data::MessagePack::true;*false=\&Data::MessagePack::false}sub _unexpected {Carp::confess("Unexpected " .sprintf(shift,@_)." found")}our$_max_depth;sub pack :method {my($self,$data,$max_depth)=@_;Carp::croak('Usage: Data::MessagePack->pack($dat [,$max_depth])')if @_ < 2;$_max_depth=defined$max_depth ? $max_depth : 512;if(not ref$self){$self=$self->new(prefer_integer=>$Data::MessagePack::PreferInteger || 0,canonical=>$Data::MessagePack::Canonical || 0,)}return$self->_pack($data)}sub _pack {my ($self,$value)=@_;local$_max_depth=$_max_depth - 1;if ($_max_depth < 0){Carp::croak("perl structure exceeds maximum nesting level (max_depth set too low?)")}return CORE::pack('C',0xc0)if (not defined$value);if (ref($value)eq 'ARRAY'){my$num=@$value;my$header=$num < 16 ? CORE::pack('C',0x90 + $num): $num < 2 ** 16 - 1 ? CORE::pack('Cn',0xdc,$num): $num < 2 ** 32 - 1 ? CORE::pack('CN',0xdd,$num): _unexpected("number %d",$num);return join('',$header,map {$self->_pack($_)}@$value)}elsif (ref($value)eq 'HASH'){my$num=keys %$value;my$header=$num < 16 ? CORE::pack('C',0x80 + $num): $num < 2 ** 16 - 1 ? CORE::pack('Cn',0xde,$num): $num < 2 ** 32 - 1 ? CORE::pack('CN',0xdf,$num): _unexpected("number %d",$num);if ($self->{canonical}){return join('',$header,map {$self->_pack($_),$self->_pack($value->{$_})}sort {$a cmp $b}keys %$value)}else {return join('',$header,map {$self->_pack($_)}%$value)}}elsif (ref($value)eq 'Data::MessagePack::Boolean'){return CORE::pack('C',${$value}? 
0xc3 : 0xc2)}my$b_obj=B::svref_2object(\$value);my$flags=$b_obj->FLAGS;if ($flags & B::SVp_POK){if ($self->{prefer_integer}){if ($value =~ /^-?[0-9]+$/){my$ivalue=0 + $value;if (!($ivalue > 0xFFFFFFFF or $ivalue < ('-' .0x80000000)or $ivalue!=B::svref_2object(\$ivalue)->int_value)){return$self->_pack($ivalue)}}}utf8::encode($value)if utf8::is_utf8($value);my$num=length$value;my$header;if ($self->{utf8}){$header=$num < 32 ? CORE::pack('C',0xa0 + $num): $num < 2 ** 8 - 1 ? CORE::pack('CC',0xd9,$num): $num < 2 ** 16 - 1 ? CORE::pack('Cn',0xda,$num): $num < 2 ** 32 - 1 ? CORE::pack('CN',0xdb,$num): _unexpected('number %d',$num)}else {$header=$num < 2 ** 8 - 1 ? CORE::pack('CC',0xc4,$num): $num < 2 ** 16 - 1 ? CORE::pack('Cn',0xc5,$num): $num < 2 ** 32 - 1 ? CORE::pack('CN',0xc6,$num): _unexpected('number %d',$num)}return$header .$value}elsif($flags & B::SVp_NOK){return pack_double($value)}elsif ($flags & B::SVp_IOK){if ($value >= 0){return$value <= 127 ? CORE::pack 'C',$value : $value < 2 ** 8 ? CORE::pack 'CC',0xcc,$value : $value < 2 ** 16 ? CORE::pack 'Cn',0xcd,$value : $value < 2 ** 32 ? CORE::pack 'CN',0xce,$value : pack_uint64($value)}else {return -$value <= 32 ? CORE::pack 'C',($value & 255): -$value <= 2 ** 7 ? CORE::pack 'Cc',0xd0,$value : -$value <= 2 ** 15 ? CORE::pack 'Cn',0xd1,$value : -$value <= 2 ** 31 ? CORE::pack 'CN',0xd2,$value : pack_int64($value)}}else {_unexpected("data type %s",$b_obj)}}our$_utf8=0;my$p;sub _insufficient {Carp::confess("Insufficient bytes (pos=$p, type=@_)")}sub unpack :method {$p=0;$_utf8=(ref($_[0])&& $_[0]->{utf8})|| $_utf8;my$data=_unpack($_[1]);if($p < length($_[1])){Carp::croak("Data::MessagePack->unpack: extra bytes")}return$data}my$T_STR=0x01;my$T_ARRAY=0x02;my$T_MAP=0x04;my$T_BIN=0x08;my$T_DIRECT=0x10;my@typemap=((0x00)x 256);$typemap[$_]|=$T_ARRAY for 0x90 .. 0x9f,0xdc,0xdd,;$typemap[$_]|=$T_MAP for 0x80 .. 0x8f,0xde,0xdf,;$typemap[$_]|=$T_STR for 0xa0 .. 
0xbf,0xd9,0xda,0xdb,;$typemap[$_]|=$T_BIN for 0xc4,0xc5,0xc6,;my@byte2value;for my$pair([0xc3,true],[0xc2,false],[0xc0,undef],(map {[$_,$_ ]}0x00 .. 0x7f),(map {[$_,$_ - 0x100 ]}0xe0 .. 0xff),){$typemap[$pair->[0]]|=$T_DIRECT;$byte2value[$pair->[0]]=$pair->[1]}sub _fetch_size {my($value_ref,$byte,$x8,$x16,$x32,$x_fixbits)=@_;if (defined($x8)&& $byte==$x8){$p += 1;$p <= length(${$value_ref})or _insufficient('x/8');return unpack 'C',substr(${$value_ref},$p - 1,1)}elsif ($byte==$x16){$p += 2;$p <= length(${$value_ref})or _insufficient('x/16');return unpack 'n',substr(${$value_ref},$p - 2,2)}elsif ($byte==$x32){$p += 4;$p <= length(${$value_ref})or _insufficient('x/32');return unpack 'N',substr(${$value_ref},$p - 4,4)}else {return$byte & ~$x_fixbits}}sub _unpack {my ($value)=@_;$p < length($value)or _insufficient('header byte');my$byte=ord(substr$value,$p,1);$p++;return$byte2value[$byte]if$typemap[$byte]& $T_DIRECT;if ($typemap[$byte]& $T_STR){my$size=_fetch_size(\$value,$byte,0xd9,0xda,0xdb,0xa0);my$s=substr($value,$p,$size);length($s)==$size or _insufficient('raw');$p += $size;utf8::decode($s);return$s}elsif ($typemap[$byte]& $T_ARRAY){my$size=_fetch_size(\$value,$byte,undef,0xdc,0xdd,0x90);my@array;push@array,_unpack($value)while --$size >= 0;return \@array}elsif ($typemap[$byte]& $T_MAP){my$size=_fetch_size(\$value,$byte,undef,0xde,0xdf,0x80);my%map;while(--$size >= 0){no warnings;my$key=_unpack($value);my$val=_unpack($value);$map{$key }=$val}return \%map}elsif ($typemap[$byte]& $T_BIN){my$size=_fetch_size(\$value,$byte,0xc4,0xc5,0xc6,0x80);my$s=substr($value,$p,$size);length($s)==$size or _insufficient('bin');$p += $size;utf8::decode($s)if$_utf8;return$s}elsif ($byte==0xcc){$p++;$p <= length($value)or _insufficient('uint8');return CORE::unpack('C',substr($value,$p - 1,1))}elsif ($byte==0xcd){$p += 2;$p <= length($value)or _insufficient('uint16');return unpack_uint16($value,$p - 2)}elsif ($byte==0xce){$p += 4;$p <= length($value)or _insufficient('uint32');return 
unpack_uint32($value,$p - 4)}elsif ($byte==0xcf){$p += 8;$p <= length($value)or _insufficient('uint64');return unpack_uint64($value,$p - 8)}elsif ($byte==0xd3){$p += 8;$p <= length($value)or _insufficient('int64');return unpack_int64($value,$p - 8)}elsif ($byte==0xd2){$p += 4;$p <= length($value)or _insufficient('int32');return unpack_int32($value,$p - 4)}elsif ($byte==0xd1){$p += 2;$p <= length($value)or _insufficient('int16');return unpack_int16($value,$p - 2)}elsif ($byte==0xd0){$p++;$p <= length($value)or _insufficient('int8');return CORE::unpack 'c',substr($value,$p - 1,1)}elsif ($byte==0xcb){$p += 8;$p <= length($value)or _insufficient('double');return unpack_double($value,$p - 8)}elsif ($byte==0xca){$p += 4;$p <= length($value)or _insufficient('float');return unpack_float($value,$p - 4)}else {_unexpected("byte 0x%02x",$byte)}}package Data::MessagePack::PP::Unpacker;sub new {bless {pos=>0,utf8=>0,buff=>'',},shift}sub utf8 {my$self=shift;$self->{utf8}=(@_ ? shift : 1);return$self}sub get_utf8 {my($self)=@_;return$self->{utf8}}sub execute_limit {execute(@_)}sub execute {my ($self,$data,$offset,$limit)=@_;$offset ||=0;my$value=substr($data,$offset,$limit ? 
$limit : length$data);my$len=length$value;$self->{buff}.= $value;local$self->{stack}=[];$p=0;while (length($self->{buff})> $p){_count($self,$self->{buff})or last;while (@{$self->{stack}}> 0 && --$self->{stack}->[-1]==0){pop @{$self->{stack}}}if (@{$self->{stack}}==0){$self->{is_finished}++;last}}$self->{pos}=$p;return$p + $offset}sub _count {my ($self,$value)=@_;no warnings;my$byte=unpack('C',substr($value,$p++,1));Carp::croak('invalid data')unless defined$byte;return 1 if$typemap[$byte]& $T_DIRECT;if ($typemap[$byte]& $T_STR){my$num;if ($byte==0xd9){$num=unpack 'C',substr($value,$p,1);$p += 1}elsif ($byte==0xda){$num=unpack 'n',substr($value,$p,2);$p += 2}elsif ($byte==0xdb){$num=unpack 'N',substr($value,$p,4);$p += 4}else {$num=$byte & ~0xa0}$p += $num;return 1}elsif ($typemap[$byte]& $T_ARRAY){my$num;if ($byte==0xdc){$num=unpack 'n',substr($value,$p,2);$p += 2}elsif ($byte==0xdd){$num=unpack 'N',substr($value,$p,4);$p += 4}else {$num=$byte & ~0x90}if ($num){push @{$self->{stack}},$num + 1}return 1}elsif ($typemap[$byte]& $T_MAP){my$num;if ($byte==0xde){$num=unpack 'n',substr($value,$p,2);$p += 2}elsif ($byte==0xdf){$num=unpack 'N',substr($value,$p,4);$p += 4}else {$num=$byte & ~0x80}if ($num){push @{$self->{stack}},$num * 2 + 1}return 1}elsif ($typemap[$byte]& $T_BIN){my$num;if ($byte==0xc4){$num=unpack 'C',substr($value,$p,1);$p += 1}elsif ($byte==0xc5){$num=unpack 'n',substr($value,$p,2);$p += 2}elsif ($byte==0xc6){$num=unpack 'N',substr($value,$p,4);$p += 4}$p += $num;return 1}elsif ($byte >= 0xcc and $byte <= 0xcf){$p += $byte==0xcc ? 1 : $byte==0xcd ? 2 : $byte==0xce ? 4 : $byte==0xcf ? 8 : Data::MessagePack::PP::_unexpected("byte 0x%02x",$byte);return 1}elsif ($byte >= 0xd0 and $byte <= 0xd3){$p += $byte==0xd0 ? 1 : $byte==0xd1 ? 2 : $byte==0xd2 ? 4 : $byte==0xd3 ? 8 : Data::MessagePack::PP::_unexpected("byte 0x%02x",$byte);return 1}elsif ($byte==0xca or $byte==0xcb){$p += $byte==0xca ? 
4 : 8;return 1}else {Data::MessagePack::PP::_unexpected("byte 0x%02x",$byte)}return 0}sub data {my($self)=@_;local$Data::MessagePack::PP::_utf8=$self->{utf8};return Data::MessagePack->unpack(substr($self->{buff},0,$self->{pos}))}sub is_finished {my ($self)=@_;return$self->{is_finished}}sub reset :method {$_[0]->{buff}='';$_[0]->{pos}=0;$_[0]->{is_finished}=0}1; DATA_MESSAGEPACK_PP $fatpacked{"File/NFSLock.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'FILE_NFSLOCK'; package File::NFSLock;use strict;use warnings;use Carp qw(croak confess);our$errstr;use base 'Exporter';our@EXPORT_OK=qw(uncache);our$VERSION='1.29';use constant {LOCK_SH=>1,LOCK_EX=>2,LOCK_NB=>4,};our$TYPES={BLOCKING=>LOCK_EX,BL=>LOCK_EX,EXCLUSIVE=>LOCK_EX,EX=>LOCK_EX,NONBLOCKING=>LOCK_EX | LOCK_NB,NB=>LOCK_EX | LOCK_NB,SHARED=>LOCK_SH,SH=>LOCK_SH,};our$LOCK_EXTENSION='.NFSLock';our$HOSTNAME=undef;our$SHARE_BIT=1;my$graceful_sig=sub {print STDERR "Received SIG$_[0]\n" if @_;exit 1};our@CATCH_SIGS=qw(TERM INT);sub new {$errstr=undef;my$type=shift;my$class=ref($type)|| $type || __PACKAGE__;my$self={};if(@_ && ref $_[0]){$self=shift}else{$self->{file}=shift;$self->{lock_type}=shift;$self->{blocking_timeout}=shift;$self->{stale_lock_timeout}=shift}$self->{file}||="";$self->{lock_type}||=0;$self->{blocking_timeout}||=0;$self->{stale_lock_timeout}||=0;$self->{lock_pid}=$$;$self->{unlocked}=1;for my$signal (@CATCH_SIGS){if (!$SIG{$signal}|| $SIG{$signal}eq "DEFAULT"){$SIG{$signal}=$graceful_sig}}if($self->{lock_type}&& $self->{lock_type}!~ /^\d+/ && exists$TYPES->{$self->{lock_type}}){$self->{lock_type}=$TYPES->{$self->{lock_type}}}if(!$HOSTNAME){require Sys::Hostname;$HOSTNAME=Sys::Hostname::hostname()}croak ($errstr="Usage: my \$f = $class->new('/pathtofile/file',\n" ."'BLOCKING|EXCLUSIVE|NONBLOCKING|SHARED', [blocking_timeout, stale_lock_timeout]);\n" ."(You passed \"$self->{file}\" and \"$self->{lock_type}\")")unless length($self->{file});croak ($errstr="Unrecognized lock_type operation setting 
[$self->{lock_type}]")unless$self->{lock_type}&& $self->{lock_type}=~ /^\d+$/;bless$self,$class;$self->{rand_file}=rand_file($self->{file});$self->{lock_file}=$self->{file}.$LOCK_EXTENSION;my$quit_time=$self->{blocking_timeout}&& !($self->{lock_type}& LOCK_NB)? time()+ $self->{blocking_timeout}: 0;if(-e $self->{lock_file}&& $self->{stale_lock_timeout}> 0 && time()- (stat _)[9]> $self->{stale_lock_timeout}){unlink$self->{lock_file}}while (1){$self->create_magic or return undef;if ($self->{lock_type}& LOCK_EX){last if$self->do_lock}elsif ($self->{lock_type}& LOCK_SH){last if$self->do_lock_shared}else {$errstr="Unknown lock_type [$self->{lock_type}]";return undef}my$fh;if (-e $self->{lock_file}&& open ($fh,'+<',$self->{lock_file})){my@mine=();my@them=();my@dead=();my$has_lock_exclusive=!((stat _)[2]& $SHARE_BIT);my$try_lock_exclusive=!($self->{lock_type}& LOCK_SH);while(defined(my$line=<$fh>)){if ($line =~ /^\Q$HOSTNAME\E (-?\d+) /){my$pid=$1;if ($pid==$$){push@mine,$line}elsif(kill 0,$pid){push@them,$line}else{push@dead,$line}}else {push@them,$line}}if (@dead){local$LOCK_EXTENSION=".shared";my$lock=new File::NFSLock {file=>$self->{lock_file},lock_type=>LOCK_EX,blocking_timeout=>62,stale_lock_timeout=>60,};seek ($fh,0,0);my$content='';while(defined(my$line=<$fh>)){if ($line =~ /^\Q$HOSTNAME\E (-?\d+) /){my$pid=$1;next if (!kill 0,$pid)}$content .= $line}if(length($content)){seek$fh,0,0;print$fh $content;truncate$fh,length($content);close$fh}else{close$fh;unlink$self->{lock_file}}}else {close$fh}if ($try_lock_exclusive eq $has_lock_exclusive && @mine){return$self}}if ($self->{lock_type}& LOCK_NB){$errstr ||="NONBLOCKING lock failed!";return undef}sleep(1);if($quit_time && (time > $quit_time)){$errstr="Timed out waiting for blocking lock";return undef}}$self->uncache;delete$self->{unlocked};return$self}sub DESTROY {shift()->unlock()}sub unlock ($) {my$self=shift;if (!$self->{unlocked}){unlink($self->{rand_file})if -e $self->{rand_file};if($self->{lock_type}& 
LOCK_SH){$self->do_unlock_shared}else{$self->do_unlock}$self->{unlocked}=1;for my$signal (@CATCH_SIGS){if ($SIG{$signal}&& ($SIG{$signal}eq $graceful_sig)){delete$SIG{$signal}}}}return 1}sub rand_file ($) {my$file=shift;"$file.tmp.".time()%10000 .'.'.$$ .'.'.int(rand()*10000)}sub create_magic ($;$) {$errstr=undef;my$self=shift;my$append_file=shift || $self->{rand_file};$self->{lock_line}||="$HOSTNAME $self->{lock_pid} ".time()." ".int(rand()*10000)."\n";open (my$fh,'>>',$append_file)or do {$errstr="Couldn't open \"$append_file\" [$!]";return undef};print$fh $self->{lock_line};close$fh;return 1}sub do_lock {$errstr=undef;my$self=shift;my$lock_file=$self->{lock_file};my$rand_file=$self->{rand_file};my$chmod=0600;chmod($chmod,$rand_file)|| die "I need ability to chmod files to adequatetly perform locking";my$success=link($rand_file,$lock_file)&& -e $rand_file && (stat _)[3]==2;unlink$rand_file;return$success}sub do_lock_shared {$errstr=undef;my$self=shift;my$lock_file=$self->{lock_file};my$rand_file=$self->{rand_file};my$chmod=0600;$chmod |=$SHARE_BIT;chmod($chmod,$rand_file)|| die "I need ability to chmod files to adequatetly perform locking";local$LOCK_EXTENSION=".shared";my$lock=new File::NFSLock {file=>$lock_file,lock_type=>LOCK_EX,blocking_timeout=>62,stale_lock_timeout=>60,};my$success=link($rand_file,$lock_file);unlink$rand_file;if (!$success && -e $lock_file && ((stat _)[2]& $SHARE_BIT)!=$SHARE_BIT){$errstr='Exclusive lock exists.';return undef}elsif (!$success){$self->create_magic ($self->{lock_file})}return 1}sub do_unlock ($) {return unlink shift->{lock_file}}sub do_unlock_shared ($) {$errstr=undef;my$self=shift;my$lock_file=$self->{lock_file};my$lock_line=$self->{lock_line};local$LOCK_EXTENSION='.shared';my$lock=new File::NFSLock ($lock_file,LOCK_EX,62,60);my$fh;if(!open ($fh,'+<',$lock_file)){if(!-e $lock_file){return 1}else{die "Could not open for writing shared lock file $lock_file ($!)"}}my$content='';while(defined(my$line=<$fh>)){next if$line eq 
$lock_line;$content .= $line}if(length($content)){seek$fh,0,0;print$fh $content;truncate$fh,length($content);close$fh}else{close$fh;unlink$lock_file}}sub uncache ($;$) {my$file=pop;ref$file && ($file=$file->{file});my$rand_file=rand_file($file);return (link($file,$rand_file)&& unlink($rand_file))}sub newpid {my$self=shift;if ($self->{lock_pid}==$$){my$patience=time + 10;while (time < $patience){if (rename("$self->{lock_file}.fork",$self->{rand_file})){unlink$self->{rand_file};last}select(undef,undef,undef,0.1)}unless ($self->{lock_type}& LOCK_SH){$self->{lock_type}|=LOCK_SH}}else {$self->{lock_pid}=$$;delete$self->{lock_line};$self->create_magic($self->{lock_file});unless ($self->{lock_type}& LOCK_SH){$self->{lock_type}|=LOCK_SH}open (my$fh,'>',"$self->{lock_file}.fork");close($fh)}}sub fork {my$self=shift;my$pid=CORE::fork();if (defined$pid and!$self->{unlocked}){$self->newpid}return$pid}1; FILE_NFSLOCK $fatpacked{"IPC/Open3.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'IPC_OPEN3'; package IPC::Open3;use strict;no strict 'refs';our ($VERSION,@ISA,@EXPORT);require Exporter;use Carp;use Symbol qw(gensym qualify);$VERSION='1.20';@ISA=qw(Exporter);@EXPORT=qw(open3);our$Me='open3 (bug)';sub xpipe {pipe $_[0],$_[1]or croak "$Me: pipe($_[0], $_[1]) failed: $!"}sub xopen {open $_[0],$_[1],@_[2..$#_]and return;local $"=', ';carp "$Me: open(@_) failed: $!"}sub xclose {$_[0]=~ /\A=?(\d+)\z/ ? 
do {my$fh;open($fh,$_[1].'&=' .$1)and close($fh)}: close $_[0]or croak "$Me: close($_[0]) failed: $!"}sub xfileno {return $1 if $_[0]=~ /\A=?(\d+)\z/;return fileno $_[0]}use constant FORCE_DEBUG_SPAWN=>0;use constant DO_SPAWN=>$^O eq 'os2' || $^O eq 'MSWin32' || FORCE_DEBUG_SPAWN;sub _open3 {local$Me=shift;splice @_,0,1,undef if \$_[0]==\undef;splice @_,1,1,undef if \$_[1]==\undef;unless (eval {$_[0]=gensym unless defined $_[0]&& length $_[0];$_[1]=gensym unless defined $_[1]&& length $_[1];1}){$@ =~ s/(?<=value attempted) at .*//s;croak "$Me: $@"}my@handles=({mode=>'<',handle=>\*STDIN },{mode=>'>',handle=>\*STDOUT },{mode=>'>',handle=>\*STDERR },);for (@handles){$_->{parent}=shift;$_->{open_as}=gensym}if (@_ > 1 and $_[0]eq '-'){croak "Arguments don't make sense when the command is '-'"}$handles[2]{parent}||=$handles[1]{parent};$handles[2]{dup_of_out}=$handles[1]{parent}eq $handles[2]{parent};my$package;for (@handles){$_->{dup}=($_->{parent}=~ s/^[<>]&//);if ($_->{parent}!~ /\A=?(\d+)\z/){$package=caller 1 if (!defined$package);$_->{parent}=qualify $_->{parent},$package}next if $_->{dup}or $_->{dup_of_out};if ($_->{mode}eq '<'){xpipe $_->{open_as},$_->{parent}}else {xpipe $_->{parent},$_->{open_as}}}my$kidpid;if (!DO_SPAWN){xpipe my$stat_r,my$stat_w;$kidpid=fork;croak "$Me: fork failed: $!" unless defined$kidpid;if ($kidpid==0){eval {untie*STDIN;untie*STDOUT;untie*STDERR;close$stat_r;require Fcntl;my$flags=fcntl$stat_w,&Fcntl::F_GETFD,0;croak "$Me: fcntl failed: $!" 
unless$flags;fcntl$stat_w,&Fcntl::F_SETFD,$flags|&Fcntl::FD_CLOEXEC or croak "$Me: fcntl failed: $!";if (!$handles[2]{dup_of_out}&& $handles[2]{dup}&& xfileno($handles[2]{parent})==fileno \*STDOUT){my$tmp=gensym;xopen($tmp,'>&',$handles[2]{parent});$handles[2]{parent}=$tmp}for (@handles){if ($_->{dup_of_out}){xopen \*STDERR,">&STDOUT" if defined fileno STDERR && fileno STDERR!=fileno STDOUT}elsif ($_->{dup}){xopen $_->{handle},$_->{mode}.'&',$_->{parent}if fileno $_->{handle}!=xfileno($_->{parent})}else {xclose $_->{parent},$_->{mode};xopen $_->{handle},$_->{mode}.'&=',fileno $_->{open_as}}}return 1 if ($_[0]eq '-');exec @_ or do {local($")=(" ");croak "$Me: exec of @_ failed: $!"}}and do {close$stat_w;return 0};my$bang=0+$!;my$err=$@;utf8::encode$err if $] >= 5.008;print$stat_w pack('IIa*',$bang,length($err),$err);close$stat_w;eval {require POSIX;POSIX::_exit(255)};exit 255}else {close$stat_w;my$to_read=length(pack('I',0))* 2;my$bytes_read=read($stat_r,my$buf='',$to_read);if ($bytes_read){(my$bang,$to_read)=unpack('II',$buf);read($stat_r,my$err='',$to_read);waitpid$kidpid,0;if ($err){utf8::decode$err if $] >= 5.008}else {$err="$Me: " .($!=$bang)}$!=$bang;die($err)}}}else {my@close;for (@handles){if ($_->{dup_of_out}){$_->{open_as}=$handles[1]{open_as}}elsif ($_->{dup}){$_->{open_as}=$_->{parent}=~ /\A[0-9]+\z/ ? 
$_->{parent}: \*{$_->{parent}};push@close,$_->{open_as}}else {push@close,\*{$_->{parent}},$_->{open_as}}}require IO::Pipe;$kidpid=eval {spawn_with_handles(\@handles,\@close,@_)};die "$Me: $@" if $@}for (@handles){next if $_->{dup}or $_->{dup_of_out};xclose $_->{open_as},$_->{mode}}xclose$handles[0]{parent},$handles[0]{mode}if$handles[0]{dup};select((select($handles[0]{parent}),$|=1)[0]);$kidpid}sub open3 {if (@_ < 4){local $"=', ';croak "open3(@_): not enough arguments"}return _open3 'open3',@_}sub spawn_with_handles {my$fds=shift;my$close_in_child=shift;my ($fd,%saved,@errs);for$fd (@$fds){$fd->{tmp_copy}=IO::Handle->new_from_fd($fd->{handle},$fd->{mode});$saved{fileno$fd->{handle}}=$fd->{tmp_copy}if$fd->{tmp_copy}}for$fd (@$fds){bless$fd->{handle},'IO::Handle' unless eval {$fd->{handle}->isa('IO::Handle')};my$open_as=$fd->{open_as};my$fileno=fileno($open_as);$fd->{handle}->fdopen(defined($fileno)? $saved{$fileno}|| $open_as : $open_as,$fd->{mode})}unless ($^O eq 'MSWin32'){require Fcntl;for$fd (@$close_in_child){next unless fileno$fd;fcntl($fd,Fcntl::F_SETFD(),1)or push@errs,"fcntl $fd: $!" unless$saved{fileno$fd}}}my$pid;unless (@errs){if (FORCE_DEBUG_SPAWN){pipe my$r,my$w or die "Pipe failed: $!";$pid=fork;die "Fork failed: $!" unless defined$pid;if (!$pid){{no warnings;exec @_}print$w 0 + $!;close$w;require POSIX;POSIX::_exit(255)}close$w;my$bad=<$r>;if (defined$bad){$!=$bad;undef$pid}}else {$pid=eval {system 1,@_}}if($@){push@errs,"IO::Pipe: Can't spawn-NOWAIT: $@"}elsif(!$pid || $pid < 0){push@errs,"IO::Pipe: Can't spawn-NOWAIT: $!"}}for$fd (reverse @$fds){$fd->{handle}->fdopen($fd->{tmp_copy},$fd->{mode})}for (values%saved){$_->close or croak "Can't close: $!"}croak join "\n",@errs if@errs;return$pid}1; IPC_OPEN3 $fatpacked{"Mail/Sendmail.pm"} = '#line '.(1+__LINE__).' 
"'.__FILE__."\"\n".<<'MAIL_SENDMAIL'; package Mail::Sendmail;require 5.006;our$VERSION="0.80";use strict;use warnings;use parent 'Exporter';our%mailcfg=('smtp'=>[qw(localhost) ],'from'=>'','mime'=>1,'retries'=>1,'delay'=>1,'tz'=>'','port'=>25,'debug'=>0);our$address_rx;our$debug;our$log;our$error;our$retry_delay;our$connect_retries;our$auth_support;use Socket;use Time::Local;use Sys::Hostname;$auth_support='DIGEST-MD5 CRAM-MD5 PLAIN LOGIN';eval("use MIME::QuotedPrint");$mailcfg{'mime'}&&=(!$@);our@EXPORT=qw(&sendmail);our@EXPORT_OK=qw(%mailcfg time_to_date $address_rx $debug $log $error);my$word_rx='[\x21\x23-\x27\x2A-\x2B\x2D\x2F\w\x3D\x3F]+';my$user_rx=$word_rx .'(?:\.' .$word_rx .')*' ;my$dom_rx='\w[-\w]*(?:\.\w[-\w]*)*';my$ip_rx='\[\d{1,3}(?:\.\d{1,3}){3}\]';$address_rx='((' .$user_rx .')\@(' .$dom_rx .'|' .$ip_rx .'))';;sub _require_md5 {eval {require Digest::MD5;Digest::MD5->import(qw(md5 md5_hex))};$error .= $@ if $@;return ($@ ? undef : 1)}sub _require_base64 {eval {require MIME::Base64;MIME::Base64->import(qw(encode_base64 decode_base64))};$error .= $@ if $@;return ($@ ? 
undef : 1)}

  # Keyed MD5 digest of $ckey under the secret $pass, returned as hex.
  # A secret longer than 64 bytes is first replaced by its MD5 digest; the
  # secret is then XORed with 64 bytes of 0x36 (inner) and 0x5c (outer).
  sub _hmac_md5 {
      my ($pass, $ckey) = @_;
      my $block_len = 64;
      $pass = md5($pass) if length($pass) > $block_len;
      my $inner_key = $pass ^ (chr(0x36) x $block_len);
      my $outer_key = $pass ^ (chr(0x5c) x $block_len);
      return md5_hex($outer_key, md5($inner_key, $ckey));
  }

  # Build the DIGEST-MD5 response string for $user/$pass from the decoded
  # server challenge.  The challenge is split on commas into key=value
  # pairs (values optionally double-quoted).  $realm falls back to the
  # challenge's realm when it is not supplied.
  sub _digest_md5 {
      my ($user, $pass, $challenge, $realm) = @_;
      my %ckey = map { /^([^=]+)="?(.+?)"?$/ } split(/,/, $challenge);
      $realm ||= $ckey{realm};
      my $nonce  = $ckey{nonce};
      my $cnonce = make_cnonce();
      my $uri    = join('/', 'smtp', hostname() || 'localhost', $ckey{realm});
      my $qop    = 'auth';
      my $nc     = '00000001';
      my $secret = md5("$user:$realm:$pass");
      my $a1     = md5_hex("$secret:$nonce:$cnonce");
      my $a2     = md5_hex("AUTHENTICATE:$uri");
      my $digest = md5_hex("$a1:$nonce:$nc:$cnonce:$qop:$a2");
      return qq(username="$user",realm="$ckey{realm}",nonce="$nonce",nc=$nc,cnonce="$cnonce",digest-uri="$uri",response=$digest,qop=$qop);
  }

  # Produce a 16-character client nonce: 16 random bytes, base64-encoded
  # without line breaks, with every non-word character replaced by 'X'.
  sub make_cnonce {
      my $raw     = join '', map { chr(rand 256) } 1 .. 16;
      my $encoded = encode_base64($raw, "");
      $encoded =~ s/\W/X/go;
      return substr($encoded, 0, 16);
  }

  # Format a time as a mail Date header value, e.g. "Mon, 1 Jan 2024
  # 12:00:00 +0100".  A false argument (including 0) means "now".  The zone
  # comes from $mailcfg{tz}; when that is empty it is derived from the
  # difference between local time and UTC at the moment of the call.
  sub time_to_date {
      my $time = $_[0] || time();
      my @months = qw(Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec);
      my @wdays  = qw(Sun Mon Tue Wed Thu Fri Sat);
      my ($sec, $min, $hour, $mday, $mon, $year, $wday) = localtime($time);
      my $TZ = $mailcfg{'tz'};
      if ($TZ eq "") {
          # Offset in hours (one decimal), then its fractional part as minutes.
          my $offset  = sprintf "%.1f", (timegm(localtime) - time) / 3600;
          my $minutes = sprintf "%02d", abs($offset - int($offset)) * 60;
          $TZ = sprintf("%+03d", int($offset)) . $minutes;
      }
      my $clock = sprintf("%02d:%02d:%02d", $hour, $min, $sec);
      return join(" ", ($wdays[$wday] . ','), $mday, $months[$mon], $year + 1900, $clock, $TZ);
  }

sub sendmail {$error='';$log="Mail::Sendmail v. $VERSION - " .scalar(localtime())."\n";my$CRLF="\015\012";local $/=$CRLF;local $\='';local $_;my (%mail,$k,$smtp,$server,$port,$connected,$localhost,$fromaddr,$recip,@recipients,$to,$header,%esmtp,@wanted_methods,);use vars qw($server_reply);sub fail {$error .= join(" ",@_)."\n";if ($server_reply){$error .= "Server said: $server_reply\n";print STDERR "Server said: $server_reply\n" if $^W}close S;return 0}sub socket_write {my$i;for$i (0..$#_){my$data=ref($_[$i])? 
$_[$i]: \$_[$i];if ($mailcfg{'debug'}> 5){if (length($$data)< 500){print ">",$$data}else {print "> [...",length($$data)," bytes sent ...]\n"}}print(S $$data)|| return 0}1}

  # Read one complete server reply from the SMTP socket S.
  #
  # Lines are appended to $server_reply for as long as they look like
  # continuation lines (digits followed by '-').  Returns the chomped reply
  # on success.  Returns nothing (false) as soon as a line starts with 4 or
  # 5, or when the read yields nothing; $server_reply still holds the text
  # read so far in that case.
  #
  # The readline on S had been stripped from this copy, leaving "$_=;",
  # which does not compile; it is restored here.
  sub socket_read {
      $server_reply = "";
      do {
          $_ = <S>;
          $server_reply .= $_;
          print "<$_" if $mailcfg{'debug'} > 5;
          if (/^[45]/ or !$_) {
              chomp $server_reply;
              return;
          }
      } while (/^[\d]+-/);
      chomp $server_reply;
      return $server_reply;
  }

for$k (keys%mailcfg){if ($k =~ /[A-Z]/){$mailcfg{lc($k)}=$mailcfg{$k}}}while (@_){$k=shift @_;if (!$k and $^W){warn "Received false mail hash key: \'$k\'. Did you forget to put it in quotes?\n"}$k=ucfirst lc($k);$k =~ s/\s*:\s*$//o;$k =~ s/-(.)/"-" . uc($1)/ge;$mail{$k}=shift @_;if ($k !~ /^(Message|Body|Text)$/i){$mail{$k}=~ s/\015\012?/\012/go;$mail{$k}=~ s/\012/$CRLF/go}}$smtp=$mail{'Smtp'}|| $mail{'Server'};unshift @{$mailcfg{'smtp'}},$smtp if ($smtp and $mailcfg{'smtp'}->[0]ne $smtp);delete$mail{'Smtp'};delete$mail{'Server'};$mailcfg{'port'}=$mail{'Port'}|| $mailcfg{'port'}|| 25;delete$mail{'Port'};my$auth=$mail{'Auth'};delete$mail{'Auth'};my@parts;push(@parts,$mail{'Message'})if defined($mail{'Message'});push(@parts,$mail{'Body'})if defined($mail{'Body'});push(@parts,$mail{'Text'})if defined($mail{'Text'});$mail{'Message'}=join("",@parts);delete$mail{'Body'};delete$mail{'Text'};$fromaddr=$mail{'Sender'}|| $mail{'From'}|| $mailcfg{'from'};unless ($fromaddr =~ /$address_rx/){return fail("Bad or missing From address: \'$fromaddr\'")}$fromaddr=$1;$mail{Date}||=time_to_date();$log .= "Date: $mail{Date}\n";$mail{'Message'}=~ s/\r\n/\n/go;$mail{'Mime-Version'}||='1.0';$mail{'Content-Type'}||='text/plain; charset="iso-8859-1"';unless ($mail{'Content-Transfer-Encoding'}|| $mail{'Content-Type'}=~ /multipart/io){if ($mailcfg{'mime'}){$mail{'Content-Transfer-Encoding'}='quoted-printable';$mail{'Message'}=encode_qp($mail{'Message'})}else {$mail{'Content-Transfer-Encoding'}='8bit';if ($mail{'Message'}=~ /[\x80-\xFF]/o){$error .= "MIME::QuotedPrint not present!\nSending 8bit characters, hoping it will come 
across OK.\n";warn "MIME::QuotedPrint not present!\n","Sending 8bit characters without encoding, hoping it will come across OK.\n" if $^W}}}$mail{'Message'}=~ s/^\./\.\./gom;$mail{'Message'}=~ s/\n/$CRLF/go;{my@recipients;push(@recipients,$mail{To})if defined($mail{To});push(@recipients,$mail{Cc})if defined($mail{Cc});push(@recipients,$mail{Bcc})if defined($mail{Bcc});$recip=join(", ",@recipients)}delete$mail{'Bcc'};@recipients=();while ($recip =~ /$address_rx/go){push@recipients,$1}unless (@recipients){return fail("No recipient!")}$localhost=hostname()|| 'localhost';for$server (@{$mailcfg{'smtp'}}){unless (socket S,AF_INET,SOCK_STREAM,scalar(getprotobyname 'tcp')){return fail("socket failed ($!)")}print "- trying $server\n" if$mailcfg{'debug'}> 1;$server =~ s/\s+//go;$port=($server =~ s/:(\d+)$//o)? $1 : $mailcfg{'port'};$smtp=$server;my$smtpaddr=inet_aton$server;unless ($smtpaddr){$error .= "$server not found\n";next}my$retried=0;while ((not $connected=connect S,pack_sockaddr_in($port,$smtpaddr))and ($retried < $mailcfg{'retries'})){$retried++;$error .= "connect to $server failed ($!)\n";print "- connect to $server failed ($!)\n" if$mailcfg{'debug'}> 1;print "retrying in $mailcfg{'delay'} seconds...\n" if$mailcfg{'debug'}> 1;sleep$mailcfg{'delay'}}if ($connected){print "- connected to $server\n" if$mailcfg{'debug'}> 3;last}else {$error .= "connect to $server failed\n";print "- connect to $server failed, next server...\n" if$mailcfg{'debug'}> 1;next}}unless ($connected){return fail("connect to $smtp failed ($!) 
no (more) retries!")};{local $^W=0;$log .= "Server: $smtp Port: $port\n" ."From: $fromaddr\n" ."Subject: $mail{Subject}\n" }my($oldfh)=select(S);$|=1;select($oldfh);socket_read()|| return fail("Connection error from $smtp on port $port ($_)");socket_write("EHLO $localhost$CRLF")|| return fail("send EHLO error (lost connection?)");my$ehlo=socket_read();if ($ehlo){map {s/^\d+[- ]//;my ($k,$v)=split /\s+/,$_,2;$esmtp{$k}=$v || 1 if$k}split(/\n/,$ehlo)}else {socket_write("HELO $localhost$CRLF")|| return fail("send HELO error (lost connection?)")}if ($auth){warn "AUTH requested\n" if ($mailcfg{debug}> 4);my@methods=grep {$esmtp{'AUTH'}=~/(^|\s)$_(\s|$)/i}grep {$auth_support =~ /(^|\s)$_(\s|$)/i}grep /\S/,split(/\s+/,$auth->{method});if (@methods){if (exists$auth->{pass}){$auth->{password}=$auth->{pass}}my$method=uc$methods[0];_require_base64()|| fail("Could not use MIME::Base64 module required for authentication");if ($method eq "LOGIN"){print STDERR "Trying AUTH LOGIN\n" if ($mailcfg{debug}> 9);socket_write("AUTH LOGIN$CRLF")|| return fail("send AUTH LOGIN failed (lost connection?)");socket_read()|| return fail("AUTH LOGIN failed: $server_reply");socket_write(encode_base64($auth->{user},$CRLF))|| return fail("send LOGIN username failed (lost connection?)");socket_read()|| return fail("LOGIN username failed: $server_reply");socket_write(encode_base64($auth->{password},$CRLF))|| return fail("send LOGIN password failed (lost connection?)");socket_read()|| return fail("LOGIN password failed: $server_reply")}elsif ($method eq "PLAIN"){warn "Trying AUTH PLAIN\n" if ($mailcfg{debug}> 9);socket_write("AUTH PLAIN " .encode_base64(join("\0",$auth->{user},$auth->{user},$auth->{password}),$CRLF))|| return fail("send AUTH PLAIN failed (lost connection?)");socket_read()|| return fail("AUTH PLAIN failed: $server_reply")}elsif ($method eq "CRAM-MD5"){_require_md5()|| fail("Could not use Digest::MD5 module required for authentication");warn "Trying AUTH CRAM-MD5\n" if ($mailcfg{debug}> 
9);socket_write("AUTH CRAM-MD5$CRLF")|| return fail("send CRAM-MD5 failed (lost connection?)");my$challenge=socket_read()|| return fail("AUTH CRAM-MD5 failed: $server_reply");$challenge =~ s/^\d+\s+//;my$response=_hmac_md5($auth->{password},decode_base64($challenge));socket_write(encode_base64("$auth->{user} $response",$CRLF))|| return fail("AUTH CRAM-MD5 failed: $server_reply");socket_read()|| return fail("AUTH CRAM-MD5 failed: $server_reply")}elsif ($method eq "DIGEST-MD5"){_require_md5()|| fail("Could not use Digest::MD5 module required for authentication");warn "Trying AUTH DIGEST-MD5\n" if ($mailcfg{debug}> 9);socket_write("AUTH DIGEST-MD5$CRLF")|| return fail("send CRAM-MD5 failed (lost connection?)");my$challenge=socket_read()|| return fail("AUTH DIGEST-MD5 failed: $server_reply");$challenge =~ s/^\d+\s+//;$challenge =~ s/[\r\n]+$//;warn "\nCHALLENGE=",decode_base64($challenge),"\n" if ($mailcfg{debug}> 10);my$response=_digest_md5($auth->{user},$auth->{password},decode_base64($challenge),$auth->{realm});warn "\nRESPONSE=$response\n" if ($mailcfg{debug}> 10);socket_write(encode_base64($response,""),$CRLF)|| return fail("AUTH DIGEST-MD5 failed: $server_reply");my$status=socket_read()|| return fail("AUTH DIGEST-MD5 failed: $server_reply");if ($status =~ /^3/){socket_write($CRLF)|| return fail("AUTH DIGEST-MD5 failed: $server_reply");socket_read()|| return fail("AUTH DIGEST-MD5 failed: $server_reply")}}else {return fail("$method not supported (and wrongly advertised as supported by this silly module)\n")}$log .= "AUTH $method succeeded as user $auth->{user}\n"}else {$esmtp{'AUTH'}=~ s/(^\s+|\s+$)//g;if ($auth->{required}){return fail("Required AUTH method '$auth->{method}' not supported. " ."(Server supports '$esmtp{'AUTH'}'. Module supports: '$auth_support')")}else {warn "No common authentication method! Requested: '$auth->{method}'. Server supports '$esmtp{'AUTH'}'. Module supports: '$auth_support'. 
Skipping authentication\n"}}}socket_write("MAIL FROM:<$fromaddr>$CRLF")|| return fail("send MAIL FROM: error");socket_read()|| return fail("MAIL FROM: error ($_)");my$to_ok=0;for$to (@recipients){socket_write("RCPT TO:<$to>$CRLF")|| return fail("send RCPT TO: error");if (socket_read()){$log .= "To: $to\n";$to_ok++}else {$log .= "FAILED To: $to ($server_reply)";$error .= "Bad recipient <$to>: $server_reply\n"}}unless ($to_ok){return fail("No valid recipient")}socket_write("DATA$CRLF")|| return fail("send DATA error");socket_read()|| return fail("DATA error ($_)");for$header (keys%mail){next if$header eq "Message";$mail{$header}=~ s/\s+$//o;socket_write("$header: $mail{$header}$CRLF")|| return fail("send $header: error")};socket_write($CRLF,\$mail{'Message'},"$CRLF.$CRLF")|| return fail("send message error");socket_read()|| return fail("message transmission error ($_)");$log .= "\nResult: $_";socket_write("QUIT$CRLF")|| return fail("send QUIT error");socket_read();close S;return 1}1; MAIL_SENDMAIL $fatpacked{"Text/FormatTable.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'TEXT_FORMATTABLE'; package Text::FormatTable;use Carp;use strict;use warnings;use vars qw($VERSION);$VERSION='1.03';sub _uncolorized_length($) {my$str=shift;$str =~ s/\e \[ [^m]* m//xmsg;return length$str}sub _min_width($) {my$str=shift;my$min;for my$s (split(/\s+/,$str)){my$l=_uncolorized_length$s;$min=$l if not defined$min or $l > $min}return$min ? $min : 1}sub _max_width($) {my$str=shift;my$len=_uncolorized_length$str;return$len ? 
$len : 1}
# The fragment above closes _max_width, whose opening statements precede
# this point in the Text::FormatTable payload.

# Return $a when it is defined and either $b is undefined or $a >= $b;
# otherwise return $b (which may itself be undef).
sub _max($$) {
    my ($a,$b) = @_;
    return $a if defined $a and (not defined $b or $a >= $b);
    return $b
}

# Split $text on newlines, wrap every resulting line to $width with
# _wrap_line, and return a reference to the flattened list of pieces.
sub _wrap($$) {
    my ($width,$text) = @_;
    my @lines = split(/\n/,$text);
    my @w = ();
    for my $l (@lines){
        push @w,@{_wrap_line($width,$l)}
    }
    return \@w
}

# Wrap a single line to $width and return a reference to the pieces.
# @t is used as a stack: the last element is the part still to be wrapped,
# everything below it is already finished.
sub _wrap_line($$) {
    my ($width,$text) = @_;
    my $width_m1 = $width-1;
    my @t = ($text);
    while(1){
        my $t = pop @t;
        # Length is measured by _uncolorized_length, not plain length().
        my $l = _uncolorized_length $t;
        if($l <= $width){
            # The remainder fits: put it back and finish.
            push @t,$t;
            return \@t
        }
        # NOTE(review): the 32766 guards presumably avoid exceeding Perl's
        # regex quantifier limit -- confirm.
        elsif($width_m1 < 32766 && $t =~ /^(.{0,$width_m1}\S)\s+(\S.*?)$/){
            # Break at whitespace so the first piece has at most $width
            # characters; the rest is re-examined on the next pass.
            push @t,$1;
            push @t,$2
        }
        elsif($width_m1 < 32766 && $t =~ /(.{$width,}?\S)\s+(\S.*?)$/){
            # The first whitespace break lies beyond $width.
            if (_uncolorized_length $1 > $width_m1){
                # Hard-split the over-long leading piece at $width, then
                # continue with the text after the whitespace.
                my $left = substr($1,0,$width);
                my $right = substr($1,$width);
                push @t,$left;
                push @t,$right;
                push @t,$2
            }
            else {
                push @t,$1;
                push @t,$2
            }
        }
        else {
            # No usable whitespace break: cut at $width and return
            # immediately; the right-hand part is not examined again.
            my $left = substr($t,0,$width);
            my $right = substr($t,$width);
            push @t,$left;
            push @t,$right;
            return \@t
        }
    }
    # Not reached: the loop above has no 'last', only returns.
    return \@t
}

# Wrap $text to $width and pad every line on the right with spaces up to
# $width (left-justified box).  Returns a reference to the padded lines.
sub _l_box($$) {
    my ($width,$text) = @_;
    my $lines = _wrap($width,$text);
    # map is used here only to modify each element in place through $_.
    map {$_ .= ' 'x($width-_uncolorized_length($_))}@$lines;
    return $lines
}

# Wrap $text to $width and pad every line on the left with spaces up to
# $width (right-justified box).  Returns a reference to the padded lines.
sub _r_box($$) {
    my ($width,$text) = @_;
    my $lines = _wrap($width,$text);
    # map is used here only to modify each element in place through $_.
    map {$_=(' 'x($width-_uncolorized_length($_)).$_)}@$lines;
    return $lines
}

# Weight used by _calculate_widths when sharing out width between
# columns: the natural logarithm of the column's maximum width.
sub _distribution_f($) {
    my $max_width = shift;
    return log($max_width)
}

# Compute the rendered width of every data column for a table that should
# fit into $width characters and store the result in $self->{widths}.
# Each entry of @widths is [min width, max width, settled width (optional)].
sub _calculate_widths($$) {
    my ($self,$width) = @_;
    my @widths = ();
    # Pass 1: collect per-column minimum and maximum widths from all
    # 'head' and 'data' rows ('rule' rows are skipped).
    for my $r (@{$self->{data}}){
        $r->[0] eq 'data' or $r->[0] eq 'head' or next;
        my $cn = 0;
        # $max and $min are not used anywhere below.
        my ($max,$min) = (0,0);
        for my $c (@{$r->[1]}){
            if ($self->{fixed_widths}[$cn]){
                # A width given in the format string overrides the content.
                $widths[$cn][0] = $self->{fixed_widths}[$cn];
                $widths[$cn][1] = $self->{fixed_widths}[$cn]
            }
            else {
                $widths[$cn][0] = _max($widths[$cn][0],_min_width $c);
                $widths[$cn][1] = _max($widths[$cn][1],_max_width $c)
            }
            $cn++
        }
    }
    my ($total_min,$total_max) = (0,0);
    for my $c (@widths){
        $total_min += $c->[0];
        $total_max += $c->[1]
    }
    # One character per '|' or ' ' separator in the format.  The variable
    # is freshly declared, so '+=' acts as a plain assignment here.
    my $extra_width += scalar grep {$_->[0] eq '|' or $_->[0] eq ' '}(@{$self->{format}});
    # $total_min is updated but not read again in this sub.
    $total_min += $extra_width;
    $total_max += $extra_width;
    if($total_max <= $width){
        # Everything fits: give every column its maximum width.
        my $cn = 0;
        for my $c (@widths){
            $self->{widths}[$cn] = $c->[1];
            $cn++
        }
        $self->{total_width} = $total_max
    }
    else {
        # Too wide: share the available width between the unsettled
        # columns in proportion to _distribution_f of their maximum width.
        # Whenever a column gets settled the whole pass is restarted.
        my @dist_width;
        ITERATION: while(1){
            my $total_f = 0.0;
            my $fixed_width = 0;
            my $remaining = 0;
            for my $c (@widths){
                if(defined $c->[2]){
                    $fixed_width += $c->[2]
                }
                else {
                    $total_f += _distribution_f($c->[1]);
                    $remaining++
                }
            }
            my $available_width = $width-$extra_width-$fixed_width;
            # Guarantee at least 5 characters per unsettled column, widening
            # the target $width if necessary.
            if($available_width < $remaining*5){
                $available_width = $remaining*5;
                $width = $extra_width+$fixed_width+$available_width
            }
            my $cn = -1;
            COLUMN: for my $c (@widths){
                $cn++;
                next COLUMN if defined $c->[2];
                my $w = _distribution_f($c->[1])* $available_width / $total_f;
                if($c->[0] > $w){
                    # Share is below the column minimum: settle at minimum.
                    $c->[2] = $c->[0];
                    next ITERATION
                }
                if($c->[1] < $w){
                    # Share exceeds the column maximum: settle at maximum.
                    $c->[2] = $c->[1];
                    next ITERATION
                }
                $dist_width[$cn] = int($w)
            }
            last
        }
        my $cn = 0;
        for my $c (@widths){
            $self->{widths}[$cn] = defined $c->[2] ? $c->[2]: $dist_width[$cn];
            $cn++
        }
    }
}

# Render one horizontal rule made of $char and return it with a trailing
# newline.  At '|' separators a '-' rule prints '+', a ' ' rule prints
# '|', and any other rule character prints itself.
sub _render_rule($$) {
    my ($self,$char) = @_;
    my $out = '';
    my ($col,$data_col) = (0,0);
    for my $c (@{$self->{format}}){
        if($c->[0] eq '|'){
            if ($char eq '-'){
                $out .= '+'
            }
            elsif($char eq ' '){
                $out .= '|'
            }
            else {
                $out .= $char
            }
        }
        elsif($c->[0] eq ' '){
            $out .= $char
        }
        elsif($c->[0] eq 'l' or $c->[0] eq 'L' or $c->[0] eq 'r' or $c->[0] eq 'R'){
            $out .= ($char)x($self->{widths}[$data_col]);
            $data_col++
        }
        $col++
    }
    return $out."\n"
}

# Render one head/data row ($data is a reference to the cell strings) and
# return it as text, possibly spanning several output lines.
sub _render_data($$) {
    my ($self,$data) = @_;
    my @rdata;
    my ($col,$data_col) = (0,0);
    my $lines = 0;
    my @rows_in_column;
    # Step 1: box every cell (l/L left-justified, r/R right-justified) and
    # remember how many lines each cell needs; $lines becomes the maximum.
    for my $c (@{$self->{format}}){
        if(($c->[0] eq 'l') or ($c->[0] eq 'L')){
            my $lb = _l_box($self->{widths}[$data_col],$data->[$data_col]);
            $rdata[$data_col] = $lb;
            my $l = scalar @$lb ;
            $lines = $l if $lines < $l;
            $rows_in_column[$data_col] = $l;
            $data_col++
        }
        elsif(($c->[0] eq 'r') or ($c->[0] eq 'R')){
            my $rb = _r_box($self->{widths}[$data_col],$data->[$data_col]);
            $rdata[$data_col] = $rb;
            my $l = scalar @$rb ;
            $lines = $l if $lines < $l;
            $rows_in_column[$data_col] = $l ;
            $data_col++
        }
        $col++
    }
    # Step 2: emit the output lines one by one.
    my $out = '';
    for my $l (0..($lines-1)){
        my ($col,$data_col) = (0,0);
        for my $c (@{$self->{format}}){
            if($c->[0] eq '|'){
                $out .= '|'
            }
            elsif($c->[0] eq ' '){
                $out .= ' '
            }
            elsif($c->[0] eq 'L' or $c->[0] eq 'R'){
                # Upper-case columns are aligned to the bottom of the row:
                # their text starts $start_print lines down.
                my $start_print = $lines - $rows_in_column[$data_col];
                if (defined $rdata[$data_col][$l-$start_print] and $l >= $start_print){
                    $out .= $rdata[$data_col][$l-$start_print]
                }
                else {
                    $out .= ' 'x($self->{widths}[$data_col])
                }
                $data_col++
            }
            elsif($c->[0] eq 'l' or $c->[0] eq 'r'){
                # Lower-case columns start at the top; missing lines are
                # filled with blanks.
                if(defined $rdata[$data_col][$l]){
                    $out .= $rdata[$data_col][$l]
                }
                else {
                    $out .= ' 'x($self->{widths}[$data_col])
                }
                $data_col++
            }
            $col++
        }
        $out .= "\n"
    }
    return $out
}

# Parse the format string character by character.  'l', 'L', 'r', 'R'
# declare data columns, '|' and ' ' declare separators, and digits in
# front of a data column give it a fixed width.  Anything else croaks.
# Stores format, fixed_widths, col and data_col in $self.
sub _parse_format($$) {
    my ($self,$format) = @_;
    my @f = split(//,$format);
    my @format = ();
    my @width = ();
    my ($col,$data_col) = (0,0);
    my $wid;
    for my $f (@f){
        if ($f =~ /(\d+)/){
            # Accumulate digits; 'next' also skips the $col increment, so
            # digits do not occupy a format slot.
            $wid .= $f;
            next
        }
        if($f eq 'l' or $f eq 'L' or $f eq 'r' or $f eq 'R'){
            $format[$col] = [$f,$data_col];
            $width[$data_col] = $wid;
            $wid = undef;
            $data_col++
        }
        elsif($f eq '|' or $f eq ' '){
            $format[$col] = [$f]
        }
        else {
            croak "unknown column format: $f"
        }
        $col++
    }
    $self->{format} = \@format;
    $self->{fixed_widths} = \@width;
    $self->{col} = $col;
    $self->{data_col} = $data_col
}

# Constructor: Text::FormatTable->new($format).  Croaks when no format is
# given or when _parse_format rejects it.
sub new($$) {
    my ($class,$format) = @_;
    croak "new() requires one argument: format" unless defined $format;
    my $self = {col=>'0',row=>'0',data=>[]};
    bless $self,$class;
    $self->_parse_format($format);
    return $self
}

# Trim whitespace in place on every cell of @$data: one substitution for
# leading and one for trailing whitespace (both use /m and no /g).
sub _preprocess_row_data($$) {
    my ($self,$data) = @_;
    # $cn is not used below.
    my $cn = 0;
    for my $c (0..($#$data)){
        $data->[$c] =~ s/^\s+//m;
        $data->[$c] =~ s/\s+$//m
    }
}

# Append a header row.  Croaks unless exactly one value per data column
# is supplied.
sub head($@) {
    my ($self,@data) = @_;
    scalar @data==$self->{data_col} or croak "number of columns must be $self->{data_col}";
    $self->_preprocess_row_data(\@data);
    $self->{data}[$self->{row}++] = ['head',\@data]
}

# Append a data row.  Croaks unless exactly one value per data column is
# supplied; undefined cells are replaced by empty strings.
sub row($@) {
    my ($self,@data) = @_;
    scalar @data==$self->{data_col} or croak "number of columns must be $self->{data_col}";
    @data = map {defined $_ ? $_ : ""}@data;
    $self->_preprocess_row_data(\@data);
    $self->{data}[$self->{row}++] = ['data',\@data]
}

# Append a horizontal rule drawn with $char (default '-').
sub rule($$) {
    my ($self,$char) = @_;
    $char = '-' unless defined $char;
    $self->{data}[$self->{row}++] = ['rule',$char]
}

# Render the whole table for a target width of $width characters
# (default 79) and return it as one string.
sub render($$) {
    my ($self,$width) = @_;
    $width = 79 unless defined $width;
    $self->_calculate_widths($width);
    my $out = '';
    for my $r (@{$self->{data}}){
        if($r->[0] eq 'rule'){
            $out .= $self->_render_rule($r->[1])
        }
        elsif($r->[0] eq 'head'){
            $out .= $self->_render_data($r->[1])
        }
        elsif($r->[0] eq 'data'){
            $out .= $self->_render_data($r->[1])
        }
    }
    return $out
}
1;
TEXT_FORMATTABLE

# Embedded copy of Tie::DB_FileLock: a tied hash over DB_File that holds
# an flock on the database file while the tie is alive.
$fatpacked{"Tie/DB_FileLock.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'TIE_DB_FILELOCK';
package Tie::DB_FileLock;
use strict;
require 5.004;
require Tie::Hash;
use Carp;
use DB_File;
use FileHandle;
use Fcntl qw(:flock O_RDONLY O_RDWR O_CREAT);
use vars qw(@ISA @EXPORT $VERSION $DEBUG);
@ISA = qw(Tie::Hash DB_File);
@EXPORT = @DB_File::EXPORT;
$VERSION = '0.11';
$DEBUG = 0;

# tie %h, 'Tie::DB_FileLock', $dbname, $openmode, $perms, $type
# Opens the database and, when $dbname is true, locks it.  Croaks when a
# DB_File::RECNOINFO type is supplied.
sub TIEHASH {
    my $class = shift;
    my ($dbname,$openmode,$perms,$type) = @_;
    if ($type and ref($type) eq "DB_File::RECNOINFO"){
        croak "Tie::DB_FileLock can only tie an array to a DB_RECNO database\n"
    }
    my $self = bless {},$class;
    $self->_openDB(@_);
    $self->lockDB if ($dbname);
    return $self
}

# Tying an array is not supported: this always croaks, with a more
# specific message when a non-RECNO type object was supplied.
sub TIEARRAY {
    my $class = shift;
    my ($dbname,$openmode,$perms,$type) = @_;
    if ($type and ref($type) ne "DB_File::RECNOINFO"){
        my $t = ref($type);
        $t =~ s/DB_File::(\w+)INFO/$1/;
        croak "Tie::DB_FileLock can only tie an associative array to a DB_$t database\n"
    }
    croak "DB_RECNO not implemented"
}

# Open the database through DB_File and prepare (but do not take) the
# lock.  Records DBNAME, TIEPARAMS, OPENMODE, DBOBJ and ORIG_DB in $self,
# plus LOCKFH and LOCKMODE when $dbname is true.
sub _openDB {
    my $self = shift;
    my $dbname = shift;
    my ($openmode,$perms,$type) = @_;
    # @params keeps the caller's original arguments; the default applied
    # to $openmode below is therefore not passed on to tie().
    my @params = @_;
    my %db;
    $openmode = O_CREAT | O_RDWR unless defined $openmode;
    my $dbobj = tie(%db,'DB_File',$dbname,@params);
    croak "tie($dbname): $!" unless $dbobj;
    $dbobj->sync();
    if ($dbname){
        my $lockmode;
        # Wrap the database's own file descriptor ("<&=" reuses the
        # descriptor number) so that flock can be applied to it.
        my $fd = $dbobj->fd;
        my $fh = FileHandle->new("<&=$fd") or croak("dup: $!");
        $self->{LOCKFH} = $fh;
        # Read-only opens take a shared lock, everything else exclusive.
        if ($openmode==O_RDONLY){
            $lockmode = LOCK_SH
        }
        else {
            $lockmode = LOCK_EX
        }
        $self->{LOCKMODE} = $lockmode
    }
    $self->{DBNAME} = $dbname;
    $self->{TIEPARAMS} = \@params;
    $self->{OPENMODE} = $openmode;
    $self->{DBOBJ} = $dbobj;
    $self->{ORIG_DB} = \%db
}

# Release the DB object, untie the hash created by _openDB and drop the
# lock filehandle.  Croaks if untie fails.
sub _closeDB {
    undef $_[0]->{DBOBJ};
    untie($_[0]->{ORIG_DB}) or croak("untie: $!");
    undef($_[0]->{LOCKFH})
}

# Take the flock in the mode chosen by _openDB, then tie the database a
# second time and use that tie (DB / DBOBJ) for all further access.
# NOTE(review): the re-tie after locking presumably ensures the handle
# reflects the file as it is once the lock is held -- confirm.
sub lockDB {
    my ($self) = @_;
    my %db;
    flock($self->{LOCKFH},$self->{LOCKMODE}) or croak("flock: $!");
    my $dbobj = tie(%db,'DB_File',$self->{DBNAME},@{$self->{TIEPARAMS}});
    croak "tie($self->{DBNAME}): $!" unless $dbobj;
    $self->{DB} = \%db;
    $self->{DBOBJ} = $dbobj
}

# Undo lockDB: sync when the lock was exclusive, drop the working tie and
# release the flock.  Does nothing when no lock mode was recorded.
sub unlockDB {
    my ($self) = @_;
    return unless $self->{LOCKMODE};
    if ($self->{LOCKMODE}==LOCK_EX){
        # NOTE(review): croaks when sync() returns a true value, i.e. this
        # relies on DB_File's sync returning 0 on success -- confirm.
        $self->{DBOBJ}->sync() and croak("sync(): $!")
    }
    undef($self->{DBOBJ});
    untie($self->{DB}) or croak("untie: $!");
    undef($self->{DB});
    flock($self->{LOCKFH},LOCK_UN) or croak("unlock: $!")
}

# Get the package-wide $DEBUG flag; set it first when an argument is
# given.
sub debug {
    $DEBUG = $_[1] if (@_ > 1);
    return $DEBUG
};

# Unlock first, then close, when the tied object goes away.
sub DESTROY {
    $_[0]->unlockDB();
    $_[0]->_closeDB()
}

# Tied-hash interface.  Every method prints a trace to STDERR when $DEBUG
# is true; the modifying methods croak on a database opened O_RDONLY.
sub STORE {
    print STDERR "STORE: @_\n" if $DEBUG;
    croak("RO hash") if $_[0]->{OPENMODE}==O_RDONLY;
    $_[0]->{DBOBJ}->put($_[1],$_[2])
}

sub FETCH {
    print STDERR "FETCH: @_\n" if $DEBUG;
    my $v;
    $_[0]->{DBOBJ}->get($_[1],$v);
    return $v
}

sub FIRSTKEY {
    print STDERR "FIRSTKEY: @_\n" if $DEBUG;
    $_[0]->{DBOBJ}->FIRSTKEY()
}

sub NEXTKEY {
    print STDERR "NEXTKEY: @_\n" if $DEBUG;
    $_[0]->{DBOBJ}->NEXTKEY($_[1])
}

sub EXISTS {
    print STDERR "EXISTS: @_\n" if $DEBUG;
    exists $_[0]->{DB}->{$_[1]}
}

sub DELETE {
    print STDERR "DELETE: @_\n" if $DEBUG;
    croak("RO hash") if $_[0]->{OPENMODE}==O_RDONLY;
    delete $_[0]->{DB}->{$_[1]}
}

sub CLEAR {
    print STDERR "CLEAR: @_\n" if $DEBUG;
    croak("RO hash") if $_[0]->{OPENMODE}==O_RDONLY;
    %{$_[0]->{DB}} = ()
}

# The remaining methods forward their arguments unchanged to the method
# of the same name on the underlying DB object.
sub put {
    my $r = shift;
    $r->{DBOBJ}->put(@_)
}

sub get {
    my $r = shift;
    $r->{DBOBJ}->get(@_)
}

sub del {
    my $r = shift;
    $r->{DBOBJ}->del(@_)
}

sub seq {
    my $r = shift;
    $r->{DBOBJ}->seq(@_)
}

sub sync {
    my $r = shift;
    $r->{DBOBJ}->sync(@_)
}

sub fd
{
    my $r = shift;
    $r->{DBOBJ}->fd(@_)
}

sub get_dup {
    my $r = shift;
    $r->{DBOBJ}->get_dup(@_)
}

sub find_dup {
    my $r = shift;
    $r->{DBOBJ}->find_dup(@_)
}

sub del_dup {
    my $r = shift;
    $r->{DBOBJ}->del_dup(@_)
}

sub filter_store_key {
    my $r = shift;
    $r->{DBOBJ}->filter_store_key(@_)
}

sub filter_store_value {
    my $r = shift;
    $r->{DBOBJ}->filter_store_value(@_)
}

sub filter_fetch_key {
    my $r = shift;
    $r->{DBOBJ}->filter_fetch_key(@_)
}

sub filter_fetch_value {
    my $r = shift;
    $r->{DBOBJ}->filter_fetch_value(@_)
}

# Helper packages whose new() discards the invoking class name and calls
# DB_File::HASHINFO::new with the matching DB_File class name instead.
# NOTE(review): all three call DB_File::HASHINFO::new, including the
# BTREE and RECNO variants -- presumably intentional; confirm against
# DB_File.
package Tie::DB_FileLock::HASHINFO;
use strict;
@Tie::DB_FileLock::HASHINFO::ISA = qw(DB_File::HASHINFO);
sub new {
    shift;
    DB_File::HASHINFO::new('DB_File::HASHINFO',@_)
}

package Tie::DB_FileLock::BTREEINFO;
use strict;
@Tie::DB_FileLock::BTREEINFO::ISA = qw(DB_File::BTREEINFO);
sub new {
    shift;
    DB_File::HASHINFO::new('DB_File::BTREEINFO',@_)
}

package Tie::DB_FileLock::RECNOINFO;
use strict;
@Tie::DB_FileLock::RECNOINFO::ISA = qw(DB_File::RECNOINFO);
sub new {
    shift;
    DB_File::HASHINFO::new('DB_File::RECNOINFO',@_)
}
1;
TIE_DB_FILELOCK

# Embedded copy of the 'parent' pragma.
$fatpacked{"parent.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'PARENT';
package parent;
use strict;
use vars qw($VERSION);
$VERSION = '0.236';

# use parent LIST: require every listed class (unless the first argument
# is '-norequire') and append the list to the caller's @ISA.
sub import {
    my $class = shift;
    my $inheritor = caller(0);
    if (@_ and $_[0] eq '-norequire'){
        shift @_
    }
    else {
        # Work on a copy (@filename) so the class names in @_ stay intact
        # while "::" and "'" are turned into "/" for require.
        for (my @filename = @_){
            s{::|'}{/}g;
            require "$_.pm"
        }
    }
    {
        # Symbolic reference is needed to reach the caller's @ISA by name.
        no strict 'refs';
        push @{"$inheritor\::ISA"},@_
    }
};
1;
PARENT

# Strip one leading space from every line of every embedded payload.
s/^ //mg for values %fatpacked;

# Bless %fatpacked into a class whose name is derived from the hash's
# address; the blessed hash is installed in @INC below.
my $class = 'FatPacked::'.(0+\%fatpacked);
no strict 'refs';
# files(): list the names of all embedded modules.
*{"${class}::files"} = sub { keys %{$_[0]} };

# INC(): called by require with (hook object, file name).  Returns
# nothing when the file is not embedded so the @INC search continues.
if ($] < 5.008) {
  # For perls older than 5.008, hand back a generator sub that yields the
  # source one line at a time through $_.
  *{"${class}::INC"} = sub {
    if (my $fat = $_[0]{$_[1]}) {
      return sub {
        return 0 unless length $fat;
        $fat =~ s/^([^\n]*\n?)//;
        $_ = $1;
        return 1;
      };
    }
    return;
  };
}
else {
  # Otherwise return a read handle opened on the in-memory source string.
  *{"${class}::INC"} = sub {
    if (my $fat = $_[0]{$_[1]}) {
      open my $fh, '<', \$fat
        or die "FatPacker error loading $_[1] (could be a perl installation issue?)";
      return $fh;
    }
    return;
  };
}

# Put the hook in front of @INC so it is consulted before the file system.
unshift @INC, bless \%fatpacked, $class;
# The brace below closes a block opened before the visible part of the file.
} # END OF FATPACK CODE