RPC框架从0到10

RPC(Remote Procedure Call)

从单机走向分布式,产生了很多分布式的通信方式

  • 最古老也是最有效,并且永不过时的,TCP/UDP的二进制传输,事实上所有的通信方式归根结底都是TCP/UDP
  • CORBA Common Object Request Broker Architecture。古老而复杂的,支持面向对象的通信协议。
  • Web Service(SOA SOAP RDDI WSDL ...)基于http+xml的标准化Web API
  • RestFul 回归简单化本源的Web API的事实标准,http+json
  • RMI Remote Method Invocation Java内部的分布式通信协议
  • JMS Java Message Service JavaEE中的消息框架标准,为很多MQ所支持
  • RPC Remote Procudure Call 远程过程方法调用,这只是一个统称概念,远程通信的方式,重点在于方法调用(不支持对象的概念),具体实现甚至可以用RMI RestFul等去实现,但一般不用,因为RMI不能跨语言,而RestFul效率太低。多用于服务器集群间的通信,因此常使用更加高效,短小精悍的传输模式以提高效率。
    从单机到分布式->分布式通信(一台机器解决不了的问题,需要多台机器解决,多台机器内部需要进行通信)->最基本:二进制,两台机器之间通过网络通信一定是二进制数据传输TCP/IP(socket)
    RPC框架从0到10

RPC01

首先构造一个用于传输的类User,该类实现了序列化可以进行序列化从而通过网络传输二进制数据,他会跑在一台机器上去访问数据库,有人想拿到这个User的话我会对外提供一些服务
RPC框架从0到10
然后通过AUserService暴露FindById接口来进行数据查询,你给我一个ID,我给你一个User
User.java

package com.airsky.demo.rpc.Common;

import java.io.Serializable;

public class User implements Serializable {
    private static final long serialVersionUID = 1L;
    int id;
    String name;

    public User(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", name='" + name + '\'' +
                '}';
    }
}

AUserService.java

package com.airsky.demo.rpc.Common;

public interface AUserService {
    User findById(int id);
}

最终在AUserServiceImpl进行了实现,对传入的id进行了模拟数据库查询,返回一个User对象。这里只进行一个模拟,具体实现的话也很简单。

package com.airsky.demo.rpc.Common;

public class AUserServiceImpl implements AUserService{

    @Override
    public User findById(int id) {
        //模拟数据库查询
        return new User(id,"AirSky");
    }
}

如果我们想通过最原始的方式来完成上述操作该怎么做呢?这个也是很多游戏和软件类似功能最底层的实现。
首先我们这台机器对外提供服务,就需要提供一个Server打开一个Socket端口进行监听,下面的实现是在Server类中新建了一个ServerSocket对象,并监听8888端口,accept接收一个客户端连接,然后通过send方法对连接进行处理。

public class Server {
    //加上条件变量,不然s.close()不可达
    private static boolean running = true;
    public static void main(String[] args) throws Exception {
        ServerSocket s=new ServerSocket(8888);
        while (running){
            System.out.println("接收连接中...");
            Socket client=s.accept();
            System.out.println("连接主机:" + client.getInetAddress());
            send(client);
            client.close();
        }
        s.close();
    }

下面来实现send方法对连接进行进一步处理。大概就是你给我一个id,我给你一个id和id对应的名字,就是这么简单。send方法传入了一个socket对象,我们可以通过socket对象的getInputStream方法来打开一个输入流,getOutputStream方法打开一个输出流。使用DataOutputStream的readInt对输入流中的数据进行指定方式读取,使用该包装流来简化读写操作,如果客户端以正确的方式传送了ID的即可读取到,然后调用AUserService的findById方法来查询id对应的User对象,最后通过DataOutputStream的writeInt和writeUTF以指定的方式分别向输出流中写入获取到的id和name。

