博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【RPC】一步一步实现基于netty+zookeeper的RPC框架(一)
阅读量:4079 次
发布时间:2019-05-25

本文共 14053 字,大约阅读时间需要 46 分钟。

随着分布式架构运用的越来越多,RPC框架成为了我们不得不掌握的知识,这里一步一步来手写一个简单的RPC框架,以博文作为记录及自我监督。

首先是技术选型,这边我选用的是当前比较流行的Netty+Zookeeper来实现,通过zookeeper的特性来实现服务注册与发现,通信则使用netty框架。

这里贴出github代码地址,想直接看代码的可以直接下载运行:https://github.com/whiteBX/wrpc

这里先来讲服务注册发现原理: 利用zookeeper的创建临时节点和watcher机制,可以做到在一个服务下注册多个服务器地址,并且在节点发生变动时通过watcher动态更新服务器列表,来达到在新增/修改/删除时自动注册发现/删除/更新服务器连接信息.这里说一点,zookeeper的增删改操作会交由leader去处理,所以这里不用担心并发问题.

zookeeper相关代码如下:

public class ZKClient {
/** * 获取zookeeper连接 * * @param connectString * @param sessionTimeout * @return */ public ZooKeeper newConnection(String connectString, int sessionTimeout) {
ZooKeeper zooKeeper = null; try {
final CountDownLatch countDownLatch = new CountDownLatch(1); zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
if (Event.KeeperState.SyncConnected.equals(watchedEvent.getState())) {
countDownLatch.countDown(); } } }); countDownLatch.await(); } catch (IOException e) {
System.out.println("获取zookeeper连接失败:连接不上zookeeper" + e.getMessage()); } catch (InterruptedException e) {
System.out.println("获取zookeeper连接失败:本地线程原因" + e.getMessage()); } System.out.println("zookeeper连接成功"); return zooKeeper; } /** * 创建临时节点 * * @param zk * @param appCode * @param data */ public void createEphemeralNode(ZooKeeper zk, String appCode, byte[] data) {
try {
initAppPath(zk, appCode); String path = zk.create(MessageFormat.format("{0}/{1}/", CommonConstant.ZK_REGISTORY_ROOT_PATH, appCode), data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("临时节点创建成功:" + path); } catch (Exception e) {
System.out.println("创建临时节点失败:" + e.getMessage()); } } /** * 初始化appPath * * @param zk * @param appCode */ private void initAppPath(ZooKeeper zk, String appCode) {
initRootPath(zk); try {
if (zk.exists(MessageFormat.format("{0}/{1}", CommonConstant.ZK_REGISTORY_ROOT_PATH, appCode), false) == null) {
zk.create(MessageFormat.format("{0}/{1}", CommonConstant.ZK_REGISTORY_ROOT_PATH, appCode), null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (Exception e) {
System.out.println("zookeeper创建跟节点失败" + e); } } /** * 初始化根节点 * * @param zk */ private void initRootPath(ZooKeeper zk) {
try {
if (zk.exists(CommonConstant.ZK_REGISTORY_ROOT_PATH, false) == null) {
zk.create(CommonConstant.ZK_REGISTORY_ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (Exception e) {
System.out.println("zookeeper创建跟节点失败" + e); } }}

接下来看provider相关类,提供服务:

/** * 

* 1.创建netty监听,等待客户端连接 * 2.连接zk,创建临时节点,记录服务器连接信息

* * @author baixiong * @version $Id: RPCProvider.java, v 0.1 2018年10月15日 17:42:00 baixiong Exp$ */public class RPCProvider {
/** * netty客户端 */ private static NettyClient nettyClient = new NettyClient(); /** * zookeeper客户端 */ private static ZKClient zkClient = new ZKClient(); public void registry(String server, int port) {
// 开启netty监听客户端连接 nettyClient.startServer(port); // 创建zk连接并创建临时节点 ZooKeeper zooKeeper = zkClient.newConnection(ProviderConstant.ZK_CONNECTION_STRING, ProviderConstant.ZK_SESSION_TIME_OUT); String serverIp = server + CommonConstant.COMMOA + port; zkClient.createEphemeralNode(zooKeeper, ProviderConstant.APP_CODE, serverIp.getBytes()); }}

代码中注释已经很完整了,文中用到的几个常量自己定就行,appCode作为服务的唯一标识,接下来看代码中用到的NettyClient代码

/**     * 开启新的netty连接     *     * @param port     */    public void startServer(int port) {
try {
ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer
() {
@Override protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new RpcServerNettyHandler()); } }); bootstrap.bind(port).sync(); } catch (InterruptedException e) {
System.out.println("netty创建服务端channel失败:" + e.getMessage()); } }

这里来看下RpcServerNettyHandler类,他继承自ChannelInboundHandlerAdapter,负责通信数据。

public class RpcServerNettyHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("服务端收到请求:" + msg); ctx.writeAndFlush("success"); }}

到这里provider相关代码就完了,接下来看consumer相关代码:

public class RPCConsumer {
/** * url处理器 */ private UrlHolder urlHolder = new UrlHolder(); /** * netty客户端 */ private NettyClient nettyClient = new NettyClient(); /** * 远程调用 * * @param appCode * @param param * @return */ public String call(String appCode, String param) {
try {
// 从zookeeper获取服务地址 String serverIp = urlHolder.getUrl(appCode); if (serverIp == null) {
System.out.println("远程调用错误:当前无服务提供者"); return "connect error"; } // 连接netty,请求并接收响应 RpcClientNettyHandler clientHandler = new RpcClientNettyHandler(); clientHandler.setParam(param); nettyClient.initClient(serverIp, clientHandler); String result = clientHandler.process(); System.out.println(MessageFormat.format("调用服务器:{0},请求参数:{1},响应参数:{2}", serverIp, param, result)); return result; } catch (Exception e) {
System.out.println("远程服务调用失败:" + e); return "error"; } }}

这里来看一下UrlHolder类,它负责连接zookeeper,并获取其中服务地址列表,随机返回其中一条:

public class UrlHolder {
/** * url列表 */ private List
urlList = new ArrayList
(); /** * zk客户端 */ private ZKClient zkClient = new ZKClient(); /** * 获取URL * @param appCode * @return */ public String getUrl(String appCode) {
// 初始化url if (urlList.size() == 0) {
initUrlList(appCode); } // 随机返回一条,此处以后优化为负载均衡策略 if (urlList.size() > 0) {
return urlList.get(new Random(urlList.size()).nextInt()); } else {
System.out.println("目前没有服务提供者"); return null; } } /** * 初始化urlList * @param appCode */ private void initUrlList(final String appCode) {
try {
// 获取zookeeper连接 ZooKeeper zk = zkClient.newConnection(ConsumerConstant.ZK_CONNECTION_STRING, ConsumerConstant.ZK_SESSION_TIME_OUT); // 获取目录下所有子节点 List
urlNodeList = zk.getChildren( MessageFormat.format("{0}/{1}", CommonConstant.ZK_REGISTORY_ROOT_PATH, appCode), new Watcher() {
public void process(WatchedEvent watchedEvent) {
initUrlList(appCode); } }); if (CollectionUtils.isEmpty(urlNodeList)) {
return; } // 从子节点数据中解析出所有url List
urlList = new ArrayList
(); for (String path : urlNodeList) {
byte[] url = zk.getData(path, new Watcher() {
public void process(WatchedEvent watchedEvent) {
initUrlList(appCode); } }, null); if (url != null) {
urlList.add(new String(url)); } } this.urlList = urlList; } catch (Exception e) {
System.out.println("初始化url异常" + e.getMessage()); } }}public class UrlHolder {
/** * url列表 */ private List
urlList = new ArrayList
(); /** * zk客户端 */ private ZKClient zkClient = new ZKClient(); /** * 获取URL * * @param appCode * @return */ public String getUrl(String appCode) { // 初始化url if (urlList.size() == 0) { initUrlList(appCode); } // 随机返回一条,此处以后优化为负载均衡策略 if (urlList.size() > 0) { return urlList.get(new Random().nextInt(urlList.size())); } else { System.out.println("目前没有服务提供者"); return null; } } /** * 初始化urlList * * @param appCode */ private void initUrlList(final String appCode) { try { // 获取zookeeper连接 ZooKeeper zk = zkClient.newConnection(ConsumerConstant.ZK_CONNECTION_STRING, ConsumerConstant.ZK_SESSION_TIME_OUT); // 获取目录下所有子节点 String appPath = MessageFormat.format("{0}/{1}", CommonConstant.ZK_REGISTORY_ROOT_PATH, appCode); List
urlNodeList = zk.getChildren(appPath , new Watcher() { public void process(WatchedEvent watchedEvent) { initUrlList(appCode); } }); if (CollectionUtils.isEmpty(urlNodeList)) { return; } // 从子节点数据中解析出所有url List
urlList = new ArrayList
(); for (String path : urlNodeList) { byte[] url = zk.getData(appPath + "/" + path, new Watcher() { public void process(WatchedEvent watchedEvent) { initUrlList(appCode); } }, null); if (url != null) { urlList.add(new String(url)); } } this.urlList = urlList; } catch (Exception e) { System.out.println("初始化url异常" + e.getMessage()); } }}

