当前位置:   article > 正文

分布式网络通信框架Netty_基于Netty+Zookeeper手写RPC远程调用框架_2 Dubbo框架完整实现_bdclientutil.init(this)

bdclientutil.init(this)

                                      分布式网络通信框架Netty

                              基于Netty+Zookeeper纯手写RPC远程调用框架

                                                     2 Dubbo框架完整实现

                                                                                            学习总结

                                                                                                                                                                                  田超凡

                                                                                                                                                                                  2019年11月4日

1 创建生产者(Provider)和消费者(Consumer)

生产者:netty-zookeeper-dubbo-user-service-api 用户服务-接口

netty-zookeeper-dubbo-user-service-provider 用户服务-接口实现类

 

package com.tcf.netty.zookeeper.dubbo.user.service;

 

/***

 * TODO TCF 用户接口-生产者

 * @author Hasee

 *

 */

public interface UserService {

 

//TODO TCF 根据用户id查询用户信息

public String getUserById(String id);

 

}

 

package com.tcf.netty.zookeeper.dubbo.user.service.impl;

 

import com.tcf.netty.zookeeper.dubbo.common.annotation.RpcRegistration;

import com.tcf.netty.zookeeper.dubbo.user.service.UserService;

 

/***

 * TODO TCF 用户接口实现类-生产者

 * @author Hasee

 *

 */

@RpcRegistration(UserService.class)

public class UserServiceImpl implements UserService {

 

//TODO TCF 根据用户id查询用户信息

public String getUserById(String id)

{

return "张三";

}

 

}

 

消费者:netty-zookeeper-dubbo-order-service-api 订单服务-接口

netty-zookeeper-dubbo-order-service-consumer 订单服务-接口实现类

 

package com.tcf.netty.zookeeper.dubbo.order.service;

 

/***

 * TODO TCF 订单业务接口-消费者

 * @author Hasee

 *

 */

public interface OrderService {

 

//TODO TCF 根据用户id查询用户订单

public String getOrderByUserId(String id);

 

}

 

package com.tcf.netty.zookeeper.dubbo.order.service.impl;

 

import com.tcf.netty.zookeeper.dubbo.order.service.OrderService;

import com.tcf.netty.zookeeper.dubbo.order.util.DubboClientUtil;

import com.tcf.netty.zookeeper.dubbo.user.service.UserService;

 

/***

 * TODO TCF 订单服务-消费者

 * @author Hasee

 *

 */

public class OrderServiceImpl implements OrderService {

 

//TODO TCF 需要远程调用的用户服务接口

private UserService userService;

 

//TODO TCF 构造柱入

public OrderServiceImpl()

{

this.userService=DubboClientUtil.getDubboClient().createService(UserService.class);

}

 

//TODO TCF 根据用户id获取订单信息,远程调用用户服务接口

public String getOrderByUserId(String id)

{

//TODO TCF 根据用户id获取用户信息,远程服务调用

String userInfo=userService.getUserById(id);

 

return userInfo;

}

}

 

 

2 创建Zookeeper注册中心服务注册接口ServerRegistration和实现类ServerRegistry

package com.tcf.netty.zookeeper.dubbo.server.registry;

 

/***

 * TODO TCF Zookeeper注册中心服务注册接口,定义服务注册类需要注册的规范

 * @author Hasee

 *

 */

public interface ServerRegistration {

 

//TODO TCF 注册服务到Zookeeper注册中心

public void registry(String serverName,String serverClassAddr);

 

}

 

package com.tcf.netty.zookeeper.dubbo.server.registry.impl;

 

import java.net.URLEncoder;

 

import org.I0Itec.zkclient.ZkClient;

 

import com.tcf.netty.zookeeper.dubbo.server.registry.ServerRegistration;

 

/***

 * TODO TCF Zookeeper注册中心服务注册工具类

 * @author Hasee

 *

 */