    private static void send(Socket client) throws Exception {
        //获取Socket的输入流,用Data流包装读取二进制数据
        DataInputStream dis = new DataInputStream(client.getInputStream());
        //获取Socket的输出流,用Data流包装写出二进制数据
        DataOutputStream dos = new DataOutputStream(client.getOutputStream());

        //读取Id
        int id = dis.readInt();
        System.out.println("接收到ID为:"+id);
        AUserService service= new AUserServiceImpl();
        User user = service.findById(id);
        //写出数据
        System.out.println("返回的对象为:"+user);
        dos.writeInt(user.getId());
        dos.writeUTF(user.getName());
        dos.flush();
        dis.close();
        dos.close();
    }

服务端实现了我们再来实现以下客户端。服务端监听了8888端口我们是不是就需要新建一个Socket来连接8888端口,同样使用getOutputStream来获取一个输出流,再用DataOutputStream向流中写指定格式数据(二进制),最后来使用getInputStream获取输入流得到数据n。得到数据之后new一个新对象拿来用。

public class Client {
    public static void main(String[] args) throws Exception {
        Socket s = new Socket("127.0.0.1",8888);
        //使用DataOutputStream包装以二进制写入
        DataOutputStream dos = new DataOutputStream(s.getOutputStream());
        dos.writeInt(8888);
        DataInputStream dis = new DataInputStream(s.getInputStream());
        int id = dis.readInt();
        String name = dis.readUTF();
        User user = new User(id,name);
        System.out.println(user);
    }
}

现在我们启动服务端,然后启动客户端后,服务端和客户端就会收到以下数据
RPC框架从0到10
RPC框架从0到10
以上这种是最简单最原始的方式,这种方式非常的不灵活,这种方式只能传输单一的对象,对于传输的对象必须了解才能传的过去拿的过来,如果在代码量大的情况下需要对某个或者多个对象加属性,那么我们的就需要大量整 改,尤其是传输过程和业务逻辑代码全部混合在一起的时候,所以我们就需要一种写起来更加爽的方式。RPC02应运而生。

RPC02

假如我现在只是个业务开发,对网络这块儿不太熟,现在我想告诉自己,能不能给个简单的方法,让我直接访问接口编写逻辑代码就行了。所以这里我们就需要对网络这块儿进行一个简单的封装。这就是RPC的演进过程。
最简单的方法是将网络操作封装为代理类Stub,Stub屏蔽了关于网络的细节

package com.airsky.demo.rpc.rpc02;

import com.airsky.demo.rpc.Common.User;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.Socket;

public class Stub {
    public User findUserById(Integer id) throws Exception {
        Socket s = new Socket("127.0.0.1",8888);
        //使用DataOutputStream包装以二进制写入
        DataOutputStream dos = new DataOutputStream(s.getOutputStream());
        dos.writeInt(id);
        DataInputStream dis = new DataInputStream(s.getInputStream());
        int id1 = dis.readInt();
        String name = dis.readUTF();
        User user = new User(id1,name);
        return user;
    }
}

现在只要在new一个Stub出来,用Stub的findUserById方法就能拿到对象
Client中调用代理类

package com.airsky.demo.rpc.rpc02;

public class Client {
    public static void main(String[] args) throws Exception {
        Stub stub = new Stub();
        System.out.println(stub.findUserById(33));
    }
}

这就是第一步的演进,开发是一个不断螺旋递增的迭代过程,瀑布式的模型在很多场景下,尤其是互联网下早就被敏捷开发所替代了,敏捷一定要迭代。我们很多时候了解到的都是结果,直接接触到了最终的版本,所以我们根本理解不了中间的演进过程,也理解不了前人们为什么做出来这么多的改进,这就是我们不能理解RPC的原因。从历史演进学习技术,会理解得更透彻,知道前因后果,怎么问题的步骤与思路。

RPC03

到了这里大家可能会说,作为一个Stub,这非常的不完善,你只能代理一个方法,返回一个类,这也太弱了。那我们就一点点来演进。
如果说Stub能给我提供这样的一个接口,我们想使用Service的findUserById,而Stub的接口提供给我这个方法。通过Stub代理过的findUserById方法我们就能远程访问了。当我们调用findUserById的时候,他帮我们加进去了一些代码,这些代码就是网络服务,所以Stub这里实现了代理模式里面的动态代理,这里最难的。
Stub

package com.airsky.demo.rpc.rpc03;

import com.airsky.demo.rpc.Common.IUserService;
import com.airsky.demo.rpc.Common.User;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;

public class Stub {
    public static IUserService getStub(){
        InvocationHandler h=new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Socket s = new Socket("127.0.0.1",8888);
                //使用ByteArrayOutputStream在内存中声明一个数组流
//        ByteArrayOutputStream bos = new ByteArrayOutputStream();
                //使用DataOutputStream包装以二进制写入
                DataOutputStream dos = new DataOutputStream(s.getOutputStream());
                dos.writeInt(456);//这里写死了需要改进
//        s.getOutputStream().write(bos.toByteArray());
//
//        System.out.println(s.getInputStream().read());
                DataInputStream dis = new DataInputStream(s.getInputStream());
                int id1 = dis.readInt();
                String name = dis.readUTF();
                User user = new User(id1,name);

                dos.close();
                dis.close();
                s.close();
                return user;
            }
        };
        Object o= Proxy.newProxyInstance(IUserService.class.getClassLoader(),new Class[]{IUserService.class},h);
        System.out.println(o.getClass().getName());
        System.out.println(o.getClass().getInterfaces()[0]);
        return (IUserService)o;

    }
}

