feat: Create worker queue.

This commit is contained in:
Nicolas Carlier 2014-10-31 23:18:38 +00:00
parent deba0ef462
commit b71c506587
8 changed files with 216 additions and 103 deletions

View File

@ -1,5 +1,5 @@
.SILENT :
.PHONY : volume build clean run shell test
.PHONY : volume dev build clean run shell test dist
USERNAME:=ncarlier
APPNAME:=webhookd
@ -18,16 +18,16 @@ define docker_run_flags
-i -t
endef
ifdef DEVMODE
docker_run_flags += --volumes-from $(APPNAME)_volumes
endif
all: build
volume:
echo "Building $(APPNAME) volumes..."
sudo docker run -v $(PWD):/opt/$(APPNAME) -v ~/var/$(APPNAME):/var/opt/$(APPNAME) --name $(APPNAME)_volumes busybox true
dev:
$(eval docker_run_flags += --volumes-from $(APPNAME)_volumes)
echo "DEVMODE: Using volumes from $(APPNAME)_volumes"
build:
echo "Building $(IMAGE) docker image..."
sudo docker build --rm -t $(IMAGE) .

View File

@ -102,7 +102,7 @@ func (n *HttpNotifier) Notify(subject string, text string, attachfile string) {
// Check the response
if res.StatusCode != http.StatusOK {
log.Println("bad status: %s", res.Status)
log.Println("bad status: ", res.Status)
log.Println(res.Body)
return
}

View File

@ -16,7 +16,7 @@ func CompressFile(filename string) (zipfile string, err error) {
}
out, err := os.Create(zipfile)
if err != nil {
log.Println("Unable to create zip file", err)
log.Println("Unable to create gzip file", err)
return
}
@ -29,9 +29,9 @@ func CompressFile(filename string) (zipfile string, err error) {
_, err = bufin.WriteTo(gw)
if err != nil {
log.Println("Unable to write into the zip file", err)
log.Println("Unable to write into the gzip file", err)
return
}
log.Println("Zip file created: ", zipfile)
log.Println("Gzip file created: ", zipfile)
return
}

View File

@ -5,105 +5,32 @@ import (
"fmt"
"github.com/gorilla/mux"
"github.com/ncarlier/webhookd/hook"
"github.com/ncarlier/webhookd/notification"
"github.com/ncarlier/webhookd/tools"
"github.com/ncarlier/webhookd/worker"
"log"
"net/http"
"os"
"os/exec"
"path"
)
var (
laddr = flag.String("l", ":8080", "HTTP service address (e.g.address, ':8080')")
workingdir = os.Getenv("APP_WORKING_DIR")
scriptsdir = os.Getenv("APP_SCRIPTS_DIR")
LAddr = flag.String("l", ":8080", "HTTP service address (e.g.address, ':8080')")
NWorkers = flag.Int("n", 2, "The number of workers to start")
)
type HookContext struct {
Hook string
Action string
args []string
}
func Notify(subject string, text string, outfilename string) {
var notifier, err = notification.NotifierFactory()
if err != nil {
log.Println(err)
return
}
if notifier == nil {
log.Println("Notification provider not found.")
return
}
var zipfile string
if outfilename != "" {
zipfile, err = tools.CompressFile(outfilename)
if err != nil {
log.Println(err)
zipfile = outfilename
}
}
notifier.Notify(subject, text, zipfile)
}
func RunScript(w http.ResponseWriter, context *HookContext) {
scriptname := path.Join(scriptsdir, context.Hook, fmt.Sprintf("%s.sh", context.Action))
log.Println("Exec script: ", scriptname)
cmd := exec.Command(scriptname, context.args...)
var ErrorHandler func(err error, out string)
ErrorHandler = func(err error, out string) {
subject := fmt.Sprintf("Webhook %s/%s FAILED.", context.Hook, context.Action)
Notify(subject, err.Error(), out)
http.Error(w, err.Error(), http.StatusInternalServerError)
}
// open the out file for writing
outfilename := path.Join(workingdir, fmt.Sprintf("%s-%s.txt", context.Hook, context.Action))
outfile, err := os.Create(outfilename)
if err != nil {
ErrorHandler(err, "")
return
}
defer outfile.Close()
cmd.Stdout = outfile
err = cmd.Start()
if err != nil {
ErrorHandler(err, "")
return
}
err = cmd.Wait()
if err != nil {
ErrorHandler(err, outfilename)
return
}
subject := fmt.Sprintf("Webhook %s/%s SUCCEEDED.", context.Hook, context.Action)
Notify(subject, "See attached file for logs.", outfilename)
fmt.Fprintf(w, subject)
}
func Handler(w http.ResponseWriter, r *http.Request) {
params := mux.Vars(r)
context := new(HookContext)
context.Hook = params["hookname"]
context.Action = params["action"]
hookname := params["hookname"]
action := params["action"]
var record, err = hook.RecordFactory(context.Hook)
// Get hook decoder
record, err := hook.RecordFactory(hookname)
if err != nil {
log.Println(err.Error())
http.Error(w, err.Error(), http.StatusNotFound)
return
}
log.Println("Using hook: ", context.Hook)
fmt.Printf("Using hook %s with action %s.\n", hookname, action)
// Decode request
err = record.Decode(r)
if err != nil {
log.Println(err.Error())
@ -111,27 +38,32 @@ func Handler(w http.ResponseWriter, r *http.Request) {
return
}
log.Println("Extracted data: ", record.GetURL(), record.GetName())
context.args = []string{record.GetURL(), record.GetName()}
// Create work
work := new(worker.WorkRequest)
work.Name = hookname
work.Action = action
fmt.Println("Extracted data: ", record.GetURL(), record.GetName())
work.Args = []string{record.GetURL(), record.GetName()}
RunScript(w, context)
//Put work in queue
worker.WorkQueue <- *work
fmt.Printf("Work request queued: %s/%s\n", hookname, action)
fmt.Fprintf(w, "Action %s of hook %s queued.", action, hookname)
}
func main() {
if workingdir == "" {
workingdir = os.TempDir()
}
if scriptsdir == "" {
scriptsdir = "scripts"
}
flag.Parse()
// Start the dispatcher.
fmt.Println("Starting the dispatcher")
worker.StartDispatcher(*NWorkers)
rtr := mux.NewRouter()
rtr.HandleFunc("/{hookname:[a-z]+}/{action:[a-z]+}", Handler).Methods("POST")
http.Handle("/", rtr)
log.Println("webhookd server listening...")
log.Fatal(http.ListenAndServe(*laddr, nil))
fmt.Println("webhookd server listening...")
log.Fatal(http.ListenAndServe(*LAddr, nil))
}

35
src/worker/dispatcher.go Normal file
View File

@ -0,0 +1,35 @@
package worker
import (
"fmt"
)
var WorkerQueue chan chan WorkRequest
var WorkQueue = make(chan WorkRequest, 100)
func StartDispatcher(nworkers int) {
// First, initialize the channel we are going to but the workers' work channels into.
WorkerQueue = make(chan chan WorkRequest, nworkers)
// Now, create all of our workers.
for i := 0; i < nworkers; i++ {
fmt.Println("Starting worker", i+1)
worker := NewWorker(i+1, WorkerQueue)
worker.Start()
}
go func() {
for {
select {
case work := <-WorkQueue:
fmt.Println("Received work requeust")
go func() {
worker := <-WorkerQueue
fmt.Println("Dispatching work request")
worker <- work
}()
}
}
}()
}

View File

@ -0,0 +1,50 @@
package worker
import (
"fmt"
"os"
"os/exec"
"path"
)
var (
workingdir = os.Getenv("APP_WORKING_DIR")
scriptsdir = os.Getenv("APP_SCRIPTS_DIR")
)
func RunScript(work *WorkRequest) (string, error) {
if workingdir == "" {
workingdir = os.TempDir()
}
if scriptsdir == "" {
scriptsdir = "scripts"
}
scriptname := path.Join(scriptsdir, work.Name, fmt.Sprintf("%s.sh", work.Action))
fmt.Println("Exec script: ", scriptname)
// Exec script...
cmd := exec.Command(scriptname, work.Args...)
// Open the out file for writing
outfilename := path.Join(workingdir, fmt.Sprintf("%s-%s.txt", work.Name, work.Action))
outfile, err := os.Create(outfilename)
if err != nil {
return "", err
}
defer outfile.Close()
cmd.Stdout = outfile
err = cmd.Start()
if err != nil {
return "", err
}
err = cmd.Wait()
if err != nil {
return "", err
}
return outfilename, nil
}

View File

@ -0,0 +1,7 @@
package worker
type WorkRequest struct {
Name string
Action string
Args []string
}

89
src/worker/worker.go Normal file
View File

@ -0,0 +1,89 @@
package worker
import (
"fmt"
"github.com/ncarlier/webhookd/notification"
"github.com/ncarlier/webhookd/tools"
)
// NewWorker creates, and returns a new Worker object. Its only argument
// is a channel that the worker can add itself to whenever it is done its
// work.
func NewWorker(id int, workerQueue chan chan WorkRequest) Worker {
// Create, and return the worker.
worker := Worker{
ID: id,
Work: make(chan WorkRequest),
WorkerQueue: workerQueue,
QuitChan: make(chan bool)}
return worker
}
type Worker struct {
ID int
Work chan WorkRequest
WorkerQueue chan chan WorkRequest
QuitChan chan bool
}
// This function "starts" the worker by starting a goroutine, that is
// an infinite "for-select" loop.
func (w Worker) Start() {
go func() {
for {
// Add ourselves into the worker queue.
w.WorkerQueue <- w.Work
select {
case work := <-w.Work:
// Receive a work request.
fmt.Printf("worker%d: Received work request %s/%s\n", w.ID, work.Name, work.Action)
filename, err := RunScript(&work)
if err != nil {
subject := fmt.Sprintf("Webhook %s/%s FAILED.", work.Name, work.Action)
Notify(subject, err.Error(), "")
} else {
subject := fmt.Sprintf("Webhook %s/%s SUCCEEDED.", work.Name, work.Action)
Notify(subject, "See attachment.", filename)
}
case <-w.QuitChan:
// We have been asked to stop.
fmt.Printf("worker%d stopping\n", w.ID)
return
}
}
}()
}
// Stop tells the worker to stop listening for work requests.
//
// Note that the worker will only stop *after* it has finished its work.
func (w Worker) Stop() {
go func() {
w.QuitChan <- true
}()
}
func Notify(subject string, text string, outfilename string) {
var notifier, err = notification.NotifierFactory()
if err != nil {
fmt.Println(err)
return
}
if notifier == nil {
fmt.Println("Notification provideri not found.")
return
}
var zipfile string
if outfilename != "" {
zipfile, err = tools.CompressFile(outfilename)
if err != nil {
fmt.Println(err)
zipfile = outfilename
}
}
notifier.Notify(subject, text, zipfile)
}