本文共 14053 字,大约阅读时间需要 46 分钟。
随着分布式架构运用的越来越多,RPC框架成为了我们不得不掌握的知识,这里一步一步来手写一个简单的RPC框架,以博文作为记录及自我监督。
首先是技术选型,这边我选用的是当前比较流行的Netty+Zookeeper来实现,通过zookeeper的特性来实现服务注册与发现,通信则使用netty框架。
这里贴出github代码地址,想直接看代码的可以直接下载运行:https://github.com/whiteBX/wrpc
这里先来讲服务注册发现原理: 利用zookeeper的创建临时节点和watcher机制,可以做到在一个服务下注册多个服务器地址,并且在节点发生变动时通过watcher动态更新服务器列表,来达到在新增/修改/删除时自动注册发现/删除/更新服务器连接信息.这里说一点,zookeeper的增删改操作会交由leader去处理,所以这里不用担心并发问题.
zookeeper相关代码如下:
public class ZKClient { /** * 获取zookeeper连接 * * @param connectString * @param sessionTimeout * @return */ public ZooKeeper newConnection(String connectString, int sessionTimeout) { ZooKeeper zooKeeper = null; try { final CountDownLatch countDownLatch = new CountDownLatch(1); zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() { public void process(WatchedEvent watchedEvent) { if (Event.KeeperState.SyncConnected.equals(watchedEvent.getState())) { countDownLatch.countDown(); } } }); countDownLatch.await(); } catch (IOException e) { System.out.println("获取zookeeper连接失败:连接不上zookeeper" + e.getMessage()); } catch (InterruptedException e) { System.out.println("获取zookeeper连接失败:本地线程原因" + e.getMessage()); } System.out.println("zookeeper连接成功"); return zooKeeper; } /** * 创建临时节点 * * @param zk * @param appCode * @param data */ public void createEphemeralNode(ZooKeeper zk, String appCode, byte[] data) { try { initAppPath(zk, appCode); String path = zk.create(MessageFormat.format("{0}/{1}/", CommonConstant.ZK_REGISTORY_ROOT_PATH, appCode), data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("临时节点创建成功:" + path); } catch (Exception e) { System.out.println("创建临时节点失败:" + e.getMessage()); } } /** * 初始化appPath * * @param zk * @param appCode */ private void initAppPath(ZooKeeper zk, String appCode) { initRootPath(zk); try { if (zk.exists(MessageFormat.format("{0}/{1}", CommonConstant.ZK_REGISTORY_ROOT_PATH, appCode), false) == null) { zk.create(MessageFormat.format("{0}/{1}", CommonConstant.ZK_REGISTORY_ROOT_PATH, appCode), null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (Exception e) { System.out.println("zookeeper创建跟节点失败" + e); } } /** * 初始化根节点 * * @param zk */ private void initRootPath(ZooKeeper zk) { try { if (zk.exists(CommonConstant.ZK_REGISTORY_ROOT_PATH, false) == null) { zk.create(CommonConstant.ZK_REGISTORY_ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (Exception e) { System.out.println("zookeeper创建跟节点失败" + e); } }}
接下来看provider相关类,提供服务:
/** ** 1.创建netty监听,等待客户端连接 * 2.连接zk,创建临时节点,记录服务器连接信息
* * @author baixiong * @version $Id: RPCProvider.java, v 0.1 2018年10月15日 17:42:00 baixiong Exp$ */public class RPCProvider { /** * netty客户端 */ private static NettyClient nettyClient = new NettyClient(); /** * zookeeper客户端 */ private static ZKClient zkClient = new ZKClient(); public void registry(String server, int port) { // 开启netty监听客户端连接 nettyClient.startServer(port); // 创建zk连接并创建临时节点 ZooKeeper zooKeeper = zkClient.newConnection(ProviderConstant.ZK_CONNECTION_STRING, ProviderConstant.ZK_SESSION_TIME_OUT); String serverIp = server + CommonConstant.COMMOA + port; zkClient.createEphemeralNode(zooKeeper, ProviderConstant.APP_CODE, serverIp.getBytes()); }}
代码中注释已经很完整了,文中用到的几个常量自己定就行,appCode作为服务的唯一标识,接下来看代码中用到的NettyClient代码
/** * 开启新的netty连接 * * @param port */ public void startServer(int port) { try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new RpcServerNettyHandler()); } }); bootstrap.bind(port).sync(); } catch (InterruptedException e) { System.out.println("netty创建服务端channel失败:" + e.getMessage()); } }
这里来看下RpcServerNettyHandler类,他继承自ChannelInboundHandlerAdapter,负责通信数据。
public class RpcServerNettyHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("服务端收到请求:" + msg); ctx.writeAndFlush("success"); }}
到这里provider相关代码就完了,接下来看consumer相关代码:
public class RPCConsumer { /** * url处理器 */ private UrlHolder urlHolder = new UrlHolder(); /** * netty客户端 */ private NettyClient nettyClient = new NettyClient(); /** * 远程调用 * * @param appCode * @param param * @return */ public String call(String appCode, String param) { try { // 从zookeeper获取服务地址 String serverIp = urlHolder.getUrl(appCode); if (serverIp == null) { System.out.println("远程调用错误:当前无服务提供者"); return "connect error"; } // 连接netty,请求并接收响应 RpcClientNettyHandler clientHandler = new RpcClientNettyHandler(); clientHandler.setParam(param); nettyClient.initClient(serverIp, clientHandler); String result = clientHandler.process(); System.out.println(MessageFormat.format("调用服务器:{0},请求参数:{1},响应参数:{2}", serverIp, param, result)); return result; } catch (Exception e) { System.out.println("远程服务调用失败:" + e); return "error"; } }}
这里来看一下UrlHolder类,它负责连接zookeeper,并获取其中服务地址列表,随机返回其中一条:
public class UrlHolder { /** * url列表 */ private ListurlList = new ArrayList (); /** * zk客户端 */ private ZKClient zkClient = new ZKClient(); /** * 获取URL * @param appCode * @return */ public String getUrl(String appCode) { // 初始化url if (urlList.size() == 0) { initUrlList(appCode); } // 随机返回一条,此处以后优化为负载均衡策略 if (urlList.size() > 0) { return urlList.get(new Random(urlList.size()).nextInt()); } else { System.out.println("目前没有服务提供者"); return null; } } /** * 初始化urlList * @param appCode */ private void initUrlList(final String appCode) { try { // 获取zookeeper连接 ZooKeeper zk = zkClient.newConnection(ConsumerConstant.ZK_CONNECTION_STRING, ConsumerConstant.ZK_SESSION_TIME_OUT); // 获取目录下所有子节点 List urlNodeList = zk.getChildren( MessageFormat.format("{0}/{1}", CommonConstant.ZK_REGISTORY_ROOT_PATH, appCode), new Watcher() { public void process(WatchedEvent watchedEvent) { initUrlList(appCode); } }); if (CollectionUtils.isEmpty(urlNodeList)) { return; } // 从子节点数据中解析出所有url List urlList = new ArrayList (); for (String path : urlNodeList) { byte[] url = zk.getData(path, new Watcher() { public void process(WatchedEvent watchedEvent) { initUrlList(appCode); } }, null); if (url != null) { urlList.add(new String(url)); } } this.urlList = urlList; } catch (Exception e) { System.out.println("初始化url异常" + e.getMessage()); } }}public class UrlHolder { /** * url列表 */ private List urlList = new ArrayList (); /** * zk客户端 */ private ZKClient zkClient = new ZKClient(); /** * 获取URL * * @param appCode * @return */ public String getUrl(String appCode) { // 初始化url if (urlList.size() == 0) { initUrlList(appCode); } // 随机返回一条,此处以后优化为负载均衡策略 if (urlList.size() > 0) { return urlList.get(new Random().nextInt(urlList.size())); } else { System.out.println("目前没有服务提供者"); return null; } } /** * 初始化urlList * * @param appCode */ private void initUrlList(final String appCode) { try { // 获取zookeeper连接 ZooKeeper zk = zkClient.newConnection(ConsumerConstant.ZK_CONNECTION_STRING, ConsumerConstant.ZK_SESSION_TIME_OUT); // 获取目录下所有子节点 String appPath = MessageFormat.format("{0}/{1}", CommonConstant.ZK_REGISTORY_ROOT_PATH, appCode); List urlNodeList = zk.getChildren(appPath , new Watcher() { public void process(WatchedEvent watchedEvent) { initUrlList(appCode); } }); if (CollectionUtils.isEmpty(urlNodeList)) { return; } // 从子节点数据中解析出所有url List urlList = new ArrayList (); for (String path : urlNodeList) { byte[] url = zk.getData(appPath + "/" + path, new Watcher() { public void process(WatchedEvent watchedEvent) { initUrlList(appCode); } }, null); if (url != null) { urlList.add(new String(url)); } } this.urlList = urlList; } catch (Exception e) { System.out.println("初始化url异常" + e.getMessage()); } }}
接下来是netty的initClient方法:
/** * 开始一个新的客户端连接 * * @param server */ public void initClient(String server, final RpcClientNettyHandler clientHandler) { try { String[] urlArray = server.split(CommonConstant.COMMOA); String ip = urlArray[0]; int port = Integer.parseInt(urlArray[1]); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(new NioEventLoopGroup()).channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(clientHandler); } }); bootstrap.connect(ip, port).sync(); } catch (InterruptedException e) { System.out.println("netty创建客户端channel失败:" + e); } }
然后是这里我们自己主要写的一个通信处理类RpcClientNettyHandler,这个类负责远程调用和处理返回的消息:
public interface MethodProcessor { String process() throws Exception;}public class RpcClientNettyHandler extends ChannelInboundHandlerAdapter implements MethodProcessor { private ChannelHandlerContext context; private CountDownLatch contextCountDownLatch = new CountDownLatch(1); private CountDownLatch countDownLatch = new CountDownLatch(1); /** * 入参 */ private String param; /** * 响应 */ private String response; /** * 连上时触发 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { this.context = ctx; contextCountDownLatch.countDown(); } /** * 获取服务器返回信息 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { response = msg.toString(); countDownLatch.countDown(); } /** * 远程调用并返回结果 * @return * @throws InterruptedException */ public String process() throws InterruptedException { contextCountDownLatch.await(); context.writeAndFlush(param); countDownLatch.await(); return response; } public void setParam(String param) { this.param = param; }}
到这里代码实现就写完了,下面是测试:
// 服务端 public static void main(String[] args) throws InterruptedException { RPCProvider provider = new RPCProvider(); provider.registry("127.0.0.1", 8091); provider.registry("127.0.0.1", 8092); provider.registry("127.0.0.1", 8093); provider.registry("127.0.0.1", 8094); provider.registry("127.0.0.1", 8095); Thread.sleep(Long.MAX_VALUE); } // 客户端 public static void main(String[] args) throws InterruptedException { RPCConsumer consumer = new RPCConsumer(); int i = 0; while (true) { consumer.call(ConsumerConstant.APP_CODE, "aaa" + i++); Thread.sleep(2000L); } }
日志输出如下:
// 服务端日志zookeeper连接成功临时节点创建成功:/registry/100000/0000000001zookeeper连接成功临时节点创建成功:/registry/100000/0000000002zookeeper连接成功临时节点创建成功:/registry/100000/0000000003zookeeper连接成功临时节点创建成功:/registry/100000/0000000004zookeeper连接成功临时节点创建成功:/registry/100000/0000000005服务端收到请求:aaa0服务端收到请求:aaa1服务端收到请求:aaa2服务端收到请求:aaa3// 客户端日志zookeeper连接成功调用服务器:127.0.0.1,8091,请求参数:aaa0,响应参数:success调用服务器:127.0.0.1,8093,请求参数:aaa1,响应参数:success调用服务器:127.0.0.1,8092,请求参数:aaa2,响应参数:success调用服务器:127.0.0.1,8094,请求参数:aaa3,响应参数:success调用服务器:127.0.0.1,8091,请求参数:aaa4,响应参数:success调用服务器:127.0.0.1,8092,请求参数:aaa5,响应参数:success
这边可以修改一下服务端里面注册服务的端口,然后再启多个服务,会发现服务启动起来之后,客户端会自动随机分配到新启动的服务请求,并成功返回。再关掉几个端口的服务,会发现不会在请求那几个端口,这就是zookeeper的watcher机制实现的功能。
到这里就实现了RPC框架的服务注册发现+基本通信功能.
这里可能跟大家平时用的RPC框架差别还挺大:我怎么把我自己的Service注册到provider里,然后客户端通过接口来调用呢?.
关于这个问题,下一篇见!
转载地址:http://ahsni.baihongyu.com/