• 欢迎访问 winrains 的个人网站!
  • 本网站主要从互联网整理和收集了与Java、网络安全、Linux等技术相关的文章,供学习和研究使用。如有侵权,请留言告知,谢谢!

RPC框架的简单实现

RPC winrains 来源:说出你的愿望吧丷 1年前 (2019-10-11) 47次浏览

内容一:RPC的流程和任务

1. RPC的流程

其实这个在上一篇的2 – ① 也已经提到过了,如果忘了,没关系,我再复制过来
stub:分布式计算中的存根是一段代码,它转换在远程过程调用期间Clientserver之间传递的参数

1.客户端处理过程中调用client stub(就像调用本地方法一样),传入参数
2.Client stub将参数编组为消息,然后通过系统调用向服务端发送消息
3.客户端本地操作系统将消息从客户端机器发送到服务端机器
4.服务端操作系统将接收到的数据包传递给client stub
5.server stub解组消息为参数
6.server stub再调用服务端的过程,过程执行结果以反方向的相同步骤响应给客户端

2. 从使用者的角度开始分析

1.定义过程接口
2.服务端实现接口的整个过程
3.客户端使用生成的stub代理对象

内容二:RPC框架的设计及实现

1. 准备一个Student的实体类及基础接口

客户端生成过程接口的代理对象,通过设计一个客户端代理工厂,使用JDK动态代理即可生成接口的代理对象

① 定义一个StudentService接口

Student类有三个属性name(String)age(int),sex(String),节省篇幅就不贴代码了,提供gettersettertoString方法即可

public interface StudentService {
  /**
   * 获取信息
   * @return
   */
  public Student getInfo();
  //打印student的信息并返回一个boolean值
  public boolean printInfo(Student student);
}

并且提供一个简单的实现,其实就是打印一个Student的信息出来而已

@Service(StudentService.class)
public class StudentServiceImpl implements StudentService {
  public Student getInfo() {
    Student person = new Student();
    	person.setAge(25);
    person.setName("说出你的愿望吧~");
    person.setSex("男");
    return person;
  }
  public boolean printInfo(Student person) {
    if (person != null) {
      System.out.println(person);
      return true;
    }
    return false;
  }
  public static void main(String[] args) {
    new Thread(()->{
      System.out.println("111");
    }).start();;
  }
}

2.客户端的搭建

① 从测试类去了解所需

首先,客户端通过我们的本地代理,获得我们的StudentService的代理类,此时我们客户端本地是肯定不存在StudentService的实现的,此时寻址我们是直接给出来了

public class ClientTest {
  @Test
  public void test() {
    // 本地没有接口实现,通过代理获得接口实现实例
    RpcClientProxy proxy = new RpcClientProxy("192.168.80.1", 9998);
    StudentService service = proxy.getProxy(StudentService.class);
    System.out.println(service.getInfo());
    Student student = new Student();
    student.setAge(23);
    student.setName("hashmap");
    student.setSex("男");
    System.out.println(service.printInfo(student));
  }
}

此时我们的关注点转到客户端是如何帮我们进行代理的

② 实现了InvocationHandler接口的RpcClientProxy

/**
 * RpcClientProxy
 * 客户端代理服务,客户端往服务端发起的调用将通过客户端代理来发起
 */
public class RpcClientProxy implements InvocationHandler{
  private String host;	// 服务端地址
  private int port;	// 服务端口号
  public RpcClientProxy(String host, int port){
    this.host = host;
    this.port = port;
  }
  /**
   * 生成业务接口的代理对象,代理对象做的事情,在invoke方法中。
   * @param clazz 代理类型(接口)
   * @return
   */
  @SuppressWarnings("unchecked")
  public <T> T getProxy(Class<T> clazz){
    // clazz 不是接口不能使用JDK动态代理
    return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{ clazz }, RpcClientProxy.this);
  }
  /**
   * 动态代理做的事情,接口的实现不在本地,在网络中的其他进程中,我们通过实现了Rpc客户端的对象来发起远程服务的调用。
   */
  public Object invoke(Object obj, Method method, Object[] params) throws Throwable {
    // 调用前
    System.out.println("执行远程方法前,可以做些事情");
    // 调用远程服务,需要封装参数,类似于序列化的过程
    RpcRequest request = new RpcRequest();
    request.setClassName(method.getDeclaringClass().getName());
    request.setMethodName(method.getName());
    request.setParamTypes(method.getParameterTypes());
    request.setParams(params);
    // 链接服务器调用服务
    RpcClient client = new RpcClient();
    Object rst = client.start(request, host, port);
    // 调用后
    System.out.println("执行远程方法后,也可以做些事情");
    return rst;
  }
}

