RPC框架的认识和使用Netty实现简易RPC

本文参考资源:

谁能用通俗的语言解释一下什么是RPC框架? - 马秉尧的回答

什么是RPC

  • 简单的说,RPC就是从一台机器(客户端)上通过参数传递的方式调用另一台机器(服务器)上的一个函数或方法(可以统称为服务)并得到返回的结果。
  • RPC 会隐藏底层的通讯细节(不需要直接处理Socket通讯或Http通讯)
  • RPC 是一个请求响应模型。客户端发起请求,服务器返回响应(类似于Http的工作方式)
  • RPC 在使用形式上像调用本地函数(或方法)一样去调用远程的函数(或方法)。

RPC可以直接建立在tcp之上,如dubbo;也可以建立在http协议之上,如gRPC,Hessian。

常用的RPC框架有,Dubbo、gRPC、rpcx、thrift

一些RPC框架的使用

RMI

RMI(remote method invocation)是java原生支持的远程调用,RMI采用JRMP(Java RemoteMessageing Protocol)作为通信协议,可以认为是纯java版本的分布式远程调用解决方案。

RMI的核心概念

Client、Server、注册中心

RPC框架的认识和使用Netty实现简易RPC

RMI基本使用

服务端接口和实现类:

public interface UserService extends Remote {
    public String sayHello(String name) throws RemoteException;
}

public class UserServiceImpl extends UnicastRemoteObject implements UserService {
    public UserServiceImpl() throws RemoteException {
    }

    public String sayHello(String name) throws RemoteException {
        return name+"成功调用了服务端的服务";
    }
}

服务端启动:

public class ServerMain {

    public static void main(String[] args) throws Exception {
        //1. 启动RMI注册服务,指定端口号
        LocateRegistry.createRegistry(8888);

        //2. 创建要被访问的远程对象的实例
        UserService userService = new UserServiceImpl();

        //3. 把远程对象实例注册到RMI注册服务器上
        Naming.bind("rmi://localhost:8888/UserService",userService);

        System.out.println("服务端启动中......");
    }
}

客户端复制一个UserService接口,使用以下方法调用:

public class clientMain {
    public static void main(String[] args) throws RemoteException, NotBoundException, MalformedURLException {
        UserService userService = (UserService) Naming.lookup("rmi://localhost:8888/UserService");
        String s = userService.sayHello("客户端");
        System.out.println(s);
    }
}

Hessian基本使用

Hessian使用C/S方式,基于HTTP协议传输,使用Hessian二进制序列化

客户端和服务端添加hessian依赖

<dependencies>
    <dependency>
        <groupId>com.caucho</groupId>
        <artifactId>hessian</artifactId>
        <version>4.0.63</version>
    </dependency>
</dependencies>

服务端添加webapp环境,部署至tomcat

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.tomcat.maven</groupId>
            <artifactId>tomcat7-maven-plugin</artifactId>
            <version>2.2</version>
            <configuration>
                <port>8888</port>
                <path>/</path>
                <uriEncoding>UTF-8</uriEncoding>
            </configuration>
        </plugin>
    </plugins>
</build>

服务端和客户端添加接口UserService

public interface UserService {
    public String sayHello(String name);
}

服务端添加实现

public class UserServiceImpl implements UserService {
    public String sayHello(String name) {
        return name+"调用了hessian服务端的服务";
    }
}

服务端在web.xml中配置servlet:

<servlet>
    <servlet-name>HessianServlet</servlet-name>
    <servlet-class>com.caucho.hessian.server.HessianServlet</servlet-class>
    <init-param>
        <param-name>service-class</param-name>
        <param-value>com.rhett.service.impl.UserServiceImpl</param-value>
    </init-param>
</servlet>
<servlet-mapping>
    <servlet-name>HessianServlet</servlet-name>
    <url-pattern>/api/service</url-pattern>
</servlet-mapping>

启动服务端,客户端使用以下方法调用服务端服务:

