1 Star 1 Fork 0

杨运交 / async-file

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README
MIT

async-file

Gitee stars Gitee forks Maven central License GitHub stars GitHub forks GitHub watchers GitHub release

Gitee

GitHub

介绍

async-file工具提供Java异步读写文件的能力,使用Java NIO 库开发。Java应用程序引入框架可以简单的,异步和非阻塞的读写文件。框架包含三个工具类:

提示:Java提供的 Files 文件读取功能是阻塞的。

安装教程

首先,如果项目使用Maven工具,在项目的pom.xml文件中添加依赖

<dependency> 
  <groupId>io.github.kavahub</groupId>
  <artifactId>kava-async-file</artifactId>
  <version>1.0.1.RELEASE</version>
</dependency>

如果是Gradle项目,需要添加依赖:

implementation 'io.github.kavahub:kava-async-file:1.0.1.RELEASE'

AIOFileReader使用说明

AIOFileReader方法列表:

  • Query<byte[]> bytes(Path file) : 读取文件,返回文件数据字节数组,每次读取的大小由默认缓冲区决定。

  • Query<byte[]> allBytes(Path file) : 读取文件,返回文件所有数据字节数组。每次按默认缓冲区读取文件,完成后合并。

  • Query<String> line(Path file) : 读取文件,返回文件行字符串。每次按默认缓冲区读取文件数据字节数组,按换行符分割字节数组。

  • Query<String> allLines(Path file) : 读取文件,返回文件所有数据字符串。每次按默认缓冲区读取文件数据字节数组,合并后转换成字符串。

默认缓冲区大小定义:

public static final int BUFFER_SIZE = 4096 * 4;

示例:

// 按行读取文件,并输出到控制台
        final Path FILE = Paths.get("src", "test", "resources", "fileWithmanyOfLine.txt");
        
        AIOFileReader.line(FILE)
                // 订阅行数据
                .subscribe(data -> {
                    // 文件行处理,如输出到控制台
                    System.out.println(data);
                    // doSomethingData(data)
                }, err -> {
                    // 异常处理
                    err.printStackTrace();
                    // doSomethingError(err)
                })
                // 等待文件读取完成
                .join();

        // 也可以这样写
        AIOFileReader.line(FILE)
                // 订阅数据
                .onNext(data -> {
                    // 文件行处理,如输出到控制台
                    System.out.println(data);
                    // doSomethingData(data)
                })
                // 订阅异常
                .onError(err -> {
                    // 异常处理
                    err.printStackTrace();
                    // doSomethingError(err)
                })
                // 等待文件读取完成
                .blockingSubscribe();

这种适用广泛,我们经常读取文本文件,统计相关信息,如:单词统计等,只需要编写 doSomethingData 代码。doSomethingError 方法,处理读或者业务处理中的异常,我们建议将异常输出到日志

doSomethingData 业务中的异常,会导致读文件中断,整个文件操作终止,所有的异常信息可以在 doSomethingError 中处理

示例:

// 按行读取文件,并输出到控制台
        final Path FILE = Paths.get("src", "test", "resources", "fileWithmanyOfLine.txt");
        CompletableFuture<Void> future = AIOFileReader.line(FILE)
                // 订阅行数据
                .subscribe(data -> {
                    // 文件行处理,如输出到控制台
                    System.out.println(data);
                    // doSomethingData(data)
                }, err -> {
                    // 异常处理
                    err.printStackTrace();
                    // doSomethingError(err)
                });

        // 处理其他业务逻辑
        TimeUnit.SECONDS.sleep(2); // 模拟业务

        // 循环,直到文件处理完成
        while (!future.isDone()) {
        }

        System.out.println("文件处理完成");

AIOFileReader 读文件是异步的,所以可以处理写其他的业务

示例:

        // 统计文件中单词个数,并找出次数最多的单词
        final Path FILE = Paths.get("src", "test", "resources", "fileToCount.txt");

        final int MIN = 5;
        final int MAX = 10;

        ConcurrentHashMap<String, Integer> words = new ConcurrentHashMap<>();
        AIOFileReader.line(FILE)
                // 过滤掉前14行
                .filter(line -> !line.trim().isEmpty()).skip(14)
                // 使用空格分隔
                .flatMapMerge(line -> Query.of(line.split(" ")))
                // 过滤单词
                .filter(word -> word.length() > MIN && word.length() < MAX)
                // 统计单词次数
                .onNext((w) -> words.merge(w, 1, Integer::sum))
                // 异常处理
                .onError(err -> err.printStackTrace())
                // 阻塞,直到文件统计完毕
                .blockingSubscribe();

        Map.Entry<String, ? extends Number> common = Collections.max(words.entrySet(),
                Comparator.comparingInt(e -> e.getValue().intValue()));
        assertEquals("Hokosa", common.getKey());
        assertEquals(183, common.getValue().intValue());

示例:

        // 统计“*** END OF ”行之前所有单词的数量
        // 当读取到"*** END OF "行时,读线程会取消读操作,避免继续读取不需要处理的数据
        final Path FILE = Paths.get("src", "test", "resources", "fileToCount.txt");

        int[] count = { 0 };
        AIOFileReader.line(FILE)
                // 过滤空行
                .filter(line -> !line.trim().isEmpty())
                // 忽略前14行
                .skip(14)
                // 忽略掉‘*** END OF ’以后的行
                .takeWhile(line -> !line.contains("*** END OF "))
                // 行按空格切割成单词
                .flatMapMerge(line -> Query.of(line.split("\\W+")))
                // 去重
                .distinct()
                // 统计数量
                .onNext((word) -> {
                        count[0]++;
                })
                // 异常处理
                .onError(err -> err.printStackTrace())                
                // 阻塞,知道文件读取完成
                .blockingSubscribe();
        assertEquals(5206, count[0]);

示例:

        // 详细演示takeWhile的功能:
        // 1. 控制台输出前部文件内容,框架日志提示[Cancel file reading. [16384 bytes] has been readed],读取操作取消,不在读取文件数据。
        // 2. [16384 bytes] 信息中,16384是框架默认读取缓冲区大小,由此可以判断:文件只读取了一次
        final Path FILE = Paths.get("src", "test", "resources", "fileWithmanyOfLine.txt");

        AIOFileReader.line(FILE)
                // 控制台输出
                .onNext((data) -> {
                    System.out.println("before:" + data);
                })
                // 终止文件读操纵
                .takeWhile(line -> false)
                //
                .onNext((data) -> {
                    System.out.println("after:" + data);
                })
                // 异常处理
                .onError(err -> err.printStackTrace())                   
                .blockingSubscribe();

示例:

        // 也可以使用cancel方法中断读文件操作

        final Path FILE = Paths.get("src", "test", "resources", "fileWithmanyOfLine.txt");

        CompletableFuture<Void> future = AIOFileReader.line(FILE).subscribe(data -> {
            try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName());
        }, err -> err.printStackTrace());

        TimeUnit.MILLISECONDS.sleep(1000);

        future.cancel(false);

示例:

        // 显示读文件线程的名称
        final Path FILE = Paths.get("src", "test", "resources", "fileWithmanyOfLine.txt");

        AIOFileReader.bytes(FILE).subscribe(data -> {
            System.out.println(Thread.currentThread().getName());
        }, err -> err.printStackTrace()).join();

输出结果如下:

Thread-8
Thread-7
Thread-8
Thread-7
Thread-8
Thread-7
Thread-8
Thread-7
Thread-8
Thread-7
...

其结果表明:有两个线程读取文件,线程交替读取以保证读取文件数据的顺序,这是 AsynchronousFileChannel 实现的

AIOFileWriter使用说明

AIOFileWriter方法列表:

  • CompletableFuture<Integer> write(Path file, byte[] bytes) : 字节数组数据写入文件。

  • CompletableFuture<Integer> write(Path file, String line) : 字符串数据写入文件。

  • CompletableFuture<Integer> write(Path file, Query<String> lines) : 字符串流数据写入文件。

  • CompletableFuture<Integer> write(Path file, Iterable<String> lines) : 字符串集合数据写入文件。

示例:

        // 写入字符串
        AIOFileWriter.write(Paths.get(FILE_TO_WRITE), "This is file content:你好").join();

示例:

        // 分割字符串写入
        final String content = "This is file content:你好";

        AIOFileWriter.write(Paths.get(FILE_TO_WRITE), String.join(System.lineSeparator(), content.split(" "))).join();

示例:

        // 边度边写
        final Path FILE = Paths.get("src", "test", "resources", "fileWithmanyOfLine.txt");

        try (CompletableFileWriter writer = AIOFileWriter.of(FILE_TO_WRITE)) {

            AIOFileReader.line(FILE)
                    // 忽略前2行
                    .skip(2)
                    // 过滤掉空行
                    .filter(line -> !line.isBlank())
                    // 转换成大写
                    .map(String::toUpperCase)
                    // 加入换行符
                    .map(line -> line + System.lineSeparator()).subscribe(data -> {
                        writer.write(data);
                    }, err -> err.printStackTrace()).join();

            // 等待写入完成
            writer.getPosition().whenComplete((size, error) -> {
                if (error != null) {
                    error.printStackTrace();
                }

                System.out.println("总共写入字节数:" + size);
            }).join();
        }

NIOFileLineReader使用说明

NIOFileLineReader 方法列表:

Query<String> read(Path file) : 读取文件行。

        // 读取文件行并过滤
        final Path FILE = Paths.get("src", "test", "resources", "fileWithmanyOfLine.txt");
        NIOFileLineReader.read(FILE).subscribe(data -> {
            System.out.println(Thread.currentThread().getName());
        }, err -> err.printStackTrace()).join();

使用建议

  • 文件的异步读写,并不是为了提高文件的读取性能,而是提高文件读取的吞吐量(读取更多的文件,并保持性能,使JVM可以稳定运行)。
  • 在大多数情况下,使用Jdk提供的Files或许更合适。
  • 不要为了异步而异步,找到问题所在,也许解决问题的关键不是异步。

建议使用优先级: Java NIO Files > NIOFileLineReader > AIOFileReader

性能

性能测试,参考 ReadLineBenchmark 。 其他开源项目文件读写的性能测试 ReadFileBenchmark

构建项目

克隆代码到本地,然后运行mvn命令, 执行编译,测试,打包项目:

mvn clean install

发布项目

首先,确保项目可以正确构建。然后执行下面的命令(发布的文件要以release结尾,如:kavahub-async-file-1.0.0.RELEASE.jar):

mvn release:prepare -Prelease
mvn release:perform -Prelease

上面操作将包上传到了Staging Repository,需要转入Release Repository,执行命令:

mvn nexus-staging:release

以上操作全部成功,发布完成。

发布SNAPSHOT包到仓库,命令如下:

mvn clean deploy -Prelease

取消Staging Repository中的包,命令如下:

mvn nexus-staging:drop 

其他开源项目

  • RxIo : Asynchronous non-blocking File Reader and Writer library for Java

历史版本

1.0.0-RELEASE

参考文档

MIT License Copyright (c) 2021 万品伟 Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

简介

Java异步文件读写工具(Asynchronous File Reader and Writer tools for Java) 展开 收起
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
Java
1
https://gitee.com/yang-yun-jiao/async-file.git
git@gitee.com:yang-yun-jiao/async-file.git
yang-yun-jiao
async-file
async-file
main

搜索帮助