grpcserver.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package main
  2. import(
  3. "context"
  4. "fmt"
  5. log "github.com/sirupsen/logrus"
  6. "google.golang.org/grpc"
  7. "io"
  8. "net"
  9. "strconv"
  10. "sync"
  11. pb "test/grpctest/proto"
  12. "time"
  13. )
  14. type Server struct {
  15. }
  16. //一元
  17. func (s *Server) GetData (context context.Context, reqData *pb.ReqData) (*pb.RepData, error){
  18. fmt.Println("request:",reqData.Data)
  19. rep := &pb.RepData{
  20. Data: "hello,"+reqData.Data,
  21. }
  22. return rep,nil
  23. }
  24. //服务端发送流调用
  25. func (s *Server) GetStream (req *pb.ReqData, rep pb.ObjectStorageService_GetStreamServer ) error{
  26. fmt.Println("request",req.Data)
  27. if req.Data == "shuai" {
  28. cnt := 0
  29. resp := &pb.RepData{
  30. Data: "hello,"+req.Data,
  31. }
  32. for{
  33. err := rep.Send(resp)
  34. if err != nil{
  35. fmt.Println("send to client failed:",err.Error())
  36. break
  37. }
  38. time.Sleep(time.Second)
  39. if cnt > 10 {
  40. break
  41. }
  42. cnt++
  43. }
  44. }else{
  45. fmt.Println("error request")
  46. }
  47. return nil
  48. }
  49. //
  50. func (s *Server) SetStream(req pb.ObjectStorageService_SetStreamServer) error {
  51. var err error
  52. for {
  53. req,err := req.Recv()
  54. if err != nil{
  55. if err == io.EOF {
  56. fmt.Println("server received over")
  57. break
  58. }else{
  59. fmt.Println("error:",err.Error())
  60. break
  61. }
  62. }
  63. fmt.Println("server received:",req.Data)
  64. }
  65. rep := &pb.ReqData{
  66. Data: "over",
  67. }
  68. req.SendMsg(rep)
  69. return err
  70. }
  71. //双向流式调用
  72. func (s *Server) AllStream (req pb.ObjectStorageService_AllStreamServer) error {
  73. wg := sync.WaitGroup{}
  74. wg.Add(2)
  75. go func() {
  76. defer wg.Done()
  77. for {
  78. recvMsg,err := req.Recv()
  79. if err != nil{
  80. if err == io.EOF {
  81. fmt.Println("server received over")
  82. break
  83. }else{
  84. fmt.Println("error:",err.Error())
  85. break
  86. }
  87. }
  88. fmt.Println("server received",recvMsg.Data)
  89. }
  90. }()
  91. go func() {
  92. defer wg.Done()
  93. cnt := 0
  94. for {
  95. err := req.Send(&pb.RepData{
  96. Data: "over" + strconv.Itoa(cnt),
  97. })
  98. if err != nil{
  99. fmt.Println("error:",err.Error())
  100. }
  101. if cnt > 3 {
  102. break
  103. }
  104. cnt++
  105. time.Sleep(time.Second)
  106. }
  107. }()
  108. wg.Wait()
  109. return nil
  110. }
  111. func main(){
  112. listen,err := net.Listen("tcp","127.0.0.1:10000")
  113. if err != nil {
  114. log.Fatal("server build error:",err.Error())
  115. }
  116. g := grpc.NewServer()
  117. pb.RegisterObjectStorageServiceServer(g,&Server{})
  118. err = g.Serve(listen)
  119. if err != nil {
  120. log.Fatal("failed to server:",err.Error())
  121. }
  122. }