123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- package gfmicro
- import (
- "context"
- "crypto/tls"
- "git.mydig.net/microrain/gfmicro/auth_jwt"
- "git.mydig.net/microrain/gfmicro/util"
- "git.mydig.net/microrain/gfmicro/zap"
- "github.com/gogf/gf/frame/g"
- "github.com/gogf/gf/os/glog"
- grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
- grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
- "github.com/grpc-ecosystem/grpc-gateway/runtime"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/keepalive"
- "google.golang.org/grpc/reflection"
- "net"
- "net/http"
- "path"
- "strings"
- "time"
- )
- type Server struct {
- ServerPort string
- CertServerName string
- CertPemPath string
- CertKeyPath string
- SwaggerDir string
- EndPoint string
- EnableTLS bool
- tlsConfig *tls.Config
- RegGrpcServer func(server *grpc.Server)
- RegGatewayServer func(ctx context.Context, gwmux *runtime.ServeMux, EndPoint string, dopts []grpc.DialOption) error
- }
- func (s *Server) init() {
- s.EnableTLS = g.Config().GetBool("server.enableTLS")
- }
- func (s *Server) newGrpc() *grpc.Server {
- jwtManager := auth_jwt.NewJWTManager(auth_jwt.SecretKey, auth_jwt.TokenDuration)
- interceptor := auth_jwt.NewAuthInterceptor(jwtManager, s.accessibleRoles())
- var kaep = keepalive.EnforcementPolicy{
- MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
- PermitWithoutStream: true, // Allow pings even when there are no active streams
- }
- var kasp = keepalive.ServerParameters{
- MaxConnectionIdle: 15 * time.Second, // If a client is idle for 15 seconds, send a GOAWAY
- MaxConnectionAge: 30 * time.Second, // If any connection is alive for more than 30 seconds, send a GOAWAY
- MaxConnectionAgeGrace: 5 * time.Second, // Allow 5 seconds for pending RPCs to complete before forcibly closing connections
- Time: 5 * time.Second, // Ping the client if it is idle for 5 seconds to ensure the connection is still active
- Timeout: 1 * time.Second, // Wait 1 second for the ping ack before assuming the connection is dead
- }
- opts := []grpc.ServerOption{
- grpc.KeepaliveEnforcementPolicy(kaep),
- grpc.KeepaliveParams(kasp),
- //把zap拦截器添加到服务端
- grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
- interceptor.Stream(),
- grpc_zap.StreamServerInterceptor(zap.ZapInterceptor()),
- )),
- grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
- interceptor.Unary(),
- grpc_zap.UnaryServerInterceptor(zap.ZapInterceptor()),
- )),
- }
- //是否开启TLS认证
- if s.EnableTLS {
- //创建grpc的TLS认证凭证
- creds, err := credentials.NewServerTLSFromFile(s.CertPemPath, s.CertKeyPath)
- if err != nil {
- panic(err)
- }
- opts = append(opts, grpc.Creds(creds))
- }
- server := grpc.NewServer(opts...)
- s.RegGrpcServer(server)
- //注册反射
- reflection.Register(server)
- return server
- }
- func (s *Server) newGateway() (http.Handler, error) {
- ctx := context.Background()
- dopts := []grpc.DialOption{grpc.WithInsecure()}
- //是否开启TLS认证
- if s.EnableTLS {
- dcreds, err := credentials.NewClientTLSFromFile(s.CertPemPath, s.CertServerName)
- if err != nil {
- return nil, err
- }
- dopts = []grpc.DialOption{grpc.WithTransportCredentials(dcreds)}
- }
- // 创建一个grpc gateway服务实例
- gwmux := runtime.NewServeMux(
- //设置json结果可以输出默认值
- runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{OrigName: true, EmitDefaults: true}),
- )
- if err := s.RegGatewayServer(ctx, gwmux, s.EndPoint, dopts); err != nil {
- return nil, err
- }
- return gwmux, nil
- }
- //TODO 角色处理
- func (s *Server) accessibleRoles() map[string][]string {
- const laptopServicePath = "/proto.User/"
- return map[string][]string{
- laptopServicePath + "Create": {"admin"},
- laptopServicePath + "UploadImage": {"admin"},
- laptopServicePath + "UpData": {"admin", "user"},
- }
- }
- func (s *Server) Run() (err error) {
- s.EndPoint = ":" + s.ServerPort
- conn, err := net.Listen("tcp", s.EndPoint)
- if err != nil {
- glog.Error("TCP Listen err", err)
- }
- srv := s.newServer()
- //是否开启TLS认证
- if s.EnableTLS {
- s.tlsConfig = util.GetTLSConfig(s.CertPemPath, s.CertKeyPath)
- glog.Info("gRPC and https listen on: ", s.ServerPort)
- if err = srv.Serve(util.NewTLSListener(conn, s.tlsConfig)); err != nil {
- glog.Error("ListenAndServe: ", err)
- }
- } else {
- glog.Info("gRPC and http listen on: ", s.ServerPort)
- if err = srv.Serve(conn); err != nil {
- glog.Error("ListenAndServe: ", err)
- }
- }
- return err
- }
- func (s *Server) newServer() *http.Server {
- grpcServer := s.newGrpc()
- gwmux, err := s.newGateway()
- if err != nil {
- panic(err)
- }
- mux := http.NewServeMux()
- mux.Handle("/", gwmux)
- mux.HandleFunc("/swagger/", s.serveSwaggerFile)
- //s.serveSwaggerUI(mux)
- return &http.Server{
- Addr: s.EndPoint,
- Handler: util.GrpcHandlerFunc(grpcServer, mux),
- TLSConfig: s.tlsConfig,
- }
- }
- func (s *Server) serveSwaggerFile(w http.ResponseWriter, r *http.Request) {
- if !strings.HasSuffix(r.URL.Path, "swagger.json") {
- glog.Info("Not Found: ", r.URL.Path)
- http.NotFound(w, r)
- return
- }
- p := strings.TrimPrefix(r.URL.Path, "/swagger/")
- p = path.Join(s.SwaggerDir, p)
- glog.Info("Serving swagger-file: ", p)
- http.ServeFile(w, r, p)
- }
- //func (s *Server) serveSwaggerUI(mux *http.ServeMux) {
- // fileServer := http.FileServer(&assetfs.AssetFS{
- // Asset: swagger.Asset,
- // AssetDir: swagger.AssetDir,
- // Prefix: "third_party/swagger-ui",
- // })
- // prefix := "/swagger-ui/"
- // mux.Handle(prefix, http.StripPrefix(prefix, fileServer))
- //}
|