1 Star 4 Fork 1

朱慧杰 / netty-study

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README
Apache-2.0

Netty 异步高性能通信框架

互联网行业:RPC框架大量引入Netty,Dubbo 中默认使用Netty做通信框架,大型网络游戏,地图服务器,在大数据领域(AVRO实现数据文件共享)默认采用Netty做跨界点通信,Netty Service 对Netty二次封装...

1. IO模型

1.1 BIO 模型

特点:每建立一个连接就会创建一个线程,没有连接就会阻塞等待

package com.zhj.test.bio;

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author zhj
 */
public class BIOServer {

    public static void main(String[] args) throws IOException {
        // 线程池机制

        // 思路
        // 1. 创建一个线程
        // 2. 如果有客户端连接,就创建一个线程,与之通信(单独写一个方法)

        ExecutorService executorService = Executors.newCachedThreadPool();

        // 创建ServerSocket
        ServerSocket serverSocket = new ServerSocket(6666);
        System.out.println("服务器程序启动!");

        while (true) {
            // 监听,等待客户端连接
            System.out.println("等待连接!!!");
            final Socket socket = serverSocket.accept();
            System.out.println("连接一个客户端(socket)!");

            // 创建一个线程与之通讯
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    // 可以与客户端通讯
                    handler(socket);
                }
            });
        }
    }

    /**
     * 与客户端通讯
     */
    public static void handler(Socket socket) {
        byte[] bytes = new byte[1024];
        try {
            InputStream inputStream = socket.getInputStream();
            // 循环读取客户端读取的数据
            while (true) {
                System.out.println("等待输入数据!!!");
                int read = inputStream.read(bytes);
                if (read != -1) {
                    System.out.println(Thread.currentThread().getName() + " : " + Thread.currentThread().getId());
                    System.out.println("接收:" + new String(bytes, 0, read));
                } else {
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            System.out.println("关闭与客户端的连接!");
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

1.2 NIO

NIO 全称java non-blocking IO 是指JDK提供的新的API。从JDK1.4开始,Java提供了一系列改进输入输出的新特性,被统称NIO(New IO),是同步非阻塞的。

三大核心部分:Channel(通道),Buffer缓存区),Selector(选择题)

NIO是面向缓冲区,或者面向块编程的,数据读到一个它稍后处理的缓冲区,需要时可在缓冲区前后移动,这就增加了它处理过程中的灵活性,使他可以提供非阻塞式的高伸缩性网络。

特点:

  • 非阻塞 不需要线程一直等待,有别的任务线程也可以去执行
  • 一个线程可以处理多个连接,当大量请求到服务器,不需要每个连接开一个线程

HTTP2.0采用多路复用技术,同一个连接处理多个请求。

三大核心组件的关系

  • 每个Channel都会对应一个Buffer
  • Selector对应一个线程,一个线程对应多个Channel连接
  • 该图反应了三个Channel 注册到改Selector 程序
  • 程序切换到那个Channel是由事件决定的,Event就是一个重要的概念
  • Selector会根据不同的时间再各个通道上切换
  • Buffer就是一个内容块,底层是与一个数组的
  • 数据的读取写入是通过Buffer,这个与BIO有本质区别,BIO要么是输入流,要么是输出流,不能是双向的,NIO的Buffer是可以读也可以写的,需要flip方法切换
  • Channel是双向的,可以返回底层操作系统的情况,Linux底层的操作系统就是双向的

1.2.1 Buffer缓冲区的使用

  • Capacity 容量,即可以容纳的最大数据量;在缓冲区创建时被设定并且不能改变
  • Limit 表示缓冲区当前终点,不能对缓冲区超过极限的位置进行读写操作。且极限是可以修改的
  • Position 位置,下一个要读或写的元素的索引,每次读写缓冲区数据时都会改变改值,为下次读写操作做准备
  • Mark 标记
package com.zhj.test.bio;

import java.nio.IntBuffer;

/**
 * @author zhj
 */
public class BasicBuffer {

    public static void main(String[] args) {
        // 举例说明Buffer 的使用
        // 创建一个Buffer
        IntBuffer intBuffer = IntBuffer.allocate(5);
        // 向Buffer 存数据
        for (int i = 0; i < intBuffer.capacity(); i++) {
            intBuffer.put(i * 2);
        }
        // 从Buffer 读取数据
        // 将Buffer转换,读写切换
        /*
        public final Buffer flip() {
            limit = position;
            position = 0;
            mark = -1;
            return this;
        }
        */
        intBuffer.flip();
        // 设置读取位置
        intBuffer.position(2);
        // 设置读取结束位置
        intBuffer.limit(4);

        while (intBuffer.hasRemaining()) {
            System.out.println(intBuffer.get());
        }
    }
}

public class NIOByteBufferPutGet {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        buffer.putInt(100);
        buffer.putLong(9L);
        buffer.putChar('强');
        buffer.putShort((short) 4);
        buffer.flip();
        System.out.println(buffer.getInt());
        System.out.println(buffer.getLong());
        System.out.println(buffer.getChar());
        System.out.println(buffer.getShort());
    }
}
public class ReadOnlyBuffer {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(64);

        for (int i = 0; i < 64; i++) {
            buffer.put((byte) i);
        }
        buffer.flip();
        ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
        System.out.println(readOnlyBuffer.getClass());

        while (readOnlyBuffer.hasRemaining()) {
            System.out.println(readOnlyBuffer.get());
        }
        // 只读不能放数据
        // readOnlyBuffer.put((byte) 1);
    }
}
/**
 * MappedByteBuffer 说明
 * 1. 可以让文件直接在内存(堆外内存)修改,操作系统不需要拷贝一次
 * @author zhj
 */
public class MappedByteBufferTest {
    public static void main(String[] args) throws Exception {
        File file1 = new File("E:\\data_file\\log1.txt");
        File file2 = new File("E:\\data_file\\log2.txt");
        RandomAccessFile randomAccessFile = new RandomAccessFile(file1, "rw");
        FileChannel fileChannel = randomAccessFile.getChannel();
        /**
         *  参数(1读写模式,2起始位置,3映射到内存大小)
         */
        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE,0,5);
        mappedByteBuffer.put(0, (byte) 'H');
        mappedByteBuffer.put(3, (byte) '9');

        randomAccessFile.close();
        System.out.println("修改成功~");
    }
}

1.2.2 Channel通道的使用