JDK提供了Proxy类来实现我们的动态代理,可以通过newProxyInstance(ClassLoader var0, Class<?>[] var1, InvocationHandler var2)方法来实例化一个代理对象,此时我们传入的参数clazz是规定必须为一个接口的,如果不是接口就不能使用JDK动态代理
而第三个参数RpcClientProxy.this则是newProxyInstance()方法虽然帮我们创建好了实例,但是创建实例完成后的具体动作必须由这个InvocationHandler来提供
InvocationHandler这个接口里面仅仅只有一个 Object invoke(Object var1, Method var2, Object[] var3) throws Throwable,这个方法的参数相信不难理解,第一个是代理对象,第二个是执行的方法,第三个是所需的参数集
回到我们刚刚的代码,在我执行System.out.println(service.getInfo())这条语句的时候,我们的逻辑就会跳到invoke()的实现中来,在invoke()方法的注释中也把过程很详细的说明了,首先我们需要调用远程服务了,进行一个参数的封装,之后就进行一个网络连接把这些参数发送给我们的服务端,此时我们需要用到RpcClient

③ RpcClient

start()方法中,我们的RpcRequest request是实现了Serializable接口的,所以此时封装好的数据会转换成一个二进制然后被flush()过去,此时我们消息已经发送了,需要等待服务端的响应,响应我们就需要通过我们的服务端ObjectOutputStream来接收一个输入流

/**
 * RpcClient
 * Rpc客户端,代表业务代码作为客户端,往远端服务发起请求。
 */
