RPC (Remote Procedure Call Protocol) – 远程过程协议调用 。通过 RPC 我们可以从网络上的计算机请求服务,而不需要了 解底层网络协议。 Hadoop 底层的交互都是通过 rpc 进行的。例 如: datanode 和 namenode 、 tasktracker和 jobtracker 、 secondary namenode 和 namenode 之间的通信都是通过 rpc 实 现的。
RPC 采用客户机 / 服务器模式。请求程序就是一个客户机, 而服务提供程序就是一个服务器。首先,客户机调用进程发送 一个有进程参数的调用信息到服务进程,然后等待应答信息。 在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答 复信息,然后等待下一个调用信息,最后, 客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。
客户机对服务器的 RPC 调用步骤如下:
1. 调用客户端句柄;执行传送参数
2. 调用本地系统内核发送网络 消息
3. 消息传送到远程主机
4. 服务器句柄得到消息并取得参数
5. 执行远程过程
6. 执行的过程将结果返回服务器句柄
7. 服务器句柄返回结果,调用远程系统内核
8. 消息传回本地主机
9. 客户句柄由内核接收消息
10. 客户接收句柄返回的数据
服务端流程
• Listener线程监视RPC Client发送过来的数据。
• 当有数据可以接收时,调用Connection的readAndProcess方法。
• Connection边接收边对数据进行处理,如果接收到一个完整的Call包,则构建一个Call对象。PUSH到Call队列中,由Handler线程来处理Call队列中的所有Call。
• Handler线程监听Call队列,如果Call队列非空,按FIFO规则从Call队列取出Call。
• 将Call交给RPC.Server处理。
• 借助JDK提供的Method,完成对目标方法的调用,目标方法由具体的业务逻辑实现。
• 返回响应,Server.Handler按照异步非阻塞的方式向RPC Client发送响应,如果有未发送出的数据,则交由Server.Responder来完成。

服务端主要代码结构:
代码结构 |
功能说明 |
Server.Listener |
RPC Server的监听者,用来接收RPC Client的连接请求和数据,其中数据封装成Call后PUSH到Call队列。 |
Server.Handler |
RPC Server的Call处理者,和Server.Listener通过Call队列交互。 |
Server.Responder |
RPC Server的响应者。Server.Handler按照异步非阻塞的方式向RPC Client发送响应,如果有未发送出的数据,交由Server.Responder来完成。 |
Server.Connection |
RPC Server数据接收者。提供接收数据,解析数据包的功能。 |
Server.Call |
持有客户端的Call信息。 |
客户端主要代码结构:
代码结构 |
功能说明 |
Client.ConnectionId |
到RPC Server对象连接的标识 |
Client.Call |
Call调用信息。 |
Client.ParallelResults |
Call响应。 |
RPC.Invoker |
对InvocationHandler的实现,提供invoke方法,实现RPC Client对RPC Server对象的调用。 |
RPC.Invocation |
用来序列化和反序列化RPC Client的调用信息。(主要应用JAVA的反射机制和InputStream/OutputStream) |
• 客户端发起的RPC调用是同步的,而服务端处理RPC调用是异步的。客户端调用线程以阻塞同步的方式发起RPC连接及RPC调用,将参数等信息发送给Listener,然后等待Connection接收响应返回。
• Listener负责接收RPC连接和RPC数据,当一个Call的数据接收完后,组装成Call,并将Call放入由Handler提供的Call队列中。
• Handler线程监听Call队列,如果Call队列不为空,则按FIFO方式取出Call,并转为实际调用,以非阻塞方式将响应发回给Connection,未发送完毕的响应交给Responder处理。
RPC实例
定义RPC协议
Hadoop中所有自定义RPC接口都需要继承VersionedProtocol接口,它描述了协议的版本信息;Hadoop RPC 严格要求客户端和服务器使用相同的RPC版本,否则会导致无法建立连接。
import java.lang.String;
import org.apache.hadoop.ipc.VersionedProtocol;
/* 这里不扩展VersionedProtocol 也是可以的 */
public interface RpcProtocol extends VersionedProtocol
{
public static final long versionID = 1L;
public String exec(String[] cmd);
}
RPC Server 构建服务
import java.lang.String;
import java.io.IOException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.conf.Configuration;
import java.io.BufferedInputStream;
import java.io.InputStream;
public class RpcServer implements RpcProtocol
{
private String host;
private int port;
private RPC.Server server;
public RpcServer (String host, int port) throws IOException
{
this.host = host;
this.port = port;
/* 获取server实体 */
this.server = RPC.getServer (this, host, port, new Configuration ());
}
public void run () throws IOException
{
/* 运行 */
this.server.start ();
}
/* 实现 VersionedProtocol 接口 */
public long getProtocolVersion (String s, long v)
{
return versionID;
}
/* 实现 RpcProtocol.exec接口 */
public String exec (String[] cmd)
{
try {
Process process = Runtime.getRuntime ().exec (cmd);
process.waitFor();
return loadStream (process.getInputStream ())
+ loadStream (process.getErrorStream ());
} catch (Exception e) {
return e.getMessage ();
}
}
/* 用于实现具体的exec接口 */
private static String loadStream (InputStream stream) throws IOException
{
if (stream == null) {
throw new java.io.IOException ("null stream");
}
stream = new java.io.BufferedInputStream (stream);
int avail = stream.available ();
byte[]data = new byte[avail];
int numRead = 0;
int pos = 0;
do {
if (pos + avail > data.length) {
byte[]newData = new byte[pos + avail];
System.arraycopy (data, 0, newData, 0, pos);
data = newData;
}
numRead = stream.read (data, pos, avail);
if (numRead >= 0) {
pos += numRead;
}
avail = stream.available ();
} while (avail > 0 && numRead >= 0);
return new String (data, 0, pos, "UTF-8");
}
public static void main (String[]args) throws IOException
{
RpcServer ser = new RpcServer ("0.0.0.0", 666);
ser.run ();
}
}
Client 通过RPC.getProxy获取本地代理:
import java.lang.String;
import java.io.IOException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.conf.Configuration;
import java.net.InetSocketAddress;
public class RpcClient
{
public static void main (String[] args) throws IOException
{
/* 服务器地址 */
InetSocketAddress addr = new InetSocketAddress ("localhost", 1600);
/* 通过RPC.getProxy 获取客户端代理类的实体 */
RpcProtocol proxy = (RpcProtocol) RPC.getProxy(RpcProtocol.class,
RpcProtocol.versionID, addr,
new Configuration ());
System.out.println (proxy.exec(args));
}
}

