| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- package main
- import (
- "flag"
- "log"
- "math/rand"
- "net/http"
- "strconv"
- "sync"
- "time"
- "github.com/prometheus/client_golang/prometheus/promhttp"
- "github.com/prometheus/client_golang/prometheus"
- )
- var (
- types = []string{"emai", "deactivation", "activation", "transaction", "customer_renew", "order_processed"}
- workers = 0
- totalCounterVec = prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Namespace: "worker",
- Subsystem: "jobs",
- Name: "processed_total",
- Help: "Total number of jobs processed by the workers",
- },
- []string{"worker_id", "type"},
- )
- inflightCounterVec = prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Namespace: "worker",
- Subsystem: "jobs",
- Name: "inflight",
- Help: "Number of jobs inflight",
- },
- []string{"type"},
- )
- processingTimeVec = prometheus.NewHistogramVec(
- prometheus.HistogramOpts{
- Namespace: "worker",
- Subsystem: "jobs",
- Name: "process_time_seconds",
- Help: "Amount of time spent processing jobs",
- },
- []string{"worker_id", "type"},
- )
- )
- func init() {
- flag.IntVar(&workers, "workers", 10, "Number of workers to use")
- }
- func getType() string {
- return types[rand.Int()%len(types)]
- }
- // main entry point for the application
- func main() {
- // parse the flags
- flag.Parse()
- //////////
- // Demo of Worker Processing
- //////////
- // register with the prometheus collector
- prometheus.MustRegister(
- totalCounterVec,
- inflightCounterVec,
- processingTimeVec,
- )
- // create a channel with a 10,000 Job buffer
- jobsChannel := make(chan *Job, 10000)
- // start the job processor
- go startJobProcessor(jobsChannel)
- go createJobs(jobsChannel)
- handler := http.NewServeMux()
- handler.Handle("/metrics", promhttp.Handler())
- log.Println("[INFO] starting HTTP server on port :9009")
- log.Fatal(http.ListenAndServe(":9009", handler))
- }
- type Job struct {
- Type string
- Sleep time.Duration
- }
- // makeJob creates a new job with a random sleep time between 10 ms and 4000ms
- func makeJob() *Job {
- return &Job{
- Type: getType(),
- Sleep: time.Duration(rand.Int()%100+10) * time.Millisecond,
- }
- }
- func startJobProcessor(jobs <-chan *Job) {
- log.Printf("[INFO] starting %d workers\n", workers)
- wait := sync.WaitGroup{}
- // notify the sync group we need to wait for 10 goroutines
- wait.Add(workers)
- // start 10 works
- for i := 0; i < workers; i++ {
- go func(workerID int) {
- // start the worker
- startWorker(workerID, jobs)
- wait.Done()
- }(i)
- }
- wait.Wait()
- }
- func createJobs(jobs chan<- *Job) {
- for {
- // create a random job
- job := makeJob()
- // track the job in the inflight tracker
- inflightCounterVec.WithLabelValues(job.Type).Inc()
- // send the job down the channel
- jobs <- job
- // don't pile up too quickly
- time.Sleep(5 * time.Millisecond)
- }
- }
- // creates a worker that pulls jobs from the job channel
- func startWorker(workerID int, jobs <-chan *Job) {
- for {
- select {
- // read from the job channel
- case job := <-jobs:
- startTime := time.Now()
- // fake processing the request
- time.Sleep(job.Sleep)
- log.Printf("[%d][%s] Processed job in %0.3f seconds", workerID, job.Type, time.Now().Sub(startTime).Seconds())
- // track the total number of jobs processed by the worker
- totalCounterVec.WithLabelValues(strconv.FormatInt(int64(workerID), 10), job.Type).Inc()
- // decrement the inflight tracker
- inflightCounterVec.WithLabelValues(job.Type).Dec()
- processingTimeVec.WithLabelValues(strconv.FormatInt(int64(workerID), 10), job.Type).Observe(time.Now().Sub(startTime).Seconds())
- }
- }
- }
|