私信发送成功
Watch Star Fork

James Iter / sparrowPythonGPL-3.0

分布式数据库中间件
克隆/下载
James Iter 最后提交于 2016-03-15 13:14 加入更多文本类型
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
2016-03-04 18:55
Loading...
README.md 7.67 KB

Sparrow

分布式数据库中间件

简介

在类似订单的业务环境中,订单的记录会随着时间不断的增长。而走完整个周期的订单,时间越久,被访问到的机率就越小。 但越新的订单,访问频率越高。基于这么一个前提,做出一个简单的预测,如果经过一段时间后,订单量积累到一个非常可观的值, 影响到热数据的CURD效率,那么就必须制定一个可行的解决方案。该中间件就是为此而生。其通过对业务数据进行轻、重、热、冷横竖切分, 让每个块都保持一个轻量级状态。从而达到保证效率的目的。

安装依赖库

# 环境基于python2.7
pip install -r requirements.txt -i http://pypi.douban.com/simple --trusted-host pypi.douban.com --allow-external mysql-connector-python

配置文件

# 配置环境 [debug|sandbox|production(默认)] (可以写入具体执行用户的.bashrc文件中)
export JI_ENVIRONMENT=debug
# 转存周期 亦即多久从热库转存一次数据到冷库 d以日为周期, w以周为周期, m以月为周期
DUMP_CYCLE
# 数据转存所需时间 单位秒
DATA_FLY_TIME
# 数据库默认名称 详细配置中可做差异配置 
DATABASE
# 数据库默认端口 后面详细配置中可做差异配置
DB_PORT
# 每次最多返回的记录数
DB_RESULT_LIMIT
# 热\冷库的时间线字段
TIME_LINE_FIELD
# 时间线字段的倍率 为了支持时间单位毫秒\纳秒所设, 仅用在dump脚本中, 1为妙, 1000为毫秒...
TIME_X
# 日志路径 注意读写权限
LOG_FILE_BASE
# 记录ID刻度的Key
IDsKeeper
# Redis相关配置
REDIS

DB_S
# 具体的域配置 [states.DBDomain.light.value|states.DBDomain.hot.value|states.DBDomain.cold.value], 对应轻\热\冷 
domain
# 读写模式 [states.RWMode.SW_SR.value|states.RWMode.BW_AR.value], 对应单例写\单例读, 均衡写\聚合读, 一般轻库\冷库单写单读, 热库均衡写\聚合读
rw_mode
# 是否禁用该域 [True|False] 被禁用的域,配置信息不被解析
disable
# 连接池大小 注意: 所有的连接池加起来不能超过数据库环境变量max_connections, (参考命令: SHOW VARIABLES LIKE 'max_connections';)
# 包含转存脚本的连接数
pool_size

冷库格式

冷库需提前创建好,按月份,每月一个库,格式如sparrow__2016__1, 数据库名称\年\月之间用双下划线间隔. 每个冷库里面的数据表,及表的结构,索引都要与热库一致.不然转存数据会失败.

转存的时间

如果转存周期为'w',那么在进入当前周期时,就可以执行转存脚本. crontab格式如 '* * * * 1 cd ~/sparrowt/task_scheduler && /usr/bin/python dump.py', 注意用户权限. 在周期末时,可以执行clear_expire_in_hot.py来清理热库中已被转存的数据. crontab格式如 '* * * * 7 cd ~/sparrowt/task_scheduler && /usr/bin/python clear_expire_in_hot.py', 注意用户权限. 删除的内容会记录在当前目录的yyyymm_log_record.record文件中,当做后悔药.

启动

# HTTP API
python sparrow.py

# zmq API
python zmq_scheduler.py &
python zmq_sparrow.py &

数据库设计限制

  1. 每个表必须有id字段,且为主键,CURD操作都基于此来识别对象;
  2. 冷热库中的表都要有时间线字段,该字段具体名称可通过config.py文件设置(默认为create_time);
  3. 未建索引的字段将不能成为条件过滤字段;
  4. 组合索引,多个字段间用'__'(双下划线(如: ALTER TABLE user_info ADD INDEX first_name__last_name (first_name, last_name);))来间隔;
  5. 查询时,轻库无需指定时间区间. 冷热裤必须指定时间区间, 若不指定,只返回当天的区间结果.
  6. 进入冷库的数据即为历史记录. 不支持删\改操作.