Client

package com.airsky.demo.rpc.rpc03;

import com.airsky.demo.rpc.Common.IUserService;
import com.airsky.demo.rpc.Common.User;

public class Client {
    public static void main(String[] args) {
        IUserService service=Stub.getStub();//返回一个动态生成的对象
        User user=service.findUserById(111);
        System.out.println(user);
    }
}

当我们在Client端调用findUserById的时候,首页一点要明白,service是一个具体的类,是我们动态产生的,当调用这个方法的时候,他本质上会调用动态代理,那么既然调用了动态代理,那么就要使用InvocationHandler调用处理器进行处理,从而调用我们在invoke方法中实现的内容。invoke的第一个参数声明了调用该方法的代理实例,第二参数指向了它里边的findUserById方法,第三个参数为传入的参数。这样我们就可以代理实现修改类方法。这样写的好处在于以后在这个接口里添加任何方法的时候都可以在这里进行处理。此时通过动态代理产生的是一个实现了UserService接口的新的类,通过打印o类名即可看出。
RPC框架从0到10
通过反射获取className看出这是动态产生的新类$Proxy0
RPC框架从0到10
我们通过invoke第二个参数告诉他这个是实现UserService接口。
RPC框架从0到10
RPC框架从0到10
通过反射获取interface看出这是实现了UserService接口
RPC框架从0到10
我们对这个类进行任何方法调用的时候,都是要经过InvocationHandler的处理,然后我在里面可以判断你调用的是哪个方法,我做相应的处理,调用findUserById我要联网,调用别的我要检查你的权限等等。

RPC04

上一个版本里的代码是有缺陷的,为什么说呢,可以看到在Stub的此处
RPC框架从0到10
不管我们调用哪个方法,都只能传456,如果我们想实现一个Save功能那不就完蛋了吗。所以我们需要改进这里为可控参数。那么怎么改进呢,无论什么方法,我都用一个通用的版本实现即可。既然我们要远程调用其他方法,那么就可以通过invoke方法的三个参数来获取被代理的的方法信息,然后将方法信息传输过去。
Stub

package com.airsky.demo.rpc.rpc04;

import com.airsky.demo.rpc.Common.IUserService;
import com.airsky.demo.rpc.Common.User;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;

public class Stub {
    public static IUserService getStub(){
        InvocationHandler h=new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Socket s = new Socket("127.0.0.1",8888);

                //使用ObjectOutputStream将对象数据序列化传输过去。
                ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());
                String methodName = method.getName();
                Class[] parametersTypes = method.getParameterTypes();
                oos.writeUTF(methodName);
                oos.writeObject(parametersTypes);
                oos.writeObject(args);
                oos.flush();

                DataInputStream dis = new DataInputStream(s.getInputStream());
                int id1 = dis.readInt();
                String name = dis.readUTF();
                User user = new User(id1,name);

                oos.close();
                dis.close();
                s.close();
                return user;
            }
        };
        Object o= Proxy.newProxyInstance(IUserService.class.getClassLoader(),new Class[]{IUserService.class},h);
        System.out.println(o.getClass().getName());
        System.out.println(o.getClass().getInterfaces()[0]);
        return (IUserService)o;

    }
}