public class RpcClient {
  /**
   * 通过网络IO,打开远端服务连接,将请求数据写入网络中,并获得响应结果。
   *
   * @param request 将要发送的请求数据
   * @param host 远端服务域名或者ip地址
   * @param port 远端服务端口号
   * @return 服务端响应结果
   * @throws Throwable 抛出的异常
   */
  public Object start(RpcRequest request, String host, int port) throws Throwable{
    // 打开远端服务连接
    Socket server = new Socket(host, port);
    ObjectInputStream oin = null;
    ObjectOutputStream oout = null;
    try {
      // 1. 服务端输出流,写入请求数据,发送请求数据
      oout = new ObjectOutputStream(server.getOutputStream());
      oout.writeObject(request);
      oout.flush();
      // 2. 服务端输入流,获取返回数据,转换参数类型
      // 类似于反序列化的过程
      oin = new ObjectInputStream(server.getInputStream());
      Object res = oin.readObject();
      RpcResponse response = null;
      if(!(res instanceof RpcResponse)){
        throw new InvalidClassException("返回参数不正确,应当为:"+RpcResponse.class+" 类型");
      }else{
        response = (RpcResponse) res;
      }
      // 3. 返回服务端响应结果
      if(response.getError() != null){ // 服务器产生异常
        throw response.getError();
      }
      return response.getResult();
    }finally{
      try {	// 清理资源,关闭流
        if(oin != null) oin.close();
        if(oout != null) oout.close();
        if(server != null) server.close();
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }
}

④ 进行参数封装的RpcRequest

/**
 * RpcRequest
 * Rpc请求对象,请求远端服务服务的内容,在网络上进行传输。
 */
public class RpcRequest implements Serializable{
  // 需要请求的类名
  private String className;
  // 需求请求的方法名
    private String methodName;
    // 请求方法的参数类型
    private Class<?>[] paramTypes;
    // 请求的参数值
    private Object[] params;
  public String getClassName() {
    return className;
  }
  public void setClassName(String className) {
    this.className = className;
  }
  public String getMethodName() {
    return methodName;
  }
  public void setMethodName(String methodName) {
    this.methodName = methodName;
  }
  public Class<?>[] getParamTypes() {
    return paramTypes;
  }
  public void setParamTypes(Class<?>[] paramTypes) {
    this.paramTypes = paramTypes;
  }
  public Object[] getParams() {
    return params;
  }
  public void setParams(Object[] params) {
    this.params = params;
  }
}

⑤ Rpc服务端响应结果包装RpcResponse

同时也是实现了JDK默认的序列化Serializable

/**
 * RpcResponse
 * Rpc服务端响应结果包装类,在网络上进行传输。
 */
public class RpcResponse implements Serializable {
  // 可能抛出的异常
  private Throwable error;
  // 响应的内容或结果
        private Object result;
  public Throwable getError() {
    return error;
  }
  public void setError(Throwable error) {
    this.error = error;
  }
  public Object getResult() {
    return result;
  }
  public void setResult(Object result) {
    this.result = result;
  }
}

3.服务端的搭建

① 服务端的模拟ServerTest

public class ServerTest {
  @Test
  public void startServer() {
    RpcServer server = new RpcServer();
    server.start(9998, "rpc.simple.RpcServer");
  }
  public static void main(String[] args) {
  }
}

给到一个端口号,参数中带有一个包,功能是扫描某个包下的服务

② start()方法的实现

创建一个Map类型的集合services存放扫描到提供rpc服务的类,此时因为没有放在注册中心上所以就不存在寻址了。后面将会把它放入zookeeper的注册中心
getService()下,我们在ServerTest不是提供了一个包名吗,此时我们先去找到了它们所有的classes(请参考getClasses()方法),getClasses()中我们其实主要是先根据提供的包名往下找,要是目录都有问题的话就抛出异常,如果没问题,就开始遍历此目录下的所有文件,遍历出来的结果如果发现这个文件是class文件,就把其实例化,并且进行判断是否存在一个自定义注解@service,标注了这个注解的类就是RPC服务的实现类。如果存在这个注解,那就是我们需要找的rpc服务,就把它装到一个结果集classes中,如果目录下面仍然是目录,那就自己调用自己,直到看到class文件为止
当我们把所有的class都找到了,回到getService()方法下,就都集中放于一个classList中,然后把它们Map化,就是把接口的名称作为key,把实例作为value(services.put(cla.getAnnotation(Service.class).value().getName(), obj))
最后再回到start(),进行完服务扫描之后还会有一个RpcServerHandler来进行处理

/**
 * RpcServer
 * Rpc服务提供者
 */
public class RpcServer {
  /**
   * 启动指定的网络端口号服务,并监听端口上的请求数据。获得请求数据以后将请求信息委派给服务处理器,放入线程池中执行。
   * @param port 监听端口
   * @param clazz 服务类所在包名,多个用英文逗号隔开
   */
  public void start(int port, String clazz) {
    ServerSocket server = null;
    try {
      // 1. 创建服务端指定端口的socket连接
      server = new ServerSocket(port);
      // 2. 获取所有rpc服务类
      Map<String, Object> services = getService(clazz);
      // 3. 创建线程池
      Executor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
      while(true){
        // 4. 获取客户端连接
        Socket client = server.accept();
        // 5. 放入线程池中执行
        RpcServerHandler service = new RpcServerHandler(client, services);
        executor.execute(service);
      }
    } catch (IOException e) {
      e.printStackTrace();
    }finally{
      //关闭监听
      if(server != null)
        try {
          server.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
    }
  }
  /**
   * 实例化所有rpc服务类,也可用于暴露服务信息到注册中心。
   * @param clazz 服务类所在包名,多个用英文逗号隔开
   * @return
   */
  public Map<String,Object> getService(String clazz){
    try {
      Map<String, Object> services = new HashMap<String, Object>();
      // 获取所有服务类
      String[] clazzes = clazz.split(",");
      List<Class<?>> classes = new ArrayList<Class<?>>();
      for(String cl : clazzes){
        List<Class<?>> classList = getClasses(cl);
        classes.addAll(classList);
      }
      // 循环实例化
      for(Class<?> cla:classes){
        Object obj = cla.newInstance();
        services.put(cla.getAnnotation(Service.class).value().getName(), obj);
      }
      return services;
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
  /**
   * 获取包下所有有@Sercive注解的类
   * @param pckgname
   * @return
   * @throws ClassNotFoundException
   */
  public static List<Class<?>> getClasses(String pckgname) throws ClassNotFoundException {
    // 需要查找的结果
    List<Class<?>> classes = new ArrayList<Class<?>>();
    // 找到指定的包目录
    File directory = null;
    try {
      ClassLoader cld = Thread.currentThread().getContextClassLoader();
      if (cld == null)
        throw new ClassNotFoundException("无法获取到ClassLoader");
      String path = pckgname.replace('.', '/');
      URL resource = cld.getResource(path);
      if (resource == null)
        throw new ClassNotFoundException("没有这样的资源:" + path);
      directory = new File(resource.getFile());
    } catch (NullPointerException x) {
      throw new ClassNotFoundException(pckgname + " (" + directory + ") 不是一个有效的资源");
    }
    if (directory.exists()) {
      // 获取包目录下的所有文件
      String[] files = directory.list();
      File[] fileList = directory.listFiles();
      // 获取包目录下的所有文件
      for (int i = 0; fileList != null && i < fileList.length; i++) {
        File file = fileList[i];
        //判断是否是Class文件
        if (file.isFile() && file.getName().endsWith(".class")) {
          Class<?> clazz = Class.forName(pckgname + '.' + files[i].substring(0, files[i].length() - 6));
          if(clazz.getAnnotation(Service.class) != null){
            classes.add(clazz);
          }
        }else if(file.isDirectory()){ //如果是目录,递归查找
          List<Class<?>> result = getClasses(pckgname+"."+file.getName());
          if(result != null && result.size() != 0){
            classes.addAll(result);
          }
        }
      }
    } else{
      throw new ClassNotFoundException(pckgname + "不是一个有效的包名");
    }
    return classes;
  }
}

③ 进行处理的RpcServerHandler

和刚刚的RpcClient非常类似,都是序列化和反序列化的过程,主要是第三步中获得了实例和方法及其参数后,再调用invoke()方法然后把结果放入response的过程

/**
 * RpcServerHandler
 * 服务端请求处理,处理来自网络IO的服务请求,并响应结果给网络IO。
 */
public class RpcServerHandler implements Runnable {
  // 客户端网络请求socket,可以从中获得网络请求信息
  private Socket clientSocket;
  // 服务端提供处理请求的类集合
  private Map<String, Object> serviceMap;
  /**
   * @param client 客户端socket
   * @param services 所有服务
   */
  public RpcServerHandler(Socket client, Map<String, Object> services) {
    this.clientSocket = client;
    this.serviceMap = services;
  }
  /**
   * 读取网络中客户端请求的信息,找到请求的方法,执行本地方法获得结果,写入网络IO输出中。
   *
   */
  public void run() {
    ObjectInputStream oin = null;
    ObjectOutputStream oout = null;
    RpcResponse response = new RpcResponse();
    try {
      // 1. 获取流以待操作
      oin = new ObjectInputStream(clientSocket.getInputStream());
      oout = new ObjectOutputStream(clientSocket.getOutputStream());
      // 2. 从网络IO输入流中请求数据,强转参数类型
      Object param = oin.readObject();
      RpcRequest  request = null;
      if(!(param instanceof RpcRequest)){
        response.setError(new Exception("参数错误"));
        oout.writeObject(response);
        oout.flush();
        return;
      }else{
        // 反序列化RpcRequest
        request = (RpcRequest) param;
      }
      // 3. 查找并执行服务方法
      Object service = serviceMap.get(request.getClassName());
      Class<?> clazz= service.getClass();
      Method method = clazz.getMethod(request.getMethodName(), request.getParamTypes());
      Object result = method.invoke(service, request.getParams());
      // 4. 返回RPC响应,序列化RpcResponse
      response.setResult(result);
      // 序列化结果
      oout.writeObject(response);
      oout.flush();
      return;
    } catch (Exception e) {
      try {	//异常处理
        if(oout != null){
          response.setError(e);
          oout.writeObject(response);
          oout.flush();
        }
      } catch (Exception e1) {
        e1.printStackTrace();
      }
      return;
    }finally{
      try {	// 回收资源,关闭流
        if(oin != null) oin.close();
        if(oout != null) oout.close();
        if(clientSocket != null) clientSocket.close();
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }
}

4.运行结果

先开启ServerTest再开启ClientTest,简单快捷,注意别去右键跑main方法即可

内容三:优化客户端的举措

1.发现者的引入

设计客户端的时候,在ClientStubInvocationHandler中需要完成的两件事为编组消息和发送网络请求,而将请求的内容编组为消息这件事就交由客户端的stub代理,它除了消息协议和网络层的事务以外,可能还存在一个服务信息发现,此外消息协议可能也是会存在变化的,我们也需要去支持多种协议,这个其实是和框架对协议的支持广度有关的。比如dubbo相对于spring cloud而言对协议的支持就相对灵活一些
此时我们需要得知某服务用的是什么协议,所以我们需要引入一个服务发现者

2.协议层

我们想要做到支持多种协议,类该如何设计(面向接口,策略模式,组合)

此时我们的协议需要抽象出来,对于协议的内容需要进行编组和解组,比如我们上面提供的JSON和HTTP两种不同的实现,而此时客户端的存根里面就不仅仅只是需要服务发现者,还需要我们对于这个协议的支持

① 补充:如何从zookeeper中获取注册信息

主要看regist()方法,我们在注册的时候把服务信息进行了拼接,并创建成临时节点,父节点为持久节点。servicePath是类似于dubbo的一个目录结构,一个根目录/rpc+服务名称serviceName+service,获取服务的方法loadServiceResouces()也不难,根据这些地址获取它们下面的子节点,把所有的url加载出来给到调用者

public class RegistCenter {
  ZkClient client = new ZkClient("localhost:2181");
  private String centerRootPath = "/rpc";
  public RegistCenter() {
    client.setZkSerializer(new MyZkSerializer());
  }
  public void regist(ServiceResource serviceResource) {
    String serviceName = serviceResource.getServiceName();
    String uri = JsonMapper.toJsonString(serviceResource);
    try {
      uri = URLEncoder.encode(uri, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      e.printStackTrace();
    }
    String servicePath = centerRootPath + "/"+serviceName+"/service";
    if(! client.exists(servicePath)) {
      client.createPersistent(servicePath, true);
    }
    String uriPath = servicePath+"/"+uri;
    client.createEphemeral(uriPath);
  }
  /**
   * 加载配置中心中服务资源信息
   * @param serviceName
   * @return
   */
  public List<ServiceResource> loadServiceResouces(String serviceName) {
    String servicePath = centerRootPath + "/"+serviceName+"/service";
    List<String> children = client.getChildren(servicePath);
    List<ServiceResource> resources = new ArrayList<ServiceResource>();
    for(String ch : children) {
      try {
        String deCh = URLDecoder.decode(ch, "UTF-8");
        ServiceResource r = JsonMapper.fromJsonString(deCh, ServiceResource.class);
        resources.add(r);
      } catch (UnsupportedEncodingException e) {
        e.printStackTrace();
      }
    }
    return resources;
  }
  private void sub(String serviceName, ChangeHandler handler) {
    /*
    String path = centerRootPath + "/"+serviceName+"/service";
    client.subscribeChildChanges(path, new IZkChildListener() {
      @Override
      public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
        handler();
      }
    });
    client.subscribeDataChanges(path, new IZkDataListener() {
      @Override
      public void handleDataDeleted(String dataPath) throws Exception {
        handler();
      }
      @Override
      public void handleDataChange(String dataPath, Object data) throws Exception {
        handler();
      }
    });
    */
  }
  interface ChangeHandler {
    /**
     * 发生变化后给一个完整的属性对象
     * @param resource
     */
    void itemChange(ServiceResource resource);
  }
}

② ClientStubProxyFactory

/**
 * ClientStubProxyFactory
  *   客户端存根代理工厂
 */
public class ClientStubProxyFactory {
  private ServiceInfoDiscoverer sid;
  private Map<String, MessageProtocol> supportMessageProtocols;
  private NetClient netClient;
  private Map<Class<?>, Object> objectCache = new HashMap<>();
  /**
   *
   *
   * @param <T>
   * @param interf
   * @return
   */
  @SuppressWarnings("unchecked")
  public <T> T getProxy(Class<T> interf) {
    T obj = (T) this.objectCache.get(interf);
    if (obj == null) {
      obj = (T) Proxy.newProxyInstance(interf.getClassLoader(), new Class<?>[] { interf },
          new ClientStubInvocationHandler(interf));
      this.objectCache.put(interf, obj);
    }
    return obj;
  }
  public ServiceInfoDiscoverer getSid() {
    return sid;
  }
  public void setSid(ServiceInfoDiscoverer sid) {
    this.sid = sid;
  }
  public Map<String, MessageProtocol> getSupportMessageProtocols() {
    return supportMessageProtocols;
  }
  public void setSupportMessageProtocols(Map<String, MessageProtocol> supportMessageProtocols) {
    this.supportMessageProtocols = supportMessageProtocols;
  }
  public NetClient getNetClient() {
    return netClient;
  }
  public void setNetClient(NetClient netClient) {
    this.netClient = netClient;
  }
  /**
   * ClientStubInvocationHandler
   * 客户端存根代理调用实现
   * @date 2019年4月12日 下午2:38:30
   */
  private class ClientStubInvocationHandler implements InvocationHandler {
    private Class<?> interf;
    public ClientStubInvocationHandler(Class<?> interf) {
      super();
      this.interf = interf;
    }
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      // 1、获得服务信息
      String serviceName = this.interf.getName();
      ServiceInfo sinfo = sid.getServiceInfo(serviceName);
      if (sinfo == null) {
        throw new Exception("远程服务不存在!");
      }
      // 2、构造request对象
      Request req = new Request();
      req.setServiceName(sinfo.getName());
      req.setMethod(method.getName());
      req.setPrameterTypes(method.getParameterTypes());
      req.setParameters(args);
      // 3、协议层编组
      // 获得该方法对应的协议
      MessageProtocol protocol = supportMessageProtocols.get(sinfo.getProtocol());
      // 编组请求
      byte[] data = protocol.marshallingRequest(req);
      // 4、调用网络层发送请求
      byte[] repData = netClient.sendRequest(data, sinfo);
      // 5解组响应消息
      Response rsp = protocol.unmarshallingResponse(repData);
      // 6、结果处理
      if (rsp.getException() != null) {
        throw rsp.getException();
      }
      return rsp.getReturnValue();
    }
  }
}

ClientStub中有两个引用,一个是服务发现接口ServiceInfoDiscoverer,作用为根据服务名获得远程服务信息,提供一个ServiceInfo getServiceInfo(String name)方法,还有就是对于不同协议的支持supportMessageProtocolsMessageProtocol我们也是定义了一个接口,这个接口就需要比较详细了,编码成二级制,和解码成Request等,对于response也是同样这么个过程

/**
 * 通信协议接口
 * MessageProtocol
 */
public interface MessageProtocol {
  /**
   * 编组请求消息
   * @param req
   * @return
   */
  byte[] marshallingRequest(Request req);
  /**
   * 解编组请求消息
   * @param data
   * @return
   */
  Request unmarshallingRequest(byte[] data);
  /**
   * 编组响应消息
   * @param rsp
   * @return
   */
  byte[] marshallingResponse(Response rsp);
  /**
   * 解编组响应消息
   * @param data
   * @return
   */
  Response unmarshallingResponse(byte[] data);
}

此时又存在一些问题,单纯依靠编组和解组的方法是不够的,编组和解组的操作对象是请求,响应,但是它们的内容是不同的,此时我们又需要定义框架标准的请求响应类

request有具体的服务名,服务方法,消息头,参数类型和参数,同样的response也有状态(通过枚举),消息头,返回值及类型以及是否存在异常。
此时协议层扩展为4个方法

将消息协议独立为一层,客户端和服务端都需要使用

3. 网络层

网络层的工作主要是发送请求和获得响应,此时我们如果需要发起网络请求必定先要知道服务地址,此时我们利用下图中serviceInfo对象作为必须依赖,setRequest()方法里面会存在发送数据,还有发送给谁,此时给出了BIO和Netty两种实现
所以我们需要的三个依赖就都出来了,一个是服务发现者,一个是协议支持,再然后就是我们网络层的NetClient

4. 总图

紫色代表客户端代理部分,浅绿色属于服务发现,浅蓝色属于协议部分

5.代码部分(可直接无视)

因为这些代码和主要的思路已经没有瓜葛了,只是一些功能代码,所以可以直接忽略了。如果实在是想自己跑一下,也可以问我要一个小样。

① 依旧是回到我们的ClientStubProxyFactory

可以和内容二的RpcClientProxy做一个对比,在原有的基础上加上了三个依赖ServiceInfoDiscoverersupportMessageProtocolsnetClient

ClientStubProxyFactory中对Object做了一个缓存,如果已经存在这个缓存就直接返回,没有的话加入到缓存中然后new出来,只是一个小小的不同。

② invoke()方法的改变

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  // 1、获得服务信息
  String serviceName = this.interf.getName();
  ServiceInfo sinfo = sid.getServiceInfo(serviceName);
  if (sinfo == null) {
    throw new Exception("远程服务不存在!");
  }
  // 2、构造request对象
  Request req = new Request();
  req.setServiceName(sinfo.getName());
  req.setMethod(method.getName());
  req.setPrameterTypes(method.getParameterTypes());
  req.setParameters(args);
  // 3、协议层编组
  // 获得该方法对应的协议
  MessageProtocol protocol = supportMessageProtocols.get(sinfo.getProtocol());
  // 编组请求
  byte[] data = protocol.marshallingRequest(req);
  // 4、调用网络层发送请求
  byte[] repData = netClient.sendRequest(data, sinfo);
  // 5、解组响应消息
  Response rsp = protocol.unmarshallingResponse(repData);
  // 6、结果处理
  if (rsp.getException() != null) {
    throw rsp.getException();
  }
  return rsp.getReturnValue();
}

首先是服务发现,在我们执行 ① 中提到的getProxy()方法时,此时代理的接口已经直接告诉我们了,所以我们就直接获得了接口信息interf,然后调用getName()方法获取接口的名称,通过接口名,调用服务发现者ServiceInfo提供的getServiceInfo()方法就能获取服务的具体信息,然后放入请求参数request里面,接下来给request的各个属性赋值
之后我们就开始寻找这个服务所对应的协议,获得协议之后可以获取协议支持对象,之后进行编组请求,转换成二进制,通过netClient发送过去,顺带连同服务端信息给出去。获取结果repData进行解组(二进制回到response),之后进行结果处理。

③ 服务发现者的实现

之前也提到了,服务发现者ServiceInfoDiscoverer是作为一个接口提供了getServiceInfo()方法的

有两种不同的实现,本地实现我们可以自己搞一个配置文件加载进来,把相关的服务信息弄进去得了
zookeeper的服务发现实现如下,类似于我们一开始在2 – ① 中补充的zookeeper的内容

public class ZookeeperServiceInfoDiscoverer implements ServiceInfoDiscoverer {
  ZkClient client = new ZkClient("localhost:2181");
  private String centerRootPath = "/rpc";
  public ZookeeperServiceInfoDiscoverer() {
    client.setZkSerializer(new MyZkSerializer());
  }
  public void regist(ServiceInfo serviceResource) {
    String serviceName = serviceResource.getName();
    String uri = JSON.toJSONString(serviceResource);
    try {
      uri = URLEncoder.encode(uri, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      e.printStackTrace();
    }
    String servicePath = centerRootPath + "/"+serviceName+"/service";
    if(! client.exists(servicePath)) {
      client.createPersistent(servicePath, true);
    }
    String uriPath = servicePath+"/"+uri;
    client.createEphemeral(uriPath);
  }
  /**
   * 加载配置中心中服务资源信息
   * @param serviceName
   * @return
   */
  public List<ServiceInfo> loadServiceResouces(String serviceName) {
    String servicePath = centerRootPath + "/"+serviceName+"/service";
    List<String> children = client.getChildren(servicePath);
    List<ServiceInfo> resources = new ArrayList<ServiceInfo>();
    for(String ch : children) {
      try {
        String deCh = URLDecoder.decode(ch, "UTF-8");
        ServiceInfo r = JSON.parseObject(deCh, ServiceInfo.class);
        resources.add(r);
      } catch (UnsupportedEncodingException e) {
        e.printStackTrace();
      }
    }
    return resources;
  }
  @Override
  public ServiceInfo getServiceInfo(String name) {
    List<ServiceInfo> list = loadServiceResouces(name);
    ServiceInfo info = list.get(0);
    list.forEach((e)->{
      if(e != info) {
        info.addAddress(e.getAddress().get(0));
      }
    });
    return info;
  }
}

④ 协议支持相关

这里只实现了JSON的,通过fastJSON来实现

public class JSONMessageProtocol implements MessageProtocol {
  @Override
  public byte[] marshallingRequest(Request req) {
    Request temp = new Request();
    temp.setServiceName(req.getServiceName());
    temp.setMethod(req.getMethod());
    temp.setHeaders(req.getHeaders());
    temp.setPrameterTypes(req.getPrameterTypes());
    if (req.getParameters() != null) {
      Object[] params = req.getParameters();
      Object[] serizeParmas = new Object[params.length];
      for (int i = 0; i < params.length; i++) {
        serizeParmas[i] = JSON.toJSONString(params[i]);
      }
      temp.setParameters(serizeParmas);
    }
    return JSON.toJSONBytes(temp);
  }
  @Override
  public Request unmarshallingRequest(byte[] data) {
    Request req = JSON.parseObject(data, Request.class);
    if(req.getParameters() != null) {
      Object[] serizeParmas = req.getParameters();
      Object[] params = new Object[serizeParmas.length];
      for(int i = 0; i < serizeParmas.length; i++) {
        Object param = JSON.parseObject(serizeParmas[i].toString(), Object.class);
        params[i] = param;
      }
      req.setParameters(params);
    }
    return req;
  }
  @Override
  public byte[] marshallingResponse(Response rsp) {
    Response resp = new Response();
    resp.setHeaders(rsp.getHeaders());
    resp.setException(rsp.getException());
    resp.setReturnValue(rsp.getReturnValue());
    resp.setStatus(rsp.getStatus());
    return JSON.toJSONBytes(resp);
  }
  @Override
  public Response unmarshallingResponse(byte[] data) {
    return JSON.parseObject(data, Response.class);
  }
}

⑤ NetClient相关

分为BIO和Netty两种模式,netty中使用了EventLoopGroup

BIO:
public class BioNetClient implements NetClient {
  @Override
  public byte[] sendRequest(byte[] data, ServiceInfo sinfo) {
    List<String> addressList = sinfo.getAddress();
    int randNum = new Random().nextInt(addressList.size());
    String address = addressList.get(randNum);
    String[] addInfoArray = address.split(":");
    try {
      return startSend(data, addInfoArray[0], Integer.valueOf(addInfoArray[1]));
    } catch (Throwable e) {
      e.printStackTrace();
    }
    return null;
  }
  /**
   * 通过网络IO,打开远端服务连接,将请求数据写入网络中,并获得响应结果。
   *
   * @param requestData 将要发送的请求数据
   * @param host 远端服务域名或者ip地址
   * @param port 远端服务端口号
   * @return 服务端响应结果
   * @throws Throwable 抛出的异常
   */
  private byte[] startSend(byte[] requestData, String host, int port) throws Throwable{
    // 打开远端服务连接
    Socket serverSocket = new Socket(host, port);
    InputStream in = null;
    OutputStream out = null;
    try {
      // 1. 服务端输出流,写入请求数据,发送请求数据
      out = serverSocket.getOutputStream();
      out.write(requestData);
      out.flush();
      // 2. 服务端输入流,获取返回数据,转换参数类型
      // 类似于反序列化的过程
      in = serverSocket.getInputStream();
      byte[] res = new byte[1024];
      int readLen = -1;
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      while((readLen = in.read(res)) > 0) {
        baos.write(res, 0, readLen);
      }
      return baos.toByteArray();
    }finally{
      try {	// 清理资源,关闭流
        if(in != null) in.close();
        if(out != null) out.close();
        if(serverSocket != null) serverSocket.close();
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }
}
netty模式:
public class NettyNetClient implements NetClient {
  private SendHandler sendHandler;
  private Map<String, SendHandler> sendHandlerMap = new ConcurrentHashMap<String, SendHandler>();
  @Override
  public byte[] sendRequest(byte[] data, ServiceInfo sinfo) {
    try {
      List<String> addressList = sinfo.getAddress();
      int randNum = new Random().nextInt(addressList.size());
      String address = addressList.get(randNum);
      String[] addInfoArray = address.split(":");
      SendHandler handler = sendHandlerMap.get(address);
      if(handler == null) {
        sendHandler = new SendHandler(data);
        new Thread(()->{
          try {
            connect(addInfoArray[0], Integer.valueOf(addInfoArray[1]));
          } catch (NumberFormatException e) {
            e.printStackTrace();
          } catch (Exception e) {
            e.printStackTrace();
          }
        }).start();
      }
      byte[] respData = (byte[]) sendHandler.rspData();
      return respData;
    } catch (NumberFormatException e) {
      e.printStackTrace();
    } catch (Exception e) {
      e.printStackTrace();
    }
    return null;
  }
  public void connect(String host, int port) throws Exception {
        // 配置客户端
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            //EchoClientHandler handler = new EchoClientHandler();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(sendHandler);
                 }
             });
            // 启动客户端连接
            ChannelFuture f = b.connect(host, port).sync();
            // 等待客户端连接关闭
            f.channel().closeFuture().sync();
        } finally {
            // 释放线程组资源
            group.shutdownGracefully();
        }
    }
}

⑥ 运行结果

可以自行模拟一个消费者和一个生产者进行测试,这里就不贴出来了

作者:说出你的愿望吧丷

来源:https://juejin.im/post/5d70d2dde51d4561d044cd31


版权声明:文末如注明作者和来源,则表示本文系转载,版权为原作者所有 | 本文如有侵权,请及时联系,承诺在收到消息后第一时间删除 | 如转载本文,请注明原文链接。
喜欢 (1)