基本介绍

1)NIO的通道类似与流,但区别如下

  • 通道可以同时进行读写,而流只能进行读或者写
  • 通道可以实现异步读写数据
  • 通道可以从缓冲区读取数据,也可以写数据到缓冲区

2)BIO中的stream 是单向的,如FileinputStream对象只能进行读取数据的操作,而NIO中的通道是双向的,可以读,也可以写

3)Channel 在NIO中是一个接口

4)常用的Channel类有 FileChannel、DatagramChannel、ServerSocketChannel 和SocketChannel

5)FileChannel用于文件的数据读写,DatagramChannel 用于UDP的数据读写,ServerSocketChannel 和 SocketChannel 用于 TCP 的数据读写

FileChannel 类

  • read 将通道数据读取到缓冲区中
  • write 把缓冲区的数据写到通道
  • transferFrom() 从目标通道中复制数据到当前通道
  • transferTo() 把数据从当前通道复制给目标通道
// 案例
// 写
public class NIOFileChannel01 {
    public static void main(String[] args) throws IOException {
        String str = "hello world";
        // 创建一个输出流
        FileOutputStream fileOutputStream = new FileOutputStream("E:\\data_file\\log.txt");
        // 通过fileOutputStream 获取对应fileChannel
        // 这个fileChannel 真实类型是 FileChannelImpl
        FileChannel fileChannel = fileOutputStream.getChannel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        // 将str 放入
        byteBuffer.put(str.getBytes());
        // 读写切换
        byteBuffer.flip();
        // 写入Channel
        fileChannel.write(byteBuffer);

        fileOutputStream.close();
    }
}
// 读
public class NIOFileChannel02 {
    public static void main(String[] args) throws IOException {
        File file = new File("E:\\data_file\\log.txt");
        // 创建一个输出流
        FileInputStream fileInputStream = new FileInputStream(file);
        // 通过fileOutputStream 获取对应fileChannel
        // 这个fileChannel 真实类型是 FileChannelImpl
        FileChannel fileChannel = fileInputStream.getChannel();
        ByteBuffer byteBuffer = ByteBuffer.allocate((int)file.length());
        // 将文件 读入缓冲区
        fileChannel.read(byteBuffer);
        // 读写切换
        // byteBuffer.flip();
        System.out.println(new String(byteBuffer.array()));
        fileInputStream.close();
    }
}
// 读写

            }
            // 将buffer 写入到 fileChannel02
            byteBuffer.flip();
            fileChannel02.write(byteBuffer);
        }
        fileInputStream.close();
        fileOutputStream.close();
    }
}
// 文件拷贝
public class NIOFileChannel04 {
    public static void main(String[] args) throws IOException {
        File file1 = new File("E:\\data_file\\img01.jpg");
        File file2 = new File("E:\\data_file\\img02.jpg");
        // 创建一个输出流
        FileInputStream fileInputStream = new FileInputStream(file1);
        FileChannel fileChannel01 = fileInputStream.getChannel();
        // 创建一个输出流
        FileOutputStream fileOutputStream = new FileOutputStream(file2);
        FileChannel fileChannel02 = fileOutputStream.getChannel();
        fileChannel02.transferFrom(fileChannel01,0, fileChannel01.size());
        fileInputStream.close();
        fileOutputStream.close();
    }
}

ScatteringAndGathering 分散聚集

/**
 * Scattering 将数据写入到buffer,可采用buffer数组,依次写入
 * Gathering 将数据读出到buffer
 * @author zhj
 */
public class ScatteringAndGatheringTest {
    public static void main(String[] args) throws IOException {
        // 使用ServerSocketChannel 和SocketChannel 网络
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        InetSocketAddress inetSocketAddress = new InetSocketAddress(7000);

        // 绑定端口到Socket并启动
        serverSocketChannel.socket().bind(inetSocketAddress);

        // 创建buffer数组
        ByteBuffer[] byteBuffers = new ByteBuffer[2];
        byteBuffers[0] = ByteBuffer.allocate(5);
        byteBuffers[1] = ByteBuffer.allocate(3);

        // 等客户端连接
        SocketChannel socketChannel = serverSocketChannel.accept();
        int messageLength = 8; // 假定从客户端接收8个
        while (true) {
            int byteRead = 0;

            while (byteRead < messageLength) {
                long read = socketChannel.read(byteBuffers);
                byteRead += read;
                // System.out.println("byteRead = " + byteRead);
                Arrays.asList(byteBuffers).stream().map(
                        buffer -> "postion = " + buffer.position() + ", limit = " + buffer.limit())
                        .forEach(System.out::println);
            }
            // buffer 反转
            Arrays.asList(byteBuffers).forEach(buffer -> buffer.flip());
            // 将数据显示到客户端
            long byteWrite = 0;
            while (byteWrite < messageLength) {
                long write = socketChannel.write(byteBuffers);
                byteWrite += write;
            }

            Arrays.asList(byteBuffers).forEach(buffer -> buffer.clear());

            System.out.println("byteRead = " + byteRead);
            System.out.println("byteWrite = " + byteWrite);
            System.out.println("messageLength = " + messageLength);
        }
    }
}

1.2.3 Selector 选择器的使用

特点

  1. Netty 的IO线程NioEventLoop 聚合了Selector (选择器,也叫多路复用器),可以同时并发处理成百上千个客户端的连接。
  2. 当线程从某客户端Socket通道进行读写时,若没有数据可用时,该线程可以进行其他任务。
  3. 线程常将非阻塞IO的空闲时间用于在其他通道上执行IO操作,所以单独的线程可以管理多个输入和输出通道。
  4. 由于读写操作都是非阻塞的,这就可以充分提升IO线程的运行效率,避免由于频繁I/O阻塞导致线程挂起。
  5. 一个I/O线程可以并发处理N个客户端连接和读写操作,这从根本上解决了传统同步阻塞I/O一连接一线程的模型,架构性能、弹性伸缩能力和可靠性都得到了极大的提升。

方法:open() 获得

  • selector.select() 阻塞
  • selector.select(1000) 阻塞1s,返回
  • selector.wakeup() 唤醒
  • selector.selectNow() 不阻塞

1.2.4 NIO实现

NIO入门案例

