webhookd/pkg/worker/worker.go

70 lines
1.5 KiB
Go
Raw Normal View History

2014-10-31 23:18:38 +00:00
package worker
import (
"fmt"
2023-10-03 18:02:57 +00:00
"log/slog"
"github.com/ncarlier/webhookd/pkg/metric"
"github.com/ncarlier/webhookd/pkg/notification"
2014-10-31 23:18:38 +00:00
)
2020-02-26 21:19:11 +00:00
// NewWorker creates, and returns a new Worker object.
2022-05-26 07:05:49 +00:00
func NewWorker(id int, workerQueue chan chan Work) Worker {
2014-10-31 23:18:38 +00:00
// Create, and return the worker.
worker := Worker{
ID: id,
2022-05-26 07:05:49 +00:00
Work: make(chan Work),
2014-10-31 23:18:38 +00:00
WorkerQueue: workerQueue,
2020-02-26 21:19:11 +00:00
QuitChan: make(chan bool),
}
2014-10-31 23:18:38 +00:00
return worker
}
// Worker is a go routine in charge of executing a work.
2014-10-31 23:18:38 +00:00
type Worker struct {
ID int
2022-05-26 07:05:49 +00:00
Work chan Work
WorkerQueue chan chan Work
2014-10-31 23:18:38 +00:00
QuitChan chan bool
}
// Start is the function to starts the worker by starting a goroutine.
// That is an infinite "for-select" loop.
2014-10-31 23:18:38 +00:00
func (w Worker) Start() {
go func() {
for {
// Add ourselves into the worker queue.
w.WorkerQueue <- w.Work
select {
case work := <-w.Work:
// Receive a work request.
2023-10-03 18:02:57 +00:00
slog.Debug("hook execution request received", "worker", w.ID, "hook", work.Name(), "id", work.ID())
metric.Requests.Add(1)
2022-05-26 07:05:49 +00:00
err := work.Run()
2014-10-31 23:18:38 +00:00
if err != nil {
metric.RequestsFailed.Add(1)
2022-05-26 07:05:49 +00:00
work.SendMessage(fmt.Sprintf("error: %s", err.Error()))
2014-10-31 23:18:38 +00:00
}
// Send notification
2022-05-26 07:05:49 +00:00
go notification.Notify(work)
2022-05-26 07:05:49 +00:00
work.Close()
2014-10-31 23:18:38 +00:00
case <-w.QuitChan:
2023-10-03 18:02:57 +00:00
slog.Debug("stopping worker...", "worker", w.ID)
2014-10-31 23:18:38 +00:00
return
}
}
}()
}
// Stop asks the worker to stop listening for work requests.
//
// The quit signal is sent on QuitChan from a separate goroutine, so Stop
// itself never blocks. Note that the worker will only stop *after* it has
// finished its current work.
func (w Worker) Stop() {
	sendQuit := func() { w.QuitChan <- true }
	go sendQuit()
}