接下来是netty的initClient方法:

/**     * 开始一个新的客户端连接     *     * @param server     */    public void initClient(String server, final RpcClientNettyHandler clientHandler) {
try {
String[] urlArray = server.split(CommonConstant.COMMOA); String ip = urlArray[0]; int port = Integer.parseInt(urlArray[1]); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(new NioEventLoopGroup()).channel(NioSocketChannel.class) .handler(new ChannelInitializer
() {
@Override protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(clientHandler); } }); bootstrap.connect(ip, port).sync(); } catch (InterruptedException e) {
System.out.println("netty创建客户端channel失败:" + e); } }

然后是这里我们自己主要写的一个通信处理类RpcClientNettyHandler,这个类负责远程调用和处理返回的消息:

public interface MethodProcessor {
String process() throws Exception;}public class RpcClientNettyHandler extends ChannelInboundHandlerAdapter implements MethodProcessor {
private ChannelHandlerContext context; private CountDownLatch contextCountDownLatch = new CountDownLatch(1); private CountDownLatch countDownLatch = new CountDownLatch(1); /** * 入参 */ private String param; /** * 响应 */ private String response; /** * 连上时触发 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.context = ctx; contextCountDownLatch.countDown(); } /** * 获取服务器返回信息 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
response = msg.toString(); countDownLatch.countDown(); } /** * 远程调用并返回结果 * @return * @throws InterruptedException */ public String process() throws InterruptedException {
contextCountDownLatch.await(); context.writeAndFlush(param); countDownLatch.await(); return response; } public void setParam(String param) {
this.param = param; }}

到这里代码实现就写完了,下面是测试:

// 服务端    public static void main(String[] args) throws InterruptedException {
RPCProvider provider = new RPCProvider(); provider.registry("127.0.0.1", 8091); provider.registry("127.0.0.1", 8092); provider.registry("127.0.0.1", 8093); provider.registry("127.0.0.1", 8094); provider.registry("127.0.0.1", 8095); Thread.sleep(Long.MAX_VALUE); } // 客户端 public static void main(String[] args) throws InterruptedException {
RPCConsumer consumer = new RPCConsumer(); int i = 0; while (true) {
consumer.call(ConsumerConstant.APP_CODE, "aaa" + i++); Thread.sleep(2000L); } }

日志输出如下:

// 服务端日志zookeeper连接成功临时节点创建成功:/registry/100000/0000000001zookeeper连接成功临时节点创建成功:/registry/100000/0000000002zookeeper连接成功临时节点创建成功:/registry/100000/0000000003zookeeper连接成功临时节点创建成功:/registry/100000/0000000004zookeeper连接成功临时节点创建成功:/registry/100000/0000000005服务端收到请求:aaa0服务端收到请求:aaa1服务端收到请求:aaa2服务端收到请求:aaa3// 客户端日志zookeeper连接成功调用服务器:127.0.0.1,8091,请求参数:aaa0,响应参数:success调用服务器:127.0.0.1,8093,请求参数:aaa1,响应参数:success调用服务器:127.0.0.1,8092,请求参数:aaa2,响应参数:success调用服务器:127.0.0.1,8094,请求参数:aaa3,响应参数:success调用服务器:127.0.0.1,8091,请求参数:aaa4,响应参数:success调用服务器:127.0.0.1,8092,请求参数:aaa5,响应参数:success

这边可以修改一下服务端里面注册服务的端口,然后再启多个服务,会发现服务启动起来之后,客户端会自动随机分配到新启动的服务请求,并成功返回。再关掉几个端口的服务,会发现不会在请求那几个端口,这就是zookeeper的watcher机制实现的功能。

到这里就实现了RPC框架的服务注册发现+基本通信功能.

这里可能跟大家平时用的RPC框架差别还挺大:我怎么把我自己的Service注册到provider里,然后客户端通过接口来调用呢?.

关于这个问题,下一篇见!

转载地址:http://ahsni.baihongyu.com/

你可能感兴趣的文章
CodeForces #196(Div. 2) 337D Book of Evil (树形dp)
查看>>
uva 12260 - Free Goodies (dp,贪心 | 好题)
查看>>
uva-1427 Parade (单调队列优化dp)
查看>>
【设计模式】学习笔记14:状态模式(State)
查看>>
poj 1976 A Mini Locomotive (dp 二维01背包)
查看>>
斯坦福大学机器学习——因子分析(Factor analysis)
查看>>
linux对于没有写权限的文件如何保存退出vim
查看>>
IntelliJ IDEA 下的svn配置及使用的非常详细的图文总结
查看>>
【IntelliJ IDEA】idea导入项目只显示项目中的文件,不显示项目结构
查看>>
ssh 如何方便的切换到其他节点??
查看>>
JSP中文乱码总结
查看>>
Java实现DES加密解密
查看>>
HTML基础
查看>>
Java IO
查看>>
Java NIO
查看>>
Java大数据:Hbase分布式存储入门
查看>>
大数据学习:Spark RDD操作入门
查看>>
大数据框架:Spark 生态实时流计算
查看>>
大数据入门:Hive和Hbase区别对比
查看>>
大数据入门:ZooKeeper工作原理
查看>>