mysql://username:password@host:port/dbname?character_set=charset&character_set_results=charset
如果以SSL连接访问MySQL,则scheme设为mysqls://。MySQL server 5.7及以上支持;
username和password按需填写;
port默认为3306;
dbname为要用的数据库名,一般如果SQL语句只操作一个db的话建议填写;
character_set为client的字符集,等价于使用官方客户端启动时的参数--default-character-set
的配置,默认utf8,具体可以参考MySQL官方文档character-set.html。
character_set_results为client、connection和results的字符集,如果想要在SQL语句里使用SET NAME
来指定这些字符集的话,请把它配置到url的这个位置。
MySQL URL示例:
mysql://root:password@127.0.0.1
mysql://@test.mysql.com:3306/db1?character_set=utf8&character_set_results=utf8
mysqls://localhost/db1?character_set=big5
与workflow其他任务类似,可以用task.get_resp()
拿到MySQLResponse
,我们可以通过MySQLResultCursor
遍历结果集及其中的每个列的信息MySQLField
、每行和每个MySQLCell
注意:用户在遍历MySQL结果集的过程中获得的MySQLResultCursor
、MySQLField
、MySQLCell
都是MySQLResponse
内部对象的引用,即仅可以在回调函数中使用
一次请求所对应的回复中,其数据是一个三维结构
一个回复中包含了一个或多个结果集(result set)
一个结果集包含了一行或多行(row)
一行包含了一到多个阈(Field/Cell)
具体使用从外到内的步骤应该是:
判断任务状态(代表通信层面状态):用户通过判断 task.get_state() 是否为WFT_STATE_SUCCESS来查看任务执行是否成功;
判断回复包类型(代表返回包解析状态):调用 resp.get_packet_type() 查看最后一条MySQL语句的返回包类型,常见的几个类型为:
get_field_count()
fetch_fields()
get_rows_count()
fetch_row()
fetch_all()
: 返回 list[list[MySQLCell]]返回当前结果集的头部:如果有必要重读这个结果集,可以使用 cursor.rewind() 回到当前结果集头部,再通过第5步或第6步进行读取;
拿到下一个结果集:因为MySQL server返回的数据包可能是包含多结果集的(比如每个select语句为一个结果集;或者call procedure返回的多结果集数据),因此用户可以通过 cursor.next_result_set() 跳到下一个结果集,返回值为false表示所有结果集已取完。
返回第一个结果集:cursor.first_result_set() 可以让我们返回到所有结果集的头部,然后可以从第3步开始重新拿数据;
每列具体数据MySQLCell:第5步中读取到的一行,由多列组成,每列结果为MySQLCell,基本使用接口有:
get_data_type()
返回MYSQL_TYPE_LONG、MYSQL_TYPE_STRING...is_TYPE()
TYPE为int、string、ulonglong,判断是否是某种类型as_TYPE()
以某种类型读出MySQLCell的数据as_object()
按数据类型返回Python对象wf.MYSQL_TYPE_*
MYSQL_TYPE_NULL
MYSQL_TYPE_TINY
、MYSQL_TYPE_SHORT
、MYSQL_TYPE_INT24
、MYSQL_TYPE_LONG
MYSQL_TYPE_DECIMAL
、MYSQL_TYPE_STRING
、MYSQL_TYPE_VARCHAR
、MYSQL_TYPE_VAR_STRING
、MYSQL_TYPE_JSON
MYSQL_TYPE_FLOAT
MYSQL_TYPE_DOUBLE
MYSQL_TYPE_LONGLONG
MYSQL_TYPE_DATE
MYSQL_TYPE_TIME
MYSQL_TYPE_DATETIME
、MYSQL_TYPE_TIMESTAMP
is_string()
时以bytes类型返回底层数据is_string()
不应该调用该接口,若调用则返回长度为零的bytesdatetime.timedelta
更相符as_*
函数__str__
-> str
str(cell.as_object())
as_object()
返回了utf-8
编码的bytes,则会尝试将其解码为str类型,否则会调用bytes.__str__
mysql_list_fields()
时设置此值wf.MYSQL_TYPE_*
__str__()
-> str
get_name()
的str表示wf.MYSQL_STATUS_*
wf.MYSQL_PACKET_*
start(port)
wf.mysql_datatype2str(int) -> str
wf.MYSQL_TYPE_*
的str表示wf.create_mysql_task(str url, int retry_max, Callable[[wf.MySQLTask], None]) -> wf.MySQLTask
wf.MySQLRowIterator(wf.MySQLResultCursor)
list[wf.MySQLCell]
wf.MySQLRowObjectIterator(wf.MySQLResultCursor)
list[object]
,其中object为wf.MySQLCell.as_object()
的结果wf.MySQLResultSetIterator(wf.MySQLResultCursor)
# 发起一个MySQL请求
import pywf as wf
def mysql_callback(task):
print(task.get_state(), task.get_error())
url = "mysql://user:password@host:port/database"
mysql_task = wf.create_mysql_task(url=url, retry_max=1, callback=mysql_callback)
req = mysql_task.get_req()
req.set_query("select * from your_table limit 10;")
mysql_task.start()
wf.wait_finish()
# 遍历MySQL请求结果
default_column_width = 20
def header_line(sz):
return '+' + '+'.join(['-' * default_column_width for i in range(sz)]) + '+'
def format_row(row):
return '|' + '|'.join([f"{str(x):^{default_column_width}}" for x in row]) + '|'
def mysql_callback(task):
state = task.get_state()
error = task.get_error()
if state != wf.WFT_STATE_SUCCESS:
print(wf.get_error_string(state, error))
return
resp = task.get_resp()
cursor = wf.MySQLResultCursor(resp)
# 使用迭代语法遍历所有结果集
for result_set in wf.MySQLResultSetIterator(cursor):
# 1. 遍历每个结果集
if result_set.get_cursor_status() == wf.MYSQL_STATUS_GET_RESULT:
fields = result_set.fetch_fields()
print(header_line(len(fields)))
print(format_row(fields))
print(header_line(len(fields)))
# 2. 遍历每一行
for row in wf.MySQLRowIterator(result_set):
# 3. 遍历每个Cell
print(format_row(row))
print(header_line(len(fields)))
print("{} {} in set\n".format(
result_set.get_rows_count(),
"row" if result_set.get_rows_count() == 1 else "rows"
))
elif result_set.get_cursor_status() == wf.MYSQL_STATUS_OK:
print("OK. {} {} affected. {} warnings. insert_id={}. {}".format(
result_set.get_affected_rows(),
"row" if result_set.get_affected_rows() == 1 else "rows",
result_set.get_warnings(),
result_set.get_insert_id(),
str(result_set.get_info())
))
# ...
# 遍历MySQL请求结果
import pywf as wf
default_column_width = 20
def header_line(sz):
return '+' + '+'.join(['-' * default_column_width for i in range(sz)]) + '+'
def format_row(row):
return '|' + '|'.join([f"{str(x):^{default_column_width}}" for x in row]) + '|'
def mysql_callback(task):
state = task.get_state()
error = task.get_error()
if state != wf.WFT_STATE_SUCCESS:
print(wf.get_error_string(state, error))
return
resp = task.get_resp()
cursor = wf.MySQLResultCursor(resp)
# 遍历所有结果集
while True:
# 1. 遍历每个结果集
if cursor.get_cursor_status() == wf.MYSQL_STATUS_GET_RESULT:
fields = cursor.fetch_fields()
print(header_line(len(fields)))
print(format_row(fields))
print(header_line(len(fields)))
# 2. 遍历每一行,两种方案任选其一
# 2.1 使用fetch_all
all_row = cursor.fetch_all()
for row in all_row:
print(format_row(row))
# 2.2 使用fetch_row
cursor.rewind() # 回到当前结果集起始位置
while True:
row = cursor.fetch_row()
if row == None:
break
# 3. 遍历每个Cell
print(format_row(row))
print(header_line(len(fields)))
print("{} {} in set\n".format(
cursor.get_rows_count(),
"row" if cursor.get_rows_count() == 1 else "rows"
))
elif cursor.get_cursor_status() == wf.MYSQL_STATUS_OK:
print("OK. {} {} affected. {} warnings. insert_id={}. {}".format(
cursor.get_affected_rows(),
"row" if cursor.get_affected_rows() == 1 else "rows",
cursor.get_warnings(),
cursor.get_insert_id(),
str(cursor.get_info())
))
if cursor.next_result_set() == False:
break
# ...
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。