kopia lustrzana https://github.com/bugout-dev/moonstream
DB timeout as service config and CLI arg
rodzic
b2bb3b70a9
commit
d438801b98
|
@ -107,7 +107,9 @@ func CreateEngineCommand() *cobra.Command {
|
|||
}
|
||||
|
||||
var dbUri string
|
||||
var dbTimeout string
|
||||
engineCommand.PersistentFlags().StringVarP(&dbUri, "db-uri", "d", "", "Database URI")
|
||||
engineCommand.PersistentFlags().StringVarP(&dbTimeout, "db-timeout", "t", "", "Database timeout (format: 10s)")
|
||||
engineCommand.MarkFlagRequired("db-uri")
|
||||
|
||||
for _, sc := range engine.ENGINE_SUPPORTED_WORKERS {
|
||||
|
@ -118,7 +120,7 @@ func CreateEngineCommand() *cobra.Command {
|
|||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
ctx := context.Background()
|
||||
|
||||
dbPool, err := CreateDbPool(ctx, dbUri, "10s")
|
||||
dbPool, err := CreateDbPool(ctx, dbUri, dbTimeout)
|
||||
if err != nil {
|
||||
return fmt.Errorf("database connection error: %v", err)
|
||||
}
|
||||
|
|
|
@ -20,9 +20,10 @@ var (
|
|||
|
||||
// Workers configuration
|
||||
type ServiceWorkersConfig struct {
|
||||
Name string `json:"name"`
|
||||
DbUri string `json:"db_uri"`
|
||||
Workers []probes.ServiceWorker `json:"workers"`
|
||||
Name string `json:"name"`
|
||||
DbUri string `json:"db_uri"`
|
||||
DbTimeout string `json:"db_timeout"`
|
||||
Workers []probes.ServiceWorker `json:"workers"`
|
||||
}
|
||||
|
||||
func ReadConfig(configPath string) (*[]ServiceWorkersConfig, int, error) {
|
||||
|
@ -51,31 +52,29 @@ func ReadConfig(configPath string) (*[]ServiceWorkersConfig, int, error) {
|
|||
for w, worker := range service.Workers {
|
||||
switch service.Name {
|
||||
case "engine":
|
||||
for _, engineWorker := range engine.ENGINE_SUPPORTED_WORKERS {
|
||||
if worker.Name == engineWorker.Name {
|
||||
serviceWorkers = append(serviceWorkers, probes.ServiceWorker{
|
||||
Name: worker.Name,
|
||||
Interval: worker.Interval,
|
||||
ExecFunction: engineWorker.ExecFunction,
|
||||
})
|
||||
log.Printf("[%s] [%s] - Registered function", service.Name, worker.Name)
|
||||
totalWorkersNum++
|
||||
continue
|
||||
}
|
||||
}
|
||||
if worker.ExecFunction == nil {
|
||||
engineWorker := engine.ENGINE_SUPPORTED_WORKERS[fmt.Sprintf("%s-%s", service.Name, worker.Name)]
|
||||
if engineWorker.ExecFunction == nil {
|
||||
service.Workers = append(service.Workers[:w], service.Workers[w+1:]...)
|
||||
log.Printf("Function for worker %s at service %s not found, removed from the list", worker.Name, service.Name)
|
||||
continue
|
||||
}
|
||||
serviceWorkers = append(serviceWorkers, probes.ServiceWorker{
|
||||
Name: worker.Name,
|
||||
Interval: worker.Interval,
|
||||
ExecFunction: engineWorker.ExecFunction,
|
||||
})
|
||||
log.Printf("[%s] [%s] - Registered function", service.Name, worker.Name)
|
||||
totalWorkersNum++
|
||||
default:
|
||||
service.Workers = append(service.Workers[:w], service.Workers[w+1:]...)
|
||||
log.Printf("Unsupported %s service with %s worker from the list", worker.Name, service.Name)
|
||||
}
|
||||
}
|
||||
serviceWorkersConfig = append(serviceWorkersConfig, ServiceWorkersConfig{
|
||||
Name: service.Name,
|
||||
DbUri: serviceDbUri,
|
||||
Workers: serviceWorkers,
|
||||
Name: service.Name,
|
||||
DbUri: serviceDbUri,
|
||||
DbTimeout: service.DbTimeout,
|
||||
Workers: serviceWorkers,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -36,7 +36,7 @@ func RunService(configPath string) error {
|
|||
for _, service := range *serviceConfigs {
|
||||
for _, worker := range service.Workers {
|
||||
wg.Add(1)
|
||||
go RunWorker(&wg, worker, service.Name, service.DbUri)
|
||||
go RunWorker(&wg, worker, service.Name, service.DbUri, service.DbTimeout)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
|
@ -44,16 +44,17 @@ func RunService(configPath string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func RunWorker(wg *sync.WaitGroup, worker probes.ServiceWorker, serviceName, dbUri string) error {
|
||||
func RunWorker(wg *sync.WaitGroup, worker probes.ServiceWorker, serviceName, dbUri, dbTimeout string) error {
|
||||
defer wg.Done()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
dbPool, err := CreateDbPool(ctx, dbUri, "10s")
|
||||
dbPool, err := CreateDbPool(ctx, dbUri, dbTimeout)
|
||||
if err != nil {
|
||||
log.Printf("[%s] [%s] - database connection error, err: %v", serviceName, worker.Name, err)
|
||||
log.Printf("[%s] [%s] - unable to establish connection with database, err: %v", serviceName, worker.Name, err)
|
||||
return err
|
||||
}
|
||||
|
||||
defer dbPool.Close()
|
||||
|
||||
t := time.NewTicker(time.Duration(worker.Interval) * time.Second)
|
||||
|
@ -63,7 +64,7 @@ func RunWorker(wg *sync.WaitGroup, worker probes.ServiceWorker, serviceName, dbU
|
|||
err = worker.ExecFunction(ctx, dbPool)
|
||||
if err != nil {
|
||||
log.Printf("[%s] [%s] - an error occurred during execution, err: %v", serviceName, worker.Name, err)
|
||||
return err
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
[Unit]
|
||||
Description=Clean outdated call_requests from Engine DB
|
||||
Description=Run probes service
|
||||
After=network.target
|
||||
StartLimitIntervalSec=300
|
||||
StartLimitBurst=3
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
probes "github.com/moonstream-to/api/probes/pkg"
|
||||
)
|
||||
|
||||
var ENGINE_SUPPORTED_WORKERS = []probes.ServiceWorker{{
|
||||
var ENGINE_SUPPORTED_WORKERS = map[string]probes.ServiceWorker{"engine-clean-call-requests": {
|
||||
Name: "clean-call-requests",
|
||||
Description: "Clean all inactive call requests from database",
|
||||
LonDescription: "Remove records in call_requests database table with ttl value greater then now.",
|
||||
|
|
Ładowanie…
Reference in New Issue