代码拉取完成,页面将自动刷新
a rpc framework which base on netty
基于netty的rpc框架
支持自定义协议,支持直接客户端运行或者集成spring运行,自定义处理器处理,简单易用
在分布式下载组件中已经投入使用x-netty-rpc
gitee地址: https://gitee.com/lingfengx/x-downloader
git地址: https://github.com/lingfengcoder/x-downloader
//例子:服务A调用服务B的register方法
//服务A
//远程服务接口
@RpcClient("ServiceB")
public interface ServiceB {
void register(BasicFrame<Object> frame);
}
class Test {
//或者通过@Autowired
@Autowired
private ServiceB serviceb;
//使用远程服务
public void test() {
serviceb.register(new BasicFrame());
}
}
//服务B
//远程目标方法
@RpcComponent("ServiceB")
public class ServiceB {
@RpcHandler("register")
public void register(BasicFrame<Object> frame) {
//打印BasicFrame数据
log.info(frame);
}
}
支持灵活更换序列化协议(JSON、JAVA、string、protobuf)
支持自动加密解密(AES、RSA)
支持消息自动签名校验防篡改(MD5)
自带时间戳 防重放攻击(timestamp)
后期考虑加入 Protobuf协议 和 Snappy压缩算法
* BEFORE DECODE (66 bytes) AFTER DECODE (66 bytes)
* +------+--------+------+------------------------------------------------+ +------+--------+------+-----------------------------------------------+
* | cmd | serial | encrypt | timestamp | client| sign |length|content(11) | cmd | serial | encrypt | timestamp | client |sign|length|content(11)
* | 1 | 1 | 1 | 8 | 8 | 32 | 4 | "HELLO,WORLD" | 1 | 1 | 1 | 8 | 8 | 32 | 4 |"HELLO,WORLD"
* +------+--------+------+------------------------------------------------+ +------+--------+------+-----------------------------------------------+
//帧类型 //请求REQUEST((byte) 1), //返回RESPONSE((byte) 2), //心跳HEARTBEAT((byte) 3);
private byte cmd;
//数据(content)序列化类型 JSON_SERIAL JAVA_SERIAL
private byte serial;
//加密类型 //明文NONE((byte) 0),//AES AES((byte) 2), //RSA RSA((byte) 3);
private byte encrypt;
//时间戳 相当于salt
private long timestamp;
//客户端id -1代表服务端
private long client;
//消息签名 MD5 固定32位
private String sign;
//content 长度
private int length;
//内容
private T content;
支持自动重启自动连接
支持手动主动关闭连接
支持多线程发送消息
NettyClient {
int state();//客户端状态
void start();//启动
void restart();//重启
void close();//关闭
long getClientId();//获取客户端id
void defaultChannel(Channel channel);//设置默认channel
<M extends Serializable> void writeAndFlush(Channel channel, M msg, Cmd type);//发送消息
<M extends Serializable> void writeAndFlush(ChannelHandlerContext channel, M msg, Cmd type);//发送消息
// 保留接口 <M extends Serializable> void writeAndFlush(M msg, Cmd type);
}
BizNettyClient client = NettyClientFactory.buildBizNettyClient(new Address("127.0.0.1", 9999),
() -> Arrays.asList(new NettyReqHandler()));//自定义 NettyReqHandler 消息处理器
client.start();//启动
服务端过来的消息,通过代理和反射进行远程RPC目标方法的执行
@RpcHandler("complexParam")
public Object complexParam(Map<String, Long> param) {
Thread thread = Thread.currentThread();
log.info(" client get a map data = {} ,thread={}", param, thread);
return "map is OK --bbq";
}
protected void channelRead0(ChannelHandlerContext ctx, SafeFrame<Frame<?>> data) throws Exception {
byte cmd = data.getCmd();
if (cmd == Cmd.REQUEST.code()) {
Frame<?> frame = data.getContent();
String name = frame.getTarget();
//使用线程池处理任务
getExecutor().execute(() -> {
//代理执行方法
RpcInvokeProxy.invoke(ret -> {
//返回数据
Frame<Object> resp = new Frame<>();
resp.setData(ret);
writeAndFlush(ctx.channel(), resp, Cmd.RESPONSE);
}, name, frame.getData());
});
} else {
// ctx.fireChannelRead(data);
}
}
支持关机重启
支持客户端管理
支持API方式发送消息、关闭客户端等
//服务器状态
int state();
//开启服务器
void start();
//重启服务器
void restart();
//停止服务器
void stop();
//获取服务器id
long getServerId();
//发送消息
<M extends Serializable> void writeAndFlush(Channel channel, M msg, Cmd type);
<M extends Serializable> void writeAndFlush(ChannelHandlerContext channel, M msg, Cmd type);
//添加客户端channel
void addChannel(String clientId, Channel channel);
//关闭指定clientId的channel
void closeChannel(String clientId);
//获取所有的客户端channel
Collection<Channel> allChannels();
//获取指定clientId的客户端channel
Channel findChanel(String clientId);
//打印所有channel信息
void showChannels();
BizNettyServer server =
NettyServerFactory.buildBizNettyServer(
new Address("127.0.0.1", 9999),
() -> Arrays.asList(new NettyServerHandler()));
server.start();
@Override
protected void channelRead0(ChannelHandlerContext ctx, SafeFrame<Frame<?>> data) throws Exception {
byte cmd = data.getCmd();
// request
if (cmd == Cmd.REQUEST.code()) {
Frame<?> frame = data.getContent();
log.info(" server get REQUEST data = {}", frame);
//返回数据
// writeAndFlush(ctx.channel(), resp, Cmd.REQUEST);
}
//response
if (cmd == Cmd.RESPONSE.code()) {
Frame<?> frame = data.getContent();
log.info("server get RESPONSE data = {}", frame);
} else {
//ctx.fireChannelRead(data);
}
}
public <T> byte[] serialize(T obj) {
ByteArrayOutputStream byteArrayOutputStream =
new ByteArrayOutputStream();
try {
ObjectOutputStream outputStream =
new ObjectOutputStream(byteArrayOutputStream);
outputStream.writeObject(obj);
outputStream.flush();
return byteArrayOutputStream.toByteArray();
} catch (IOException e) {
e.printStackTrace();
}
return new byte[0];
}
@Override
public <T> T deserialize(byte[] data, Class<T> clazz) {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data);
try {
ObjectInputStream objectInputStream =
new ObjectInputStream(byteArrayInputStream);
return (T) objectInputStream.readObject();
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
return null;
}
@Override
public <T> byte[] serialize(T obj) {
return GsonTool.toJson(obj).getBytes(StandardCharsets.UTF_8);
}
@Override
public <T> T deserialize(byte[] data, Class<T> clazz) {
return GsonTool.fromJson(new String(data), clazz);
}
LengthFieldBasedFrameDecoder解码器自定义长度解决TCP粘包黏包问题。所以LengthFieldBasedFrameDecoder又称为: 自定义长度解码器
LengthFieldBasedFrameDecoder是自定义长度解码器,所以构造函数中6个参数,基本都围绕那个定义长度域,进行的描述。
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。
1. 开源生态
2. 协作、人、软件
3. 评估模型