41 Star 135 Fork 46

leonchen83 / redis-replicator

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README
Apache-2.0

内容索引(Table of Contents)

1. Redis-replicator

Buy Me A Coffee

1.1. 简介

Java CI with Maven Coverage Status Maven Central Javadocs Hex.pm LICENSE

Redis Replicator是一款RDB解析以及AOF解析的工具. 此工具完整实现了Redis Replication协议. 支持SYNC, PSYNC, PSYNC2等三种同步命令. 还支持远程RDB文件备份以及数据同步等功能. 此文中提到的 命令 特指Redis中的写(比如 set,hmset)命令,不包括读命令(比如 get,hmget), 支持的redis版本范围从2.6到6.2

1.2. QQ讨论组

479688557

1.3. 联系作者

chen.bao.yi@qq.com

2. 安装

2.1. 安装前置条件

编译最低jdk版本 jdk9+
运行最低jdk版本 jdk8+
maven-3.3.1+
redis 2.6 - 7.0

2.2. Maven依赖

    <dependency>
        <groupId>com.moilioncircle</groupId>
        <artifactId>redis-replicator</artifactId>
        <version>3.8.1</version>
    </dependency>

2.3. 安装源码到本地maven仓库

    step 1: 安装 jdk-9.0.x (或 jdk-11.0.x)
    step 2: git clone https://github.com/leonchen83/redis-replicator.git
    step 3: $cd ./redis-replicator 
    step 4: $mvn clean install package -DskipTests

2.4. 选择一个版本

redis 版本 redis-replicator 版本
[2.6, 7.2.x] [3.8.0, ]
[2.6, 7.0.x] [3.6.4, 3.7.0]
[2.6, 7.0.x-RC2] [3.6.2, 3.6.3]
[2.6, 7.0.0-RC1] [3.6.0, 3.6.1]
[2.6, 6.2.x] [3.5.2, 3.5.5]
[2.6, 6.2.x-RC1] [3.5.0, 3.5.1]
[2.6, 6.0.x] [3.4.0, 3.4.4]
[2.6, 5.0.x] [2.6.1, 3.3.3]
[2.6, 4.0.x] [2.3.0, 2.5.0]
[2.6, 4.0-RC3] [2.1.0, 2.2.0]
[2.6, 3.2.x] [1.0.18](不再提供支持)

3. 简要用法

3.1. 用法

        Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
        replicator.addEventListener(new EventListener() {
            @Override
            public void onEvent(Replicator replicator, Event event) {
                if (event instanceof KeyStringValueString) {
                    KeyStringValueString kv = (KeyStringValueString) event;
                    System.out.println(new String(kv.getKey()));
                    System.out.println(new String(kv.getValue()));
                } else {
                    ....
                }
            }
        });
        replicator.open();

3.2. 备份远程redis的rdb文件

参阅 RdbBackupExample.java

3.3. 备份远程redis的实时命令

参阅 CommandBackupExample.java

3.4. 将rdb转换成dump格式

我们可以用 DumpRdbVisitor 来将 rdb 转换成 redis DUMP 格式。


        Replicator r = new RedisReplicator("redis:///path/to/dump.rdb");
        r.setRdbVisitor(new DumpRdbVisitor(r));
        r.addEventListener(new EventListener() {
            @Override
            public void onEvent(Replicator replicator, Event event) {
                if (!(event instanceof DumpKeyValuePair)) return;
                DumpKeyValuePair dkv = (DumpKeyValuePair) event;
                byte[] serialized = dkv.getValue();
                // we can use redis RESTORE command to migrate this serialized value to another redis.
            }
        });
        r.open();

3.5. 检查Rdb的正确性

我们可以用 SkipRdbVisitor 来检查 rdb 的正确性.


        Replicator r = new RedisReplicator("redis:///path/to/dump.rdb");
        r.setRdbVisitor(new SkipRdbVisitor(r));
        r.open();

3.6. Scan与PSYNC

默认情况下, redis-replicator 使用 PSYNC 命令伪装成slave接收命令, 如下所示

        Replicator r = new RedisReplicator("redis://127.0.0.1:6379");
        r.addEventListener(new EventListener() {
            @Override
            public void onEvent(Replicator replicator, Event event) {
                System.out.println(event);
            }
        });
        
        r.open();

然而, 在某些云服务中, PSYNC 是被禁止使用的, 因此我们使用 Scan 命令来替换PSYNC命令扫描全库, 如下所示


        Replicator r = new RedisReplicator("redis://127.0.0.1:6379?enableScan=yes&scanStep=256");
        r.addEventListener(new EventListener() {
            @Override
            public void onEvent(Replicator replicator, Event event) {
                System.out.println(event);
            }
        });
        
        r.open();

3.7. 其他示例

参阅 examples

4. 高级主题

4.1. 命令扩展

4.1.1. 首先写一个command类

    @CommandSpec(command = "APPEND")
    public static class YourAppendCommand extends AbstractCommand {
        private final String key;
        private final String value;
    
        public YourAppendCommand(String key, String value) {
            this.key = key;
            this.value = value;
        }
                
        public String getKey() {
            return key;
        }
        
        public String getValue() {
            return value;
        }
    }

4.1.2. 然后写一个command parser

    public class YourAppendParser implements CommandParser<YourAppendCommand> {

        @Override
        public YourAppendCommand parse(Object[] command) {
            return new YourAppendCommand(new String((byte[]) command[1], UTF_8), new String((byte[]) command[2], UTF_8));
        }
    }

4.1.3. 注册这个command parser到replicator

    Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
    replicator.addCommandParser(CommandName.name("APPEND"),new YourAppendParser());

4.1.4. 处理这个注册的command事件

    replicator.addEventListener(new EventListener() {
        @Override
        public void onEvent(Replicator replicator, Event event) {
            if(event instanceof YourAppendCommand){
                YourAppendCommand appendCommand = (YourAppendCommand)event;
                // your code goes here
            }
        }
    });

4.1.5. 结合到一起

参阅 CommandExtensionExample.java

4.2. Module扩展(redis-4.0及以上)

4.2.1. 编译redis源码中的测试modules

    $cd /path/to/redis-4.0-rc2/src/modules
    $make

4.2.2. 打开redis配置文件redis.conf中相关注释

    loadmodule /path/to/redis-4.0-rc2/src/modules/hellotype.so

4.2.3. 写一个module parser

    public class HelloTypeModuleParser implements ModuleParser<HelloTypeModule> {

        @Override
        public HelloTypeModule parse(RedisInputStream in, int version) throws IOException {
            DefaultRdbModuleParser parser = new DefaultRdbModuleParser(in);
            int elements = parser.loadUnsigned(version).intValue();
            long[] ary = new long[elements];
            int i = 0;
            while (elements-- > 0) {
                ary[i++] = parser.loadSigned(version);
            }
            return new HelloTypeModule(ary);
        }
    }

    public class HelloTypeModule implements Module {
        private final long[] value;

        public HelloTypeModule(long[] value) {
            this.value = value;
        }

        public long[] getValue() {
            return value;
        }
    }

4.2.4. 再写一个command parser

    public class HelloTypeParser implements CommandParser<HelloTypeCommand> {
        @Override
        public HelloTypeCommand parse(Object[] command) {
            String key = new String((byte[])command[1],Constants.UTF_8);
            long value = Long.parseLong(new String((byte[])command[2],Constants.UTF_8));
            return new HelloTypeCommand(key, value);
        }
    }
    
    @CommandSpec(command = "hellotype.insert")
    public class HelloTypeCommand extends AbstractCommand {
        private final String key;
        private final long value;

        public long getValue() {
            return value;
        }

        public String getKey() {
            return key;
        }

        public HelloTypeCommand(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

4.2.5. 注册module parser和command parser并处理相关事件

    public static void main(String[] args) throws IOException {
        Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
        replicator.addCommandParser(CommandName.name("hellotype.insert"), new HelloTypeParser());
        replicator.addModuleParser("hellotype", 0, new HelloTypeModuleParser());
        replicator.addEventListener(new EventListener() {
            @Override
            public void onEvent(Replicator replicator, Event event) {
                if (event instanceof KeyStringValueModule) {
                    System.out.println(event);
                }
                
                if (event instanceof HelloTypeCommand) {
                    System.out.println(event);
                }
            }
        });
        replicator.open();
    }

4.2.6. 结合到一起

参阅 ModuleExtensionExample.java

4.3. Stream

Redis-5.0+ 增加了一个新的数据结构 STREAM. Redis-replicator 用下述代码解析 STREAM


        Replicator r = new RedisReplicator("redis://127.0.0.1:6379");
        r.addEventListener(new EventListener() {
            @Override
            public void onEvent(Replicator replicator, Event event) {
                if (event instanceof KeyStringValueStream) {
                    KeyStringValueStream kv = (KeyStringValueStream)event;
                    // key
                    String key = kv.getKey();
                    
                    // stream
                    Stream stream = kv.getValueAsStream();
                    // last stream id
                    stream.getLastId();
                    
                    // entries
                    NavigableMap<Stream.ID, Stream.Entry> entries = stream.getEntries();
                    
                    // optional : group
                    for (Stream.Group group : stream.getGroups()) {
                        // group PEL(pending entries list)
                        NavigableMap<Stream.ID, Stream.Nack> gpel = group.getPendingEntries();
                        
                        // consumer
                        for (Stream.Consumer consumer : group.getConsumers()) {
                            // consumer PEL(pending entries list)
                            NavigableMap<Stream.ID, Stream.Nack> cpel = consumer.getPendingEntries();
                        }
                    }
                }
            }
        });
        r.open();

4.4. 编写你自己的rdb解析器

  • 写一个类继承 RdbVisitor 抽象类
  • 通过 ReplicatorsetRdbVisitor 方法注册你自己的 RdbVisitor.

4.5. Redis URI

在 redis-replicator-2.4.0 版之前, 我们按如下方式构造 RedisReplicator :

Replicator replicator = new RedisReplicator("127.0.0.1", 6379, Configuration.defaultSetting());
Replicator replicator = new RedisReplicator(new File("/path/to/dump.rdb", FileType.RDB, Configuration.defaultSetting());
Replicator replicator = new RedisReplicator(new File("/path/to/appendonly.aof", FileType.AOF, Configuration.defaultSetting());
Replicator replicator = new RedisReplicator(new File("/path/to/appendonly.aof", FileType.MIXED, Configuration.defaultSetting());

在 redis-replicator-2.4.0 版之后, 我们引入了一个新的概念(Redis URI) 来简化 RedisReplicator 的构造, 以便提供一致的API.

Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
Replicator replicator = new RedisReplicator("redis:///path/to/dump.rdb");
Replicator replicator = new RedisReplicator("redis:///path/to/appendonly.aof");

// 配置的例子
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379?authPassword=foobared&readTimeout=10000&ssl=yes");
Replicator replicator = new RedisReplicator("redis:///path/to/dump.rdb?rateLimit=1000000");
Replicator replicator = new RedisReplicator("rediss://user:pass@127.0.0.1:6379?rateLimit=1000000");

5. 其他主题

5.1. 内置的Command Parser

命令 命令 命令 命令 命令 命令
PING APPEND SET SETEX MSET DEL
SADD HMSET HSET LSET EXPIRE EXPIREAT
GETSET HSETNX MSETNX PSETEX SETNX SETRANGE
HDEL UNLINK SREM LPOP LPUSH LPUSHX
LRem RPOP RPUSH RPUSHX ZREM ZINTERSTORE
INCR DECR INCRBY PERSIST SELECT FLUSHALL
FLUSHDB HINCRBY ZINCRBY MOVE SMOVE BRPOPLPUSH
PFCOUNT PFMERGE SDIFFSTORE RENAMENX PEXPIREAT SINTERSTORE
ZADD BITFIELD SUNIONSTORE RESTORE LINSERT ZREMRANGEBYLEX
GEOADD PEXPIRE ZUNIONSTORE EVAL SCRIPT ZREMRANGEBYRANK
PUBLISH BITOP SETBIT SWAPDB PFADD ZREMRANGEBYSCORE
RENAME MULTI EXEC LTRIM RPOPLPUSH SORT
EVALSHA ZPOPMAX ZPOPMIN XACK XADD XCLAIM
XDEL XGROUP XTRIM XSETID COPY LMOVE
BLMOVE ZDIFFSTORE GEOSEARCHSTORE FUNCTION SPUBLISH

5.2. 当出现EOFException

  • 调整redis server中的以下配置. 相关配置请参考 redis.conf
    client-output-buffer-limit slave 0 0 0

警告: 这个配置可能会使redis-server中的内存溢出

5.3. 跟踪事件日志log

  • 日志级别调整成 debug
  • 如果你项目中使用log4j2,请加入如下Logger到配置文件:
    <Logger name="com.moilioncircle" level="debug">
        <AppenderRef ref="YourAppender"/>
    </Logger>
    Configuration.defaultSetting().setVerbose(true);
    // redis uri
    "redis://127.0.0.1?verbose=yes"

5.4. SSL安全链接

    System.setProperty("javax.net.ssl.keyStore", "/path/to/keystore");
    System.setProperty("javax.net.ssl.keyStorePassword", "password");
    System.setProperty("javax.net.ssl.keyStoreType", "your_type");

    System.setProperty("javax.net.ssl.trustStore", "/path/to/truststore");
    System.setProperty("javax.net.ssl.trustStorePassword", "password");
    System.setProperty("javax.net.ssl.trustStoreType", "your_type");

    Configuration.defaultSetting().setSsl(true);

    // 可选设置
    Configuration.defaultSetting().setSslSocketFactory(sslSocketFactory);
    Configuration.defaultSetting().setSslParameters(sslParameters);
    Configuration.defaultSetting().setHostnameVerifier(hostnameVerifier);
    // redis uri
    "redis://127.0.0.1:6379?ssl=yes"
    "rediss://127.0.0.1:6379"

5.5. redis认证

    Configuration.defaultSetting().setAuthUser("default");
    Configuration.defaultSetting().setAuthPassword("foobared");
    // redis uri
    "redis://127.0.0.1:6379?authPassword=foobared&authUser=default"
    "redis://default:foobared@127.0.0.1:6379"

5.6. 避免全量同步

  • 调整redis-server中的如下配置
    repl-backlog-size
    repl-backlog-ttl
    repl-ping-slave-period

repl-ping-slave-period 必须 小于 Configuration.getReadTimeout(), 默认的 Configuration.getReadTimeout() 是60秒.

5.7. 生命周期事件

        Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
        final long start = System.currentTimeMillis();
        final AtomicInteger acc = new AtomicInteger(0);
        replicator.addEventListener(new EventListener() {
            @Override
            public void onEvent(Replicator replicator, Event event) {
                if(event instanceof PreRdbSyncEvent) {
                    System.out.println("pre rdb sync");
                } else if(event instanceof PostRdbSyncEvent) {
                    long end = System.currentTimeMillis();
                    System.out.println("time elapsed:" + (end - start));
                    System.out.println("rdb event count:" + acc.get());
                } else {
                    acc.incrementAndGet();
                }
            }
        });
        replicator.open();

5.8. 处理巨大的KV

根据 4.3. 编写你自己的rdb解析器, 这个工具内嵌了一个迭代方式的rdb解析器, 以便处理巨大的KV.
详细的例子参阅:
[1] HugeKVFileExample.java
[2] HugeKVSocketExample.java

5.9. Redis6支持

5.9.1. SSL支持

    $cd /path/to/redis
    $./utils/gen-test-certs.sh
    $cd tests/tls
    $openssl pkcs12 -export -CAfile ca.crt -in redis.crt -inkey redis.key -out redis.p12
    $cd /path/to/redis
    $./src/redis-server --tls-port 6379 --port 0 --tls-cert-file ./tests/tls/redis.crt \
         --tls-key-file ./tests/tls/redis.key --tls-ca-cert-file ./tests/tls/ca.crt \
         --tls-replication yes --bind 0.0.0.0 --protected-mode no

    System.setProperty("javax.net.ssl.keyStore", "/path/to/redis/tests/tls/redis.p12");
    System.setProperty("javax.net.ssl.keyStorePassword", "password");
    System.setProperty("javax.net.ssl.keyStoreType", "pkcs12");

    System.setProperty("javax.net.ssl.trustStore", "/path/to/redis/tests/tls/redis.p12");
    System.setProperty("javax.net.ssl.trustStorePassword", "password");
    System.setProperty("javax.net.ssl.trustStoreType", "pkcs12");

    Replicator replicator = new RedisReplicator("rediss://127.0.0.1:6379");

如果你不想设置 System.setProperty 可以使用下面的方式


    RedisSslContextFactory factory = new RedisSslContextFactory();
    factory.setKeyStorePath("/path/to/redis/tests/tls/redis.p12");
    factory.setKeyStoreType("pkcs12");
    factory.setKeyStorePassword("password");

    factory.setTrustStorePath("/path/to/redis/tests/tls/redis.p12");
    factory.setTrustStoreType("pkcs12");
    factory.setTrustStorePassword("password");

    SslConfiguration ssl = SslConfiguration.defaultSetting().setSslContextFactory(factory);
    Replicator replicator = new RedisReplicator("rediss://127.0.0.1:6379", ssl);

5.9.2. ACL支持


    Replicator replicator = new RedisReplicator("redis://user:pass@127.0.0.1:6379");

5.10. Redis7支持

5.10.1. Function

Redis 7.0 添加了 function 的支持. function 的结构存储在rdb文件中. 因此我们能用如下方式解析function.


    Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
    replicator.addEventListener(new EventListener() {
        @Override
        public void onEvent(Replicator replicator, Event event) {
            if (event instanceof Function) {
                Function function = (Function) event;
                function.getCode();
                    
                // your code goes here
            }
        }
    });
    replicator.open();

也可以把 function 解析成 serialized 格式. 这样接下来我们可以用 FUNCTION RESTORE 命令把 serialized 数据迁移到目标redis


    Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
    replicator.setRdbVisitor(new DumpRdbVisitor(replicator));
    replicator.addEventListener(new EventListener() {
        @Override
        public void onEvent(Replicator replicator, Event event) {
            if (event instanceof DumpFunction) {
                DumpFunction function = (DumpFunction) event;
                byte[] serialized = function.getSerialized();
                // your code goes here
                // you can use FUNCTION RESTORE to restore above serialized data to target redis
            }
        }
    });
    replicator.open();

6. 贡献者

7. 商业咨询

redis-replicator 支持如下的商业咨询服务:

  • 现场咨询. 50,000元/天
  • 现场培训. 50,000元/天

可以直接联系陈宝仪, 发送邮件至 chen.bao.yi@gmail.com.

8. 相关引用

9. 致谢

9.1. 宁文君

2023年1月27日,在这一天我的妈妈宁文君(1953-2023)离世了。她是一个慈祥严格又乐于助人的老太太,自己的退休金虽然不多,但每年也会给贫困山区捐衣物现金。她是支撑我写下这个工具的最大动力,每当我跟她说又有新的公司在用这个工具时,她都和我一样高兴并鼓励我继续维护下去,也一直鼓励我参加各种技术分享活动。虽然我并没有取得多少成就,但她一直为我自豪。可能很多年后宁文君这个名字会被遗忘,但我希望 Github 会再有将数据备份到北极的活动,这样这个名字就会保存一千年。愿逝者安息。

9.2. YourKit

YourKit
YourKit is kindly supporting this open source project with its full-featured Java Profiler.
YourKit, LLC is the creator of innovative and intelligent tools for profiling
Java and .NET applications. Take a look at YourKit's leading software products:
YourKit Java Profiler and YourKit .NET Profiler.

9.3. IntelliJ IDEA

IntelliJ IDEA is a Java integrated development environment (IDE) for developing computer software.
It is developed by JetBrains (formerly known as IntelliJ), and is available as an Apache 2 Licensed community edition,
and in a proprietary commercial edition. Both can be used for commercial development.

9.4. Redisson

Redisson is Redis based In-Memory Data Grid for Java offers distributed objects and services (BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) backed by Redis server. Redisson provides more convenient and easiest way to work with Redis. Redisson objects provides a separation of concern, which allows you to keep focus on the data modeling and application logic.

Copyright (c) 2016-2019 Leon Chen "Anti 996" License Version 1.0 (Draft) Permission is hereby granted to any individual or legal entity obtaining a copy of this licensed work (including the source code, documentation and/or related items, hereinafter collectively referred to as the "licensed work"), free of charge, to deal with the licensed work for any purpose, including without limitation, the rights to use, reproduce, modify, prepare derivative works of, distribute, publish and sublicense the licensed work, subject to the following conditions: 1. The individual or the legal entity must conspicuously display, without modification, this License and the notice on each redistributed or derivative copy of the Licensed Work. 2. The individual or the legal entity must strictly comply with all applicable laws, regulations, rules and standards of the jurisdiction relating to labor and employment where the individual is physically located or where the individual was born or naturalized; or where the legal entity is registered or is operating (whichever is stricter). In case that the jurisdiction has no such laws, regulations, rules and standards or its laws, regulations, rules and standards are unenforceable, the individual or the legal entity are required to comply with Core International Labor Standards. 3. The individual or the legal entity shall not induce, suggest or force its employee(s), whether full-time or part-time, or its independent contractor(s), in any methods, to agree in oral or written form, to directly or indirectly restrict, weaken or relinquish his or her rights or remedies under such laws, regulations, rules and standards relating to labor and employment as mentioned above, no matter whether such written or oral agreements are enforceable under the laws of the said jurisdiction, nor shall such individual or the legal entity limit, in any methods, the rights of its employee(s) or independent contractor(s) from reporting or complaining to the copyright holder or relevant authorities monitoring the compliance of the license about its violation(s) of the said license. THE LICENSED WORK IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN ANY WAY CONNECTION WITH THE LICENSED WORK OR THE USE OR OTHER DEALINGS IN THE LICENSED WORK.

简介

Redis-replicator是一款用java写的redis rdb以及命令解析软件.它可以实时解析,过滤,广播rdb以及command事件,支持redis2.6-7.2,支持psync,sync,psync2三种同步协议. 展开 收起
Java
Apache-2.0
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
Java
1
https://gitee.com/leonchen83/redis-replicator.git
git@gitee.com:leonchen83/redis-replicator.git
leonchen83
redis-replicator
redis-replicator
master

搜索帮助

14c37bed 8189591 565d56ea 8189591