ator.next();
                // 事件驱动
                if (key.isAcceptable()) {
                    System.out.println("有新的客户端连接");
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    // 设置为非阻塞
                    socketChannel.configureBlocking(false);
                    // 注册selector 关联Buffer
                    socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                    System.out.println("生成非阻塞socketChannel:" + socketChannel.hashCode());
                }
                if (key.isReadable()) {
                    // 通过key反向获取对应channel
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    // 获取到该channel 关联的 Buffer
                    ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
                    socketChannel.read(byteBuffer);
                    System.out.println("客户端:" + new String(byteBuffer.array()));
                }
                // 手动从集合移出当前key 防止多线程发生重复读取
                iterator.remove();
            }
        }
    }
}

public class NIOClient {
    public static void main(String[] args) throws Exception {
        // 1.得到一个网络通道
        SocketChannel socketChannel = SocketChannel.open();
        // 2.设置非阻塞
        socketChannel.configureBlocking(false);
        InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
        // 3.连接服务器
        if (!socketChannel.connect(inetSocketAddress)) {
            while (!socketChannel.finishConnect()) {
                System.out.println("客户端因为连接需要时间,客户端不会阻塞");
            }
        }
        System.out.println("客户端连接服务器连接成功!");
        // 4.设置发送内容
        String str = "hello world!!!";
        // 5.将数据放入缓冲区 wrap可以根据字节数组大小分配大小
        ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
        socketChannel.write(buffer);
        System.in.read();
    }
}

1.3 NIO与BIO的比较

  1. BIO是以流的方式处理的,而NIO以块的方式处理数据,块IO的效率比流IO的高很多
  2. BIO是阻塞的,NIO是非阻塞的
  3. BIO基于字节流和字符流进行操作,而NIO基于Channel通道和Buffer缓冲区进行操作,数据总是从通道读到缓冲区中,或者从缓冲区写入到通道中。Sellector选择器用于监听多个通道的事件比如连接请求,数据到达等,因此使用单个线程就可以监听多个客户端通道

2 NIO群聊

服务端

public class GroupChatServer {

    // 定义相关属性
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private static final int PORT = 8888;