下面来假设一下,假如我们要调用saveUser方法,我们通过getName获取了方法名,通过getParamterTypes获取了参数类型,args获取到了参数,那么我们把这些数据通过ObjectOutputStream流告诉服务端,马上给我调这个方法,参数类型数据都给你。服务端马不停蹄的给我找到了这个方法,并且传入参数得到了返回值。
既然客户端做了这么大的改变,那么服务端是不是也要修改呀?是滴,服务端同样使用ObjectInputStream读入对象信息,首先按照传输的顺序使用readUTF读取方法名,然后使用readObject读取参数类型,返回类型为Class数组,接着使用readObject读取方法参数args,返回类型为Object,接下来先实例化需要调用方法的类,然后通过反射找到对应方法名和参数类型的方法,最后通过invoke进行反射调用,args为参数值,调用完成之后即可得到该方法的返回值得到一个User对象。当然如果我们想要实现一个saveUser方法也同样可以使用这种方式进行调用,只需要改变一下Client的方法名,其它的不需要进行改动。非常方便。
RPC框架从0到10

package com.airsky.demo.rpc.rpc04;

import com.airsky.demo.rpc.Common.UserService;
import com.airsky.demo.rpc.Common.UserServiceImpl;
import com.airsky.demo.rpc.Common.User;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.ObjectInputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {
    //加上条件变量,不然s.close()不可达
    private static boolean running = true;
    public static void main(String[] args) throws Exception {
        ServerSocket s=new ServerSocket(8888);
        while (running){
            System.out.println("接收连接中...");
            Socket client=s.accept();
            System.out.println("连接主机:" + client.getInetAddress());
            send(client);
            client.close();
        }
        s.close();
    }

    private static void send(Socket client) throws Exception {

        ObjectInputStream ois= new ObjectInputStream(client.getInputStream());
        //获取Socket的输出流,用Data流包装写出二进制数据
        DataOutputStream dos = new DataOutputStream(client.getOutputStream());


        String methodName = ois.readUTF();
        Class[] parameterTypes= (Class[]) ois.readObject();//读取类型
        Object[] args= (Object[]) ois.readObject();//读取Id
        System.out.println("接收到ID为:"+args[0]);
        UserService service=new com.airsky.demo.rpc.rpc04.UserServiceImpl();
        Method method = service.getClass().getMethod(methodName,parameterTypes);
        User user = (User) method.invoke(service,args);
        //写出数据
        System.out.println("返回的对象为:"+user);
//        client.getOutputStream().write(user.getName().getBytes());
        dos.writeInt(user.getId());
        dos.writeUTF(user.getName());
        dos.flush();
        ois.close();
        dos.close();
//
//
//        System.out.println(client.getInputStream().read());
    }
}

所以在这个版本里面我可以提供很多方法的支持,对于同一个接口里面很多方法支持。

RPC05

但是如果我们想要对随意接口的任意方法进行调用,那么上面的写法就无法满足我们了。还有我们的Client端对于返回值的处理还不完善,现在是将user对象拆解来进行传输的。如果我们的User实现改变了,那么所有的代码也需要进行改变,这肯定不行。所以在Server发送和Client接收对象时我们均使用ObjectOutputStream来直接传输user对象的序列化数据。这样的话我们的User就可以随意改变了。
Server

package com.airsky.demo.rpc.rpc05;

import com.airsky.demo.rpc.Common.User;
import com.airsky.demo.rpc.Common.UserServiceImpl;

import java.io.DataOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {
    //加上条件变量,不然s.close()不可达
    private static boolean running = true;
    public static void main(String[] args) throws Exception {
        ServerSocket s=new ServerSocket(8888);
        while (running){
            System.out.println("接收连接中...");
            Socket client=s.accept();
            System.out.println("连接主机:" + client.getInetAddress());
            send(client);
            client.close();
        }
        s.close();
    }

    private static void send(Socket client) throws Exception {

        ObjectInputStream ois= new ObjectInputStream(client.getInputStream());

        ObjectOutputStream oos = new ObjectOutputStream(client.getOutputStream());


        String methodName = ois.readUTF();
        Class[] parameterTypes= (Class[]) ois.readObject();//读取类型
        Object[] args= (Object[]) ois.readObject();//读取Id
        System.out.println("接收到ID为:"+args[0]);
        UserServiceImpl service=new UserServiceImpl();
        Method method = service.getClass().getMethod(methodName,parameterTypes);
        User user = (User) method.invoke(service,args);
        //使用ObjectOutputStream直接序列化写出对象
        System.out.println("返回的对象为:"+user);
        oos.writeObject(user);
        oos.flush();
        ois.close();
        oos.close();
    }
}

Stub

package com.airsky.demo.rpc.rpc05;

import com.airsky.demo.rpc.Common.IUserService;
import com.airsky.demo.rpc.Common.User;

