Run service at probs simultaneously

pull/813/head
kompotkot 2023-06-12 14:04:24 +00:00
rodzic ce99e7140a
commit f9646ea8ba
18 zmienionych plików z 359 dodań i 55 usunięć

Wyświetl plik

@ -62,4 +62,4 @@ dev.env
prod.env
test.env
.venv
workers_dev
probs_dev

32
probs/README.md 100644
Wyświetl plik

@ -0,0 +1,32 @@
# probs
Running multiple operations simultaneously under one application.
Execute one command:
```bash
probs engine clean-call-requests --db-uri "${ENGINE_DB_URI}"
```
Run service with configuration:
```bash
probs service --config "~/.probs/config.json"
```
Config example:
```json
[
{
"name": "engine",
"db_uri": "ENGINE_DB_URI",
"workers": [
{
"name": "clean-call-requests",
"interval": 10
}
]
}
]
```

Wyświetl plik

@ -6,8 +6,8 @@ import (
"github.com/spf13/cobra"
workers "github.com/moonstream-to/api/workers/pkg"
engine "github.com/moonstream-to/api/workers/pkg/engine"
probs "github.com/moonstream-to/api/probs/pkg"
engine "github.com/moonstream-to/api/probs/pkg/engine"
)
func CreateRootCommand() *cobra.Command {
@ -25,7 +25,8 @@ func CreateRootCommand() *cobra.Command {
versionCmd := CreateVersionCommand()
engineCmd := CreateEngineCommand()
rootCmd.AddCommand(versionCmd, engineCmd)
serviceCmd := CreateServiceCommand()
rootCmd.AddCommand(versionCmd, engineCmd, serviceCmd)
completionCmd := CreateCompletionCommand(rootCmd)
rootCmd.AddCommand(completionCmd)
@ -91,9 +92,9 @@ func CreateVersionCommand() *cobra.Command {
versionCmd := &cobra.Command{
Use: "version",
Short: "Print the version number of workers",
Long: `All software has versions. This is workers's`,
Long: `All software has versions. This is workers's.`,
Run: func(cmd *cobra.Command, args []string) {
cmd.Println(workers.VERSION)
cmd.Println(probs.VERSION)
},
}
return versionCmd
@ -106,26 +107,45 @@ func CreateEngineCommand() *cobra.Command {
}
var dbUri string
engineCommand.PersistentFlags().StringVarP(&dbUri, "db-uri", "d", "", "Database URI.")
engineCommand.PersistentFlags().StringVarP(&dbUri, "db-uri", "d", "", "Database URI")
engineCommand.MarkFlagRequired("db-uri")
cleanCallRequestsCommand := &cobra.Command{
Use: "clean-call-requests",
Short: "Clean all inactive call requests from database.",
Long: "Remove records in call_requests database table with ttl value greater then now.",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
for _, sc := range engine.ENGINE_SUPPORTED_WORKERS {
tempCommand := &cobra.Command{
Use: sc.Name,
Short: sc.Description,
Long: sc.LonDescription,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
dbPool, err := CreateDbPool(ctx, dbUri, "10s")
if err != nil {
return fmt.Errorf("database connection error: %v", err)
}
defer dbPool.Close()
dbPool, err := CreateDbPool(ctx, dbUri, "10s")
if err != nil {
return fmt.Errorf("database connection error: %v", err)
}
defer dbPool.Close()
return engine.CleanCallRequestsCommand(ctx, dbPool)
},
return sc.ExecFunction(ctx, dbPool)
},
}
engineCommand.AddCommand(tempCommand)
}
engineCommand.AddCommand(cleanCallRequestsCommand)
return engineCommand
}
func CreateServiceCommand() *cobra.Command {
var configPath string
serviceCmd := &cobra.Command{
Use: "service",
Short: "Run workers as background asynchronous services",
Long: `Each active worker specified in configuration will run in go-routine.`,
RunE: func(cmd *cobra.Command, args []string) error {
return RunService(configPath)
},
}
serviceCmd.PersistentFlags().StringVarP(&configPath, "config", "c", "", "Config path, default: ~/.workers")
return serviceCmd
}

Wyświetl plik

@ -0,0 +1,163 @@
package main
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"strings"
probs "github.com/moonstream-to/api/probs/pkg"
engine "github.com/moonstream-to/api/probs/pkg/engine"
)
var (
DEFAULT_CONFIG_DIR_NAME = ".probs"
DEFAULT_CONFIG_FILE_NAME = "config.json"
)
// Workers configuration
type ServiceWorkersConfig struct {
Name string `json:"name"`
DbUri string `json:"db_uri"`
Workers []probs.ServiceWorker `json:"workers"`
}
func ReadConfig(configPath string) (*[]ServiceWorkersConfig, int, error) {
totalWorkersNum := 0
rawBytes, err := ioutil.ReadFile(configPath)
if err != nil {
return nil, totalWorkersNum, err
}
serviceWorkersConfigTemp := &[]ServiceWorkersConfig{}
err = json.Unmarshal(rawBytes, serviceWorkersConfigTemp)
if err != nil {
return nil, totalWorkersNum, err
}
var serviceWorkersConfig []ServiceWorkersConfig
for _, service := range *serviceWorkersConfigTemp {
serviceDbUri := os.Getenv(service.DbUri)
if serviceDbUri == "" {
return nil, totalWorkersNum, fmt.Errorf("unable to load database URI for service %s", service.Name)
}
var serviceWorkers []probs.ServiceWorker
// Link worker function
for w, worker := range service.Workers {
switch service.Name {
case "engine":
for _, engineWorker := range engine.ENGINE_SUPPORTED_WORKERS {
if worker.Name == engineWorker.Name {
serviceWorkers = append(serviceWorkers, probs.ServiceWorker{
Name: worker.Name,
Interval: worker.Interval,
ExecFunction: engineWorker.ExecFunction,
})
log.Printf("[%s] [%s] - Registered function", service.Name, worker.Name)
totalWorkersNum++
continue
}
}
if worker.ExecFunction == nil {
service.Workers = append(service.Workers[:w], service.Workers[w+1:]...)
log.Printf("Function for worker %s at service %s not found, removed from the list", worker.Name, service.Name)
}
default:
service.Workers = append(service.Workers[:w], service.Workers[w+1:]...)
log.Printf("Unsupported %s service with %s worker from the list", worker.Name, service.Name)
}
}
serviceWorkersConfig = append(serviceWorkersConfig, ServiceWorkersConfig{
Name: service.Name,
DbUri: serviceDbUri,
Workers: serviceWorkers,
})
}
return &serviceWorkersConfig, totalWorkersNum, nil
}
type ConfigPlacement struct {
ConfigDirPath string
ConfigDirExists bool
ConfigPath string
ConfigExists bool
}
func CheckPathExists(path string) (bool, error) {
var exists = true
_, err := os.Stat(path)
if err != nil {
if os.IsNotExist(err) {
exists = false
} else {
return exists, fmt.Errorf("error due checking file path exists, err: %v", err)
}
}
return exists, nil
}
func GetConfigPlacement(providedPath string) (*ConfigPlacement, error) {
var configDirPath, configPath string
if providedPath == "" {
homeDir, err := os.UserHomeDir()
if err != nil {
return nil, fmt.Errorf("unable to find user home directory, %v", err)
}
configDirPath = fmt.Sprintf("%s/%s", homeDir, DEFAULT_CONFIG_DIR_NAME)
configPath = fmt.Sprintf("%s/%s", configDirPath, DEFAULT_CONFIG_FILE_NAME)
} else {
configPath = strings.TrimSuffix(providedPath, "/")
configDirPath = filepath.Dir(configPath)
}
configDirPathExists, err := CheckPathExists(configDirPath)
if err != nil {
return nil, err
}
configPathExists, err := CheckPathExists(configPath)
if err != nil {
return nil, err
}
configPlacement := &ConfigPlacement{
ConfigDirPath: configDirPath,
ConfigDirExists: configDirPathExists,
ConfigPath: configPath,
ConfigExists: configPathExists,
}
return configPlacement, nil
}
func GenerateDefaultConfig(config *ConfigPlacement) error {
if !config.ConfigDirExists {
if err := os.MkdirAll(config.ConfigDirPath, os.ModePerm); err != nil {
return fmt.Errorf("unable to create directory, %v", err)
}
log.Printf("Config directory created at: %s", config.ConfigDirPath)
}
if !config.ConfigExists {
tempConfig := []ServiceWorkersConfig{}
tempConfigJson, err := json.Marshal(tempConfig)
if err != nil {
return fmt.Errorf("unable to marshal configuration data, err: %v", err)
}
err = ioutil.WriteFile(config.ConfigPath, tempConfigJson, os.ModePerm)
if err != nil {
return fmt.Errorf("unable to write default config to file %s, err: %v", config.ConfigPath, err)
}
log.Printf("Created default configuration at %s", config.ConfigPath)
}
return nil
}