    //构造器
    public GroupChatServer() {
        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // 读取客户端信息
    private void readData(SelectionKey key) {
        // 定义
        SocketChannel socketChannel = null;
        try {
            socketChannel = (SocketChannel) key.channel();
            // 创建buffer
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int count = socketChannel.read(buffer);
            if (count > 0) {
                // 把缓冲区数据转字符串输出
                String msg = new String(buffer.array());
                // 输出该消息
                System.out.println("From 客户端:" + msg);
                // 转发消息
                sendInfoToOtherClients(msg, socketChannel);
            }
        } catch (Exception e) {
            try {
                System.out.println(socketChannel.getRemoteAddress() + "离线了");
                key.cancel();
                socketChannel.close();
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
        }
    }

    private void sendInfoToOtherClients(String msg, SocketChannel self) {
        System.out.println("服务器转发消息中。。。");
        // 遍历所有注册在selector 并排除自己
        try {
            for (SelectionKey key : selector.keys()) {
                Channel targetChannel = key.channel();
                if (targetChannel instanceof SocketChannel && targetChannel != self) {
                    // 转型
                    SocketChannel socketChannel = (SocketChannel) targetChannel;
                    ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                    socketChannel.write(buffer);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // 监听
    public void listen() {
        try {
           while (true) {
               int count = selector.select(2000);
               if (count > 0) {
                   Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                   while (iterator.hasNext()) {
                       SelectionKey key = iterator.next();
                       if (key.isAcceptable()) {
                           SocketChannel socketChannel = serverSocketChannel.accept();
                           socketChannel.configureBlocking(false);
                           socketChannel.register(selector, SelectionKey.OP_READ);
                           System.out.println(socketChannel.getRemoteAddress() + "上线了!");
                       }
                       if (key.isReadable()) {
                           // 处理读方法
                           readData(key);
                       }
                       iterator.remove();
                   }
               } else {
                   // System.out.println("等待。。。");
               }
           }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        GroupChatServer server = new GroupChatServer();
        server.listen();
    }

}

客户端

public class GroupChatClient {
    private final String HOST = "127.0.0.1";
    private final int PORT = 8888;
    private Selector selector;
    private SocketChannel socketChannel;
    private String username;

    public GroupChatClient() {
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open(new InetSocketAddress(HOST,PORT));
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
            username = socketChannel.getLocalAddress().toString().substring(1);
            System.out.println(username + "准备就绪。。。");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void sendInfo(String info) {
        info = username + " : " + info;
        try {
            socketChannel.write(ByteBuffer.wrap(info.getBytes()));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void readInfo() {
        try {
            int readChannels = selector.select();
            if (readChannels > 0) {
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    if (key.isReadable()) {
                        SocketChannel sc = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        sc.read(buffer);
                        String msg = new String(buffer.array());
                        System.out.println(msg.trim());
                    }
                    iterator.remove();
                }
            } else {
                // System.out.println("没有可以用的通道。。。");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        GroupChatClient client = new GroupChatClient();

        new Thread() {
            public void run() {
                while (true) {
                    client.readInfo();
                    try {
                        Thread.sleep(3000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            String msg = scanner.nextLine();
            client.sendInfo(msg);
        }
    }
}

3 零拷贝

  1. 从操作系统的角度来说,因为内核缓冲区之间,没有数据是重复的
  2. 零拷贝不仅仅带来更少的数据复制,还能带来其他的性能优势,例如更少的上下文切换,更少的CPU缓存伪共享以及无CPU校验和计算

sendFile优化 2.1版本不是 2.4版本是零拷贝

mmap 和 sendfile 的区别

  • mmap 适合小数据量读写,sendFile 适合大文件传输
  • mmap 需要4次上下文切换,3次数据拷贝:sendFile 需要3次上下文切换,最少2次数据拷贝
  • sendFile 可以利用DMA方式,减少CPU拷贝,mmap 则不能(必须从内核拷贝到Socket缓冲区)

案例

// 传统IO服务器端
public class OldIOServer {
    public static void main(String[] args) throws Exception {
        ServerSocket serverSocket = new ServerSocket(7001);

        while (true) {
            Socket socket = serverSocket.accept();
            DataInputStream dataInputStream = new DataInputStream(socket.getInputStream());
            try {
                byte[] bytes = new byte[4096];
                while (true) {
                    int readCount = dataInputStream.read(bytes, 0, bytes.length);
                    if (-1 == readCount) {
                        break;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
// 传统IO服务器端
public class OldIOClient {
    public static void main(String[] args) throws Exception {
        Socket socket = new Socket("127.0.0.1", 7001);
        String fileName = "";
        InputStream inputStream = new FileInputStream(fileName);
        DataOutputStream dataOutputStream = new DataOutputStream(socket.getOutputStream());
        byte[] bytes = new byte[4096];
        long readCount;
        long total = 0;
        long startTime = System.currentTimeMillis();

        while ((readCount = inputStream.read(bytes)) > 0) {
            total += readCount;
            dataOutputStream.write(bytes);
        }
        System.out.println("发送总字节数: " + total + " 耗时:" + (System.currentTimeMillis()-startTime));
        dataOutputStream.close();
        socket.close();
        inputStream.close();
    }
}
// 新
public class NewIOServer {
    public static void main(String[] args) throws Exception {
        InetSocketAddress address = new InetSocketAddress(7002);
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        ServerSocket socket = serverSocketChannel.socket();
        socket.bind(address);

        ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
        while (true) {
            SocketChannel socketChannel = serverSocketChannel.accept();
            int readCount = 0;
            while (-1 != readCount) {
                try {
                    readCount = socketChannel.read(byteBuffer);
                } catch (IOException e) {
                    e.printStackTrace();
                    break;
                }
                byteBuffer.rewind(); // 倒带 position = 0 mark = -1(作废)
            }
        }
    }
}
public class NewIOClient {
    public static void main(String[] args) throws Exception {
        InetSocketAddress address = new InetSocketAddress("127.0.0.1",7002);
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(address);
        String fileName = "";

        FileChannel channel = new FileInputStream(fileName).getChannel();
        long startTime = System.currentTimeMillis();
        // linux 下一个transferTo方法就可以完成传输
        // windows 下调用只能发8m,就需要分段传输文件,要注意传输位置 需要循环计算
        // 使用零拷贝
        long transferCount = channel.transferTo(0, channel.size(), socketChannel);
        System.out.println("发送总字节数: " + transferCount + " 耗时:" + (System.currentTimeMillis()-startTime));
        channel.close();
    }
}

4 AIO 了解

  1. JDK 7 引入Asynchronous I/O ,即AIO.在进行I/O编程中,常用到两种模式;Reactor 和 Proactor。Java的NIO就是Reactor,当有事件触发时,服务器端得到通知,进行相应处理
  2. AIO即NIO 2.0 ,叫异步不阻塞IO.AIO引入异步通道的概念,采用了proactor模式,简化了程序的编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用
  3. 目前AIO没有被广泛应用,Netty也是基于NIO,而不是AIO
BIO NIO AIO
IO模型 同步阻塞 同步非阻塞(多路复用) 异步非阻塞
编程难度 简单 复杂 负载
可靠性
吞吐量

5 Netty 概述

异步的基于事件驱动的网络应用的框架,用于快速开发高性能,高可靠的网络IO程序

原生NIO存在的问题

  1. NIO的类库和API繁杂,使用麻烦:需要熟练掌握Selector、ServerSocketChannel,SocketChannel、ByteBuffer等。
  2. 需要具备其他的额外技能:要熟悉Java多线程编程,因为NIO编程涉及到 Reactor模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的NIO程序。一
  3. 开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。
  4. JDK NIO的Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector空轮询,最终导致CPU 100%。直到JDK 1.7版本该问题仍旧存在,没有被根本解决。

Netty 的优点

Netty对JDK自带的NIO的API进行了封装,解决了上述问题。

  1. 设计优雅:适用于各种传输类型的统一API阻塞和非阻塞Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型-单线程,一个或多个线程池
  2. 使用方便:详细记录的Javadoc,用户指南和示例;没有其他依赖项,JDK 5 (Netty3.x或6 (Netty 4.x)就足够了。
  3. 高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。
  4. 安全:完整的SSL/TLS和StartTLS支持。
  5. 社区活跃、不断更新:社区活跃,版本迭代周期短,发现的Bug可以被及时修复,同时,更多的新功能会被加入

Netty版本说明

  1. netty版本分为netty3.x和netty4.x、netty5.x
  2. 因为Netty5出现重大bug,已经被官网废弃了,目前推荐使用的是Netty4.x的稳定版本
  3. 目前在官网可下载的版本netty3.x netty4.0.x和netty4.1.x4)
  4. 在本套课程中,我们讲解Netty4.1.x版本
  5. netty下载地址:https://bintray.com/netty/downloads/nettyl

6 Netty 线程模型

6.1 线程模型

传统阻塞I/O服务模型 和 Reactor模式(单Reactor单线程、单Reactor多线程、主从Reactor多线程)

Netty基于主从Reactor多线程模型

传统IO模型

缺点

  1. 当并发数很大,就会创建大量的线程,占用很大系统资源
  2. 连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在read操作,造成线程资源浪费

Reactor(反应器模式,分发者模式,通知者模式)

针对传统阻塞I/o服务模型的2个缺点,解决方案:

  1. 基于I/O复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理
  2. 基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。

原理

Reactor模式中核心组成:

  1. Reactor: Reactor在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对I0O事件做出反应。它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人;
  2. Handlers:处理程序执行I/O事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。Reactor通过调度适当的处理程序来响应I/O事件,处理程序执行非阻塞操作。

单Reactor单线程

在这里插入图片描述

单Reactor多线程

在这里插入图片描述

方案说明

  1. Reactor对象通过select监控客户端请求事件,收到事件后,通过dispatch进行分发
  2. 如果建立连接请求,则右Acceptor通过accept处理连接请求,然后创建一个Handler对象处理完成连接后的各种事件
  3. 如果不是连接请求,则由reactor分发调用连接对 应的handler来处理
  4. handler只负责响应事件,不做具体的业务处理,通过read读取数据后,会分发给后面的worker线程池的某个线程处理业务
  5. worker线程池会分配独立线程完成真正的业务,并将结果返回给handler
  6. handler收到响应后,通过send将结果返回给client

主从Reactor多线程

在这里插入图片描述

方案优缺点说明:

  1. 优点:父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。
  2. 优点:父线程与子线程的数据交互简单,Reactor主线程只需要把新连接传给子线程,子线程无需返回数据。
  3. 缺点:编程复杂度较高

结合实例:这种模型在许多项目中广泛使用,包括Nginx主从Reactor多进程模型,Memcached主从多线程,Netty主从多线程模型的支持

Netty模型

在这里插入图片描述

6.2 Netty简单案例

<dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.20.Final</version>
        </dependency>
</dependencies>
// 服务端
public class SimpleNettyServer {
    public static void main(String[] args) throws InterruptedException {
        // 创建两个线程组 BossGroup 和 WorkerGroup
        // BossGroup 只处理连接请求 WorkerGroup 处理与客户端的业务处理
        // 两个线程组都是无限循环
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(32);
        // 创建服务器端启动对象,配置参数
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 链式编程
            serverBootstrap.group(bossGroup,workerGroup) // 设置线程组
                    .channel(NioServerSocketChannel.class) // 设置NIO通道实现
                    .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE,true) // 设置保持活动连接状态
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        // 创建通道初始化对象
                        // 给pipeline 设置处理器
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new SimpleNettyServerHandler());
                        }
                    }); // 给workerGroup管道设置处理器
            System.out.println("服务器初始化完毕!!!");
            // 启动服务器,并绑定端口,并且同步处理
            ChannelFuture channelFuture = serverBootstrap.bind(6668).sync();

            // 对关闭通道进行监听 (异步模型)
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
// 服务端处理器
public class SimpleNettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 读取数据
     * @param ctx 上下文对象,含有管道pipeline,通道channel,地址
     * @param msg 客户端发送的数据 默认是Object
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("服务器读取线程 " + Thread.currentThread().getName());
        System.out.println("server ctx = " + ctx);
        // 将msg转为一个ByteBuffer
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址为:" + ctx.channel().remoteAddress());
                // 比如 这有一个非常耗时的业务 需要异步执行
        // 解决方案1 用户自定义普通任务 taskQueue
        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5*1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵2",CharsetUtil.UTF_8));
                } catch (InterruptedException e) {
                    System.out.println("服务端发生异常了!!!");
                }
            }
        });
        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // 15 秒
                    Thread.sleep(10*1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵3",CharsetUtil.UTF_8));
                } catch (InterruptedException e) {
                    System.out.println("服务端发生异常了!!!");
                }
            }
        });
        // 解决方案2 用户自定义定时任务 scheduleTaskQueue
        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    // 20 秒
                    Thread.sleep(5*1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵4",CharsetUtil.UTF_8));
                } catch (InterruptedException e) {
                    System.out.println("服务端发生异常了!!!");
                }
            }
        },5, TimeUnit.SECONDS);
        // 解决方案3 非当前Reactor 线程调用Channel的各种方法
        System.out.println("Go to...");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // write + flush 将数据写入缓冲并刷新
        // 发送的数据进行编码
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~",CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 发生异常,关闭通道
        ctx.close();
    }
}
// 客户端
public class SimpleNettyClient {
    public static void main(String[] args) throws InterruptedException {
        // 客户端需要一个事件循环组
        EventLoopGroup eventExecutors = new NioEventLoopGroup();
        // 创建客户端启动对象
        try {
            Bootstrap bootstrap = new Bootstrap();
            // 设置相关参数
            bootstrap.group(eventExecutors)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new SimpleNettyClientHandler());
                        }
                    });
            System.out.println("客户端启动完成!!!");
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            eventExecutors.shutdownGracefully();
        }
    }
}
// 客户端处理器
public class SimpleNettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 通道就绪就可以发送消息
        System.out.println("client active ctx = " + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("你好,服务器!",CharsetUtil.UTF_8));
    }