import java.io.DataInputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;

public class Stub {
    public static IUserService getStub(){
        InvocationHandler h=new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Socket s = new Socket("127.0.0.1",8888);

                //使用ObjectOutputStream将对象数据序列化传输过去。
                ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());
                String methodName = method.getName();
                Class[] parametersTypes = method.getParameterTypes();
                oos.writeUTF(methodName);
                oos.writeObject(parametersTypes);
                oos.writeObject(args);
                oos.flush();

                //使用ObjectOutputStream将序列化数据转换为对象。
                ObjectInputStream ois = new ObjectInputStream(s.getInputStream());
                User user = (User) ois.readObject();
                oos.close();
                ois.close();
                s.close();
                return user;
            }
        };
        Object o= Proxy.newProxyInstance(IUserService.class.getClassLoader(),new Class[]{IUserService.class},h);
        System.out.println(o.getClass().getName());
        System.out.println(o.getClass().getInterfaces()[0]);
        return (IUserService)o;

    }
}

到了这里大家发现没有,我们的UserService可以随意添加方法,与此同时User还可以自由改变实现,我们的程序会变得更灵活,尤其是把Stub的内容对特别弱鸡级(我)的程序员屏蔽掉,告诉鸡你就调我的Stub方法就能进行远程调用了,里边怎么处理的你不用管。

RPC06

当然上个版本也不是很完美,我们还可以做进一步的加工。在上个版本中我们只能拿到一个UserService,那我们你不能通过getStub拿到任意类型的公开接口,可不可以?非常可以。我们都知道在Proxy的newProxyInstance方法中传入了接口的class对象,那么我们是不是可以在这个地方进行动态的传入,传入我们想要拿到的任意接口。
Stub

package com.airsky.demo.rpc.rpc06;

import com.airsky.demo.rpc.Common.IUserService;
import com.airsky.demo.rpc.Common.User;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;

public class Stub {
    public static Object getStub(Class<?> clz){
        InvocationHandler h=new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Socket s = new Socket("127.0.0.1",8888);

                //使用ObjectOutputStream将对象数据序列化传输过去。
                ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());
                String clazzName = clz.getName();
                String methodName = method.getName();
                Class[] parametersTypes = method.getParameterTypes();
                oos.writeUTF(clazzName);
                oos.writeUTF(methodName);
                oos.writeObject(parametersTypes);
                oos.writeObject(args);
                oos.flush();

                //使用ObjectOutputStream将序列化数据转换为对象。
                ObjectInputStream ois = new ObjectInputStream(s.getInputStream());
                Object o = ois.readObject();
                oos.close();
                ois.close();
                s.close();
                return o;
            }
        };
        Object o= Proxy.newProxyInstance(clz.getClassLoader(),new Class[]{clz},h);
        System.out.println(o.getClass().getName());
        System.out.println(o.getClass().getInterfaces()[0]);
        return o;

    }
}

可以看到我们给getStub新添加了一个class参数clz,而将这个参数传入了newProxyInstance,并且我们的getStub返回的是一个Object,这样你给我一个class类型我就可以返回给你一个实现了这个class类型的动态代理的那个类对象。在invoke中我们首先通过clz.getName()获取class的名字,因为我们需要调用的是任意接口,所以我们得先把接口名字写出去让服务端去进行调用。然后往外写的时候先写需要调用的接口名称,在写需要调用的接口方法,最后写参数类型和参数。我们再来看看Server端需要怎么改动。
Server

package com.airsky.demo.rpc.rpc06;

import com.airsky.demo.rpc.Common.ProductServiceImpl;
import com.airsky.demo.rpc.Common.User;
import com.airsky.demo.rpc.Common.UserService;
import com.airsky.demo.rpc.Common.UserServiceImpl;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {
    //加上条件变量,不然s.close()不可达
    private static boolean running = true;
    public static void main(String[] args) throws Exception {
        ServerSocket s=new ServerSocket(8888);
        while (running){
            System.out.println("接收连接中...");
            Socket client=s.accept();
            System.out.println("连接主机:" + client.getInetAddress());
            send(client);
            client.close();
        }
        s.close();
    }

    private static void send(Socket client) throws Exception {

        ObjectInputStream ois= new ObjectInputStream(client.getInputStream());
        
        ObjectOutputStream oos = new ObjectOutputStream(client.getOutputStream());


        String clazzName = ois.readUTF();
        String methodName = ois.readUTF();
        Class[] parameterTypes= (Class[]) ois.readObject();//读取类型
        Object[] args= (Object[]) ois.readObject();//读取Id
        System.out.println("接收到ID为:"+args[0]);
//去服务注册表找到具体的类,如果使用spring甚至还可以直接根据配置注入bean然后根据bean查找。
        Class clazz = ProductServiceImpl.class;
        Method method = clazz.getMethod(methodName,parameterTypes);
        Object o = method.invoke(clazz.newInstance(),args);
        //使用ObjectOutputStream直接序列化写出对象
        System.out.println("返回的对象为:"+o);
        oos.writeObject(o);
        oos.flush();
        ois.close();
        oos.close();
    }
}