HTTP API

插入

curl -X "POST" "http://localhost:5000/oo/login_auth" \
	-H "Content-Type: application/json" \
	-d "{\"id\":1,\"login_name\":\"fanliang@iqusong.com\",\"password\":\"000000.com\",\"create_time\":0}"

更新

curl -X "PATCH" "http://localhost:5000/oo/login_auth/1" \
	-H "Content-Type: application/json" \
	-d "{\"password\":\"newpswd\"}"

获取

curl -X "GET" "http://localhost:5000/oo/order_form?filter_str=__order_by__id,__limit__1000,__range__1451197800~1451518200"

删除

curl -X "DELETE" "http://localhost:5000/oo/login_auth/1"

zmq API

RPC

    message = dict()
    message['action'] = 'RPC'
    message['object_name'] = 'order_form'
    message['params'] = {
        'function': 'generate_id_by',       # 支持的方法 'generate_id_by | get_id_of_max_by'
    }
    socket.send_json(message)
    result = socket.recv_json()
    
    
    result:
    {
        state: {
            code: "200",
            zh-cn: "成功",
            en-us: "OK"
        }
        id: 1
    }

插入

message = dict()
message['action'] = 'POST'
message['object_name'] = 'order_form'
message['params'] = {
    'id': 1,
    'create_time': 0,
    'finished_time': 0
}
socket.send_json(message)
socket.recv_json()

result:
{
    state: {
        code: "200",
        zh-cn: "成功",
        en-us: "OK"
    }
}

更新

message = dict()
message['action'] = 'PATCH'
message['object_name'] = 'order_form'
message['params'] = {
    'id': 1,
    'create_time': 0,
    'finished_time': 0
}
socket.send_json(message)
socket.recv_json()

result:
{
    state: {
        code: "200",
        zh-cn: "成功",
        en-us: "OK"
    }
}

获取

message = dict()
message['action'] = 'GET'
message['object_name'] = 'order_form'
message['params'] = {
    'filter_str': '__order_by__id,__limit__1000,__range__1451197800~1451518200'
}
socket.send_json(message)
socket.recv_json()

result:
{
    state: {
        code: "200",
        zh-cn: "成功",
        en-us: "OK"
    },
    list: [
        {}
    ]
}

删除

message = dict()
message['action'] = 'DELETE'
message['object_name'] = 'order_form'
message['params'] = {
    'id': 1
}
socket.send_json(message)
socket.recv_json()

result:
{
    state: {
        code: "200",
        zh-cn: "成功",
        en-us: "OK"
    }
}

filter_str语法

eq

id__eq__100
id等于100的记录

gt

id__gt__100
id大于100的记录

lt

id__lt__100
id小于100的记录

ne

id__ne__100
id不等于100的记录

in

id_in__100~101~103
id等于100或101或103的记录

order_by

__order_by__create_time
依据create_time字段顺序排序

order_by_desc

__order_by_desc__create_time
依据create_time字段倒序排序

limit

__limit__10
限制返回前10条记录

range

__range__1451197800~1451518200
以1451197800~1451518200时间范围限定查找空间

综合示例

__order_by_desc__id,__limit__1000,__range__1451197800~1451518200
依据id字段,倒序返回1451197800~1451518200时间空间中的前1000条记录

id__gt__100,id__lt__1000,name__eq__james
返回id大于100且小于1000的记录中,name等于james的记录

错误码

    error_codes = {
        '41250': {
            'code': '41250',
            'zh-cn': u'未支持索引的字段'
        },
        '50050': {
            'code': '50150',
            'zh-cn': u'MySQL 链接或执行出错'
        },
        '50051': {
            'code': '50051',
            'zh-cn': u'Redis 链接或执行出错'
        }
    }

FAQ

  1. 如何扩大数据库连接限制?
Q: 有两种方式
    a. 通过修改配置文件my.cnf
    [mysqld]
    max_connections             = 1000
    b. 修改环境变量
    SET GLOBAL max_connections = 1000;

项目点评 ( 2 )

你可以在登录后,对此项目发表评论

4_float_left_people 4_float_left_close