    /**
     * 通道有读取数据时,会触发
     * @param ctx 上下文对象,含有管道pipeline,通道channel,地址
     * @param msg 客户端发送的数据 默认是Object
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("client read ctx = " + ctx);
        // 将msg转为一个ByteBuffer
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("服务器回复的消息是:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务器端地址为:" + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        // 发生异常,关闭通道
        ctx.close();
    }
}

7 Netty异步模型

基本介绍

  1. 异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。
  2. Netty中的I/O操作是异步的,包括Bind、Write、Connect等操作会简单的返回一个ChannelFuture。
  3. 调用者并不能立刻获得结果,而是通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。
  4. Netty的异步模型是建立在future和callback的之上的。callback就是回调。重点说Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待fun返回显然不合适。那么可以在调用fun的时候,立马返回一个Future,后续可以通过Future去监控方法fun的处理过程(即:Future-Listener机制)

8 Netty入门实例 Http服务

public class TestHttpServer {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new TestHttpServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
public class TestHttpServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 向管道加入处理器
        // 得到管道
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 加入一个netty 提供的httpServerCodec => [coder - decoder]
        // HttpServerCodec 说明
        // 1.HttpServerCodec 是netty提供的处理http的编码解码器
        pipeline.addLast("MyHttpServerCodeC", new HttpServerCodec());
        // 2.增加自定义handler
        pipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());
    }
}
/**
 * 说明
 * 1. SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter
 * 2. httpObject 客户端和服务端相互通讯的数据封装成HttpObject
 * @author zhj
 */
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    // 读取客户端数据
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        if (msg instanceof HttpRequest) {
            HttpRequest httpRequest = (HttpRequest) msg;
            URI uri = new URI(httpRequest.uri());
            if ("/favicon.ico".equals(uri.getPath())) {
                System.out.println("请求了favicon.ico资源 不做处理!!!");
                return;
            }
            // 每次请求都会产生新的
            System.out.println("pipeline hashcode" + ctx.pipeline().hashCode());
            System.out.println("TestHttpServerHandler hashcode" + this.hashCode());

            System.out.println("msg 类型 : " + msg.getClass());
            System.out.println("客户端地址 : " + ctx.channel().remoteAddress());

            // 回复信息给浏览器 [http协议]
            ByteBuf content = Unpooled.copiedBuffer("Hello,服务器", CharsetUtil.UTF_8);
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
            // 将构建好 response 返回
            ctx.writeAndFlush(response);
        }
    }
}

9 Netty 核心模块

