7 Star 19 Fork 3

wupeaking / python_mysql_driver

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
MysqlWarp.py 8.04 KB
一键复制 编辑 原始数据 按行查看 历史
wupeaking 提交于 2016-01-10 19:31 . 修改: MysqlWarp.py
#encoding=utf-8
'''
this is a sample MySQLdb warp
这是一个简单的对MySQLdb的一个封装 实现ORM的模型 以便更容易的去操作数据库
'''
import MySQLdb
class DBModel(object):
"""docstring for ClassName"""
@classmethod
def Connect(cls, dbname, user, passwd, ip='localhost', port = 3306, charset='utf8'):
try:
connect = MySQLdb.connect(host=ip, port=port, user=user, passwd=passwd, db = dbname, charset=charset)
except:
raise Exception,"con't connect mysql"
return DBModel(connect)
def __init__(self, connect):
self._connect = connect
def __getattr__(self, name):
return _TableModel(name, self._connect)
def __del__(self):
'''
!!注意:在调用connect函数创建一个DBModel之后,一定要单独使用 否则会被python自动给析构
right:
db = DBModel.Connect(dbname='wpxappdb', user='root', passwd='888888')
for row in db.find_all(name='wpx'):
print row.id
error:
DBModel.Connect(dbname='wpxappdb', user='root', passwd='888888').find_all(name='wpx')
'''
self._connect.cursor().close()
self._connect.close()
class _TableModel(object):
'''
fieldname
'''
table_info = {}
def __init__(self, table_name, connect):
self._table_name = table_name
self._connect = connect
self.field_info()
def field_info(self):
if self._table_name in _TableModel.table_info.keys():
return _TableModel.table_info[self._table_name]
cursor = self._connect.cursor()
try:
cursor.execute('desc %s;'%self._table_name)
fields = cursor.fetchall()
except:
raise Exception , "con't find field info"
else:
self._connect.commit()
_TableModel.table_info[self._table_name] = _FieldModel(fields)
return True
def delete(self, **kw):
'''
删除表中的一条记录, 如果成功 返回删除的个数 如果失败 返回None
'''
if kw:
where = ''
for arg in kw.keys():
v = kw[arg]
s = ''
if isinstance(v, str):
s = arg + "='%s'"%v
else:
s = arg + "=%s"%v
where = s + " " + 'and' + ' ' + where
#clear last 'and'
where = where[0:-4]
sql = "delete from %s where %s"%(self._table_name, where)
cursor = self._connect.cursor()
try:
ret = cursor.execute(sql)
self._connect.commit()
except:
return None
else:
return ret
else:
return None
def delete_all(self, **kw):
'''
考虑是否实现这个函数 后来想想还是不实现了 太恐怖了
'''
raise Exception, "notice : careful use this function"
def update(self, **set):
'''
set表示要更新的内容 condition表示要条件
成功返回影响的结果 失败返回None
'''
pass
def execute_sql(self, sql):
'''
execute user sql
'''
cursor = self._connect.cursor()
try:
return cursor.execute(sql)
except Exception, e:
return None
def _process_search_argument(self, **kw):
'''
构造select 语句
'''
where = ''
if kw:
for arg in kw.keys():
v = kw[arg]
s = ''
if isinstance(v, str):
s = arg + "='%s'"%v
else:
s = arg + "=%s"%v
where = s + " " + 'and' + ' ' + where
#clear last 'and'
where = where[0:-4]
sql = 'select * from %s where %s;'%(self._table_name, where)
else:
sql = 'select * from %s;'%self._table_name
return sql
def find_all(self, **kw):
'''
进行SQL的查询 如果查询失败则返回一个None 否则会返回一个记录的迭代对象
每一个对象直接使用点运算符即可直接得到该字段的值
for example:
db = MysqlWarp.DBModel.Connect(dbname='wpxappdb', user='root', passwd='888888')
rows = db.wpxapp_userinfo.find_all(name='wpx')
if rows is None:
...process....
else:
for row in db.wpxapp_userinfo.find_all(name='wpx'):
print row.id
print row.name
print row.password
'''
sql = self._process_search_argument(**kw)
cursor = self._connect.cursor()
try:
cursor.execute(sql)
row_info = cursor.fetchall()
except:
return None
field_model = _TableModel.table_info[self._table_name]
row_models = []
for row in row_info:
i = 0
info = {}
for field in field_model:
info[field['name']] = row[i]
i += 1
row_models.append(_RowModel(**info))
return iter(row_models)
def find_one(self, **kw):
'''
与find_all函数类似 只是只返回一个结果,前提是能查询到记录 如果没有该记录会返回空 对空类型操作会出现异常
for example:
row = db.wpxapp_userinfo.find_one(name='wpx')
if row:
print row.id
print row.uid
.....
'''
sql = self._process_search_argument(**kw)
sql.replace(';', 'limit 0 , 1 ;')
cursor = self._connect.cursor()
try:
cursor.execute(sql)
row_info = cursor.fetchall()
except:
return None
field_model = _TableModel.table_info[self._table_name]
info = {}
i = 0
for field in field_model:
# id uid passwd .....
info[field['name']] = row_info[0][i]
i += 1
return _RowModel(**info)
def insert(self, **kw):
'''
if insert success return True else return _RowModel object
'''
cursor = self._connect.cursor()
field_name = kw.keys()
#create sql
field_str = ''
value_str = ''
for field in field_name:
field_str = '%s %s %s'%(field_str, field , ',')
if isinstance(kw[field], str):
value_str = "%s '%s' %s"%(value_str, kw[field], ',')
else:
value_str = "%s %s %s"%(value_str, kw[field], ',')
#clear last ','
field_str = field_str[0:-2]
value_str = value_str[0:-2]
sql = 'insert into %s (%s) values(%s);'%(self._table_name, field_str, value_str)
try:
cursor.execute(sql)
self._connect.commit()
except:
return None
return _RowModel(**kw)
def primary_key(self):
'''
返回primary key的字段名称 如果没有则返回为None
'''
return _TableModel.table_info[self._table_name].primary_key_name()
def foreign_key(self):
'''
返回外键的名称 由于一张表中可以有多个外键 所以返回的是一个以字段名组成的列表 若没有外键则返回一个空的列表
'''
return _TableModel.table_info[self._table_name].foreign_key_name()
class _FieldModel(object):
'''
mysql mysql的字段内型有许多种:整形:tinyint(1) smallint(2) mediumint(3) int(4) bigint(8)
字符:char varchar text
时间:year date time datetime
(u'id', u'int(11)', u'NO', u'PRI', None, u'auto_increment')
(u'time', u'int(11)', u'NO', u'', None, u'')
(u'title', u'varchar(100)', u'NO', u'', None, u'')
(u'content', u'longtext', u'NO', u'', None, u'')
(u'uid_id', u'int(11)', u'NO', u'MUL', None, u'')
'''
_type_list = ['tinyint', 'smallint', 'mediumint', 'int', 'bigint', 'char', 'varchar', 'text',
'year', 'data', 'time', 'datetime']
#field_name, _type, value, is_primay_key = False, is_foreign_key = False
def __init__(self, field_info=[]):
'''
这里为什么使用的是列表而不是字典的原因是由于 通过SQL语句查询字段信息返回的是一个
有顺序的列表,如果转换成字典就会变成无序的,那么在上面进行SELECT查询的时候,返回的
内容正好和这个_FieldModel的顺序是对应的
'''
self._field = []
self._foreign_key = []
self._primary_key = None #only one primary key
for field in field_info:
self._field.append(dict(name=field[0], type=field[1], primary=True if field[3]=='PRI' else False,
foreign=True if field[3]=='MUL' else False))
if field[3]=='PRI':
self._primary_key = field[0]
elif field[3] == 'MUL':
self._foreign_key.append(field[0])
def __iter__(self):
return iter(self._field)
def primary_key_name(self):
return self._primary_key
def foreign_key_name(self):
return self._foreign_key
class _RowModel(object):
'''
将每一条记录作为一个对象来使用 实现一条记录既可以通过字典的方式也可以通过点运算的方式获取各个字段的值
'''
def __init__(self, **kw):
'''
kw is a dict for display -->field_name:value
'''
self._row_values = kw
def __str__(self):
return str(self._row_values.items())
def __getitem__(self, value):
return self._row_values.get(value, '')
def __getattr__(self, name):
return self._row_values.get(name, '')
Python
1
https://gitee.com/wupeaking/python_mysql_driver.git
git@gitee.com:wupeaking/python_mysql_driver.git
wupeaking
python_mysql_driver
python_mysql_driver
master

搜索帮助