example.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package main
  2. import (
  3. "flag"
  4. "log"
  5. "math/rand"
  6. "net/http"
  7. "strconv"
  8. "sync"
  9. "time"
  10. "github.com/prometheus/client_golang/prometheus/promhttp"
  11. "github.com/prometheus/client_golang/prometheus"
  12. )
  13. var (
  14. types = []string{"emai", "deactivation", "activation", "transaction", "customer_renew", "order_processed"}
  15. workers = 0
  16. totalCounterVec = prometheus.NewCounterVec(
  17. prometheus.CounterOpts{
  18. Namespace: "worker",
  19. Subsystem: "jobs",
  20. Name: "processed_total",
  21. Help: "Total number of jobs processed by the workers",
  22. },
  23. []string{"worker_id", "type"},
  24. )
  25. inflightCounterVec = prometheus.NewGaugeVec(
  26. prometheus.GaugeOpts{
  27. Namespace: "worker",
  28. Subsystem: "jobs",
  29. Name: "inflight",
  30. Help: "Number of jobs inflight",
  31. },
  32. []string{"type"},
  33. )
  34. processingTimeVec = prometheus.NewHistogramVec(
  35. prometheus.HistogramOpts{
  36. Namespace: "worker",
  37. Subsystem: "jobs",
  38. Name: "process_time_seconds",
  39. Help: "Amount of time spent processing jobs",
  40. },
  41. []string{"worker_id", "type"},
  42. )
  43. )
  44. func init() {
  45. flag.IntVar(&workers, "workers", 10, "Number of workers to use")
  46. }
  47. func getType() string {
  48. return types[rand.Int()%len(types)]
  49. }
  50. // main entry point for the application
  51. func main() {
  52. // parse the flags
  53. flag.Parse()
  54. //////////
  55. // Demo of Worker Processing
  56. //////////
  57. // register with the prometheus collector
  58. prometheus.MustRegister(
  59. totalCounterVec,
  60. inflightCounterVec,
  61. processingTimeVec,
  62. )
  63. // create a channel with a 10,000 Job buffer
  64. jobsChannel := make(chan *Job, 10000)
  65. // start the job processor
  66. go startJobProcessor(jobsChannel)
  67. go createJobs(jobsChannel)
  68. handler := http.NewServeMux()
  69. handler.Handle("/metrics", promhttp.Handler())
  70. log.Println("[INFO] starting HTTP server on port :9009")
  71. log.Fatal(http.ListenAndServe(":9009", handler))
  72. }
  73. type Job struct {
  74. Type string
  75. Sleep time.Duration
  76. }
  77. // makeJob creates a new job with a random sleep time between 10 ms and 4000ms
  78. func makeJob() *Job {
  79. return &Job{
  80. Type: getType(),
  81. Sleep: time.Duration(rand.Int()%100+10) * time.Millisecond,
  82. }
  83. }
  84. func startJobProcessor(jobs <-chan *Job) {
  85. log.Printf("[INFO] starting %d workers\n", workers)
  86. wait := sync.WaitGroup{}
  87. // notify the sync group we need to wait for 10 goroutines
  88. wait.Add(workers)
  89. // start 10 works
  90. for i := 0; i < workers; i++ {
  91. go func(workerID int) {
  92. // start the worker
  93. startWorker(workerID, jobs)
  94. wait.Done()
  95. }(i)
  96. }
  97. wait.Wait()
  98. }
  99. func createJobs(jobs chan<- *Job) {
  100. for {
  101. // create a random job
  102. job := makeJob()
  103. // track the job in the inflight tracker
  104. inflightCounterVec.WithLabelValues(job.Type).Inc()
  105. // send the job down the channel
  106. jobs <- job
  107. // don't pile up too quickly
  108. time.Sleep(5 * time.Millisecond)
  109. }
  110. }
  111. // creates a worker that pulls jobs from the job channel
  112. func startWorker(workerID int, jobs <-chan *Job) {
  113. for {
  114. select {
  115. // read from the job channel
  116. case job := <-jobs:
  117. startTime := time.Now()
  118. // fake processing the request
  119. time.Sleep(job.Sleep)
  120. log.Printf("[%d][%s] Processed job in %0.3f seconds", workerID, job.Type, time.Now().Sub(startTime).Seconds())
  121. // track the total number of jobs processed by the worker
  122. totalCounterVec.WithLabelValues(strconv.FormatInt(int64(workerID), 10), job.Type).Inc()
  123. // decrement the inflight tracker
  124. inflightCounterVec.WithLabelValues(job.Type).Dec()
  125. processingTimeVec.WithLabelValues(strconv.FormatInt(int64(workerID), 10), job.Type).Observe(time.Now().Sub(startTime).Seconds())
  126. }
  127. }
  128. }