Bootstrap、ServerBootstrap

  1. Bootstrap意思是引导,一个Netty应用通常由一个 Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件,Netty中 Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类

  2. 常见的方法有 public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup),该方法用于服务器端,用来设置两个EventLoop public B group(EventLoopGroup group),该方法用于客户端,用来设置一个EventLoopGrouppublic B channel(Class<? extends C> channelClass),该方法用来设置一个服务器端的通道实现public B option(ChannelOption option, Tvalue),用来给ServerChannel添加配置 public ServerBootstrap childOption(ChannelOption childOption, Tvalue),用来给接收到的通道添加配置

    public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务处理类(自定义的handler) WorkerGroup

    public ServerBootstrap Handler(ChannelHandler childHandler),该方法用来设置业务处理类(自定义的handler) BossGroup public ChannelFuture bind(int inetPort),该方法用于服务器端,用来设置占用的端口号 public ChannelFuture connect(String inetHost, int inetPort),该方法用于客户端,用来连接服务器端

Future、ChannelFuture

  1. Netty中所有的IO操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过Future和—ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件
  2. 常见的方法有Channel channel(),返回当前正在进行IO操作的通道ChannelFutyre sync(),等待异步操作执行完毕

Channel

  1. Netty 网络通信的组件,能够用于执行网络I/o操作。

  2. 通过Channel可获得当前网络连接的通道的状态

  3. 通过Channel可获得网络连接的配置参数(例如接收缓冲区大小)

  4. Channel提供异步的网络I/O操作(如建立连接,读写,绑定端口),异步调用意味着任何I/O调用都将立即返回,并且不保证在调用结束时所请求的I/O操作已完成

  5. 调用立即返回一个 ChannelFuture实例,通过注册监听器到ChannelFuture上,可以I/O操作成功、失败或取消时回调通知调用方

  6. 支持关联I/O操作与对应的处理程序

  7. 不同协议、不同的阻塞类型的连接都有不同的 Channel类型与之对应

    常用的Channel类型:

    • NioSocketChannel,异步的客户端 TCP Socket 连接。
    • NioServerSocketChannel,异步的服务器端TCP Socket连接。
    • NioDatagramChannel,异步的UDP连接。
    • NioSctpChannel,异步的客户端 Sctp连接。
    • NioSctpServerChannel,异步的Sctp服务器端连接,这些通道涵盖了UDP和TCP网络IO以及文件IO。

Selector

  1. Netty基于Selector对象实现I/O多路复用,通过Selector一个线程可以监听多个连接的 Channel事件。
  2. 当向一个Selector中注册Channel后,Selector内部的机制就可以自动不断地查询(Select) 这些注册的Channel是否有己就绪的I/O事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个Channel

10 Netty 群聊

// 服务端
public class GroupChatServer {
    private int port;
    public GroupChatServer(int port) {
        this.port = port;
    }
    public void run() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // 加入解码器
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast(new GroupChatServerHandler());
                        }
                    });
            ChannelFuture cf = serverBootstrap.bind(port).sync();
            System.out.println("【服务器】启动完成~");
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new GroupChatServer(7000).run();
    }
}
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {

    // 定义一个channelGroup 管理所有的channel
    // GlobalEventExecutor.INSTANCE 全局的事件执行器,是一个单列
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
    // 私聊
    // private static Map<String, Channel> channelMap = new HashMap<>();

    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    // 连接建立,第一个被执行 将 channel 加入 channelGroup
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // 将客户加入聊天的信息推送其他客户端
        // 不需要自己遍历
        channelGroup.writeAndFlush("【客户端】" + channel.remoteAddress() + " 加入聊天室。" + sdf.format(new Date()) + "\n");
        channelGroup.add(channel);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // 可以自动执行 channelGroup.remove(channel);
        channelGroup.writeAndFlush("【客户端】" + channel.remoteAddress() + " 离开聊天室。" + sdf.format(new Date()) + "\n");
    }

    // channel 处于活动状态
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("【客户端】" + ctx.channel().remoteAddress() + " 上线了。\n");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("【客户端】" + ctx.channel().remoteAddress() + " 跑路了。\n");
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.forEach(ch -> {
            if (channel != ch) {
                ch.writeAndFlush("【客户端】" + channel.remoteAddress() + " : " + s + "\n");
            } else {
                // 回显
                ch.writeAndFlush("【我】" + " : " + s + "\n");
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("【服务端】 开小差了~");
        ctx.close();
    }
}
// 客户端
public class GroupChatClient {
    private final String host;
    private final int port;

    public GroupChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }
    public void run() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(bossGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // 加入解码器
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast(new GroupChatClientHandler());
                        }
                    });
            ChannelFuture cf = bootstrap.connect(host, port).sync();
            Channel channel = cf.channel();
            System.out.println("【客户端】" + channel.localAddress() + " 启动完成~");
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                channel.writeAndFlush(msg + "\r\n");
            }
        } finally {
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new GroupChatClient("127.0.0.1", 7000).run();
    }
}
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        System.out.println(s.trim());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("【客户端】 开小差了~");
        ctx.close();
    }
}

11 Netty 实现Websocket

改变http协议的状态码为101,升级成为ws协议实现全双工长连接通信

服务端

package com.zhj.test.netty.websocket;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * @author zhj
 */
public class WebSocketServer {
    public static void main(String[] args) throws InterruptedException {
        // 创建两个线程组 BossGroup 和 WorkerGroup
        // BossGroup 只处理连接请求 WorkerGroup 处理与客户端的业务处理
        // 两个线程组都是无限循环
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        // 创建服务器端启动对象,配置参数
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 链式编程
            serverBootstrap.group(bossGroup,workerGroup) // 设置线程组
                    .channel(NioServerSocketChannel.class) // 设置NIO通道实现
                    .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE,true) // 设置保持活动连接状态
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        // 创建通道初始化对象
                        // 给pipeline 设置处理器
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // 基于Http协议的,添加Http编解码器
                            pipeline.addLast(new HttpServerCodec());
                            // 以块的方式写,添加ChunkedWriteHandler
                            pipeline.addLast(new ChunkedWriteHandler());
                            // http数据在传输是分段的 HttpObjectAggregator 可以将多段聚合
                            pipeline.addLast(new HttpObjectAggregator(8192));
                            // websocket 数据以帧(frame)形式传递
                            // websocket 有六个子类
                            // 浏览器请求时 ws://localhost:7000/xxx 表示请求的uri
                            // websocket 核心功能是将http协议升级为ws 保持长连接
                            pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));

                            // 自定义handler ,处理业务逻辑
                            pipeline.addLast(new WebSocketHandler());
                        }
                    }); // 给workerGroup管道设置处理器
            System.out.println("服务器初始化完毕!!!");
            // 启动服务器,并绑定端口,并且同步处理
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            // 对关闭通道进行监听 (异步模型)
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
package com.zhj.test.netty.websocket;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.time.LocalDateTime;

