From 5e65febceb39a468fe508e3138d2b96e25af3e99 Mon Sep 17 00:00:00 2001 From: Nicolas Carlier Date: Thu, 26 May 2022 09:05:49 +0200 Subject: [PATCH] refactor(): internals rescafolding --- README.md | 4 +- pkg/api/index.go | 40 ++- pkg/api/routes.go | 2 +- .../script_resolver.go => hook/helper.go} | 2 +- pkg/hook/job.go | 256 ++++++++++++++++++ pkg/{worker/work_log.go => hook/logs.go} | 6 +- .../test/helper_test.go} | 10 +- pkg/hook/test/job_test.go | 96 +++++++ pkg/{worker => hook}/test/test_error.sh | 0 pkg/{worker => hook}/test/test_simple.sh | 3 + pkg/{worker => hook}/test/test_timeout.sh | 0 pkg/hook/types.go | 26 ++ pkg/middleware/cors.go | 2 +- pkg/model/work_request.go | 132 --------- pkg/notification/http_notifier.go | 13 +- pkg/notification/notifier.go | 11 +- pkg/notification/smtp_notifier.go | 23 +- pkg/notification/types.go | 9 + pkg/worker/dispatcher.go | 9 +- pkg/worker/test/work_runner_test.go | 72 ----- pkg/worker/types.go | 22 ++ pkg/worker/work_runner.go | 122 --------- pkg/worker/worker.go | 19 +- scripts/async.sh | 10 + scripts/echo.sh | 2 +- scripts/long.sh | 12 + tooling/bench/simple.js | 16 ++ 27 files changed, 524 insertions(+), 395 deletions(-) rename pkg/{worker/script_resolver.go => hook/helper.go} (96%) create mode 100644 pkg/hook/job.go rename pkg/{worker/work_log.go => hook/logs.go} (72%) rename pkg/{worker/test/script_resolver_test.go => hook/test/helper_test.go} (68%) create mode 100644 pkg/hook/test/job_test.go rename pkg/{worker => hook}/test/test_error.sh (100%) rename pkg/{worker => hook}/test/test_simple.sh (65%) rename pkg/{worker => hook}/test/test_timeout.sh (100%) create mode 100644 pkg/hook/types.go delete mode 100644 pkg/model/work_request.go create mode 100644 pkg/notification/types.go delete mode 100644 pkg/worker/test/work_runner_test.go create mode 100644 pkg/worker/types.go delete mode 100644 pkg/worker/work_runner.go create mode 100755 scripts/async.sh create mode 100755 scripts/long.sh create mode 100644 tooling/bench/simple.js diff --git a/README.md b/README.md index 0f07ba0..8628521 100644 --- a/README.md +++ b/README.md @@ -158,7 +158,7 @@ The script: ```bash #!/bin/bash -echo "Hook information: name=$hook_name, id=$hook_id" +echo "Hook information: name=$hook_name, id=$hook_id, method=$hook_method" echo "Query parameter: foo=$foo" echo "Header parameter: user-agent=$user_agent" echo "Script parameters: $1" @@ -168,7 +168,7 @@ The result: ```bash $ curl --data @test.json http://localhost:8080/echo?foo=bar -Hook information: name=echo, id=1 +Hook information: name=echo, id=1, method=POST Query parameter: foo=bar Header parameter: user-agent=curl/7.52.1 Script parameter: {"message": "this is a test"} diff --git a/pkg/api/index.go b/pkg/api/index.go index 64a7fb3..f826773 100644 --- a/pkg/api/index.go +++ b/pkg/api/index.go @@ -12,8 +12,8 @@ import ( "strings" "github.com/ncarlier/webhookd/pkg/config" + "github.com/ncarlier/webhookd/pkg/hook" "github.com/ncarlier/webhookd/pkg/logger" - "github.com/ncarlier/webhookd/pkg/model" "github.com/ncarlier/webhookd/pkg/worker" ) @@ -56,13 +56,13 @@ func triggerWebhook(w http.ResponseWriter, r *http.Request) { return } - // Get script location - p := strings.TrimPrefix(r.URL.Path, "/") - if p == "" { + // Get hook location + hookName := strings.TrimPrefix(r.URL.Path, "/") + if hookName == "" { infoHandler(w, r) return } - script, err := worker.ResolveScript(scriptDir, p) + _, err := hook.ResolveScript(scriptDir, hookName) if err != nil { logger.Error.Println(err.Error()) http.Error(w, "hook not found", http.StatusNotFound) @@ -97,10 +97,23 @@ func triggerWebhook(w http.ResponseWriter, r *http.Request) { // Create work timeout := atoiFallback(r.Header.Get("X-Hook-Timeout"), defaultTimeout) - work := model.NewWorkRequest(p, script, string(body), outputDir, params, timeout) + job, err := hook.NewHookJob(&hook.Request{ + Name: hookName, + Method: r.Method, + Payload: string(body), + Args: params, + Timeout: timeout, + BaseDir: scriptDir, + OutputDir: outputDir, + }) + if err != nil { + logger.Error.Printf("error creating hook job: %v", err) + http.Error(w, "unable to create hook job", http.StatusInternalServerError) + return + } // Put work in queue - worker.WorkQueue <- *work + worker.WorkQueue <- job // Use content negotiation to enable Server-Sent Events useSSE := r.Method == "GET" && r.Header.Get("Accept") == "text/event-stream" @@ -113,21 +126,18 @@ func triggerWebhook(w http.ResponseWriter, r *http.Request) { } w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") - w.Header().Set("X-Hook-ID", strconv.FormatUint(work.ID, 10)) + w.Header().Set("X-Hook-ID", strconv.FormatUint(job.ID(), 10)) for { - msg, open := <-work.MessageChan - + msg, open := <-job.MessageChan if !open { break } - if useSSE { fmt.Fprintf(w, "data: %s\n\n", msg) // Send SSE response } else { fmt.Fprintf(w, "%s\n", msg) // Send chunked response } - // Flush the data immediately instead of buffering it for later. flusher.Flush() } @@ -138,8 +148,8 @@ func getWebhookLog(w http.ResponseWriter, r *http.Request) { id := path.Base(r.URL.Path) // Get script location - name := path.Dir(strings.TrimPrefix(r.URL.Path, "/")) - _, err := worker.ResolveScript(scriptDir, name) + hookName := path.Dir(strings.TrimPrefix(r.URL.Path, "/")) + _, err := hook.ResolveScript(scriptDir, hookName) if err != nil { logger.Error.Println(err.Error()) http.Error(w, err.Error(), http.StatusNotFound) @@ -147,7 +157,7 @@ func getWebhookLog(w http.ResponseWriter, r *http.Request) { } // Retrieve log file - logFile, err := worker.RetrieveLogFile(id, name, outputDir) + logFile, err := hook.Logs(id, hookName, outputDir) if err != nil { logger.Error.Println(err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/pkg/api/routes.go b/pkg/api/routes.go index 1cca259..c1cfa32 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -47,7 +47,7 @@ func routes(conf *config.Config) Routes { route( "/", index, - middlewares.UseBefore(middleware.Methods("GET", "PATCH", "POST"))..., + middlewares..., ), route( staticPath, diff --git a/pkg/worker/script_resolver.go b/pkg/hook/helper.go similarity index 96% rename from pkg/worker/script_resolver.go rename to pkg/hook/helper.go index 8a3bc94..76da9e8 100644 --- a/pkg/worker/script_resolver.go +++ b/pkg/hook/helper.go @@ -1,4 +1,4 @@ -package worker +package hook import ( "errors" diff --git a/pkg/hook/job.go b/pkg/hook/job.go new file mode 100644 index 0000000..bc1a70e --- /dev/null +++ b/pkg/hook/job.go @@ -0,0 +1,256 @@ +package hook + +import ( + "bufio" + "bytes" + "fmt" + "io" + "os" + "os/exec" + "path" + "strconv" + "strings" + "sync" + "sync/atomic" + "syscall" + "time" + + "github.com/ncarlier/webhookd/pkg/logger" + "github.com/ncarlier/webhookd/pkg/strcase" +) + +var hookID uint64 + +// Job a hook job +type Job struct { + id uint64 + name string + script string + method string + payload string + args []string + MessageChan chan []byte + timeout int + status Status + logFilename string + err error + mutex sync.Mutex +} + +// NewHookJob creates new hook job +func NewHookJob(request *Request) (*Job, error) { + script, err := ResolveScript(request.BaseDir, request.Name) + if err != nil { + return nil, err + } + job := &Job{ + id: atomic.AddUint64(&hookID, 1), + name: request.Name, + script: script, + method: request.Method, + payload: request.Payload, + args: request.Args, + timeout: request.Timeout, + MessageChan: make(chan []byte), + status: Idle, + } + job.logFilename = path.Join(request.OutputDir, fmt.Sprintf("%s_%d_%s.txt", strcase.ToSnake(job.name), job.id, time.Now().Format("20060102_1504"))) + return job, nil +} + +func (job *Job) ID() uint64 { + return job.id +} + +func (job *Job) Name() string { + return job.name +} + +func (job *Job) Err() error { + return job.err +} + +// Meta return job meta +func (job *Job) Meta() []string { + return []string{ + "hook_id=" + strconv.FormatUint(job.id, 10), + "hook_name=" + job.name, + "hook_method=" + job.method, + } +} + +// Terminate set job as terminated +func (job *Job) Terminate(err error) error { + job.mutex.Lock() + defer job.mutex.Unlock() + if err != nil { + job.status = Error + job.err = err + logger.Info.Printf("hook %s#%d done [ERROR]\n", job.Name(), job.ID()) + return err + } + job.status = Success + logger.Info.Printf("hook %s#%d done [SUCCESS]\n", job.Name(), job.ID()) + return nil +} + +// IsTerminated ask if the job is terminated +func (job *Job) IsTerminated() bool { + job.mutex.Lock() + defer job.mutex.Unlock() + return job.status == Success || job.status == Error +} + +// Status get job status +func (job *Job) Status() Status { + return job.status +} + +// StatusLabel return job status as string +func (job *Job) StatusLabel() string { + switch job.status { + case Error: + return "error" + case Success: + return "success" + case Running: + return "running" + default: + return "idle" + } +} + +// SendMessage send message to the message channel +func (job *Job) SendMessage(message string) { + job.MessageChan <- []byte(message) +} + +// Logs returns job logs filtered with the prefix +func (job *Job) Logs(prefixFilter string) string { + file, err := os.Open(job.logFilename) + if err != nil { + return err.Error() + } + defer file.Close() + + var result bytes.Buffer + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Text() + if strings.HasPrefix(line, prefixFilter) { + line = strings.TrimPrefix(line, prefixFilter) + line = strings.TrimLeft(line, " ") + result.WriteString(line + "\n") + } + } + if err := scanner.Err(); err != nil { + return err.Error() + } + return result.String() +} + +// Close job message chan +func (job *Job) Close() { + close(job.MessageChan) +} + +// Run hook job +func (job *Job) Run() error { + if job.status != Idle { + return fmt.Errorf("unable to run job: status=%s", job.StatusLabel()) + } + job.status = Running + logger.Info.Printf("hook %s#%d started...\n", job.name, job.id) + logger.Debug.Printf("hook %s#%d script: %s\n", job.name, job.id, job.script) + logger.Debug.Printf("hook %s#%d parameter: %v\n", job.name, job.id, job.args) + + binary, err := exec.LookPath(job.script) + if err != nil { + return job.Terminate(err) + } + + // Exec script with parameter... + cmd := exec.Command(binary, job.payload) + // with env variables and hook arguments... + cmd.Env = append(os.Environ(), job.args...) + // and hook meta... + cmd.Env = append(cmd.Env, job.Meta()...) + // using a process group... + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + + // Open the log file for writing + logFile, err := os.Create(job.logFilename) + if err != nil { + return job.Terminate(err) + } + defer logFile.Close() + logger.Debug.Printf("hook %s#%d output file: %s\n", job.name, job.id, logFile.Name()) + + wLogFile := bufio.NewWriter(logFile) + defer wLogFile.Flush() + + // Combine cmd stdout and stderr + outReader, err := cmd.StdoutPipe() + if err != nil { + return job.Terminate(err) + } + errReader, err := cmd.StderrPipe() + if err != nil { + return job.Terminate(err) + } + cmdReader := io.MultiReader(outReader, errReader) + + // Start the script... + err = cmd.Start() + if err != nil { + return job.Terminate(err) + } + + // Create wait group to wait for command output completion + var wg sync.WaitGroup + wg.Add(1) + + // Write script output to log file and the work message channel + go func(reader io.Reader) { + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + line := scanner.Text() + // writing to the work channel + if !job.IsTerminated() { + job.MessageChan <- []byte(line) + } else { + logger.Error.Printf("hook %s#%d is over ; unable to write more data into the channel: %s\n", job.name, job.id, line) + break + } + // write to stdout if configured + logger.Output.Println(line) + // writing to outfile + if _, err := wLogFile.WriteString(line + "\n"); err != nil { + logger.Error.Println("error while writing into the log file:", logFile.Name(), err) + break + } + } + if err := scanner.Err(); err != nil { + logger.Error.Printf("hook %s#%d is unable to read script stdout: %v\n", job.name, job.id, err) + } + wg.Done() + }(cmdReader) + + // Start timeout timer + timer := time.AfterFunc(time.Duration(job.timeout)*time.Second, func() { + logger.Warning.Printf("hook %s#%d has timed out (%ds): killing process #%d ...\n", job.name, job.id, job.timeout, cmd.Process.Pid) + syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) + }) + + // Wait for command output completion + wg.Wait() + + // Wait for command completion + err = cmd.Wait() + + // Stop timeout timer + timer.Stop() + + // Mark work as terminated + return job.Terminate(err) +} diff --git a/pkg/worker/work_log.go b/pkg/hook/logs.go similarity index 72% rename from pkg/worker/work_log.go rename to pkg/hook/logs.go index 6cb6370..0246976 100644 --- a/pkg/worker/work_log.go +++ b/pkg/hook/logs.go @@ -1,4 +1,4 @@ -package worker +package hook import ( "fmt" @@ -9,8 +9,8 @@ import ( "github.com/ncarlier/webhookd/pkg/strcase" ) -// RetrieveLogFile retrieve work log with its name and id -func RetrieveLogFile(id, name, base string) (*os.File, error) { +// Logs get hook log with its name and id +func Logs(id, name, base string) (*os.File, error) { logPattern := path.Join(base, fmt.Sprintf("%s_%s_*.txt", strcase.ToSnake(name), id)) files, err := filepath.Glob(logPattern) if err != nil { diff --git a/pkg/worker/test/script_resolver_test.go b/pkg/hook/test/helper_test.go similarity index 68% rename from pkg/worker/test/script_resolver_test.go rename to pkg/hook/test/helper_test.go index 400d805..b4ebf89 100644 --- a/pkg/worker/test/script_resolver_test.go +++ b/pkg/hook/test/helper_test.go @@ -4,29 +4,29 @@ import ( "testing" "github.com/ncarlier/webhookd/pkg/assert" - "github.com/ncarlier/webhookd/pkg/worker" + "github.com/ncarlier/webhookd/pkg/hook" ) func TestResolveScript(t *testing.T) { - script, err := worker.ResolveScript("../../../scripts", "../scripts/echo") + script, err := hook.ResolveScript("../../../scripts", "../scripts/echo") assert.Nil(t, err, "") assert.Equal(t, "../../../scripts/echo.sh", script, "") } func TestNotResolveScript(t *testing.T) { - _, err := worker.ResolveScript("../../scripts", "foo") + _, err := hook.ResolveScript("../../scripts", "foo") assert.NotNil(t, err, "") assert.Equal(t, "Script not found: ../../scripts/foo.sh", err.Error(), "") } func TestResolveBadScript(t *testing.T) { - _, err := worker.ResolveScript("../../scripts", "../tests/test_simple") + _, err := hook.ResolveScript("../../scripts", "../tests/test_simple") assert.NotNil(t, err, "") assert.Equal(t, "Invalid script path: ../tests/test_simple.sh", err.Error(), "") } func TestResolveScriptWithExtension(t *testing.T) { - _, err := worker.ResolveScript("../../scripts", "node.js") + _, err := hook.ResolveScript("../../scripts", "node.js") assert.NotNil(t, err, "") assert.Equal(t, "Script not found: ../../scripts/node.js", err.Error(), "") } diff --git a/pkg/hook/test/job_test.go b/pkg/hook/test/job_test.go new file mode 100644 index 0000000..d390a90 --- /dev/null +++ b/pkg/hook/test/job_test.go @@ -0,0 +1,96 @@ +package test + +import ( + "os" + "strconv" + "testing" + + "github.com/ncarlier/webhookd/pkg/assert" + "github.com/ncarlier/webhookd/pkg/hook" + "github.com/ncarlier/webhookd/pkg/logger" +) + +func printJobMessages(job *hook.Job) { + go func() { + for { + msg, open := <-job.MessageChan + if !open { + break + } + logger.Info.Println(string(msg)) + } + }() +} + +func TestHookJob(t *testing.T) { + logger.Init("debug", "out") + req := &hook.Request{ + Name: "test_simple", + Method: "GET", + Payload: "{\"foo\": \"bar\"}", + Args: []string{ + "name=foo", + "user_agent=test", + }, + Timeout: 5, + BaseDir: "../test", + OutputDir: os.TempDir(), + } + job, err := hook.NewHookJob(req) + assert.Nil(t, err, "") + assert.NotNil(t, job, "") + printJobMessages(job) + err = job.Run() + assert.Nil(t, err, "") + assert.Equal(t, job.Status(), hook.Success, "") + assert.Equal(t, job.Logs("notify:"), "OK\n", "") + + // Test that we can retrieve log file afterward + id := strconv.FormatUint(job.ID(), 10) + logFile, err := hook.Logs(id, "test", os.TempDir()) + assert.Nil(t, err, "Log file should exists") + defer logFile.Close() + assert.NotNil(t, logFile, "Log file should be retrieve") +} + +func TestWorkRunnerWithError(t *testing.T) { + logger.Init("debug") + req := &hook.Request{ + Name: "test_error", + Method: "POST", + Payload: "", + Args: []string{}, + Timeout: 5, + BaseDir: "../test", + OutputDir: os.TempDir(), + } + job, err := hook.NewHookJob(req) + assert.Nil(t, err, "") + assert.NotNil(t, job, "") + printJobMessages(job) + err = job.Run() + assert.NotNil(t, err, "") + assert.Equal(t, job.Status(), hook.Error, "") + assert.Equal(t, "exit status 1", err.Error(), "") +} + +func TestWorkRunnerWithTimeout(t *testing.T) { + logger.Init("debug") + req := &hook.Request{ + Name: "test_timeout", + Method: "POST", + Payload: "", + Args: []string{}, + Timeout: 1, + BaseDir: "../test", + OutputDir: os.TempDir(), + } + job, err := hook.NewHookJob(req) + assert.Nil(t, err, "") + assert.NotNil(t, job, "") + printJobMessages(job) + err = job.Run() + assert.NotNil(t, err, "") + assert.Equal(t, job.Status(), hook.Error, "") + assert.Equal(t, "signal: killed", err.Error(), "") +} diff --git a/pkg/worker/test/test_error.sh b/pkg/hook/test/test_error.sh similarity index 100% rename from pkg/worker/test/test_error.sh rename to pkg/hook/test/test_error.sh diff --git a/pkg/worker/test/test_simple.sh b/pkg/hook/test/test_simple.sh similarity index 65% rename from pkg/worker/test/test_simple.sh rename to pkg/hook/test/test_simple.sh index af38ea9..4c84724 100755 --- a/pkg/worker/test/test_simple.sh +++ b/pkg/hook/test/test_simple.sh @@ -6,6 +6,9 @@ echo "Testing parameters..." [ -z "$name" ] && echo "Name variable undefined" && exit 1 [ -z "$user_agent" ] && echo "User-Agent variable undefined" && exit 1 [ "$user_agent" != "test" ] && echo "Invalid User-Agent variable: $user_agent" && exit 1 +[ -z "$hook_id" ] && echo "Hook ID variable undefined" && exit 1 +[ "$hook_name" != "test_simple" ] && echo "Invalid hook name variable: $hook_name" && exit 1 +[ "$hook_method" != "GET" ] && echo "Invalid hook method variable: $hook_method" && exit 1 echo "Testing payload..." [ -z "$1" ] && echo "Payload undefined" && exit 1 diff --git a/pkg/worker/test/test_timeout.sh b/pkg/hook/test/test_timeout.sh similarity index 100% rename from pkg/worker/test/test_timeout.sh rename to pkg/hook/test/test_timeout.sh diff --git a/pkg/hook/types.go b/pkg/hook/types.go new file mode 100644 index 0000000..4adb64d --- /dev/null +++ b/pkg/hook/types.go @@ -0,0 +1,26 @@ +package hook + +// Status is the status of a hook +type Status int + +const ( + // Idle means that the hook is not yet started + Idle Status = iota + // Running means that the hook is running + Running + // Success means that the hook over + Success + // Error means that the hook is over but in error + Error +) + +// Request is a hook request +type Request struct { + Name string + Method string + Payload string + Args []string + Timeout int + BaseDir string + OutputDir string +} diff --git a/pkg/middleware/cors.go b/pkg/middleware/cors.go index 0a58312..f61b130 100644 --- a/pkg/middleware/cors.go +++ b/pkg/middleware/cors.go @@ -8,7 +8,7 @@ import ( func Cors(inner http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Methods", "PATCH, POST, GET, OPTIONS") + w.Header().Set("Access-Control-Allow-Methods", "*") w.Header().Set("Access-Control-Allow-Headers", "Accept, Content-Type, Content-Length, Accept-Encoding, Authorization") if r.Method != "OPTIONS" { diff --git a/pkg/model/work_request.go b/pkg/model/work_request.go deleted file mode 100644 index a4c191b..0000000 --- a/pkg/model/work_request.go +++ /dev/null @@ -1,132 +0,0 @@ -package model - -import ( - "bufio" - "bytes" - "fmt" - "os" - "path" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/ncarlier/webhookd/pkg/logger" - "github.com/ncarlier/webhookd/pkg/strcase" -) - -var workID uint64 - -// WorkStatus is the status of a workload -type WorkStatus int - -const ( - // Idle means that the work is not yet started - Idle WorkStatus = iota - // Running means that the work is running - Running - // Success means that the work over - Success - // Error means that the work is over but in error - Error -) - -// WorkRequest is a request of work for a worker -type WorkRequest struct { - ID uint64 - Name string - Script string - Payload string - Args []string - MessageChan chan []byte - Timeout int - Status WorkStatus - LogFilename string - Err error - mutex sync.Mutex -} - -// NewWorkRequest creates new work request -func NewWorkRequest(name, script, payload, output string, args []string, timeout int) *WorkRequest { - w := &WorkRequest{ - ID: atomic.AddUint64(&workID, 1), - Name: name, - Script: script, - Payload: payload, - Args: args, - Timeout: timeout, - MessageChan: make(chan []byte), - Status: Idle, - } - w.LogFilename = path.Join(output, fmt.Sprintf("%s_%d_%s.txt", strcase.ToSnake(w.Name), w.ID, time.Now().Format("20060102_1504"))) - return w -} - -// Meta return work request meta -func (wr *WorkRequest) Meta() []string { - return []string{ - "hook_id=" + strconv.FormatUint(wr.ID, 10), - "hook_name=" + wr.Name, - } -} - -// Terminate set work request as terminated -func (wr *WorkRequest) Terminate(err error) error { - wr.mutex.Lock() - defer wr.mutex.Unlock() - if err != nil { - wr.Status = Error - wr.Err = err - logger.Info.Printf("hook %s#%d done [ERROR]\n", wr.Name, wr.ID) - return err - } - wr.Status = Success - logger.Info.Printf("hook %s#%d done [SUCCESS]\n", wr.Name, wr.ID) - return nil -} - -// IsTerminated ask if the work request is terminated -func (wr *WorkRequest) IsTerminated() bool { - wr.mutex.Lock() - defer wr.mutex.Unlock() - return wr.Status == Success || wr.Status == Error -} - -// StatusLabel return work status as string -func (wr *WorkRequest) StatusLabel() string { - switch wr.Status { - case Error: - return "error" - case Success: - return "success" - case Running: - return "running" - default: - return "idle" - } -} - -// GetLogContent returns work logs filtered with the prefix -func (wr *WorkRequest) GetLogContent(prefixFilter string) string { - file, err := os.Open(wr.LogFilename) - if err != nil { - return err.Error() - } - defer file.Close() - - var result bytes.Buffer - scanner := bufio.NewScanner(file) - for scanner.Scan() { - line := scanner.Text() - if strings.HasPrefix(line, prefixFilter) { - line = strings.TrimPrefix(line, prefixFilter) - line = strings.TrimLeft(line, " ") - result.WriteString(line + "\n") - } - } - if err := scanner.Err(); err != nil { - return err.Error() - } - return result.String() -} diff --git a/pkg/notification/http_notifier.go b/pkg/notification/http_notifier.go index 12daace..c2ef679 100644 --- a/pkg/notification/http_notifier.go +++ b/pkg/notification/http_notifier.go @@ -9,7 +9,6 @@ import ( "strings" "github.com/ncarlier/webhookd/pkg/logger" - "github.com/ncarlier/webhookd/pkg/model" ) type notifPayload struct { @@ -34,18 +33,18 @@ func newHTTPNotifier(uri *url.URL) *HTTPNotifier { } // Notify send a notification to a HTTP endpoint. -func (n *HTTPNotifier) Notify(work *model.WorkRequest) error { - payload := work.GetLogContent(n.PrefixFilter) +func (n *HTTPNotifier) Notify(result HookResult) error { + payload := result.Logs(n.PrefixFilter) if strings.TrimSpace(payload) == "" { // Nothing to notify, abort return nil } notif := ¬ifPayload{ - ID: strconv.FormatUint(work.ID, 10), - Name: work.Name, + ID: strconv.FormatUint(result.ID(), 10), + Name: result.Name(), Text: payload, - Error: work.Err, + Error: result.Err(), } notifJSON, err := json.Marshal(notif) if err != nil { @@ -64,6 +63,6 @@ func (n *HTTPNotifier) Notify(work *model.WorkRequest) error { return err } resp.Body.Close() - logger.Info.Printf("job %s#%d notification sent to %s\n", work.Name, work.ID, n.URL.String()) + logger.Info.Printf("job %s#%d notification sent to %s\n", result.Name(), result.ID(), n.URL.String()) return nil } diff --git a/pkg/notification/notifier.go b/pkg/notification/notifier.go index 9d95b0d..3236307 100644 --- a/pkg/notification/notifier.go +++ b/pkg/notification/notifier.go @@ -6,23 +6,22 @@ import ( "strings" "github.com/ncarlier/webhookd/pkg/logger" - "github.com/ncarlier/webhookd/pkg/model" ) // Notifier is able to send a notification. type Notifier interface { - Notify(work *model.WorkRequest) error + Notify(result HookResult) error } var notifier Notifier -// Notify is the global method to notify work -func Notify(work *model.WorkRequest) { +// Notify is the global method to notify hook result +func Notify(result HookResult) { if notifier == nil { return } - if err := notifier.Notify(work); err != nil { - logger.Error.Printf("unable to send notification for webhook %s#%d: %v\n", work.Name, work.ID, err) + if err := notifier.Notify(result); err != nil { + logger.Error.Printf("unable to send notification for webhook %s#%d: %v\n", result.Name(), result.ID(), err) } } diff --git a/pkg/notification/smtp_notifier.go b/pkg/notification/smtp_notifier.go index 24a3a4a..b39fed6 100644 --- a/pkg/notification/smtp_notifier.go +++ b/pkg/notification/smtp_notifier.go @@ -11,7 +11,6 @@ import ( "time" "github.com/ncarlier/webhookd/pkg/logger" - "github.com/ncarlier/webhookd/pkg/model" ) // SMTPNotifier is able to send notification to a email destination. @@ -41,15 +40,15 @@ func newSMTPNotifier(uri *url.URL) *SMTPNotifier { } } -func (n *SMTPNotifier) buildEmailPayload(work *model.WorkRequest) string { +func (n *SMTPNotifier) buildEmailPayload(result HookResult) string { // Get email body - body := work.GetLogContent(n.PrefixFilter) + body := result.Logs(n.PrefixFilter) if strings.TrimSpace(body) == "" { return "" } // Build email subject - subject := buildSubject(n.Subject, work) + subject := buildSubject(n.Subject, result) // Build email headers headers := make(map[string]string) @@ -67,9 +66,9 @@ func (n *SMTPNotifier) buildEmailPayload(work *model.WorkRequest) string { } // Notify send a notification to a email destination. -func (n *SMTPNotifier) Notify(work *model.WorkRequest) error { +func (n *SMTPNotifier) Notify(result HookResult) error { hostname, _, _ := net.SplitHostPort(n.Host) - payload := n.buildEmailPayload(work) + payload := n.buildEmailPayload(result) if payload == "" { // Nothing to notify, abort return nil @@ -127,15 +126,15 @@ func (n *SMTPNotifier) Notify(work *model.WorkRequest) error { return err } - logger.Info.Printf("job %s#%d notification sent to %s\n", work.Name, work.ID, n.To) + logger.Info.Printf("job %s#%d notification sent to %s\n", result.Name(), result.ID(), n.To) // Send the QUIT command and close the connection. return client.Quit() } -func buildSubject(template string, work *model.WorkRequest) string { - result := strings.ReplaceAll(template, "{name}", work.Name) - result = strings.ReplaceAll(result, "{id}", strconv.FormatUint(uint64(work.ID), 10)) - result = strings.ReplaceAll(result, "{status}", work.StatusLabel()) - return result +func buildSubject(template string, result HookResult) string { + subject := strings.ReplaceAll(template, "{name}", result.Name()) + subject = strings.ReplaceAll(subject, "{id}", strconv.FormatUint(uint64(result.ID()), 10)) + subject = strings.ReplaceAll(subject, "{status}", result.StatusLabel()) + return subject } diff --git a/pkg/notification/types.go b/pkg/notification/types.go new file mode 100644 index 0000000..9e6554e --- /dev/null +++ b/pkg/notification/types.go @@ -0,0 +1,9 @@ +package notification + +type HookResult interface { + ID() uint64 + Name() string + Logs(filter string) string + StatusLabel() string + Err() error +} diff --git a/pkg/worker/dispatcher.go b/pkg/worker/dispatcher.go index dfd8a55..aed343e 100644 --- a/pkg/worker/dispatcher.go +++ b/pkg/worker/dispatcher.go @@ -2,19 +2,18 @@ package worker import ( "github.com/ncarlier/webhookd/pkg/logger" - "github.com/ncarlier/webhookd/pkg/model" ) // WorkerQueue is the global queue of Workers -var WorkerQueue chan chan model.WorkRequest +var WorkerQueue chan chan Work // WorkQueue is the global queue of work to dispatch -var WorkQueue = make(chan model.WorkRequest, 100) +var WorkQueue = make(chan Work, 100) // StartDispatcher is charged to start n workers. func StartDispatcher(nworkers int) { // First, initialize the channel we are going to but the workers' work channels into. - WorkerQueue = make(chan chan model.WorkRequest, nworkers) + WorkerQueue = make(chan chan Work, nworkers) // Now, create all of our workers. for i := 0; i < nworkers; i++ { @@ -30,7 +29,7 @@ func StartDispatcher(nworkers int) { go func() { worker := <-WorkerQueue - logger.Debug.Printf("dispatching hook request: %s#%d", work.Name, work.ID) + logger.Debug.Printf("dispatching hook request: %s#%d", work.Name(), work.ID()) worker <- work }() } diff --git a/pkg/worker/test/work_runner_test.go b/pkg/worker/test/work_runner_test.go deleted file mode 100644 index 91df41c..0000000 --- a/pkg/worker/test/work_runner_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package test - -import ( - "os" - "strconv" - "testing" - - "github.com/ncarlier/webhookd/pkg/assert" - "github.com/ncarlier/webhookd/pkg/logger" - "github.com/ncarlier/webhookd/pkg/model" - "github.com/ncarlier/webhookd/pkg/worker" -) - -func printWorkMessages(work *model.WorkRequest) { - go func() { - for { - msg, open := <-work.MessageChan - if !open { - break - } - logger.Info.Println(string(msg)) - } - }() -} - -func TestWorkRunner(t *testing.T) { - logger.Init("debug", "out") - script := "./test_simple.sh" - args := []string{ - "name=foo", - "user_agent=test", - } - payload := "{\"foo\": \"bar\"}" - work := model.NewWorkRequest("test", script, payload, os.TempDir(), args, 5) - assert.NotNil(t, work, "") - printWorkMessages(work) - err := worker.Run(work) - assert.Nil(t, err, "") - assert.Equal(t, work.Status, model.Success, "") - assert.Equal(t, work.GetLogContent("notify:"), "OK\n", "") - - // Test that we can retrieve log file afterward - id := strconv.FormatUint(work.ID, 10) - logFile, err := worker.RetrieveLogFile(id, "test", os.TempDir()) - defer logFile.Close() - assert.Nil(t, err, "Log file should exists") - assert.NotNil(t, logFile, "Log file should be retrieve") -} - -func TestWorkRunnerWithError(t *testing.T) { - logger.Init("debug") - script := "./test_error.sh" - work := model.NewWorkRequest("test", script, "", os.TempDir(), []string{}, 5) - assert.NotNil(t, work, "") - printWorkMessages(work) - err := worker.Run(work) - assert.NotNil(t, err, "") - assert.Equal(t, work.Status, model.Error, "") - assert.Equal(t, "exit status 1", err.Error(), "") -} - -func TestWorkRunnerWithTimeout(t *testing.T) { - logger.Init("debug") - script := "./test_timeout.sh" - work := model.NewWorkRequest("test", script, "", os.TempDir(), []string{}, 1) - assert.NotNil(t, work, "") - printWorkMessages(work) - err := worker.Run(work) - assert.NotNil(t, err, "") - assert.Equal(t, work.Status, model.Error, "") - assert.Equal(t, "signal: killed", err.Error(), "") -} diff --git a/pkg/worker/types.go b/pkg/worker/types.go new file mode 100644 index 0000000..e9d1c27 --- /dev/null +++ b/pkg/worker/types.go @@ -0,0 +1,22 @@ +package worker + +// ChanWriter is a simple writer to a channel of byte. +type ChanWriter struct { + ByteChan chan []byte +} + +func (c *ChanWriter) Write(p []byte) (int, error) { + c.ByteChan <- p + return len(p), nil +} + +type Work interface { + ID() uint64 + Name() string + Run() error + Close() + SendMessage(message string) + Logs(filter string) string + StatusLabel() string + Err() error +} diff --git a/pkg/worker/work_runner.go b/pkg/worker/work_runner.go deleted file mode 100644 index 98274db..0000000 --- a/pkg/worker/work_runner.go +++ /dev/null @@ -1,122 +0,0 @@ -package worker - -import ( - "bufio" - "io" - "os" - "os/exec" - "sync" - "syscall" - "time" - - "github.com/ncarlier/webhookd/pkg/logger" - "github.com/ncarlier/webhookd/pkg/model" -) - -// ChanWriter is a simple writer to a channel of byte. -type ChanWriter struct { - ByteChan chan []byte -} - -func (c *ChanWriter) Write(p []byte) (int, error) { - c.ByteChan <- p - return len(p), nil -} - -// Run work request -func Run(work *model.WorkRequest) error { - work.Status = model.Running - logger.Info.Printf("hook %s#%d started...\n", work.Name, work.ID) - logger.Debug.Printf("hook %s#%d script: %s\n", work.Name, work.ID, work.Script) - logger.Debug.Printf("hook %s#%d parameter: %v\n", work.Name, work.ID, work.Args) - - binary, err := exec.LookPath(work.Script) - if err != nil { - return work.Terminate(err) - } - - // Exec script with parameter... - cmd := exec.Command(binary, work.Payload) - // with env variables and hook arguments... - cmd.Env = append(os.Environ(), work.Args...) - // and hook meta... - cmd.Env = append(cmd.Env, work.Meta()...) - // using a process group... - cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - - // Open the log file for writing - logFile, err := os.Create(work.LogFilename) - if err != nil { - return work.Terminate(err) - } - defer logFile.Close() - logger.Debug.Printf("hook %s#%d output file: %s\n", work.Name, work.ID, logFile.Name()) - - wLogFile := bufio.NewWriter(logFile) - defer wLogFile.Flush() - - // Combine cmd stdout and stderr - outReader, err := cmd.StdoutPipe() - if err != nil { - return work.Terminate(err) - } - errReader, err := cmd.StderrPipe() - if err != nil { - return work.Terminate(err) - } - cmdReader := io.MultiReader(outReader, errReader) - - // Start the script... - err = cmd.Start() - if err != nil { - return work.Terminate(err) - } - - // Create wait group to wait for command output completion - var wg sync.WaitGroup - wg.Add(1) - - // Write script output to log file and the work message channel - go func(reader io.Reader) { - scanner := bufio.NewScanner(reader) - for scanner.Scan() { - line := scanner.Text() - // writing to the work channel - if !work.IsTerminated() { - work.MessageChan <- []byte(line) - } else { - logger.Error.Printf("hook %s#%d is over ; unable to write more data into the channel: %s\n", work.Name, work.ID, line) - break - } - // write to stdout if configured - logger.Output.Println(line) - // writing to outfile - if _, err := wLogFile.WriteString(line + "\n"); err != nil { - logger.Error.Println("error while writing into the log file:", logFile.Name(), err) - break - } - } - if err := scanner.Err(); err != nil { - logger.Error.Printf("hook %s#%d is unable to read script stdout: %v\n", work.Name, work.ID, err) - } - wg.Done() - }(cmdReader) - - // Start timeout timer - timer := time.AfterFunc(time.Duration(work.Timeout)*time.Second, func() { - logger.Warning.Printf("hook %s#%d has timed out (%ds): killing process #%d ...\n", work.Name, work.ID, work.Timeout, cmd.Process.Pid) - syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) - }) - - // Wait for command output completion - wg.Wait() - - // Wait for command completion - err = cmd.Wait() - - // Stop timeout timer - timer.Stop() - - // Mark work as terminated - return work.Terminate(err) -} diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index ad4a1ee..d7de14e 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -6,16 +6,15 @@ import ( "github.com/ncarlier/webhookd/pkg/metric" "github.com/ncarlier/webhookd/pkg/logger" - "github.com/ncarlier/webhookd/pkg/model" "github.com/ncarlier/webhookd/pkg/notification" ) // NewWorker creates, and returns a new Worker object. -func NewWorker(id int, workerQueue chan chan model.WorkRequest) Worker { +func NewWorker(id int, workerQueue chan chan Work) Worker { // Create, and return the worker. worker := Worker{ ID: id, - Work: make(chan model.WorkRequest), + Work: make(chan Work), WorkerQueue: workerQueue, QuitChan: make(chan bool), } @@ -26,8 +25,8 @@ func NewWorker(id int, workerQueue chan chan model.WorkRequest) Worker { // Worker is a go routine in charge of executing a work. type Worker struct { ID int - Work chan model.WorkRequest - WorkerQueue chan chan model.WorkRequest + Work chan Work + WorkerQueue chan chan Work QuitChan chan bool } @@ -42,17 +41,17 @@ func (w Worker) Start() { select { case work := <-w.Work: // Receive a work request. - logger.Debug.Printf("worker #%d received hook request: %s#%d\n", w.ID, work.Name, work.ID) + logger.Debug.Printf("worker #%d received hook request: %s#%d\n", w.ID, work.Name(), work.ID()) metric.Requests.Add(1) - err := Run(&work) + err := work.Run() if err != nil { metric.RequestsFailed.Add(1) - work.MessageChan <- []byte(fmt.Sprintf("error: %s", err.Error())) + work.SendMessage(fmt.Sprintf("error: %s", err.Error())) } // Send notification - go notification.Notify(&work) + go notification.Notify(work) - close(work.MessageChan) + work.Close() case <-w.QuitChan: logger.Debug.Printf("stopping worker #%d...\n", w.ID) return diff --git a/scripts/async.sh b/scripts/async.sh new file mode 100755 index 0000000..3575c79 --- /dev/null +++ b/scripts/async.sh @@ -0,0 +1,10 @@ +#!/bin/bash + + +echo "Starting background job..." + +nohup ./scripts/long.sh >/tmp/long.log 2>&1 & + +echo "Background job started." + + diff --git a/scripts/echo.sh b/scripts/echo.sh index 610a760..02863c0 100755 --- a/scripts/echo.sh +++ b/scripts/echo.sh @@ -4,7 +4,7 @@ echo "This is a simple echo hook." -echo "Hook information: name=$hook_name, id=$hook_id" +echo "Hook information: name=$hook_name, id=$hook_id, method=$hook_method" echo "Command result: hostname=`hostname`" diff --git a/scripts/long.sh b/scripts/long.sh new file mode 100755 index 0000000..620f342 --- /dev/null +++ b/scripts/long.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +echo "Running long script..." + +for i in {1..20}; do + sleep 1 + echo "running ${i} ..." +done + +echo "Long script end" + +exit 0 diff --git a/tooling/bench/simple.js b/tooling/bench/simple.js new file mode 100644 index 0000000..a186357 --- /dev/null +++ b/tooling/bench/simple.js @@ -0,0 +1,16 @@ +import http from 'k6/http'; +import { check, sleep } from 'k6'; + +export const options = { + stages: [ + { duration: '30s', target: 20 }, + { duration: '1m', target: 10 }, + { duration: '20s', target: 0 }, + ], +}; + +export default function () { + const res = http.get('http://localhost:8080/echo'); + check(res, { 'status was 200': (r) => r.status == 200 }); + sleep(1); +} \ No newline at end of file