Hadoop RPC研究

Hadoop RPC研究

Hadoop RPC主要由三大类组成:RPC,Client和Server,分别对应对外编程接口,客户端实现和服务器实现.

1.ipc.RPC类

RPC类实际上是对底层客户机-服务器网络模型的封装,以便为程序员提供一套更方便简洁的编程接口.主要包含两个方法1)getProxy() 2)getServer()

(1)获取Client代理

可以看到真正获取代理的方法为RPC依赖的ProtocolEngine.getProxy()方法,代码如下:


Proxy.newProxyInstance(1.类加载器 2.代理目标类 3.handler处理对象(依赖目标类))得到目标类的代理proxy.在proxy调用方法之前需要首先经过handler的invoke()方法.来看一下handler(Invoker):


Invoker类依赖Client,在调用代理类方法时会进入到invoke()方法内.过程如下:

|---线程Tracer

|---client.call(rpcKind,new Invocation(method,args),)

      |---Invocation是一个可序列化的类,包含了调用远程函数的信息:(1).函数名 (2).参数列表

|---value.get()获取结果


流程:

PRC发起getProxy()调用,获取client代理.跟踪代码-》RpcEngine的getProxy()方法

根据配置的到RpcEngine具体的(默认)实现类ProtobufRpcEngine,该类实现了InvocationHandler接口.当调用代理类的具体方法是会进入到ProtobufRpcEngine的invoke()方法.

此invoke()方法不同于本地动态代理中的invoke()方法,会初始化clientd对象并由client向server发送call()请求.

2.RPC.Client

Client 主要完成的功能是发送远程过程调用信息并接收执行结果。它涉及到的类关系如下图所示。Client 类对外提供了一类执行远程调用的接口,这些接口的名称一样,仅仅是 参数列表不同,比如其中一个的声明如下所示


执行流程:

执行入口Client.call()执行某个远程方法

1)创建一个Connection对象,并将远程方法调用信息封装成Call对象,放到Connection对象中的哈希表中

2)调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端

3)Server 端处理完 RPC 请求后,将结果通过网络返回给 Client 端,Client 端通过

receiveRpcResponse() 函数获取结果;

4)Client 检查结果处理状态(成功还是失败),并将对应 Call 对象从哈希表中删除



如图所示:

1.首先初始化RPC Client参数,然后调用call()方法:封装Call对象放入到Connection的HashTable中

Connection调用addCall()线程安全方法添加call类.

2.完成步骤1后调用sendReq的时候向线程池中添加请求线程,异步发送请求(发送请求过程线程安全)

发送过程通过RpcStreams进行IO操作,Server端收到Call进行解析处理并通过socket将结果返回给client端.

Connection内部流程详解


以下从细节上对该过程进行分析:

1)Client线程发起call()方法,创建call对象

最后call调用getRpcResponse()方法:synchronized(call)导致call线程阻塞

2) 创建/获取connection对象

      |--将call作为参数获取connection对象,如果此时连接已段开则抛出异常否则进入循环体.如果connection不存在则创建一下新的connection对象添加到(concurrentHashMap)connections中.

      |--然后调用addCall方法将call将该对象放入该connection的HashTable中,此操作为同步操作并且notify->connection对象wait()阻塞的线程.(synchronized+wait+notify)

3) 调用connection.setupIOstreams()方法初始化connection对象,并且启动该线程.

connection继承Thread因此入口是run方法

在run中主要做两件事:(1)while (waitForWork()) {//wait here foconnection

                                                          receiveRpcResponse();

                                                    }

                                    (2)markClosed将connection状态置为closed

run中的wait()方法调用,呼应了addCall()方法中的notify方法().确保往calls集合里添加call任务的时候能够触发run()中的waitForwork().

|--waitForWokr()

    |--wait(timeout)到此为止,进入阻塞状态等待被再次唤醒

|--receiveRpcResponse():接受服务端发送的响应,只有一个接收器因此不需要synchronized保证.

4)client调用connection的sendRpcRequest(call),提交异步执行任务到请求线程池.Connection类主要作为一个工具类使用,被其他线程调用主要用来向服务端发送rpc请求.线程安全的调用connection的sendRpcRequest()方法,确保发送的数据安全.

|--- void sendRpcRequest(call);向线程池中提交任务,提交的过程用sendRpcRequestLock对象锁保证线程的安全,该方法被其他线程所引用.

注意:sendRpcREquest()采用同步方式发送消息,不然消息之间交叉重叠无法读取.

5)提交任务之后,开始在线程池中单独执行发送请求任务.该任务向服务端发送请求信息,过程如下图:

6)在发送请求的过程中加了事务锁,在同一个输出管道中保证当前只有一个线程在使用避免造成数据

混淆.

