大数跨境
0
0

Hadoop RPC 工作机制及实例详解

Hadoop RPC 工作机制及实例详解 曼昂网络爬虫
2015-10-08
0
导读:RPC (Remote Procedure Call Protocol) – 远程过程协议调用 。通

RPC (Remote Procedure Call Protocol) 远程过程协议调用 。通过 RPC 我们可以从网络上的计算机请求服务,而不需要了 解底层网络协议。 Hadoop 底层的交互都是通过 rpc 进行的。例 如: datanode namenode tasktracker jobtracker secondary namenode namenode 之间的通信都是通过 rpc 现的。

RPC 采用客户机 / 服务器模式。请求程序就是一个客户机, 而服务提供程序就是一个服务器。首先,客户机调用进程发送 一个有进程参数的调用信息到服务进程,然后等待应答信息。 在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答 复信息,然后等待下一个调用信息,最后, 客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。

客户机对服务器的 RPC 调用步骤如下:

1. 调用客户端句柄;执行传送参数

2. 调用本地系统内核发送网络 消息

3. 消息传送到远程主机

4. 服务器句柄得到消息并取得参数

5. 执行远程过程

6. 执行的过程将结果返回服务器句柄

7. 服务器句柄返回结果,调用远程系统内核

8. 消息传回本地主机

9. 客户句柄由内核接收消息

10. 客户接收句柄返回的数据


服务端流程

• Listener线程监视RPC Client发送过来的数据。

• 当有数据可以接收时,调用Connection的readAndProcess方法。

• Connection边接收边对数据进行处理,如果接收到一个完整的Call包,则构建一个Call对象。PUSH到Call队列中,由Handler线程来处理Call队列中的所有Call。

• Handler线程监听Call队列,如果Call队列非空,按FIFO规则从Call队列取出Call。

• 将Call交给RPC.Server处理。

• 借助JDK提供的Method,完成对目标方法的调用,目标方法由具体的业务逻辑实现。

• 返回响应,Server.Handler按照异步非阻塞的方式向RPC Client发送响应,如果有未发送出的数据,则交由Server.Responder来完成。



服务端主要代码结构:

代码结构

功能说明

Server.Listener

RPC Server的监听者,用来接收RPC Client的连接请求和数据,其中数据封装成Call后PUSH到Call队列。

Server.Handler

RPC Server的Call处理者,和Server.Listener通过Call队列交互。

Server.Responder

RPC Server的响应者。Server.Handler按照异步非阻塞的方式向RPC Client发送响应,如果有未发送出的数据,交由Server.Responder来完成。

Server.Connection

RPC Server数据接收者。提供接收数据,解析数据包的功能。

Server.Call

持有客户端的Call信息。

客户端主要代码结构:

代码结构

功能说明

Client.ConnectionId

到RPC Server对象连接的标识

Client.Call

Call调用信息。

Client.ParallelResults

Call响应。

RPC.Invoker

对InvocationHandler的实现,提供invoke方法,实现RPC Client对RPC Server对象的调用。

RPC.Invocation

用来序列化和反序列化RPC Client的调用信息。(主要应用JAVA的反射机制和InputStream/OutputStream)

• 客户端发起的RPC调用是同步的,而服务端处理RPC调用是异步的。客户端调用线程以阻塞同步的方式发起RPC连接及RPC调用,将参数等信息发送给Listener,然后等待Connection接收响应返回。

• Listener负责接收RPC连接和RPC数据,当一个Call的数据接收完后,组装成Call,并将Call放入由Handler提供的Call队列中。

• Handler线程监听Call队列,如果Call队列不为空,则按FIFO方式取出Call,并转为实际调用,以非阻塞方式将响应发回给Connection,未发送完毕的响应交给Responder处理。


RPC实例


定义RPC协议

