server.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. package gfmicro
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "git.mydig.net/microrain/gfmicro/auth_jwt"
  6. "git.mydig.net/microrain/gfmicro/util"
  7. "git.mydig.net/microrain/gfmicro/zap"
  8. "github.com/gogf/gf/frame/g"
  9. "github.com/gogf/gf/os/glog"
  10. grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
  11. grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
  12. "github.com/grpc-ecosystem/grpc-gateway/runtime"
  13. "google.golang.org/grpc"
  14. "google.golang.org/grpc/credentials"
  15. "google.golang.org/grpc/keepalive"
  16. "google.golang.org/grpc/reflection"
  17. "net"
  18. "net/http"
  19. "path"
  20. "strings"
  21. "time"
  22. )
  23. type Server struct {
  24. ServerPort string
  25. CertServerName string
  26. CertPemPath string
  27. CertKeyPath string
  28. SwaggerDir string
  29. EndPoint string
  30. EnableTLS bool
  31. tlsConfig *tls.Config
  32. RegGrpcServer func(server *grpc.Server)
  33. RegGatewayServer func(ctx context.Context, gwmux *runtime.ServeMux, EndPoint string, dopts []grpc.DialOption) error
  34. }
  35. func (s *Server) init() {
  36. s.EnableTLS = g.Config().GetBool("server.enableTLS")
  37. }
  38. func (s *Server) newGrpc() *grpc.Server {
  39. jwtManager := auth_jwt.NewJWTManager(auth_jwt.SecretKey, auth_jwt.TokenDuration)
  40. interceptor := auth_jwt.NewAuthInterceptor(jwtManager, s.accessibleRoles())
  41. var kaep = keepalive.EnforcementPolicy{
  42. MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
  43. PermitWithoutStream: true, // Allow pings even when there are no active streams
  44. }
  45. var kasp = keepalive.ServerParameters{
  46. MaxConnectionIdle: 15 * time.Second, // If a client is idle for 15 seconds, send a GOAWAY
  47. MaxConnectionAge: 30 * time.Second, // If any connection is alive for more than 30 seconds, send a GOAWAY
  48. MaxConnectionAgeGrace: 5 * time.Second, // Allow 5 seconds for pending RPCs to complete before forcibly closing connections
  49. Time: 5 * time.Second, // Ping the client if it is idle for 5 seconds to ensure the connection is still active
  50. Timeout: 1 * time.Second, // Wait 1 second for the ping ack before assuming the connection is dead
  51. }
  52. opts := []grpc.ServerOption{
  53. grpc.KeepaliveEnforcementPolicy(kaep),
  54. grpc.KeepaliveParams(kasp),
  55. //把zap拦截器添加到服务端
  56. grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
  57. interceptor.Stream(),
  58. grpc_zap.StreamServerInterceptor(zap.ZapInterceptor()),
  59. )),
  60. grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
  61. interceptor.Unary(),
  62. grpc_zap.UnaryServerInterceptor(zap.ZapInterceptor()),
  63. )),
  64. }
  65. //是否开启TLS认证
  66. if s.EnableTLS {
  67. //创建grpc的TLS认证凭证
  68. creds, err := credentials.NewServerTLSFromFile(s.CertPemPath, s.CertKeyPath)
  69. if err != nil {
  70. panic(err)
  71. }
  72. opts = append(opts, grpc.Creds(creds))
  73. }
  74. server := grpc.NewServer(opts...)
  75. s.RegGrpcServer(server)
  76. //注册反射
  77. reflection.Register(server)
  78. return server
  79. }
  80. func (s *Server) newGateway() (http.Handler, error) {
  81. ctx := context.Background()
  82. dopts := []grpc.DialOption{grpc.WithInsecure()}
  83. //是否开启TLS认证
  84. if s.EnableTLS {
  85. dcreds, err := credentials.NewClientTLSFromFile(s.CertPemPath, s.CertServerName)
  86. if err != nil {
  87. return nil, err
  88. }
  89. dopts = []grpc.DialOption{grpc.WithTransportCredentials(dcreds)}
  90. }
  91. // 创建一个grpc gateway服务实例
  92. gwmux := runtime.NewServeMux(
  93. //设置json结果可以输出默认值
  94. runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{OrigName: true, EmitDefaults: true}),
  95. )
  96. if err := s.RegGatewayServer(ctx, gwmux, s.EndPoint, dopts); err != nil {
  97. return nil, err
  98. }
  99. return gwmux, nil
  100. }
  101. //TODO 角色处理
  102. func (s *Server) accessibleRoles() map[string][]string {
  103. const laptopServicePath = "/proto.User/"
  104. return map[string][]string{
  105. laptopServicePath + "Create": {"admin"},
  106. laptopServicePath + "UploadImage": {"admin"},
  107. laptopServicePath + "UpData": {"admin", "user"},
  108. }
  109. }
  110. func (s *Server) Run() (err error) {
  111. s.EndPoint = ":" + s.ServerPort
  112. conn, err := net.Listen("tcp", s.EndPoint)
  113. if err != nil {
  114. glog.Error("TCP Listen err", err)
  115. }
  116. srv := s.newServer()
  117. //是否开启TLS认证
  118. if s.EnableTLS {
  119. s.tlsConfig = util.GetTLSConfig(s.CertPemPath, s.CertKeyPath)
  120. glog.Info("gRPC and https listen on: ", s.ServerPort)
  121. if err = srv.Serve(util.NewTLSListener(conn, s.tlsConfig)); err != nil {
  122. glog.Error("ListenAndServe: ", err)
  123. }
  124. } else {
  125. glog.Info("gRPC and http listen on: ", s.ServerPort)
  126. if err = srv.Serve(conn); err != nil {
  127. glog.Error("ListenAndServe: ", err)
  128. }
  129. }
  130. return err
  131. }
  132. func (s *Server) newServer() *http.Server {
  133. grpcServer := s.newGrpc()
  134. gwmux, err := s.newGateway()
  135. if err != nil {
  136. panic(err)
  137. }
  138. mux := http.NewServeMux()
  139. mux.Handle("/", gwmux)
  140. mux.HandleFunc("/swagger/", s.serveSwaggerFile)
  141. //s.serveSwaggerUI(mux)
  142. return &http.Server{
  143. Addr: s.EndPoint,
  144. Handler: util.GrpcHandlerFunc(grpcServer, mux),
  145. TLSConfig: s.tlsConfig,
  146. }
  147. }
  148. func (s *Server) serveSwaggerFile(w http.ResponseWriter, r *http.Request) {
  149. if !strings.HasSuffix(r.URL.Path, "swagger.json") {
  150. glog.Info("Not Found: ", r.URL.Path)
  151. http.NotFound(w, r)
  152. return
  153. }
  154. p := strings.TrimPrefix(r.URL.Path, "/swagger/")
  155. p = path.Join(s.SwaggerDir, p)
  156. glog.Info("Serving swagger-file: ", p)
  157. http.ServeFile(w, r, p)
  158. }
  159. //func (s *Server) serveSwaggerUI(mux *http.ServeMux) {
  160. // fileServer := http.FileServer(&assetfs.AssetFS{
  161. // Asset: swagger.Asset,
  162. // AssetDir: swagger.AssetDir,
  163. // Prefix: "third_party/swagger-ui",
  164. // })
  165. // prefix := "/swagger-ui/"
  166. // mux.Handle(prefix, http.StripPrefix(prefix, fileServer))
  167. //}