Workers with call_requests clean action

pull/813/head
kompotkot 2023-06-08 14:16:43 +00:00
rodzic 2f08452fbb
commit ce99e7140a
13 zmienionych plików z 431 dodań i 0 usunięć

65
workers/.gitignore vendored 100644
Wyświetl plik

@ -0,0 +1,65 @@
# Created by https://www.toptal.com/developers/gitignore/api/visualstudiocode,go
# Edit at https://www.toptal.com/developers/gitignore?templates=visualstudiocode,go
### Go ###
# If you prefer the allow list template instead of the deny list, see community template:
# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore
#
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
go.work
### Go Patch ###
/vendor/
/Godeps/
### VisualStudioCode ###
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
!.vscode/*.code-snippets
# Local History for Visual Studio Code
.history/
# Built Visual Studio Code Extensions
*.vsix
### VisualStudioCode Patch ###
# Ignore all local history of files
.history
.ionide
# Support for Project snippet scope
.vscode/*.code-snippets
# Ignore code-workspaces
*.code-workspace
# End of https://www.toptal.com/developers/gitignore/api/visualstudiocode,go
# Custom
.secrets/*
dev.env
prod.env
test.env
.venv
workers_dev

Wyświetl plik

@ -0,0 +1,131 @@
package main
import (
"context"
"fmt"
"github.com/spf13/cobra"
workers "github.com/moonstream-to/api/workers/pkg"
engine "github.com/moonstream-to/api/workers/pkg/engine"
)
func CreateRootCommand() *cobra.Command {
// rootCmd represents the base command when called without any subcommands
rootCmd := &cobra.Command{
Use: "workers",
Short: "Autonomous workers for moonstream services",
Long: `workers is a CLI that allows you to run multiple operations according to Moonstream services.
workers currently supports services:
- Engine
`,
Run: func(cmd *cobra.Command, args []string) {},
}
versionCmd := CreateVersionCommand()
engineCmd := CreateEngineCommand()
rootCmd.AddCommand(versionCmd, engineCmd)
completionCmd := CreateCompletionCommand(rootCmd)
rootCmd.AddCommand(completionCmd)
return rootCmd
}
func CreateCompletionCommand(rootCmd *cobra.Command) *cobra.Command {
completionCmd := &cobra.Command{
Use: "completion",
Short: "Generate shell completion scripts for workers",
Long: `Generate shell completion scripts for workers.
The command for each shell will print a completion script to stdout. You can source this script to get
completions in your current shell session. You can add this script to the completion directory for your
shell to get completions for all future sessions.
For example, to activate bash completions in your current shell:
$ . <(workers completion bash)
To add workers completions for all bash sessions:
$ workers completion bash > /etc/bash_completion.d/workers_completions`,
}
bashCompletionCmd := &cobra.Command{
Use: "bash",
Short: "bash completions for workers",
Run: func(cmd *cobra.Command, args []string) {
rootCmd.GenBashCompletion(cmd.OutOrStdout())
},
}
zshCompletionCmd := &cobra.Command{
Use: "zsh",
Short: "zsh completions for workers",
Run: func(cmd *cobra.Command, args []string) {
rootCmd.GenZshCompletion(cmd.OutOrStdout())
},
}
fishCompletionCmd := &cobra.Command{
Use: "fish",
Short: "fish completions for workers",
Run: func(cmd *cobra.Command, args []string) {
rootCmd.GenFishCompletion(cmd.OutOrStdout(), true)
},
}
powershellCompletionCmd := &cobra.Command{
Use: "powershell",
Short: "powershell completions for workers",
Run: func(cmd *cobra.Command, args []string) {
rootCmd.GenPowerShellCompletion(cmd.OutOrStdout())
},
}
completionCmd.AddCommand(bashCompletionCmd, zshCompletionCmd, fishCompletionCmd, powershellCompletionCmd)
return completionCmd
}
func CreateVersionCommand() *cobra.Command {
versionCmd := &cobra.Command{
Use: "version",
Short: "Print the version number of workers",
Long: `All software has versions. This is workers's`,
Run: func(cmd *cobra.Command, args []string) {
cmd.Println(workers.VERSION)
},
}
return versionCmd
}
func CreateEngineCommand() *cobra.Command {
engineCommand := &cobra.Command{
Use: "engine",
Short: "Engine workers and more",
}
var dbUri string
engineCommand.PersistentFlags().StringVarP(&dbUri, "db-uri", "d", "", "Database URI.")
cleanCallRequestsCommand := &cobra.Command{
Use: "clean-call-requests",
Short: "Clean all inactive call requests from database.",
Long: "Remove records in call_requests database table with ttl value greater then now.",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
dbPool, err := CreateDbPool(ctx, dbUri, "10s")
if err != nil {
return fmt.Errorf("database connection error: %v", err)
}
defer dbPool.Close()
return engine.CleanCallRequestsCommand(ctx, dbPool)
},
}
engineCommand.AddCommand(cleanCallRequestsCommand)
return engineCommand
}

Wyświetl plik

@ -0,0 +1,28 @@
package main
import (
"context"
"fmt"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
func CreateDbPool(ctx context.Context, dbUri string, timeout string) (*pgxpool.Pool, error) {
conf, err := pgxpool.ParseConfig(dbUri)
if err != nil {
return nil, fmt.Errorf("unable to parse database connection string, err: %v", err)
}
ctDuration, err := time.ParseDuration(timeout)
if err != nil {
return nil, fmt.Errorf("unable to parse connect timeout duration, err: %v", err)
}
conf.ConnConfig.ConnectTimeout = ctDuration
dbPool, err := pgxpool.NewWithConfig(ctx, conf)
if err != nil {
return nil, fmt.Errorf("Unable to establish connection with database, err: %v", err)
}
return dbPool, nil
}

Wyświetl plik

@ -0,0 +1,15 @@
package main
import (
"fmt"
"os"
)
func main() {
command := CreateRootCommand()
err := command.Execute()
if err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
}

Wyświetl plik

@ -0,0 +1,70 @@
#!/usr/bin/env bash
# Deployment script - intended to run on Moonstream API server
# Colors
C_RESET='\033[0m'
C_RED='\033[1;31m'
C_GREEN='\033[1;32m'
C_YELLOW='\033[1;33m'
# Logs
PREFIX_INFO="${C_GREEN}[INFO]${C_RESET} [$(date +%d-%m\ %T)]"
PREFIX_WARN="${C_YELLOW}[WARN]${C_RESET} [$(date +%d-%m\ %T)]"
PREFIX_CRIT="${C_RED}[CRIT]${C_RESET} [$(date +%d-%m\ %T)]"
# Main
APP_DIR="${APP_DIR:-/home/ubuntu/api}"
AWS_DEFAULT_REGION="${AWS_DEFAULT_REGION:-us-east-1}"
SCRIPT_DIR="$(realpath $(dirname $0))"
SECRETS_DIR="${SECRETS_DIR:-/home/ubuntu/engineapi-secrets}"
PARAMETERS_ENV_PATH="${SECRETS_DIR}/app.env"
# API server service file
CLEAN_CALL_REQUESTS_SERVICE_FILE="workers-engine-clean-call-requests.service"
CLEAN_CALL_REQUESTS_TIMER_FILE="workers-engine-clean-call-requests.timer"
set -eu
echo
echo
echo -e "${PREFIX_INFO} Install checkenv"
HOME=/home/ubuntu /usr/local/go/bin/go install github.com/bugout-dev/checkenv@latest
echo
echo
echo -e "${PREFIX_INFO} Retrieving deployment parameters"
if [ ! -d "${SECRETS_DIR}" ]; then
mkdir "${SECRETS_DIR}"
echo -e "${PREFIX_WARN} Created new secrets directory"
fi
AWS_DEFAULT_REGION="${AWS_DEFAULT_REGION}" /home/ubuntu/go/bin/checkenv show aws_ssm+engine:true > "${PARAMETERS_ENV_PATH}"
chmod 0640 "${PARAMETERS_ENV_PATH}"
echo
echo
echo -e "${PREFIX_INFO} Add AWS default region to parameters"
echo "AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION}" >> "${PARAMETERS_ENV_PATH}"
echo
echo
echo -e "${PREFIX_INFO} Add instance local IP to parameters"
echo "AWS_LOCAL_IPV4=$(ec2metadata --local-ipv4)" >> "${PARAMETERS_ENV_PATH}"
echo
echo
echo -e "${PREFIX_INFO} Replacing existing Engine API server service definition with ${ENGINE_SERVICE_FILE}"
chmod 644 "${SCRIPT_DIR}/${ENGINE_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${ENGINE_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${ENGINE_SERVICE_FILE}"
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${ENGINE_SERVICE_FILE}"
echo
echo
echo -e "${PREFIX_INFO} Replacing existing workers-engine-clean-call-requests service and timer with: ${CLEAN_CALL_REQUESTS_SERVICE_FILE}, ${CLEAN_CALL_REQUESTS_TIMER_FILE}"
chmod 644 "${SCRIPT_DIR}/${CLEAN_CALL_REQUESTS_SERVICE_FILE}" "${SCRIPT_DIR}/${CLEAN_CALL_REQUESTS_TIMER_FILE}"
cp "${SCRIPT_DIR}/${CLEAN_CALL_REQUESTS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${CLEAN_CALL_REQUESTS_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${CLEAN_CALL_REQUESTS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${CLEAN_CALL_REQUESTS_TIMER_FILE}"
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${CLEAN_CALL_REQUESTS_TIMER_FILE}"

Wyświetl plik

@ -0,0 +1,11 @@
[Unit]
Description=Clean outdated call_requests from Engine DB
After=network.target
[Service]
Type=oneshot
WorkingDirectory=/home/ubuntu/api/workers
EnvironmentFile=/home/ubuntu/engineapi-secrets/app.env
ExecStart=/home/ubuntu/api/workers/workers engine clean-call-requests -db-uri "${ENGINE_DB_URI}"
CPUWeight=50
SyslogIdentifier=workers-engine-clean-call-requests

Wyświetl plik

@ -0,0 +1,9 @@
[Unit]
Description=Clean outdated call_requests from Engine DB
[Timer]
OnBootSec=40s
OnUnitActiveSec=60m
[Install]
WantedBy=timers.target

10
workers/dev.sh 100755
Wyświetl plik

@ -0,0 +1,10 @@
#!/usr/bin/env sh
# Compile application and run with provided arguments
set -e
PROGRAM_NAME="workers_dev"
go build -o "$PROGRAM_NAME" cmd/workers/*.go
./"$PROGRAM_NAME" "$@"

19
workers/go.mod 100644
Wyświetl plik

@ -0,0 +1,19 @@
module github.com/moonstream-to/api/workers
go 1.19
require (
github.com/jackc/pgx/v5 v5.3.1
github.com/spf13/cobra v1.7.0
)
require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/text v0.7.0 // indirect
)

34
workers/go.sum 100644
Wyświetl plik

@ -0,0 +1,34 @@
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.3.1 h1:Fcr8QJ1ZeLi5zsPZqQeUZhNhxfkkKBOgJuYkJHoBOtU=
github.com/jackc/pgx/v5 v5.3.1/go.mod h1:t3JDKnCBlYIc0ewLF0Q7B8MXmoIaBOZj/ic7iHozM/8=
github.com/jackc/puddle/v2 v2.2.0 h1:RdcDk92EJBuBS55nQMMYFXTxwstHug4jkhT5pq8VxPk=
github.com/jackc/puddle/v2 v2.2.0/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I=
github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

Wyświetl plik

@ -0,0 +1,35 @@
package engine
import (
"context"
"fmt"
"log"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
type CallRequest struct {
Id string `json:"id"`
RegisteredContractIid string `json:"registered_contract_id"`
Caller string `json:"caller"`
MoonstreamUserId string `json:"moonstream_user_id"`
Method string `json:"method"`
Parameters interface{} `json:"parameters"`
ExpiresAt time.Time `json:"expires_at"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
func CleanCallRequestsCommand(ctx context.Context, dbPool *pgxpool.Pool) error {
tag, err := dbPool.Exec(
ctx,
"DELETE FROM call_requests WHERE expires_at <= NOW() - INTERVAL '1 minute';",
)
if err != nil {
return fmt.Errorf("delete execution failed, err: %v", err)
}
log.Printf("Deleted %d call requests", tag.RowsAffected())
return nil
}

Wyświetl plik

@ -0,0 +1,3 @@
package workers
const VERSION string = "0.0.1"

Wyświetl plik

@ -0,0 +1 @@
export ENGINE_DB_URI="postgresql://<username>:<password>@<db_host>:<db_port>/<db_name>"