Server端无非就是在读的时候先读到我们的className,然后就去服务注册表找到具体的实现类,最后找对应的方法,最后传入参数进行调用就行了。当然这个过程完全可以使用Spring来注入来查找具体的实现类。我们通过新增一个ProductService接口来测试代码是否完善。
Product

package com.airsky.demo.rpc.Common;

import java.io.Serializable;

public class Product implements Serializable {
    private static final long serialVersionUID = 1L;
    int id;
    String name;
    int count;

    public Product(int id, String name, int count) {
        this.id = id;
        this.name = name;
        this.count = count;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return "Product{" +
                "id=" + id +
                ", name=" + name +
                ", count=" + count +
                '}';
    }
}

ProductService

package com.airsky.demo.rpc.Common;

public interface ProductService {
    Product findProductByName(String name);
}

ProductServiceImpl

package com.airsky.demo.rpc.Common;

public class ProductServiceImpl implements ProductService {
    @Override
    public Product findProductByName(String name) {
        return new Product(4,name,4);
    }
}

Client

package com.airsky.demo.rpc.rpc06;

import com.airsky.demo.rpc.Common.Product;
import com.airsky.demo.rpc.Common.ProductService;
public class Client {
    public static void main(String[] args) {
        ProductService service= (ProductService) Stub.getStub(ProductService.class);//返回一个动态生成的对象
        Product product=service.findProductByName("SuperMan");
        System.out.println(product);

    }
}

RPC框架从0到10
可以看到成功的进行了远程方法调用。
现在我们来梳理回顾一下全过程,我在Client端要对Server端的服务做调用,Server端服务提供了很多个,简单点叫服务1,服务2...,调用服务的时候我非常不爽,因为每次都要写网络细节。这时我就生成了一个代理类,这个代理类用动态生成的方式来生成网络细节,你只要告诉我这个工具类,我要服务1的方法1,我传了一个参数1,工具类就会生成一个代理类,将我们的方法调用信息传给服务器,服务器找到对应的实现类,对应的方法。将执行完成的结果返回。 所谓的RPC:Remote Procudure Call,远程的、方法的、调用。帮你屏蔽了底层。这里面的实现有好多种,所以这只是一种通信的方式,我可以通过屏蔽底层的方式(Stub),就跟我调用本机的方法似的。现在只是一个最基础的RPC实现,这里面有大量的可以改进的地方,一步一步的改进你就可以自己写一个RPC框架了。
RPC框架从0到10

RPC08

此时我们在底层的实现是序列化,转换为二进制,因为所有网络传输的都是二进制。我们现在用的序列化是jdk自带的Serializable,这个方式是最土的,因为它只支持java语言,而且效率低,长度长。所以对于RPC序列化上的实现就有好多好多可以替代的内容。
了解一下RPC的序列化框架。

  • java.io.Serializable
  • Hessian
  • google protobuf
  • fackbook Thrift
  • kyro
  • fst
  • json序列化框架
  • jackson
  • google Gson
  • FastJson
  • xmlrpc(xstream)
    Json框架就是先转换为JSON字符串格式,通过JSON框架再转换为二进制,XML就是转换为XML格式。
    以上只是RPC框架的序列化实现部分,完整的RPC框架还要提供服务注册、服务发现、服务治理、服务监控、服务的负载均衡各种各样的东西。
    这里我们就来了解一下Hessian序列化框架,序列化就是把一个对象转换成字节数组,反序列化就是反之。我们在HelloHessian中实现了一个方法serialize,作用是将一个对象转换成字节数组。怎么转换的呢?还是使用ByteArrayOutputStream创建字节数组输出流对象然后捕获内存缓冲区的数据(Object),转换成字节数组,在外面我们将原来嵌套的ObjectOutputStream换成了Hessian2Output,然后将对象数据写入了字节数组输出流中转换成了字节数组。
    HelloHessian