Wyświetl plik

@ -0,0 +1,70 @@
package main
import (
"context"
"fmt"
"log"
"sync"
"time"
probs "github.com/moonstream-to/api/probs/pkg"
)
func RunService(configPath string) error {
// Load configuration
configPlacement, err := GetConfigPlacement(configPath)
if err != nil {
return err
}
if !configPlacement.ConfigExists {
if err := GenerateDefaultConfig(configPlacement); err != nil {
return err
}
} else {
log.Printf("Loaded configuration from %s", configPlacement.ConfigPath)
}
serviceConfigs, totalWorkersNum, err := ReadConfig(configPlacement.ConfigPath)
if err != nil {
return fmt.Errorf("unable to read config, err: %v", err)
}
log.Printf("Loaded configurations of %d services with %d workers", len(*serviceConfigs), totalWorkersNum)
var wg sync.WaitGroup
for _, service := range *serviceConfigs {
for _, worker := range service.Workers {
wg.Add(1)
go RunWorker(&wg, worker, service.Name, service.DbUri)
}
}
wg.Wait()
return nil
}
func RunWorker(wg *sync.WaitGroup, worker probs.ServiceWorker, serviceName, dbUri string) error {
defer wg.Done()
ctx := context.Background()
dbPool, err := CreateDbPool(ctx, dbUri, "10s")
if err != nil {
log.Printf("[%s] [%s] - database connection error, err: %v", serviceName, worker.Name, err)
return err
}
defer dbPool.Close()
t := time.NewTicker(time.Duration(worker.Interval) * time.Second)
for {
select {
case <-t.C:
err = worker.ExecFunction(ctx, dbPool)
if err != nil {
log.Printf("[%s] [%s] - an error occurred during execution, err: %v", serviceName, worker.Name, err)
return err
}
}
}
}