/**
 * 这里TextWebSocketFrame 类型,表示一个文本帧(frame)
 * @author zhj
 */
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        System.out.println("服务器接收消息:" + msg.text());
        // 回复消息
        ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器响应时间:" + LocalDateTime.now() + msg.text()));
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // id 表示唯一的一个值
        System.out.println("handler added 被调用 " + ctx.channel().id().asLongText());
        System.out.println("handler added 被调用 " + ctx.channel().id().asShortText());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handler removed 被调用 " + ctx.channel().id().asLongText());
        System.out.println("handler removed 被调用 " + ctx.channel().id().asShortText());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("异常发生 " + cause.getMessage());
        ctx.close();
    }
}

客户端

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>websocket</title>
</head>
<body>
<script>
    var socket;
    // 判断当前浏览器是否支持websocket编程
    if (window.WebSocket) {
        // go on
        socket = new WebSocket("ws://localhost:7000/hello")
        // ev收到服务器端发送的消息
        socket.onmessage = function (ev) {
            var rt = document.getElementById('responseText')
            rt.value = rt.value + "\n" + ev.data
        }
        // 相当于连接开启
        socket.onopen = function (ev) {
            var rt = document.getElementById('responseText')
            rt.value = '连接开启~'
        }
        socket.onclose = function (ev) {
            var rt = document.getElementById('responseText')
            rt.value = rt.value + '\n' + '连接关闭~'
        }
    } else {
        alert("您的浏览器太菜了,不支持websocket")
    }

    function send(message) {
        // 发送消息到服务器
        if (!window.socket) {
            return
        }
        if (socket.readyState == WebSocket.OPEN) {
            socket.send(message);
        }
    }
</script>
    <form onsubmit="return false">
        <textarea name="message" style="height: 300px;width: 300px" ></textarea>
        <input type="button" value="发送消息" onclick="send(this.form.message.value)">
        <textarea id="responseText" style="height: 300px;width: 300px"></textarea>
        <input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
    </form>
</body>
</html>

12 Protobuf 序列化数据

Protobuf基本介绍

  1. Protobuf是 Google 发布的开源项目,全称 Google Protocol Buffers,是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者说序列化。它很适合做数据存储或RPC[远程过程调用remote procedure call ]数据交换格式。 自前很多公司http+json ---> tcp+protobuf
  2. 参考文档:https://developers.google.com/protocol-buffers/docs/proto 语言指南
  3. Protobuf是以 message的方式来管理数据的.
  4. 支持跨平台、跨语言,即客户端和服务器端可以是不同的语言编写的(支持目前绝 大多数语言,例如C++、C#、Java、python等)
  5. 高性能,高可靠性
  6. 使用propobuf编译能自动生成代码,Protobuf是将类定义使用.proto文件进行描述。说明,在idea中编写.proto文件时,会自动提示是否下载.ptotot编写插件.可以让语法高亮。
  7. 然后通过protoc.exe编译器根据.proto自动生成.java文件
  8. protobuf使用 user.proto -> protoc.exe ->user.java 编码 传递二进制 服务端解码
protoc.exe --java_out=. Student.proto
syntax = "proto3"; // 版本
option java_outer_classname = "StudentPOJO"; // 生成的外部类名 同时也是文件名
// protobuf 使用message 管理数据
message Student { // 内部类 真正发送的POJO对象 1表示属性序号
  int32 id = 1;
  string name = 2;
}

syntax = "proto3"; // 版本
option optimize_for = SPEED; // 加快解析
option java_package = "com.zhj.test.netty.codec2"; // 指生成到哪个包下
option java_outer_classname = "DataInfo"; // 生成的外部类名 同时也是文件名
// protobuf 使用message 管理数据 其他的message
message MyMessage {
  // 定义一个枚举
  enum DataType {
    StudentType = 0; // 在proto3 要求enum 编号从0开始
    workerType = 1;
  }

  // 用data_type来标识传的是哪一个枚举类型
  DataType data_type = 1;

  // 表示每次枚举类型只能出现其中的一个,节省空间
  oneof dataBody {
    Student student = 2;
    Worker worker = 3;
  }

}
message Student { // 内部类 真正发送的POJO对象 1表示属性序号
  int32 id = 1;
  string name = 2;
}
message Worker {
  string name = 1;
  int32 age = 2;
}

13 Netty 编解码

入站先解码,再执行自己的业务处理器,出站先执行自己的业务处理器,再编码

解码器-ReplayingDecoder

  1. public abstract class ReplayingDecoder extends ByteToMessageDecoder
  2. ReplayingDecoder扩展了ByteToMessageDecoder类,使用这个类,我们不必调用readableBytes()方法。参数T指定了用户状态管理的类型,其中Void代表不需要状态管理
  3. ReplayingDecoder使用方便,但它也有一些局限性:并不是所有的ByteBuf操作都被支持,如果调用了一个不被支持的方法,将会抛出一个UnsupportedoperationException。ReplayingDecoder在某些情况下可能稍慢于ByteToMessageDecoder,例如网络缓慢并且消息格式复杂时,消息会被拆成了多个碎片,速度变慢

其它解码器

  1. LineBasedFrameDecoder:这个类在Netty内部也有使用,它使用行尾控制字符(In或者Irln)作为分隔符来解析数据。
  2. DelimiterBasedFrameDecoder:使用自定义的特殊字符作为消息的分隔符。
  3. HttpObjectDecoder:一个HTTP数据的解码器
  4. LengthFieldBasedFrameDecoder:通过指定长度来标识整包消息,这样就可以自动的处理黏包和半包消息。

14 TCP粘包和拆包基本介绍

  1. TCP是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有—一成对的socket,因此,发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通信是无消息保护边界的
  2. 由于TCP无消息保护边界,需要在接收端处理消息边界问题,也就是我们所说的粘

在这里插入图片描述

传输对象(协议包)

package com.zhj.test.netty.protocoltcp;

import java.util.Arrays;

/**
 * 协议包
 * @author zhj
 */
public class MessageProtocol {
    private int len;
    private byte[] content;

    public MessageProtocol() {

    }

    public MessageProtocol(int len, byte[] content) {
        this.len = len;
        this.content = content;
    }

    public int getLen() {
        return len;
    }

    public void setLen(int len) {
        this.len = len;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }

    @Override
    public String toString() {
        return "MessageProtocol{" +
                "len=" + len +
                ", content=" + Arrays.toString(content) +
                '}';
    }
}

编解码器

package com.zhj.test.netty.protocoltcp;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

import java.util.List;

/**
 * @author zhj
 */
public class MyMessageDecoder extends ReplayingDecoder<Void> {

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        System.out.println("MyMessageDecoder decode 方法被调用");
        int length = byteBuf.readInt();
        byte[] content = new byte[length];
        byteBuf.readBytes(content);
        MessageProtocol messageProtocol = new MessageProtocol(length, content);
        list.add(messageProtocol);
    }
}