public class ServerRegistry implements ServerRegistration {

 

//TODO TCF Zookeeper注册中心ip地址

private String host;

 

//TODO TCF Zookeeper注册中心客户端

private ZkClient zkClient;

 

//TODO TCF Zookeeper注册中心连接超时时长

private int connectTimeout=5000;

 

//TODO TCF 服务注册根目录

private String rootPath="/netty-zookeeper-dubbo-user-service-api-v1.1";

 

//TODO TCF 注册服务类型:生产者

private String serverSuffix="/providers";

 

//TODO TCF 构造柱入

public ServerRegistry(String host)

{

this.host=host;

this.zkClient=new ZkClient(host,connectTimeout);

}

 

//TODO TCF 注册服务到Zookeeper注册中心

public void registry(String serverName, String serverClassAddr)

{

try

{

//TODO TCF Zookeeper服务根目录/rootPath

if(!zkClient.exists(rootPath))

{

zkClient.createPersistent(rootPath);

}

 

//TODO TCF Zookeeper服务二级目录/rootPath/serverName

String secondLevelPath=rootPath+"/"+serverName;

if(!zkClient.exists(secondLevelPath))

{

zkClient.createPersistent(secondLevelPath);

}

 

//TODO TCF Zookeeper服务三级目录/rootPath/serverName/serverSuffix

String thirdLevelPath=secondLevelPath+serverSuffix;

if(!zkClient.exists(thirdLevelPath))

{

zkClient.createPersistent(thirdLevelPath);

}

 

//TODO TCF Zookeeper服务四级目录/rootPath/serverName/serverSuffix/serverClassPath

String nodePath=thirdLevelPath+"/"+URLEncoder.encode(serverClassAddr,"UTF-8");

if(zkClient.exists(nodePath))

{

//TODO TCF 服务节点已存在,先删除该服务节点

zkClient.delete(nodePath);

}

 

//TODO TCF 创建服务节点,注册服务

zkClient.createEphemeral(nodePath);

System.out.println("Zookeeper服务注册成功,服务地址===>"+nodePath);

}

catch(Exception e)

{

e.printStackTrace();

}

}

}

 

3 创建Dubbo服务器:DubboServer

  1. .基于反射获取需要注册到Zookeeper注册中心的服务并实现服务注册,存入已注册服务容器
  2. .初始化Netty服务器、通道,绑定事件驱动处理器、MarShalling编码解码器
  3. .创建DubboServerEventHandle事件驱动处理器,监听通讯事件并处理

 

package com.tcf.netty.zookeeper.dubbo.server.core;

 

import java.util.HashMap;

import java.util.Map;

 

import com.tcf.netty.zookeeper.dubbo.common.annotation.RpcRegistration;

import com.tcf.netty.zookeeper.dubbo.common.coder.MarShallingCoderFactory;

import com.tcf.netty.zookeeper.dubbo.server.core.event.DubboServerEventHandle;

import com.tcf.netty.zookeeper.dubbo.server.registry.ServerRegistration;

import com.tcf.netty.zookeeper.dubbo.server.registry.impl.ServerRegistry;

 

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

 

/***

 * TODO TCF Dubbo服务器,基于Netty服务器端实现

 * @author Hasee

 *

 */