Hadoop中所有自定义RPC接口都需要继承VersionedProtocol接口,它描述了协议的版本信息;Hadoop RPC 严格要求客户端和服务器使用相同的RPC版本,否则会导致无法建立连接。

  1. import java.lang.String;

  2. import org.apache.hadoop.ipc.VersionedProtocol;

  3. /* 这里不扩展VersionedProtocol 也是可以的 */

  4. public interface RpcProtocol extends VersionedProtocol

  5. {

  6. public static final long versionID = 1L;

  7. public String exec(String[] cmd);

  8. }


RPC Server 构建服务

  1. import java.lang.String;

  2. import java.io.IOException;

  3. import org.apache.hadoop.ipc.RPC;

  4. import org.apache.hadoop.conf.Configuration;

  5. import java.io.BufferedInputStream;

  6. import java.io.InputStream;

  7. public class RpcServer implements RpcProtocol

  8. {

  9. private String host;

  10. private int port;

  11. private RPC.Server server;

  12. public RpcServer (String host, int port) throws IOException

  13. {

  14. this.host = host;

  15. this.port = port;

  16. /* 获取server实体 */

  17. this.server = RPC.getServer (this, host, port, new Configuration ());

  18. }

  19. public void run () throws IOException

  20. {

  21. /* 运行 */

  22. this.server.start ();

  23. }

  24. /* 实现 VersionedProtocol 接口 */

  25. public long getProtocolVersion (String s, long v)

  26. {

  27. return versionID;

  28. }

  29. /* 实现 RpcProtocol.exec接口 */

  30. public String exec (String[] cmd)

  31. {

  32. try {

  33. Process process = Runtime.getRuntime ().exec (cmd);

  34. process.waitFor();

  35. return loadStream (process.getInputStream ())

  36. + loadStream (process.getErrorStream ());

  37. } catch (Exception e) {

  38. return e.getMessage ();

  39. }

  40. }

  41. /* 用于实现具体的exec接口 */

  42. private static String loadStream (InputStream stream) throws IOException

  43. {

  44. if (stream == null) {

  45. throw new java.io.IOException ("null stream");

  46. }

  47. stream = new java.io.BufferedInputStream (stream);

  48. int avail = stream.available ();

  49. byte[]data = new byte[avail];

  50. int numRead = 0;

  51. int pos = 0;

  52. do {

  53. if (pos + avail > data.length) {

  54. byte[]newData = new byte[pos + avail];

  55. System.arraycopy (data, 0, newData, 0, pos);

  56. data = newData;

  57. }

  58. numRead = stream.read (data, pos, avail);

  59. if (numRead >= 0) {

  60. pos += numRead;

  61. }

  62. avail = stream.available ();

  63. } while (avail > 0 && numRead >= 0);

  64. return new String (data, 0, pos, "UTF-8");

  65. }

  66. public static void main (String[]args) throws IOException

  67. {

  68. RpcServer ser = new RpcServer ("0.0.0.0", 666);

  69. ser.run ();

  70. }

  71. }


Client 通过RPC.getProxy获取本地代理:

  1. import java.lang.String;

  2. import java.io.IOException;

  3. import org.apache.hadoop.ipc.RPC;

  4. import org.apache.hadoop.conf.Configuration;

  5. import java.net.InetSocketAddress;

  6. public class RpcClient

  7. {

  8. public static void main (String[] args) throws IOException

  9. {

  10. /* 服务器地址 */

  11. InetSocketAddress addr = new InetSocketAddress ("localhost", 1600);

  12. /* 通过RPC.getProxy 获取客户端代理类的实体 */

  13. RpcProtocol proxy = (RpcProtocol) RPC.getProxy(RpcProtocol.class,

  14. RpcProtocol.versionID, addr,

  15. new Configuration ());


  16. System.out.println (proxy.exec(args));

  17. }

  18. }



【声明】内容源于网络
0
0
曼昂网络爬虫
我们是程序员开发者联盟,利用业余时间提供网络爬虫软件定制、微站、H5、网站等各类软件开发服务,有意者请留言!
内容 96
粉丝 0
曼昂网络爬虫 我们是程序员开发者联盟,利用业余时间提供网络爬虫软件定制、微站、H5、网站等各类软件开发服务,有意者请留言!
总阅读128
粉丝0
内容96