package com.zhj.test.netty.protocoltcp;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * @author zhj
 */
public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, MessageProtocol messageProtocol, ByteBuf byteBuf) throws Exception {
        System.out.println("MyMessageEncoder encoder方法被调用");
        byteBuf.writeInt(messageProtocol.getLen());
        byteBuf.writeBytes(messageProtocol.getContent());
    }
}

服务端

package com.zhj.test.netty.protocoltcp;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @author zhj
 */
public class TcpClient {
    public static void main(String[] args) throws InterruptedException {
        // 客户端需要一个事件循环组
        EventLoopGroup eventExecutors = new NioEventLoopGroup();
        // 创建客户端启动对象
        try {
            Bootstrap bootstrap = new Bootstrap();
            // 设置相关参数
            bootstrap.group(eventExecutors)
                    .channel(NioSocketChannel.class)
                    .handler(new TcpClientInitializer());
            System.out.println("客户端启动完成!!!");
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            eventExecutors.shutdownGracefully();
        }
    }
}

package com.zhj.test.netty.protocoltcp;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

/**
 * @author zhj
 */
public class TcpServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 向管道加入处理器
        // 得到管道
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(new MyMessageDecoder());
        pipeline.addLast(new MyMessageEncoder());
        // 2.增加自定义handler
        pipeline.addLast("MyTcpServerHandler", new TcpServerHandler());
    }
}

package com.zhj.test.netty.protocoltcp;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
 * 说明
 * 1. SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter
 * 2. httpObject 客户端和服务端相互通讯的数据封装成HttpObject
 * @author zhj
 */
public class TcpServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {

    private int count;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
        // System.out.println(new String(msg.array(), CharsetUtil.UTF_8));
        System.out.println("服务端接收的数据 " + new String(msg.getContent(), CharsetUtil.UTF_8));
        System.out.println("服务端接收的数据长度 " + msg.getLen());
        System.out.println("服务器接收到的消息量:" + (++this.count));

        String responseContent = UUID.randomUUID().toString();
        int len = responseContent.getBytes(StandardCharsets.UTF_8).length;
        MessageProtocol messageProtocol = new MessageProtocol(len, responseContent.getBytes(StandardCharsets.UTF_8));
        ctx.writeAndFlush(messageProtocol);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("服务器异常:" + cause.getMessage());
        ctx.close();
    }
}

客户端

package com.zhj.test.netty.protocoltcp;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @author zhj
 */
public class TcpClient {
    public static void main(String[] args) throws InterruptedException {
        // 客户端需要一个事件循环组
        EventLoopGroup eventExecutors = new NioEventLoopGroup();
        // 创建客户端启动对象
        try {
            Bootstrap bootstrap = new Bootstrap();
            // 设置相关参数
            bootstrap.group(eventExecutors)
                    .channel(NioSocketChannel.class)
                    .handler(new TcpClientInitializer());
            System.out.println("客户端启动完成!!!");
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            eventExecutors.shutdownGracefully();
        }
    }
}

package com.zhj.test.netty.protocoltcp;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

/**
 * @author zhj
 */
public class TcpClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 向管道加入处理器
        // 得到管道
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(new MyMessageEncoder());
        pipeline.addLast(new MyMessageDecoder());
        // 2.增加自定义handler
        pipeline.addLast("MyTestTcpClientHandler", new TcpClientHandler());
    }
}

package com.zhj.test.netty.protocoltcp;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

import java.nio.charset.StandardCharsets;

/**
 * 说明
 * 1. SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter
 * 2. httpObject 客户端和服务端相互通讯的数据封装成HttpObject
 * @author zhj
 */
public class TcpClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {

    private int count;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 使用客户端发送10条数据 hello server
        for (int i = 0; i < 60; i++) {
            String mes = "今天真带劲。。。";
            byte[] content = mes.getBytes(StandardCharsets.UTF_8);
            int length = mes.getBytes(StandardCharsets.UTF_8).length;
            MessageProtocol messageProtocol = new MessageProtocol(length, content);
            ctx.writeAndFlush(messageProtocol);
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
        System.out.println("客户端接收的数据 " + new String(msg.getContent(), CharsetUtil.UTF_8));
        System.out.println("客户端接收的数据长度 " + msg.getLen());
        System.out.println("客户端接收到的消息量:" + (++this.count));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("异常信息:" + cause.getMessage());
        ctx.close();
    }
}

15 RPC 调用流程

RPC 基本介绍

  1. RPC (Remote Procedure Call) —远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程
  2. 两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样
  3. 常见的RPC框架:Dubbo、google 的 gRPC、Go语言的rpxc、Apache的thrift,Spring旗下的Spring Cloud

在这里插入图片描述

自己实现dubbo RPC(基于Netty)需求说明

  1. dubbo底层使用了Netty作为网络通讯框架,要求用Netty实现一个简单的RPC框
  2. 模仿dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用Netty 4.x

设计说明

  1. 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
  2. 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
  3. 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用Netty请求提供者返回数据

代码实现:

Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION 1. Definitions. "License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document. "Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License. "Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. "You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. "Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. "Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. "Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). "Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. "Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." "Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. 2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. 3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. 4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: (a) You must give any other recipients of the Work or Derivative Works a copy of this License; and (b) You must cause any modified files to carry prominent notices stating that You changed the files; and (c) You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and (d) If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. 5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. 6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. 7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. 8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. 9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS APPENDIX: How to apply the Apache License to your work. To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives. Copyright [yyyy] [name of copyright owner] Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

简介

Netty框架的学习 展开 收起
Java 等 2 种语言
Apache-2.0
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
1
https://gitee.com/zhuhuijie/netty-study.git
git@gitee.com:zhuhuijie/netty-study.git
zhuhuijie
netty-study
netty-study
master

搜索帮助