public class DubboServer {

 

//TODO TCF 已经注册到Zookeeper注册中心的服务

private Map<String,Object> registryMap=new HashMap<String,Object>();

 

//TODO TCF Zookeeper注册中心和Dubbo服务器ip地址

private String host="";

 

//TODO TCF 服务启动端口号:20880(Dubbo默认)

private int port=20880;

 

//TODO TCF Zookeeper注册中心注册服务协议类型

private String protocol="tcf://";

 

//TODO TCF Zookeeper服务注册工具类

private ServerRegistration serverRegistration=null;

 

//TODO TCF 构造柱入

public DubboServer(String host)

{

this.host=host;

this.serverRegistration=new ServerRegistry(host);

}

 

//TODO TCF 初始化Dubbo服务器,注册服务到Zookeeper注册中心

public void start(Object service)

{

//TODO TCF 基于反射获取需要注册到Zookeeper注册中心的服务,注册到Zookeeper注册中心

serviceRegistry(service);

 

//TODO TCF 初始化Netty服务器

initServer();

}

 

//TODO TCF 基于反射获取需要注册到Zookeeper注册中心的服务并进行服务注册

public void serviceRegistry(Object service)

{

//TODO TCF 获取需要注册到Zookeeper注册中心的服务上方标注的RPC注解

if(service.getClass().isAnnotationPresent(RpcRegistration.class))

{

RpcRegistration rpcRegistration=service.getClass().getAnnotation(RpcRegistration.class);

if(rpcRegistration!=null)

{

//TODO TCF 需要注册到Zookeeper注册中心的服务接口

Class<?> interfaceClass=rpcRegistration.value();

 

if(interfaceClass!=null)

{

//TODO TCF 需要注册的服务名称

String serviceName=interfaceClass.getName().replace("interface ","");

 

//TODO TCF 拼装服务注册二级地址(唯一标识需要注册的服务)

String serviceClassAddr=protocol+host+":"+port+"//";

 

//TODO TCF 把服务注册到Zookeeper注册中心

serverRegistration.registry(serviceName,serviceClassAddr);

 

//TODO TCF 注册成功的服务

registryMap.put(serviceName,service);

}

}

}

}

 

//TODO TCF 初始化Dubbo服务器端,使用Netty服务器

public void initServer()

{

//TODO TCF Boss线程组和工作线程组

NioEventLoopGroup bossGroup=new NioEventLoopGroup();

    NioEventLoopGroup workGroup=new NioEventLoopGroup();

    

    //TODO TCF Netty服务器初始化,绑定事件驱动处理器、编码解码器,初始化Netty通道

    ServerBootstrap serverBootstrap=new ServerBootstrap();

    serverBootstrap.group(bossGroup,workGroup)

                   .channel(NioServerSocketChannel.class)

                   .childHandler(new ChannelInitializer<SocketChannel>() {

                    

                    //TODO TCF Netty服务器加载通道时绑定编码解码器、事件驱动处理器

                    @Override

                    protected void initChannel(SocketChannel socketChannel) throws Exception

                    {

                    //TODO TCF Netty-RPC请求编码解码器MarSharlling

                    socketChannel.pipeline().addLast(MarShallingCoderFactory.buildMarshallingEncoder());

                    socketChannel.pipeline().addLast(MarShallingCoderFactory.buildMarshallingDecoder());

                    

                    //TODO TCF 绑定事件驱动处理器-事件监听

                    socketChannel.pipeline().addLast(new DubboServerEventHandle(registryMap));

                    }

                   });

    

    try

    {

     //TODO TCF 初始化Netty通道

     ChannelFuture channelFuture=serverBootstrap.bind(port).sync();

    

     System.out.println("====Dubbo 服务器启动成功===="+port);

    

     //TODO TCF 当断开和客户端的连接时,关闭通道,释放资源,未断开连接时,此次调用会产生阻塞

     channelFuture.channel().closeFuture().sync();

    }

    catch(Exception e)

    {

     e.printStackTrace();

    }

    finally

    {

     //TODO TCF 关闭Netty线程组,释放资源

     bossGroup.shutdownGracefully();

     workGroup.shutdownGracefully();

    }

}

}

 

DubboSeverEventHandle事件驱动处理器

package com.tcf.netty.zookeeper.dubbo.server.core.event;

 

import java.lang.reflect.Method;

import java.util.HashMap;

import java.util.Map;

 

import com.tcf.netty.zookeeper.dubbo.common.model.RpcRequest;

 

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

 

/***

 * TODO TCF Dubbo服务器事件驱动处理器-实现事件监听和处理

 * @author Hasee

 *

 */