package com.airsky.demo.rpc.rpc08_Hessian01;

import com.airsky.demo.rpc.Common.User;
import com.caucho.hessian.io.Hessian2Input;
import com.caucho.hessian.io.Hessian2Output;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class HelloHessian {
    public static void main(String[] args) throws IOException {
        User a = new User(4,"AirSky");
        byte[] bytes = serialize(a);
        System.out.println(bytes.length);
        User a1 = (User)deserialize(bytes);
        System.out.println(a1);
    }

    private static byte[] serialize(User a) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        Hessian2Output output = new Hessian2Output(baos);
        output.writeObject(a);
        output.flush();
        byte[] bytes = baos.toByteArray();
        baos.close();
        output.close();
        return bytes;

    }
    private static Object deserialize(byte[] bytes) throws IOException {
        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
        Hessian2Input input = new Hessian2Input(bais);
        Object o = input.readObject();
        bais.close();
        input.close();
        return o;
    }
}

这时你可能会说,你在搞笑吗?我直接用JDK自带的序列化不就行了,干嘛搞的这么麻烦。别急,跟着我来看看Hessian和JDK的对比。

package com.airsky.demo.rpc.rpc08_Hessian01;

import com.airsky.demo.rpc.Common.User;
import com.caucho.hessian.io.Hessian2Output;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class HessianVSJDK {
    public static void main(String[] args) throws IOException {
        User a = new User(1,"AirSky");

        long b =System.currentTimeMillis();
        byte[] HessianBytes = HessianSerialize(a);
        long c =System.currentTimeMillis();
        System.out.println("HessianLength:"+HessianBytes.length);
        System.out.println("HessianString:"+new String(HessianBytes));
        System.out.println("HessianTime:"+(c-b));

        System.out.println("------------------------------------------------------------------");

        long b1 =System.currentTimeMillis();
        byte[] JdkBytes = JdkSerialize(a);
        long c1 =System.currentTimeMillis();
        System.out.println("JdkLength:"+JdkBytes.length);
        System.out.println("JdkString:"+new String(JdkBytes));
        System.out.println("JdkTime:"+(c1-b1));
    }
    private static byte[] HessianSerialize(Object a) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        Hessian2Output output = new Hessian2Output(baos);
        output.writeObject(a);
        output.flush();
        byte[] bytes = baos.toByteArray();
        baos.close();
        output.close();
        return bytes;

    }
    private static byte[] JdkSerialize(Object a) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        oos.writeObject(a);
        oos.flush();
        byte[] bytes = baos.toByteArray();
        baos.close();
        oos.close();
        return bytes;
    }
}

额外
我们在HessianVSJDK中实现了一个使用Hessian和ObjectOutputStream序列化的方法,通过比较这两个方法序列化User对象a之后的长度和字符串内容可以看出Hessian去掉了原生序列化中没有必要传输的大量字符。但通过序列化时间也可以看出java自身所带的方法明显比hessian自带的序列化效率更高。这里也有实验数据https://my.oschina.net/caomuquan/blog/378416。RPC中必须有的就是序列化,所以在RPC的序列化中产生了一系列的序列化框架,这些框架有什么区别呢?区别就是有的快、有的慢、有的大、有的小、有的效率高、有的效率低、有的用纯文本、有的用xml。
说到这里大家是不是明白了几个概念了,第一、RPC的基本概念,第二、RPC的序列化框架。

RPC10

RPC除了有序列化的一面还有什么呢?作为RPC中的Server来说,现在用的传输数据的方式是最基础的TCP/IP,但是我们传输数据的协议有好多种是不是?我也可以用HTTP来传输我的字符串或者二进制,所以RPC的概念除了序列化之外还有RPC的网络协议,这个你可以自己选。可以选HTTP,也可以选底层的TCP/IP,可以选WebService,甚至可以选mail协议。协议无所谓,无非就是通过这些协议把我们序列化好的数据发出去。不同的框架可能就使用的不同的协议,需要自己去选,去定义才行。

整理自马士兵老师:《36行代码透彻解析RPC》

发表评论

评论已关闭。

相关文章