Add clustersh, update docs.

+ clustersh is a command-line utility for interacting with multiple
  servers over SSH. It can run commands or transfer files.
+ The main README was updated to include clustersh, lib, mwc, and sbuf.
+ The fragment README notes the use of the -nl flag now.
This commit is contained in:
Kyle 2015-12-22 20:54:59 -08:00
parent 1535cba7a4
commit 72211b1496
4 changed files with 433 additions and 1 deletions

View File

@ -11,6 +11,8 @@ Contents:
certchain/ Display the certificate chain from a
TLS connection.
certdump/ Dump certificate information.
clustersh/ Run commands or transfer files across multiple
servers via SSH.
csrpubdump/ Dump the public key from an X.509
certificate request.
fragment/ Print a fragment of a file.
@ -23,6 +25,11 @@ Contents:
stealchain/ Dump the verified chain from a TLS
connection.
tlskeypair/ Check whether a TLS certificate and key file match.
lib/ Commonly-useful functions for writing Go programs.
logging/ A logging library.
mwc/ MultiwriteCloser implementation.
sbuf/ A byte buffer that can be wiped.
Each program should have a small README in the directory with more information.

77
cmd/clustersh/README Normal file
View File

@ -0,0 +1,77 @@
clustersh
This is a program to run commands and transfer files between multiple
servers over SSH.
[ Usage ]
clustersh [-a addresses] [-c chunk] [-h] [-u user] command args
[ Flags ]
-a addresses The comma-separated list of servers to send
to. There must not be any spaces in this list.
-c chunk The size of chunks to transfer, in bytes. The
default is 16MB.
-u user The SSH username to use. It must be the
same for all hosts. Defaults to the value
of the USER environment variable.
[ Commands ]
exec, run: run a command on the servers
The args list must be the command line to send to the hosts.
upload, up, push, send: upload a file to the servers
The first argument is the local file to upload, and the second
is the filename to store it as on the remote.
download, down, pull, fetch: download a file from the servers
The first argument is the filename to fetch.The second argument
is the base name for the local file. It will have a '-' and the
hostname appended.
[ Examples ]
Tailing syslog on the hosts "mesos-01," "mesos-02," and "mesos-03"
with the same username as the logged in user:
$ clustersh -a mesos-01,mesos-02,mesos-03 tail /var/log/syslog
2015/12/22 20:37:17 waiting for sessions to complete
[mesos-02] Dec 22 23:37:20 mesos-02 chronos[1848]: [2015-12-22 23:37:20,325] INFO Declined unused offers with filter refuseSeconds=5.0 (use --decline_offer_duration to reconfigure) (org.apache.mesos.chronos.scheduler.mesos.MesosJobFramework:97)
[mesos-02] Dec 22 23:37:20 mesos-02 mesos-master[534]: I1222 23:37:20.326491 612 master.cpp:3297] Processing DECLINE call for offers: [ 07e0c918-efa9-48c3-9851-77abf0c95f92-O61048 ] for framework 07e0c918-efa9-48c3-9851-77abf0c95f92-0001 (chronos-2.4.0) at scheduler-c6926d5b-2e35-47d9-89fb-0a407234f0ba@159.203.201.212:48192
[mesos-02] Dec 22 23:37:20 mesos-02 mesos-master[534]: I1222 23:37:20.326786 612 hierarchical.cpp:744] Recovered cpus(*):1; mem(*):497; disk(*):14910; ports(*):[31000-32000] (total: cpus(*):1; mem(*):497; disk(*):14910; ports(*):[31000-32000], allocated: ) on slave 07e0c918-efa9-48c3-9851-77abf0c95f92-S78 from framework 07e0c918-efa9-48c3-9851-77abf0c95f92-0001
[mesos-01] Dec 22 23:37:19 mesos-01 marathon[1719]: [2015-12-22 23:37:19,045] INFO 73.189.108.122 - - [23/Dec/2015:04:37:19 +0000] "GET //mesos-01:8080/v2/deployments HTTP/1.1" 200 2 "http://mesos-01:8080/ui/" "Mozilla/5.0 (X11; Linux x86_64; rv:43.0) Gecko/20100101 Firefox/43.0 Iceweasel/43.0" (mesosphere.chaos.http.ChaosRequestLog$$EnhancerByGuice$$f7252a53:qtp643328884-33)
[mesos-01] Dec 22 23:37:19 mesos-01 marathon[1719]: [2015-12-22 23:37:19,047] INFO 73.189.108.122 - - [23/Dec/2015:04:37:19 +0000] "GET //mesos-01:8080/v2/apps HTTP/1.1" 200 11 "http://mesos-01:8080/ui/" "Mozilla/5.0 (X11; Linux x86_64; rv:43.0) Gecko/20100101 Firefox/43.0 Iceweasel/43.0" (mesosphere.chaos.http.ChaosRequestLog$$EnhancerByGuice$$f7252a53:qtp643328884-27)
[mesos-01] Dec 22 23:37:19 mesos-01 marathon[1719]: [2015-12-22 23:37:19,048] INFO 73.189.108.122 - - [23/Dec/2015:04:37:19 +0000] "GET //mesos-01:8080/v2/queue HTTP/1.1" 200 12 "http://mesos-01:8080/ui/" "Mozilla/5.0 (X11; Linux x86_64; rv:43.0) Gecko/20100101 Firefox/43.0 Iceweasel/43.0" 1 (mesosphere.chaos.http.ChaosRequestLog$$EnhancerByGuice$$f7252a53:qtp643328884-29)
[mesos-03] Dec 22 23:35:09 mesos-03 mesos-slave[2675]: I1222 23:35:09.231684 2692 slave.cpp:4039] Current disk usage 8.93%. Max allowed age: 5.674993196753403days
[mesos-03] Dec 22 23:36:09 mesos-03 mesos-slave[2675]: I1222 23:36:09.232365 2694 slave.cpp:4039] Current disk usage 8.93%. Max allowed age: 5.674993196753403days
[mesos-03] Dec 22 23:37:09 mesos-03 mesos-slave[2675]: I1222 23:37:09.232836 2691 slave.cpp:4039] Current disk usage 8.93%. Max allowed age: 5.674993196753403days
Transferring the `clustersh` program to the same three hosts:
$ clustersh -a mesos-01,mesos-02,mesos-03 push clustersh clustersh
2015/12/22 20:39:23 waiting for sessions to complete
[mesos-02] wrote 6422688-byte chunk
[mesos-02] clustersh uploaded to clustersh
[mesos-03] wrote 6422688-byte chunk
[mesos-03] clustersh uploaded to clustersh
[mesos-01] wrote 6422688-byte chunk
[mesos-01] clustersh uploaded to clustersh
Copying the `/var/log/mesos/marathon.log` file from each of the three hosts:
$ clustersh -a mesos-01,mesos-02,mesos-03 fetch /var/log/mesos/marathon.log marathon.log
2015/12/22 20:42:28 waiting for sessions to complete
[mesos-02] wrote 467461-byte chunk
[mesos-02] /var/log/mesos/marathon.log downloaded to marathon.log-mesos-02
[mesos-03] wrote 415850-byte chunk
[mesos-03] /var/log/mesos/marathon.log downloaded to marathon.log-mesos-03
[mesos-01] wrote 2940327-byte chunk
[mesos-01] /var/log/mesos/marathon.log downloaded to marathon.log-mesos-01
$ ls -1 marathon.log*
marathon.log-mesos-01
marathon.log-mesos-02
marathon.log-mesos-03