public class DubboServerEventHandle extends ChannelInboundHandlerAdapter {

 

//TODO TCF 已经注册到Zookeeper注册中心的服务

private Map<String,Object> registryMap=new HashMap<String,Object>();

 

//TODO TCF 构造柱入

public DubboServerEventHandle(Map<String,Object> registryMap)

{

this.registryMap=registryMap;

}

 

//TODO TCF 监听客户端发送的消息

@Override

public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception

{

//TODO TCF 将数据解析为客户端发送的RPCRequest请求参数模型

if(msg!=null)

{

if(msg instanceof RpcRequest)

{

RpcRequest rpcRequest=(RpcRequest)msg;

 

System.out.println("====接收到客户端发送的服务调用请求:"+rpcRequest.toString());

 

//TODO TCF 根据需要调用的服务名称获取对应的服务

Object serviceInstance=registryMap.get(rpcRequest.getServiceClass().getName());

 

if(serviceInstance!=null)

{

//TODO TCF 基于反射获取对应服务需要调用的方法

Method method=serviceInstance.getClass().getMethod(rpcRequest.getMethodName(),rpcRequest.getParameterTypes());

 

//TODO TCF 执行对应方法,实现服务调用

Object invokeResult=method.invoke(serviceInstance,rpcRequest.getParameterValues());

 

//TODO TCF 返回调用方法执行结果给客户端

ctx.writeAndFlush(invokeResult);

ctx.close();

}

}

}

}

 

}

 

4 创建Zookeeper注册中心服务发现接口和实现类ZkFoundService、ZkFoundServiceImpl

 

package com.tcf.netty.zookeeper.dubbo.client.service.handle;

 

import java.util.List;

 

/***

 * TODO TCF Zookeeper注册中心服务发现接口

 * @author Hasee

 *

 */

public interface ZkFoundService {

 

//TODO TCF 根据服务名称获取注册到Zookeeper注册中心的服务访问地址

public List<String> getServiceAddressList(String serviceName);

 

}

 

 

package com.tcf.netty.zookeeper.dubbo.client.service.handle.impl;

 

import java.util.List;

 

import org.I0Itec.zkclient.ZkClient;

 

import com.tcf.netty.zookeeper.dubbo.client.service.handle.ZkFoundService;

 

/***

 * TODO TCF 根据服务名称获取Zookeeper注册中心的服务访问地址

 * @author Hasee

 *

 */

public class ZkFoundServiceImpl implements ZkFoundService {

 

//TODO TCF Zookeeper注册中心地址

private String host="";

 

//TODO TCF Zookeeper注册中心连接超时时长

private int connectTimeout=5000;

 

//TODO TCF Zookeeper服务节点根目录

private String rootPath="/netty-zookeeper-dubbo-user-service-api-v1.1";

 

//TODO TCF Zookeeper服务注册角色:生产者

private String serviceSuffix="/providers";

 

//TODO TCF Zookeeper客户端

private ZkClient zkClient;

 

//TODO TCF 构造柱入,初始化Zookeeper客户端

public ZkFoundServiceImpl(String host)

{

this.host=host;

this.zkClient=new ZkClient(host,connectTimeout);

}

 

//TODO TCF 根据服务名称获取注册到Zookeeper注册中心的服务访问地址

public List<String> getServiceAddressList(String serviceName)

{

    //TODO TCF 服务节点路由

String serviceNodePath=rootPath+"/"+serviceName+serviceSuffix;

List<String> addressList=zkClient.getChildren(serviceNodePath);

return addressList;

}

 

}

 

5 基于策略模式实现Dubbo负载均衡处理器DubboLoadBalance

  1. .提供随机负载均衡策略实现类RandomLoadBalanceService
  2. .提供索引负载均衡策略实现类

IndexLoadBalanceService

(3).注意考虑多线程并发的情况

 

package com.tcf.netty.zookeeper.dubbo.client.loadbalance;

 

import java.util.List;

 

/***

 * TODO TCF 基于策略模式实现Dubbo负载均衡处理器业务接口

 * @author Hasee

 *

 */

