From 75bf99fb2db9256ee191f64203100156ef0c97cf Mon Sep 17 00:00:00 2001 From: Paul Kolano Date: Mon, 24 Jun 2019 17:07:40 -0700 Subject: [PATCH] Shift 6.2 --- BUGS | 10 +- CHANGES | 28 ++ INSTALL | 44 +- doc/shiftc.1 | 115 +++-- etc/shiftrc | 8 +- perl/shift-aux | 774 +++++++++++++++++++------------ perl/shift-mgr | 237 ++++++---- perl/shiftc | 1212 ++++++++++++++++++++++++++---------------------- 8 files changed, 1423 insertions(+), 1005 deletions(-) diff --git a/BUGS b/BUGS index a1a3c93..b44d234 100644 --- a/BUGS +++ b/BUGS @@ -1,15 +1,7 @@ KNOWN BUGS ========== -1. Setting the TCP window size from perl does not seem to work - correctly. This can affect the performance of the fish-tcp - transport. The window size appears to reach a maximum of 8MB even - when the system limit is much higher. It is not known if this is due - to the perl version and/or the specific build of perl on the system - on which this was discovered. The only current workaround is to - increase the number of streams (--streams) when using fish-tcp. - -2. A bug exists in the Shift 4.0 and 5.0 tar creation function that +1. A bug exists in the Shift 4.0 and 5.0 tar creation function that could leave some entries in tar files in a partially corrupted state. The conditions under which this could occur are very specific, so the overall percentage of affected tar files is expected to be very low. diff --git a/CHANGES b/CHANGES index a067876..7f34d9e 100644 --- a/CHANGES +++ b/CHANGES @@ -255,3 +255,31 @@ CHANGES - Fixed corruption tracking causing negative sizes/rates in --status - Fixed inadvertent call of test shift-mgr during mgr synchronization - Removed warning message when --no-cron specified + +* Shift 6.2 (06/24/19) + - Added multi-threading of directory traversal + - Added multi-threading of attr preservation when external programs invoked + - Added --interval to dynamically adjust batch size around fixed frequency + - Added use of gzip index files to increase metadata read performance + - Added --no-silent to disable silent corruption detection + - Added ability to escape multiple lines over stdin in shift-aux + - Added syntax checking of numeric options that can take units + - Changed --files and --size to be minimums instead of absolutes + - Changed time remaining to more accurately reflect manager overhead + - Fixed remote host selection that was running per file instead of per batch + - Fixed detection of operations already performed by another client + - Fixed slow processing of doing logs due to small backward read size + - Fixed local/remote fadvise that had old initial implementation + - Fixed --no-recall handling of remote files + - Fixed possible exceptions during some thread joins + - Fixed Net::Ping exceptions when Time::HiRes not available + - Fixed exception when stat of remote file fails + - Fixed use of setfacl when shift-bin available + - Fixed run state when --restart=ignore ignores final operations + - Fixed manager --doing=n so it retrieves nth doing in past + - Fixed output of error table in stats due to tainted file names + - Fixed option abbreviation so deprecated options don't invoke other options + - Fixed double lustre striping during attribute preservation stage + - Fixed high memory utilization during traversal of very large directories + - Fixed stalls due to inadvertent newlines left in logs after restarts + - Fixed preservation of lustre striping due to bad conditional diff --git a/INSTALL b/INSTALL index daf5670..aa7b1b3 100644 --- a/INSTALL +++ b/INSTALL @@ -12,7 +12,7 @@ Shift Installation and Configuration o shift-mgr - the Shift manager, which is invoked by client instances and must exist either on all client hosts or on a host - accessible via ssh hostbased or pubkey authentication + accessible via ssh hostbased or publickey authentication (henceforth called "manager hosts") o shift-aux - the Shift auxiliary utility, which is invoked by client @@ -42,7 +42,7 @@ Shift Installation and Configuration designated as manager hosts. Since the manager is not an active server and is simply a passive executable invoked by clients, the manager can be located on any host with ssh access. These hosts must support either - hostbased or pubkey authentication, however, since Shift is an automated + hostbased or publickey authentication, however, since Shift is an automated framework that performs actions without the user present. The manager has a synchronization mechanism allowing multiple hosts without a shared file system to be used for redundancy. @@ -54,7 +54,7 @@ Shift Installation and Configuration o perl >= 5.8.5 (>= 5.10.1 is required for multi-threading support) o ssh (and sftp) access to manager/remote hosts via hostbased or - pubkey authentication + publickey authentication 2.2. Optional @@ -70,7 +70,7 @@ Shift Installation and Configuration (http://toolkit.globus.org/toolkit/data/gridftp) o rsync - bandwidth-efficient local/remote copy (http://rsync.samba.org) - o mesh - lightweight single sign-on via ssh pubkeys + o mesh - lightweight single sign-on via ssh publickey authentication (http://mesh.sf.net) 2.3. Invoked (standard on most systems or when specific file systems in use) @@ -175,9 +175,9 @@ Shift Installation and Configuration 5.1.1. ~/.ssh/id_rsa (or similar) If hostbased authentication is not supported by client hosts, - manager hosts, and/or remote hosts, pubkey authentication must be - used. Clients must have access to private keys to access these - systems, which may either be referenced explicitly via the + manager hosts, and/or remote hosts, publickey authentication must + be used. Clients must have access to private keys to access + these systems, which may either be referenced explicitly via the --mgr-identity and --identity options for manager and remote hosts, respectively, or be added to an ssh agent on the client(s). Note that the private keys used for these options must not be @@ -189,7 +189,7 @@ Shift Installation and Configuration 5.1.2. ~/.ssh/authorized_keys If hostbased authentication is not supported to other client hosts - for parallelization, pubkey authentication must be used. In this + for parallelization, publickey authentication must be used. In this case, the public key(s) corresponding to the private key(s) used by clients (those named ~/.ssh/id* loaded into an ssh agent) must be added to the invoking user's authorized_keys file on other client @@ -270,24 +270,25 @@ Shift Installation and Configuration 5.2.2. ~/.ssh/authorized_keys If hostbased authentication is not supported to manager hosts, - pubkey authentication must be used. In this case, the public key(s) - corresponding to the private key(s) used by clients (either loaded - into an ssh agent or specified via --mgr-identity) must be added to - the appropriate user's authorized_keys file on manager hosts. - This user will either be the invoking user or the one specified by - --mgr-user. + publickey authentication must be used. In this case, the public + key(s) corresponding to the private key(s) used by clients (either + loaded into an ssh agent or specified via --mgr-identity) must be + added to the appropriate user's authorized_keys file on manager + hosts. This user will either be the invoking user or the one + specified by --mgr-user. 5.3. Remote hosts 5.3.1. ~/.ssh/authorized_keys - If hostbased authentication is not supported to remote hosts, pubkey - authentication must be used. In this case, the public key(s) - corresponding to the private key(s) used by clients (either loaded - into an ssh agent or specified via --identity) must be added to the - appropriate user's authorized_keys file on remote hosts. This user - will either be the invoking user or the one specified by --user. + If hostbased authentication is not supported to remote hosts, + publickey authentication must be used. In this case, the public + key(s) corresponding to the private key(s) used by clients (either + loaded into an ssh agent or specified via --identity) must be added + to the appropriate user's authorized_keys file on remote hosts. + This user will either be the invoking user or the one specified by + --user. 6. Usage @@ -302,9 +303,6 @@ Shift Installation and Configuration if not already in a manpath directory. Basic usage is drop-in compatible with cp/scp with special consideration for the authentication options --mgr, --mgr-identity, --mgr-user, --identity, and --user. - Note that the scp "user@host" syntax is not currently supported (the - "user@" portion will be dropped) so --user must be specified instead - when the remote user differs from the local user. 6.2. shift-mgr diff --git a/doc/shiftc.1 b/doc/shiftc.1 index 1309864..8f3a926 100644 --- a/doc/shiftc.1 +++ b/doc/shiftc.1 @@ -118,6 +118,7 @@ given in following sections. (LIST subset of {acl,mode,owner,stripe,time,xattr}) \-\-no\-recall do not recall DMF\-managed files before transfer \-\-no\-sanity do not check file existence/size (benchmarking only) +\-\-no\-silent do not detect silent corruption or store checksums \-\-no\-verify do not verify/rectify integrity of destination files .PP \fBMonitoring and management options:\fP @@ -148,8 +149,9 @@ given in following sections. (use suffix {k,m,g,t} for {Kb,Mb,Gb,Tb}) \-\-buffer=SIZE use SIZE bytes for buffer in transports (use suffix {k,m,g,t} for {KiB,MiB,GiB,TiB}) [4m] -\-\-files=COUNT process transfer in batches of COUNT files +\-\-files=COUNT process transfer in batches of at least COUNT files (use suffix {k,m,b/g,t} for 1E{3,6,9,12}) [1k] +\-\-interval=NUM adjust batches to run for around NUM seconds [30] \-\-local=LIST set local transport mechanism to one of LIST (LIST subset of {bbcp,bbftp,fish,fish-tcp,gridftp, mcp,rsync,shift}) @@ -158,7 +160,7 @@ given in following sections. (LIST subset of {bbcp,bbftp,fish,fish-tcp,gridftp, rsync,shift}) \-\-retry=NUM retry failed operations up to NUM times [2] -\-\-size=SIZE process transfer in batches of SIZE bytes +\-\-size=SIZE process transfer in batches of at least SIZE bytes (use suffix {k,m,g,t} for {KB,MB,GB,TB}) [4g] \-\-split=SIZE parallelize single files using chunks of SIZE bytes (use suffix {k,m,g,t} for {KiB,MiB,GiB,TiB}) [0] @@ -440,6 +442,16 @@ as needed even when this option is enabled. Disable file existence and size checks at the end of the transfer. This option was included for benchmarking and completeness purposes and is not recommended for general use. +.IP "\fB\-\-no\-silent\fP" +By default, the checksums of all files transferred with Shift are +stored in a per-user database. When a file with a known checksum is +transferred and has not been modified since the checksum was stored, the +transfer will be put into the "alert" state if the current checksum does +not match the stored checksum. This option disables the storage of +checksums and comparison against existing checksums. While silent +corruption detection adds minimal overhead during normal operation, it +can increase the probability of lock contention when there are large +numbers of clients. .IP "\fB\-\-no\-verify\fP" By default, files are checksummed at the source and destination to verify that they have not been corrupted and if corruption is detected, @@ -462,18 +474,13 @@ the origin host/directory and the original command. When Specify the transfer identifier to be used with management and status commands. .IP "\fB\-\-last\-sum\fP" -The checksums of all files transferred with Shift are stored in a -per-user db. When a file with a known checksum is transferred and has -not been modified since the checksum was stored, the transfer will be -put into the "alert" state if the current checksum does not match the -stored checksum. This option queries the silent corruption database for -all files given on the command line and prints (one file per line) the -last known checksum, the file modification time associated with this -checksum, and the file name. When \fB\-\-index\-tar\fP is given, the -first file argument is assumed to be a tar file and the remaining -arguments names of files within the tar for which checksum information -will be printed. A checksum of "-" means that no information is stored -for the file. +Queries the silent corruption database for all files given on the +command line and prints (one file per line) the last known checksum, the +file modification time associated with this checksum, and the file name. +When \fB\-\-index\-tar\fP is given, the first file argument is assumed +to be a tar file and the remaining arguments names of files within the +tar for which checksum information will be printed. A checksum of "-" +means that no information is stored for the file. .IP "\fB\-\-mgr=HOST\fP" Set the host that will be used to manage transfers. By default, this host will be accessed as the current user with hostbased authentication @@ -536,19 +543,21 @@ specific list of metrics (e.g. \fB\-\-plot=id\fP), "io" is assumed. Restart the transfer associated with the given \fB\-\-id\fP that was stopped due to unrecoverable errors or stopped explicitly via \fB\-\-stop\fP. If \fB\-\-restart=ignore\fP is specified, all existing -errors will be ignored and the transfer will progress as if the associated -files and directories were no longer part of the transfer. Note that -transfers must be restarted on the original client host or one that has -equivalent file system access. A subset of the available command-line -options may be respecified during a restart including \fB\-\-bandwidth\fP, -\fB\-\-buffer\fP, \fB\-\-clients\fP, \fB\-\-cpu\fP, \fB\-\-disk\fP, -\fB\-\-files\fP, \fB\-\-force\fP, \fB\-\-host\-file\fP, \fB\-\-host\-list\fP, -\fB\-\-hosts\fP, \fB\-\-io\fP, \fB\-\-ior\fP, \fB\-\-iow\fP, \fB\-\-local\fP, -\fB\-\-net\fP, \fB\-\-netr\fP, \fB\-\-netw\fP, \fB\-\-no\-cron\fP, -\fB\-\-no\-mail\fP, \fB\-\-no\-offline\fP, \fB\-\-no\-recall\fP, -\fB\-\-pipeline\fP, \fB\-\-ports\fP, \fB\-\-preallocate\fP, \fB\-\-remote\fP, -\fB\-\-retry\fP, \fB\-\-secure\fP, \fB\-\-size\fP, \fB\-\-streams\fP, -\fB\-\-stripe\fP, \fB\-\-threads\fP, and \fB\-\-window\fP. +errors will be ignored and the transfer will progress as if the +associated files and directories were no longer part of the transfer. +Note that transfers must be restarted on the original client host or one +that has equivalent file system access. A subset of the available +command-line options may be respecified during a restart including +\fB\-\-bandwidth\fP, \fB\-\-buffer\fP, \fB\-\-clients\fP, \fB\-\-cpu\fP, +\fB\-\-disk\fP, \fB\-\-files\fP, \fB\-\-force\fP, \fB\-\-host\-file\fP, +\fB\-\-host\-list\fP, \fB\-\-hosts\fP, \fB\-\-interval\fP, \fB\-\-io\fP, +\fB\-\-ior\fP, \fB\-\-iow\fP, \fB\-\-local\fP, \fB\-\-net\fP, +\fB\-\-netr\fP, \fB\-\-netw\fP, \fB\-\-no\-cron\fP, \fB\-\-no\-mail\fP, +\fB\-\-no\-offline\fP, \fB\-\-no\-recall\fP, \fB\-\-no\-silent, +\fB\-\-pipeline\fP, \fB\-\-ports\fP, \fB\-\-preallocate\fP, +\fB\-\-remote\fP, \fB\-\-retry\fP, \fB\-\-secure\fP, \fB\-\-size\fP, +\fB\-\-streams\fP, \fB\-\-stripe\fP, \fB\-\-threads\fP, and +\fB\-\-window\fP. .IP "\fB\-\-search=REGEX\fP" When \fB\-\-status\fP and \fB\-\-id\fP are specified, this option will show the full status of file operations in the associated transfer whose @@ -627,15 +636,29 @@ suffixes k, m, g, and t may be used for KiB, MiB, GiB, and TiB, respectively. The default buffer size is 4 MiB. Increasing the buffer size trades higher memory utilization for more efficient I/O. .IP "\fB\-\-files=COUNT\fP" -Process transfers in batches of the given number of files. The -suffixes k, m, b or g, and t may be used for 1E3, 1E6, 1E9, and 1E12, -respectively. The default batch count is 1000 files. Lowering the -batch count will increase the number of checkpoints and the overhead of -transfer management. Raising the batch count will have the opposite -effect. A batch will be sent for processing when the number of files in -the batch reaches the given value. Note that batches of less than the -given count can occur if the batch size specified by \fB\-\-size\fP is -reached first. +Process transfers in batches of at least the given number of files. +The suffixes k, m, b or g, and t may be used for 1E3, 1E6, 1E9, and +1E12, respectively. The default batch count is 1000 files. This option +works in concert with \fB\-\-size\fP and \fB\-\-interval\fP to manage +the number of checkpoints and the overhead of transfer management. A +batch will initially consist of at least \fB\-\-files\fP files or +\fB\-\-size\fP bytes, whichever is reached first. The batch may then +be dynamically increased in size until there is enough work to span +\fB\-\-interval\fP seconds. To make batch selection completely dynamic, +use \fB\-\-files=1\fP and \fB\-\-size=1\fP. +.IP "\fB\-\-interval=SECS\fP" +Process transfers in batches that take around the given number of +seconds. The default interval is 30 seconds. This option works in +concert with \fB\-\-files\fP and \fB\-\-size\fP to manage the number of +checkpoints and the overhead of transfer management. A batch will +initially consist of at least \fB\-\-files\fP files or \fB\-\-size\fP +bytes, whichever is reached first. The batch may then be dynamically +increased in size until there is enough work to span \fB\-\-interval\fP +seconds. Note that the actual time a batch takes will depend on its +contents and that the interval will be increased as the number of +clients participating in a transfer increases to minimize contention +for manager locks. To make batch selection completely static, use +\fB\-\-interval=0\fP. .IP "\fB\-\-local=LIST\fP" Specify one or more local transports to be used for the transfer in order of preference, separated by commas. Valid transports for this @@ -674,15 +697,16 @@ retries. Note that disabling retries also disables the ability of value is cumulative across all stages of a file's processing so different stages may not be retried the same number of times. .IP "\fB\-\-size=SIZE\fP" -Process transfers in batches of approximately the given total file size. +Process transfers in batches of at least the given total file size. The suffixes k, m, g, and t may be used for KB, MB, GB, and TB, -respectively. The default batch size is 4 GB. Lowering the batch size -will increase the number of checkpoints and the overhead of transfer -management. Raising the batch size will have the opposite effect. A -batch will be sent for processing when the total size of all files in -the batch reaches the given value. Note that batches of less than the -given size can occur if the batch count specified by \fB\-\-files\fP -is reached first. +respectively. The default batch size is 4 GB. This option works in +concert with \fB\-\-files\fP and \fB\-\-interval\fP to manage the number +of checkpoints and the overhead of transfer management. A batch will +initially consist of at least \fB\-\-size\fP bytes or \fB\-\-files\fP +files, whichever is reached first. The batch may then be dynamically +increased in size until there is enough work to span \fB\-\-interval\fP +seconds. To make batch selection completely dynamic, use +\fB\-\-files=1\fP and \fB\-\-size=1\fP. .IP "\fB\-\-split=SIZE\fP" Parallelize the processing of single files using chunks of the given size. The suffixes k, m, g, and t may be used for KiB, MiB, GiB, and @@ -785,6 +809,9 @@ loss. ./"################################################################ .SH "TRANSFER THROTTLING" ./"################################################################ +Transfers can be throttled to prevent resource exhaustion when they +reach configured thresholds for CPU, disk, I/O, and/or network +utilization. .IP "\fB\-\-cpu=NUM\fP" Throttle the transfer when the local CPU usage reaches the specified percent of the total available. This option is disabled by default but diff --git a/etc/shiftrc b/etc/shiftrc index ba44022..9cbcc17 100644 --- a/etc/shiftrc +++ b/etc/shiftrc @@ -145,7 +145,7 @@ user_dir /home/%u/.shift # (use suffix {k,m,g,t} for {KB,MB,GB,TB}) #default_buffer 4m -# maximum number of files to transfer in each batch +# minimum number of files to transfer in each batch # (use suffix {k,m,b/g,t} for 10E{3,6,9,12}) #default_files 1k @@ -154,6 +154,10 @@ user_dir /home/%u/.shift # (use suffix {k,m,b/g,t} for 10E{3,6,9,12}) #default_find-files 2k +# approximate number of seconds that each batch should run +# (note this will be adjusted upward with more clients/greater mgr overhead) +#default_interval 30 + # source sparsity percentage at which to preallocate destination file # (sparsity defined as 1 - (512 * allocated_blocks / size)) # (note that transports using temporary files are not supported) @@ -164,7 +168,7 @@ user_dir /home/%u/.shift # (must be at least 1 for --sync to function) #default_retry 2 -# approximate maximum amount of data to transfer in each batch +# approximate minimum amount of data to transfer in each batch # (use suffix {k,m,g,t} for {KB,MB,GB,TB}) #default_size 4g diff --git a/perl/shift-aux b/perl/shift-aux index f447800..8a6bfd3 100755 --- a/perl/shift-aux +++ b/perl/shift-aux @@ -52,7 +52,7 @@ use File::Basename; use File::Path; use File::Spec; use File::Temp qw(tempfile); -use Getopt::Long qw(:config bundling no_ignore_case require_order); +use Getopt::Long qw(:config bundling no_auto_abbrev no_ignore_case require_order); use IO::File; use IO::Handle; use IO::Socket::INET; @@ -61,12 +61,12 @@ require IPC::Open3; use List::Util qw(first min); use MIME::Base64; use POSIX; -use Socket; +use Socket qw(IPPROTO_TCP TCP_NODELAY); use Symbol qw(gensym); use Sys::Hostname; use Text::ParseWords; -our $VERSION = 0.96; +our $VERSION = 0.98; # do not die when receiving sigpipe $SIG{PIPE} = 'IGNORE'; @@ -82,38 +82,51 @@ umask ($< == 0 ? 077 : 077 & umask); my %perl; $perl{ssl} = eval 'use IO::Socket::SSL; 1'; # need threads and version of Thread::Queue from perl >= 5.10.1 -$perl{threads} = eval 'require 5.010_001; use threads; use Thread::Queue; 1'; +$perl{threads} = eval 'require 5.010_001; use threads; use Thread::Queue; use Thread::Semaphore; 1'; -my %opts; +my %opts = ( + 'buffer' => 4194304, + 'buffer-size' => 4, + 'hash-type' => "md5", + 'ports' => "50000:51000", + 'split-size' => 1024, + 'streams' => 4, + 'threads' => 4, + 'window' => 4194304, +); my $cmd = shift @ARGV; # parse options and perform corresponding command if (!$cmd) { die "Invalid command\n"; } elsif ($cmd eq 'chattr') { + die "Invalid options\n" if (!GetOptions(\%opts, + "buffer=i", "threads=i", + )); die "Invalid options\n" if (scalar(@ARGV) > 0); chattr(); } elsif ($cmd eq 'escape') { - die "Invalid options\n" if (scalar(@ARGV) != 1); - print escape($ARGV[0]); + if (scalar(@ARGV)) { + print escape($_) foreach (@ARGV); + } else { + print escape($_) while (); + } +} elsif ($cmd eq 'fadvise') { + die "Invalid options\n" if (scalar(@ARGV) > 0); + # offload to shift-bin since fadvise does not exist in perl + exec("shift-bin"); } elsif ($cmd eq 'find') { die "Invalid options\n" if (!GetOptions(\%opts, - "create-tar", "dereference|L", "exclude=s@", "extract-tar", - "find-files=i", "ignore-times", "include=s@", "newer=s", "older=s", - "preserve=s", "sync", + "buffer=i", "create-tar", "dereference|L", "exclude=s@", + "extract-tar", "find-files=i", "ignore-times", "include=s@", + "newer=s", "older=s", "preserve=s", "recall", "sync", "threads=i", )); die "Invalid options\n" if (scalar(@ARGV) > 0); find(); } elsif ($cmd eq 'fish') { - %opts = ( - 'buffer-size' => 4, - 'ports' => "50000:51000", - 'streams' => 4, - 'window' => 4194304, - ); die "Invalid options\n" if (!GetOptions(\%opts, - "buffer-size=i", "ports=s", "secure", "streams=i", "tcp", "verify", - "window=i", + "buffer-size=i", "hash-type=s", "ports=s", "secure", "split-size=i", + "streams=i", "tcp", "verify", "window=i", )); die "Invalid options\n" if (scalar(@ARGV) > 0); $opts{'buffer-size'} <<= 20; @@ -122,12 +135,6 @@ if (!$cmd) { die "Invalid options\n" if (scalar(@ARGV) > 0); mount(); } elsif ($cmd eq 'sum') { - %opts = ( - 'buffer-size' => 4, - 'hash-type' => "md5", - 'split-size' => 1024, - 'threads' => 4, - ); die "Invalid options\n" if (!GetOptions(\%opts, "buffer-size=i", "c", "hash-type=s", "split-size=i", "threads=i", )); @@ -148,40 +155,120 @@ if (!$cmd) { } } +################ +#### buffer #### +################ +# write output in blocks for efficiency +my $buf4; +sub buffer { + my ($text, $file) = @_; + $opts{text_buf} .= $text; + $opts{file_buf} .= $file; + if (($opts{text_buf} || $opts{file_buf}) && + (!defined $text || length $opts{text_buf} >= $opts{buffer} || + length $opts{file_buf} >= $opts{buffer})) { + $buf4->down if ($perl{threads} && $opts{threads} > 1); + if ($opts{text_buf} && (!defined $text || + length $opts{text_buf} >= $opts{buffer})) { + print $opts{text_buf}; + STDOUT->flush; + delete $opts{text_buf}; + } + if ($opts{file_buf} && (!defined $text || + length $opts{file_buf} >= $opts{buffer})) { + print {$opts{dmfh}} $opts{file_buf}; + $opts{dmfh}->flush; + delete $opts{file_buf}; + } + $buf4->up if ($perl{threads} && $opts{threads} > 1); + } +} + ################ #### chattr #### ################ # set given attr of files given on STDIN to given value(s) and output ok/error sub chattr { # check for existence of commands - my %have; foreach my $bin (qw(fallocate lfs setfacl setfattr shift-bin)) { - $have{$bin} = first {-x "$_/$bin"} (split(/:/, $ENV{PATH})); + $opts{have}->{$bin} = first {-x "$_/$bin"} (split(/:/, $ENV{PATH})); } - # offload to shift-bin for more efficient processing if possible - exec("shift-bin") if ($have{'shift-bin'}); - - while (my $line = ) { - chomp $line; - my ($cmd, $file, $attrs) = split(/\s+/, $line, 3); - # short circuit if command not available - next if (!$have{$cmd}); - # untaint arguments - $cmd = $1 if ($cmd =~ /(.*)/); - $file = $1 if ($file =~ /(.*)/s); - $attrs = $1 if ($attrs =~ /(.*)/); - my $ufile = unescape($file); - # make sure parent directory exists - my $dir = $ufile =~ s/\/$// ? $ufile : dirname($ufile); - eval {mkpath($dir)}; + my ($q, @threads); + if ($perl{threads} && $opts{threads} > 1) { + $q = Thread::Queue->new; + # mutual exclusion for stdout + $buf4 = Thread::Semaphore->new(1); + @threads = map {threads->create(sub { + eval { + # this should not be needed but there is a weird race condition + # that causes the open3 forked child to never execute + local $SIG{ALRM} = sub {die}; + alarm 2; + # will revert to individual commands if alarm hit + $opts{fhpid} = open3_run([undef, undef, -1], "shift-bin") + if ($opts{have}->{'shift-bin'}); + alarm 0; + }; + chattr1($_) while (defined ($_ = $q->dequeue)); + buffer(); + open3_wait($opts{fhpid}) if (defined $opts{fhpid}); + })} (1 .. $opts{threads}); + } else { + # offload to shift-bin for more efficient processing if possible + exec("shift-bin") if ($opts{have}->{'shift-bin'}); + } + while () { + chomp; + next if (!$_); + if ($perl{threads} && $opts{threads} > 1) { + $q->enqueue($_); + } else { + chattr1($_); + } + } + if ($perl{threads} && $opts{threads} > 1) { + # force threads to exit + $q->enqueue(undef) foreach (@threads); + foreach (@threads) { + $_->join if ($_); + } + } else { + buffer(); + } +} + +################# +#### chattr1 #### +################# +# set attrs of a single file +sub chattr1 { + my $line = shift; + my ($cmd, $file, $attrs) = split(/\s+/, $line, 3); + # short circuit if command not available + return if (!defined $opts{fhpid} && !$opts{have}->{$cmd}); + # untaint arguments + $cmd = $1 if ($cmd =~ /(.*)/); + $file = $1 if ($file =~ /(.*)/s); + $attrs = $1 if ($attrs =~ /(.*)/); + my $ufile = unescape($file); + + # make sure parent directory exists + my $dir = $ufile =~ s/\/$// ? $ufile : dirname($ufile); + eval {mkpath($dir)}; + + if (defined $opts{fhpid}) { + $opts{fhpid}->[0]->print($line . "\n"); + my $text = $opts{fhpid}->[1]->getline; + buffer($text); + } else { my ($cin, @copts, $in, $out); if ($cmd eq 'fallocate') { @copts = ("-n", "-l", unescape($attrs), $ufile); } elsif ($cmd eq 'setstripe') { $cmd = "lfs"; - my ($count, $size, $pool) = split(/\s+/, unescape($attrs)); + my ($count, $size, $pool) = split(/\s+/, $attrs); $count = 0 if (!$count); $size = 0 if (!$size); @copts = ("setstripe", "-c", $count, "-S", $size, $ufile); @@ -203,9 +290,9 @@ sub chattr { my $text; $text .= $_ while (<$out>); $text =~ s/\n/ /g; - print "$file,", escape($text), "\n"; + buffer("$file,", escape($text) . "\n"); } else { - print "$file,ok\n"; + buffer("$file,ok\n"); } close $out; } @@ -226,310 +313,392 @@ sub escape { #### find #### ############## # output list of files/dirs beneath paths given on STDIN with stat info +my ($findq, $findb4, $findn4, $findq4); sub find { + my @threads; + if (!$opts{'extract-tar'}) { + # check for existence of various commands + foreach my $bin (qw(dmget lfs getfacl getfattr shift-bin)) { + $opts{have}->{$bin} = first {-x "$_/$bin"} (split(/:/, $ENV{PATH})); + } + + if ($opts{recall} && $opts{have}->{dmget}) { + # set up tmp file for recalls in case threads need to access + ($opts{dmfh}, $opts{dmtmp}) = tempfile(); + } + + if ($perl{threads} && $opts{threads} > 1) { + $findq = Thread::Queue->new; + # number of worker "b's" processing files + $findb4 = Thread::Semaphore->new(0); + # number of items that have been added to queue + $findn4 = Thread::Semaphore->new(1); + # number of unprocessed items on queue + $findq4 = Thread::Semaphore->new(0); + # mutual exclusion for stdout + $buf4 = Thread::Semaphore->new(1); + @threads = map {threads->create(sub { + $opts{fhpid} = open3_run([undef, undef, -1], "shift-bin") + if ($opts{have}->{'shift-bin'} && ($opts{preserve} == 1 || + $opts{preserve} =~ /acl|stripe|xattr/)); + while (1) { + my $path = $findq->dequeue_timed(1); + if (defined $path) { + $findb4->up; + $findq4->down; + find1($path); + $findb4->down; + } elsif (!$$findq4 && !$$findb4) { + buffer(); + last; + } + } + open3_wait($opts{fhpid}) if (defined $opts{fhpid}); + })} (1 .. $opts{threads}); + } else { + # file count used in both single/multi-threaded cases + $findn4 = \0; + $opts{fhpid} = open3_run([undef, undef, -1], "shift-bin") + if ($opts{have}->{'shift-bin'} && ($opts{preserve} == 1 || + $opts{preserve} =~ /acl|stripe|xattr/)); + } + } + while (my $line = ) { chomp $line; $opts{$_} = undef foreach (qw(srcfs tar_name tar_tell)); my @args = split(/\s+/, $line); my $ref = pop @args; - print "ref $ref\n"; while (scalar(@args) > 3) { my $opt = pop @args; $opts{$1} = $2 if ($opt =~ /(\w+)=(\S+)/); } my @uargs = map {unescape($_)} @args; if ($opts{'extract-tar'}) { + print "ref $ref\n"; # process local tar files find_tar(@uargs); } else { - find1(@uargs); + buffer("ref $ref\n"); + my ($shost, $spath, $dst) = @uargs; + my $sdir = dirname($spath); + $sdir = "" if ($sdir eq '/'); + my $ddir = dirname($dst); + $ddir = "" if ($ddir eq '/'); + my $path = [[basename($spath), basename($dst), $ref], $shost, $sdir, + $dst, $ddir, $opts{srcfs}]; + if ($opts{'create-tar'}) { + my $tdir = dirname(unescape($opts{tar_name})); + $tdir = "" if ($tdir eq '.'); + $tdir .= "/" if ($tdir && $tdir !~ /\/$/); + push(@{$path}, $tdir); + } + if ($perl{threads} && $opts{threads} > 1) { + $findq4->up; + $findn4->up; + $findq->enqueue($path); + } else { + find1($path); + } } } - open3_wait($opts{fhpid}) if (defined $opts{fhpid}); + return if ($opts{'extract-tar'}); - if ($opts{dmtmp}) { + buffer(); + if ($perl{threads} && $opts{threads} > 1) { + foreach (@threads) { + $_->join if ($_); + } + } else { + open3_wait($opts{fhpid}) if (defined $opts{fhpid}); + } + + if ($opts{recall} && $opts{have}->{dmget}) { close $opts{dmfh}; - # fork to avoid intermittent hangs of dmget - my $pid = fork_setsid(); - if ($pid) { - waitpid($pid, 0); - delete $opts{dmfh}; - delete $opts{dmtmp}; + if (-s $opts{dmtmp}) { + # fork to avoid intermittent hangs of dmget + my $pid = fork_setsid(); + if ($pid) { + waitpid($pid, 0); + delete $opts{dmfh}; + delete $opts{dmtmp}; + } else { + my $extra = $opts{'create-tar'} ? " -a" : ""; + # ignore errors since files are automatically retrieved anyway + open3_get([$opts{dmtmp}, -1, -1], "dmget -nq$extra"); + unlink $opts{dmtmp}; + POSIX::_exit(0); + } } else { - my $extra = $opts{'create-tar'} ? " -a" : ""; - # ignore errors since files will be automatically retrieved anyway - open3_get([$opts{dmtmp}, -1, -1], "dmget -nq$extra"); unlink $opts{dmtmp}; - POSIX::_exit(0); } } } - + ############### #### find1 #### ############### # output list of files/dirs beneath given paths with stat info sub find1 { - my ($shost, $spath, $dst) = @_; - if (!defined $opts{have}) { - # check for existence of various commands - $opts{have} = {}; - foreach my $bin (qw(dmget lfs getfacl getfattr shift-bin)) { - $opts{have}->{$bin} = first {-x "$_/$bin"} (split(/:/, $ENV{PATH})); + my $path = shift; + my ($file0, $shost, $sdir, $dst, $ddir, $srcfs, $tdir) = @{$path}; + my $dfile0 = $file0; + my ($top, $ref); + if (ref $file0) { + $top = 1; + ($file0, $dfile0, $ref) = @{$file0}; + } + return if ($file0 eq '.' || $file0 eq '..'); + my $file = "$sdir/$file0"; + + my $dmf = $opts{recall} && $opts{have}->{dmget} && $srcfs =~ /,dmf/ ? 1 : 0; + + # dereference before stat + $file = abs_path($file) if ($opts{dereference}); + # always get stat info of real file + my @stat = lstat($file); + my $mode; + if (scalar(@stat) == 0) { + $file = "$sdir/$file0" if ($opts{dereference}); + if ($top) { + # escape commas + $file =~ s/(,)/sprintf("%%%02X", ord($1))/eg; + # return error if original file + buffer("$ref:Cannot stat $file\n"); + return; } - $opts{fhpid} = open3_run([undef, undef, -1], "shift-bin") - if ($opts{have}->{'shift-bin'}); + # lower level files cannot return errors because there is no way + # to back out of previously added operations, so instead a find + # op is added, which will succeed/fail on its own when processed + } else { + $mode = $stat[2]; + $stat[2] &= 07777; + + # only directories, regular files, and symlinks are supported + return if (!S_ISDIR($mode) && !S_ISREG($mode) && !S_ISLNK($mode)); + # dmf handling for individual files is carried out by transport_dmf + $dmf = 0 if ($top && !S_ISDIR($mode)); } - my $dmf = $opts{have}->{dmget} && $opts{srcfs} =~ /,dmf/ ? 1 : 0; - - my $sdir = dirname($spath); - $sdir = "" if ($sdir eq '/'); - my $ddir = dirname($dst); - $ddir = "" if ($ddir eq '/'); - my $tdir = $opts{'create-tar'} ? dirname($opts{tar_name}) : undef; - $tdir = "" if ($tdir eq '.'); - $tdir .= "/" if ($tdir && $tdir !~ /\/$/); - my $dname = basename($dst); - my @files = (basename($spath)); - FILE: foreach my $file0 (@files) { - if (ref $file0) { - # processing subdir so update prefix - $sdir = $file0->[0]; - $ddir = $file0->[1]; - $tdir = $file0->[2] . "/" if ($opts{'create-tar'}); - # dname only needed on first iteration where src/dst name may differ - $dname = undef; - next; + # exclude files (must be before dir processing) + if (defined $opts{exclude}) { + foreach my $re (@{$opts{exclude}}) { + my $ure = unescape($re); + return if (eval {$file =~ /$ure/}); } - next if ($file0 eq '.' || $file0 eq '..'); - my $file = "$sdir/$file0"; - my $dfile0 = $dname ? $dname : $file0; + } - # dereference before stat - $file = abs_path($file) if ($opts{dereference}); - # always get stat info of real file - my @stat = lstat($file); - my $mode; - if (scalar(@stat) == 0) { - $file = "$sdir/$file0" if ($opts{dereference}); - if (scalar(@files) == 1) { + my $dh; + if (scalar(@stat) == 0 || S_ISDIR($mode)) { + # ensure $err defined unless explicitly set to undef + my $err = ""; + if (scalar(@stat) > 0 && (!$opts{dereference} || $top) && + (!defined $opts{'find-files'} || + $$findn4 < $opts{'find-files'})) { + # add subdirs of this directory for processing when below limit + if (opendir($dh, $file)) { + $err = undef; + # directory will be processed after parent dir printed + } else { + $err = "Error opening directory $file\n"; + } + if ($err && $top) { # escape commas - $file =~ s/(,)/sprintf("%%%02X", ord($1))/eg; + $err =~ s/(,)/sprintf("%%%02X", ord($1))/eg; # return error if original file - print "Cannot stat $file\n"; - next; - } - # lower level files cannot return errors because there is no way - # to back out of previously added operations, so instead a find - # op is added, which will succeed/fail on its own when processed - } else { - $mode = $stat[2]; - $stat[2] &= 07777; - - # only directories, regular files, and symlinks are supported - next if (!S_ISDIR($mode) && !S_ISREG($mode) && !S_ISLNK($mode)); - # dmf handling for individual files is carried out by transport_dmf - $dmf = 0 if (scalar(@files) == 1 && !S_ISDIR($mode)); - } - - # exclude files (must be before dir processing) - if (defined $opts{exclude}) { - foreach my $re (@{$opts{exclude}}) { - my $ure = unescape($re); - next FILE if (eval {$file =~ /$ure/}); + buffer("$ref:$err"); + return; } } - - if (scalar(@stat) == 0 || S_ISDIR($mode)) { - my $err = ""; - if (scalar(@stat) > 0 && (!$opts{dereference} || scalar(@files) == 1) && - (!defined $opts{'find-files'} || - scalar(@files) < $opts{'find-files'})) { - # add subdirs of this directory for processing when below limit - if (opendir(DIR, $file)) { - $! = undef; - my @sub_files = readdir DIR; - if ($! || scalar(@sub_files) == 0) { - # there is currently no good way to detect readdir errors - # dirs should always contain . and .. at a minimum - $err = "Error reading directory $file"; - } else { - my $dirs = [$file, "$ddir/$dfile0"]; - push(@{$dirs}, "$tdir$file0") if ($opts{'create-tar'}); - push(@files, $dirs, @sub_files); - $err = undef; - } - closedir DIR; - } else { - $err = "Error opening directory $file"; - } - if ($err && scalar(@files) == 1) { - # escape commas - $err =~ s/(,)/sprintf("%%%02X", ord($1))/eg; - # return error if original file - print $err, "\n"; - next; - } - } - if (defined $err) { - # this handles directories as well as lower level stat failures - print "args=find,", escape("$shost:$file"), ","; - if ($opts{'create-tar'}) { - print escape($dst), " tar_name=" . escape("$tdir$file0"); - } else { - print escape("$ddir/$dfile0"); - } - print "\n"; - next; - } + if (defined $err) { + # this handles directories as well as lower level stat failures + my $line = "args=find," . escape("$shost:$file") . ","; + $line .= $opts{'create-tar'} ? escape($dst) . " tar_name=" . + escape("$tdir$file0") : escape("$ddir/$dfile0"); + buffer($line . "\n"); + return; } + } - # include files (must be after dir processing) - if (defined $opts{include}) { - my $found; - foreach my $re (@{$opts{include}}) { - my $ure = unescape($re); - next if (eval {$file !~ /$ure/}); - $found = 1; - last; - } - next if (!$found); + # include files + if (defined $opts{include}) { + my $found; + foreach my $re (@{$opts{include}}) { + my $ure = unescape($re); + next if (eval {$file !~ /$ure/}); + $found = 1; + last; } + # must be done both here for files and after dir processing + goto FIND_DIR if (!$found); + } - # newer/older files (must be after dir processing) - my %ti = ('a' => 8, 'm' => 9, 'c' => 10); - if (defined $opts{newer}) { - if ($opts{newer} =~ /^([^:]+):(\S+)/) { - my ($type, $time) = ($1, $2); - $type =~ s/\|/1||/g; - $type =~ s/([amc])/$stat[$ti{$1}]>=$time&&/g; - $type =~ s/([AMC])/$stat[$ti{lc($1)}]<$time&&/g; - $type .= "1"; - next if (!eval $type); - } elsif ($stat[9] < $opts{newer}) { - next; - } + # newer/older files (must be after dir processing) + my %ti = ('a' => 8, 'm' => 9, 'c' => 10); + if (defined $opts{newer}) { + if ($opts{newer} =~ /^([^:]+):(\S+)/) { + my ($type, $time) = ($1, $2); + $type =~ s/\|/1||/g; + $type =~ s/([amc])/$stat[$ti{$1}]>=$time&&/g; + $type =~ s/([AMC])/$stat[$ti{lc($1)}]<$time&&/g; + $type .= "1"; + # must be done both here for files and after dir processing + goto FIND_DIR if (!eval $type); + } elsif ($stat[9] < $opts{newer}) { + # must be done both here for files and after dir processing + goto FIND_DIR; } - if (defined $opts{older}) { - if ($opts{older} =~ /^([^:]+):(\S+)/) { - my ($type, $time) = ($1, $2); - $type =~ s/\|/1||/g; - $type =~ s/([amc])/$stat[$ti{$1}]<$time&&/g; - $type =~ s/([AMC])/$stat[$ti{lc($1)}]>=$time&&/g; - $type .= "1"; - next if (!eval $type); - } elsif ($stat[9] >= $opts{older}) { - next; - } + } + if (defined $opts{older}) { + if ($opts{older} =~ /^([^:]+):(\S+)/) { + my ($type, $time) = ($1, $2); + $type =~ s/\|/1||/g; + $type =~ s/([amc])/$stat[$ti{$1}]<$time&&/g; + $type =~ s/([AMC])/$stat[$ti{lc($1)}]>=$time&&/g; + $type .= "1"; + # must be done both here for files and after dir processing + goto FIND_DIR if (!eval $type); + } elsif ($stat[9] >= $opts{older}) { + # must be done both here for files and after dir processing + goto FIND_DIR; } + } - # dereference before stat - # resolve uid/gid if possible - my $user = getpwuid($stat[4]); - my $group = getgrgid($stat[5]); + # resolve uid/gid if possible + my $user = $opts{findu}->{$stat[4]}; + if (!defined $user) { + $user = getpwuid($stat[4]); $user = "uid_$stat[4]" if (!$user); + $opts{findu}->{$stat[4]} = $user; + } + my $group = $opts{findg}->{$stat[5]}; + if (!defined $group) { + $group = getgrgid($stat[5]); $group = "gid_$stat[5]" if (!$group); - my $attrs = join(",", @stat[2,4,5,8,9], - escape($user), escape($group), $stat[7], 512 * $stat[12]); + $opts{findg}->{$stat[5]} = $group; + } + my $attrs = join(",", @stat[2,4,5,8,9], + escape($user), escape($group), $stat[7], 512 * $stat[12]); - my @acls; - my @lattrs; - my @xattrs; - # try to get acls - if (($opts{have}->{'shift-bin'} || $opts{have}->{getfacl}) && - !$opts{'create-tar'} && - ($opts{preserve} == 1 || $opts{preserve} =~ /acl/) && - (!$opts{srcfs} || $opts{srcfs} =~ /,acl/)) { - if (defined $opts{fhpid}) { - $opts{fhpid}->[0]->print("getfacl $file\n"); - my $text = $opts{fhpid}->[1]->getline; - my @cols = split(/\s+/, $text); - push(@acls, $cols[2]) if ($file eq $cols[1] && $cols[-1] eq '0'); - } else { - my $fhpid = open3_run([-1, undef, -1], - "getfacl", "-cps", "--", $file); - while (defined $fhpid && defined ($_ = $fhpid->[1]->getline)) { - chomp; - next if (!$_); - push(@acls, escape($_)); - } - open3_wait($fhpid); - } - } - - # try to get xattrs - if (($opts{have}->{'shift-bin'} || $opts{have}->{getfattr}) && - !$opts{'create-tar'} && - ($opts{preserve} == 1 || $opts{preserve} =~ /xattr/) && - (!$opts{srcfs} || $opts{srcfs} =~ /,xattr/)) { - if (defined $opts{fhpid}) { - $opts{fhpid}->[0]->print("getfattr $file\n"); - my $text = $opts{fhpid}->[1]->getline; - my @cols = split(/\s+/, $text); - push(@xattrs, $cols[2]) if ($file eq $cols[1] && $cols[-1] eq '0'); - } else { - my $fhpid = open3_run([-1, undef, -1], - "getfattr", "-dhe", "base64", $file); - while (defined $fhpid && defined ($_ = $fhpid->[1]->getline)) { - chomp; - next if (!$_ || /^\s*#/); - push(@xattrs, escape(decode_base64($_))); - } - open3_wait($fhpid); - } - } - - # try to get lustre striping - if (($opts{have}->{'shift-bin'} || $opts{have}->{lfs}) && - !S_ISLNK($mode) && !$opts{'create-tar'} && - ($opts{preserve} == 1 || $opts{preserve} =~ /stripe/) && - $opts{srcfs} =~ /^lustre/) { - # ignore symlinks as link to fifo can hang forever - if (defined $opts{fhpid}) { - $opts{fhpid}->[0]->print("getstripe $file\n"); - my $text = $opts{fhpid}->[1]->getline; - my @cols = split(/\s+/, $text); - @lattrs = split(/,/, $cols[2]) if ($file eq $cols[1] && $cols[-1] eq '0'); - } else { - open(FILE, '-|', "lfs", "getstripe", "-d", $file); - while () { - $lattrs[0] = $1 if (/stripe_count:\s*(-?\d+)/); - $lattrs[1] = $1 if (/stripe_size:\s*(-?\d+)/); - } - close FILE; - } - } - $lattrs[0] = 0 if (!defined $lattrs[0] && defined $lattrs[1]); - $lattrs[1] = 0 if (!defined $lattrs[1] && defined $lattrs[0]); - - # begin log entry - my $index_len = !$opts{'index-tar'} ? 0 : 28 + length("$tdir$file0") + - length(sprintf("%7s%7s%9d", $user, $group, $stat[7])); - if (S_ISLNK($mode)) { - my $ln = readlink($file); - print "args=ln,", escape($ln); - $index_len += 4 + length($ln); - } elsif (S_ISDIR($mode)) { - print "args=mkdir"; - } elsif ($opts{sync}) { - print "args=ckattr", $opts{'ignore-times'} ? "0" : "", - ",", escape("$shost:$file"); + my @acls; + my @lattrs; + my @xattrs; + # try to get acls + if (($opts{have}->{'shift-bin'} || $opts{have}->{getfacl}) && + !$opts{'create-tar'} && + ($opts{preserve} == 1 || $opts{preserve} =~ /acl/) && + (!$srcfs || $srcfs =~ /,acl/)) { + if (defined $opts{fhpid}) { + $opts{fhpid}->[0]->print("getfacl $file\n"); + my $text = $opts{fhpid}->[1]->getline; + my @cols = split(/\s+/, $text); + push(@acls, $cols[2]) if ($file eq $cols[1] && $cols[-1] eq '0'); } else { - print "args=cp,", escape("$shost:$file"); + my $fhpid = open3_run([-1, undef, -1], + "getfacl", "-cps", "--", $file); + while (defined $fhpid && defined ($_ = $fhpid->[1]->getline)) { + chomp; + next if (!$_); + push(@acls, escape($_)); + } + open3_wait($fhpid); } - print ",", escape($opts{'create-tar'} ? $dst : "$ddir/$dfile0"); - print " acls=" . join(",", @acls) if (scalar(@acls) > 0); - print " xattrs=" . join(",", @xattrs) if (scalar(@xattrs) > 0); - print " lustre_attrs=" . join(",", @lattrs) if (scalar(@lattrs) > 0); - print " tar_index=$index_len" if ($opts{'index-tar'}); - print " tar_name=" . escape("$tdir$file0") if ($opts{'create-tar'}); - print " size=$stat[7] attrs=$attrs\n"; - if ($dmf && !S_ISLNK($mode) && !S_ISDIR($mode)) { - ($opts{dmfh}, $opts{dmtmp}) = tempfile() if (!$opts{dmtmp}); - print {$opts{dmfh}} $file, "\n"; + } + + # try to get xattrs + if (($opts{have}->{'shift-bin'} || $opts{have}->{getfattr}) && + !$opts{'create-tar'} && + ($opts{preserve} == 1 || $opts{preserve} =~ /xattr/) && + (!$srcfs || $srcfs =~ /,xattr/)) { + if (defined $opts{fhpid}) { + $opts{fhpid}->[0]->print("getfattr $file\n"); + my $text = $opts{fhpid}->[1]->getline; + my @cols = split(/\s+/, $text); + push(@xattrs, $cols[2]) if ($file eq $cols[1] && $cols[-1] eq '0'); + } else { + my $fhpid = open3_run([-1, undef, -1], + "getfattr", "-dhe", "base64", $file); + while (defined $fhpid && defined ($_ = $fhpid->[1]->getline)) { + chomp; + next if (!$_ || /^\s*#/); + push(@xattrs, escape(decode_base64($_))); + } + open3_wait($fhpid); } } + + # try to get lustre striping + if (($opts{have}->{'shift-bin'} || $opts{have}->{lfs}) && + !S_ISLNK($mode) && !$opts{'create-tar'} && + ($opts{preserve} == 1 || $opts{preserve} =~ /stripe/) && + $srcfs =~ /^lustre/) { + # ignore symlinks as link to fifo can hang forever + if (defined $opts{fhpid}) { + $opts{fhpid}->[0]->print("getstripe $file\n"); + my $text = $opts{fhpid}->[1]->getline; + my @cols = split(/\s+/, $text); + @lattrs = split(/,/, $cols[2]) if ($file eq $cols[1] && $cols[-1] eq '0'); + } else { + my $fhpid = open3_run([-1, undef, -1], + "lfs", "getstripe", "-d", $file); + while (defined $fhpid && defined ($_ = $fhpid->[1]->getline)) { + $lattrs[0] = $1 if (/stripe_count:\s*(-?\d+)/); + $lattrs[1] = $1 if (/stripe_size:\s*(-?\d+)/); + } + open3_wait($fhpid); + } + } + $lattrs[0] = 0 if (!defined $lattrs[0] && defined $lattrs[1]); + $lattrs[1] = 0 if (!defined $lattrs[1] && defined $lattrs[0]); + + # begin log entry + my $line; + my $index_len = !$opts{'index-tar'} ? 0 : 28 + length("$tdir$file0") + + length(sprintf("%7s%7s%9d", $user, $group, $stat[7])); + if (S_ISLNK($mode)) { + my $ln = readlink($file); + $line .= "args=ln," . escape($ln); + $index_len += 4 + length($ln); + } elsif (S_ISDIR($mode)) { + $line .= "args=mkdir"; + } elsif ($opts{sync}) { + $line .= "args=ckattr" . ($opts{'ignore-times'} ? "0" : "") . + "," . escape("$shost:$file"); + } else { + $line .= "args=cp," . escape("$shost:$file"); + } + $line .= "," . (escape($opts{'create-tar'} ? $dst : "$ddir/$dfile0")); + $line .= " acls=" . join(",", @acls) if (scalar(@acls) > 0); + $line .= " xattrs=" . join(",", @xattrs) if (scalar(@xattrs) > 0); + $line .= " lustre_attrs=" . join(",", @lattrs) if (scalar(@lattrs) > 0); + $line .= " tar_index=$index_len" if ($opts{'index-tar'}); + $line .= " tar_name=" . escape("$tdir/$file0") if ($opts{'create-tar'}); + $line .= " size=$stat[7] attrs=$attrs\n"; + buffer($line, !S_ISLNK($mode) && !S_ISDIR($mode) && + $dmf ? $file . "\n" : undef); + + FIND_DIR: if (defined $dh) { + # flush buffer to ensure parent dir printed before subdirs + buffer(); + while (readdir $dh) { + my $path = [$_, $shost, $file, $dst, "$ddir/$dfile0", $srcfs]; + push(@{$path}, "$tdir/$file0") if ($opts{'create-tar'}); + if ($perl{threads} && $opts{threads} > 1 && + #$findq->pending < $opts{'queue-size'}) { + #$findq->pending < $opts{'threads'}) { +#TODO: determine what size should go here + $$findq4 < 4 * $opts{threads}) { + # only add to queue if not already at capacity + $findq4->up; + $findn4->up; + $findq->enqueue($path); + } else { + # process now if single threaded or queue at capacity + find1($path); + } + } + closedir $dh; + } } ################## @@ -791,6 +960,7 @@ sub fish { } #TODO: de-hardcode 60 second timeout $sock->sockopt(SO_RCVTIMEO, pack('L!L!', +60, 0)); + $sock->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1); $key = "" . rand(); my $scert; if ($opts{secure}) { @@ -878,7 +1048,9 @@ sub fish { } close $tsock; })} (1 .. $opts{streams}); - $_->join foreach (@threads); + foreach (@threads) { + $_->join if ($_); + } unlink $cert if ($cert); close $sock; } @@ -1285,8 +1457,10 @@ sub sum { } })} (1 .. $nthr); # force threads to exit - $q->enqueue(undef) foreach (1 .. $nthr); - $_->join foreach (@threads); + $q->enqueue(undef) foreach (@threads); + foreach (@threads) { + $_->join if ($_); + } # append any error messages back to original ref text while (defined (my $sumret = $qret->dequeue_nb)) { my ($qi, $i, $hash) = @{$sumret}; diff --git a/perl/shift-mgr b/perl/shift-mgr index 7afff7d..73ab81b 100755 --- a/perl/shift-mgr +++ b/perl/shift-mgr @@ -53,7 +53,7 @@ use Fcntl qw(:DEFAULT :flock :mode); use File::Basename; use File::Path; use File::Spec; -use Getopt::Long qw(:config bundling no_ignore_case require_order); +use Getopt::Long qw(:config bundling no_auto_abbrev no_ignore_case require_order); use IO::File; use IO::Handle; # use embedded IPC::Open3 since versions prior to perl 5.14.0 are buggy @@ -71,7 +71,7 @@ use Text::ParseWords; require Tie::DB_FileLock; require Text::FormatTable; -our $VERSION = 0.97; +our $VERSION = 0.98; $Data::Dumper::Pair = " = "; $Data::Dumper::Sortkeys = 1; @@ -121,6 +121,7 @@ my %conf = ( default_files => "1k", 'default_find-files' => "2k", default_hosts => 1, + default_interval => 30, default_local => "shift,fish,fish-tcp", default_preallocate => 0, default_remote => "shift", @@ -557,10 +558,12 @@ if ($opts{alive}) { # move all running operations out of doing_* back into queues foreach my $file (glob "$opts{base}/doing_*") { + next if ($file =~ /\.gzi$/); # untaint file $file = $1 if ($file =~ /^(.*)$/); my $fdoing = get_doing($file); while (my ($key, $line) = each %{$fdoing}) { + $line =~ s/\s*\r?\n$//; # put operation in do/tree my %op = split(/[= ]+/, $line); my @args = split(/,/, $op{args}); @@ -578,6 +581,8 @@ if ($opts{alive}) { log_print($file, $gzs, escape(nfreeze({})) . "\n"); } log_close($_, $gzs) foreach (keys %{$gzs}); + # transfer may have finished after --restart=ignore + $meta{time1} = $time if (($meta{last} || $meta{e_find}) && !run()); } elsif ($opts{stop}) { $meta{stop} = 1; $meta{time1} = $time; @@ -611,11 +616,16 @@ if ($opts{get} || defined $opts{put}) { my $gzs = {}; log_print($opts{doing}, $gzs, escape(nfreeze($doing)) . "\n"); log_close($opts{doing}, $gzs); + + # update running time average for manager get/put invocations + $meta{rc_mgr}++; + $meta{ra_mgr} *= ($meta{rc_mgr} - 1) / $meta{rc_mgr}; + $meta{ra_mgr} += (time - $time) / $meta{rc_mgr}; } # update log sizes foreach my $file (glob "$opts{base}/*") { - next if ($file =~ /\/(?:find|lock)$/); + next if ($file =~ /\/(?:find|lock|\S+\.gzi)$/); my $log = $file; $log =~ s/.*\///; $meta{"$log\_size"} = (stat $file)[7]; @@ -745,7 +755,17 @@ sub debug_print { # open user-specific debug file if not already open open($dbgfh, '>>', "$conf{user_dir}/$opts{user}.debug") if (!$dbgfh); print $dbgfh "$localtime $opts{host} $opts{id}$opts{cid} $type "; + if ($type eq 'DBG') { + # print stack trace when DBG lines added in code + my $i = 1; + my @stack; + while ((my @cd = (caller($i++)))) { + unshift(@stack, "$cd[3]:$cd[2]"); + } + print $dbgfh "(", join(" -> ", @stack), ") "; + } print $dbgfh $_ foreach (@_); + $dbgfh->flush; } if ($type eq 'GET') { print $_ foreach (@_); @@ -945,7 +965,10 @@ sub doing { %meta = %{get_meta("$dir/$opts{user}.$opts{id}/meta")}; $opts{host} = "*" if (!$opts{host}); foreach my $file (glob("$dir/$opts{user}.$opts{id}/doing_$opts{host}")) { - my $doing = get_doing($file); + next if ($file =~ /\.gzi$/); + # untaint file + $file = $1 if ($file =~ /^(.*)$/); + my $doing = get_doing($file, $opts{doing}); $file =~ s/.*\///; print "$file = ", Dumper($doing); } @@ -1222,11 +1245,15 @@ sub get { # keep copy of original doing so changes don't affect its own processing my $doing0 = dclone($doing); - my $ndoing = "0." . $meta{"$opts{doing}_size"}; + # reverse size so won't end in 0 (so .N0 not reduced to .N after ++) + my $ndoing = "0." . reverse($meta{"$opts{doing}_size"}); my $gzs = {}; - my ($ldoing, $kdoing, $size, $files, $ops, $all, $skip); + my ($ldoing, $kdoing, $size, $files, $ops, $all, $secs, $skip); my (%diskfs, %localfs, %rtthost); my %cli_load = split(/[= ]+/, $meta{"load_$opts{host}$opts{cid}"}); + my $ncli = grep(/^doing_/, keys %meta); + # adjust interval so average waiting clients is no more than 1 + $opts{interval} = max($meta{interval}, $ncli * $meta{ra_mgr}); LOG: foreach my $log (@logs) { # process dir attrs last by themselves last if ($log eq 'rtree' && (!$meta{last} || $meta{t_run} || $ops || @@ -1245,7 +1272,8 @@ sub get { $gzs->{$log}->seek($meta{$log}, $whence); } - while ($size < $meta{size} && $all < $meta{files} && + while (($size < $meta{size} && $all < $meta{files} || + $secs < $opts{interval}) && ($log =~ /^doing_/ && (($kdoing, $line) = each %{$ldoing}) || $log eq 'rtree' && defined($line = last_line($gzs->{rtree})) || $log !~ /^(?:doing_|rtree)/ && @@ -1296,7 +1324,7 @@ sub get { if ($args[$i] !~ /^\//) { # do not map remote tar dst to prevent size/split corruption next if ($i == 1 && $cmd eq 'find' && $meta{'create-tar'}); - # allow map_remote to avoid last host's caching + # help map_remote avoid last host's caching if ($cmd eq 'cksum' && $op{cache_rclient}) { $ref->{last} = $op{cache_rclient}; } elsif ($op{cache_client}) { @@ -1450,6 +1478,13 @@ sub get { $size += $op{size} if ($cmd =~ /^(?:cp|sum|cksum)/); $ops++ if ($log ne 'rtree'); $all++; + if ($meta{"ra_$cmd"} > 0) { + $secs += ($cmd =~ /^(?:cp|sum|cksum)/ ? + $op{size} : 1) / $meta{"ra_$cmd"}; + } else { + # default to files/size setting until rates available + $secs += $opts{interval}; + } $meta{s_run} += $op{size} if ($op{size}); $meta{t_run}++; $meta{w_run}-- if ($op{state} eq 'warn'); @@ -1705,15 +1740,21 @@ sub get { ################### #### get_doing #### ################### -# return last list of operations in progress on given host +# return past list of operations in progress on given host sub get_doing { my $arg = shift; + my $past = shift; + $past = 1 if (!defined $past); + # untaint file + $arg = $1 if ($arg =~ /^(.*)$/); return {} if (! -e $arg); my $gz = Compress::BGZF::Reader->new_filehandle($arg); return {} if (!$gz); $gz->seek(-1, 2); - my $line = last_line($gz); - close $gz; + my $line; + $line = last_line($gz) while ($past-- > 0); + my $log = basename($arg); + log_close($log, {$log => $gz}); return thaw(unescape($line)) if ($line); return {}; } @@ -1755,7 +1796,7 @@ sub get_meta { if ($meta && defined $mtell && $mtell > 0) { # metadata corrupted so revert to last known good state foreach my $file (glob "$opts{base}/*") { - next if ($file =~ /\/(?:find|lock|meta)$/); + next if ($file =~ /\/(?:find|lock|meta|\S+\.gzi)$/); my $log = $file; $log =~ s/.*\///; my $size = defined $meta->{"$log\_size"} ? $meta->{"$log\_size"} : 0; @@ -1763,6 +1804,8 @@ sub get_meta { $file = $1 if ($file =~ /^(.*)$/); # truncate all logs to last known good size truncate($file, $size); + # remove associated index file + unlink("$file.gzi"); } # rebuild find db since it may contain reverted operations build_find() if ($meta{dereference} && !$meta{'extract-tar'}); @@ -1789,8 +1832,11 @@ sub history { my @metas; my $dir = $> != 0 ? $conf{user_dir} : $opts{user_dir}; my $user = $> != 0 ? $opts{user} : "*"; + my $idglob = defined $opts{id} ? $opts{id} : "[0-9]*"; do { - push(@metas, glob "$dir/$user.[0-9]*/meta"); + push(@metas, glob "$dir/$user.$idglob/meta"); + # glob still returns pattern when no wildcards and no files match + pop(@metas) if (defined $opts{id} && ! -f $metas[-1]); $dir .= "/*.more"; } while (scalar(glob $dir)); foreach my $file (sort {$> != 0 ? @@ -1886,6 +1932,7 @@ sub id_status { my ($host, $ldoing, $kdoing); my $gzs = {}; if ($state0 eq 'run') { + next if ($log =~ /\.gzi$/); $host = $log; $host =~ s/^doing_//; $ldoing = get_doing("$opts{base}/$log"); @@ -1983,7 +2030,7 @@ sub init_id { } # initialize options with default values - foreach (qw(clients cpu hosts io ior iow net netr netw ports retry + foreach (qw(clients cpu hosts interval io ior iow net netr netw ports retry stripe threads)) { $meta{$_} = $conf{"default_$_"} if (!defined $meta{$_} && $conf{"default_$_"}); @@ -2040,11 +2087,11 @@ sub last_line { my $tell = $tell0; my ($buf, $line, $len, $pos); do { - $tell = $tell0 - 1024; + $tell = $tell0 - 4194304; $tell = 0 if ($tell < 0); # seek to earlier position in file $fh->seek($tell, 0); - my $len = 1024; + my $len = 4194304; $len = $tell0 - $tell if ($len > $tell0); # read up to initial location or that of last round $fh->read($line, $len); @@ -2088,6 +2135,16 @@ sub log_close { # find/ln/mkdir are stored in cp $log = "cp" if ($log =~ /^(?:find|ln|mkdir)$/); if (defined $gzs->{$log}) { + my $bgr = tied *{$gzs->{$log}}; + if (ref $bgr eq 'Compress::BGZF::Reader' && + defined $bgr->{idx}->[-1]) { + my ($coff, $uoff) = @{$bgr->{idx}->[-1]}; + # rtree is just tree in reverse + my $f = $log eq 'rtree' ? 'tree' : $log; + $f = "$opts{base}/$f.gzi"; + local $SIG{__WARN__} = sub {}; + $bgr->write_index($f) if ($uoff && -s $f < int($uoff / 4096)); + } close $gzs->{$log}; delete $gzs->{$log}; } @@ -2129,6 +2186,7 @@ sub log_print { eval { if (!defined $line) { log_close($log, $gzs); + unlink("$opts{base}/$log.gzi"); # force new file my $fh = Compress::BGZF::Writer->new_filehandle("$opts{base}/$log"); close $fh; @@ -2248,59 +2306,61 @@ sub map_remote { my $mnt1 = fs_mount($rhost, $rpath); # return original if no mount found return $path1 if (!$mnt1); - my $mnt2; - # find hosts that mount file system based on global/user db - my $key = "mounth_" . $mnt1->{remote}; - my %hosts = map {$_->{$key} ? %{$_->{$key}} : ()} - (\%meta, \%mounts, \%umounts); - my %shells = map {$_->{shells} ? %{$_->{shells}} : ()} - (\%meta, \%mounts, \%umounts); - # determine potential hosts - foreach my $host (keys %hosts) { - delete $hosts{$host} - if (!map_fs_mount($mnt1, $host, $ref->{rw}) || !$shells{$host}); - } + my $pick = $opts{"pick_$mnt1->{remote}"}; + if (!defined $pick) { + # find hosts that mount file system based on global/user db + my $key = "mounth_" . $mnt1->{remote}; + my %hosts = map {$_->{$key} ? %{$_->{$key}} : ()} + (\%meta, \%mounts, \%umounts); + my %shells = map {$_->{shells} ? %{$_->{shells}} : ()} + (\%meta, \%mounts, \%umounts); + # determine potential hosts + foreach my $host (keys %hosts) { + delete $hosts{$host} + if (!map_fs_mount($mnt1, $host, $ref->{rw}) || !$shells{$host}); + } - # prune last remote host to avoid checksum caching effects - delete $hosts{$ref->{last}} - if ($ref->{last} && scalar(keys %hosts) > 1); + # prune last remote host to avoid checksum caching effects + delete $hosts{$ref->{last}} + if ($ref->{last} && scalar(keys %hosts) > 1); - # prune potential hosts based on number currently assigned - my $min = 1E9; - my %min_hosts; - foreach my $host (keys %hosts) { - my $npicks = scalar(keys %{$meta{"picks_$host"}}); - # don't count previous selection for this host - $npicks-- if ($meta{"picks_$host"}->{$lhost}); - next if ($npicks > $min); - $min = $npicks; - $min_hosts{$npicks} = [] if (!defined $min_hosts{$npicks}); - push(@{$min_hosts{$npicks}}, $host); - } - my %picks = map {$_ => 1} @{$min_hosts{$min}}; + # prune potential hosts based on number currently assigned + my $min = 1E9; + my %min_hosts; + foreach my $host (keys %hosts) { + my $npicks = scalar(keys %{$meta{"picks_$host"}}); + # don't count previous selection for this host + $npicks-- if ($meta{"picks_$host"}->{$lhost}); + next if ($npicks > $min); + $min = $npicks; + $min_hosts{$npicks} = [] if (!defined $min_hosts{$npicks}); + push(@{$min_hosts{$npicks}}, $host); + } + my %picks = map {$_ => 1} @{$min_hosts{$min}}; - my $pick; - if (defined $conf{select_hook}) { - # select host using configured selection hook - $pick = open3_get([-1, undef, -1], - "$conf{select_hook} $lhost $rhost " . join(",", keys %picks)); + if (defined $conf{select_hook}) { + # select host using configured selection hook + $pick = open3_get([-1, undef, -1], + "$conf{select_hook} $lhost $rhost " . join(",", keys %picks)); + } + # revert to default selection policy when no selection + $pick = default_select($rhost, keys %picks) if (!$pick); + $pick =~ s/\s*\r?\n$//; + if (!$pick) { + # store original mount info + $ref->{$_} = $mnt1->{$_} foreach (keys %{$mnt1}); + # return original path if can't find suitable mount + return $path1 + } + # clear previously picked hosts + foreach (grep(/^picks_/, keys %meta)) { + delete $meta{$_}->{$lhost}; + delete $meta{$_} if (scalar(keys %{$meta{$_}}) == 0); + } + # store that host has already been selected + $meta{"picks_$pick"}->{$lhost} = 1; + $opts{"pick_$mnt1->{remote}"} = $pick; } - # revert to default selection policy when no selection - $pick = default_select($rhost, keys %picks) if (!$pick); - $pick =~ s/\s*\r?\n$//; - if (!$pick) { - # store original mount info - $ref->{$_} = $mnt1->{$_} foreach (keys %{$mnt1}); - # return original path if can't find suitable mount - return $path1 - } - # clear previously picked hosts - foreach (grep(/^picks_/, keys %meta)) { - delete $meta{$_}->{$lhost}; - delete $meta{$_} if (scalar(keys %{$meta{$_}}) == 0); - } - # store that host has already been selected - $meta{"picks_$pick"}->{$lhost} = 1; my $mnt2 = map_fs_mount($mnt1, $pick, $ref->{rw}); # mnt2 is known to be defined due to previous %hosts map_fs_mount calls # replace original mount point with new mount point @@ -2732,7 +2792,11 @@ sub put { my %op = split(/[= ]+/, $line); # ignore malformed lines with undefined op values next if (grep(!/./, values %op)); - delete $doing->{$op{doing}} if (defined $op{doing}); + if (defined $op{doing}) { + # ignore operations that have been processed by another client + next if (!defined $doing->{$op{doing}}); + delete $doing->{$op{doing}}; + } my @args = split(/,/, $op{args}); my $cmd = shift @args; @@ -2829,7 +2893,7 @@ sub put { # initialize transfer settings once all getopt lines received init_id() if ($args[0] eq 'end'); # check validity of option - next if ($args[0] !~ /^(?:bandwidth|buffer|clients|command|cpu|create-tar|cron|cwd|dereference|disk|exception|exclude|extract-tar|files|force|host-list|hosts|ignore-times|include|index-tar|io[rw]?|local|mail|net[rw]?|newer|offline|older|pipeline|ports|preallocate|preserve|recall|remote|retry|sanity|secure|size|split|split-tar|streams|stripe|sync|threads|verify|verify-fast|window)$/); + next if ($args[0] !~ /^(?:bandwidth|buffer|clients|command|cpu|create-tar|cron|cwd|dereference|disk|exception|exclude|extract-tar|files|force|host-list|hosts|ignore-times|include|index-tar|interval|io[rw]?|local|mail|net[rw]?|newer|offline|older|pipeline|ports|preallocate|preserve|recall|remote|retry|sanity|secure|silent|size|split|split-tar|streams|stripe|sync|threads|verify|verify-fast|window)$/); $meta{$args[0]} = defined $op{text} ? unescape($op{text}) : 1; } elsif ($cmd eq 'host') { # host error so remove host from outstanding hosts @@ -2893,7 +2957,8 @@ sub put { ($cmd =~ /^(?:cksum|cp|ln)/ || # tar mkdirs are not put in tree so are not handled by rtree $cmd eq 'mkdir' && $meta{'create-tar'})) { - if ($cmd eq 'cksum' && detect_silent(\%op, $args[0], $args[1])) { + if ($meta{silent} && $cmd eq 'cksum' && + detect_silent(\%op, $args[0], $args[1])) { $line =~ s/(?:text|tool)=\S+//g; log_print('alert', $gzs, "$line tool=shift-mgr text=$op{text}\n"); $meta{e_silent}++; @@ -2913,7 +2978,8 @@ sub put { next; } } else { - if ($cmd eq 'cksum' && detect_silent(\%op, $args[0], $args[1])) { + if ($meta{silent} && $cmd eq 'cksum' && + detect_silent(\%op, $args[0], $args[1])) { $line =~ s/(?:text|tool)=\S+//g; log_print('alert', $gzs, "$line tool=shift-mgr text=$op{text}\n"); $meta{e_silent}++; @@ -3477,13 +3543,13 @@ sub stats { [qw(bbcp bbftp fish fish-tcp gridftp mcp msum rsync shiftc shift-aux)]; $heads{Options_1} = [qw(bandwidth buffer clients cpu create-tar exclude extract-tar files - force host-list hosts include index-tar io ior)]; + force host-list hosts include index-tar interval)]; $heads{Options_2} = - [qw(iow local net netr netw newer no-cron no-mail no-offline - no-preserve no-recall no-sanity no-verify older pipeline)]; + [qw(io ior iow local net netr netw newer no-cron no-mail no-offline + no-preserve no-recall no-sanity no-silent no-verify)]; $heads{Options_3} = - [qw(ports preallocate remote retry secure size split split-tar streams - stripe sync threads verify-fast window)]; + [qw(older pipeline ports preallocate remote retry secure size split + split-tar streams stripe sync threads verify-fast window)]; $heads{Errors} = [qw(corruption exception silent throttle chattr cksum cp host ln mkdir sum)]; @@ -3566,9 +3632,9 @@ sub stats { # option totals # options that must differ from configured default - foreach my $key (qw(buffer clients cpu files hosts io ior iow net netr - netw ports retry size split split-tar streams stripe - threads window)) { + foreach my $key (qw(buffer clients cpu files hosts interval io ior iow + net netr netw ports retry size split split-tar + streams stripe threads window)) { # parse some values in binary bytes instead of decimal bytes my $bin = $key =~ /^(?:buffer|split)$/ ? 2 : 0; $bin = 1 if ($key eq 'stripe'); @@ -3579,7 +3645,7 @@ sub stats { # options that must be inverted $totals{"o_no-offline"} = !$meta{offline} && !$meta{'create-tar'} && !$meta{'extract-tar'} ? 1 : 0; - foreach (qw(cron mail preserve recall sanity verify)) { + foreach (qw(cron mail preserve recall sanity silent verify)) { $totals{"o_no-$_"} = !$meta{$_} ? 1 : 0; } # normal options @@ -3723,6 +3789,8 @@ sub stats { foreach my $file # sort by user.id (sort {(split(/\//, $a))[-2] cmp (split(/\//, $b))[-2]} @metas) { + # untaint file + $file = $1 if ($file =~ /^(.*)$/); # skip transfers that have expired my $mtime = (stat($file))[9]; next if ($mtime + $conf{data_expire} < $time); @@ -3775,7 +3843,8 @@ sub stats { $count++; $ulast = $user; } - close $gz; + my $log = basename($file); + log_close($log, {$log => $gz}); } # output final table print $t->render; @@ -3881,9 +3950,7 @@ sub status { my $left; if ($rate && $meta{last} && !$meta{time1}) { # add estimated time to completion - my $ncli = sum(map {$meta{$_}} (grep(/^clients_/, keys %meta))); - $ncli = grep(/^env_/, keys %meta) * $meta{clients} - $ncli; - $ncli = 1 if ($ncli <= 0); + my $ncli = max(1, scalar(grep(/^doing_/, keys %meta))); my $rate1 = $meta{ra_cp} ? $meta{ra_cp} : $rate * $ncli; # add time for cps, sums, cksums foreach my $cmd (qw(cp sum cksum)) { @@ -3899,10 +3966,12 @@ sub status { $rate1 = $meta{"ra_$cmd"} ? $meta{"ra_$cmd"} : 100; $left += ($meta{"t_$cmd"} - $meta{"d_$cmd"}) / $rate1 / $ncli; } - # add time for non-cp manager calls assuming 1/s rate + # add time for non-cp manager calls + # use previous rate for mgr calls (or 1/s when not available) + $rate1 = $meta{ra_mgr} ? 1 / $meta{ra_mgr} : 1; foreach (qw(chattr cksum sum)) { $left += ($meta{"t_$_"} - $meta{"d_$_"} - $meta{"e_$_"}) / - $meta{files} / $ncli; + $meta{files} / $rate1 / $ncli; } $left = format_seconds($left); } diff --git a/perl/shiftc b/perl/shiftc index 8eef982..15dff05 100755 --- a/perl/shiftc +++ b/perl/shiftc @@ -60,7 +60,7 @@ use File::Path; use File::Spec; use File::Spec::Unix; use File::Temp qw(tempdir tempfile); -use Getopt::Long qw(:config bundling no_ignore_case require_order); +use Getopt::Long qw(:config bundling no_auto_abbrev no_ignore_case require_order); use IO::File; use IO::Handle; use IO::Socket::INET; @@ -70,7 +70,7 @@ require IPC::Open3; use List::Util qw(first max min sum); use MIME::Base64; use POSIX; -use Socket; +use Socket qw(IPPROTO_TCP TCP_NODELAY); use Storable qw(nfreeze thaw); use Symbol qw(gensym); use Sys::Hostname; @@ -83,7 +83,7 @@ use constant SFTP_TRUNC => 0x10; use constant SFTP_WRITE => 0x02; use constant SFTP_EXCL => 0x20; -our $VERSION = 0.97; +our $VERSION = 0.98; $Data::Dumper::Indent = 0; $Data::Dumper::Purity = 1; @@ -100,7 +100,7 @@ delete $ENV{DISPLAY}; my %perl; $perl{ssl} = eval 'use IO::Socket::SSL; 1'; # need threads and version of Thread::Queue from perl >= 5.10.1 -$perl{threads} = eval 'require 5.010_001; use threads; use Thread::Queue; 1'; +$perl{threads} = eval 'require 5.010_001; use threads; use Thread::Queue; use Thread::Semaphore; 1'; ######################### #### default options #### @@ -1088,16 +1088,16 @@ sub shift_ { "directory|d", "disk=s", "exclude=s@", "extract-tar", "files=s", "force|f", "help|h", "history:s", "host-file=s", "host-list=s", "hosts=i", "id=s", "identity=s", "ignore-times|I", "include=s@", - "index-tar", "io=i", "ior=i", "iow=i", "dereference|L", "last-sum", - "local=s", "mgr=s", "mgr-identity=s", "mgr-user=s", "monitor:s", - "net=i", "netr=i", "netw=i", "newer=s", "no-cron", "no-dereference|P", - "no-mail:s", "no-offline", "no-preserve:s", "no-recall", "no-sanity", - "no-target-directory|T", "no-verify", "older=s", "pid=i", "pipeline", - "plot:s", "ports=s", "preallocate=i", "recursive|R|r", "remote=s", - "restart:s", "retry=i", "search=s", "secure", "size=s", "split=s", - "split-tar=s", "state=s", "stats:s", "status:s", "stop", "streams=i", - "stripe=s", "sync", "threads=i", "user=s", "verify-fast", "wait", - "window=s", + "index-tar", "interval=i", "io=i", "ior=i", "iow=i", "dereference|L", + "last-sum", "local=s", "mgr=s", "mgr-identity=s", "mgr-user=s", + "monitor:s", "net=i", "netr=i", "netw=i", "newer=s", "no-cron", + "no-dereference|P", "no-mail:s", "no-offline", "no-preserve:s", + "no-recall", "no-sanity", "no-silent", "no-target-directory|T", + "no-verify", "older=s", "pid=i", "pipeline", "plot:s", "ports=s", + "preallocate=i", "recursive|R|r", "remote=s", "restart:s", "retry=i", + "search=s", "secure", "size=s", "split=s", "split-tar=s", "state=s", + "stats:s", "status:s", "stop", "streams=i", "stripe=s", "sync", + "threads=i", "user=s", "verify-fast", "wait", "window=s", )); $opts{mgr} = $save_mgr if ($save_mgr ne 'none'); my %in_opts = map {$_ => 1} keys %opts; @@ -1118,17 +1118,6 @@ sub shift_ { die "Unable to read host file " . $opts{'host-file'} . ": $!\n"; } } - foreach my $opt (qw(newer older)) { - if ($opts{$opt} && $opts{$opt} !~ /^\d+$/) { - my $spec; - ($spec, $opts{$opt}) = ($1, $2) - if ($opts{$opt} =~ /^([acmACM\|]+:)(.*)/); - require Date::Parse; - my $time = Date::Parse::str2time($opts{$opt}); - die "Unable to parse date string '$opts{$opt}'\n" if (!$time); - $opts{$opt} = (!$opts{'extract-tar'} && $spec ? $spec : "") . $time; - } - } if (scalar(keys %hosts) != 0) { $hosts{$host} = 1; $opts{hosts} = scalar(keys %hosts) if (!defined $opts{hosts}); @@ -1187,7 +1176,7 @@ sub shift_ { $opts{monitor} = ""; } - foreach (qw(cron offline recall sanity verify)) { + foreach (qw(cron offline recall sanity silent verify)) { $opts{$_} = $opts{"no-$_"} ? 0 : 1; $in_opts{$_} = 1 if ($in_opts{"no-$_"}); } @@ -1303,6 +1292,8 @@ sub shift_ { die "--restart requires the --id option\n"; } elsif ($opts{stop} && !$opts{id}) { die "--stop requires the --id option\n"; + } elsif ($opts{files} && $opts{files} !~ /^([1-9]\d*)([bkmgt])?$/i) { + die "Invalid count '$opts{files}' for option --files\n"; } elsif ($opts{ports} && $opts{ports} !~ /^\d+:\d+/) { die "Invalid port range '$opts{ports}' in --ports\n"; } elsif ($opts{help}) { @@ -1355,6 +1346,7 @@ sub shift_ { print " (LIST subset of {acl,mode,owner,stripe,time,xattr})\n"; print " --no-recall do not recall DMF-managed files before transfer\n"; print " --no-sanity do not check file existence/size (benchmarking only)\n"; + print " --no-silent do not detect silent corruption or store checksums\n"; print " --no-verify do not verify/rectify integrity of destination files\n"; print "\n"; print "Monitoring and management options:\n"; @@ -1385,8 +1377,9 @@ sub shift_ { print " (use suffix {k,m,g,t} for {Kb,Mb,Gb,Tb})\n"; print " --buffer=SIZE use SIZE bytes for buffer in transports\n"; print " (use suffix {k,m,g,t} for {KiB,MiB,GiB,TiB}) [4m]\n"; - print " --files=COUNT process transfer in batches of COUNT files\n"; + print " --files=COUNT process transfer in batches of at least COUNT files\n"; print " (use suffix {k,m,b/g,t} for 1E{3,6,9,12}) [1k]\n"; + print " --interval=NUM adjust batches to run for around NUM seconds [30]\n"; print " --local=LIST set local transport mechanism to one of LIST\n"; print " (LIST subset of {bbcp,bbftp,fish,fish-tcp,gridftp,\n"; print " mcp,rsync,shift})\n"; @@ -1395,7 +1388,7 @@ sub shift_ { print " (LIST subset of {bbcp,bbftp,fish,fish-tcp,gridftp,\n"; print " rsync,shift})\n"; print " --retry=NUM retry failed operations up to NUM times [2]\n"; - print " --size=SIZE process transfer in batches of SIZE bytes\n"; + print " --size=SIZE process transfer in batches of at least SIZE bytes\n"; print " (use suffix {k,m,g,t} for {KB,MB,GB,TB}) [4g]\n"; print " --split=SIZE parallelize single files using chunks of SIZE bytes\n"; print " (use suffix {k,m,g,t} for {KiB,MiB,GiB,TiB}) [0]\n"; @@ -1422,6 +1415,23 @@ sub shift_ { print " --netw=NUM throttle local network writes at NUM MB/s\n"; exit; } + + foreach my $opt (qw(bandwidth buffer size split split-tar window)) { + if ($opts{$opt} && $opts{$opt} !~ /^([1-9]\d*)([kmgt])?$/i) { + die "Invalid size '$opts{$opt}' for option --$opt\n"; + } + } + foreach my $opt (qw(newer older)) { + if ($opts{$opt} && $opts{$opt} !~ /^\d+$/) { + my $spec; + ($spec, $opts{$opt}) = ($1, $2) + if ($opts{$opt} =~ /^([acmACM\|]+:)(.*)/); + require Date::Parse; + my $time = Date::Parse::str2time($opts{$opt}); + die "Unable to parse date string '$opts{$opt}'\n" if (!$time); + $opts{$opt} = (!$opts{'extract-tar'} && $spec ? $spec : "") . $time; + } + } if (defined $opts{include}) { foreach (@{$opts{include}}) { die "Invalid regular expression '$_' in --include\n" @@ -1453,14 +1463,16 @@ sub shift_ { # process arguments if (!$opts{id}) { my ($logfh, $log) = sftp_tmp(); + $opts{logfh} = $logfh; # send options foreach (qw(bandwidth buffer clients command cpu create-tar cron dereference disk extract-tar files force host-list hosts - ignore-times index-tar io ior iow local mail net netr netw - newer offline older pipeline ports preallocate preserve - recall remote retry sanity secure size split split-tar - streams stripe sync threads verify verify-fast wait window)) { + ignore-times index-tar interval io ior iow local mail net + netr netw newer offline older pipeline ports preallocate + preserve recall remote retry sanity secure silent size split + split-tar streams stripe sync threads verify verify-fast + wait window)) { print $logfh "args=getopt,$_ text=", escape($opts{$_}), "\n" if (defined $opts{$_}); } @@ -1476,12 +1488,12 @@ sub shift_ { print "Reading argument lines from stdin...\n" if (scalar(@ARGV) == 0); my $nfiles; if (scalar(@ARGV) > 0) { - $nfiles += shift_args(\$logfh, $log, $host, \@ARGV); + $nfiles += shift_args($host, \@ARGV); } else { while (my $line = ) { $line =~ s/^\s+|\s+$//g; my @args = quotewords('\s+', 0, $line); - $nfiles += shift_args(\$logfh, $log, $host, \@args); + $nfiles += shift_args($host, \@args); } } close $logfh; @@ -1501,14 +1513,15 @@ sub shift_ { # send options that can be respecified my ($logfh, $log) = sftp_tmp(); foreach (qw(bandwidth buffer clients cpu cron disk files force host-list - hosts io ior iow local mail net netr netw offline pipeline - ports preallocate recall remote retry secure size streams - stripe threads window)) { + hosts interval io ior iow local mail net netr netw offline + pipeline ports preallocate recall remote retry secure silent + size streams stripe threads window)) { print $logfh "args=getopt,$_ text=", escape($opts{$_}), "\n" if ($in_opts{$_}); } # indicate all options sent print $logfh "args=getopt,end\n"; + close $logfh; my $extra = $opts{restart} ? "=$opts{restart}" : ""; my $out = shift_mgr( "--restart$extra --id=$opts{id} --host=$host --put", $log); @@ -1563,8 +1576,8 @@ sub shift_ { #### shift_args #### #################### sub shift_args { - my ($logfh_ref, $log, $host, $args) = @_; - my $logfh = $$logfh_ref; + my ($host, $args) = @_; + my $logfh = $opts{logfh}; my @args = @{$args}; my $dst = pop(@args); @@ -1867,304 +1880,6 @@ sub shift_cron { $opts{crontab} = 1; } -#################### -#### shift_find #### -#################### -sub shift_find { - my ($shost, $spath, $dst, $ref) = @_; - my $logfh = $ref->{logfh}; - my $host = fqdn(hostname); - - # process tar files - if ($opts{'extract-tar'}) { - tar_extract($shost, $spath, $dst, $ref); - return; - } - - # check for existence of various commands - if ((!defined $opts{have} || !$opts{have}->{GET}) && $shost eq 'localhost') { - $opts{have}->{GET} = 1; - foreach my $bin (qw(dmget lfs getfacl getfattr shift-bin)) { - $opts{have}->{$bin} = first {-x "$_/$bin"} (split(/:/, $ENV{PATH})); - } - $opts{fhpid} = open3_run([undef, undef, -1], "shift-bin") - if ($opts{have}->{'shift-bin'}); - } - - $ref->{tool} = "shiftc"; - my $dmf = $opts{recall} && $opts{have}->{dmget} && $ref->{srcfs} =~ /,dmf/ ? 1 : 0; - - # compute local (or remote if shift-aux fails) files and sizes - my $sdir = dirname($spath); - $sdir = "" if ($sdir eq '/'); - my $ddir = dirname($dst); - $ddir = "" if ($ddir eq '/'); - my $tdir = $opts{'create-tar'} ? dirname(unescape($ref->{tar_name})) : undef; - $tdir = "" if ($tdir eq '.'); - $tdir .= "/" if ($tdir && $tdir !~ /\/$/); - my $dname = basename($dst); - my @files = (basename($spath)); - FILE: foreach my $file0 (@files) { - if (ref $file0) { - # processing subdir so update prefix - $sdir = $file0->[0]; - $ddir = $file0->[1]; - $tdir = $file0->[2] . "/" if ($opts{'create-tar'}); - # dname only needed on first iteration where src/dst name may differ - $dname = undef; - next; - } - next if ($file0 eq '.' || $file0 eq '..'); - my $file = "$sdir/$file0"; - my $dfile0 = $dname ? $dname : $file0; - - # dereference before stat - if ($opts{dereference}) { - $file = $shost eq 'localhost' ? - abs_path($file) : sftp($shost)->realpath($file); - } - # always get stat info of real file - my @stat; - if ($shost eq 'localhost') { - @stat = lstat($file); - } else { - my $fattrs = sftp($shost)->lstat($file); - if ($fattrs) { - # approximate local stat - @stat = (0, 0, $fattrs->perm , 0, $fattrs->uid, $fattrs->gid, - 0, $fattrs->size, $fattrs->atime, $fattrs->mtime, - $fattrs->mtime, 0, int($fattrs->size / 512)); - } - } - my $mode; - if (scalar(@stat) == 0) { - $file = "$sdir/$file0" if ($opts{dereference}); - if (scalar(@files) == 1) { - # return error if original file - sftp_error($ref, "Cannot stat $file"); - next; - } - # lower level files cannot return errors because there is no way - # to back out of previously added operations, so instead a find - # op is added, which will succeed/fail on its own when processed - } else { - $mode = $stat[2]; - $stat[2] &= 07777; - - # only directories, regular files, and symlinks are supported - next if (!S_ISDIR($mode) && !S_ISREG($mode) && !S_ISLNK($mode)); - # dmf handling for individual files is carried out by transport_dmf - $dmf = 0 if (scalar(@files) == 1 && !S_ISDIR($mode)); - } - - # exclude files (must be before dir processing) - if (defined $opts{exclude}) { - foreach my $re (@{$opts{exclude}}) { - next FILE if (eval {$file =~ /$re/}); - } - } - - if (scalar(@stat) == 0 || S_ISDIR($mode)) { - my $err = ""; - if (scalar(@stat) > 0 && (!$opts{dereference} || scalar(@files) == 1) && - (!defined $opts{'find-files'} || - scalar(@files) < $opts{'find-files'})) { - # add subdirs of this directory for processing when below limit - my $rc = $shost eq 'localhost' ? opendir(DIR, $file) : - sftp($shost)->opendir($file); - if ($rc) { - my @sub_files; - if ($shost eq 'localhost') { - $! = undef; - @sub_files = readdir DIR; - # there is currently no good way to detect readdir errors - @sub_files = () if ($!); - closedir DIR; - } else { - @sub_files = map {$_->{filename}} sftp($shost)->readdir($rc); - sftp($shost)->closedir($rc); - } - if (scalar(@sub_files) > 0) { - my $dirs = [$file, "$ddir/$dfile0"]; - push(@{$dirs}, "$tdir$file0") if ($opts{'create-tar'}); - push(@files, $dirs, @sub_files); - $err = undef; - } else { - # dirs should always contain . and .. at a minimum - $err = "Error reading directory $file"; - } - } else { - $err = "Error opening directory $file"; - } - if ($err && scalar(@files) == 1) { - # return error if original file - sftp_error($ref, $err); - next; - } - } - if (defined $err) { - # this handles directories as well as lower level failures - print $logfh "args=find,", escape(hostpath($shost, $file)), ","; - if ($opts{'create-tar'}) { - print $logfh escape($dst), " tar_name=" . escape("$tdir$file0"); - } else { - print $logfh escape("$ddir/$dfile0"); - } - print $logfh " host=$host\n"; - next; - } - } - - # include files (must be after dir processing) - if (defined $opts{include}) { - my $found; - foreach my $re (@{$opts{include}}) { - next if (eval {$file !~ /$re/}); - $found = 1; - last; - } - next if (!$found); - } - - # newer/older files (must be after dir processing) - my %ti = ('a' => 8, 'm' => 9, 'c' => 10); - if (defined $opts{newer}) { - if ($opts{newer} =~ /^([^:]+):(\S+)/) { - my ($type, $time) = ($1, $2); - $type =~ s/\|/1||/g; - $type =~ s/([amc])/$stat[$ti{$1}]>=$time&&/g; - $type =~ s/([AMC])/$stat[$ti{lc($1)}]<$time&&/g; - $type .= "1"; - next if (!eval $type); - } elsif ($stat[9] < $opts{newer}) { - next; - } - } - if (defined $opts{older}) { - if ($opts{older} =~ /^([^:]+):(\S+)/) { - my ($type, $time) = ($1, $2); - $type =~ s/\|/1||/g; - $type =~ s/([amc])/$stat[$ti{$1}]<$time&&/g; - $type =~ s/([AMC])/$stat[$ti{lc($1)}]>=$time&&/g; - $type .= "1"; - next if (!eval $type); - } elsif ($stat[9] >= $opts{older}) { - next; - } - } - - # resolve uid/gid if possible - my $user = getpwuid($stat[4]); - my $group = getgrgid($stat[5]); - $user = "uid_$stat[4]" if (!$user); - $group = "gid_$stat[5]" if (!$group); - my $attrs = join(",", @stat[2,4,5,8,9], - escape($user), escape($group), $stat[7], 512 * $stat[12]); - - my @acls; - my @lattrs; - my @xattrs; - if ($shost eq 'localhost') { - # if shift-aux failed, there is no backup to get this info remotely - # try to get acls - if (($opts{have}->{'shift-bin'} || $opts{have}->{getfacl}) && - !$opts{'create-tar'} && - ($opts{preserve} == 1 || $opts{preserve} =~ /acl/) && - (!$ref->{srcfs} || $ref->{srcfs} =~ /,acl/)) { - if (defined $opts{fhpid}) { - $opts{fhpid}->[0]->print("getfacl $file\n"); - my $text = $opts{fhpid}->[1]->getline; - my @cols = split(/\s+/, $text); - push(@acls, $cols[2]) if ($file eq $cols[1] && $cols[-1] eq '0'); - } else { - my $fhpid = open3_run([-1, undef, -1], - "getfacl", "-cps", "--", $file); - while (defined $fhpid && defined ($_ = $fhpid->[1]->getline)) { - chomp; - next if (!$_); - push(@acls, escape($_)); - } - open3_wait($fhpid); - } - } - - # try to get xattrs - if (($opts{have}->{'shift-bin'} || $opts{have}->{getfattr}) && - !$opts{'create-tar'} && - ($opts{preserve} == 1 || $opts{preserve} =~ /xattr/) && - (!$ref->{srcfs} || $ref->{srcfs} =~ /,xattr/)) { - if (defined $opts{fhpid}) { - $opts{fhpid}->[0]->print("getfattr $file\n"); - my $text = $opts{fhpid}->[1]->getline; - my @cols = split(/\s+/, $text); - push(@xattrs, $cols[2]) if ($file eq $cols[1] && $cols[-1] eq '0'); - } else { - my $fhpid = open3_run([-1, undef, -1], - "getfattr", "-dhe", "base64", $file); - while (defined $fhpid && defined ($_ = $fhpid->[1]->getline)) { - chomp; - next if (!$_ || /^\s*#/); - push(@xattrs, escape(decode_base64($_))); - } - open3_wait($fhpid); - } - } - - # try to get lustre striping - if (($opts{have}->{'shift-bin'} || $opts{have}->{lfs}) && - !S_ISLNK($mode) && !$opts{'create-tar'} && - ($opts{preserve} == 1 || $opts{preserve} =~ /stripe/) && - $ref->{srcfs} =~ /^lustre/) { - # ignore symlinks as link to fifo can hang forever - if (defined $opts{fhpid}) { - $opts{fhpid}->[0]->print("getstripe $file\n"); - my $text = $opts{fhpid}->[1]->getline; - my @cols = split(/\s+/, $text); - @lattrs = split(/,/, $cols[2]) if ($file eq $cols[1] && $cols[-1] eq '0'); - } else { - my $fhpid = open3_run([-1, undef, -1], - "lfs", "getstripe", "-d", $file); - while (defined $fhpid && defined ($_ = $fhpid->[1]->getline)) { - $lattrs[0] = $1 if (/stripe_count:\s*(-?\d+)/); - $lattrs[1] = $1 if (/stripe_size:\s*(-?\d+)/); - } - open3_wait($fhpid); - } - } - $lattrs[0] = 0 if (!defined $lattrs[0] && defined $lattrs[1]); - $lattrs[1] = 0 if (!defined $lattrs[1] && defined $lattrs[0]); - } - - # begin log entry - my $index_len = !$opts{'index-tar'} ? 0 : 28 + length("$tdir$file0") + - length(sprintf("%7s%7s%9d", $user, $group, $stat[7])); - if (S_ISLNK($mode)) { - my $ln = $shost eq 'localhost' ? readlink($file) : - sftp($shost)->readlink($file); - print $logfh "args=ln,", escape($ln); - $index_len += 4 + length($ln); - } elsif (S_ISDIR($mode)) { - print $logfh "args=mkdir"; - } elsif ($opts{sync}) { - print $logfh "args=ckattr", $opts{'ignore-times'} ? "0" : "", - ",", escape(hostpath($shost, $file)); - } else { - print $logfh "args=cp,", escape(hostpath($shost, $file)); - } - print $logfh ",", escape($opts{'create-tar'} ? $dst : "$ddir/$dfile0"); - print $logfh " acls=" . join(",", @acls) if (scalar(@acls) > 0); - print $logfh " xattrs=" . join(",", @xattrs) if (scalar(@xattrs) > 0); - print $logfh " lustre_attrs=" . join(",", @lattrs) if (scalar(@lattrs) > 0); - print $logfh " tar_index=$index_len" if ($opts{'index-tar'}); - print $logfh " tar_name=" . escape("$tdir$file0") if ($opts{'create-tar'}); - print $logfh " host=$host size=$stat[7] attrs=$attrs\n"; - if ($dmf && !S_ISLNK($mode) && !S_ISDIR($mode)) { - ($opts{dmfh}, $opts{dmtmp}) = sftp_tmp() if (!$opts{dmtmp}); - print {$opts{dmfh}} $file, "\n"; - } - } -} - ####################### #### shift_latency #### ####################### @@ -2202,7 +1917,10 @@ sub shift_latency { next if (!$np); } my $rc; - ($rc, $time) = $np->ping($host); + eval { + # there may still be an exception from undefined subroutine + ($rc, $time) = $np->ping($host); + }; if (!$rc) { # use ssh syn ping as backup to tcp echo port @@ -2216,8 +1934,11 @@ sub shift_latency { }; next if (!$nps); } - $nps->ping($host); - ($rc, $time) = $nps->ack; + eval { + # there may still be an exception from undefined subroutine + $nps->ping($host); + ($rc, $time) = $nps->ack; + }; $time /= 2; } } @@ -2324,6 +2045,7 @@ sub shift_loop { push(@host_agents, $agent_sock) if (scalar(@host_agents) == 0); my ($logfh, $log) = sftp_tmp(); + $opts{logfh} = $logfh; my ($taskfh, $task) = sftp_tmp(); close $taskfh; @@ -2345,6 +2067,7 @@ sub shift_loop { my $text = localtime(time) . ": @exception"; chomp $text; print $logfh "args=exception text=", escape($text); + close $logfh; shift_mgr("--id=$opts{id} --host=$host --put $opts{get_host}", $log, undef, 1440); } @@ -2384,8 +2107,8 @@ sub shift_loop { my $etime = $1; my $time = $etime - time; last if ($time < 60); - my $out = open3_get([-1, undef, -1], "ssh-keygen -l -f $key.pub"); - next if ($out =~ /not a public key file/); + my $out = open3_get([-1, undef], "ssh-keygen -l -f $key.pub"); + next if ($out =~ /not a public key file|no such file/i); # ensure key has correct permissions chmod(0600, $key); open3_get([-1, undef], "ssh-add -t $time $key"); @@ -2405,6 +2128,7 @@ sub shift_loop { die "$$out\n" if (ref $out); #TODO: error checking if id doesn't exist open($logfh, '>', $log); + $opts{logfh} = $logfh; my $run = time; my %ops; @@ -2580,6 +2304,7 @@ sub shift_loop { } else { # must approximate without shift-aux my $fattrs = sftp($rhost[1])->lstat($rpath[1]); + next if (!defined $fattrs); $du = $fattrs->size; } } @@ -2594,7 +2319,6 @@ sub shift_loop { } } elsif ($cmd eq 'find') { my ($rhost, $rpath) = hostpath(unescape($args[0])); - $op{logfh} = $logfh; transport($cmd, $rhost, $rpath, unescape($args[1]), \%op); } elsif ($cmd eq 'ln') { my ($rhost, $rpath) = hostpath(unescape($args[1])); @@ -2733,8 +2457,8 @@ sub shift_mgr { my $etime = $1; my $time = $etime - time; last if ($time < 60); - my $out = open3_get([-1, undef, -1], "ssh-keygen -l -f $key.pub"); - next if ($out =~ /not a public key file/); + my $out = open3_get([-1, undef], "ssh-keygen -l -f $key.pub"); + next if ($out =~ /not a public key file|no such file/); # ensure key has correct permissions chmod(0600, $key); open3_get([-1, undef], "ssh-add -t $time $key"); @@ -2768,23 +2492,12 @@ sub shift_mgr { ###################### sub shift_mounts { my $host = shift; + my %fstab; my %mnt = ( host => $host, args => "mount", ); - my %fstab; - if (open(FILE, "/etc/fstab")) { - while () { - s/^\s+|\s+$//g; - next if (/^#/); - my ($dev, $local, $type) = split(/\s+/); - next if (!$type); - $fstab{$local} = [$dev, $type]; - } - close FILE; - } - # check for existence of getfacl my $acl = first {-x "$_/getfacl"} (split(/:/, $ENV{PATH})); # gather file system information from mount @@ -2801,6 +2514,17 @@ sub shift_mounts { if (/(\S+)\s+on\s+(\S+)\s+type\s+(\S+)/); if ($mnt{local}) { if ($dev eq 'systemd-1') { + if (!$fstab{"."} && open(FILE, "/etc/fstab")) { + $fstab{"."} = 1; + while () { + s/^\s+|\s+$//g; + next if (/^#/); + my ($dev, $local, $type) = split(/\s+/); + next if (!$type); + $fstab{$local} = [$dev, $type]; + } + close FILE; + } # systemd mounts must be read from fstab my $fstab = $fstab{$mnt{local}}; ($dev, $type) = ($fstab->[0], $fstab->[1]); @@ -2953,7 +2677,7 @@ sub tar_canonpath { # based on Tar/Archive::Tar 0.07 by Calle Dybedahl (no license specified) sub tar_extract { my ($shost, $spath, $dst, $ref) = @_; - my $logfh = $ref->{logfh}; + my $logfh = $opts{logfh}; my $src = hostpath($shost, $spath); my $host = fqdn(hostname); @@ -3150,6 +2874,7 @@ sub tar_extract { # checks for ustar limitations have already been done by this point sub tar_record { my ($fh, $type, $src, $ref, $ifh) = @_; + my $logfh = $opts{logfh}; my @attrs = split(/,/, $ref->{attrs}); my $file = $ref->{tar_name}; @@ -3770,7 +3495,6 @@ sub transport_bbftp { ########################## sub transport_chattr { my ($host, $tcmds) = @_; - my ($fh, $fhpid, $tmp); # check for existence of commands if ((!defined $opts{have} || !$opts{have}->{SET}) && $host eq 'localhost') { @@ -3779,25 +3503,22 @@ sub transport_chattr { $opts{have}->{$bin} = first {-x "$_/$bin"} (split(/:/, $ENV{PATH})); } } - $fhpid = open3_run([undef, undef, -1], "shift-bin") - if ($opts{have}->{'shift-bin'}); + + my (@chattrs, @allocs); # lfs setstripe (must be done before fallocate) - if ($opts{have}->{'shift-bin'} || $opts{have}->{lfs} || - $host ne 'localhost') { - foreach my $cmd (@{$tcmds}) { - my ($op, $src, $dst, $ref) = @{$cmd}; - next if ($opts{'create-tar'} && ($op ne 'chattr' || - !$ref->{tar_creat})); - next if ($op !~ /^(?:chattr|get|mkdir|put)$/ || !$ref->{dstfs} || - $ref->{dstfs} !~ /^lustre/ || $ref->{ln} || - $opts{stripe} eq '0' && !defined $opts{'stripe-size'} && - !defined $opts{'stripe-pool'}); + foreach my $cmd (@{$tcmds}) { + my ($op, $src, $dst, $ref) = @{$cmd}; + # lfs setstripe (must be done before fallocate) + if ((!$opts{'create-tar'} && $op =~ /^(?:get|mkdir|put)$/ || + $op eq 'chattr' && $ref->{tar_creat}) && + $ref->{dstfs} && $ref->{dstfs} =~ /^lustre/ && !$ref->{ln} && + ($opts{stripe} ne '0' || defined $opts{'stripe-size'} || + defined $opts{'stripe-pool'})) { # set striping my @stripe = (0, 0); - # preserve existing striping when not excluded - @stripe = split(/,/, $ref->{lustre_attrs}) - if ($opts{preserve} =~ /stripe/ && $ref->{lustre_attrs}); + # preserve existing striping when available + @stripe = split(/,/, $ref->{lustre_attrs}) if ($ref->{lustre_attrs}); my @attrs = split(/,/, $ref->{attrs}); # define variables that are allowed in striping expressions my ($nm, $sz, $sc, $ss) = ($dst, $attrs[7], @stripe); @@ -3815,45 +3536,18 @@ sub transport_chattr { } # count >= 64k indicates a size per stripe $stripe[0] = int($sz / $stripe[0]) if ($stripe[0] >= 65536); - if ($host ne 'localhost' && $op ne 'get') { - $ref->{tool} = "shift-aux"; - # stripe remote files in batch - ($fh, $tmp) = sftp_tmp() if (!$tmp); - # use trailing / to indicate directories - print $fh "setstripe ", escape($dst), $op eq 'mkdir' ? "/" : "", - " ", join(" ", @stripe), "\n"; - next; - } - # stripe local files immediately - my $dir = $op eq 'mkdir' ? $dst : dirname($dst); - eval {mkpath($dir)}; - if (defined $fhpid) { - $ref->{tool} = "shift-bin"; - $fhpid->[0]->print( - "setstripe ", escape($dst), $op eq 'mkdir' ? "/" : "", - " ", join(" ", @stripe), "\n"); - $fhpid->[1]->getline; - } else { - $ref->{tool} = "shiftc"; - my @args = ("lfs", "setstripe", "-c", $stripe[0], - "-S", $stripe[1], $dst); - splice(@args, -1, 0, "-p", $stripe[2]) if ($stripe[2]); - system(@args); - } + my @args = ("setstripe", escape($dst) . ($op eq 'mkdir' ? "/" : ""), + join(" ", @stripe)); + push(@chattrs, \@args); # ignore errors since files automatically striped anyway } - } - # fallocate - if ($opts{preallocate} &&($opts{have}->{'shift-bin'} || - $opts{have}->{fallocate} || $host ne 'localhost')) { - foreach my $cmd (@{$tcmds}) { - my ($op, $src, $dst, $ref) = @{$cmd}; - next if ($op !~ /^(?:get|put)$/ && - ($op !~ /^(?:ln|mkdir)$/ || !$ref->{tar_creat})); - # lustre/nfs do not seem to support fallocate at the moment - next if ($ref->{dstfs} =~ /^(?:lustre|nfs)$/); - next if ($opts{'create-tar'} && !$ref->{tar_creat}); + # fallocate + if ($opts{preallocate} && ($op =~ /^(?:get|put)$/ || + $op =~ /^(?:ln|mkdir)$/ && $ref->{tar_creat}) && + # lustre/nfs do not seem to support fallocate at the moment + $ref->{dstfs} !~ /^(?:lustre|nfs)$/ && + (!$opts{'create-tar'} || $ref->{tar_creat})) { my $size; if ($opts{'create-tar'}) { $size = $ref->{tar_creat}; @@ -3864,101 +3558,120 @@ sub transport_chattr { 1 - $attrs[8] / $attrs[7] >= $opts{preallocate} / 100); $size = $attrs[7]; } - if ($host ne 'localhost' && $op ne 'get') { - $ref->{tool} = "shift-aux"; - # allocate remote files in batch - ($fh, $tmp) = sftp_tmp() if (!$tmp); - print $fh "fallocate ", escape($dst), " $size\n"; - next; - } - # allocate local files immediately - if (defined $fhpid) { - $ref->{tool} = "shift-bin"; - $fhpid->[0]->print("fallocate ", escape($dst), " $size\n"); - $fhpid->[1]->getline; - } else { - $ref->{tool} = "shiftc"; - eval {mkpath(dirname($dst))}; - system("fallocate", "-n", "-l", $size, $dst); - } + my @args = ("fallocate", escape($dst), $size); + # use different queue to give higher probability that striping done + push(@allocs, \@args); # ignore errors since files automatically allocated anyway } - } - # setfacl - if ($opts{have}->{'shift-bin'} || $opts{have}->{setfacl} || - $host ne 'localhost') { - foreach my $cmd (@{$tcmds}) { - my ($op, $src, $dst, $ref) = @{$cmd}; - next if ($op ne 'chattr' || !$ref->{acls} || - $ref->{dstfs} && $ref->{dstfs} !~ /,acl/); + # setfacl + if ($op eq 'chattr' && $ref->{acls} && + (!$ref->{dstfs} || $ref->{dstfs} =~ /,acl/)) { + my @args = ("setfacl", escape($dst), $ref->{acls}); + push(@chattrs, \@args); # ignore errors since systems may have different command/users - if ($host ne 'localhost') { - $ref->{tool} = "shift-aux"; - # chattr remote files in batch - ($fh, $tmp) = sftp_tmp() if (!$tmp); - print $fh "setfacl ", escape($dst), " $ref->{acls}\n"; - next; - } - # chattr local files immediately - if (defined $fhpid) { - $ref->{tool} = "shift-bin"; - $fhpid->[0]->print("setfacl ", escape($dst), " $ref->{acls}\n"); - $fhpid->[1]->getline; - } else { - $ref->{tool} = "shiftc"; - my $fhpid = open3_run([undef, -1, -1], "setfacl", "-M-", $dst); - next if (!defined $fhpid); - my $acls = $ref->{acls}; - $acls =~ s/,/\n/g; - $fhpid->[0]->print(unescape($acls)); - $fhpid->[0]->close; - open3_wait($fhpid); - } } - } - # setfattr - if ($opts{have}->{'shift-bin'} || $opts{have}->{setfattr} || - $host ne 'localhost') { - foreach my $cmd (@{$tcmds}) { - my ($op, $src, $dst, $ref) = @{$cmd}; - next if ($op ne 'chattr' || !$ref->{xattrs} || - $ref->{dstfs} && $ref->{dstfs} !~ /,xattr/); + # setfattr + if ($op eq 'chattr' && $ref->{xattrs} && + (!$ref->{dstfs} || $ref->{dstfs} =~ /,xattr/)) { + my @args = ("setfattr", escape($dst), $ref->{xattrs}); + push(@chattrs, \@args); # ignore errors since systems may have different command/users - if ($host ne 'localhost') { - $ref->{tool} = "shift-aux"; - # chattr remote files in batch - ($fh, $tmp) = sftp_tmp() if (!$tmp); - print $fh "setfattr ", escape($dst), " $ref->{xattrs}\n"; - next; - } - # chattr local files immediately - if (defined $fhpid) { - $ref->{tool} = "shift-bin"; - $fhpid->[0]->print("setfattr ", escape($dst), " $ref->{xattrs}\n"); - $fhpid->[1]->getline; - } else { - $ref->{tool} = "shiftc"; - my $fhpid = open3_run([undef, -1, -1], - "setfattr", "-h", "--restore=-"); - next if (!defined $fhpid); - my $xattrs = $ref->{xattrs}; - $xattrs =~ s/,/\n/g; - $fhpid->[0]->print("# file: $dst\n", unescape($xattrs)); - $fhpid->[0]->close; - open3_wait($fhpid); - } } + + $ref->{tool} = $host ne 'localhost' ? "shift-aux" : + ($opts{have}->{'shift-bin'} ? "shift-bin" : "shiftc"); } - if ($tmp) { + return if (!scalar(@chattrs) && !scalar(@allocs)); + + my ($q, @threads); + if ($perl{threads} && $opts{threads} > 1) { + $q = Thread::Queue->new; + # mutual exclusion for stdout + @threads = map {threads->create(sub { + $opts{fhpid} = open3_run([undef, undef, -1], "shift-bin") + if ($opts{have}->{'shift-bin'}); + transport_chattr1($_) while (defined ($_ = $q->dequeue)); + open3_wait($opts{fhpid}) if (defined $opts{fhpid}); + })} (1 .. $opts{threads}); + } + + if ($host ne 'localhost') { + my ($fh, $tmp) = sftp_tmp(); + foreach (@chattrs, @allocs) { + print $fh join(" ", @{$_}), "\n"; + } close $fh; open3_get([$tmp, -1, -1], "$opts{ssh} $host $opts{caux} chattr"); unlink $tmp; - # ignore errors since will fail to system defaults + } else { + if ($perl{threads} && $opts{threads} > 1) { + $q->enqueue(@chattrs, @allocs); + # force threads to exit + $q->enqueue(undef) foreach (1 .. scalar(@threads)); + foreach (@threads) { + $_->join if ($_); + } + } else { + $opts{fhpid} = open3_run([undef, undef, -1], "shift-bin") + if ($opts{have}->{'shift-bin'}); + transport_chattr1($_) foreach (@chattrs, @allocs); + open3_wait($opts{fhpid}) if (defined $opts{fhpid}); + } + } +} + +########################### +#### transport_chattr1 #### +########################### +# set attrs of a single file +sub transport_chattr1 { + my ($cmd, $file, $attrs) = @{$_[0]}; + # short circuit if command not available + return if (!defined $opts{fhpid} && !$opts{have}->{$cmd}); + # untaint arguments + $cmd = $1 if ($cmd =~ /(.*)/); + $file = $1 if ($file =~ /(.*)/s); + $attrs = $1 if ($attrs =~ /(.*)/); + my $ufile = unescape($file); + + # make sure parent directory exists + my $dir = $ufile =~ s/\/$// ? $ufile : dirname($ufile); + eval {mkpath($dir)}; + + if (defined $opts{fhpid}) { + $opts{fhpid}->[0]->print(join(" ", $cmd, $file, $attrs), "\n"); + $opts{fhpid}->[1]->getline; + # ignore errors + } else { + my ($cin, @copts, $in, $out); + if ($cmd eq 'fallocate') { + @copts = ("-n", "-l", unescape($attrs), $ufile); + } elsif ($cmd eq 'setstripe') { + $cmd = "lfs"; + my ($count, $size, $pool) = split(/\s+/, unescape($attrs)); + $count = 0 if (!$count); + $size = 0 if (!$size); + @copts = ("setstripe", "-c", $count, "-S", $size, $ufile); + splice(@copts, -1, 0, "-p", $pool) if ($pool); + } elsif ($cmd eq 'setfacl') { + $attrs =~ s/,/\n/g; + $cin = unescape($attrs); + @copts = ("-M-", $ufile); + } elsif ($cmd eq 'setfattr') { + $attrs =~ s/,/\n/g; + $cin = "# file: $ufile\n" . unescape($attrs); + @copts = ("-h", "--restore=-"); + } + my $pid = IPC::Open3::open3($in, $out, $out, $cmd, @copts); + print $in $cin if ($cin); + close $in; + waitpid($pid, 0); + # ignore errors + close $out; } - open3_wait($fhpid) if (defined $fhpid); } ####################### @@ -4013,7 +3726,7 @@ sub transport_dmf { foreach my $op (keys %gettmp) { close $getfh{$op}; my $ssh = $op eq 'get' && $host ne 'localhost' ? "$opts{ssh} $host" : ""; - # ignore errors since files will be automatically retrieved anyway + # ignore errors since files are automatically retrieved anyway open3_get([$gettmp{$op}, -1, -1], "$ssh dmget -nq$extra"); unlink $gettmp{$op}; } @@ -4021,7 +3734,7 @@ sub transport_dmf { foreach my $rhost (keys %puttmp) { close $putfh{$rhost}; my $ssh = $rhost ne 'localhost' ? "$opts{ssh} $rhost" : ""; - # ignore errors since files will be automatically migrated anyway + # ignore errors since files are automatically migrated anyway open3_get([$puttmp{$rhost}, -1, -1], "$ssh dmput -n"); unlink $puttmp{$rhost}; } @@ -4035,7 +3748,6 @@ sub transport_fadvise { my ($host, $tcmds) = @_; my ($fh, $tmp); #TODO: do we want --no-fadvise? - #TODO: what about remote hosts? foreach my $cmd (@{$tcmds}) { my ($op, $src, $dst, $ref) = @{$cmd}; my @attrs = split(/,/, $ref->{attrs}); @@ -4052,32 +3764,20 @@ sub transport_fadvise { # adjust dst by tar start offset $doff = $x1 - $t1; } - # use eval as can die when permission denied - if ($perl{fadvise}) { - if ($op =~ /^(?:put|sum)$/ || - $op eq 'get' && $host eq 'localhost') { - eval {fadvise($src, $soff, $x2 - $x1, POSIX_FADV_DONTNEED())}; - } - if ($op eq 'get' || - $op =~ /^(?:cksum|put)$/ && $host eq 'localhost') { - eval {fadvise($dst, $doff, $x2 - $x1, POSIX_FADV_DONTNEED())}; - } + if ($op eq 'get') { + ($fh, $tmp) = sftp_tmp() if (!$tmp); + print $fh escape($src), " ", $soff, " ", $x2 - $x1, "\n"; } - if ($host ne 'localhost') { - if ($op eq 'get') { - ($fh, $tmp) = sftp_tmp() if (!$tmp); - print $fh escape($src), " ", $soff, " ", $x2 - $x1, "\n"; - } - if ($op =~ /^(?:cksum|put)$/) { - ($fh, $tmp) = sftp_tmp() if (!$tmp); - print $fh escape($dst), " ", $doff, " ", $x2 - $x1, "\n"; - } + if ($op =~ /^(?:cksum|put)$/) { + ($fh, $tmp) = sftp_tmp() if (!$tmp); + print $fh escape($dst), " ", $doff, " ", $x2 - $x1, "\n"; } } } if ($tmp) { close $fh; - open3_get([$tmp, -1, -1], "$opts{ssh} $host $opts{caux} fadvise"); + open3_get([$tmp, -1, -1], $host eq 'localhost' ? "shift-bin" : + "$opts{ssh} $host $opts{caux} fadvise"); unlink $tmp; # ignore errors since will eventually reclaim cache } @@ -4086,36 +3786,37 @@ sub transport_fadvise { ######################## #### transport_find #### ######################## -# compute remote files and sizes via shift-aux sub transport_find { my ($host, $tcmds) = @_; + my $logfh = $opts{logfh}; my ($fh, $tmp); my %refs; foreach my $cmd (@{$tcmds}) { my ($op, $src, $dst, $ref) = @{$cmd}; next if ($op ne 'find'); - if ($host eq 'localhost') { - shift_find($host, $src, $dst, $ref); - next; + if ($host ne 'localhost') { + ($fh, $tmp) = sftp_tmp() if (!$tmp); + print $fh join(" ", map {escape($_)} ($host, $src, $dst)); + foreach my $opt (qw(srcfs tar_name tar_tell)) { + print $fh " $opt=$ref->{$opt}" if (defined $ref->{$opt}); + } + print $fh " $ref\n"; + $ref->{tool} = "shift-aux"; } - ($fh, $tmp) = sftp_tmp() if (!$tmp); - print $fh join(" ", map {escape($_)} ($host, $src, $dst)); - foreach my $opt (qw(srcfs tar_name tar_tell)) { - print $fh " $opt=$ref->{$opt}" if (defined $ref->{$opt}); - } - print $fh " $ref\n"; - $ref->{tool} = "shift-aux"; $refs{$ref} = [$src, $dst, $ref]; } + return if (!scalar(keys %refs)); + $opts{lhost} = fqdn(hostname); if ($tmp) { + # try to process remote finds using shift-aux close $fh; my $cmd = "$opts{ssh} $host $opts{caux} find"; foreach my $opt (qw(create-tar dereference extract-tar ignore-times - index-tar sync)) { + index-tar recall sync)) { $cmd .= " --$opt" if ($opts{$opt}); } - foreach my $opt (qw(find-files newer older preserve)) { + foreach my $opt (qw(buffer find-files newer older preserve threads)) { $cmd .= " --$opt $opts{$opt}" if ($opts{$opt}); } foreach my $opt (qw(exclude include)) { @@ -4123,36 +3824,132 @@ sub transport_find { $cmd .= " --$opt " . escape($_) foreach (@{$opts{$opt}}); } - my $lhost = fqdn(hostname); - my ($ref, $logfh); + my %drefs; my $fhpid = open3_run([$tmp, undef, -1], $cmd); while (defined $fhpid && defined ($_ = $fhpid->[1]->getline)) { s/\s+$//; if (!/,/) { if (/^ref\s+(\S+)$/) { - $ref = $refs{$1}->[2]; - $logfh = $ref->{logfh}; - delete $refs{$1}; + my $ref = $refs{$1}->[2]; + $drefs{$ref} = $ref; + delete $refs{$ref}; } else { # errors indicated by comma-less line - sftp_error($ref, $_); + my ($ref, $err) = split(/:/, $_, 2); + $ref = $drefs{$ref}; + sftp_error($ref, $err) if (defined $ref); } next; } - print $logfh $_, " host=$lhost\n"; + print $logfh $_, " host=$opts{lhost}\n"; } open3_wait($fhpid); unlink $tmp; - # additional processing needed if shift-aux failed - shift_find($host, @{$_}) foreach (values %refs); - if (defined $opts{fhpid}) { - open3_wait($opts{fhpid}); - delete $opts{fhpid}; + return if (!scalar(keys %refs)); + } + + # process any remaining tar files + if ($opts{'extract-tar'}) { + tar_extract($host, @{$_}) foreach (values %refs); + return; + } + + # check for existence of various commands + if ((!defined $opts{have} || !$opts{have}->{GET}) && $host eq 'localhost') { + $opts{have}->{GET} = 1; + foreach my $bin (qw(dmget lfs getfacl getfattr shift-bin)) { + $opts{have}->{$bin} = first {-x "$_/$bin"} (split(/:/, $ENV{PATH})); } } - if ($opts{dmtmp}) { - close $opts{dmfh}; + if ($opts{recall} && $opts{have}->{dmget}) { + # must set up tmp file for recalls in case threads need to access + ($opts{dmfh}, $opts{dmtmp}) = tempfile(); + } + + my @threads; + if ($perl{threads} && $opts{threads} > 1) { + $opts{findq} = Thread::Queue->new; + $opts{findeq} = Thread::Queue->new; + # number of worker "b's" processing files + $opts{findb4} = Thread::Semaphore->new(0); + # number of items that have been added to queue + $opts{findn4} = Thread::Semaphore->new(1); + # mutual exclusion for stdout + $opts{findo4} = Thread::Semaphore->new(1); + # number of unprocessed items on queue + $opts{findq4} = Thread::Semaphore->new(0); + @threads = map {threads->create(sub { + $opts{fhpid} = open3_run([undef, undef, -1], "shift-bin") + if ($opts{have}->{'shift-bin'} && ($opts{preserve} == 1 || + $opts{preserve} =~ /acl|stripe|xattr/)); + # remove existing ssh connections so threads use their own + delete @opts{grep(/^sftp_/, keys %opts)}; + while (1) { + my $path = $opts{findq}->dequeue_timed(1); + if (defined $path) { + $opts{findb4}->up; + $opts{findq4}->down; + transport_find1($path); + $opts{findb4}->down; + } elsif (!${$opts{findq4}} && !${$opts{findb4}}) { + transport_find_buffer(); + last; + } + } + open3_wait($opts{fhpid}) if (defined $opts{fhpid}); + })} (1 .. $opts{threads}); + } else { + # need to define file count as used in both single/multi-threaded cases + $opts{findn4} = \0; + $opts{fhpid} = open3_run([undef, undef, -1], "shift-bin") + if ($opts{have}->{'shift-bin'} && ($opts{preserve} == 1 || + $opts{preserve} =~ /acl|stripe|xattr/)); + } + + foreach (values %refs) { + # compute local (or remote if shift-aux fails) files and sizes + my ($spath, $dst, $ref) = @{$_}; + $ref->{tool} = "shiftc"; + my $sdir = dirname($spath); + $sdir = "" if ($sdir eq '/'); + my $ddir = dirname($dst); + $ddir = "" if ($ddir eq '/'); + my $path = [[basename($spath), basename($dst), + $perl{threads} && $opts{threads} > 1 ? "" . $ref : $ref], + $host, $sdir, $dst, $ddir, $ref->{srcfs}]; + if ($opts{'create-tar'}) { + my $tdir = dirname(unescape($ref->{tar_name})); + $tdir = "" if ($tdir eq '.'); + $tdir .= "/" if ($tdir && $tdir !~ /\/$/); + push(@{$path}, $tdir); + } + if ($perl{threads} && $opts{threads} > 1) { + $opts{findq4}->up; + $opts{findn4}->up; + $opts{findq}->enqueue($path); + } else { + transport_find1($path); + } + } + + if ($perl{threads} && $opts{threads} > 1) { + foreach (@threads) { + $_->join if ($_); + } + # add error messages back to original ref + while (defined (my $referr = $opts{findeq}->dequeue_nb)) { + my ($ref, $err) = @{$referr}; + sftp_error($refs{$ref}->[2], $err) if (defined $refs{$ref}); + } + delete $opts{"find" . $_} foreach (qw(q eq b4 n4 o4 q4)); + } else { + transport_find_buffer(); + open3_wait($opts{fhpid}) if (defined $opts{fhpid}); + } + + close $opts{dmfh} if (defined $opts{dmfh}); + if (-s $opts{dmtmp}) { # fork to avoid intermittent hangs of dmget my $pid = fork_setsid(); if ($pid) { @@ -4162,11 +3959,339 @@ sub transport_find { } else { my $ssh = $host eq 'localhost' ? "" : "$opts{ssh} $host"; my $extra = $opts{'create-tar'} ? " -a" : ""; - # ignore errors since files will be automatically retrieved anyway + # ignore errors since files are automatically retrieved anyway open3_get([$opts{dmtmp}, -1, -1], "$ssh dmget -nq$extra"); unlink $opts{dmtmp}; POSIX::_exit(0); } + } else { + unlink $opts{dmtmp}; + } +} + +############################### +#### transport_find_buffer #### +############################### +# write output in blocks for efficiency +sub transport_find_buffer { + my ($text, $file) = @_; + $opts{find_buf} .= $text; + $opts{recall_buf} .= $file; + if (($opts{find_buf} || $opts{recall_buf}) && + (!defined $text || length $opts{find_buf} >= $opts{buffer} || + length $opts{recall_buf} >= $opts{buffer})) { + $opts{findo4}->down if ($perl{threads} && $opts{threads} > 1); + if ($opts{find_buf} && (!defined $text || + length $opts{find_buf} >= $opts{buffer})) { + print {$opts{logfh}} $opts{find_buf}; + $opts{logfh}->flush; + delete $opts{find_buf}; + } + if ($opts{recall_buf} && (!defined $text || + length $opts{recall_buf} >= $opts{buffer})) { + print {$opts{dmfh}} $opts{recall_buf}; + $opts{dmfh}->flush; + delete $opts{recall_buf}; + } + $opts{findo4}->up if ($perl{threads} && $opts{threads} > 1); + } +} + +######################### +#### transport_find1 #### +######################### +# output list of files/dirs beneath given paths with stat info +sub transport_find1 { + my $path = shift; + my ($file0, $shost, $sdir, $dst, $ddir, $srcfs, $tdir) = @{$path}; + my $dfile0 = $file0; + my ($top, $ref); + if (ref $file0) { + $top = 1; + ($file0, $dfile0, $ref) = @{$file0}; + } + return if ($file0 eq '.' || $file0 eq '..'); + my $file = "$sdir/$file0"; + + my $dmf = $opts{recall} && $opts{have}->{dmget} && $srcfs =~ /,dmf/ ? 1 : 0; + + # dereference before stat + if ($opts{dereference}) { + $file = $shost eq 'localhost' ? + abs_path($file) : sftp($shost)->realpath($file); + } + # always get stat info of real file + my @stat; + if ($file) { + if ($shost eq 'localhost') { + @stat = lstat($file); + } else { + my $fattrs = sftp($shost)->lstat($file); + if ($fattrs) { + # approximate local stat + @stat = (0, 0, $fattrs->perm , 0, $fattrs->uid, $fattrs->gid, + 0, $fattrs->size, $fattrs->atime, $fattrs->mtime, + $fattrs->mtime, 0, int($fattrs->size / 512)); + } + } + } + my $mode; + if (scalar(@stat) == 0) { + $file = "$sdir/$file0" if ($opts{dereference}); + if ($top) { + # return error if original file + my $err = "Cannot stat $file"; + if ($perl{threads} && $opts{threads} > 1) { + $opts{findeq}->enqueue([$ref, $err]); + } else { + sftp_error($ref, $err); + } + return; + } + # lower level files cannot return errors because there is no way + # to back out of previously added operations, so instead a find + # op is added, which will succeed/fail on its own when processed + } else { + $mode = $stat[2]; + $stat[2] &= 07777; + + # only directories, regular files, and symlinks are supported + return if (!S_ISDIR($mode) && !S_ISREG($mode) && !S_ISLNK($mode)); + # dmf handling for individual files is carried out by transport_dmf + $dmf = 0 if ($top && !S_ISDIR($mode)); + } + + # exclude files (must be before dir processing) + if (defined $opts{exclude}) { + foreach my $re (@{$opts{exclude}}) { + return if (eval {$file =~ /$re/}); + } + } + + my $dh; + if (scalar(@stat) == 0 || S_ISDIR($mode)) { + # ensure $err defined unless explicitly set to undef + my $err = ""; + if (scalar(@stat) > 0 && (!$opts{dereference} || $top) && + (!defined $opts{'find-files'} || + ${$opts{findn4}} < $opts{'find-files'})) { + # add subdirs of this directory for processing when below limit + if ($shost eq 'localhost' ? opendir($dh, $file) : + ($dh = sftp($shost)->opendir($file))) { + $err = undef; + # directory will be processed after parent dir printed + } else { + $err = "Error opening directory $file"; + } + if ($err && $top) { + # return error if original file + if ($perl{threads} && $opts{threads} > 1) { + $opts{findeq}->enqueue([$ref, $err]); + } else { + sftp_error($ref, $err); + } + return; + } + } + if (defined $err) { + # this handles directories as well as lower level stat failures + my $line = "args=find," . escape(hostpath($shost, $file)) . ","; + $line .= $opts{'create-tar'} ? escape($dst) . " tar_name=" . + escape("$tdir$file0") : escape("$ddir/$dfile0"); + transport_find_buffer($line . " host=$opts{lhost}\n"); + return; + } + } + + # include files + if (defined $opts{include}) { + my $found; + foreach my $re (@{$opts{include}}) { + next if (eval {$file !~ /$re/}); + $found = 1; + last; + } + # must be done both here for files and after dir processing + goto FIND_DIR if (!$found); + } + + # newer/older files + my %ti = ('a' => 8, 'm' => 9, 'c' => 10); + if (defined $opts{newer}) { + if ($opts{newer} =~ /^([^:]+):(\S+)/) { + my ($type, $time) = ($1, $2); + $type =~ s/\|/1||/g; + $type =~ s/([amc])/$stat[$ti{$1}]>=$time&&/g; + $type =~ s/([AMC])/$stat[$ti{lc($1)}]<$time&&/g; + $type .= "1"; + # must be done both here for files and after dir processing + goto FIND_DIR if (!eval $type); + } elsif ($stat[9] < $opts{newer}) { + # must be done both here for files and after dir processing + goto FIND_DIR; + } + } + if (defined $opts{older}) { + if ($opts{older} =~ /^([^:]+):(\S+)/) { + my ($type, $time) = ($1, $2); + $type =~ s/\|/1||/g; + $type =~ s/([amc])/$stat[$ti{$1}]<$time&&/g; + $type =~ s/([AMC])/$stat[$ti{lc($1)}]>=$time&&/g; + $type .= "1"; + # must be done both here for files and after dir processing + goto FIND_DIR if (!eval $type); + } elsif ($stat[9] >= $opts{older}) { + # must be done both here for files and after dir processing + goto FIND_DIR; + } + } + + # resolve uid/gid if possible + my $user = $opts{findu}->{$stat[4]}; + if (!defined $user) { + $user = getpwuid($stat[4]); + $user = "uid_$stat[4]" if (!$user); + $opts{findu}->{$stat[4]} = $user; + } + my $group = $opts{findg}->{$stat[5]}; + if (!defined $group) { + $group = getgrgid($stat[5]); + $group = "gid_$stat[5]" if (!$group); + $opts{findg}->{$stat[5]} = $group; + } + my $attrs = join(",", @stat[2,4,5,8,9], + escape($user), escape($group), $stat[7], 512 * $stat[12]); + + my @acls; + my @lattrs; + my @xattrs; + if ($shost eq 'localhost') { + # if shift-aux failed, there is no backup to get this info remotely + # try to get acls + if (($opts{have}->{'shift-bin'} || $opts{have}->{getfacl}) && + !$opts{'create-tar'} && + ($opts{preserve} == 1 || $opts{preserve} =~ /acl/) && + (!$srcfs || $srcfs =~ /,acl/)) { + if (defined $opts{fhpid}) { + $opts{fhpid}->[0]->print("getfacl $file\n"); + my $text = $opts{fhpid}->[1]->getline; + my @cols = split(/\s+/, $text); + push(@acls, $cols[2]) if ($file eq $cols[1] && $cols[-1] eq '0'); + } else { + my $fhpid = open3_run([-1, undef, -1], + "getfacl", "-cps", "--", $file); + while (defined $fhpid && defined ($_ = $fhpid->[1]->getline)) { + chomp; + next if (!$_); + push(@acls, escape($_)); + } + open3_wait($fhpid); + } + } + + # try to get xattrs + if (($opts{have}->{'shift-bin'} || $opts{have}->{getfattr}) && + !$opts{'create-tar'} && + ($opts{preserve} == 1 || $opts{preserve} =~ /xattr/) && + (!$srcfs || $srcfs =~ /,xattr/)) { + if (defined $opts{fhpid}) { + $opts{fhpid}->[0]->print("getfattr $file\n"); + my $text = $opts{fhpid}->[1]->getline; + my @cols = split(/\s+/, $text); + push(@xattrs, $cols[2]) if ($file eq $cols[1] && $cols[-1] eq '0'); + } else { + my $fhpid = open3_run([-1, undef, -1], + "getfattr", "-dhe", "base64", $file); + while (defined $fhpid && defined ($_ = $fhpid->[1]->getline)) { + chomp; + next if (!$_ || /^\s*#/); + push(@xattrs, escape(decode_base64($_))); + } + open3_wait($fhpid); + } + } + + # try to get lustre striping + if (($opts{have}->{'shift-bin'} || $opts{have}->{lfs}) && + !S_ISLNK($mode) && !$opts{'create-tar'} && + ($opts{preserve} == 1 || $opts{preserve} =~ /stripe/) && + $srcfs =~ /^lustre/) { + # ignore symlinks as link to fifo can hang forever + if (defined $opts{fhpid}) { + $opts{fhpid}->[0]->print("getstripe $file\n"); + my $text = $opts{fhpid}->[1]->getline; + my @cols = split(/\s+/, $text); + @lattrs = split(/,/, $cols[2]) if ($file eq $cols[1] && $cols[-1] eq '0'); + } else { + my $fhpid = open3_run([-1, undef, -1], + "lfs", "getstripe", "-d", $file); + while (defined $fhpid && defined ($_ = $fhpid->[1]->getline)) { + $lattrs[0] = $1 if (/stripe_count:\s*(-?\d+)/); + $lattrs[1] = $1 if (/stripe_size:\s*(-?\d+)/); + } + open3_wait($fhpid); + } + } + $lattrs[0] = 0 if (!defined $lattrs[0] && defined $lattrs[1]); + $lattrs[1] = 0 if (!defined $lattrs[1] && defined $lattrs[0]); + } + + # begin log entry + my $line; + my $index_len = !$opts{'index-tar'} ? 0 : 28 + length("$tdir$file0") + + length(sprintf("%7s%7s%9d", $user, $group, $stat[7])); + if (S_ISLNK($mode)) { + my $ln = $shost eq 'localhost' ? readlink($file) : + sftp($shost)->readlink($file); + $line .= "args=ln," . escape($ln); + $index_len += 4 + length($ln); + } elsif (S_ISDIR($mode)) { + $line .= "args=mkdir"; + } elsif ($opts{sync}) { + $line .= "args=ckattr" . ($opts{'ignore-times'} ? "0" : "") . + "," . escape(hostpath($shost, $file)); + } else { + $line .= "args=cp," . escape(hostpath($shost, $file)); + } + $line .= "," . (escape($opts{'create-tar'} ? $dst : "$ddir/$dfile0")); + $line .= " acls=" . join(",", @acls) if (scalar(@acls) > 0); + $line .= " xattrs=" . join(",", @xattrs) if (scalar(@xattrs) > 0); + $line .= " lustre_attrs=" . join(",", @lattrs) if (scalar(@lattrs) > 0); + $line .= " tar_index=$index_len" if ($opts{'index-tar'}); + $line .= " tar_name=" . escape("$tdir/$file0") if ($opts{'create-tar'}); + $line .= " size=$stat[7] attrs=$attrs host=$opts{lhost}\n"; + transport_find_buffer($line, !S_ISLNK($mode) && !S_ISDIR($mode) && + $dmf ? $file . "\n" : undef); + + FIND_DIR: if (defined $dh) { + # flush buffer to ensure parent dir printed before subdirs + transport_find_buffer(); + while (1) { + my $ent; + if ($shost eq 'localhost') { + $ent = readdir $dh; + } else { + $ent = sftp($shost)->readdir($dh); + $ent = $ent->{filename} if ($ent); + } + last if (!defined $ent); + my $path = [$ent, $shost, $file, $dst, "$ddir/$dfile0", $srcfs]; + push(@{$path}, "$tdir/$file0") if ($opts{'create-tar'}); + if ($perl{threads} && $opts{threads} > 1 && + #$opts{findq}->pending < $opts{'queue-size'}) { + #$opts{findq}->pending < $opts{'threads'}) { +#TODO: determine what size should go here + ${$opts{findq4}} < 4 * $opts{threads}) { + # only add to queue if not already at capacity + $opts{findq4}->up; + $opts{findn4}->up; + $opts{findq}->enqueue($path); + } else { + # process now if single threaded or queue at capacity + transport_find1($path); + } + } + closedir $dh; } } @@ -4321,6 +4446,7 @@ sub transport_fish { } #TODO: de-hardcode 60 second timeout $tin->sockopt(SO_RCVTIMEO, pack('L!L!', +60, 0)); + $tin->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1); $tout = $tin; my $nonce = "" . rand(); my $hmac = Digest::HMAC::hmac_hex($nonce, $key, @@ -6142,7 +6268,7 @@ sub vdf { my $out; eval { local $SIG{__WARN__} = sub {die}; - # use 2s alarm in case df stalls + # use 15s alarm in case df stalls local $SIG{ALRM} = sub {die}; alarm 15; # use open3 to avoid executing a shell command based on the name