public class ClientTest {
    public static void main(String[] args) throws MalformedURLException {
        String url = "http://localhost:8888/api/service";
        HessianProxyFactory hessianProxyFactory = new HessianProxyFactory();
        UserService userService = (UserService) hessianProxyFactory.create(UserService.class, url);
        String h = userService.sayHello("hessian客户端");
        System.out.println(h);
    }
}

使用Netty实现简易RPC

RPC框架的认识和使用Netty实现简易RPC

就是要实现上图这些步骤

  1. 服务消费方(client)以本地调用方式调用服务
  2. client stub接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  3. sockets 将消息进行编码并发送到服务端
  4. serverstub收到消息后进行解码
  5. serverstub根据解码结果调用本地的服务
  6. 本地服务执行并将结果返回给server stub
  7. server stub将返回导入结果封装成消息体
  8. sockets 进行编码并发送至消费方
  9. client stub接收到消息并进行解码
  10. 服务消费方(client)得到结果

服务端和客户端的公用接口:

public interface UserService {
    String sayHello(String name);
}

服务端服务实现:

public class UserServiceImpl implements UserService {
    public String sayHello(String name) {
        return name+"调用自定义rpc框架服务成功了";
    }
}

服务端:

public class NettyServer {
    //编写一个方法,完成对NettyServer的初始化和启动
    public static void startServer(String host,int port){
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());//编解码器
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(host, port).sync();
            System.out.println("服务提供方开始提供服务...");
            channelFuture.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

handler:

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //获取客户端发送的消息,并调用服务
        System.out.println("msg=" + msg);
        //客户端在调用服务器的api时,我们需要定义一个协议
        //比如要求客户端每次发消息时都必须以某个字符串开头"UserService#sayHello#xxx"
        if(msg.toString().startsWith("UserService#sayHello#")){
            String result = new UserServiceImpl().sayHello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
            ctx.writeAndFlush(result);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

服务端启动:

//启动一个服务的提供者,就是NettyServer
public class RPCServer {
    public static void main(String[] args) {
        NettyServer.startServer("localhost",7000);
    }
}

客户端:

public class  NettyClient {
    private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    private static NettyClientHandler clientHandler;

    public Object getBean(final Class<?> serviceClass,final String providerName){
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),new Class<?>[]{serviceClass},
                (proxy,method,args)->{
                    if(clientHandler == null){
                        initClient();
                    }
                    //设置要发给服务端的信息
                    clientHandler.setMsg(providerName+args[0]);
                    return executor.submit(clientHandler).get();
                });
    }


    //初始化客户端
    private static void initClient(){
        clientHandler = new NettyClientHandler();
        //创建EventLoopGroup
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY,true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(clientHandler);
                    }
                });
        try {
            ChannelFuture future = bootstrap.connect("localhost", 7000).sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

handler:

public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {

    private ChannelHandlerContext context;
    private String result;//返回的结果
    private String msg;//客户端调用方法传入的信息

    void setMsg(String msg){
        this.msg = msg;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        context = ctx;
    }

    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        result = msg.toString();
        notify();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    //被代理对象调用,发送数据给服务器,等待被唤醒返回结果
    public synchronized Object call() throws Exception {
        context.writeAndFlush(msg);
        wait();
        return result;
    }
}

客户端调用:

public class RPCClient {
    public static final String providerName = "UserService#sayHello#";

    public static void main(String[] args) {
        //创建一个消费者
        NettyClient nettyClient = new NettyClient();
        //创建代理对象
        UserService userService = (UserService) nettyClient.getBean(UserService.class, providerName);
        String msg = userService.sayHello("客户端");
        System.out.println(msg);
    }
}

查看输出:

客户端调用自定义rpc框架服务成功了

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/rpc%e6%a1%86%e6%9e%b6%e7%9a%84%e8%ae%a4%e8%af%86%e5%92%8c%e4%bd%bf%e7%94%a8netty%e5%ae%9e%e7%8e%b0%e7%ae%80%e6%98%93rpc/

发表评论

电子邮件地址不会被公开。