From 72211b149632a571e9e31875f471bfc4c7881a4a Mon Sep 17 00:00:00 2001 From: Kyle Date: Tue, 22 Dec 2015 20:54:59 -0800 Subject: [PATCH] Add clustersh, update docs. + clustersh is a command-line utility for interacting with multiple servers over SSH. It can run commands or transfer files. + The main README was updated to include clustersh, lib, mwc, and sbuf. + The fragment README notes the use of the -nl flag now. --- README.md | 7 + cmd/clustersh/README | 77 ++++++++++ cmd/clustersh/main.go | 347 ++++++++++++++++++++++++++++++++++++++++++ cmd/fragment/README | 3 +- 4 files changed, 433 insertions(+), 1 deletion(-) create mode 100644 cmd/clustersh/README create mode 100644 cmd/clustersh/main.go diff --git a/README.md b/README.md index fbb3e1e..454835c 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,8 @@ Contents: certchain/ Display the certificate chain from a TLS connection. certdump/ Dump certificate information. + clustersh/ Run commands or transfer files across multiple + servers via SSH. csrpubdump/ Dump the public key from an X.509 certificate request. fragment/ Print a fragment of a file. @@ -23,6 +25,11 @@ Contents: stealchain/ Dump the verified chain from a TLS connection. tlskeypair/ Check whether a TLS certificate and key file match. + lib/ Commonly-useful functions for writing Go programs. + logging/ A logging library. + mwc/ MultiwriteCloser implementation. + sbuf/ A byte buffer that can be wiped. + Each program should have a small README in the directory with more information. diff --git a/cmd/clustersh/README b/cmd/clustersh/README new file mode 100644 index 0000000..8ef02dc --- /dev/null +++ b/cmd/clustersh/README @@ -0,0 +1,77 @@ +clustersh + +This is a program to run commands and transfer files between multiple +servers over SSH. + +[ Usage ] + clustersh [-a addresses] [-c chunk] [-h] [-u user] command args + +[ Flags ] + + -a addresses The comma-separated list of servers to send + to. There must not be any spaces in this list. + + -c chunk The size of chunks to transfer, in bytes. The + default is 16MB. + + -u user The SSH username to use. It must be the + same for all hosts. Defaults to the value + of the USER environment variable. + +[ Commands ] + +exec, run: run a command on the servers + The args list must be the command line to send to the hosts. + +upload, up, push, send: upload a file to the servers + The first argument is the local file to upload, and the second + is the filename to store it as on the remote. + +download, down, pull, fetch: download a file from the servers + The first argument is the filename to fetch.The second argument + is the base name for the local file. It will have a '-' and the + hostname appended. + +[ Examples ] + +Tailing syslog on the hosts "mesos-01," "mesos-02," and "mesos-03" +with the same username as the logged in user: + + $ clustersh -a mesos-01,mesos-02,mesos-03 tail /var/log/syslog + 2015/12/22 20:37:17 waiting for sessions to complete + [mesos-02] Dec 22 23:37:20 mesos-02 chronos[1848]: [2015-12-22 23:37:20,325] INFO Declined unused offers with filter refuseSeconds=5.0 (use --decline_offer_duration to reconfigure) (org.apache.mesos.chronos.scheduler.mesos.MesosJobFramework:97) + [mesos-02] Dec 22 23:37:20 mesos-02 mesos-master[534]: I1222 23:37:20.326491 612 master.cpp:3297] Processing DECLINE call for offers: [ 07e0c918-efa9-48c3-9851-77abf0c95f92-O61048 ] for framework 07e0c918-efa9-48c3-9851-77abf0c95f92-0001 (chronos-2.4.0) at scheduler-c6926d5b-2e35-47d9-89fb-0a407234f0ba@159.203.201.212:48192 + [mesos-02] Dec 22 23:37:20 mesos-02 mesos-master[534]: I1222 23:37:20.326786 612 hierarchical.cpp:744] Recovered cpus(*):1; mem(*):497; disk(*):14910; ports(*):[31000-32000] (total: cpus(*):1; mem(*):497; disk(*):14910; ports(*):[31000-32000], allocated: ) on slave 07e0c918-efa9-48c3-9851-77abf0c95f92-S78 from framework 07e0c918-efa9-48c3-9851-77abf0c95f92-0001 + [mesos-01] Dec 22 23:37:19 mesos-01 marathon[1719]: [2015-12-22 23:37:19,045] INFO 73.189.108.122 - - [23/Dec/2015:04:37:19 +0000] "GET //mesos-01:8080/v2/deployments HTTP/1.1" 200 2 "http://mesos-01:8080/ui/" "Mozilla/5.0 (X11; Linux x86_64; rv:43.0) Gecko/20100101 Firefox/43.0 Iceweasel/43.0" (mesosphere.chaos.http.ChaosRequestLog$$EnhancerByGuice$$f7252a53:qtp643328884-33) + [mesos-01] Dec 22 23:37:19 mesos-01 marathon[1719]: [2015-12-22 23:37:19,047] INFO 73.189.108.122 - - [23/Dec/2015:04:37:19 +0000] "GET //mesos-01:8080/v2/apps HTTP/1.1" 200 11 "http://mesos-01:8080/ui/" "Mozilla/5.0 (X11; Linux x86_64; rv:43.0) Gecko/20100101 Firefox/43.0 Iceweasel/43.0" (mesosphere.chaos.http.ChaosRequestLog$$EnhancerByGuice$$f7252a53:qtp643328884-27) + [mesos-01] Dec 22 23:37:19 mesos-01 marathon[1719]: [2015-12-22 23:37:19,048] INFO 73.189.108.122 - - [23/Dec/2015:04:37:19 +0000] "GET //mesos-01:8080/v2/queue HTTP/1.1" 200 12 "http://mesos-01:8080/ui/" "Mozilla/5.0 (X11; Linux x86_64; rv:43.0) Gecko/20100101 Firefox/43.0 Iceweasel/43.0" 1 (mesosphere.chaos.http.ChaosRequestLog$$EnhancerByGuice$$f7252a53:qtp643328884-29) + [mesos-03] Dec 22 23:35:09 mesos-03 mesos-slave[2675]: I1222 23:35:09.231684 2692 slave.cpp:4039] Current disk usage 8.93%. Max allowed age: 5.674993196753403days + [mesos-03] Dec 22 23:36:09 mesos-03 mesos-slave[2675]: I1222 23:36:09.232365 2694 slave.cpp:4039] Current disk usage 8.93%. Max allowed age: 5.674993196753403days + [mesos-03] Dec 22 23:37:09 mesos-03 mesos-slave[2675]: I1222 23:37:09.232836 2691 slave.cpp:4039] Current disk usage 8.93%. Max allowed age: 5.674993196753403days + +Transferring the `clustersh` program to the same three hosts: + + $ clustersh -a mesos-01,mesos-02,mesos-03 push clustersh clustersh + 2015/12/22 20:39:23 waiting for sessions to complete + [mesos-02] wrote 6422688-byte chunk + [mesos-02] clustersh uploaded to clustersh + [mesos-03] wrote 6422688-byte chunk + [mesos-03] clustersh uploaded to clustersh + [mesos-01] wrote 6422688-byte chunk + [mesos-01] clustersh uploaded to clustersh + +Copying the `/var/log/mesos/marathon.log` file from each of the three hosts: + + $ clustersh -a mesos-01,mesos-02,mesos-03 fetch /var/log/mesos/marathon.log marathon.log + 2015/12/22 20:42:28 waiting for sessions to complete + [mesos-02] wrote 467461-byte chunk + [mesos-02] /var/log/mesos/marathon.log downloaded to marathon.log-mesos-02 + [mesos-03] wrote 415850-byte chunk + [mesos-03] /var/log/mesos/marathon.log downloaded to marathon.log-mesos-03 + [mesos-01] wrote 2940327-byte chunk + [mesos-01] /var/log/mesos/marathon.log downloaded to marathon.log-mesos-01 + $ ls -1 marathon.log* + marathon.log-mesos-01 + marathon.log-mesos-02 + marathon.log-mesos-03 + diff --git a/cmd/clustersh/main.go b/cmd/clustersh/main.go new file mode 100644 index 0000000..a27eb74 --- /dev/null +++ b/cmd/clustersh/main.go @@ -0,0 +1,347 @@ +package main + +import ( + "bufio" + "flag" + "fmt" + "io" + "log" + "net" + "os" + "strings" + "sync" + + "github.com/kisom/goutils/lib" + "github.com/pkg/sftp" + "golang.org/x/crypto/ssh" + "golang.org/x/crypto/ssh/agent" +) + +func usage(w io.Writer) { + fmt.Fprintf(w, `Usage: %s [-a addresses] [-c chunksize] [-h] [-u user] command args + +Flags: + -a addresses The comma-separated list of servers to send + to. There must not be any spaces in this list. + + -c chunk The size of chunks to transfer, in bytes. The + default is 16MB. + + -u user The SSH username to use. It must be the + same for all hosts. Defaults to the value + of the USER environment variable. + +Commands: + exec, run: run a command on the servers + The args list must be the command line to send to the hosts. + + upload, up, push, send: upload a file to the servers + The first argument is the local file to upload, and the second + is the filename to store it as on the remote. + + download, down, pull, fetch: download a file from the servers + The first argument is the filename to fetch.The second argument + is the base name for the local file. It will have a '-' and the + hostname appended. + +`, lib.ProgName()) +} + +var chunkSize = 16777216 + +var modes = ssh.TerminalModes{ + ssh.ECHO: 0, + ssh.TTY_OP_ISPEED: 14400, + ssh.TTY_OP_OSPEED: 14400, +} + +func sshAgent() ssh.AuthMethod { + a, err := net.Dial("unix", os.Getenv("SSH_AUTH_SOCK")) + if err == nil { + return ssh.PublicKeysCallback(agent.NewClient(a).Signers) + } + + lib.Err(lib.ExitFailure, err, "failed to authenticate with SSH agent") + return nil +} + +func sshConfig(user string) *ssh.ClientConfig { + return &ssh.ClientConfig{ + User: user, + Auth: []ssh.AuthMethod{ + sshAgent(), + }, + } +} + +func scanner(host string, in io.Reader, out io.Writer) { + scanner := bufio.NewScanner(in) + for scanner.Scan() { + line := scanner.Text() + fmt.Fprintf(out, "[%s] %s\n", host, line) + } +} + +func logError(host string, err error, format string, args ...interface{}) { + msg := fmt.Sprintf(format, args...) + log.Printf("[%s] FAILED: %s: %v\n", host, msg, err) +} + +func exec(wg *sync.WaitGroup, user, host string, commands []string) { + var shutdown []func() error + + defer func() { + for i := len(shutdown) - 1; i >= 0; i-- { + err := shutdown[i]() + if err != nil && err != io.EOF { + logError(host, err, "shutting down") + } + } + }() + defer wg.Done() + + conf := sshConfig(user) + conn, err := ssh.Dial("tcp", host+":22", conf) + if err != nil { + logError(host, err, "failed to connect") + return + } + shutdown = append(shutdown, conn.Close) + + session, err := conn.NewSession() + if err != nil { + logError(host, err, "failed to establish session") + return + } + shutdown = append(shutdown, session.Close) + + if err := session.RequestPty("xterm", 80, 40, modes); err != nil { + session.Close() + logError(host, err, "request for pty failed") + return + } + + stdout, err := session.StdoutPipe() + if err != nil { + logError(host, err, "failed to setup standard output") + return + } + go scanner(host, stdout, os.Stdout) + + stderr, err := session.StderrPipe() + if err != nil { + logError(host, err, "failed to setup standard error") + return + } + go scanner(host, stderr, os.Stderr) + + for _, command := range commands { + err = session.Run(command) + if err != nil { + logError(host, err, "running command failed") + return + } + } +} + +func upload(wg *sync.WaitGroup, user, host, local, remote string) { + var shutdown []func() error + + defer func() { + for i := len(shutdown) - 1; i >= 0; i-- { + err := shutdown[i]() + if err != nil && err != io.EOF { + logError(host, err, "shutting down") + } + } + }() + defer wg.Done() + + conf := sshConfig(user) + conn, err := ssh.Dial("tcp", host+":22", conf) + if err != nil { + logError(host, err, "failed to connect") + return + } + shutdown = append(shutdown, conn.Close) + + sftp, err := sftp.NewClient(conn) + if err != nil { + logError(host, err, "setting up SFTP client") + return + } + shutdown = append(shutdown, sftp.Close) + + remoteFile, err := sftp.Create(remote) + if err != nil { + logError(host, err, "creating file %s on remote", remote) + return + } + shutdown = append(shutdown, remoteFile.Close) + + localFile, err := os.Open(local) + if err != nil { + logError(host, err, "opening local file") + return + } + shutdown = append(shutdown, localFile.Close) + + var buf = make([]byte, chunkSize) + for { + var n int + n, err = localFile.Read(buf) + if n > 0 { + _, err = remoteFile.Write(buf[:n]) + if err != nil { + logError(host, err, "writing chunk") + return + } + fmt.Printf("[%s] wrote %d-byte chunk\n", host, n) + } + + if err == io.EOF { + break + } else if err != nil { + logError(host, err, "reading chunk") + return + } + } + fmt.Printf("[%s] %s uploaded to %s\n", host, remote, local) +} + +func download(wg *sync.WaitGroup, user, host, local, remote string) { + var shutdown []func() error + + defer func() { + for i := len(shutdown) - 1; i >= 0; i-- { + err := shutdown[i]() + if err != nil && err != io.EOF { + logError(host, err, "shutting down") + } + } + }() + defer wg.Done() + + conf := sshConfig(user) + conn, err := ssh.Dial("tcp", host+":22", conf) + if err != nil { + logError(host, err, "failed to connect") + return + } + shutdown = append(shutdown, conn.Close) + + sftp, err := sftp.NewClient(conn) + if err != nil { + logError(host, err, "setting up SFTP client") + return + } + shutdown = append(shutdown, sftp.Close) + + remoteFile, err := sftp.Open(remote) + if err != nil { + logError(host, err, "opening file %s on remote", remote) + return + } + shutdown = append(shutdown, remoteFile.Close) + + local = local + "-" + host + localFile, err := os.Create(local) + if err != nil { + logError(host, err, "opening local file") + return + } + shutdown = append(shutdown, localFile.Close) + + var buf = make([]byte, chunkSize) + for { + var n int + n, err = remoteFile.Read(buf) + if n > 0 { + _, err = localFile.Write(buf[:n]) + if err != nil { + logError(host, err, "writing chunk") + return + } + fmt.Printf("[%s] wrote %d-byte chunk\n", host, n) + } + + if err == io.EOF { + break + } else if err != nil { + logError(host, err, "reading chunk") + return + } + } + fmt.Printf("[%s] %s downloaded to %s\n", host, remote, local) +} + +func init() { + flag.Usage = func() { + usage(os.Stdout) + } +} + +func main() { + var hostsFlag, user string + flag.StringVar(&hostsFlag, "a", "", "`hosts` to run on") + flag.IntVar(&chunkSize, "c", chunkSize, "`size` in bytes of transfer chunks") + flag.StringVar(&user, "u", os.Getenv("USER"), "`username` to run commands as") + flag.Parse() + + hosts := strings.Split(hostsFlag, ",") + if len(hosts) == 0 { + os.Exit(0) + } + + if flag.NArg() < 1 { + usage(os.Stderr) + os.Exit(1) + } + + cmd := flag.Arg(0) + var wg = new(sync.WaitGroup) + + switch cmd { + case "exec", "run": + if flag.NArg() < 2 { + usage(os.Stderr) + os.Exit(1) + } + + commands := []string{strings.Join(flag.Args()[1:], " ")} + for _, host := range hosts { + wg.Add(1) + go exec(wg, user, host, commands) + } + case "upload", "up", "push", "send": + if flag.NArg() != 3 { + usage(os.Stderr) + os.Exit(1) + } + + for _, host := range hosts { + wg.Add(1) + go upload(wg, user, host, flag.Arg(1), flag.Arg(2)) + } + case "download", "down", "pull", "fetch": + if flag.NArg() != 3 { + usage(os.Stderr) + os.Exit(1) + } + + for _, host := range hosts { + wg.Add(1) + go download(wg, user, host, flag.Arg(2), flag.Arg(1)) + } + case "help": + usage(os.Stdout) + os.Exit(0) + default: + fmt.Fprintf(os.Stderr, "Unknown command %s.\n", cmd) + usage(os.Stderr) + os.Exit(1) + } + + log.Printf("waiting for sessions to complete") + wg.Wait() +} diff --git a/cmd/fragment/README b/cmd/fragment/README index 8aa09fa..2aedc3f 100644 --- a/cmd/fragment/README +++ b/cmd/fragment/README @@ -2,7 +2,8 @@ fragment This is a program to print a fragment of a file. It takes a filename, a starting line (lines are numbered starting at 1) and an optional -end line or offset. +end line or offset. It has one flag, `-nl`, which causes line numbers +not to be printed. For example, compare the output to `nl -ba fragment.go`: