mirror of https://github.com/pkolano/shift.git
Shift 6.2
parent
6ac6966b17
commit
75bf99fb2d
10
BUGS
10
BUGS
|
@ -1,15 +1,7 @@
|
|||
KNOWN BUGS
|
||||
==========
|
||||
|
||||
1. Setting the TCP window size from perl does not seem to work
|
||||
correctly. This can affect the performance of the fish-tcp
|
||||
transport. The window size appears to reach a maximum of 8MB even
|
||||
when the system limit is much higher. It is not known if this is due
|
||||
to the perl version and/or the specific build of perl on the system
|
||||
on which this was discovered. The only current workaround is to
|
||||
increase the number of streams (--streams) when using fish-tcp.
|
||||
|
||||
2. A bug exists in the Shift 4.0 and 5.0 tar creation function that
|
||||
1. A bug exists in the Shift 4.0 and 5.0 tar creation function that
|
||||
could leave some entries in tar files in a partially corrupted state.
|
||||
The conditions under which this could occur are very specific, so the
|
||||
overall percentage of affected tar files is expected to be very low.
|
||||
|
|
28
CHANGES
28
CHANGES
|
@ -255,3 +255,31 @@ CHANGES
|
|||
- Fixed corruption tracking causing negative sizes/rates in --status
|
||||
- Fixed inadvertent call of test shift-mgr during mgr synchronization
|
||||
- Removed warning message when --no-cron specified
|
||||
|
||||
* Shift 6.2 (06/24/19)
|
||||
- Added multi-threading of directory traversal
|
||||
- Added multi-threading of attr preservation when external programs invoked
|
||||
- Added --interval to dynamically adjust batch size around fixed frequency
|
||||
- Added use of gzip index files to increase metadata read performance
|
||||
- Added --no-silent to disable silent corruption detection
|
||||
- Added ability to escape multiple lines over stdin in shift-aux
|
||||
- Added syntax checking of numeric options that can take units
|
||||
- Changed --files and --size to be minimums instead of absolutes
|
||||
- Changed time remaining to more accurately reflect manager overhead
|
||||
- Fixed remote host selection that was running per file instead of per batch
|
||||
- Fixed detection of operations already performed by another client
|
||||
- Fixed slow processing of doing logs due to small backward read size
|
||||
- Fixed local/remote fadvise that had old initial implementation
|
||||
- Fixed --no-recall handling of remote files
|
||||
- Fixed possible exceptions during some thread joins
|
||||
- Fixed Net::Ping exceptions when Time::HiRes not available
|
||||
- Fixed exception when stat of remote file fails
|
||||
- Fixed use of setfacl when shift-bin available
|
||||
- Fixed run state when --restart=ignore ignores final operations
|
||||
- Fixed manager --doing=n so it retrieves nth doing in past
|
||||
- Fixed output of error table in stats due to tainted file names
|
||||
- Fixed option abbreviation so deprecated options don't invoke other options
|
||||
- Fixed double lustre striping during attribute preservation stage
|
||||
- Fixed high memory utilization during traversal of very large directories
|
||||
- Fixed stalls due to inadvertent newlines left in logs after restarts
|
||||
- Fixed preservation of lustre striping due to bad conditional
|
||||
|
|
44
INSTALL
44
INSTALL
|
@ -12,7 +12,7 @@ Shift Installation and Configuration
|
|||
|
||||
o shift-mgr - the Shift manager, which is invoked by client instances
|
||||
and must exist either on all client hosts or on a host
|
||||
accessible via ssh hostbased or pubkey authentication
|
||||
accessible via ssh hostbased or publickey authentication
|
||||
(henceforth called "manager hosts")
|
||||
|
||||
o shift-aux - the Shift auxiliary utility, which is invoked by client
|
||||
|
@ -42,7 +42,7 @@ Shift Installation and Configuration
|
|||
designated as manager hosts. Since the manager is not an active server
|
||||
and is simply a passive executable invoked by clients, the manager can be
|
||||
located on any host with ssh access. These hosts must support either
|
||||
hostbased or pubkey authentication, however, since Shift is an automated
|
||||
hostbased or publickey authentication, however, since Shift is an automated
|
||||
framework that performs actions without the user present. The manager has
|
||||
a synchronization mechanism allowing multiple hosts without a shared file
|
||||
system to be used for redundancy.
|
||||
|
@ -54,7 +54,7 @@ Shift Installation and Configuration
|
|||
|
||||
o perl >= 5.8.5 (>= 5.10.1 is required for multi-threading support)
|
||||
o ssh (and sftp) access to manager/remote hosts via hostbased or
|
||||
pubkey authentication
|
||||
publickey authentication
|
||||
|
||||
2.2. Optional
|
||||
|
||||
|
@ -70,7 +70,7 @@ Shift Installation and Configuration
|
|||
(http://toolkit.globus.org/toolkit/data/gridftp)
|
||||
o rsync - bandwidth-efficient local/remote copy
|
||||
(http://rsync.samba.org)
|
||||
o mesh - lightweight single sign-on via ssh pubkeys
|
||||
o mesh - lightweight single sign-on via ssh publickey authentication
|
||||
(http://mesh.sf.net)
|
||||
|
||||
2.3. Invoked (standard on most systems or when specific file systems in use)
|
||||
|
@ -175,9 +175,9 @@ Shift Installation and Configuration
|
|||
5.1.1. ~/.ssh/id_rsa (or similar)
|
||||
|
||||
If hostbased authentication is not supported by client hosts,
|
||||
manager hosts, and/or remote hosts, pubkey authentication must be
|
||||
used. Clients must have access to private keys to access these
|
||||
systems, which may either be referenced explicitly via the
|
||||
manager hosts, and/or remote hosts, publickey authentication must
|
||||
be used. Clients must have access to private keys to access
|
||||
these systems, which may either be referenced explicitly via the
|
||||
--mgr-identity and --identity options for manager and remote
|
||||
hosts, respectively, or be added to an ssh agent on the client(s).
|
||||
Note that the private keys used for these options must not be
|
||||
|
@ -189,7 +189,7 @@ Shift Installation and Configuration
|
|||
5.1.2. ~/.ssh/authorized_keys
|
||||
|
||||
If hostbased authentication is not supported to other client hosts
|
||||
for parallelization, pubkey authentication must be used. In this
|
||||
for parallelization, publickey authentication must be used. In this
|
||||
case, the public key(s) corresponding to the private key(s) used by
|
||||
clients (those named ~/.ssh/id* loaded into an ssh agent) must be
|
||||
added to the invoking user's authorized_keys file on other client
|
||||
|
@ -270,24 +270,25 @@ Shift Installation and Configuration
|
|||
5.2.2. ~/.ssh/authorized_keys
|
||||
|
||||
If hostbased authentication is not supported to manager hosts,
|
||||
pubkey authentication must be used. In this case, the public key(s)
|
||||
corresponding to the private key(s) used by clients (either loaded
|
||||
into an ssh agent or specified via --mgr-identity) must be added to
|
||||
the appropriate user's authorized_keys file on manager hosts.
|
||||
This user will either be the invoking user or the one specified by
|
||||
--mgr-user.
|
||||
publickey authentication must be used. In this case, the public
|
||||
key(s) corresponding to the private key(s) used by clients (either
|
||||
loaded into an ssh agent or specified via --mgr-identity) must be
|
||||
added to the appropriate user's authorized_keys file on manager
|
||||
hosts. This user will either be the invoking user or the one
|
||||
specified by --mgr-user.
|
||||
|
||||
|
||||
5.3. Remote hosts
|
||||
|
||||
5.3.1. ~/.ssh/authorized_keys
|
||||
|
||||
If hostbased authentication is not supported to remote hosts, pubkey
|
||||
authentication must be used. In this case, the public key(s)
|
||||
corresponding to the private key(s) used by clients (either loaded
|
||||
into an ssh agent or specified via --identity) must be added to the
|
||||
appropriate user's authorized_keys file on remote hosts. This user
|
||||
will either be the invoking user or the one specified by --user.
|
||||
If hostbased authentication is not supported to remote hosts,
|
||||
publickey authentication must be used. In this case, the public
|
||||
key(s) corresponding to the private key(s) used by clients (either
|
||||
loaded into an ssh agent or specified via --identity) must be added
|
||||
to the appropriate user's authorized_keys file on remote hosts.
|
||||
This user will either be the invoking user or the one specified by
|
||||
--user.
|
||||
|
||||
|
||||
6. Usage
|
||||
|
@ -302,9 +303,6 @@ Shift Installation and Configuration
|
|||
if not already in a manpath directory. Basic usage is drop-in
|
||||
compatible with cp/scp with special consideration for the authentication
|
||||
options --mgr, --mgr-identity, --mgr-user, --identity, and --user.
|
||||
Note that the scp "user@host" syntax is not currently supported (the
|
||||
"user@" portion will be dropped) so --user must be specified instead
|
||||
when the remote user differs from the local user.
|
||||
|
||||
6.2. shift-mgr
|
||||
|
||||
|
|
115
doc/shiftc.1
115
doc/shiftc.1
|
@ -118,6 +118,7 @@ given in following sections.
|
|||
(LIST subset of {acl,mode,owner,stripe,time,xattr})
|
||||
\-\-no\-recall do not recall DMF\-managed files before transfer
|
||||
\-\-no\-sanity do not check file existence/size (benchmarking only)
|
||||
\-\-no\-silent do not detect silent corruption or store checksums
|
||||
\-\-no\-verify do not verify/rectify integrity of destination files
|
||||
.PP
|
||||
\fBMonitoring and management options:\fP
|
||||
|
@ -148,8 +149,9 @@ given in following sections.
|
|||
(use suffix {k,m,g,t} for {Kb,Mb,Gb,Tb})
|
||||
\-\-buffer=SIZE use SIZE bytes for buffer in transports
|
||||
(use suffix {k,m,g,t} for {KiB,MiB,GiB,TiB}) [4m]
|
||||
\-\-files=COUNT process transfer in batches of COUNT files
|
||||
\-\-files=COUNT process transfer in batches of at least COUNT files
|
||||
(use suffix {k,m,b/g,t} for 1E{3,6,9,12}) [1k]
|
||||
\-\-interval=NUM adjust batches to run for around NUM seconds [30]
|
||||
\-\-local=LIST set local transport mechanism to one of LIST
|
||||
(LIST subset of {bbcp,bbftp,fish,fish-tcp,gridftp,
|
||||
mcp,rsync,shift})
|
||||
|
@ -158,7 +160,7 @@ given in following sections.
|
|||
(LIST subset of {bbcp,bbftp,fish,fish-tcp,gridftp,
|
||||
rsync,shift})
|
||||
\-\-retry=NUM retry failed operations up to NUM times [2]
|
||||
\-\-size=SIZE process transfer in batches of SIZE bytes
|
||||
\-\-size=SIZE process transfer in batches of at least SIZE bytes
|
||||
(use suffix {k,m,g,t} for {KB,MB,GB,TB}) [4g]
|
||||
\-\-split=SIZE parallelize single files using chunks of SIZE bytes
|
||||
(use suffix {k,m,g,t} for {KiB,MiB,GiB,TiB}) [0]
|
||||
|
@ -440,6 +442,16 @@ as needed even when this option is enabled.
|
|||
Disable file existence and size checks at the end of the transfer.
|
||||
This option was included for benchmarking and completeness purposes
|
||||
and is not recommended for general use.
|
||||
.IP "\fB\-\-no\-silent\fP"
|
||||
By default, the checksums of all files transferred with Shift are
|
||||
stored in a per-user database. When a file with a known checksum is
|
||||
transferred and has not been modified since the checksum was stored, the
|
||||
transfer will be put into the "alert" state if the current checksum does
|
||||
not match the stored checksum. This option disables the storage of
|
||||
checksums and comparison against existing checksums. While silent
|
||||
corruption detection adds minimal overhead during normal operation, it
|
||||
can increase the probability of lock contention when there are large
|
||||
numbers of clients.
|
||||
.IP "\fB\-\-no\-verify\fP"
|
||||
By default, files are checksummed at the source and destination to
|
||||
verify that they have not been corrupted and if corruption is detected,
|
||||
|
@ -462,18 +474,13 @@ the origin host/directory and the original command. When
|
|||
Specify the transfer identifier to be used with management and status
|
||||
commands.
|
||||
.IP "\fB\-\-last\-sum\fP"
|
||||
The checksums of all files transferred with Shift are stored in a
|
||||
per-user db. When a file with a known checksum is transferred and has
|
||||
not been modified since the checksum was stored, the transfer will be
|
||||
put into the "alert" state if the current checksum does not match the
|
||||
stored checksum. This option queries the silent corruption database for
|
||||
all files given on the command line and prints (one file per line) the
|
||||
last known checksum, the file modification time associated with this
|
||||
checksum, and the file name. When \fB\-\-index\-tar\fP is given, the
|
||||
first file argument is assumed to be a tar file and the remaining
|
||||
arguments names of files within the tar for which checksum information
|
||||
will be printed. A checksum of "-" means that no information is stored
|
||||
for the file.
|
||||
Queries the silent corruption database for all files given on the
|
||||
command line and prints (one file per line) the last known checksum, the
|
||||
file modification time associated with this checksum, and the file name.
|
||||
When \fB\-\-index\-tar\fP is given, the first file argument is assumed
|
||||
to be a tar file and the remaining arguments names of files within the
|
||||
tar for which checksum information will be printed. A checksum of "-"
|
||||
means that no information is stored for the file.
|
||||
.IP "\fB\-\-mgr=HOST\fP"
|
||||
Set the host that will be used to manage transfers. By default, this
|
||||
host will be accessed as the current user with hostbased authentication
|
||||
|
@ -536,19 +543,21 @@ specific list of metrics (e.g. \fB\-\-plot=id\fP), "io" is assumed.
|
|||
Restart the transfer associated with the given \fB\-\-id\fP that was
|
||||
stopped due to unrecoverable errors or stopped explicitly via
|
||||
\fB\-\-stop\fP. If \fB\-\-restart=ignore\fP is specified, all existing
|
||||
errors will be ignored and the transfer will progress as if the associated
|
||||
files and directories were no longer part of the transfer. Note that
|
||||
transfers must be restarted on the original client host or one that has
|
||||
equivalent file system access. A subset of the available command-line
|
||||
options may be respecified during a restart including \fB\-\-bandwidth\fP,
|
||||
\fB\-\-buffer\fP, \fB\-\-clients\fP, \fB\-\-cpu\fP, \fB\-\-disk\fP,
|
||||
\fB\-\-files\fP, \fB\-\-force\fP, \fB\-\-host\-file\fP, \fB\-\-host\-list\fP,
|
||||
\fB\-\-hosts\fP, \fB\-\-io\fP, \fB\-\-ior\fP, \fB\-\-iow\fP, \fB\-\-local\fP,
|
||||
\fB\-\-net\fP, \fB\-\-netr\fP, \fB\-\-netw\fP, \fB\-\-no\-cron\fP,
|
||||
\fB\-\-no\-mail\fP, \fB\-\-no\-offline\fP, \fB\-\-no\-recall\fP,
|
||||
\fB\-\-pipeline\fP, \fB\-\-ports\fP, \fB\-\-preallocate\fP, \fB\-\-remote\fP,
|
||||
\fB\-\-retry\fP, \fB\-\-secure\fP, \fB\-\-size\fP, \fB\-\-streams\fP,
|
||||
\fB\-\-stripe\fP, \fB\-\-threads\fP, and \fB\-\-window\fP.
|
||||
errors will be ignored and the transfer will progress as if the
|
||||
associated files and directories were no longer part of the transfer.
|
||||
Note that transfers must be restarted on the original client host or one
|
||||
that has equivalent file system access. A subset of the available
|
||||
command-line options may be respecified during a restart including
|
||||
\fB\-\-bandwidth\fP, \fB\-\-buffer\fP, \fB\-\-clients\fP, \fB\-\-cpu\fP,
|
||||
\fB\-\-disk\fP, \fB\-\-files\fP, \fB\-\-force\fP, \fB\-\-host\-file\fP,
|
||||
\fB\-\-host\-list\fP, \fB\-\-hosts\fP, \fB\-\-interval\fP, \fB\-\-io\fP,
|
||||
\fB\-\-ior\fP, \fB\-\-iow\fP, \fB\-\-local\fP, \fB\-\-net\fP,
|
||||
\fB\-\-netr\fP, \fB\-\-netw\fP, \fB\-\-no\-cron\fP, \fB\-\-no\-mail\fP,
|
||||
\fB\-\-no\-offline\fP, \fB\-\-no\-recall\fP, \fB\-\-no\-silent,
|
||||
\fB\-\-pipeline\fP, \fB\-\-ports\fP, \fB\-\-preallocate\fP,
|
||||
\fB\-\-remote\fP, \fB\-\-retry\fP, \fB\-\-secure\fP, \fB\-\-size\fP,
|
||||
\fB\-\-streams\fP, \fB\-\-stripe\fP, \fB\-\-threads\fP, and
|
||||
\fB\-\-window\fP.
|
||||
.IP "\fB\-\-search=REGEX\fP"
|
||||
When \fB\-\-status\fP and \fB\-\-id\fP are specified, this option will
|
||||
show the full status of file operations in the associated transfer whose
|
||||
|
@ -627,15 +636,29 @@ suffixes k, m, g, and t may be used for KiB, MiB, GiB, and TiB,
|
|||
respectively. The default buffer size is 4 MiB. Increasing the
|
||||
buffer size trades higher memory utilization for more efficient I/O.
|
||||
.IP "\fB\-\-files=COUNT\fP"
|
||||
Process transfers in batches of the given number of files. The
|
||||
suffixes k, m, b or g, and t may be used for 1E3, 1E6, 1E9, and 1E12,
|
||||
respectively. The default batch count is 1000 files. Lowering the
|
||||
batch count will increase the number of checkpoints and the overhead of
|
||||
transfer management. Raising the batch count will have the opposite
|
||||
effect. A batch will be sent for processing when the number of files in
|
||||
the batch reaches the given value. Note that batches of less than the
|
||||
given count can occur if the batch size specified by \fB\-\-size\fP is
|
||||
reached first.
|
||||
Process transfers in batches of at least the given number of files.
|
||||
The suffixes k, m, b or g, and t may be used for 1E3, 1E6, 1E9, and
|
||||
1E12, respectively. The default batch count is 1000 files. This option
|
||||
works in concert with \fB\-\-size\fP and \fB\-\-interval\fP to manage
|
||||
the number of checkpoints and the overhead of transfer management. A
|
||||
batch will initially consist of at least \fB\-\-files\fP files or
|
||||
\fB\-\-size\fP bytes, whichever is reached first. The batch may then
|
||||
be dynamically increased in size until there is enough work to span
|
||||
\fB\-\-interval\fP seconds. To make batch selection completely dynamic,
|
||||
use \fB\-\-files=1\fP and \fB\-\-size=1\fP.
|
||||
.IP "\fB\-\-interval=SECS\fP"
|
||||
Process transfers in batches that take around the given number of
|
||||
seconds. The default interval is 30 seconds. This option works in
|
||||
concert with \fB\-\-files\fP and \fB\-\-size\fP to manage the number of
|
||||
checkpoints and the overhead of transfer management. A batch will
|
||||
initially consist of at least \fB\-\-files\fP files or \fB\-\-size\fP
|
||||
bytes, whichever is reached first. The batch may then be dynamically
|
||||
increased in size until there is enough work to span \fB\-\-interval\fP
|
||||
seconds. Note that the actual time a batch takes will depend on its
|
||||
contents and that the interval will be increased as the number of
|
||||
clients participating in a transfer increases to minimize contention
|
||||
for manager locks. To make batch selection completely static, use
|
||||
\fB\-\-interval=0\fP.
|
||||
.IP "\fB\-\-local=LIST\fP"
|
||||
Specify one or more local transports to be used for the transfer in
|
||||
order of preference, separated by commas. Valid transports for this
|
||||
|
@ -674,15 +697,16 @@ retries. Note that disabling retries also disables the ability of
|
|||
value is cumulative across all stages of a file's processing so
|
||||
different stages may not be retried the same number of times.
|
||||
.IP "\fB\-\-size=SIZE\fP"
|
||||
Process transfers in batches of approximately the given total file size.
|
||||
Process transfers in batches of at least the given total file size.
|
||||
The suffixes k, m, g, and t may be used for KB, MB, GB, and TB,
|
||||
respectively. The default batch size is 4 GB. Lowering the batch size
|
||||
will increase the number of checkpoints and the overhead of transfer
|
||||
management. Raising the batch size will have the opposite effect. A
|
||||
batch will be sent for processing when the total size of all files in
|
||||
the batch reaches the given value. Note that batches of less than the
|
||||
given size can occur if the batch count specified by \fB\-\-files\fP
|
||||
is reached first.
|
||||
respectively. The default batch size is 4 GB. This option works in
|
||||
concert with \fB\-\-files\fP and \fB\-\-interval\fP to manage the number
|
||||
of checkpoints and the overhead of transfer management. A batch will
|
||||
initially consist of at least \fB\-\-size\fP bytes or \fB\-\-files\fP
|
||||
files, whichever is reached first. The batch may then be dynamically
|
||||
increased in size until there is enough work to span \fB\-\-interval\fP
|
||||
seconds. To make batch selection completely dynamic, use
|
||||
\fB\-\-files=1\fP and \fB\-\-size=1\fP.
|
||||
.IP "\fB\-\-split=SIZE\fP"
|
||||
Parallelize the processing of single files using chunks of the given
|
||||
size. The suffixes k, m, g, and t may be used for KiB, MiB, GiB, and
|
||||
|
@ -785,6 +809,9 @@ loss.
|
|||
./"################################################################
|
||||
.SH "TRANSFER THROTTLING"
|
||||
./"################################################################
|
||||
Transfers can be throttled to prevent resource exhaustion when they
|
||||
reach configured thresholds for CPU, disk, I/O, and/or network
|
||||
utilization.
|
||||
.IP "\fB\-\-cpu=NUM\fP"
|
||||
Throttle the transfer when the local CPU usage reaches the specified
|
||||
percent of the total available. This option is disabled by default but
|
||||
|
|
|
@ -145,7 +145,7 @@ user_dir /home/%u/.shift
|
|||
# (use suffix {k,m,g,t} for {KB,MB,GB,TB})
|
||||
#default_buffer 4m
|
||||
|
||||
# maximum number of files to transfer in each batch
|
||||
# minimum number of files to transfer in each batch
|
||||
# (use suffix {k,m,b/g,t} for 10E{3,6,9,12})
|
||||
#default_files 1k
|
||||
|
||||
|
@ -154,6 +154,10 @@ user_dir /home/%u/.shift
|
|||
# (use suffix {k,m,b/g,t} for 10E{3,6,9,12})
|
||||
#default_find-files 2k
|
||||
|
||||
# approximate number of seconds that each batch should run
|
||||
# (note this will be adjusted upward with more clients/greater mgr overhead)
|
||||
#default_interval 30
|
||||
|
||||
# source sparsity percentage at which to preallocate destination file
|
||||
# (sparsity defined as 1 - (512 * allocated_blocks / size))
|
||||
# (note that transports using temporary files are not supported)
|
||||
|
@ -164,7 +168,7 @@ user_dir /home/%u/.shift
|
|||
# (must be at least 1 for --sync to function)
|
||||
#default_retry 2
|
||||
|
||||
# approximate maximum amount of data to transfer in each batch
|
||||
# approximate minimum amount of data to transfer in each batch
|
||||
# (use suffix {k,m,g,t} for {KB,MB,GB,TB})
|
||||
#default_size 4g
|
||||
|
||||
|
|
774
perl/shift-aux
774
perl/shift-aux
|
@ -52,7 +52,7 @@ use File::Basename;
|
|||
use File::Path;
|
||||
use File::Spec;
|
||||
use File::Temp qw(tempfile);
|
||||
use Getopt::Long qw(:config bundling no_ignore_case require_order);
|
||||
use Getopt::Long qw(:config bundling no_auto_abbrev no_ignore_case require_order);
|
||||
use IO::File;
|
||||
use IO::Handle;
|
||||
use IO::Socket::INET;
|
||||
|
@ -61,12 +61,12 @@ require IPC::Open3;
|
|||
use List::Util qw(first min);
|
||||
use MIME::Base64;
|
||||
use POSIX;
|
||||
use Socket;
|
||||
use Socket qw(IPPROTO_TCP TCP_NODELAY);
|
||||
use Symbol qw(gensym);
|
||||
use Sys::Hostname;
|
||||
use Text::ParseWords;
|
||||
|
||||
our $VERSION = 0.96;
|
||||
our $VERSION = 0.98;
|
||||
|
||||
# do not die when receiving sigpipe
|
||||
$SIG{PIPE} = 'IGNORE';
|
||||
|
@ -82,38 +82,51 @@ umask ($< == 0 ? 077 : 077 & umask);
|
|||
my %perl;
|
||||
$perl{ssl} = eval 'use IO::Socket::SSL; 1';
|
||||
# need threads and version of Thread::Queue from perl >= 5.10.1
|
||||
$perl{threads} = eval 'require 5.010_001; use threads; use Thread::Queue; 1';
|
||||
$perl{threads} = eval 'require 5.010_001; use threads; use Thread::Queue; use Thread::Semaphore; 1';
|
||||
|
||||
my %opts;
|
||||
my %opts = (
|
||||
'buffer' => 4194304,
|
||||
'buffer-size' => 4,
|
||||
'hash-type' => "md5",
|
||||
'ports' => "50000:51000",
|
||||
'split-size' => 1024,
|
||||
'streams' => 4,
|
||||
'threads' => 4,
|
||||
'window' => 4194304,
|
||||
);
|
||||
my $cmd = shift @ARGV;
|
||||
|
||||
# parse options and perform corresponding command
|
||||
if (!$cmd) {
|
||||
die "Invalid command\n";
|
||||
} elsif ($cmd eq 'chattr') {
|
||||
die "Invalid options\n" if (!GetOptions(\%opts,
|
||||
"buffer=i", "threads=i",
|
||||
));
|
||||
die "Invalid options\n" if (scalar(@ARGV) > 0);
|
||||
chattr();
|
||||
} elsif ($cmd eq 'escape') {
|
||||
die "Invalid options\n" if (scalar(@ARGV) != 1);
|
||||
print escape($ARGV[0]);
|
||||
if (scalar(@ARGV)) {
|
||||
print escape($_) foreach (@ARGV);
|
||||
} else {
|
||||
print escape($_) while (<STDIN>);
|
||||
}
|
||||
} elsif ($cmd eq 'fadvise') {
|
||||
die "Invalid options\n" if (scalar(@ARGV) > 0);
|
||||
# offload to shift-bin since fadvise does not exist in perl
|
||||
exec("shift-bin");
|
||||
} elsif ($cmd eq 'find') {
|
||||
die "Invalid options\n" if (!GetOptions(\%opts,
|
||||
"create-tar", "dereference|L", "exclude=s@", "extract-tar",
|
||||
"find-files=i", "ignore-times", "include=s@", "newer=s", "older=s",
|
||||
"preserve=s", "sync",
|
||||
"buffer=i", "create-tar", "dereference|L", "exclude=s@",
|
||||
"extract-tar", "find-files=i", "ignore-times", "include=s@",
|
||||
"newer=s", "older=s", "preserve=s", "recall", "sync", "threads=i",
|
||||
));
|
||||
die "Invalid options\n" if (scalar(@ARGV) > 0);
|
||||
find();
|
||||
} elsif ($cmd eq 'fish') {
|
||||
%opts = (
|
||||
'buffer-size' => 4,
|
||||
'ports' => "50000:51000",
|
||||
'streams' => 4,
|
||||
'window' => 4194304,
|
||||
);
|
||||
die "Invalid options\n" if (!GetOptions(\%opts,
|
||||
"buffer-size=i", "ports=s", "secure", "streams=i", "tcp", "verify",
|
||||
"window=i",
|
||||
"buffer-size=i", "hash-type=s", "ports=s", "secure", "split-size=i",
|
||||
"streams=i", "tcp", "verify", "window=i",
|
||||
));
|
||||
die "Invalid options\n" if (scalar(@ARGV) > 0);
|
||||
$opts{'buffer-size'} <<= 20;
|
||||
|
@ -122,12 +135,6 @@ if (!$cmd) {
|
|||
die "Invalid options\n" if (scalar(@ARGV) > 0);
|
||||
mount();
|
||||
} elsif ($cmd eq 'sum') {
|
||||
%opts = (
|
||||
'buffer-size' => 4,
|
||||
'hash-type' => "md5",
|
||||
'split-size' => 1024,
|
||||
'threads' => 4,
|
||||
);
|
||||
die "Invalid options\n" if (!GetOptions(\%opts,
|
||||
"buffer-size=i", "c", "hash-type=s", "split-size=i", "threads=i",
|
||||
));
|
||||
|
@ -148,40 +155,120 @@ if (!$cmd) {
|
|||
}
|
||||
}
|
||||
|
||||
################
|
||||
#### buffer ####
|
||||
################
|
||||
# write output in blocks for efficiency
|
||||
my $buf4;
|
||||
sub buffer {
|
||||
my ($text, $file) = @_;
|
||||
$opts{text_buf} .= $text;
|
||||
$opts{file_buf} .= $file;
|
||||
if (($opts{text_buf} || $opts{file_buf}) &&
|
||||
(!defined $text || length $opts{text_buf} >= $opts{buffer} ||
|
||||
length $opts{file_buf} >= $opts{buffer})) {
|
||||
$buf4->down if ($perl{threads} && $opts{threads} > 1);
|
||||
if ($opts{text_buf} && (!defined $text ||
|
||||
length $opts{text_buf} >= $opts{buffer})) {
|
||||
print $opts{text_buf};
|
||||
STDOUT->flush;
|
||||
delete $opts{text_buf};
|
||||
}
|
||||
if ($opts{file_buf} && (!defined $text ||
|
||||
length $opts{file_buf} >= $opts{buffer})) {
|
||||
print {$opts{dmfh}} $opts{file_buf};
|
||||
$opts{dmfh}->flush;
|
||||
delete $opts{file_buf};
|
||||
}
|
||||
$buf4->up if ($perl{threads} && $opts{threads} > 1);
|
||||
}
|
||||
}
|
||||
|
||||
################
|
||||
#### chattr ####
|
||||
################
|
||||
# set given attr of files given on STDIN to given value(s) and output ok/error
|
||||
sub chattr {
|
||||
# check for existence of commands
|
||||
my %have;
|
||||
foreach my $bin (qw(fallocate lfs setfacl setfattr shift-bin)) {
|
||||
$have{$bin} = first {-x "$_/$bin"} (split(/:/, $ENV{PATH}));
|
||||
$opts{have}->{$bin} = first {-x "$_/$bin"} (split(/:/, $ENV{PATH}));
|
||||
}
|
||||
# offload to shift-bin for more efficient processing if possible
|
||||
exec("shift-bin") if ($have{'shift-bin'});
|
||||
|
||||
while (my $line = <STDIN>) {
|
||||
chomp $line;
|
||||
my ($cmd, $file, $attrs) = split(/\s+/, $line, 3);
|
||||
# short circuit if command not available
|
||||
next if (!$have{$cmd});
|
||||
# untaint arguments
|
||||
$cmd = $1 if ($cmd =~ /(.*)/);
|
||||
$file = $1 if ($file =~ /(.*)/s);
|
||||
$attrs = $1 if ($attrs =~ /(.*)/);
|
||||
my $ufile = unescape($file);
|
||||
|
||||
# make sure parent directory exists
|
||||
my $dir = $ufile =~ s/\/$// ? $ufile : dirname($ufile);
|
||||
eval {mkpath($dir)};
|
||||
my ($q, @threads);
|
||||
if ($perl{threads} && $opts{threads} > 1) {
|
||||
$q = Thread::Queue->new;
|
||||
# mutual exclusion for stdout
|
||||
$buf4 = Thread::Semaphore->new(1);
|
||||
@threads = map {threads->create(sub {
|
||||
eval {
|
||||
# this should not be needed but there is a weird race condition
|
||||
# that causes the open3 forked child to never execute
|
||||
local $SIG{ALRM} = sub {die};
|
||||
alarm 2;
|
||||
# will revert to individual commands if alarm hit
|
||||
$opts{fhpid} = open3_run([undef, undef, -1], "shift-bin")
|
||||
if ($opts{have}->{'shift-bin'});
|
||||
alarm 0;
|
||||
};
|
||||
chattr1($_) while (defined ($_ = $q->dequeue));
|
||||
buffer();
|
||||
open3_wait($opts{fhpid}) if (defined $opts{fhpid});
|
||||
})} (1 .. $opts{threads});
|
||||
} else {
|
||||
# offload to shift-bin for more efficient processing if possible
|
||||
exec("shift-bin") if ($opts{have}->{'shift-bin'});
|
||||
}
|
||||
|
||||
while (<STDIN>) {
|
||||
chomp;
|
||||
next if (!$_);
|
||||
if ($perl{threads} && $opts{threads} > 1) {
|
||||
$q->enqueue($_);
|
||||
} else {
|
||||
chattr1($_);
|
||||
}
|
||||
}
|
||||
if ($perl{threads} && $opts{threads} > 1) {
|
||||
# force threads to exit
|
||||
$q->enqueue(undef) foreach (@threads);
|
||||
foreach (@threads) {
|
||||
$_->join if ($_);
|
||||
}
|
||||
} else {
|
||||
buffer();
|
||||
}
|
||||
}
|
||||
|
||||
#################
|
||||
#### chattr1 ####
|
||||
#################
|
||||
# set attrs of a single file
|
||||
sub chattr1 {
|
||||
my $line = shift;
|
||||
my ($cmd, $file, $attrs) = split(/\s+/, $line, 3);
|
||||
# short circuit if command not available
|
||||
return if (!defined $opts{fhpid} && !$opts{have}->{$cmd});
|
||||
# untaint arguments
|
||||
$cmd = $1 if ($cmd =~ /(.*)/);
|
||||
$file = $1 if ($file =~ /(.*)/s);
|
||||
$attrs = $1 if ($attrs =~ /(.*)/);
|
||||
my $ufile = unescape($file);
|
||||
|
||||
# make sure parent directory exists
|
||||
my $dir = $ufile =~ s/\/$// ? $ufile : dirname($ufile);
|
||||
eval {mkpath($dir)};
|
||||
|
||||
if (defined $opts{fhpid}) {
|
||||
$opts{fhpid}->[0]->print($line . "\n");
|
||||
my $text = $opts{fhpid}->[1]->getline;
|
||||
buffer($text);
|
||||
} else {
|
||||
my ($cin, @copts, $in, $out);
|
||||
if ($cmd eq 'fallocate') {
|
||||
@copts = ("-n", "-l", unescape($attrs), $ufile);
|
||||
} elsif ($cmd eq 'setstripe') {
|
||||
$cmd = "lfs";
|
||||
my ($count, $size, $pool) = split(/\s+/, unescape($attrs));
|
||||
my ($count, $size, $pool) = split(/\s+/, $attrs);
|
||||
$count = 0 if (!$count);
|
||||
$size = 0 if (!$size);
|
||||
@copts = ("setstripe", "-c", $count, "-S", $size, $ufile);
|
||||
|
@ -203,9 +290,9 @@ sub chattr {
|
|||
my $text;
|
||||
$text .= $_ while (<$out>);
|
||||
$text =~ s/\n/ /g;
|
||||
print "$file,", escape($text), "\n";
|
||||
buffer("$file,", escape($text) . "\n");
|
||||
} else {
|
||||
print "$file,ok\n";
|
||||
buffer("$file,ok\n");
|
||||
}
|
||||
close $out;
|
||||
}
|
||||
|
@ -226,310 +313,392 @@ sub escape {
|
|||
#### find ####
|
||||
##############
|
||||
# output list of files/dirs beneath paths given on STDIN with stat info
|
||||
my ($findq, $findb4, $findn4, $findq4);
|
||||
sub find {
|
||||
my @threads;
|
||||
if (!$opts{'extract-tar'}) {
|
||||
# check for existence of various commands
|
||||
foreach my $bin (qw(dmget lfs getfacl getfattr shift-bin)) {
|
||||
$opts{have}->{$bin} = first {-x "$_/$bin"} (split(/:/, $ENV{PATH}));
|
||||
}
|
||||
|
||||
if ($opts{recall} && $opts{have}->{dmget}) {
|
||||
# set up tmp file for recalls in case threads need to access
|
||||
($opts{dmfh}, $opts{dmtmp}) = tempfile();
|
||||
}
|
||||
|
||||
if ($perl{threads} && $opts{threads} > 1) {
|
||||
$findq = Thread::Queue->new;
|
||||
# number of worker "b's" processing files
|
||||
$findb4 = Thread::Semaphore->new(0);
|
||||
# number of items that have been added to queue
|
||||
$findn4 = Thread::Semaphore->new(1);
|
||||
# number of unprocessed items on queue
|
||||
$findq4 = Thread::Semaphore->new(0);
|
||||
# mutual exclusion for stdout
|
||||
$buf4 = Thread::Semaphore->new(1);
|
||||
@threads = map {threads->create(sub {
|
||||
$opts{fhpid} = open3_run([undef, undef, -1], "shift-bin")
|
||||
if ($opts{have}->{'shift-bin'} && ($opts{preserve} == 1 ||
|
||||
$opts{preserve} =~ /acl|stripe|xattr/));
|
||||
while (1) {
|
||||
my $path = $findq->dequeue_timed(1);
|
||||
if (defined $path) {
|
||||
$findb4->up;
|
||||
$findq4->down;
|
||||
find1($path);
|
||||
$findb4->down;
|
||||
} elsif (!$$findq4 && !$$findb4) {
|
||||
buffer();
|
||||
last;
|
||||
}
|
||||
}
|
||||
open3_wait($opts{fhpid}) if (defined $opts{fhpid});
|
||||
})} (1 .. $opts{threads});
|
||||
} else {
|
||||
# file count used in both single/multi-threaded cases
|
||||
$findn4 = \0;
|
||||
$opts{fhpid} = open3_run([undef, undef, -1], "shift-bin")
|
||||
if ($opts{have}->{'shift-bin'} && ($opts{preserve} == 1 ||
|
||||
$opts{preserve} =~ /acl|stripe|xattr/));
|
||||
}
|
||||
}
|
||||
|
||||
while (my $line = <STDIN>) {
|
||||
chomp $line;
|
||||
$opts{$_} = undef foreach (qw(srcfs tar_name tar_tell));
|
||||
my @args = split(/\s+/, $line);
|
||||
my $ref = pop @args;
|
||||
print "ref $ref\n";
|
||||
while (scalar(@args) > 3) {
|
||||
my $opt = pop @args;
|
||||
$opts{$1} = $2 if ($opt =~ /(\w+)=(\S+)/);
|
||||
}
|
||||
my @uargs = map {unescape($_)} @args;
|
||||
if ($opts{'extract-tar'}) {
|
||||
print "ref $ref\n";
|
||||
# process local tar files
|
||||
find_tar(@uargs);
|
||||
} else {
|
||||
find1(@uargs);
|
||||
buffer("ref $ref\n");
|
||||
my ($shost, $spath, $dst) = @uargs;
|
||||
my $sdir = dirname($spath);
|
||||
$sdir = "" if ($sdir eq '/');
|
||||
my $ddir = dirname($dst);
|
||||
$ddir = "" if ($ddir eq '/');
|
||||
my $path = [[basename($spath), basename($dst), $ref], $shost, $sdir,
|
||||
$dst, $ddir, $opts{srcfs}];
|
||||
if ($opts{'create-tar'}) {
|
||||
my $tdir = dirname(unescape($opts{tar_name}));
|
||||
$tdir = "" if ($tdir eq '.');
|
||||
$tdir .= "/" if ($tdir && $tdir !~ /\/$/);
|
||||
push(@{$path}, $tdir);
|
||||
}
|
||||
if ($perl{threads} && $opts{threads} > 1) {
|
||||
$findq4->up;
|
||||
$findn4->up;
|
||||
$findq->enqueue($path);
|
||||
} else {
|
||||
find1($path);
|
||||
}
|
||||
}
|
||||
}
|
||||
open3_wait($opts{fhpid}) if (defined $opts{fhpid});
|
||||
return if ($opts{'extract-tar'});
|
||||
|
||||
if ($opts{dmtmp}) {
|
||||
buffer();
|
||||
if ($perl{threads} && $opts{threads} > 1) {
|
||||
foreach (@threads) {
|
||||
$_->join if ($_);
|
||||
}
|
||||
} else {
|
||||
open3_wait($opts{fhpid}) if (defined $opts{fhpid});
|
||||
}
|
||||
|
||||
if ($opts{recall} && $opts{have}->{dmget}) {
|
||||
close $opts{dmfh};
|
||||
# fork to avoid intermittent hangs of dmget
|
||||
my $pid = fork_setsid();
|
||||
if ($pid) {
|
||||
waitpid($pid, 0);
|
||||
delete $opts{dmfh};
|
||||
delete $opts{dmtmp};
|
||||
if (-s $opts{dmtmp}) {
|
||||
# fork to avoid intermittent hangs of dmget
|
||||
my $pid = fork_setsid();
|
||||
if ($pid) {
|
||||
waitpid($pid, 0);
|
||||
delete $opts{dmfh};
|
||||
delete $opts{dmtmp};
|
||||
} else {
|
||||
my $extra = $opts{'create-tar'} ? " -a" : "";
|
||||
# ignore errors since files are automatically retrieved anyway
|
||||
open3_get([$opts{dmtmp}, -1, -1], "dmget -nq$extra");
|
||||
unlink $opts{dmtmp};
|
||||
POSIX::_exit(0);
|
||||
}
|
||||
} else {
|
||||
my $extra = $opts{'create-tar'} ? " -a" : "";
|
||||
# ignore errors since files will be automatically retrieved anyway
|
||||
open3_get([$opts{dmtmp}, -1, -1], "dmget -nq$extra");
|
||||
unlink $opts{dmtmp};
|
||||
POSIX::_exit(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
###############
|
||||
#### find1 ####
|
||||
###############
|
||||
# output list of files/dirs beneath given paths with stat info
|
||||
sub find1 {
|
||||
my ($shost, $spath, $dst) = @_;
|
||||
if (!defined $opts{have}) {
|
||||
# check for existence of various commands
|
||||
$opts{have} = {};
|
||||
foreach my $bin (qw(dmget lfs getfacl getfattr shift-bin)) {
|
||||
$opts{have}->{$bin} = first {-x "$_/$bin"} (split(/:/, $ENV{PATH}));
|
||||
my $path = shift;
|
||||
my ($file0, $shost, $sdir, $dst, $ddir, $srcfs, $tdir) = @{$path};
|
||||
my $dfile0 = $file0;
|
||||
my ($top, $ref);
|
||||
if (ref $file0) {
|
||||
$top = 1;
|
||||
($file0, $dfile0, $ref) = @{$file0};
|
||||
}
|
||||
return if ($file0 eq '.' || $file0 eq '..');
|
||||
my $file = "$sdir/$file0";
|
||||
|
||||
my $dmf = $opts{recall} && $opts{have}->{dmget} && $srcfs =~ /,dmf/ ? 1 : 0;
|
||||
|
||||
# dereference before stat
|
||||
$file = abs_path($file) if ($opts{dereference});
|
||||
# always get stat info of real file
|
||||
my @stat = lstat($file);
|
||||
my $mode;
|
||||
if (scalar(@stat) == 0) {
|
||||
$file = "$sdir/$file0" if ($opts{dereference});
|
||||
if ($top) {
|
||||
# escape commas
|
||||
$file =~ s/(,)/sprintf("%%%02X", ord($1))/eg;
|
||||
# return error if original file
|
||||
buffer("$ref:Cannot stat $file\n");
|
||||
return;
|
||||
}
|
||||
$opts{fhpid} = open3_run([undef, undef, -1], "shift-bin")
|
||||
if ($opts{have}->{'shift-bin'});
|
||||
# lower level files cannot return errors because there is no way
|
||||
# to back out of previously added operations, so instead a find
|
||||
# op is added, which will succeed/fail on its own when processed
|
||||
} else {
|
||||
$mode = $stat[2];
|
||||
$stat[2] &= 07777;
|
||||
|
||||
# only directories, regular files, and symlinks are supported
|
||||
return if (!S_ISDIR($mode) && !S_ISREG($mode) && !S_ISLNK($mode));
|
||||
# dmf handling for individual files is carried out by transport_dmf
|
||||
$dmf = 0 if ($top && !S_ISDIR($mode));
|
||||
}
|
||||
|
||||
my $dmf = $opts{have}->{dmget} && $opts{srcfs} =~ /,dmf/ ? 1 : 0;
|
||||
|
||||
my $sdir = dirname($spath);
|
||||
$sdir = "" if ($sdir eq '/');
|
||||
my $ddir = dirname($dst);
|
||||
$ddir = "" if ($ddir eq '/');
|
||||
my $tdir = $opts{'create-tar'} ? dirname($opts{tar_name}) : undef;
|
||||
$tdir = "" if ($tdir eq '.');
|
||||
$tdir .= "/" if ($tdir && $tdir !~ /\/$/);
|
||||
my $dname = basename($dst);
|
||||
my @files = (basename($spath));
|
||||
FILE: foreach my $file0 (@files) {
|
||||
if (ref $file0) {
|
||||
# processing subdir so update prefix
|
||||
$sdir = $file0->[0];
|
||||
$ddir = $file0->[1];
|
||||
$tdir = $file0->[2] . "/" if ($opts{'create-tar'});
|
||||
# dname only needed on first iteration where src/dst name may differ
|
||||
$dname = undef;
|
||||
next;
|
||||
# exclude files (must be before dir processing)
|
||||
if (defined $opts{exclude}) {
|
||||
foreach my $re (@{$opts{exclude}}) {
|
||||
my $ure = unescape($re);
|
||||
return if (eval {$file =~ /$ure/});
|
||||
}
|
||||
next if ($file0 eq '.' || $file0 eq '..');
|
||||
my $file = "$sdir/$file0";
|
||||
my $dfile0 = $dname ? $dname : $file0;
|
||||
}
|
||||
|
||||
# dereference before stat
|
||||
$file = abs_path($file) if ($opts{dereference});
|
||||
# always get stat info of real file
|
||||
my @stat = lstat($file);
|
||||
my $mode;
|
||||
if (scalar(@stat) == 0) {
|
||||
$file = "$sdir/$file0" if ($opts{dereference});
|
||||
if (scalar(@files) == 1) {
|
||||
my $dh;
|
||||
if (scalar(@stat) == 0 || S_ISDIR($mode)) {
|
||||
# ensure $err defined unless explicitly set to undef
|
||||
my $err = "";
|
||||
if (scalar(@stat) > 0 && (!$opts{dereference} || $top) &&
|
||||
(!defined $opts{'find-files'} ||
|
||||
$$findn4 < $opts{'find-files'})) {
|
||||
# add subdirs of this directory for processing when below limit
|
||||
if (opendir($dh, $file)) {
|
||||
$err = undef;
|
||||
# directory will be processed after parent dir printed
|
||||
} else {
|
||||
$err = "Error opening directory $file\n";
|
||||
}
|
||||
if ($err && $top) {
|
||||
# escape commas
|
||||
$file =~ s/(,)/sprintf("%%%02X", ord($1))/eg;
|
||||
$err =~ s/(,)/sprintf("%%%02X", ord($1))/eg;
|
||||
# return error if original file
|
||||
print "Cannot stat $file\n";
|
||||
next;
|
||||
}
|
||||
# lower level files cannot return errors because there is no way
|
||||
# to back out of previously added operations, so instead a find
|
||||
# op is added, which will succeed/fail on its own when processed
|
||||
} else {
|
||||
$mode = $stat[2];
|
||||
$stat[2] &= 07777;
|
||||
|
||||
# only directories, regular files, and symlinks are supported
|
||||
next if (!S_ISDIR($mode) && !S_ISREG($mode) && !S_ISLNK($mode));
|
||||
# dmf handling for individual files is carried out by transport_dmf
|
||||
$dmf = 0 if (scalar(@files) == 1 && !S_ISDIR($mode));
|
||||
}
|
||||
|
||||
# exclude files (must be before dir processing)
|
||||
if (defined $opts{exclude}) {
|
||||
foreach my $re (@{$opts{exclude}}) {
|
||||
my $ure = unescape($re);
|
||||
next FILE if (eval {$file =~ /$ure/});
|
||||
buffer("$ref:$err");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (scalar(@stat) == 0 || S_ISDIR($mode)) {
|
||||
my $err = "";
|
||||
if (scalar(@stat) > 0 && (!$opts{dereference} || scalar(@files) == 1) &&
|
||||
(!defined $opts{'find-files'} ||
|
||||
scalar(@files) < $opts{'find-files'})) {
|
||||
# add subdirs of this directory for processing when below limit
|
||||
if (opendir(DIR, $file)) {
|
||||
$! = undef;
|
||||
my @sub_files = readdir DIR;
|
||||
if ($! || scalar(@sub_files) == 0) {
|
||||
# there is currently no good way to detect readdir errors
|
||||
# dirs should always contain . and .. at a minimum
|
||||
$err = "Error reading directory $file";
|
||||
} else {
|
||||
my $dirs = [$file, "$ddir/$dfile0"];
|
||||
push(@{$dirs}, "$tdir$file0") if ($opts{'create-tar'});
|
||||
push(@files, $dirs, @sub_files);
|
||||
$err = undef;
|
||||
}
|
||||
closedir DIR;
|
||||
} else {
|
||||
$err = "Error opening directory $file";
|
||||
}
|
||||
if ($err && scalar(@files) == 1) {
|
||||
# escape commas
|
||||
$err =~ s/(,)/sprintf("%%%02X", ord($1))/eg;
|
||||
# return error if original file
|
||||
print $err, "\n";
|
||||
next;
|
||||
}
|
||||
}
|
||||
if (defined $err) {
|
||||
# this handles directories as well as lower level stat failures
|
||||
print "args=find,", escape("$shost:$file"), ",";
|
||||
if ($opts{'create-tar'}) {
|
||||
print escape($dst), " tar_name=" . escape("$tdir$file0");
|
||||
} else {
|
||||
print escape("$ddir/$dfile0");
|
||||
}
|
||||
print "\n";
|
||||
next;
|
||||
}
|
||||
if (defined $err) {
|
||||
# this handles directories as well as lower level stat failures
|
||||
my $line = "args=find," . escape("$shost:$file") . ",";
|
||||
$line .= $opts{'create-tar'} ? escape($dst) . " tar_name=" .
|
||||
escape("$tdir$file0") : escape("$ddir/$dfile0");
|
||||
buffer($line . "\n");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
# include files (must be after dir processing)
|
||||
if (defined $opts{include}) {
|
||||
my $found;
|
||||
foreach my $re (@{$opts{include}}) {
|
||||
my $ure = unescape($re);
|
||||
next if (eval {$file !~ /$ure/});
|
||||
$found = 1;
|
||||
last;
|
||||
}
|
||||
next if (!$found);
|
||||
# include files
|
||||
if (defined $opts{include}) {
|
||||
my $found;
|
||||
foreach my $re (@{$opts{include}}) {
|
||||
my $ure = unescape($re);
|
||||
next if (eval {$file !~ /$ure/});
|
||||
$found = 1;
|
||||
last;
|
||||
}
|
||||
# must be done both here for files and after dir processing
|
||||
goto FIND_DIR if (!$found);
|
||||
}
|
||||
|
||||
# newer/older files (must be after dir processing)
|
||||
my %ti = ('a' => 8, 'm' => 9, 'c' => 10);
|
||||
if (defined $opts{newer}) {
|
||||
if ($opts{newer} =~ /^([^:]+):(\S+)/) {
|
||||
my ($type, $time) = ($1, $2);
|
||||
$type =~ s/\|/1||/g;
|
||||
$type =~ s/([amc])/$stat[$ti{$1}]>=$time&&/g;
|
||||
$type =~ s/([AMC])/$stat[$ti{lc($1)}]<$time&&/g;
|
||||
$type .= "1";
|
||||
next if (!eval $type);
|
||||
} elsif ($stat[9] < $opts{newer}) {
|
||||
next;
|
||||
}
|
||||
# newer/older files (must be after dir processing)
|
||||
my %ti = ('a' => 8, 'm' => 9, 'c' => 10);
|
||||
if (defined $opts{newer}) {
|
||||
if ($opts{newer} =~ /^([^:]+):(\S+)/) {
|
||||
my ($type, $time) = ($1, $2);
|
||||
$type =~ s/\|/1||/g;
|
||||
$type =~ s/([amc])/$stat[$ti{$1}]>=$time&&/g;
|
||||
$type =~ s/([AMC])/$stat[$ti{lc($1)}]<$time&&/g;
|
||||
$type .= "1";
|
||||
# must be done both here for files and after dir processing
|
||||
goto FIND_DIR if (!eval $type);
|
||||
} elsif ($stat[9] < $opts{newer}) {
|
||||
# must be done both here for files and after dir processing
|
||||
goto FIND_DIR;
|
||||
}
|
||||
if (defined $opts{older}) {
|
||||
if ($opts{older} =~ /^([^:]+):(\S+)/) {
|
||||
my ($type, $time) = ($1, $2);
|
||||
$type =~ s/\|/1||/g;
|
||||
$type =~ s/([amc])/$stat[$ti{$1}]<$time&&/g;
|
||||
$type =~ s/([AMC])/$stat[$ti{lc($1)}]>=$time&&/g;
|
||||
$type .= "1";
|
||||
next if (!eval $type);
|
||||
} elsif ($stat[9] >= $opts{older}) {
|
||||
next;
|
||||
}
|
||||
}
|
||||
if (defined $opts{older}) {
|
||||
if ($opts{older} =~ /^([^:]+):(\S+)/) {
|
||||
my ($type, $time) = ($1, $2);
|
||||
$type =~ s/\|/1||/g;
|
||||
$type =~ s/([amc])/$stat[$ti{$1}]<$time&&/g;
|
||||
$type =~ s/([AMC])/$stat[$ti{lc($1)}]>=$time&&/g;
|
||||
$type .= "1";
|
||||
# must be done both here for files and after dir processing
|
||||
goto FIND_DIR if (!eval $type);
|
||||
} elsif ($stat[9] >= $opts{older}) {
|
||||
# must be done both here for files and after dir processing
|
||||
goto FIND_DIR;
|
||||
}
|
||||
}
|
||||
|
||||
# dereference before stat
|
||||
# resolve uid/gid if possible
|
||||
my $user = getpwuid($stat[4]);
|
||||
my $group = getgrgid($stat[5]);
|
||||
# resolve uid/gid if possible
|
||||
my $user = $opts{findu}->{$stat[4]};
|
||||
if (!defined $user) {
|
||||
$user = getpwuid($stat[4]);
|
||||
$user = "uid_$stat[4]" if (!$user);
|
||||
$opts{findu}->{$stat[4]} = $user;
|
||||
}
|
||||
my $group = $opts{findg}->{$stat[5]};
|
||||
if (!defined $group) {
|
||||
$group = getgrgid($stat[5]);
|
||||
$group = "gid_$stat[5]" if (!$group);
|
||||
my $attrs = join(",", @stat[2,4,5,8,9],
|
||||
escape($user), escape($group), $stat[7], 512 * $stat[12]);
|
||||
$opts{findg}->{$stat[5]} = $group;
|
||||
}
|
||||
my $attrs = join(",", @stat[2,4,5,8,9],
|
||||
escape($user), escape($group), $stat[7], 512 * $stat[12]);
|
||||
|
||||
my @acls;
|
||||
my @lattrs;
|
||||
my @xattrs;
|
||||
# try to get acls
|
||||
if (($opts{have}->{'shift-bin'} || $opts{have}->{getfacl}) &&
|
||||
!$opts{'create-tar'} &&
|
||||
($opts{preserve} == 1 || $opts{preserve} =~ /acl/) &&
|
||||
(!$opts{srcfs} || $opts{srcfs} =~ /,acl/)) {
|
||||
if (defined $opts{fhpid}) {
|
||||
$opts{fhpid}->[0]->print("getfacl $file\n");
|
||||
my $text = $opts{fhpid}->[1]->getline;
|
||||
my @cols = split(/\s+/, $text);
|
||||
push(@acls, $cols[2]) if ($file eq $cols[1] && $cols[-1] eq '0');
|
||||
} else {
|
||||
my $fhpid = open3_run([-1, undef, -1],
|
||||
"getfacl", "-cps", "--", $file);
|
||||
while (defined $fhpid && defined ($_ = $fhpid->[1]->getline)) {
|
||||
chomp;
|
||||
next if (!$_);
|
||||
push(@acls, escape($_));
|
||||
}
|
||||
open3_wait($fhpid);
|
||||
}
|
||||
}
|
||||
|
||||
# try to get xattrs
|
||||
if (($opts{have}->{'shift-bin'} || $opts{have}->{getfattr}) &&
|
||||
!$opts{'create-tar'} &&
|
||||
($opts{preserve} == 1 || $opts{preserve} =~ /xattr/) &&
|
||||
(!$opts{srcfs} || $opts{srcfs} =~ /,xattr/)) {
|
||||
if (defined $opts{fhpid}) {
|
||||
$opts{fhpid}->[0]->print("getfattr $file\n");
|
||||
my $text = $opts{fhpid}->[1]->getline;
|
||||
my @cols = split(/\s+/, $text);
|
||||
push(@xattrs, $cols[2]) if ($file eq $cols[1] && $cols[-1] eq '0');
|
||||
} else {
|
||||
my $fhpid = open3_run([-1, undef, -1],
|
||||
"getfattr", "-dhe", "base64", $file);
|
||||
while (defined $fhpid && defined ($_ = $fhpid->[1]->getline)) {
|
||||
chomp;
|
||||
next if (!$_ || /^\s*#/);
|
||||
push(@xattrs, escape(decode_base64($_)));
|
||||
}
|
||||
open3_wait($fhpid);
|
||||
}
|
||||
}
|
||||
|
||||
# try to get lustre striping
|
||||
if (($opts{have}->{'shift-bin'} || $opts{have}->{lfs}) &&
|
||||
!S_ISLNK($mode) && !$opts{'create-tar'} &&
|
||||
($opts{preserve} == 1 || $opts{preserve} =~ /stripe/) &&
|
||||
$opts{srcfs} =~ /^lustre/) {
|
||||
# ignore symlinks as link to fifo can hang forever
|
||||
if (defined $opts{fhpid}) {
|
||||
$opts{fhpid}->[0]->print("getstripe $file\n");
|
||||
my $text = $opts{fhpid}->[1]->getline;
|
||||
my @cols = split(/\s+/, $text);
|
||||
@lattrs = split(/,/, $cols[2]) if ($file eq $cols[1] && $cols[-1] eq '0');
|
||||
} else {
|
||||
open(FILE, '-|', "lfs", "getstripe", "-d", $file);
|
||||
while (<FILE>) {
|
||||
$lattrs[0] = $1 if (/stripe_count:\s*(-?\d+)/);
|
||||
$lattrs[1] = $1 if (/stripe_size:\s*(-?\d+)/);
|
||||
}
|
||||
close FILE;
|
||||
}
|
||||
}
|
||||
$lattrs[0] = 0 if (!defined $lattrs[0] && defined $lattrs[1]);
|
||||
$lattrs[1] = 0 if (!defined $lattrs[1] && defined $lattrs[0]);
|
||||
|
||||
# begin log entry
|
||||
my $index_len = !$opts{'index-tar'} ? 0 : 28 + length("$tdir$file0") +
|
||||
length(sprintf("%7s%7s%9d", $user, $group, $stat[7]));
|
||||
if (S_ISLNK($mode)) {
|
||||
my $ln = readlink($file);
|
||||
print "args=ln,", escape($ln);
|
||||
$index_len += 4 + length($ln);
|
||||
} elsif (S_ISDIR($mode)) {
|
||||
print "args=mkdir";
|
||||
} elsif ($opts{sync}) {
|
||||
print "args=ckattr", $opts{'ignore-times'} ? "0" : "",
|
||||
",", escape("$shost:$file");
|
||||
my @acls;
|
||||
my @lattrs;
|
||||
my @xattrs;
|
||||
# try to get acls
|
||||
if (($opts{have}->{'shift-bin'} || $opts{have}->{getfacl}) &&
|
||||
!$opts{'create-tar'} &&
|
||||
($opts{preserve} == 1 || $opts{preserve} =~ /acl/) &&
|
||||
(!$srcfs || $srcfs =~ /,acl/)) {
|
||||
if (defined $opts{fhpid}) {
|
||||
$opts{fhpid}->[0]->print("getfacl $file\n");
|
||||
my $text = $opts{fhpid}->[1]->getline;
|
||||
my @cols = split(/\s+/, $text);
|
||||
push(@acls, $cols[2]) if ($file eq $cols[1] && $cols[-1] eq '0');
|
||||
} else {
|
||||
print "args=cp,", escape("$shost:$file");
|
||||
my $fhpid = open3_run([-1, undef, -1],
|
||||
"getfacl", "-cps", "--", $file);
|
||||
while (defined $fhpid && defined ($_ = $fhpid->[1]->getline)) {
|
||||
chomp;
|
||||
next if (!$_);
|
||||
push(@acls, escape($_));
|
||||
}
|
||||
open3_wait($fhpid);
|
||||
}
|
||||
print ",", escape($opts{'create-tar'} ? $dst : "$ddir/$dfile0");
|
||||
print " acls=" . join(",", @acls) if (scalar(@acls) > 0);
|
||||
print " xattrs=" . join(",", @xattrs) if (scalar(@xattrs) > 0);
|
||||
print " lustre_attrs=" . join(",", @lattrs) if (scalar(@lattrs) > 0);
|
||||
print " tar_index=$index_len" if ($opts{'index-tar'});
|
||||
print " tar_name=" . escape("$tdir$file0") if ($opts{'create-tar'});
|
||||
print " size=$stat[7] attrs=$attrs\n";
|
||||
if ($dmf && !S_ISLNK($mode) && !S_ISDIR($mode)) {
|
||||
($opts{dmfh}, $opts{dmtmp}) = tempfile() if (!$opts{dmtmp});
|
||||
print {$opts{dmfh}} $file, "\n";
|
||||
}
|
||||
|
||||
# try to get xattrs
|
||||
if (($opts{have}->{'shift-bin'} || $opts{have}->{getfattr}) &&
|
||||
!$opts{'create-tar'} &&
|
||||
($opts{preserve} == 1 || $opts{preserve} =~ /xattr/) &&
|
||||
(!$srcfs || $srcfs =~ /,xattr/)) {
|
||||
if (defined $opts{fhpid}) {
|
||||
$opts{fhpid}->[0]->print("getfattr $file\n");
|
||||
my $text = $opts{fhpid}->[1]->getline;
|
||||
my @cols = split(/\s+/, $text);
|
||||
push(@xattrs, $cols[2]) if ($file eq $cols[1] && $cols[-1] eq '0');
|
||||
} else {
|
||||
my $fhpid = open3_run([-1, undef, -1],
|
||||
"getfattr", "-dhe", "base64", $file);
|
||||
while (defined $fhpid && defined ($_ = $fhpid->[1]->getline)) {
|
||||
chomp;
|
||||
next if (!$_ || /^\s*#/);
|
||||
push(@xattrs, escape(decode_base64($_)));
|
||||
}
|
||||
open3_wait($fhpid);
|
||||
}
|
||||
}
|
||||
|
||||
# try to get lustre striping
|
||||
if (($opts{have}->{'shift-bin'} || $opts{have}->{lfs}) &&
|
||||
!S_ISLNK($mode) && !$opts{'create-tar'} &&
|
||||
($opts{preserve} == 1 || $opts{preserve} =~ /stripe/) &&
|
||||
$srcfs =~ /^lustre/) {
|
||||
# ignore symlinks as link to fifo can hang forever
|
||||
if (defined $opts{fhpid}) {
|
||||
$opts{fhpid}->[0]->print("getstripe $file\n");
|
||||
my $text = $opts{fhpid}->[1]->getline;
|
||||
my @cols = split(/\s+/, $text);
|
||||
@lattrs = split(/,/, $cols[2]) if ($file eq $cols[1] && $cols[-1] eq '0');
|
||||
} else {
|
||||
my $fhpid = open3_run([-1, undef, -1],
|
||||
"lfs", "getstripe", "-d", $file);
|
||||
while (defined $fhpid && defined ($_ = $fhpid->[1]->getline)) {
|
||||
$lattrs[0] = $1 if (/stripe_count:\s*(-?\d+)/);
|
||||
$lattrs[1] = $1 if (/stripe_size:\s*(-?\d+)/);
|
||||
}
|
||||
open3_wait($fhpid);
|
||||
}
|
||||
}
|
||||
$lattrs[0] = 0 if (!defined $lattrs[0] && defined $lattrs[1]);
|
||||
$lattrs[1] = 0 if (!defined $lattrs[1] && defined $lattrs[0]);
|
||||
|
||||
# begin log entry
|
||||
my $line;
|
||||
my $index_len = !$opts{'index-tar'} ? 0 : 28 + length("$tdir$file0") +
|
||||
length(sprintf("%7s%7s%9d", $user, $group, $stat[7]));
|
||||
if (S_ISLNK($mode)) {
|
||||
my $ln = readlink($file);
|
||||
$line .= "args=ln," . escape($ln);
|
||||
$index_len += 4 + length($ln);
|
||||
} elsif (S_ISDIR($mode)) {
|
||||
$line .= "args=mkdir";
|
||||
} elsif ($opts{sync}) {
|
||||
$line .= "args=ckattr" . ($opts{'ignore-times'} ? "0" : "") .
|
||||
"," . escape("$shost:$file");
|
||||
} else {
|
||||
$line .= "args=cp," . escape("$shost:$file");
|
||||
}
|
||||
$line .= "," . (escape($opts{'create-tar'} ? $dst : "$ddir/$dfile0"));
|
||||
$line .= " acls=" . join(",", @acls) if (scalar(@acls) > 0);
|
||||
$line .= " xattrs=" . join(",", @xattrs) if (scalar(@xattrs) > 0);
|
||||
$line .= " lustre_attrs=" . join(",", @lattrs) if (scalar(@lattrs) > 0);
|
||||
$line .= " tar_index=$index_len" if ($opts{'index-tar'});
|
||||
$line .= " tar_name=" . escape("$tdir/$file0") if ($opts{'create-tar'});
|
||||
$line .= " size=$stat[7] attrs=$attrs\n";
|
||||
buffer($line, !S_ISLNK($mode) && !S_ISDIR($mode) &&
|
||||
$dmf ? $file . "\n" : undef);
|
||||
|
||||
FIND_DIR: if (defined $dh) {
|
||||
# flush buffer to ensure parent dir printed before subdirs
|
||||
buffer();
|
||||
while (readdir $dh) {
|
||||
my $path = [$_, $shost, $file, $dst, "$ddir/$dfile0", $srcfs];
|
||||
push(@{$path}, "$tdir/$file0") if ($opts{'create-tar'});
|
||||
if ($perl{threads} && $opts{threads} > 1 &&
|
||||
#$findq->pending < $opts{'queue-size'}) {
|
||||
#$findq->pending < $opts{'threads'}) {
|
||||
#TODO: determine what size should go here
|
||||
$$findq4 < 4 * $opts{threads}) {
|
||||
# only add to queue if not already at capacity
|
||||
$findq4->up;
|
||||
$findn4->up;
|
||||
$findq->enqueue($path);
|
||||
} else {
|
||||
# process now if single threaded or queue at capacity
|
||||
find1($path);
|
||||
}
|
||||
}
|
||||
closedir $dh;
|
||||
}
|
||||
}
|
||||
|
||||
##################
|
||||
|
@ -791,6 +960,7 @@ sub fish {
|
|||
}
|
||||
#TODO: de-hardcode 60 second timeout
|
||||
$sock->sockopt(SO_RCVTIMEO, pack('L!L!', +60, 0));
|
||||
$sock->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1);
|
||||
$key = "" . rand();
|
||||
my $scert;
|
||||
if ($opts{secure}) {
|
||||
|
@ -878,7 +1048,9 @@ sub fish {
|
|||
}
|
||||
close $tsock;
|
||||
})} (1 .. $opts{streams});
|
||||
$_->join foreach (@threads);
|
||||
foreach (@threads) {
|
||||
$_->join if ($_);
|
||||
}
|
||||
unlink $cert if ($cert);
|
||||
close $sock;
|
||||
}
|
||||
|
@ -1285,8 +1457,10 @@ sub sum {
|
|||
}
|
||||
})} (1 .. $nthr);
|
||||
# force threads to exit
|
||||
$q->enqueue(undef) foreach (1 .. $nthr);
|
||||
$_->join foreach (@threads);
|
||||
$q->enqueue(undef) foreach (@threads);
|
||||
foreach (@threads) {
|
||||
$_->join if ($_);
|
||||
}
|
||||
# append any error messages back to original ref text
|
||||
while (defined (my $sumret = $qret->dequeue_nb)) {
|
||||
my ($qi, $i, $hash) = @{$sumret};
|
||||
|
|
237
perl/shift-mgr
237
perl/shift-mgr
|
@ -53,7 +53,7 @@ use Fcntl qw(:DEFAULT :flock :mode);
|
|||
use File::Basename;
|
||||
use File::Path;
|
||||
use File::Spec;
|
||||
use Getopt::Long qw(:config bundling no_ignore_case require_order);
|
||||
use Getopt::Long qw(:config bundling no_auto_abbrev no_ignore_case require_order);
|
||||
use IO::File;
|
||||
use IO::Handle;
|
||||
# use embedded IPC::Open3 since versions prior to perl 5.14.0 are buggy
|
||||
|
@ -71,7 +71,7 @@ use Text::ParseWords;
|
|||
require Tie::DB_FileLock;
|
||||
require Text::FormatTable;
|
||||
|
||||
our $VERSION = 0.97;
|
||||
our $VERSION = 0.98;
|
||||
|
||||
$Data::Dumper::Pair = " = ";
|
||||
$Data::Dumper::Sortkeys = 1;
|
||||
|
@ -121,6 +121,7 @@ my %conf = (
|
|||
default_files => "1k",
|
||||
'default_find-files' => "2k",
|
||||
default_hosts => 1,
|
||||
default_interval => 30,
|
||||
default_local => "shift,fish,fish-tcp",
|
||||
default_preallocate => 0,
|
||||
default_remote => "shift",
|
||||
|
@ -557,10 +558,12 @@ if ($opts{alive}) {
|
|||
|
||||
# move all running operations out of doing_* back into queues
|
||||
foreach my $file (glob "$opts{base}/doing_*") {
|
||||
next if ($file =~ /\.gzi$/);
|
||||
# untaint file
|
||||
$file = $1 if ($file =~ /^(.*)$/);
|
||||
my $fdoing = get_doing($file);
|
||||
while (my ($key, $line) = each %{$fdoing}) {
|
||||
$line =~ s/\s*\r?\n$//;
|
||||
# put operation in do/tree
|
||||
my %op = split(/[= ]+/, $line);
|
||||
my @args = split(/,/, $op{args});
|
||||
|
@ -578,6 +581,8 @@ if ($opts{alive}) {
|
|||
log_print($file, $gzs, escape(nfreeze({})) . "\n");
|
||||
}
|
||||
log_close($_, $gzs) foreach (keys %{$gzs});
|
||||
# transfer may have finished after --restart=ignore
|
||||
$meta{time1} = $time if (($meta{last} || $meta{e_find}) && !run());
|
||||
} elsif ($opts{stop}) {
|
||||
$meta{stop} = 1;
|
||||
$meta{time1} = $time;
|
||||
|
@ -611,11 +616,16 @@ if ($opts{get} || defined $opts{put}) {
|
|||
my $gzs = {};
|
||||
log_print($opts{doing}, $gzs, escape(nfreeze($doing)) . "\n");
|
||||
log_close($opts{doing}, $gzs);
|
||||
|
||||
# update running time average for manager get/put invocations
|
||||
$meta{rc_mgr}++;
|
||||
$meta{ra_mgr} *= ($meta{rc_mgr} - 1) / $meta{rc_mgr};
|
||||
$meta{ra_mgr} += (time - $time) / $meta{rc_mgr};
|
||||
}
|
||||
|
||||
# update log sizes
|
||||
foreach my $file (glob "$opts{base}/*") {
|
||||
next if ($file =~ /\/(?:find|lock)$/);
|
||||
next if ($file =~ /\/(?:find|lock|\S+\.gzi)$/);
|
||||
my $log = $file;
|
||||
$log =~ s/.*\///;
|
||||
$meta{"$log\_size"} = (stat $file)[7];
|
||||
|
@ -745,7 +755,17 @@ sub debug_print {
|
|||
# open user-specific debug file if not already open
|
||||
open($dbgfh, '>>', "$conf{user_dir}/$opts{user}.debug") if (!$dbgfh);
|
||||
print $dbgfh "$localtime $opts{host} $opts{id}$opts{cid} $type ";
|
||||
if ($type eq 'DBG') {
|
||||
# print stack trace when DBG lines added in code
|
||||
my $i = 1;
|
||||
my @stack;
|
||||
while ((my @cd = (caller($i++)))) {
|
||||
unshift(@stack, "$cd[3]:$cd[2]");
|
||||
}
|
||||
print $dbgfh "(", join(" -> ", @stack), ") ";
|
||||
}
|
||||
print $dbgfh $_ foreach (@_);
|
||||
$dbgfh->flush;
|
||||
}
|
||||
if ($type eq 'GET') {
|
||||
print $_ foreach (@_);
|
||||
|
@ -945,7 +965,10 @@ sub doing {
|
|||
%meta = %{get_meta("$dir/$opts{user}.$opts{id}/meta")};
|
||||
$opts{host} = "*" if (!$opts{host});
|
||||
foreach my $file (glob("$dir/$opts{user}.$opts{id}/doing_$opts{host}")) {
|
||||
my $doing = get_doing($file);
|
||||
next if ($file =~ /\.gzi$/);
|
||||
# untaint file
|
||||
$file = $1 if ($file =~ /^(.*)$/);
|
||||
my $doing = get_doing($file, $opts{doing});
|
||||
$file =~ s/.*\///;
|
||||
print "$file = ", Dumper($doing);
|
||||
}
|
||||
|
@ -1222,11 +1245,15 @@ sub get {
|
|||
|
||||
# keep copy of original doing so changes don't affect its own processing
|
||||
my $doing0 = dclone($doing);
|
||||
my $ndoing = "0." . $meta{"$opts{doing}_size"};
|
||||
# reverse size so won't end in 0 (so .N0 not reduced to .N after ++)
|
||||
my $ndoing = "0." . reverse($meta{"$opts{doing}_size"});
|
||||
my $gzs = {};
|
||||
my ($ldoing, $kdoing, $size, $files, $ops, $all, $skip);
|
||||
my ($ldoing, $kdoing, $size, $files, $ops, $all, $secs, $skip);
|
||||
my (%diskfs, %localfs, %rtthost);
|
||||
my %cli_load = split(/[= ]+/, $meta{"load_$opts{host}$opts{cid}"});
|
||||
my $ncli = grep(/^doing_/, keys %meta);
|
||||
# adjust interval so average waiting clients is no more than 1
|
||||
$opts{interval} = max($meta{interval}, $ncli * $meta{ra_mgr});
|
||||
LOG: foreach my $log (@logs) {
|
||||
# process dir attrs last by themselves
|
||||
last if ($log eq 'rtree' && (!$meta{last} || $meta{t_run} || $ops ||
|
||||
|
@ -1245,7 +1272,8 @@ sub get {
|
|||
$gzs->{$log}->seek($meta{$log}, $whence);
|
||||
}
|
||||
|
||||
while ($size < $meta{size} && $all < $meta{files} &&
|
||||
while (($size < $meta{size} && $all < $meta{files} ||
|
||||
$secs < $opts{interval}) &&
|
||||
($log =~ /^doing_/ && (($kdoing, $line) = each %{$ldoing}) ||
|
||||
$log eq 'rtree' && defined($line = last_line($gzs->{rtree})) ||
|
||||
$log !~ /^(?:doing_|rtree)/ &&
|
||||
|
@ -1296,7 +1324,7 @@ sub get {
|
|||
if ($args[$i] !~ /^\//) {
|
||||
# do not map remote tar dst to prevent size/split corruption
|
||||
next if ($i == 1 && $cmd eq 'find' && $meta{'create-tar'});
|
||||
# allow map_remote to avoid last host's caching
|
||||
# help map_remote avoid last host's caching
|
||||
if ($cmd eq 'cksum' && $op{cache_rclient}) {
|
||||
$ref->{last} = $op{cache_rclient};
|
||||
} elsif ($op{cache_client}) {
|
||||
|
@ -1450,6 +1478,13 @@ sub get {
|
|||
$size += $op{size} if ($cmd =~ /^(?:cp|sum|cksum)/);
|
||||
$ops++ if ($log ne 'rtree');
|
||||
$all++;
|
||||
if ($meta{"ra_$cmd"} > 0) {
|
||||
$secs += ($cmd =~ /^(?:cp|sum|cksum)/ ?
|
||||
$op{size} : 1) / $meta{"ra_$cmd"};
|
||||
} else {
|
||||
# default to files/size setting until rates available
|
||||
$secs += $opts{interval};
|
||||
}
|
||||
$meta{s_run} += $op{size} if ($op{size});
|
||||
$meta{t_run}++;
|
||||
$meta{w_run}-- if ($op{state} eq 'warn');
|
||||
|
@ -1705,15 +1740,21 @@ sub get {
|
|||
###################
|
||||
#### get_doing ####
|
||||
###################
|
||||
# return last list of operations in progress on given host
|
||||
# return past list of operations in progress on given host
|
||||
sub get_doing {
|
||||
my $arg = shift;
|
||||
my $past = shift;
|
||||
$past = 1 if (!defined $past);
|
||||
# untaint file
|
||||
$arg = $1 if ($arg =~ /^(.*)$/);
|
||||
return {} if (! -e $arg);
|
||||
my $gz = Compress::BGZF::Reader->new_filehandle($arg);
|
||||
return {} if (!$gz);
|
||||
$gz->seek(-1, 2);
|
||||
my $line = last_line($gz);
|
||||
close $gz;
|
||||
my $line;
|
||||
$line = last_line($gz) while ($past-- > 0);
|
||||
my $log = basename($arg);
|
||||
log_close($log, {$log => $gz});
|
||||
return thaw(unescape($line)) if ($line);
|
||||
return {};
|
||||
}
|
||||
|
@ -1755,7 +1796,7 @@ sub get_meta {
|
|||
if ($meta && defined $mtell && $mtell > 0) {
|
||||
# metadata corrupted so revert to last known good state
|
||||
foreach my $file (glob "$opts{base}/*") {
|
||||
next if ($file =~ /\/(?:find|lock|meta)$/);
|
||||
next if ($file =~ /\/(?:find|lock|meta|\S+\.gzi)$/);
|
||||
my $log = $file;
|
||||
$log =~ s/.*\///;
|
||||
my $size = defined $meta->{"$log\_size"} ? $meta->{"$log\_size"} : 0;
|
||||
|
@ -1763,6 +1804,8 @@ sub get_meta {
|
|||
$file = $1 if ($file =~ /^(.*)$/);
|
||||
# truncate all logs to last known good size
|
||||
truncate($file, $size);
|
||||
# remove associated index file
|
||||
unlink("$file.gzi");
|
||||
}
|
||||
# rebuild find db since it may contain reverted operations
|
||||
build_find() if ($meta{dereference} && !$meta{'extract-tar'});
|
||||
|
@ -1789,8 +1832,11 @@ sub history {
|
|||
my @metas;
|
||||
my $dir = $> != 0 ? $conf{user_dir} : $opts{user_dir};
|
||||
my $user = $> != 0 ? $opts{user} : "*";
|
||||
my $idglob = defined $opts{id} ? $opts{id} : "[0-9]*";
|
||||
do {
|
||||
push(@metas, glob "$dir/$user.[0-9]*/meta");
|
||||
push(@metas, glob "$dir/$user.$idglob/meta");
|
||||
# glob still returns pattern when no wildcards and no files match
|
||||
pop(@metas) if (defined $opts{id} && ! -f $metas[-1]);
|
||||
$dir .= "/*.more";
|
||||
} while (scalar(glob $dir));
|
||||
foreach my $file (sort {$> != 0 ?
|
||||
|
@ -1886,6 +1932,7 @@ sub id_status {
|
|||
my ($host, $ldoing, $kdoing);
|
||||
my $gzs = {};
|
||||
if ($state0 eq 'run') {
|
||||
next if ($log =~ /\.gzi$/);
|
||||
$host = $log;
|
||||
$host =~ s/^doing_//;
|
||||
$ldoing = get_doing("$opts{base}/$log");
|
||||
|
@ -1983,7 +2030,7 @@ sub init_id {
|
|||
}
|
||||
|
||||
# initialize options with default values
|
||||
foreach (qw(clients cpu hosts io ior iow net netr netw ports retry
|
||||
foreach (qw(clients cpu hosts interval io ior iow net netr netw ports retry
|
||||
stripe threads)) {
|
||||
$meta{$_} = $conf{"default_$_"}
|
||||
if (!defined $meta{$_} && $conf{"default_$_"});
|
||||
|
@ -2040,11 +2087,11 @@ sub last_line {
|
|||
my $tell = $tell0;
|
||||
my ($buf, $line, $len, $pos);
|
||||
do {
|
||||
$tell = $tell0 - 1024;
|
||||
$tell = $tell0 - 4194304;
|
||||
$tell = 0 if ($tell < 0);
|
||||
# seek to earlier position in file
|
||||
$fh->seek($tell, 0);
|
||||
my $len = 1024;
|
||||
my $len = 4194304;
|
||||
$len = $tell0 - $tell if ($len > $tell0);
|
||||
# read up to initial location or that of last round
|
||||
$fh->read($line, $len);
|
||||
|
@ -2088,6 +2135,16 @@ sub log_close {
|
|||
# find/ln/mkdir are stored in cp
|
||||
$log = "cp" if ($log =~ /^(?:find|ln|mkdir)$/);
|
||||
if (defined $gzs->{$log}) {
|
||||
my $bgr = tied *{$gzs->{$log}};
|
||||
if (ref $bgr eq 'Compress::BGZF::Reader' &&
|
||||
defined $bgr->{idx}->[-1]) {
|
||||
my ($coff, $uoff) = @{$bgr->{idx}->[-1]};
|
||||
# rtree is just tree in reverse
|
||||
my $f = $log eq 'rtree' ? 'tree' : $log;
|
||||
$f = "$opts{base}/$f.gzi";
|
||||
local $SIG{__WARN__} = sub {};
|
||||
$bgr->write_index($f) if ($uoff && -s $f < int($uoff / 4096));
|
||||
}
|
||||
close $gzs->{$log};
|
||||
delete $gzs->{$log};
|
||||
}
|
||||
|
@ -2129,6 +2186,7 @@ sub log_print {
|
|||
eval {
|
||||
if (!defined $line) {
|
||||
log_close($log, $gzs);
|
||||
unlink("$opts{base}/$log.gzi");
|
||||
# force new file
|
||||
my $fh = Compress::BGZF::Writer->new_filehandle("$opts{base}/$log");
|
||||
close $fh;
|
||||
|
@ -2248,59 +2306,61 @@ sub map_remote {
|
|||
my $mnt1 = fs_mount($rhost, $rpath);
|
||||
# return original if no mount found
|
||||
return $path1 if (!$mnt1);
|
||||
my $mnt2;
|
||||
# find hosts that mount file system based on global/user db
|
||||
my $key = "mounth_" . $mnt1->{remote};
|
||||
my %hosts = map {$_->{$key} ? %{$_->{$key}} : ()}
|
||||
(\%meta, \%mounts, \%umounts);
|
||||
my %shells = map {$_->{shells} ? %{$_->{shells}} : ()}
|
||||
(\%meta, \%mounts, \%umounts);
|
||||
# determine potential hosts
|
||||
foreach my $host (keys %hosts) {
|
||||
delete $hosts{$host}
|
||||
if (!map_fs_mount($mnt1, $host, $ref->{rw}) || !$shells{$host});
|
||||
}
|
||||
my $pick = $opts{"pick_$mnt1->{remote}"};
|
||||
if (!defined $pick) {
|
||||
# find hosts that mount file system based on global/user db
|
||||
my $key = "mounth_" . $mnt1->{remote};
|
||||
my %hosts = map {$_->{$key} ? %{$_->{$key}} : ()}
|
||||
(\%meta, \%mounts, \%umounts);
|
||||
my %shells = map {$_->{shells} ? %{$_->{shells}} : ()}
|
||||
(\%meta, \%mounts, \%umounts);
|
||||
# determine potential hosts
|
||||
foreach my $host (keys %hosts) {
|
||||
delete $hosts{$host}
|
||||
if (!map_fs_mount($mnt1, $host, $ref->{rw}) || !$shells{$host});
|
||||
}
|
||||
|
||||
# prune last remote host to avoid checksum caching effects
|
||||
delete $hosts{$ref->{last}}
|
||||
if ($ref->{last} && scalar(keys %hosts) > 1);
|
||||
# prune last remote host to avoid checksum caching effects
|
||||
delete $hosts{$ref->{last}}
|
||||
if ($ref->{last} && scalar(keys %hosts) > 1);
|
||||
|
||||
# prune potential hosts based on number currently assigned
|
||||
my $min = 1E9;
|
||||
my %min_hosts;
|
||||
foreach my $host (keys %hosts) {
|
||||
my $npicks = scalar(keys %{$meta{"picks_$host"}});
|
||||
# don't count previous selection for this host
|
||||
$npicks-- if ($meta{"picks_$host"}->{$lhost});
|
||||
next if ($npicks > $min);
|
||||
$min = $npicks;
|
||||
$min_hosts{$npicks} = [] if (!defined $min_hosts{$npicks});
|
||||
push(@{$min_hosts{$npicks}}, $host);
|
||||
}
|
||||
my %picks = map {$_ => 1} @{$min_hosts{$min}};
|
||||
# prune potential hosts based on number currently assigned
|
||||
my $min = 1E9;
|
||||
my %min_hosts;
|
||||
foreach my $host (keys %hosts) {
|
||||
my $npicks = scalar(keys %{$meta{"picks_$host"}});
|
||||
# don't count previous selection for this host
|
||||
$npicks-- if ($meta{"picks_$host"}->{$lhost});
|
||||
next if ($npicks > $min);
|
||||
$min = $npicks;
|
||||
$min_hosts{$npicks} = [] if (!defined $min_hosts{$npicks});
|
||||
push(@{$min_hosts{$npicks}}, $host);
|
||||
}
|
||||
my %picks = map {$_ => 1} @{$min_hosts{$min}};
|
||||
|
||||
my $pick;
|
||||
if (defined $conf{select_hook}) {
|
||||
# select host using configured selection hook
|
||||
$pick = open3_get([-1, undef, -1],
|
||||
"$conf{select_hook} $lhost $rhost " . join(",", keys %picks));
|
||||
if (defined $conf{select_hook}) {
|
||||
# select host using configured selection hook
|
||||
$pick = open3_get([-1, undef, -1],
|
||||
"$conf{select_hook} $lhost $rhost " . join(",", keys %picks));
|
||||
}
|
||||
# revert to default selection policy when no selection
|
||||
$pick = default_select($rhost, keys %picks) if (!$pick);
|
||||
$pick =~ s/\s*\r?\n$//;
|
||||
if (!$pick) {
|
||||
# store original mount info
|
||||
$ref->{$_} = $mnt1->{$_} foreach (keys %{$mnt1});
|
||||
# return original path if can't find suitable mount
|
||||
return $path1
|
||||
}
|
||||
# clear previously picked hosts
|
||||
foreach (grep(/^picks_/, keys %meta)) {
|
||||
delete $meta{$_}->{$lhost};
|
||||
delete $meta{$_} if (scalar(keys %{$meta{$_}}) == 0);
|
||||
}
|
||||
# store that host has already been selected
|
||||
$meta{"picks_$pick"}->{$lhost} = 1;
|
||||
$opts{"pick_$mnt1->{remote}"} = $pick;
|
||||
}
|
||||
# revert to default selection policy when no selection
|
||||
$pick = default_select($rhost, keys %picks) if (!$pick);
|
||||
$pick =~ s/\s*\r?\n$//;
|
||||
if (!$pick) {
|
||||
# store original mount info
|
||||
$ref->{$_} = $mnt1->{$_} foreach (keys %{$mnt1});
|
||||
# return original path if can't find suitable mount
|
||||
return $path1
|
||||
}
|
||||
# clear previously picked hosts
|
||||
foreach (grep(/^picks_/, keys %meta)) {
|
||||
delete $meta{$_}->{$lhost};
|
||||
delete $meta{$_} if (scalar(keys %{$meta{$_}}) == 0);
|
||||
}
|
||||
# store that host has already been selected
|
||||
$meta{"picks_$pick"}->{$lhost} = 1;
|
||||
my $mnt2 = map_fs_mount($mnt1, $pick, $ref->{rw});
|
||||
# mnt2 is known to be defined due to previous %hosts map_fs_mount calls
|
||||
# replace original mount point with new mount point
|
||||
|
@ -2732,7 +2792,11 @@ sub put {
|
|||
my %op = split(/[= ]+/, $line);
|
||||
# ignore malformed lines with undefined op values
|
||||
next if (grep(!/./, values %op));
|
||||
delete $doing->{$op{doing}} if (defined $op{doing});
|
||||
if (defined $op{doing}) {
|
||||
# ignore operations that have been processed by another client
|
||||
next if (!defined $doing->{$op{doing}});
|
||||
delete $doing->{$op{doing}};
|
||||
}
|
||||
|
||||
my @args = split(/,/, $op{args});
|
||||
my $cmd = shift @args;
|
||||
|
@ -2829,7 +2893,7 @@ sub put {
|
|||
# initialize transfer settings once all getopt lines received
|
||||
init_id() if ($args[0] eq 'end');
|
||||
# check validity of option
|
||||
next if ($args[0] !~ /^(?:bandwidth|buffer|clients|command|cpu|create-tar|cron|cwd|dereference|disk|exception|exclude|extract-tar|files|force|host-list|hosts|ignore-times|include|index-tar|io[rw]?|local|mail|net[rw]?|newer|offline|older|pipeline|ports|preallocate|preserve|recall|remote|retry|sanity|secure|size|split|split-tar|streams|stripe|sync|threads|verify|verify-fast|window)$/);
|
||||
next if ($args[0] !~ /^(?:bandwidth|buffer|clients|command|cpu|create-tar|cron|cwd|dereference|disk|exception|exclude|extract-tar|files|force|host-list|hosts|ignore-times|include|index-tar|interval|io[rw]?|local|mail|net[rw]?|newer|offline|older|pipeline|ports|preallocate|preserve|recall|remote|retry|sanity|secure|silent|size|split|split-tar|streams|stripe|sync|threads|verify|verify-fast|window)$/);
|
||||
$meta{$args[0]} = defined $op{text} ? unescape($op{text}) : 1;
|
||||
} elsif ($cmd eq 'host') {
|
||||
# host error so remove host from outstanding hosts
|
||||
|
@ -2893,7 +2957,8 @@ sub put {
|
|||
($cmd =~ /^(?:cksum|cp|ln)/ ||
|
||||
# tar mkdirs are not put in tree so are not handled by rtree
|
||||
$cmd eq 'mkdir' && $meta{'create-tar'})) {
|
||||
if ($cmd eq 'cksum' && detect_silent(\%op, $args[0], $args[1])) {
|
||||
if ($meta{silent} && $cmd eq 'cksum' &&
|
||||
detect_silent(\%op, $args[0], $args[1])) {
|
||||
$line =~ s/(?:text|tool)=\S+//g;
|
||||
log_print('alert', $gzs, "$line tool=shift-mgr text=$op{text}\n");
|
||||
$meta{e_silent}++;
|
||||
|
@ -2913,7 +2978,8 @@ sub put {
|
|||
next;
|
||||
}
|
||||
} else {
|
||||
if ($cmd eq 'cksum' && detect_silent(\%op, $args[0], $args[1])) {
|
||||
if ($meta{silent} && $cmd eq 'cksum' &&
|
||||
detect_silent(\%op, $args[0], $args[1])) {
|
||||
$line =~ s/(?:text|tool)=\S+//g;
|
||||
log_print('alert', $gzs, "$line tool=shift-mgr text=$op{text}\n");
|
||||
$meta{e_silent}++;
|
||||
|
@ -3477,13 +3543,13 @@ sub stats {
|
|||
[qw(bbcp bbftp fish fish-tcp gridftp mcp msum rsync shiftc shift-aux)];
|
||||
$heads{Options_1} =
|
||||
[qw(bandwidth buffer clients cpu create-tar exclude extract-tar files
|
||||
force host-list hosts include index-tar io ior)];
|
||||
force host-list hosts include index-tar interval)];
|
||||
$heads{Options_2} =
|
||||
[qw(iow local net netr netw newer no-cron no-mail no-offline
|
||||
no-preserve no-recall no-sanity no-verify older pipeline)];
|
||||
[qw(io ior iow local net netr netw newer no-cron no-mail no-offline
|
||||
no-preserve no-recall no-sanity no-silent no-verify)];
|
||||
$heads{Options_3} =
|
||||
[qw(ports preallocate remote retry secure size split split-tar streams
|
||||
stripe sync threads verify-fast window)];
|
||||
[qw(older pipeline ports preallocate remote retry secure size split
|
||||
split-tar streams stripe sync threads verify-fast window)];
|
||||
$heads{Errors} =
|
||||
[qw(corruption exception silent throttle chattr cksum cp host ln mkdir sum)];
|
||||
|
||||
|
@ -3566,9 +3632,9 @@ sub stats {
|
|||
|
||||
# option totals
|
||||
# options that must differ from configured default
|
||||
foreach my $key (qw(buffer clients cpu files hosts io ior iow net netr
|
||||
netw ports retry size split split-tar streams stripe
|
||||
threads window)) {
|
||||
foreach my $key (qw(buffer clients cpu files hosts interval io ior iow
|
||||
net netr netw ports retry size split split-tar
|
||||
streams stripe threads window)) {
|
||||
# parse some values in binary bytes instead of decimal bytes
|
||||
my $bin = $key =~ /^(?:buffer|split)$/ ? 2 : 0;
|
||||
$bin = 1 if ($key eq 'stripe');
|
||||
|
@ -3579,7 +3645,7 @@ sub stats {
|
|||
# options that must be inverted
|
||||
$totals{"o_no-offline"} = !$meta{offline} && !$meta{'create-tar'} &&
|
||||
!$meta{'extract-tar'} ? 1 : 0;
|
||||
foreach (qw(cron mail preserve recall sanity verify)) {
|
||||
foreach (qw(cron mail preserve recall sanity silent verify)) {
|
||||
$totals{"o_no-$_"} = !$meta{$_} ? 1 : 0;
|
||||
}
|
||||
# normal options
|
||||
|
@ -3723,6 +3789,8 @@ sub stats {
|
|||
foreach my $file
|
||||
# sort by user.id
|
||||
(sort {(split(/\//, $a))[-2] cmp (split(/\//, $b))[-2]} @metas) {
|
||||
# untaint file
|
||||
$file = $1 if ($file =~ /^(.*)$/);
|
||||
# skip transfers that have expired
|
||||
my $mtime = (stat($file))[9];
|
||||
next if ($mtime + $conf{data_expire} < $time);
|
||||
|
@ -3775,7 +3843,8 @@ sub stats {
|
|||
$count++;
|
||||
$ulast = $user;
|
||||
}
|
||||
close $gz;
|
||||
my $log = basename($file);
|
||||
log_close($log, {$log => $gz});
|
||||
}
|
||||
# output final table
|
||||
print $t->render;
|
||||
|
@ -3881,9 +3950,7 @@ sub status {
|
|||
my $left;
|
||||
if ($rate && $meta{last} && !$meta{time1}) {
|
||||
# add estimated time to completion
|
||||
my $ncli = sum(map {$meta{$_}} (grep(/^clients_/, keys %meta)));
|
||||
$ncli = grep(/^env_/, keys %meta) * $meta{clients} - $ncli;
|
||||
$ncli = 1 if ($ncli <= 0);
|
||||
my $ncli = max(1, scalar(grep(/^doing_/, keys %meta)));
|
||||
my $rate1 = $meta{ra_cp} ? $meta{ra_cp} : $rate * $ncli;
|
||||
# add time for cps, sums, cksums
|
||||
foreach my $cmd (qw(cp sum cksum)) {
|
||||
|
@ -3899,10 +3966,12 @@ sub status {
|
|||
$rate1 = $meta{"ra_$cmd"} ? $meta{"ra_$cmd"} : 100;
|
||||
$left += ($meta{"t_$cmd"} - $meta{"d_$cmd"}) / $rate1 / $ncli;
|
||||
}
|
||||
# add time for non-cp manager calls assuming 1/s rate
|
||||
# add time for non-cp manager calls
|
||||
# use previous rate for mgr calls (or 1/s when not available)
|
||||
$rate1 = $meta{ra_mgr} ? 1 / $meta{ra_mgr} : 1;
|
||||
foreach (qw(chattr cksum sum)) {
|
||||
$left += ($meta{"t_$_"} - $meta{"d_$_"} - $meta{"e_$_"}) /
|
||||
$meta{files} / $ncli;
|
||||
$meta{files} / $rate1 / $ncli;
|
||||
}
|
||||
$left = format_seconds($left);
|
||||
}
|
||||
|
|
1212
perl/shiftc
1212
perl/shiftc
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue