手搓RPC框架系列(二):核心功能实现与架构原则应用

手搓RPC框架系列(二):核心功能实现与架构原则应用

文 / Kenyon,资深软件架构师,15年软件开发和技术管理经验,从程序员做到企业技术高管,专注技术管理、架构设计、AI技术应用和落地。

由于公众号推流的原因,请在关注页右上角加星标,这样才能及时收到新文章的推送。

引言在上一篇文章中,我们基于架构设计原则设计了RPC框架的基础架构。今天,我们将进入实战阶段,实现RPC框架的核心功能,包括服务代理、序列化、网络通信等模块。在实现过程中,我们将重点展示如何将SOLID原则、高内聚低耦合、KISS等架构设计原则应用到实际代码中。

一、核心组件的实现1. 序列化模块(Serializer)遵循开闭原则,我们设计了Serializer接口,并提供了JSON实现:

代码语言:java复制// 序列化接口,支持扩展不同的序列化方式

public interface Serializer {

/**

* 将对象序列化为字节数组

*

* @param obj 要序列化的对象

* @param 对象类型

* @return 序列化后的字节数组

* @throws Exception 序列化异常

*/

byte[] serialize(T obj) throws Exception;

/**

* 将字节数组反序列化为对象

*

* @param bytes 序列化后的字节数组

* @param clazz 对象类型

* @param 对象类型

* @return 反序列化后的对象

* @throws Exception 反序列化异常

*/

T deserialize(byte[] bytes, Class clazz) throws Exception;

/**

* 序列化类型枚举

*/

enum Type {

//目前暂时只是支持JSON,后续可以在这里添加要支持的其他序列化方式

JSON(1);

private final int code;

Type(int code) {

this.code = code;

}

public int getCode() {

return code;

}

/**

* 根据code查找对应的序列化类型

*

* @param code 序列化类型码

* @return 序列化类型

*/

public static Type findByCode(int code) {

for (Type type : Type.values()) {

if (type.code == code) {

return type;

}

}

return JSON; // 默认使用JSON

}

}

}

// JSON序列化实现

public class JsonSerializer implements Serializer {

private static final ObjectMapper objectMapper = new ObjectMapper();

@Override

public byte[] serialize(T obj) throws Exception {

if (obj == null) {

return new byte[0];

}

return objectMapper.writeValueAsBytes(obj);

}

@Override

public T deserialize(byte[] bytes, Class clazz) throws Exception {

if (bytes == null || bytes.length == 0) {

return null;

}

return objectMapper.readValue(bytes, clazz);

}

}2. 网络传输模块(Transport)基于单一职责原则,我们将网络传输模块拆分为客户端和服务端:

代码语言:java复制// 网络传输客户端接口

public interface TransportClient {

void connect(InetSocketAddress address);

byte[] send(byte[] data) throws Exception;

void close();

}

// 网络传输服务端接口

public interface TransportServer {

void start(int port, RequestHandler handler);

void stop();

int getPort();

}

// 请求处理器接口

public interface RequestHandler {

byte[] handle(byte[] request);

}

// 使用Netty实现的传输客户端

public class NettyTransportClient implements TransportClient {

private static final Logger logger = LoggerFactory.getLogger(NettyTransportClient.class);

private static final int DEFAULT_CONNECT_TIMEOUT = 5000;

private Channel channel;

private EventLoopGroup group;

private ResponseHandler responseHandler;

@Override

public void connect(InetSocketAddress address) {

group = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap();

try {

bootstrap.group(group)

.channel(NioSocketChannel.class)

.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, DEFAULT_CONNECT_TIMEOUT)

.option(ChannelOption.TCP_NODELAY, true)

.handler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

// 处理粘包问题

pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4));

pipeline.addLast(new LengthFieldPrepender(4));

// 字节数组编解码器

pipeline.addLast(new ByteArrayDecoder());

pipeline.addLast(new ByteArrayEncoder());

// 客户端处理器

NettyClientHandler clientHandler = new NettyClientHandler();

pipeline.addLast(clientHandler);

}

});

// 连接服务端

ChannelFuture future = bootstrap.connect(address).sync();

this.channel = future.channel();

// 初始化响应处理器

responseHandler = new ResponseHandler();

// 设置客户端处理器的响应处理器

((NettyClientHandler) channel.pipeline().last()).setResponseHandler(responseHandler);

} catch (Exception e) {

logger.error("Failed to connect to server: {}", address, e);

throw new RuntimeException("Failed to connect to server: " + address, e);

}

}

@Override

public byte[] send(byte[] data) throws Exception {

if (channel == null || !channel.isActive()) {

throw new IllegalStateException("Channel is not connected");

}

// 发送数据

channel.writeAndFlush(data).addListener((ChannelFutureListener) future -> {

if (!future.isSuccess()) {

Throwable cause = future.cause();

if (cause instanceof Exception) {

responseHandler.setException((Exception) cause);

} else {

responseHandler.setException(new RuntimeException(cause));

}

}

});

// 等待响应

return responseHandler.waitForResponse();

}

@Override

public void close() {

if (channel != null) {

channel.close();

}

if (group != null) {

group.shutdownGracefully();

}

}

/**

* 客户端处理器

*/

private static class NettyClientHandler extends SimpleChannelInboundHandler {

private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);

private ResponseHandler responseHandler;

@Override

protected void channelRead0(ChannelHandlerContext ctx, byte[] msg) throws Exception {

if (responseHandler != null) {

responseHandler.setResponse(msg);

}

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

logger.error("Exception in Netty client handler", cause);

if (responseHandler != null) {

if (cause instanceof Exception) {

responseHandler.setException((Exception) cause);

} else {

responseHandler.setException(new RuntimeException(cause));

}

}

ctx.close();

}

public void setResponseHandler(ResponseHandler responseHandler) {

this.responseHandler = responseHandler;

}

}

/**

* 响应处理器

*/

private static class ResponseHandler {

private final CountDownLatch latch = new CountDownLatch(1);

private byte[] response;

private Exception exception;

public byte[] waitForResponse() throws Exception {

if (!latch.await(30, TimeUnit.SECONDS)) {

throw new RuntimeException("Request timeout");

}

if (exception != null) {

throw exception;

}

return response;

}

public void setResponse(byte[] response) {

this.response = response;

latch.countDown();

}

public void setException(Exception exception) {

this.exception = exception;

latch.countDown();

}

}

}3. 服务代理模块(Service Proxy)使用迪米特法则,代理模块只与必要的组件通信:

代码语言:java复制// RPC客户端核心类

public class ServiceProxy implements InvocationHandler {

private static final Logger logger = LoggerFactory.getLogger(ServiceProxy.class);

// 服务接口类

private final Class serviceClass;

// 服务注册中心

private final RegistryCenter registryCenter;

// 负载均衡策略

private final LoadBalance loadBalance;

/**

* 构造函数

*

* @param serviceClass 服务接口类

* @param registryCenter 服务注册中心

*/

public ServiceProxy(Class serviceClass, RegistryCenter registryCenter) {

this(serviceClass, registryCenter, new RandomLoadBalance());

}

/**

* 构造函数

*

* @param serviceClass 服务接口类

* @param registryCenter 服务注册中心

* @param loadBalance 负载均衡策略

*/

public ServiceProxy(Class serviceClass, RegistryCenter registryCenter, LoadBalance loadBalance) {

this.serviceClass = serviceClass;

this.registryCenter = registryCenter;

this.loadBalance = loadBalance;

}

@Override

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

// 创建RPC请求的对象,把调用的服务接口类、方法名、参数类型、参数值等信息封装到请求对象中

RpcRequest request = new RpcRequest();

request.setRequestId(UUID.randomUUID().toString());

request.setServiceName(serviceClass.getName());

request.setMethodName(method.getName());

request.setParameterTypes(method.getParameterTypes());

request.setParameters(args);

logger.debug("Sending RPC request: {}, service: {}, method: {}",

request.getRequestId(), request.getServiceName(), request.getMethodName());

// 从注册中心获取服务地址列表

List addresses = registryCenter.discover(serviceClass.getName());

if (addresses == null || addresses.isEmpty()) {

throw new RuntimeException("No service available for: " + serviceClass.getName());

}

// 使用负载均衡策略选择服务地址

InetSocketAddress address = loadBalance.select(serviceClass.getName(), addresses);

if (address == null) {

throw new RuntimeException("No service address selected for: " + serviceClass.getName());

}

logger.debug("Selected service address: {}", address);

// 创建客户端并发送请求,这里暂时使用Netty作为网络传输组件

TransportClient client = new NettyTransportClient();

try {

// 创建序列化器,这里暂时使用JSON序列化,后续可以添加其他序列化方式,并且改成读取配置的方式来确定使用哪种序列化方式

Serializer serializer = new JsonSerializer();

// 连接到服务端

client.connect(address);

// 序列化请求

byte[] requestData = serializer.serialize(request);

// 发送请求并获取响应数据

byte[] responseData = client.send(requestData);

// 反序列化响应

RpcResponse response = serializer.deserialize(responseData, RpcResponse.class);

if (response.isSuccess()) {

return response.getResult();

} else {

throw new RuntimeException("RPC call failed: " + response.getError());

}

} finally {

client.close();

}

}

}二、架构设计原则的应用总结在上面代码实现的过程中,我们分别应用了以下架构设计原则:

1. SOLID原则比如每个组件都只是负责一个明确的功能,这就很好符合了单一职责原则;然后涉及到后续可能要调整或者扩展到地方我们都是通过面向接口的编程,然后再通过实现接口的方式来实现组件的可扩展,这样就很好地应用了开闭原则和里氏替换原则;在编写接口的时候,我们也遵循了接口隔离原则和里氏替换原则,即每个组件都有自己的接口,而且接口只包含必要的方法,然后组件接口的实现类可以随时替换父类的实现,而不会影响到程序的正常运行。最后,我们使用其他模块的时候,依赖都是依赖接口,然后再通过构造函数的方式来注入具体的实现类,这样高层模块就不需要依赖底层模块,从而做到了依赖倒置原则。2. 通用设计原则在实现RPC框架的过程中,我们也应用了多个通用设计原则:

我们在代码的实现过程中非常注重代码的简洁,基本都是做最基础的设计和实现,避免了过度设计,实现的过程中也只实现当前必要的核心功能,确保代码的可读性和可维护性,同时也考虑到了代码的性能和扩展性,这就很好地体现了KISS原则。我们设计和实现的每个组件内部的功能都紧密相关的(高内聚),而组件之间基本都是通过抽象接口来进行通信,减少跟实现模块或者代码的直接依赖(低耦合),这样的设计使得各组件可以独立演化和维护,这就是高内聚低耦合原则的应用。在模块和组件之间,我们遵循"只与直接朋友通信"的原则,组件之间只与直接依赖的组件进行交互,避免形成复杂的依赖链,提高系统的稳定性,这就是 迪米特法则的应用。然后我们通过业务的抽象和代码复用的机制避免了出现大量代码重复的情况,例如通过统一的接口定义实现不同组件的复用,降低了维护成本,这个就是DRY原则的应用。三、总结与下一步计划因为篇幅的问题,在这篇文章就先写这么多,文章中我们实现了整个RPC框架里面最核心的组件,包括了序列化模块、网络传输模块和服务代理模块。在实现的过程中,我们重点展示了如何将架构设计原则应用到实际代码中,确保代码的可扩展性、可维护性和灵活性。

在下一篇文章中,我们将会完成这个RPC框架的剩余功能,像服务注册与发现、服务端核心的实现、客户端的负载均衡等模块,并编写相关的测试用例来进行完整的测试。同时,也会把项目的代码一起放上来给大家观摩和吐槽。

互动话题:在实现RPC框架的过程中,你认为哪个组件的设计最具挑战性?为什么?欢迎在评论区分享你的观点。

关于作者Kenyon,资深软件架构师,15年的软件开发和技术管理经验,从程序员做到企业技术高管。多年企业数字化转型和软件架构设计经验,善于帮助企业构建高质量、可维护的软件系统,目前专注技术管理、架构设计、AI技术应用和落地;全网统一名称"六边形架构",欢迎关注交流。

原创不易,转载请联系授权,如果觉得有帮助,请点赞、收藏、转发三连支持!

相关推荐

DNF虚空魔石获取攻略:刷图、分解、合成全方位指南
365体育平台网址

DNF虚空魔石获取攻略:刷图、分解、合成全方位指南

📅 09-02 👁️ 1949
苹果4s升级9.3.1系统的完整教程(解锁更多功能,让你的4s焕发新生)
网点注销
beat365体育官网

网点注销

📅 11-02 👁️ 8290