Wyświetl plik

@ -21,8 +21,7 @@ SECRETS_DIR="${SECRETS_DIR:-/home/ubuntu/engineapi-secrets}"
PARAMETERS_ENV_PATH="${SECRETS_DIR}/app.env"
# API server service file
CLEAN_CALL_REQUESTS_SERVICE_FILE="workers-engine-clean-call-requests.service"
CLEAN_CALL_REQUESTS_TIMER_FILE="workers-engine-clean-call-requests.timer"
PROBS_SERVICE_FILE="probs.service"
set -eu
@ -62,9 +61,8 @@ XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${ENGINE_S
echo
echo
echo -e "${PREFIX_INFO} Replacing existing workers-engine-clean-call-requests service and timer with: ${CLEAN_CALL_REQUESTS_SERVICE_FILE}, ${CLEAN_CALL_REQUESTS_TIMER_FILE}"
chmod 644 "${SCRIPT_DIR}/${CLEAN_CALL_REQUESTS_SERVICE_FILE}" "${SCRIPT_DIR}/${CLEAN_CALL_REQUESTS_TIMER_FILE}"
cp "${SCRIPT_DIR}/${CLEAN_CALL_REQUESTS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${CLEAN_CALL_REQUESTS_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${CLEAN_CALL_REQUESTS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${CLEAN_CALL_REQUESTS_TIMER_FILE}"
echo -e "${PREFIX_INFO} Replacing existing probs service and timer with: ${PROBS_SERVICE_FILE}"
chmod 644 "${SCRIPT_DIR}/${PROBS_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${PROBS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${PROBS_SERVICE_FILE}"
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${CLEAN_CALL_REQUESTS_TIMER_FILE}"
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart "${PROBS_SERVICE_FILE}"

Wyświetl plik

@ -0,0 +1,16 @@
[Unit]
Description=Clean outdated call_requests from Engine DB
After=network.target
StartLimitIntervalSec=300
StartLimitBurst=3
[Service]
WorkingDirectory=/home/ubuntu/api/probs
EnvironmentFile=/home/ubuntu/engineapi-secrets/app.env
ExecStart=/home/ubuntu/api/probs/probs service
Restart=on-failure
RestartSec=15s
SyslogIdentifier=probs
[Install]
WantedBy=multi-user.target

Wyświetl plik

@ -3,8 +3,8 @@
# Compile application and run with provided arguments
set -e
PROGRAM_NAME="workers_dev"
PROGRAM_NAME="probs_dev"
go build -o "$PROGRAM_NAME" cmd/workers/*.go
go build -o "$PROGRAM_NAME" cmd/probs/*.go
./"$PROGRAM_NAME" "$@"

Wyświetl plik

@ -1,4 +1,4 @@
module github.com/moonstream-to/api/workers
module github.com/moonstream-to/api/probs
go 1.19

16
probs/pkg/data.go 100644
Wyświetl plik

@ -0,0 +1,16 @@
package probs
import (
"context"
"github.com/jackc/pgx/v5/pgxpool"
)
type ServiceWorker struct {
Name string `json:"name"`
Description string
LonDescription string
Interval int `json:"interval"`
ExecFunction func(context.Context, *pgxpool.Pool) error
}

Wyświetl plik

@ -7,8 +7,17 @@ import (
"time"
"github.com/jackc/pgx/v5/pgxpool"
probs "github.com/moonstream-to/api/probs/pkg"
)
var ENGINE_SUPPORTED_WORKERS = []probs.ServiceWorker{{
Name: "clean-call-requests",
Description: "Clean all inactive call requests from database",
LonDescription: "Remove records in call_requests database table with ttl value greater then now.",
ExecFunction: CleanCallRequestsExec,
}}
type CallRequest struct {
Id string `json:"id"`
RegisteredContractIid string `json:"registered_contract_id"`
@ -21,7 +30,7 @@ type CallRequest struct {
UpdatedAt time.Time `json:"updated_at"`
}
func CleanCallRequestsCommand(ctx context.Context, dbPool *pgxpool.Pool) error {
func CleanCallRequestsExec(ctx context.Context, dbPool *pgxpool.Pool) error {
tag, err := dbPool.Exec(
ctx,
"DELETE FROM call_requests WHERE expires_at <= NOW() - INTERVAL '1 minute';",
@ -30,6 +39,6 @@ func CleanCallRequestsCommand(ctx context.Context, dbPool *pgxpool.Pool) error {
return fmt.Errorf("delete execution failed, err: %v", err)
}
log.Printf("Deleted %d call requests", tag.RowsAffected())
log.Printf("[engine] [clean-call-requests] - Deleted %d call requests", tag.RowsAffected())
return nil
}

Wyświetl plik

@ -1,3 +1,3 @@
package workers
package probs
const VERSION string = "0.0.1"

Wyświetl plik

@ -1,11 +0,0 @@
[Unit]
Description=Clean outdated call_requests from Engine DB
After=network.target
[Service]
Type=oneshot
WorkingDirectory=/home/ubuntu/api/workers
EnvironmentFile=/home/ubuntu/engineapi-secrets/app.env
ExecStart=/home/ubuntu/api/workers/workers engine clean-call-requests -db-uri "${ENGINE_DB_URI}"
CPUWeight=50
SyslogIdentifier=workers-engine-clean-call-requests

Wyświetl plik

@ -1,9 +0,0 @@
[Unit]
Description=Clean outdated call_requests from Engine DB
[Timer]
OnBootSec=40s
OnUnitActiveSec=60m
[Install]
WantedBy=timers.target