public interface DubboLoadBalanceService {

 

//TODO TCF 根据访问服务地址进行负载均衡策略实现,返回最终需要调用的服务访问地址

public String loadBalance(List<String> serviceAddressList);

 

}

 

 

package com.tcf.netty.zookeeper.dubbo.client.loadbalance.impl;

 

import java.util.List;

import java.util.Random;

import com.tcf.netty.zookeeper.dubbo.client.loadbalance.DubboLoadBalanceService;

 

/***

 * TODO TCF 随机负载均衡策略实现类

 * @author Hasee

 *

 */

public class RandomLoadBalanceService implements DubboLoadBalanceService {

 

//TODO TCF 随机获取服务调用地址中的某一个服务地址,实现随机负载均衡策略

public String loadBalance(List<String> serviceAddressList)

{

String address=serviceAddressList.get(new Random().nextInt(serviceAddressList.size()));

return address;

}

 

}

 

 

package com.tcf.netty.zookeeper.dubbo.client.loadbalance.impl;

 

import java.util.List;

 

import com.tcf.netty.zookeeper.dubbo.client.loadbalance.DubboLoadBalanceService;

 

/***

 * TODO TCF 基于索引的负载均衡处理器

 * @author Hasee

 *

 */

public class IndexLoadBalanceService implements DubboLoadBalanceService {

 

//TODO TCF 当前加载到第几个服务访问地址

private int index=0;

 

//TODO TCF 实现索引负载均衡策略

public synchronized String loadBalance(List<String> serviceAddressList)

{

String resultAddress="";

 

if(serviceAddressList!=null && serviceAddressList.size()>0)

{

if(index>=serviceAddressList.size())

{

index=0;

}

 

resultAddress=serviceAddressList.get(index++);

System.out.println("负载均衡处理后,服务调用地址===>"+resultAddress);

}

 

return resultAddress;

}

}

 

 

6 实现Dubbo客户端DubboClient

  1. .根据需要调用的服务接口类型创建JDK动态代理实例,实现代理方法
  2. .在代理方法中实现服务发现和远程调用具体逻辑实现,满足开闭原则
  3. .创建DubboClientEventHandle事件驱动处理器实现Dubbo客户端网络通讯事件监听和处理

package com.tcf.netty.zookeeper.dubbo.client.core;

 

import java.lang.reflect.InvocationHandler;

import java.lang.reflect.Method;

import java.lang.reflect.Proxy;

import java.net.InetSocketAddress;

import java.net.URLDecoder;

import java.util.List;

import com.tcf.netty.zookeeper.dubbo.client.core.event.DubboClientEventHandle;

import com.tcf.netty.zookeeper.dubbo.client.loadbalance.DubboLoadBalanceService;

import com.tcf.netty.zookeeper.dubbo.client.loadbalance.impl.IndexLoadBalanceService;

import com.tcf.netty.zookeeper.dubbo.client.service.handle.ZkFoundService;

import com.tcf.netty.zookeeper.dubbo.client.service.handle.impl.ZkFoundServiceImpl;

import com.tcf.netty.zookeeper.dubbo.common.coder.MarShallingCoderFactory;

import com.tcf.netty.zookeeper.dubbo.common.model.RpcRequest;

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

 

/***

 * TODO TCF Dubbo客户端,创建需要调用的服务接口的JDK动态代理实例并调用代理方法,具体的服务发现和调用实现逻辑在代理方法中

 * @author Hasee

 *

 */

public class DubboClient {

 

//TODO TCF Zookeeper注册服务发现类

private ZkFoundService zkFoundService;

 

//TODO TCF Dubbo负载均衡处理器

private DubboLoadBalanceService dubboLoadBalanceService;

 

//TODO TCF 构造注入

public DubboClient(String host)

{

this.zkFoundService=new ZkFoundServiceImpl(host);

 

//TODO TCF 默认采用索引负载均衡策略

this.dubboLoadBalanceService=new IndexLoadBalanceService();

}

 

//TODO TCF 创建需要调用的服务接口的JDK动态代理实例,实现Zookeeper服务发现和调用

@SuppressWarnings("unchecked")

public <T> T createService(final Class<T> interfaceClass)

{

return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),

new Class[]{interfaceClass},

new InvocationHandler() {

 

public Object invoke(Object proxy,Method method,Object[] args) throws Throwable

{

//TODO TCF 远程服务调用后方法执行结果

Object invokeResult=null;

 

//TODO TCF 获取需要调用的服务名称

String serviceName=interfaceClass.getName();

 

//TODO TCF 根据服务名称获取已经注册到Zookeeper注册中心的服务访问地址

List<String> addressList=zkFoundService.getServiceAddressList(serviceName);

 

if(addressList!=null && addressList.size()>0)

{

//TODO TCF 基于负载均衡策略获取最终需要调用的服务地址

String address=dubboLoadBalanceService.loadBalance(addressList);

 

address=URLDecoder.decode(address,"UTF-8");

 

//TODO TCF 获取ip地址和端口号

address=address.replace("//","").replace("/","");

 

String[] arrays=address.split(":");

 

//TODO TCF tcf://host:port//serviceName

String host=arrays[1];

Integer port=Integer.parseInt(arrays[2]);

 

//TODO TCF 封装RPC远程调用请求参数模型

final RpcRequest rpcRequest=new RpcRequest(interfaceClass,method.getName(),method.getParameterTypes(),args);

 

//TODO TCF 初始化Netty客户端,连接远程Netty服务器,传递RPC远程调用请求参数,实现服务远程调用

//TODO TCF 工作线程组

NioEventLoopGroup workGroup=new NioEventLoopGroup();

 

//TODO TCF Netty客户端事件驱动处理器

final DubboClientEventHandle dubboClientEventHandle=new DubboClientEventHandle(rpcRequest);

 

//TODO TCF Netty客户端初始化,绑定事件驱动处理器、MarShalling编码解码器,初始化Netty通道

Bootstrap client=new Bootstrap();

client.group(workGroup)

      .channel(NioSocketChannel.class)

      .remoteAddress(new InetSocketAddress(host,port.intValue()))

      .handler(new ChannelInitializer<SocketChannel>() {

    

       //TODO TCF Netty通道初始化

       @Override

       protected void initChannel(SocketChannel socketChannel) throws Exception

       {

       //TODO TCF 绑定MarShalling编码解码器

       socketChannel.pipeline().addLast(MarShallingCoderFactory.buildMarshallingEncoder());

       socketChannel.pipeline().addLast(MarShallingCoderFactory.buildMarshallingDecoder());

       

       //TODO TCF 绑定事件驱动处理器,实现事件监听

       socketChannel.pipeline().addLast(dubboClientEventHandle);

       }

});

 

try

{

//TODO TCF 初始化Netty通道

ChannelFuture channelFuture=client.connect().sync();

 

//TODO TCF 断开客户端连接时关闭Netty通道,释放资源

channelFuture.channel().closeFuture().sync();

 

System.out.println(serviceName+"服务的"+method.getName()+"方法远程调用成功......");

 

//TODO TCF 服务调用之后,方法执行返回结果

    invokeResult=dubboClientEventHandle.getResponseMessage();

}

catch(Exception e)

{

e.printStackTrace();

}

finally

{

//TODO TCF 关闭线程组,释放线程占用资源

workGroup.shutdownGracefully();

}

}

 

return invokeResult;

}

});

}

 

}

 

Dubbo客户端事件驱动处理器DubboClientEventHandle,发送RPC服务远程调用请求到对应的Netty服务器并接收服务器返回的方法执行响应结果

package com.tcf.netty.zookeeper.dubbo.client.core.event;

 

import com.tcf.netty.zookeeper.dubbo.common.model.RpcRequest;

 

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

 

/***

 * TODO TCF Dubbo客户端事件驱动处理器-监听事件

 * @author Hasee

 *

 */

public class DubboClientEventHandle extends ChannelInboundHandlerAdapter {

 

//TODO TCF 服务远程调用请求参数

private RpcRequest rpcRequest;

 

//TODO TCF Netty服务器返回的服务调用执行结果

private Object responseMessage;

 

//TODO TCF 构造柱入

public DubboClientEventHandle(RpcRequest rpcRequest)

{

this.rpcRequest=rpcRequest;

}

 

public Object getResponseMessage() {

return responseMessage;

}

public void setResponseMessage(Object responseMessage) {

this.responseMessage = responseMessage;

}

 

//TODO TCF 接收到服务器返回的响应信息

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

{

if(msg!=null)

{

this.responseMessage=msg;

System.out.println("接收到服务器返回的响应信息(方法执行结果):"+msg);

ctx.close();

}

}

 

//TODO TCF 发送RPC服务远程调用请求到Netty服务器端

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception

{

ctx.writeAndFlush(rpcRequest);

}

 

}

 

 

 

7 创建自定义RPC服务注册注解RpcRegistration

package com.tcf.netty.zookeeper.dubbo.common.annotation;

 

import java.lang.annotation.Documented;

import java.lang.annotation.ElementType;

import java.lang.annotation.Inherited;

import java.lang.annotation.Retention;

import java.lang.annotation.RetentionPolicy;

import java.lang.annotation.Target;

 

/***

 * TODO TCF 标注需要注册到Zookeeper注册中心的服务接口实现类

 * @author Hasee

 *

 */

@Documented

@Inherited

@Target(ElementType.TYPE)

@Retention(RetentionPolicy.RUNTIME)

public @interface RpcRegistration {

 

//TODO TCF 需要注册到Zookeeper注册中心的服务接口类型

Class<?> value();

 

}

 

8 创建RPC远程服务调用-数据传输载体RpcRequest模型类

 

package com.tcf.netty.zookeeper.dubbo.common.model;

 

import java.io.Serializable;

 

/***

 * TODO TCF RPC远程调用请求数据模型类

 * @author Hasee

 *

 */

public class RpcRequest implements Serializable {

 

 

/**

 *

 */

private static final long serialVersionUID = 1L;

 

//TODO TCF 需要调用的服务接口

private Class<?> serviceClass;

 

//TODO TCF 需要调用的服务接口方法名

private String methodName;

 

//TODO TCF 需要调用的服务接口方法参数类型

private Class<?>[] parameterTypes;

 

//TODO TCF 需要调用的服务接口方法参数列表

private Object[] parameterValues;

 

//TODO TCF 构造柱入

public RpcRequest(Class<?> serviceClass,String methodName,Class<?>[] parameterTypes,Object[] parameterValues)

{

this.serviceClass=serviceClass;

this.methodName=methodName;

this.parameterTypes=parameterTypes;

this.parameterValues=parameterValues;

}

 

public Class<?> getServiceClass() {

return serviceClass;

}

public void setServiceClass(Class<?> serviceClass) {

this.serviceClass = serviceClass;

}

public String getMethodName() {

return methodName;

}

public void setMethodName(String methodName) {

this.methodName = methodName;

}

public Class<?>[] getParameterTypes() {

return parameterTypes;

}

public void setParameterTypes(Class<?>[] parameterTypes) {

this.parameterTypes = parameterTypes;

}

public Object[] getParameterValues() {

return parameterValues;

}

public void setParameterValues(Object[] parameterValues) {

this.parameterValues = parameterValues;

}

 

}

9 创建Netty编码器和解码器工厂MarShallingCoderFactory,创建MarShalling编码器Encoder和解码器Decoder,实现RpcRequest请求数据传输时的序列化和反序列化

 

package com.tcf.netty.zookeeper.dubbo.common.coder;

 

import org.jboss.marshalling.MarshallerFactory;

import org.jboss.marshalling.Marshalling;

import org.jboss.marshalling.MarshallingConfiguration;