347
cmd/clustersh/main.go Normal file
View File

@ -0,0 +1,347 @@
package main
import (
"bufio"
"flag"
"fmt"
"io"
"log"
"net"
"os"
"strings"
"sync"
"github.com/kisom/goutils/lib"
"github.com/pkg/sftp"
"golang.org/x/crypto/ssh"
"golang.org/x/crypto/ssh/agent"
)
func usage(w io.Writer) {
fmt.Fprintf(w, `Usage: %s [-a addresses] [-c chunksize] [-h] [-u user] command args
Flags:
-a addresses The comma-separated list of servers to send
to. There must not be any spaces in this list.
-c chunk The size of chunks to transfer, in bytes. The
default is 16MB.
-u user The SSH username to use. It must be the
same for all hosts. Defaults to the value
of the USER environment variable.
Commands:
exec, run: run a command on the servers
The args list must be the command line to send to the hosts.
upload, up, push, send: upload a file to the servers
The first argument is the local file to upload, and the second
is the filename to store it as on the remote.
download, down, pull, fetch: download a file from the servers
The first argument is the filename to fetch.The second argument
is the base name for the local file. It will have a '-' and the
hostname appended.
`, lib.ProgName())
}
var chunkSize = 16777216
var modes = ssh.TerminalModes{
ssh.ECHO: 0,
ssh.TTY_OP_ISPEED: 14400,
ssh.TTY_OP_OSPEED: 14400,
}
func sshAgent() ssh.AuthMethod {
a, err := net.Dial("unix", os.Getenv("SSH_AUTH_SOCK"))
if err == nil {
return ssh.PublicKeysCallback(agent.NewClient(a).Signers)
}
lib.Err(lib.ExitFailure, err, "failed to authenticate with SSH agent")
return nil
}
func sshConfig(user string) *ssh.ClientConfig {
return &ssh.ClientConfig{
User: user,
Auth: []ssh.AuthMethod{
sshAgent(),
},
}
}
func scanner(host string, in io.Reader, out io.Writer) {
scanner := bufio.NewScanner(in)
for scanner.Scan() {
line := scanner.Text()
fmt.Fprintf(out, "[%s] %s\n", host, line)
}
}
func logError(host string, err error, format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
log.Printf("[%s] FAILED: %s: %v\n", host, msg, err)
}
func exec(wg *sync.WaitGroup, user, host string, commands []string) {
var shutdown []func() error
defer func() {
for i := len(shutdown) - 1; i >= 0; i-- {
err := shutdown[i]()
if err != nil && err != io.EOF {
logError(host, err, "shutting down")
}
}
}()
defer wg.Done()
conf := sshConfig(user)
conn, err := ssh.Dial("tcp", host+":22", conf)
if err != nil {
logError(host, err, "failed to connect")
return
}
shutdown = append(shutdown, conn.Close)
session, err := conn.NewSession()
if err != nil {
logError(host, err, "failed to establish session")
return
}
shutdown = append(shutdown, session.Close)
if err := session.RequestPty("xterm", 80, 40, modes); err != nil {
session.Close()
logError(host, err, "request for pty failed")
return
}
stdout, err := session.StdoutPipe()
if err != nil {
logError(host, err, "failed to setup standard output")
return
}
go scanner(host, stdout, os.Stdout)
stderr, err := session.StderrPipe()
if err != nil {
logError(host, err, "failed to setup standard error")
return
}
go scanner(host, stderr, os.Stderr)
for _, command := range commands {
err = session.Run(command)
if err != nil {
logError(host, err, "running command failed")
return
}
}
}
func upload(wg *sync.WaitGroup, user, host, local, remote string) {
var shutdown []func() error
defer func() {
for i := len(shutdown) - 1; i >= 0; i-- {
err := shutdown[i]()
if err != nil && err != io.EOF {
logError(host, err, "shutting down")
}
}
}()
defer wg.Done()
conf := sshConfig(user)
conn, err := ssh.Dial("tcp", host+":22", conf)
if err != nil {
logError(host, err, "failed to connect")
return
}
shutdown = append(shutdown, conn.Close)
sftp, err := sftp.NewClient(conn)
if err != nil {
logError(host, err, "setting up SFTP client")
return
}
shutdown = append(shutdown, sftp.Close)
remoteFile, err := sftp.Create(remote)
if err != nil {
logError(host, err, "creating file %s on remote", remote)
return
}
shutdown = append(shutdown, remoteFile.Close)
localFile, err := os.Open(local)
if err != nil {
logError(host, err, "opening local file")
return
}
shutdown = append(shutdown, localFile.Close)
var buf = make([]byte, chunkSize)
for {
var n int
n, err = localFile.Read(buf)
if n > 0 {
_, err = remoteFile.Write(buf[:n])
if err != nil {
logError(host, err, "writing chunk")
return
}
fmt.Printf("[%s] wrote %d-byte chunk\n", host, n)
}
if err == io.EOF {
break
} else if err != nil {
logError(host, err, "reading chunk")
return
}
}
fmt.Printf("[%s] %s uploaded to %s\n", host, remote, local)
}
func download(wg *sync.WaitGroup, user, host, local, remote string) {
var shutdown []func() error
defer func() {
for i := len(shutdown) - 1; i >= 0; i-- {
err := shutdown[i]()
if err != nil && err != io.EOF {
logError(host, err, "shutting down")
}
}
}()
defer wg.Done()
conf := sshConfig(user)
conn, err := ssh.Dial("tcp", host+":22", conf)
if err != nil {
logError(host, err, "failed to connect")
return
}
shutdown = append(shutdown, conn.Close)
sftp, err := sftp.NewClient(conn)
if err != nil {
logError(host, err, "setting up SFTP client")
return
}
shutdown = append(shutdown, sftp.Close)
remoteFile, err := sftp.Open(remote)
if err != nil {
logError(host, err, "opening file %s on remote", remote)
return
}
shutdown = append(shutdown, remoteFile.Close)
local = local + "-" + host
localFile, err := os.Create(local)
if err != nil {
logError(host, err, "opening local file")
return
}
shutdown = append(shutdown, localFile.Close)
var buf = make([]byte, chunkSize)
for {
var n int
n, err = remoteFile.Read(buf)
if n > 0 {
_, err = localFile.Write(buf[:n])
if err != nil {
logError(host, err, "writing chunk")
return
}
fmt.Printf("[%s] wrote %d-byte chunk\n", host, n)
}
if err == io.EOF {
break
} else if err != nil {
logError(host, err, "reading chunk")
return
}
}
fmt.Printf("[%s] %s downloaded to %s\n", host, remote, local)
}
func init() {
flag.Usage = func() {
usage(os.Stdout)
}
}
func main() {
var hostsFlag, user string
flag.StringVar(&hostsFlag, "a", "", "`hosts` to run on")
flag.IntVar(&chunkSize, "c", chunkSize, "`size` in bytes of transfer chunks")
flag.StringVar(&user, "u", os.Getenv("USER"), "`username` to run commands as")
flag.Parse()
hosts := strings.Split(hostsFlag, ",")
if len(hosts) == 0 {
os.Exit(0)
}
if flag.NArg() < 1 {
usage(os.Stderr)
os.Exit(1)
}
cmd := flag.Arg(0)
var wg = new(sync.WaitGroup)
switch cmd {
case "exec", "run":
if flag.NArg() < 2 {
usage(os.Stderr)
os.Exit(1)
}
commands := []string{strings.Join(flag.Args()[1:], " ")}
for _, host := range hosts {
wg.Add(1)
go exec(wg, user, host, commands)
}
case "upload", "up", "push", "send":
if flag.NArg() != 3 {
usage(os.Stderr)
os.Exit(1)
}
for _, host := range hosts {
wg.Add(1)
go upload(wg, user, host, flag.Arg(1), flag.Arg(2))
}
case "download", "down", "pull", "fetch":
if flag.NArg() != 3 {
usage(os.Stderr)
os.Exit(1)
}
for _, host := range hosts {
wg.Add(1)
go download(wg, user, host, flag.Arg(2), flag.Arg(1))
}
case "help":
usage(os.Stdout)
os.Exit(0)
default:
fmt.Fprintf(os.Stderr, "Unknown command %s.\n", cmd)
usage(os.Stderr)
os.Exit(1)
}
log.Printf("waiting for sessions to complete")
wg.Wait()
}

View File

@ -2,7 +2,8 @@ fragment
This is a program to print a fragment of a file. It takes a filename,
a starting line (lines are numbered starting at 1) and an optional
end line or offset.
end line or offset. It has one flag, `-nl`, which causes line numbers
not to be printed.
For example, compare the output to `nl -ba fragment.go`: