互联网行业:RPC框架大量引入Netty,Dubbo 中默认使用Netty做通信框架,大型网络游戏,地图服务器,在大数据领域(AVRO实现数据文件共享)默认采用Netty做跨界点通信,Netty Service 对Netty二次封装...
特点:每建立一个连接就会创建一个线程,没有连接就会阻塞等待
package com.zhj.test.bio;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author zhj
*/
public class BIOServer {
public static void main(String[] args) throws IOException {
// 线程池机制
// 思路
// 1. 创建一个线程
// 2. 如果有客户端连接,就创建一个线程,与之通信(单独写一个方法)
ExecutorService executorService = Executors.newCachedThreadPool();
// 创建ServerSocket
ServerSocket serverSocket = new ServerSocket(6666);
System.out.println("服务器程序启动!");
while (true) {
// 监听,等待客户端连接
System.out.println("等待连接!!!");
final Socket socket = serverSocket.accept();
System.out.println("连接一个客户端(socket)!");
// 创建一个线程与之通讯
executorService.execute(new Runnable() {
@Override
public void run() {
// 可以与客户端通讯
handler(socket);
}
});
}
}
/**
* 与客户端通讯
*/
public static void handler(Socket socket) {
byte[] bytes = new byte[1024];
try {
InputStream inputStream = socket.getInputStream();
// 循环读取客户端读取的数据
while (true) {
System.out.println("等待输入数据!!!");
int read = inputStream.read(bytes);
if (read != -1) {
System.out.println(Thread.currentThread().getName() + " : " + Thread.currentThread().getId());
System.out.println("接收:" + new String(bytes, 0, read));
} else {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
System.out.println("关闭与客户端的连接!");
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
NIO 全称java non-blocking IO 是指JDK提供的新的API。从JDK1.4开始,Java提供了一系列改进输入输出的新特性,被统称NIO(New IO),是同步非阻塞的。
三大核心部分:Channel(通道),Buffer缓存区),Selector(选择题)
NIO是面向缓冲区,或者面向块编程的,数据读到一个它稍后处理的缓冲区,需要时可在缓冲区前后移动,这就增加了它处理过程中的灵活性,使他可以提供非阻塞式的高伸缩性网络。
特点:
HTTP2.0采用多路复用技术,同一个连接处理多个请求。
三大核心组件的关系
package com.zhj.test.bio;
import java.nio.IntBuffer;
/**
* @author zhj
*/
public class BasicBuffer {
public static void main(String[] args) {
// 举例说明Buffer 的使用
// 创建一个Buffer
IntBuffer intBuffer = IntBuffer.allocate(5);
// 向Buffer 存数据
for (int i = 0; i < intBuffer.capacity(); i++) {
intBuffer.put(i * 2);
}
// 从Buffer 读取数据
// 将Buffer转换,读写切换
/*
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
*/
intBuffer.flip();
// 设置读取位置
intBuffer.position(2);
// 设置读取结束位置
intBuffer.limit(4);
while (intBuffer.hasRemaining()) {
System.out.println(intBuffer.get());
}
}
}
public class NIOByteBufferPutGet {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(64);
buffer.putInt(100);
buffer.putLong(9L);
buffer.putChar('强');
buffer.putShort((short) 4);
buffer.flip();
System.out.println(buffer.getInt());
System.out.println(buffer.getLong());
System.out.println(buffer.getChar());
System.out.println(buffer.getShort());
}
}
public class ReadOnlyBuffer {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(64);
for (int i = 0; i < 64; i++) {
buffer.put((byte) i);
}
buffer.flip();
ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
System.out.println(readOnlyBuffer.getClass());
while (readOnlyBuffer.hasRemaining()) {
System.out.println(readOnlyBuffer.get());
}
// 只读不能放数据
// readOnlyBuffer.put((byte) 1);
}
}
/**
* MappedByteBuffer 说明
* 1. 可以让文件直接在内存(堆外内存)修改,操作系统不需要拷贝一次
* @author zhj
*/
public class MappedByteBufferTest {
public static void main(String[] args) throws Exception {
File file1 = new File("E:\\data_file\\log1.txt");
File file2 = new File("E:\\data_file\\log2.txt");
RandomAccessFile randomAccessFile = new RandomAccessFile(file1, "rw");
FileChannel fileChannel = randomAccessFile.getChannel();
/**
* 参数(1读写模式,2起始位置,3映射到内存大小)
*/
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE,0,5);
mappedByteBuffer.put(0, (byte) 'H');
mappedByteBuffer.put(3, (byte) '9');
randomAccessFile.close();
System.out.println("修改成功~");
}
}
基本介绍
1)NIO的通道类似与流,但区别如下
2)BIO中的stream 是单向的,如FileinputStream对象只能进行读取数据的操作,而NIO中的通道是双向的,可以读,也可以写
3)Channel 在NIO中是一个接口
4)常用的Channel类有 FileChannel、DatagramChannel、ServerSocketChannel 和SocketChannel
5)FileChannel用于文件的数据读写,DatagramChannel 用于UDP的数据读写,ServerSocketChannel 和 SocketChannel 用于 TCP 的数据读写
FileChannel 类
// 案例
// 写
public class NIOFileChannel01 {
public static void main(String[] args) throws IOException {
String str = "hello world";
// 创建一个输出流
FileOutputStream fileOutputStream = new FileOutputStream("E:\\data_file\\log.txt");
// 通过fileOutputStream 获取对应fileChannel
// 这个fileChannel 真实类型是 FileChannelImpl
FileChannel fileChannel = fileOutputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 将str 放入
byteBuffer.put(str.getBytes());
// 读写切换
byteBuffer.flip();
// 写入Channel
fileChannel.write(byteBuffer);
fileOutputStream.close();
}
}
// 读
public class NIOFileChannel02 {
public static void main(String[] args) throws IOException {
File file = new File("E:\\data_file\\log.txt");
// 创建一个输出流
FileInputStream fileInputStream = new FileInputStream(file);
// 通过fileOutputStream 获取对应fileChannel
// 这个fileChannel 真实类型是 FileChannelImpl
FileChannel fileChannel = fileInputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate((int)file.length());
// 将文件 读入缓冲区
fileChannel.read(byteBuffer);
// 读写切换
// byteBuffer.flip();
System.out.println(new String(byteBuffer.array()));
fileInputStream.close();
}
}
// 读写
}
// 将buffer 写入到 fileChannel02
byteBuffer.flip();
fileChannel02.write(byteBuffer);
}
fileInputStream.close();
fileOutputStream.close();
}
}
// 文件拷贝
public class NIOFileChannel04 {
public static void main(String[] args) throws IOException {
File file1 = new File("E:\\data_file\\img01.jpg");
File file2 = new File("E:\\data_file\\img02.jpg");
// 创建一个输出流
FileInputStream fileInputStream = new FileInputStream(file1);
FileChannel fileChannel01 = fileInputStream.getChannel();
// 创建一个输出流
FileOutputStream fileOutputStream = new FileOutputStream(file2);
FileChannel fileChannel02 = fileOutputStream.getChannel();
fileChannel02.transferFrom(fileChannel01,0, fileChannel01.size());
fileInputStream.close();
fileOutputStream.close();
}
}
ScatteringAndGathering 分散聚集
/**
* Scattering 将数据写入到buffer,可采用buffer数组,依次写入
* Gathering 将数据读出到buffer
* @author zhj
*/
public class ScatteringAndGatheringTest {
public static void main(String[] args) throws IOException {
// 使用ServerSocketChannel 和SocketChannel 网络
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress inetSocketAddress = new InetSocketAddress(7000);
// 绑定端口到Socket并启动
serverSocketChannel.socket().bind(inetSocketAddress);
// 创建buffer数组
ByteBuffer[] byteBuffers = new ByteBuffer[2];
byteBuffers[0] = ByteBuffer.allocate(5);
byteBuffers[1] = ByteBuffer.allocate(3);
// 等客户端连接
SocketChannel socketChannel = serverSocketChannel.accept();
int messageLength = 8; // 假定从客户端接收8个
while (true) {
int byteRead = 0;
while (byteRead < messageLength) {
long read = socketChannel.read(byteBuffers);
byteRead += read;
// System.out.println("byteRead = " + byteRead);
Arrays.asList(byteBuffers).stream().map(
buffer -> "postion = " + buffer.position() + ", limit = " + buffer.limit())
.forEach(System.out::println);
}
// buffer 反转
Arrays.asList(byteBuffers).forEach(buffer -> buffer.flip());
// 将数据显示到客户端
long byteWrite = 0;
while (byteWrite < messageLength) {
long write = socketChannel.write(byteBuffers);
byteWrite += write;
}
Arrays.asList(byteBuffers).forEach(buffer -> buffer.clear());
System.out.println("byteRead = " + byteRead);
System.out.println("byteWrite = " + byteWrite);
System.out.println("messageLength = " + messageLength);
}
}
}
特点
方法:open() 获得
NIO入门案例
ator.next();
// 事件驱动
if (key.isAcceptable()) {
System.out.println("有新的客户端连接");
SocketChannel socketChannel = serverSocketChannel.accept();
// 设置为非阻塞
socketChannel.configureBlocking(false);
// 注册selector 关联Buffer
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
System.out.println("生成非阻塞socketChannel:" + socketChannel.hashCode());
}
if (key.isReadable()) {
// 通过key反向获取对应channel
SocketChannel socketChannel = (SocketChannel) key.channel();
// 获取到该channel 关联的 Buffer
ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
socketChannel.read(byteBuffer);
System.out.println("客户端:" + new String(byteBuffer.array()));
}
// 手动从集合移出当前key 防止多线程发生重复读取
iterator.remove();
}
}
}
}
public class NIOClient {
public static void main(String[] args) throws Exception {
// 1.得到一个网络通道
SocketChannel socketChannel = SocketChannel.open();
// 2.设置非阻塞
socketChannel.configureBlocking(false);
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
// 3.连接服务器
if (!socketChannel.connect(inetSocketAddress)) {
while (!socketChannel.finishConnect()) {
System.out.println("客户端因为连接需要时间,客户端不会阻塞");
}
}
System.out.println("客户端连接服务器连接成功!");
// 4.设置发送内容
String str = "hello world!!!";
// 5.将数据放入缓冲区 wrap可以根据字节数组大小分配大小
ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
socketChannel.write(buffer);
System.in.read();
}
}
服务端
public class GroupChatServer {
// 定义相关属性
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private static final int PORT = 8888;
//构造器
public GroupChatServer() {
try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
// 读取客户端信息
private void readData(SelectionKey key) {
// 定义
SocketChannel socketChannel = null;
try {
socketChannel = (SocketChannel) key.channel();
// 创建buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = socketChannel.read(buffer);
if (count > 0) {
// 把缓冲区数据转字符串输出
String msg = new String(buffer.array());
// 输出该消息
System.out.println("From 客户端:" + msg);
// 转发消息
sendInfoToOtherClients(msg, socketChannel);
}
} catch (Exception e) {
try {
System.out.println(socketChannel.getRemoteAddress() + "离线了");
key.cancel();
socketChannel.close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
private void sendInfoToOtherClients(String msg, SocketChannel self) {
System.out.println("服务器转发消息中。。。");
// 遍历所有注册在selector 并排除自己
try {
for (SelectionKey key : selector.keys()) {
Channel targetChannel = key.channel();
if (targetChannel instanceof SocketChannel && targetChannel != self) {
// 转型
SocketChannel socketChannel = (SocketChannel) targetChannel;
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
socketChannel.write(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
// 监听
public void listen() {
try {
while (true) {
int count = selector.select(2000);
if (count > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println(socketChannel.getRemoteAddress() + "上线了!");
}
if (key.isReadable()) {
// 处理读方法
readData(key);
}
iterator.remove();
}
} else {
// System.out.println("等待。。。");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
GroupChatServer server = new GroupChatServer();
server.listen();
}
}
客户端
public class GroupChatClient {
private final String HOST = "127.0.0.1";
private final int PORT = 8888;
private Selector selector;
private SocketChannel socketChannel;
private String username;
public GroupChatClient() {
try {
selector = Selector.open();
socketChannel = SocketChannel.open(new InetSocketAddress(HOST,PORT));
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
username = socketChannel.getLocalAddress().toString().substring(1);
System.out.println(username + "准备就绪。。。");
} catch (Exception e) {
e.printStackTrace();
}
}
public void sendInfo(String info) {
info = username + " : " + info;
try {
socketChannel.write(ByteBuffer.wrap(info.getBytes()));
} catch (Exception e) {
e.printStackTrace();
}
}
public void readInfo() {
try {
int readChannels = selector.select();
if (readChannels > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
sc.read(buffer);
String msg = new String(buffer.array());
System.out.println(msg.trim());
}
iterator.remove();
}
} else {
// System.out.println("没有可以用的通道。。。");
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
GroupChatClient client = new GroupChatClient();
new Thread() {
public void run() {
while (true) {
client.readInfo();
try {
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}.start();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
client.sendInfo(msg);
}
}
}
sendFile优化 2.1版本不是 2.4版本是零拷贝
mmap 和 sendfile 的区别
案例
// 传统IO服务器端
public class OldIOServer {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(7001);
while (true) {
Socket socket = serverSocket.accept();
DataInputStream dataInputStream = new DataInputStream(socket.getInputStream());
try {
byte[] bytes = new byte[4096];
while (true) {
int readCount = dataInputStream.read(bytes, 0, bytes.length);
if (-1 == readCount) {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
// 传统IO服务器端
public class OldIOClient {
public static void main(String[] args) throws Exception {
Socket socket = new Socket("127.0.0.1", 7001);
String fileName = "";
InputStream inputStream = new FileInputStream(fileName);
DataOutputStream dataOutputStream = new DataOutputStream(socket.getOutputStream());
byte[] bytes = new byte[4096];
long readCount;
long total = 0;
long startTime = System.currentTimeMillis();
while ((readCount = inputStream.read(bytes)) > 0) {
total += readCount;
dataOutputStream.write(bytes);
}
System.out.println("发送总字节数: " + total + " 耗时:" + (System.currentTimeMillis()-startTime));
dataOutputStream.close();
socket.close();
inputStream.close();
}
}
// 新
public class NewIOServer {
public static void main(String[] args) throws Exception {
InetSocketAddress address = new InetSocketAddress(7002);
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
ServerSocket socket = serverSocketChannel.socket();
socket.bind(address);
ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
while (true) {
SocketChannel socketChannel = serverSocketChannel.accept();
int readCount = 0;
while (-1 != readCount) {
try {
readCount = socketChannel.read(byteBuffer);
} catch (IOException e) {
e.printStackTrace();
break;
}
byteBuffer.rewind(); // 倒带 position = 0 mark = -1(作废)
}
}
}
}
public class NewIOClient {
public static void main(String[] args) throws Exception {
InetSocketAddress address = new InetSocketAddress("127.0.0.1",7002);
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(address);
String fileName = "";
FileChannel channel = new FileInputStream(fileName).getChannel();
long startTime = System.currentTimeMillis();
// linux 下一个transferTo方法就可以完成传输
// windows 下调用只能发8m,就需要分段传输文件,要注意传输位置 需要循环计算
// 使用零拷贝
long transferCount = channel.transferTo(0, channel.size(), socketChannel);
System.out.println("发送总字节数: " + transferCount + " 耗时:" + (System.currentTimeMillis()-startTime));
channel.close();
}
}
BIO | NIO | AIO | |
---|---|---|---|
IO模型 | 同步阻塞 | 同步非阻塞(多路复用) | 异步非阻塞 |
编程难度 | 简单 | 复杂 | 负载 |
可靠性 | 差 | 好 | 好 |
吞吐量 | 低 | 高 | 高 |
异步的基于事件驱动的网络应用的框架,用于快速开发高性能,高可靠的网络IO程序
原生NIO存在的问题
Netty 的优点
Netty对JDK自带的NIO的API进行了封装,解决了上述问题。
Netty版本说明
传统阻塞I/O服务模型 和 Reactor模式(单Reactor单线程、单Reactor多线程、主从Reactor多线程)
Netty基于主从Reactor多线程模型
传统IO模型
缺点
Reactor(反应器模式,分发者模式,通知者模式)
针对传统阻塞I/o服务模型的2个缺点,解决方案:
Reactor模式中核心组成:
单Reactor单线程
单Reactor多线程
方案说明
主从Reactor多线程
方案优缺点说明:
结合实例:这种模型在许多项目中广泛使用,包括Nginx主从Reactor多进程模型,Memcached主从多线程,Netty主从多线程模型的支持
Netty模型
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.20.Final</version>
</dependency>
</dependencies>
// 服务端
public class SimpleNettyServer {
public static void main(String[] args) throws InterruptedException {
// 创建两个线程组 BossGroup 和 WorkerGroup
// BossGroup 只处理连接请求 WorkerGroup 处理与客户端的业务处理
// 两个线程组都是无限循环
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(32);
// 创建服务器端启动对象,配置参数
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 链式编程
serverBootstrap.group(bossGroup,workerGroup) // 设置线程组
.channel(NioServerSocketChannel.class) // 设置NIO通道实现
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列连接个数
.childOption(ChannelOption.SO_KEEPALIVE,true) // 设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() {
// 创建通道初始化对象
// 给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new SimpleNettyServerHandler());
}
}); // 给workerGroup管道设置处理器
System.out.println("服务器初始化完毕!!!");
// 启动服务器,并绑定端口,并且同步处理
ChannelFuture channelFuture = serverBootstrap.bind(6668).sync();
// 对关闭通道进行监听 (异步模型)
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
// 服务端处理器
public class SimpleNettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取数据
* @param ctx 上下文对象,含有管道pipeline,通道channel,地址
* @param msg 客户端发送的数据 默认是Object
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器读取线程 " + Thread.currentThread().getName());
System.out.println("server ctx = " + ctx);
// 将msg转为一个ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址为:" + ctx.channel().remoteAddress());
// 比如 这有一个非常耗时的业务 需要异步执行
// 解决方案1 用户自定义普通任务 taskQueue
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5*1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵2",CharsetUtil.UTF_8));
} catch (InterruptedException e) {
System.out.println("服务端发生异常了!!!");
}
}
});
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
// 15 秒
Thread.sleep(10*1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵3",CharsetUtil.UTF_8));
} catch (InterruptedException e) {
System.out.println("服务端发生异常了!!!");
}
}
});
// 解决方案2 用户自定义定时任务 scheduleTaskQueue
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
// 20 秒
Thread.sleep(5*1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵4",CharsetUtil.UTF_8));
} catch (InterruptedException e) {
System.out.println("服务端发生异常了!!!");
}
}
},5, TimeUnit.SECONDS);
// 解决方案3 非当前Reactor 线程调用Channel的各种方法
System.out.println("Go to...");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// write + flush 将数据写入缓冲并刷新
// 发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~",CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 发生异常,关闭通道
ctx.close();
}
}
// 客户端
public class SimpleNettyClient {
public static void main(String[] args) throws InterruptedException {
// 客户端需要一个事件循环组
EventLoopGroup eventExecutors = new NioEventLoopGroup();
// 创建客户端启动对象
try {
Bootstrap bootstrap = new Bootstrap();
// 设置相关参数
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new SimpleNettyClientHandler());
}
});
System.out.println("客户端启动完成!!!");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
// 客户端处理器
public class SimpleNettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 通道就绪就可以发送消息
System.out.println("client active ctx = " + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("你好,服务器!",CharsetUtil.UTF_8));
}
/**
* 通道有读取数据时,会触发
* @param ctx 上下文对象,含有管道pipeline,通道channel,地址
* @param msg 客户端发送的数据 默认是Object
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("client read ctx = " + ctx);
// 将msg转为一个ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息是:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器端地址为:" + ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
// 发生异常,关闭通道
ctx.close();
}
}
基本介绍
public class TestHttpServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new TestHttpServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class TestHttpServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 向管道加入处理器
// 得到管道
ChannelPipeline pipeline = socketChannel.pipeline();
// 加入一个netty 提供的httpServerCodec => [coder - decoder]
// HttpServerCodec 说明
// 1.HttpServerCodec 是netty提供的处理http的编码解码器
pipeline.addLast("MyHttpServerCodeC", new HttpServerCodec());
// 2.增加自定义handler
pipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());
}
}
/**
* 说明
* 1. SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter
* 2. httpObject 客户端和服务端相互通讯的数据封装成HttpObject
* @author zhj
*/
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
// 读取客户端数据
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (msg instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) msg;
URI uri = new URI(httpRequest.uri());
if ("/favicon.ico".equals(uri.getPath())) {
System.out.println("请求了favicon.ico资源 不做处理!!!");
return;
}
// 每次请求都会产生新的
System.out.println("pipeline hashcode" + ctx.pipeline().hashCode());
System.out.println("TestHttpServerHandler hashcode" + this.hashCode());
System.out.println("msg 类型 : " + msg.getClass());
System.out.println("客户端地址 : " + ctx.channel().remoteAddress());
// 回复信息给浏览器 [http协议]
ByteBuf content = Unpooled.copiedBuffer("Hello,服务器", CharsetUtil.UTF_8);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
// 将构建好 response 返回
ctx.writeAndFlush(response);
}
}
}
Bootstrap、ServerBootstrap
Bootstrap意思是引导,一个Netty应用通常由一个 Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件,Netty中 Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类
常见的方法有 public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup),该方法用于服务器端,用来设置两个EventLoop public B group(EventLoopGroup group),该方法用于客户端,用来设置一个EventLoopGrouppublic B channel(Class<? extends C> channelClass),该方法用来设置一个服务器端的通道实现public B option(ChannelOption option, Tvalue),用来给ServerChannel添加配置 public ServerBootstrap childOption(ChannelOption childOption, Tvalue),用来给接收到的通道添加配置
public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务处理类(自定义的handler) WorkerGroup
public ServerBootstrap Handler(ChannelHandler childHandler),该方法用来设置业务处理类(自定义的handler) BossGroup public ChannelFuture bind(int inetPort),该方法用于服务器端,用来设置占用的端口号 public ChannelFuture connect(String inetHost, int inetPort),该方法用于客户端,用来连接服务器端
Future、ChannelFuture
Channel
Netty 网络通信的组件,能够用于执行网络I/o操作。
通过Channel可获得当前网络连接的通道的状态
通过Channel可获得网络连接的配置参数(例如接收缓冲区大小)
Channel提供异步的网络I/O操作(如建立连接,读写,绑定端口),异步调用意味着任何I/O调用都将立即返回,并且不保证在调用结束时所请求的I/O操作已完成
调用立即返回一个 ChannelFuture实例,通过注册监听器到ChannelFuture上,可以I/O操作成功、失败或取消时回调通知调用方
支持关联I/O操作与对应的处理程序
不同协议、不同的阻塞类型的连接都有不同的 Channel类型与之对应
常用的Channel类型:
Selector
// 服务端
public class GroupChatServer {
private int port;
public GroupChatServer(int port) {
this.port = port;
}
public void run() throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 加入解码器
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new GroupChatServerHandler());
}
});
ChannelFuture cf = serverBootstrap.bind(port).sync();
System.out.println("【服务器】启动完成~");
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new GroupChatServer(7000).run();
}
}
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
// 定义一个channelGroup 管理所有的channel
// GlobalEventExecutor.INSTANCE 全局的事件执行器,是一个单列
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
// 私聊
// private static Map<String, Channel> channelMap = new HashMap<>();
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 连接建立,第一个被执行 将 channel 加入 channelGroup
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
// 将客户加入聊天的信息推送其他客户端
// 不需要自己遍历
channelGroup.writeAndFlush("【客户端】" + channel.remoteAddress() + " 加入聊天室。" + sdf.format(new Date()) + "\n");
channelGroup.add(channel);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
// 可以自动执行 channelGroup.remove(channel);
channelGroup.writeAndFlush("【客户端】" + channel.remoteAddress() + " 离开聊天室。" + sdf.format(new Date()) + "\n");
}
// channel 处于活动状态
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("【客户端】" + ctx.channel().remoteAddress() + " 上线了。\n");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("【客户端】" + ctx.channel().remoteAddress() + " 跑路了。\n");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
Channel channel = ctx.channel();
channelGroup.forEach(ch -> {
if (channel != ch) {
ch.writeAndFlush("【客户端】" + channel.remoteAddress() + " : " + s + "\n");
} else {
// 回显
ch.writeAndFlush("【我】" + " : " + s + "\n");
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("【服务端】 开小差了~");
ctx.close();
}
}
// 客户端
public class GroupChatClient {
private final String host;
private final int port;
public GroupChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(bossGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 加入解码器
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new GroupChatClientHandler());
}
});
ChannelFuture cf = bootstrap.connect(host, port).sync();
Channel channel = cf.channel();
System.out.println("【客户端】" + channel.localAddress() + " 启动完成~");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
channel.writeAndFlush(msg + "\r\n");
}
} finally {
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new GroupChatClient("127.0.0.1", 7000).run();
}
}
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
System.out.println(s.trim());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("【客户端】 开小差了~");
ctx.close();
}
}
改变http协议的状态码为101,升级成为ws协议实现全双工长连接通信
服务端
package com.zhj.test.netty.websocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
* @author zhj
*/
public class WebSocketServer {
public static void main(String[] args) throws InterruptedException {
// 创建两个线程组 BossGroup 和 WorkerGroup
// BossGroup 只处理连接请求 WorkerGroup 处理与客户端的业务处理
// 两个线程组都是无限循环
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 创建服务器端启动对象,配置参数
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 链式编程
serverBootstrap.group(bossGroup,workerGroup) // 设置线程组
.channel(NioServerSocketChannel.class) // 设置NIO通道实现
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列连接个数
.childOption(ChannelOption.SO_KEEPALIVE,true) // 设置保持活动连接状态
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
// 创建通道初始化对象
// 给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 基于Http协议的,添加Http编解码器
pipeline.addLast(new HttpServerCodec());
// 以块的方式写,添加ChunkedWriteHandler
pipeline.addLast(new ChunkedWriteHandler());
// http数据在传输是分段的 HttpObjectAggregator 可以将多段聚合
pipeline.addLast(new HttpObjectAggregator(8192));
// websocket 数据以帧(frame)形式传递
// websocket 有六个子类
// 浏览器请求时 ws://localhost:7000/xxx 表示请求的uri
// websocket 核心功能是将http协议升级为ws 保持长连接
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
// 自定义handler ,处理业务逻辑
pipeline.addLast(new WebSocketHandler());
}
}); // 给workerGroup管道设置处理器
System.out.println("服务器初始化完毕!!!");
// 启动服务器,并绑定端口,并且同步处理
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
// 对关闭通道进行监听 (异步模型)
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
package com.zhj.test.netty.websocket;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.time.LocalDateTime;
/**
* 这里TextWebSocketFrame 类型,表示一个文本帧(frame)
* @author zhj
*/
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("服务器接收消息:" + msg.text());
// 回复消息
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器响应时间:" + LocalDateTime.now() + msg.text()));
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// id 表示唯一的一个值
System.out.println("handler added 被调用 " + ctx.channel().id().asLongText());
System.out.println("handler added 被调用 " + ctx.channel().id().asShortText());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handler removed 被调用 " + ctx.channel().id().asLongText());
System.out.println("handler removed 被调用 " + ctx.channel().id().asShortText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常发生 " + cause.getMessage());
ctx.close();
}
}
客户端
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>websocket</title>
</head>
<body>
<script>
var socket;
// 判断当前浏览器是否支持websocket编程
if (window.WebSocket) {
// go on
socket = new WebSocket("ws://localhost:7000/hello")
// ev收到服务器端发送的消息
socket.onmessage = function (ev) {
var rt = document.getElementById('responseText')
rt.value = rt.value + "\n" + ev.data
}
// 相当于连接开启
socket.onopen = function (ev) {
var rt = document.getElementById('responseText')
rt.value = '连接开启~'
}
socket.onclose = function (ev) {
var rt = document.getElementById('responseText')
rt.value = rt.value + '\n' + '连接关闭~'
}
} else {
alert("您的浏览器太菜了,不支持websocket")
}
function send(message) {
// 发送消息到服务器
if (!window.socket) {
return
}
if (socket.readyState == WebSocket.OPEN) {
socket.send(message);
}
}
</script>
<form onsubmit="return false">
<textarea name="message" style="height: 300px;width: 300px" ></textarea>
<input type="button" value="发送消息" onclick="send(this.form.message.value)">
<textarea id="responseText" style="height: 300px;width: 300px"></textarea>
<input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
</form>
</body>
</html>
Protobuf基本介绍
protoc.exe --java_out=. Student.proto
syntax = "proto3"; // 版本
option java_outer_classname = "StudentPOJO"; // 生成的外部类名 同时也是文件名
// protobuf 使用message 管理数据
message Student { // 内部类 真正发送的POJO对象 1表示属性序号
int32 id = 1;
string name = 2;
}
syntax = "proto3"; // 版本
option optimize_for = SPEED; // 加快解析
option java_package = "com.zhj.test.netty.codec2"; // 指生成到哪个包下
option java_outer_classname = "DataInfo"; // 生成的外部类名 同时也是文件名
// protobuf 使用message 管理数据 其他的message
message MyMessage {
// 定义一个枚举
enum DataType {
StudentType = 0; // 在proto3 要求enum 编号从0开始
workerType = 1;
}
// 用data_type来标识传的是哪一个枚举类型
DataType data_type = 1;
// 表示每次枚举类型只能出现其中的一个,节省空间
oneof dataBody {
Student student = 2;
Worker worker = 3;
}
}
message Student { // 内部类 真正发送的POJO对象 1表示属性序号
int32 id = 1;
string name = 2;
}
message Worker {
string name = 1;
int32 age = 2;
}
入站先解码,再执行自己的业务处理器,出站先执行自己的业务处理器,再编码
解码器-ReplayingDecoder
其它解码器
传输对象(协议包)
package com.zhj.test.netty.protocoltcp;
import java.util.Arrays;
/**
* 协议包
* @author zhj
*/
public class MessageProtocol {
private int len;
private byte[] content;
public MessageProtocol() {
}
public MessageProtocol(int len, byte[] content) {
this.len = len;
this.content = content;
}
public int getLen() {
return len;
}
public void setLen(int len) {
this.len = len;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
@Override
public String toString() {
return "MessageProtocol{" +
"len=" + len +
", content=" + Arrays.toString(content) +
'}';
}
}
编解码器
package com.zhj.test.netty.protocoltcp;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
/**
* @author zhj
*/
public class MyMessageDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
System.out.println("MyMessageDecoder decode 方法被调用");
int length = byteBuf.readInt();
byte[] content = new byte[length];
byteBuf.readBytes(content);
MessageProtocol messageProtocol = new MessageProtocol(length, content);
list.add(messageProtocol);
}
}
package com.zhj.test.netty.protocoltcp;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* @author zhj
*/
public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, MessageProtocol messageProtocol, ByteBuf byteBuf) throws Exception {
System.out.println("MyMessageEncoder encoder方法被调用");
byteBuf.writeInt(messageProtocol.getLen());
byteBuf.writeBytes(messageProtocol.getContent());
}
}
服务端
package com.zhj.test.netty.protocoltcp;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @author zhj
*/
public class TcpClient {
public static void main(String[] args) throws InterruptedException {
// 客户端需要一个事件循环组
EventLoopGroup eventExecutors = new NioEventLoopGroup();
// 创建客户端启动对象
try {
Bootstrap bootstrap = new Bootstrap();
// 设置相关参数
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new TcpClientInitializer());
System.out.println("客户端启动完成!!!");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
package com.zhj.test.netty.protocoltcp;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
/**
* @author zhj
*/
public class TcpServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 向管道加入处理器
// 得到管道
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new MyMessageDecoder());
pipeline.addLast(new MyMessageEncoder());
// 2.增加自定义handler
pipeline.addLast("MyTcpServerHandler", new TcpServerHandler());
}
}
package com.zhj.test.netty.protocoltcp;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
/**
* 说明
* 1. SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter
* 2. httpObject 客户端和服务端相互通讯的数据封装成HttpObject
* @author zhj
*/
public class TcpServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
private int count;
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
// System.out.println(new String(msg.array(), CharsetUtil.UTF_8));
System.out.println("服务端接收的数据 " + new String(msg.getContent(), CharsetUtil.UTF_8));
System.out.println("服务端接收的数据长度 " + msg.getLen());
System.out.println("服务器接收到的消息量:" + (++this.count));
String responseContent = UUID.randomUUID().toString();
int len = responseContent.getBytes(StandardCharsets.UTF_8).length;
MessageProtocol messageProtocol = new MessageProtocol(len, responseContent.getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(messageProtocol);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("服务器异常:" + cause.getMessage());
ctx.close();
}
}
客户端
package com.zhj.test.netty.protocoltcp;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @author zhj
*/
public class TcpClient {
public static void main(String[] args) throws InterruptedException {
// 客户端需要一个事件循环组
EventLoopGroup eventExecutors = new NioEventLoopGroup();
// 创建客户端启动对象
try {
Bootstrap bootstrap = new Bootstrap();
// 设置相关参数
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new TcpClientInitializer());
System.out.println("客户端启动完成!!!");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
package com.zhj.test.netty.protocoltcp;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
/**
* @author zhj
*/
public class TcpClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 向管道加入处理器
// 得到管道
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new MyMessageEncoder());
pipeline.addLast(new MyMessageDecoder());
// 2.增加自定义handler
pipeline.addLast("MyTestTcpClientHandler", new TcpClientHandler());
}
}
package com.zhj.test.netty.protocoltcp;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
import java.nio.charset.StandardCharsets;
/**
* 说明
* 1. SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter
* 2. httpObject 客户端和服务端相互通讯的数据封装成HttpObject
* @author zhj
*/
public class TcpClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
private int count;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 使用客户端发送10条数据 hello server
for (int i = 0; i < 60; i++) {
String mes = "今天真带劲。。。";
byte[] content = mes.getBytes(StandardCharsets.UTF_8);
int length = mes.getBytes(StandardCharsets.UTF_8).length;
MessageProtocol messageProtocol = new MessageProtocol(length, content);
ctx.writeAndFlush(messageProtocol);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
System.out.println("客户端接收的数据 " + new String(msg.getContent(), CharsetUtil.UTF_8));
System.out.println("客户端接收的数据长度 " + msg.getLen());
System.out.println("客户端接收到的消息量:" + (++this.count));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常信息:" + cause.getMessage());
ctx.close();
}
}
RPC 基本介绍
自己实现dubbo RPC(基于Netty)需求说明
设计说明
代码实现:
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。