import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;

import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;

import io.netty.handler.codec.marshalling.MarshallerProvider;

import io.netty.handler.codec.marshalling.MarshallingDecoder;

import io.netty.handler.codec.marshalling.MarshallingEncoder;

import io.netty.handler.codec.marshalling.UnmarshallerProvider;

 

public class MarShallingCoderFactory {

 

/**

     * 创建Jboss Marshalling解码器MarshallingDecoder

     * @return MarshallingDecoder

     */

    public static MarshallingDecoder buildMarshallingDecoder()

    {

        //首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。

        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");

        //创建了MarshallingConfiguration对象,配置了版本号为5

        final MarshallingConfiguration configuration = new MarshallingConfiguration();

        configuration.setVersion(5);

        //根据marshallerFactory和configuration创建provider

        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);

        //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度

        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);

        return decoder;

    }

 

    /**

     * 创建Jboss Marshalling编码器MarshallingEncoder

     * @return MarshallingEncoder

     */

    public static MarshallingEncoder buildMarshallingEncoder()

    {

        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");

        final MarshallingConfiguration configuration = new MarshallingConfiguration();

        configuration.setVersion(5);

        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);

        //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组

        MarshallingEncoder encoder = new MarshallingEncoder(provider);

        return encoder;

    }

    

}

 

 

10 创建启动类,查看运行效果

Dubbo客户端工具类:DubboClientUtil

生成并获取单例的Dubbo客户端实例

 

生产者ProviderApplication:

注册服务到Zookeeper注册中心,初始化并启动Dubbo服务器

 

package com.tcf.netty.zookeeper.dubbo.core;

 

import com.tcf.netty.zookeeper.dubbo.server.core.DubboServer;

import com.tcf.netty.zookeeper.dubbo.user.service.impl.UserServiceImpl;

 

/***

 * TODO TCF 用户服务-生产者,应用启动类,实现服务注册

 * @author Hasee

 *

 */

public class ProviderApplication {

 

public static void main(String[] args)

{

//TODO TCF 初始化Dubbo服务器,基于反射获取需要注册到Zookeeper注册中心的服务并实现服务注册,初始化Netty服务器(Dubbo服务器)

DubboServer dubboServer=new DubboServer("127.0.0.1");

dubboServer.start(new UserServiceImpl());

}

 

}

 

消费者ConsumerApplication:

RPC远程调用指定服务的指定方法,获取方法执行返回结果

 

package com.tcf.netty.zookeeper.dubbo.order.core;

 

import com.tcf.netty.zookeeper.dubbo.order.service.OrderService;

import com.tcf.netty.zookeeper.dubbo.order.service.impl.OrderServiceImpl;

 

/***

 * TODO TCF 订单服务-消费者,应用启动,发起远程调用服务请求

 * @author Hasee

 *

 */

public class ConsumerApplication {

 

public static void main(String[] args)

{

OrderService orderService=new OrderServiceImpl();

String userInfo=orderService.getOrderByUserId("1");

System.out.println("UserInfo ===> "+userInfo);

}

}

 

 

maven相关依赖

<dependencies>


    <!-- - ZK客户端工具 -->
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.10</version>
    </dependency>


    <dependency>

<groupId>com.tcf.netty.zookeeper.dubbo.common</groupId>
        <artifactId>netty-zookeeper-dubbo-common</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </dependency>


    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.42.Final</version>
    </dependency>


    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.9</version>
    </dependency>


    <dependency>
        <groupId>org.jboss.marshalling</groupId>
        <artifactId>jboss-marshalling</artifactId>
        <version>1.4.10.Final</version>
    </dependency>
    <dependency>
        <groupId>org.jboss.marshalling</groupId>
        <artifactId>jboss-marshalling-serial</artifactId>
        <version>1.4.10.Final</version>
    </dependency>
</dependencies>

 

转载请注明原作者

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/小桥流水78/article/detail/854845
推荐阅读
相关标签
  

闽ICP备14008679号