package gfmicro import ( "context" "crypto/tls" "git.mydig.net/microrain/gfmicro/auth_jwt" "git.mydig.net/microrain/gfmicro/util" "git.mydig.net/microrain/gfmicro/zap" "github.com/gogf/gf/frame/g" "github.com/gogf/gf/os/glog" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" "github.com/grpc-ecosystem/grpc-gateway/runtime" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/reflection" "net" "net/http" "path" "strings" "time" ) type Server struct { ServerPort string CertServerName string CertPemPath string CertKeyPath string SwaggerDir string EndPoint string EnableTLS bool tlsConfig *tls.Config RegGrpcServer func(server *grpc.Server) RegGatewayServer func(ctx context.Context, gwmux *runtime.ServeMux, EndPoint string, dopts []grpc.DialOption) error } func (s *Server) init() { s.EnableTLS = g.Config().GetBool("server.enableTLS") } func (s *Server) newGrpc() *grpc.Server { jwtManager := auth_jwt.NewJWTManager(auth_jwt.SecretKey, auth_jwt.TokenDuration) interceptor := auth_jwt.NewAuthInterceptor(jwtManager, s.accessibleRoles()) var kaep = keepalive.EnforcementPolicy{ MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection PermitWithoutStream: true, // Allow pings even when there are no active streams } var kasp = keepalive.ServerParameters{ MaxConnectionIdle: 15 * time.Second, // If a client is idle for 15 seconds, send a GOAWAY MaxConnectionAge: 30 * time.Second, // If any connection is alive for more than 30 seconds, send a GOAWAY MaxConnectionAgeGrace: 5 * time.Second, // Allow 5 seconds for pending RPCs to complete before forcibly closing connections Time: 5 * time.Second, // Ping the client if it is idle for 5 seconds to ensure the connection is still active Timeout: 1 * time.Second, // Wait 1 second for the ping ack before assuming the connection is dead } opts := []grpc.ServerOption{ grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp), //把zap拦截器添加到服务端 grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( interceptor.Stream(), grpc_zap.StreamServerInterceptor(zap.ZapInterceptor()), )), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( interceptor.Unary(), grpc_zap.UnaryServerInterceptor(zap.ZapInterceptor()), )), } //是否开启TLS认证 if s.EnableTLS { //创建grpc的TLS认证凭证 creds, err := credentials.NewServerTLSFromFile(s.CertPemPath, s.CertKeyPath) if err != nil { panic(err) } opts = append(opts, grpc.Creds(creds)) } server := grpc.NewServer(opts...) s.RegGrpcServer(server) //注册反射 reflection.Register(server) return server } func (s *Server) newGateway() (http.Handler, error) { ctx := context.Background() dopts := []grpc.DialOption{grpc.WithInsecure()} //是否开启TLS认证 if s.EnableTLS { dcreds, err := credentials.NewClientTLSFromFile(s.CertPemPath, s.CertServerName) if err != nil { return nil, err } dopts = []grpc.DialOption{grpc.WithTransportCredentials(dcreds)} } // 创建一个grpc gateway服务实例 gwmux := runtime.NewServeMux( //设置json结果可以输出默认值 runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{OrigName: true, EmitDefaults: true}), ) if err := s.RegGatewayServer(ctx, gwmux, s.EndPoint, dopts); err != nil { return nil, err } return gwmux, nil } //TODO 角色处理 func (s *Server) accessibleRoles() map[string][]string { const laptopServicePath = "/proto.User/" return map[string][]string{ laptopServicePath + "Create": {"admin"}, laptopServicePath + "UploadImage": {"admin"}, laptopServicePath + "UpData": {"admin", "user"}, } } func (s *Server) Run() (err error) { s.EndPoint = ":" + s.ServerPort conn, err := net.Listen("tcp", s.EndPoint) if err != nil { glog.Error("TCP Listen err", err) } srv := s.newServer() //是否开启TLS认证 if s.EnableTLS { s.tlsConfig = util.GetTLSConfig(s.CertPemPath, s.CertKeyPath) glog.Info("gRPC and https listen on: ", s.ServerPort) if err = srv.Serve(util.NewTLSListener(conn, s.tlsConfig)); err != nil { glog.Error("ListenAndServe: ", err) } } else { glog.Info("gRPC and http listen on: ", s.ServerPort) if err = srv.Serve(conn); err != nil { glog.Error("ListenAndServe: ", err) } } return err } func (s *Server) newServer() *http.Server { grpcServer := s.newGrpc() gwmux, err := s.newGateway() if err != nil { panic(err) } mux := http.NewServeMux() mux.Handle("/", gwmux) mux.HandleFunc("/swagger/", s.serveSwaggerFile) //s.serveSwaggerUI(mux) return &http.Server{ Addr: s.EndPoint, Handler: util.GrpcHandlerFunc(grpcServer, mux), TLSConfig: s.tlsConfig, } } func (s *Server) serveSwaggerFile(w http.ResponseWriter, r *http.Request) { if !strings.HasSuffix(r.URL.Path, "swagger.json") { glog.Info("Not Found: ", r.URL.Path) http.NotFound(w, r) return } p := strings.TrimPrefix(r.URL.Path, "/swagger/") p = path.Join(s.SwaggerDir, p) glog.Info("Serving swagger-file: ", p) http.ServeFile(w, r, p) } //func (s *Server) serveSwaggerUI(mux *http.ServeMux) { // fileServer := http.FileServer(&assetfs.AssetFS{ // Asset: swagger.Asset, // AssetDir: swagger.AssetDir, // Prefix: "third_party/swagger-ui", // }) // prefix := "/swagger-ui/" // mux.Handle(prefix, http.StripPrefix(prefix, fileServer)) //}