commit a3966fc28b4c1a7e95f872826b6a3bfe0c4e2b13 Author: Kyle Isom Date: Sat Feb 26 22:57:13 2022 -0800 Initial import. diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..133ec2c --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +/proto/*.pb.go +/proto/*_pb2.py +build +__pycache__ +.idea \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..6118817 --- /dev/null +++ b/Makefile @@ -0,0 +1,29 @@ +SOURCES := $(shell find ./ -type f -name '*.go') +BUILD_DIR := build +TARGET := $(BUILD_DIR)/sensenet +INSTALL_DIR := /usr/local/bin + +all: $(TARGET) + +$(BUILD_DIR): + mkdir -p $(BUILD_DIR) + +$(TARGET): $(BUILD_DIR) $(SOURCES) + go build -o $@ ./cmd/sensenet + +.PHONY: install +install: $(TARGET) /etc/sensenet /etc/systemd/system/sensenet.service + sudo install $(TARGET) $(INSTALL_DIR) + +/etc/sensenet: + sudo mkdir -p /etc/sensenet + +/etc/systemd/system/sensenet.service: ops/sensenet.service + install ops/sensenet.service /etc/systemd/system/sensenet.service + systemctl daemon-reload + +.PHONY: service +service: /etc/systemd/system/sensenet.service + systemctl enable sensenet.service + systemctl restart sensenet.service + systemctl status sensenet.service diff --git a/cmd/sensenet/main.go b/cmd/sensenet/main.go new file mode 100644 index 0000000..7b6aa6e --- /dev/null +++ b/cmd/sensenet/main.go @@ -0,0 +1,21 @@ +package main + +import ( + "flag" + "fmt" + "log" + + "git.wntrmute.dev/kyle/sensenet/config" +) + +func main() { + configFilePath := flag.String("f", config.DefaultConfigFile, "`path` to config file") + flag.Parse() + + config, err := config.LoadConfig(*configFilePath) + if err != nil { + log.Fatal(err) + } + + fmt.Println(config) +} diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..4d425b0 --- /dev/null +++ b/config/config.go @@ -0,0 +1,42 @@ +package config + +import ( + "io/ioutil" + + "gopkg.in/yaml.v3" +) + +type Database struct { + Host string `yaml:"host"` + Port int `yaml:"port"` + Name string `yaml:"name"` + User string `yaml:"user"` + Password string `yaml:"password"` +} + +type Publisher struct { + Name string `yaml:"name"` + Topics []string `yaml:"topics"` +} + +type Config struct { + Database Database `yaml:"db"` + Publishers map[string]Publisher `yaml:"publishers"` +} + +var DefaultConfigFile = "/etc/sensenet/sensenet.yaml" + +func LoadConfig(path string) (*Config, error) { + data, err := ioutil.ReadFile(path) + if err != nil { + return nil, err + } + + config := &Config{} + err = yaml.Unmarshal(data, config) + if err != nil { + return nil, err + } + + return config, nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..d28258a --- /dev/null +++ b/go.mod @@ -0,0 +1,23 @@ +module git.wntrmute.dev/kyle/sensenet + +go 1.17 + +require ( + github.com/Masterminds/squirrel v1.5.2 + github.com/jackc/pgconn v1.11.0 + google.golang.org/protobuf v1.27.1 + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c + gopkg.in/zeromq/goczmq.v4 v4.1.0 +) + +require ( + github.com/jackc/chunkreader/v2 v2.0.1 // indirect + github.com/jackc/pgio v1.0.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgproto3/v2 v2.1.1 // indirect + github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect + github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect + github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect + golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 // indirect + golang.org/x/text v0.3.6 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..f58cddb --- /dev/null +++ b/go.sum @@ -0,0 +1,148 @@ +github.com/Masterminds/squirrel v1.5.2 h1:UiOEi2ZX4RCSkpiNDQN5kro/XIBpSRk9iTqdIRPzUXE= +github.com/Masterminds/squirrel v1.5.2/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10= +github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= +github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0= +github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= +github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= +github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= +github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= +github.com/jackc/pgconn v0.0.0-20190420214824-7e0022ef6ba3/go.mod h1:jkELnwuX+w9qN5YIfX0fl88Ehu4XC3keFuOJJk9pcnA= +github.com/jackc/pgconn v0.0.0-20190824142844-760dd75542eb/go.mod h1:lLjNuW/+OfW9/pnVKPazfWOgNfH2aPem8YQ7ilXGvJE= +github.com/jackc/pgconn v0.0.0-20190831204454-2fabfa3c18b7/go.mod h1:ZJKsE/KZfsUgOEh9hBm+xYTstcNHg7UPMVJqRfQxq4s= +github.com/jackc/pgconn v1.8.0/go.mod h1:1C2Pb36bGIP9QHGBYCjnyhqu7Rv3sGshaQUvmfGIB/o= +github.com/jackc/pgconn v1.9.0/go.mod h1:YctiPyvzfU11JFxoXokUOOKQXQmDMoJL9vJzHH8/2JY= +github.com/jackc/pgconn v1.11.0 h1:HiHArx4yFbwl91X3qqIHtUFoiIfLNJXCQRsnzkiwwaQ= +github.com/jackc/pgconn v1.11.0/go.mod h1:4z2w8XhRbP1hYxkpTuBjTS3ne3J48K83+u0zoyvg2pI= +github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= +github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= +github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE= +github.com/jackc/pgmock v0.0.0-20201204152224-4fe30f7445fd/go.mod h1:hrBW0Enj2AZTNpt/7Y5rr2xe/9Mn757Wtb2xeBzPv2c= +github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65 h1:DadwsjnMwFjfWc9y5Wi/+Zz7xoE5ALHsRQlOctkOiHc= +github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65/go.mod h1:5R2h2EEX+qri8jOWMbJCtaPWkrrNc7OHwsp2TCqp7ak= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A= +github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78= +github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA= +github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg= +github.com/jackc/pgproto3/v2 v2.0.0-rc3/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= +github.com/jackc/pgproto3/v2 v2.0.0-rc3.0.20190831210041-4c03ce451f29/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= +github.com/jackc/pgproto3/v2 v2.0.6/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgproto3/v2 v2.1.1 h1:7PQ/4gLoqnl87ZxL7xjO0DR5gYuviDCZxQJsUlFW1eI= +github.com/jackc/pgproto3/v2 v2.1.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= +github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= +github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg= +github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc= +github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw= +github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08CZQyj1PVQBHy9XOp5p8/SHH6a0psbY9Y= +github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM= +github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc= +github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq6+3iTQz8KNCLtVX6idSoTLdUw= +github.com/lann/builder v0.0.0-20180802200727-47ae307949d0/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o= +github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 h1:P6pPBnrTSX3DEVR4fDembhRWSsG5rVo6hYhAB/ADZrk= +github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6FmdpVm2joNMFikkuWg0EoCKLGUMNw= +github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= +github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= +github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= +github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= +github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= +github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= +go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190411191339-88737f569e3a/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= +golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 h1:/UOmuWzQfxxo9UtlXMwuQU8CMgg1eZXqTRwkSQJWKOI= +golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190425163242-31fd60d6bfdc/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20190823170909-c4a336ef6a2f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/zeromq/goczmq.v4 v4.1.0 h1:CE+FE81mGVs2aSlnbfLuS1oAwdcVywyMM2AC1g33imI= +gopkg.in/zeromq/goczmq.v4 v4.1.0/go.mod h1:h4IlfePEYMpFdywGr5gAwKhBBj+hiBl/nF4VoSE4k+0= diff --git a/ops/sensenet.service b/ops/sensenet.service new file mode 100644 index 0000000..9f73b19 --- /dev/null +++ b/ops/sensenet.service @@ -0,0 +1,16 @@ +[Unit] +Description=sensor network telemetry relay +Wants=network-online.target +After=network-online.target +AssertFileIsExecutable=/usr/local/bin/sensenet + +[Service] +User=kyle +Group=kyle +ExecStart=/usr/local/bin/sensenet + +# Let systemd restart this service always +Restart=always + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/ops/sensenet.yaml b/ops/sensenet.yaml new file mode 100644 index 0000000..2b78bfa --- /dev/null +++ b/ops/sensenet.yaml @@ -0,0 +1,16 @@ +db: + host: db.wntrmute.dev + port: 5432 + name: sensenet + user: sensenet + password: CHANGEME +publishers: + 192.168.1.83: + name: chime + topics: + - pht + 192.168.1.117: + name: proxima + topics: + - power + - proxima \ No newline at end of file diff --git a/proto/Makefile b/proto/Makefile new file mode 100644 index 0000000..5ca8648 --- /dev/null +++ b/proto/Makefile @@ -0,0 +1,11 @@ +all: telemetry.pb.go telemetry_pb2.py + +%.pb.go: %.proto + protoc -I=. --go_out=. *.proto + +%_pb2.py: %.proto + protoc -I=. --python_out=. *.proto + +.PHONY: clean +clean: + rm -f *.pb.go *_pb2.py diff --git a/proto/telemetry.proto b/proto/telemetry.proto new file mode 100644 index 0000000..c32590b --- /dev/null +++ b/proto/telemetry.proto @@ -0,0 +1,9 @@ +syntax = 'proto3'; +package telemetrypb; +option go_package = '.;telemetrypb'; + +message Packet { + string topic = 1; + uint64 timestamp = 2; + bytes payload = 3; +} diff --git a/pubsub.py b/pubsub.py new file mode 100644 index 0000000..6306643 --- /dev/null +++ b/pubsub.py @@ -0,0 +1,55 @@ +import time +import zmq +import proto.telemetry_pb2 as proto + + +class Subscriber: + def __init__(self, addr, topics=None, conflate=1): + self.ctx = zmq.Context().instance() + self.socket = self.ctx.socket(zmq.SUB) + self.socket.setsockopt(zmq.CONFLATE, conflate) + self._topics = set() + self.socket.connect(addr) + + if topics is not None: + for topic in topics: + self.subscribe(topic) + + def subscribe(self, topic): + if len(topic) == 0: + self.socket.subscribe('') + self._topics.add('') + return + + if not topic in self._topics: + self._topics.add(topic) + topic_bytes = bytearray([10, len(topic)]) + topic_bytes.extend(topic) + self.socket.subscribe(bytes(topic_bytes)) + + def topics(self): + return list(self._topics) + + def receive(self): + return self.socket.recv() + + def close(self): + return self.socket.close() + + +class Publisher: + def __init__(self, addr, conflate=1): + self.ctx = zmq.Context().instance() + self.socket = self.ctx.socket(zmq.PUB) + self.socket.setsockopt(zmq.CONFLATE, conflate) + self.socket.bind(addr) + + def publish(self, message, topic=b''): + message = proto.Packet(topic=topic, created=int(time.time()), payload=message) + return self.socket.send(message.SerializeToString()) + + def publish_json(self, message, topic=b''): + return self.publish(json.dumps(message).encode('UTF-8'), topic) + + def close(self): + return self.socket.close() diff --git a/pubsub/publisher.go b/pubsub/publisher.go new file mode 100644 index 0000000..fa482a1 --- /dev/null +++ b/pubsub/publisher.go @@ -0,0 +1,58 @@ +package pubsub + +import ( + "time" + + telemetrypb "git.wntrmute.dev/kyle/sensenet/proto" + "google.golang.org/protobuf/proto" + goczmq "gopkg.in/zeromq/goczmq.v4" +) + +const singleFrame = 0 + +type Publisher struct { + addr string + sock *goczmq.Sock +} + +func NewPublisher(addr string) (*Publisher, error) { + pub := &Publisher{ + addr: addr, + sock: goczmq.NewSock(goczmq.Pub), + } + + pub.Conflate(1) + err := pub.connect() + if err != nil { + return nil, err + } + + return pub, nil +} + +func (pub *Publisher) connect() error { + return pub.sock.Connect(pub.addr) +} + +func (pub *Publisher) Conflate(n int) { + pub.sock.SetConflate(n) +} + +func (pub *Publisher) Transmit(topic string, data []byte) error { + packet, err := genPacket(topic, data) + if err != nil { + return err + } + + return pub.sock.SendFrame(packet, singleFrame) +} + +func genPacket(topic string, payload []byte) ([]byte, error) { + packet := &telemetrypb.Packet{ + Topic: topic, + Timestamp: uint64(time.Now().Unix()), + Payload: payload, + } + + return proto.Marshal(packet) +} diff --git a/pubsub/subscriber.go b/pubsub/subscriber.go new file mode 100644 index 0000000..85be7c6 --- /dev/null +++ b/pubsub/subscriber.go @@ -0,0 +1,105 @@ +package pubsub + +import ( + "fmt" + "sort" + + telemetrypb "git.wntrmute.dev/kyle/sensenet/proto" + "git.wntrmute.dev/kyle/sensenet/topic" + "google.golang.org/protobuf/proto" + "gopkg.in/zeromq/goczmq.v4" +) + +type Subscriber struct { + addr string + publisher string + sock *goczmq.Sock + topics map[string]bool +} + +func NewSubscriber(addr, publisher string, topics ...string) (*Subscriber, error) { + sub := &Subscriber{ + addr: addr, + sock: goczmq.NewSock(goczmq.Pub), + } + + err := sub.connect() + if err != nil { + return nil, err + } + + sub.Conflate(1) + for _, topic := range topics { + sub.Subscribe(topic) + } + + return sub, nil +} + +func (sub *Subscriber) Conflate(n int) { + sub.sock.SetConflate(n) +} + +func (sub *Subscriber) connect() error { + return sub.sock.Connect(sub.addr) +} + +func (sub *Subscriber) Subscribe(topic string) { + if sub.topics[topic] { + return + } + + sub.topics[topic] = true + topic = fmt.Sprintf("\x0a%d%s", len(topic), topic) + sub.sock.SetSubscribe(topic) +} + +func (sub *Subscriber) Topics() []string { + var topics = make([]string, 0, len(sub.topics)) + for k := range sub.topics { + topics = append(topics, k) + } + + sort.Strings(topics) + return topics +} + +func (sub *Subscriber) receive() ([]byte, error) { + var ( + data []byte + err error + packet []byte + more = 1 + ) + for more > 0 { + packet, more, err = sub.sock.RecvFrame() + if err != nil { + return nil, err + } + + data = append(data, packet...) + } + + return data, nil +} + +func (sub *Subscriber) Receive() (*topic.Packet, error) { + data, err := sub.receive() + if err != nil { + return nil, err + } + + pbPacket := &telemetrypb.Packet{} + err = proto.Unmarshal(data, pbPacket) + if err != nil { + return nil, err + } + + packet := &topic.Packet{ + Publisher: sub.publisher, + Received: int64(pbPacket.Timestamp), + Payload: pbPacket.Payload, + } + + return packet, nil +} diff --git a/receiver/receiver.go b/receiver/receiver.go new file mode 100644 index 0000000..cf2968d --- /dev/null +++ b/receiver/receiver.go @@ -0,0 +1,60 @@ +package receiver + +import ( + "log" + + "git.wntrmute.dev/kyle/sensenet/config" + "git.wntrmute.dev/kyle/sensenet/pubsub" + "git.wntrmute.dev/kyle/sensenet/topic" +) + +const ( + packetBuffer = 64 + errBuffer = 64 +) + +type Receiver struct { + packets chan *topic.Packet + errs chan error +} + +func receive(packets chan *topic.Packet, errs chan error, subscriber *pubsub.Subscriber) { + packet, err := subscriber.Receive() + if err != nil { + errs <- err + return + } + + packets <- packet +} + +func (rcvr *Receiver) Subscribe(addr string, publisher *config.Publisher) error { + subscriber, err := pubsub.NewSubscriber(addr, publisher.Name, publisher.Topics...) + if err != nil { + return err + } + + go func() { + for { + receive(rcvr.packets, rcvr.errs, subscriber) + } + }() + + return nil +} + +func (rcvr *Receiver) LogErrors() { + for { + select { + case err := <-rcvr.errs: + log.Println(err) + } + } +} + +func New() *Receiver { + return &Receiver{ + packets: make(chan *topic.Packet, packetBuffer), + errs: make(chan error, errBuffer), + } +} diff --git a/topic/pht/pht.go b/topic/pht/pht.go new file mode 100644 index 0000000..d3bb6f7 --- /dev/null +++ b/topic/pht/pht.go @@ -0,0 +1,62 @@ +package pht + +import ( + "context" + "encoding/json" + + "git.wntrmute.dev/kyle/sensenet/topic" + "github.com/Masterminds/squirrel" +) + +var psql = squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar) + +func createTable(ctx context.Context, db topic.Database) error { + stmt := `CREATE TABLE pht IF NOT EXISTS ( + id uuid primary key default gen_random_uuid(), + source text, + timestamp int, + temp real, + press real, + humid real +);` + _, err := db.Exec(ctx, stmt) + return err +} + +type reading struct { + Timestamp int64 `json:"timestamp"` + Temperature float64 `json:"temperature"` + Pressure float64 `json:"pressure"` + Humidity float64 `json:"relative_humidity"` +} + +func store(ctx context.Context, db topic.Database, packet *topic.Packet) error { + r := &reading{} + err := json.Unmarshal(packet.Payload, r) + if err != nil { + return err + } + stmt := psql.Insert("pht").Columns( + "source", + "timestamp", + "temp", + "press", + "humid", + ).Values(packet.Publisher, r.Timestamp, r.Pressure, r.Humidity) + query, args, err := stmt.ToSql() + if err != nil { + return err + } + + _, err = db.Exec(ctx, query, args...) + return err +} + +var PHT = &topic.Topic{ + CreateTable: createTable, + Store: store, +} + +func init() { + topic.Register("pht", PHT) +} diff --git a/topic/topic.go b/topic/topic.go new file mode 100644 index 0000000..06027d4 --- /dev/null +++ b/topic/topic.go @@ -0,0 +1,70 @@ +package topic + +import ( + "context" + "fmt" + "log" + "sync" + + "github.com/jackc/pgconn" +) + +type Database interface { + Exec(ctx context.Context, query string, args ...interface{}) (pgconn.CommandTag, error) +} + +type Topic struct { + CreateTable func(ctx context.Context, db Database) error + Store func(ctx context.Context, db Database, packet *Packet) error +} + +var registry = struct { + topics map[string]*Topic + lock *sync.Mutex +}{ + topics: map[string]*Topic{}, + lock: new(sync.Mutex), +} + +func Register(topic string, handler *Topic) { + registry.lock.Lock() + defer registry.lock.Unlock() + + if _, ok := registry.topics[topic]; ok { + panic(fmt.Sprintf("attempt to register pht '%s' that has already been registered", topic)) + } + + registry.topics[topic] = handler +} + +type Packet struct { + Topic string + Publisher string + Received int64 // unix timestamp + Payload []byte +} + +// Setup ensures all the required tables are created. +func Setup(ctx context.Context, db Database) error { + registry.lock.Lock() + defer registry.lock.Unlock() + + for key, topic := range registry.topics { + log.Println("running CreateTable for", key) + err := topic.CreateTable(ctx, db) + if err != nil { + return err + } + } + + return nil +} + +func Publish(ctx context.Context, db Database, packet *Packet) error { + topic, ok := registry.topics[packet.Topic] + if !ok { + return fmt.Errorf("topic: no handler for topic '%s' [%x]", packet.Topic, packet.Payload) + } + + return topic.Store(ctx, db, packet) +}