go 多线程并发 queue demo
阅读原文时间:2023年07月11日阅读:1

原文链接:Writing worker queues, in Go

1.work.go

[root@wangjq queue]# cat work.go
package main

import "time"

// WorkRequest describes a single unit of work handed from the HTTP collector
// to a worker: a name to greet and how long to wait before greeting.
type WorkRequest struct {
// Name is the name the worker prints in its "Hello" message.
Name string
// Delay is how long the worker sleeps before printing the greeting.
Delay time.Duration
}

2.collector.go

[root@wangjq queue]# cat collector.go
package main

import (
"fmt"
"net/http"
"time"
)

// WorkQueue is a buffered channel that we can send work requests on.
//
// The buffer lets the HTTP handler enqueue up to 100 requests without
// waiting for the dispatcher to pick them up; once the buffer is full,
// senders block until the dispatcher drains an entry.
var WorkQueue = make(chan WorkRequest, 100)

func Collector(w http.ResponseWriter, r *http.Request) {
// Make sure we can only be called with an HTTP POST request.
if r.Method != "POST" {
w.Header().Set("Allow", "POST")
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

    // Parse the delay.  
    delay, err := time.ParseDuration(r.FormValue("delay"))  
    if err != nil {  
            http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest)  
            return  
    }

    // Check to make sure the delay is anywhere from 1 to 10 seconds.  
    if delay.Seconds() <  || delay.Seconds() >  {  
            http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest)  
            return  
    }

    // Now, we retrieve the person's name from the request.  
    name := r.FormValue("name")

    // Just do a quick bit of sanity checking to make sure the client actually provided us with a name.  
    if name == "" {  
            http.Error(w, "You must specify a name.", http.StatusBadRequest)  
            return  
    }

    // Now, we take the delay, and the person's name, and make a WorkRequest out of them.  
    work := WorkRequest{Name: name, Delay: delay}

    // Push the work onto the queue.  
    WorkQueue <- work  
    fmt.Println("Work request queued")

    // And let the user know their work request was created.  
    w.WriteHeader(http.StatusCreated)  
    return  

}

3.worker.go

[root@wangjq queue]# cat worker.go
package main

import (
"fmt"
"time"
)

// NewWorker builds a Worker identified by id.
//
// workerQueue is the shared channel on which the worker advertises its own
// Work channel each time it is ready for another request. The worker gets a
// fresh unbuffered Work channel and a fresh unbuffered QuitChan. The worker
// is returned by value and is not started; call Start to run it.
func NewWorker(id int, workerQueue chan chan WorkRequest) Worker {
	var w Worker
	w.ID = id
	w.Work = make(chan WorkRequest)
	w.WorkerQueue = workerQueue
	w.QuitChan = make(chan bool)
	return w
}

// Worker holds the channels one worker goroutine uses to receive work,
// advertise its availability, and be told to stop.
type Worker struct {
// ID identifies the worker in its log output.
ID int
// Work is the channel on which this worker receives its WorkRequests.
Work chan WorkRequest
// WorkerQueue is the shared channel into which the worker places its Work
// channel whenever it is ready for a request.
WorkerQueue chan chan WorkRequest
// QuitChan receives a value when the worker is asked to stop.
QuitChan chan bool
}

// Start "starts" the worker by launching a goroutine that runs an infinite
// for-select loop.
//
// On every iteration the worker first places its own Work channel into
// WorkerQueue to signal that it is available, then waits for either a work
// request or a quit signal. For a work request it sleeps for the requested
// delay and prints a greeting; on a quit signal the goroutine returns.
func (w *Worker) Start() {
	go func() {
		for {
			// Add ourselves into the worker queue.
			w.WorkerQueue <- w.Work

			select {
			case work := <-w.Work:
				// Receive a work request.
				fmt.Printf("worker%d: Received work request, delaying for %f seconds\n", w.ID, work.Delay.Seconds())

				time.Sleep(work.Delay)
				fmt.Printf("worker%d: Hello, %s!\n", w.ID, work.Name)

			case <-w.QuitChan:
				// We have been asked to stop.
				fmt.Printf("worker%d stopping\n", w.ID)
				return
			}
		}
	}()
}

// Stop asks the worker to stop listening for work requests.
//
// The quit signal is sent from a separate goroutine, so Stop itself returns
// immediately. The worker only sees the signal the next time it reaches its
// select, i.e. *after* it has finished any work already in progress.
func (w *Worker) Stop() {
	signalQuit := func() {
		w.QuitChan <- true
	}
	go signalQuit()
}

4.dispatcher.go

[root@wangjq queue]# cat dispatcher.go
package main

import "fmt"

// WorkerQueue carries the Work channels of workers that are ready for a
// request. It is nil until StartDispatcher initializes it.
var WorkerQueue chan chan WorkRequest

func StartDispatcher(nworkers int) {
// First, initialize the channel we are going to but the workers' work channels into.
WorkerQueue = make(chan chan WorkRequest, nworkers)

    // Now, create all of our workers.  
    for i := ; i < nworkers; i++ {  
            fmt.Println("Starting worker", i+)  
            worker := NewWorker(i+, WorkerQueue)  
            worker.Start()  
    }

    go func() {  
            for {  
                    select {  
                    case work := <-WorkQueue:  
                            fmt.Println("Received work requeust")  
                            go func() {  
                                    worker := <-WorkerQueue

                                    fmt.Println("Dispatching work request")  
                                    worker <- work  
                            }()  
                    }  
            }  
    }()  

}

5.main.go

[root@wangjq queue]# cat main.go
package main

import (
"flag"
"fmt"
"net/http"
)

// Command-line flags.
var (
	// NWorkers is the number of workers the dispatcher starts (-n).
	NWorkers = flag.Int("n", 4, "The number of workers to start")
	// HTTPAddr is the address the HTTP server listens on (-http).
	HTTPAddr = flag.String("http", "127.0.0.1:8000", "Address to listen for HTTP requests on")
)

func main() {
// Parse the command-line flags.
flag.Parse()

    // Start the dispatcher.  
    fmt.Println("Starting the dispatcher")  
    StartDispatcher(\*NWorkers)

    // Register our collector as an HTTP handler function.  
    fmt.Println("Registering the collector")  
    http.HandleFunc("/work", Collector)

    // Start the HTTP server!  
    fmt.Println("HTTP server listening on", \*HTTPAddr)  
    if err := http.ListenAndServe(\*HTTPAddr, nil); err != nil {  
            fmt.Println(err.Error())  
    }  

}

6.编译

[root@wangjq queue]# go build -o queued *.go

7.运行

[root@wangjq queue]# ./queued -n 5
Starting the dispatcher
Starting worker 1
Starting worker 2
Starting worker 3
Starting worker 4
Starting worker 5
Registering the collector
HTTP server listening on 127.0.0.1:8000

8.测试

[root@wangjq ~]# for i in {1..3}; do curl localhost:8000/work -d name=$USER -d delay=$(expr $i % 11)s; done

9.效果

[root@wangjq queue]# ./queued -n 5
Starting the dispatcher
Starting worker 1
Starting worker 2
Starting worker 3
Starting worker 4
Starting worker 5
Registering the collector
HTTP server listening on 127.0.0.1:8000
Work request queued
Received work requeust
Dispatching work request
worker1: Received work request, delaying for 1.000000 seconds
Work request queued
Received work requeust
Dispatching work request
worker2: Received work request, delaying for 2.000000 seconds
Work request queued
Received work requeust
Dispatching work request
worker4: Received work request, delaying for 3.000000 seconds
worker1: Hello, root!
worker2: Hello, root!
worker4: Hello, root!