7)当server端接收到返回的数据时,进入到connection->run(){receiveRpcResponse()}方法中.首先通过ipcStream.readResponse()读取返回响应内容,并且经过一些列的内容校验(header长度校验,status状态校验).当读取响应内容成功后call对象调用setRpcResponse(value)触发锁同步,使阻塞的client->call()方法继续进而获取结果.

setRpcResponse设置rpcResponse并调用callComplete()

8)callComplate->notify()唤醒client的call()方法.

3.RPC.Server

ipc.Server 的主要功能是接收来自客户端的 RPC 请求,经过调用相应的函 数获取结果后,返回给对应的客户端。为此,ipc.Server 被划分成 3 个阶段 :接收请求、处 理请求和返回结果.ipc.Server 采用了很多提高并发处理能力的技术,主要包括:线程池,事件驱动,Reactor设计模式等.这些技术均采用了 JDK 自带的库实现,这里重点分析它是如何利用 Reactor 设计模式提高整体性能的。

server端的入口为start方法:

主要启动server内步服务线程:Listener->Handler->Responder

(1)接受请求:Lister线程

该阶段主要任务是接收来自各个客户端的 RPC 请求,并将它们封装成固定的格式

(Call 类)放到一个共享队列(callQueue)中,以便进行后续处理。该阶段内部又分为建立 连接和接收请求两个子阶段,分别由 Listener 和 Reader 两种线程完成.

整个接收请求的流程如上图主要非为12步:

|--1.cient向serverSocketChannel发起请求.

|--2.Listener线程selector轮询监听连接事件进行处理.

|--3.接收连接请求事件调用doAccept()方法

|--4.调用connectionManager的register方法创建connection对象等

|--5.调用reader对象的addConnection将该对象放入pendingConnections(BlockingQueue<Connection>)集合中.同时唤醒readSelector

|--6.调用readSelector的wakeup方法

|--7.reader线程doRunLoop

|--8.从pendingConnections中take connection对象,将对象包含的channel注册到reader selector中

|--9.reader selector轮询获取可读事件

|--10.调用Listener的doRead方法,开始解析client发送的请求.

|--11.processOneRpc->processRpcRequest->internalQueueCall(call)将call添加到callQueue队列中

|--12.CallQueueManager implements BlockingQueue add call

(2)处理请求:Handler线程

从Handler线程类的入口run方法开始分析:

|--1.handler作为线程类,run函数启动


|--2.run方法中从callQueue队列中获取connection对象,然后调用rpcCall的run()方法

|--3.rpcCall对象调用server类全局方法call()获取服务端执行结果,将结果封装到call对象中

|--4.调用connection对象的sendResponse()方法最终调用responder的doRepond(call)方法

(3)返回结果:Responder线程

|--5.将call对象添加到关联的connection对象的队列中,同时将channel注册到writeSelector中.当 Handler 没能将结果一次性发送到客户端时,会向该 Selector 对象注册 SelectionKey.OP_WRITE 事件,进而由 Responder 线程采用异步方式继续 发送未发送完成的结果。

|--6.Responder线程类执行run方法进入doRunLoop()

|--7.writeSelector轮询监听写事件

|--8.异步写

提示:

*Server 端可同时存在多个 Handler 线程,它们并行从共享队列中读取 Call 对象,经执

行对应的函数调用后,将尝试着直接将结果返回给对应的客户端。但考虑到某些函数调用 返回结果很大或者网络速度过慢,可能难以将结果一次性发送到客户端,此时 Handler 将尝 试着将后续发送任务交给 Responder 线程。

提高性能优化方案

(1)线程池

在Listener线程类中包括Reader线程类,可以生成多个reader线程放入线程池中.整个server只有一个Listener,采用单线程selector方式运行统一监听来自客户端的连接.一旦有新的请 求到达,它会采用轮询的方式从线程池中选择一个 Reader 线程进行处理

(2)事件驱动

基于selector模式中的OP_ACCEPT,OP_WRITE等事件来触发selector进行处理.

(3)Reactor 设计模式等

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,138评论 1 32
  • 网络通信模块是分布式系统中最底层的模块,他直接支撑了上层分布式环境下复杂的进程间通信逻辑,是所有分布式系统的基础。...
    SmallBird_阅读 2,226评论 0 1
  • 摘自: https://my.oschina.net/hosee/blog/711632摘要: 本文主要说明RPC...
    holy_z阅读 1,151评论 0 7
  • 本文转自微信公众号:架构师之路 服务化有什么好处? 服务化的一个好处就是,不限定服务的提供方使用什么技术选型,能够...
    吃火龙果吐芝麻阅读 235评论 0 1
  • Hadoop RPC Hadoop作为分布式存储系统,为了实现各节点间的通信和交互,所以需要实现一套这样的机制.R...
    you_lys阅读 1,394评论 0 2