This action will force synchronization from 搜狗开源/pyworkflow, which will overwrite any changes that you have made since you forked the repository, and can not be recovered!!!
Synchronous operation will process in the background and will refresh the page when finishing processing. Please be patient.
mysql://username:password@host:port/dbname?character_set=charset&character_set_results=charset
如果以SSL连接访问MySQL,则scheme设为mysqls://。MySQL server 5.7及以上支持;
username和password按需填写;
port默认为3306;
dbname为要用的数据库名,一般如果SQL语句只操作一个db的话建议填写;
character_set为client的字符集,等价于使用官方客户端启动时的参数--default-character-set
的配置,默认utf8,具体可以参考MySQL官方文档character-set.html。
character_set_results为client、connection和results的字符集,如果想要在SQL语句里使用SET NAME
来指定这些字符集的话,请把它配置到url的这个位置。
MySQL URL示例:
mysql://root:password@127.0.0.1
mysql://@test.mysql.com:3306/db1?character_set=utf8&character_set_results=utf8
mysqls://localhost/db1?character_set=big5
与workflow其他任务类似,可以用task.get_resp()
拿到MySQLResponse
,我们可以通过MySQLResultCursor
遍历结果集及其中的每个列的信息MySQLField
、每行和每个MySQLCell
注意:用户在遍历MySQL结果集的过程中获得的MySQLResultCursor
、MySQLField
、MySQLCell
都是MySQLResponse
内部对象的引用,即仅可以在回调函数中使用
一次请求所对应的回复中,其数据是一个三维结构
一个回复中包含了一个或多个结果集(result set)
一个结果集包含了一行或多行(row)
一行包含了一到多个阈(Field/Cell)
具体使用从外到内的步骤应该是:
判断任务状态(代表通信层面状态):用户通过判断 task.get_state() 是否为WFT_STATE_SUCCESS来查看任务执行是否成功;
判断回复包类型(代表返回包解析状态):调用 resp.get_packet_type() 查看最后一条MySQL语句的返回包类型,常见的几个类型为:
get_field_count()
fetch_fields()
get_rows_count()
fetch_row()
fetch_all()
: 返回 list[list[MySQLCell]]返回当前结果集的头部:如果有必要重读这个结果集,可以使用 cursor.rewind() 回到当前结果集头部,再通过第5步或第6步进行读取;
拿到下一个结果集:因为MySQL server返回的数据包可能是包含多结果集的(比如每个select语句为一个结果集;或者call procedure返回的多结果集数据),因此用户可以通过 cursor.next_result_set() 跳到下一个结果集,返回值为false表示所有结果集已取完。
返回第一个结果集:cursor.first_result_set() 可以让我们返回到所有结果集的头部,然后可以从第3步开始重新拿数据;
每列具体数据MySQLCell:第5步中读取到的一行,由多列组成,每列结果为MySQLCell,基本使用接口有:
get_data_type()
返回MYSQL_TYPE_LONG、MYSQL_TYPE_STRING...is_TYPE()
TYPE为int、string、ulonglong,判断是否是某种类型as_TYPE()
以某种类型读出MySQLCell的数据as_object()
按数据类型返回Python对象wf.MYSQL_TYPE_*
MYSQL_TYPE_NULL
MYSQL_TYPE_TINY
、MYSQL_TYPE_SHORT
、MYSQL_TYPE_INT24
、MYSQL_TYPE_LONG
MYSQL_TYPE_DECIMAL
、MYSQL_TYPE_STRING
、MYSQL_TYPE_VARCHAR
、MYSQL_TYPE_VAR_STRING
、MYSQL_TYPE_JSON
MYSQL_TYPE_FLOAT
MYSQL_TYPE_DOUBLE
MYSQL_TYPE_LONGLONG
MYSQL_TYPE_DATE
MYSQL_TYPE_TIME
MYSQL_TYPE_DATETIME
、MYSQL_TYPE_TIMESTAMP
is_string()
时以bytes类型返回底层数据is_string()
不应该调用该接口,若调用则返回长度为零的bytesdatetime.timedelta
更相符as_*
函数__str__
-> str
str(cell.as_object())
as_object()
返回了utf-8
编码的bytes,则会尝试将其解码为str类型,否则会调用bytes.__str__
mysql_list_fields()
时设置此值wf.MYSQL_TYPE_*
__str__()
-> str
get_name()
的str表示wf.MYSQL_STATUS_*
wf.MYSQL_PACKET_*
start(port)
wf.mysql_datatype2str(int) -> str
wf.MYSQL_TYPE_*
的str表示wf.create_mysql_task(str url, int retry_max, Callable[[wf.MySQLTask], None]) -> wf.MySQLTask
wf.MySQLRowIterator(wf.MySQLResultCursor)
list[wf.MySQLCell]
wf.MySQLRowObjectIterator(wf.MySQLResultCursor)
list[object]
,其中object为wf.MySQLCell.as_object()
的结果wf.MySQLResultSetIterator(wf.MySQLResultCursor)
# 发起一个MySQL请求
import pywf as wf
def mysql_callback(task):
print(task.get_state(), task.get_error())
url = "mysql://user:password@host:port/database"
mysql_task = wf.create_mysql_task(url=url, retry_max=1, callback=mysql_callback)
req = mysql_task.get_req()
req.set_query("select * from your_table limit 10;")
mysql_task.start()
wf.wait_finish()
# 遍历MySQL请求结果
default_column_width = 20
def header_line(sz):
return '+' + '+'.join(['-' * default_column_width for i in range(sz)]) + '+'
def format_row(row):
return '|' + '|'.join([f"{str(x):^{default_column_width}}" for x in row]) + '|'
def mysql_callback(task):
state = task.get_state()
error = task.get_error()
if state != wf.WFT_STATE_SUCCESS:
print(wf.get_error_string(state, error))
return
resp = task.get_resp()
cursor = wf.MySQLResultCursor(resp)
# 使用迭代语法遍历所有结果集
for result_set in wf.MySQLResultSetIterator(cursor):
# 1. 遍历每个结果集
if result_set.get_cursor_status() == wf.MYSQL_STATUS_GET_RESULT:
fields = result_set.fetch_fields()
print(header_line(len(fields)))
print(format_row(fields))
print(header_line(len(fields)))
# 2. 遍历每一行
for row in wf.MySQLRowIterator(result_set):
# 3. 遍历每个Cell
print(format_row(row))
print(header_line(len(fields)))
print("{} {} in set\n".format(
result_set.get_rows_count(),
"row" if result_set.get_rows_count() == 1 else "rows"
))
elif result_set.get_cursor_status() == wf.MYSQL_STATUS_OK:
print("OK. {} {} affected. {} warnings. insert_id={}. {}".format(
result_set.get_affected_rows(),
"row" if result_set.get_affected_rows() == 1 else "rows",
result_set.get_warnings(),
result_set.get_insert_id(),
str(result_set.get_info())
))
# ...
# 遍历MySQL请求结果
import pywf as wf
default_column_width = 20
def header_line(sz):
return '+' + '+'.join(['-' * default_column_width for i in range(sz)]) + '+'
def format_row(row):
return '|' + '|'.join([f"{str(x):^{default_column_width}}" for x in row]) + '|'
def mysql_callback(task):
state = task.get_state()
error = task.get_error()
if state != wf.WFT_STATE_SUCCESS:
print(wf.get_error_string(state, error))
return
resp = task.get_resp()
cursor = wf.MySQLResultCursor(resp)
# 遍历所有结果集
while True:
# 1. 遍历每个结果集
if cursor.get_cursor_status() == wf.MYSQL_STATUS_GET_RESULT:
fields = cursor.fetch_fields()
print(header_line(len(fields)))
print(format_row(fields))
print(header_line(len(fields)))
# 2. 遍历每一行,两种方案任选其一
# 2.1 使用fetch_all
all_row = cursor.fetch_all()
for row in all_row:
print(format_row(row))
# 2.2 使用fetch_row
cursor.rewind() # 回到当前结果集起始位置
while True:
row = cursor.fetch_row()
if row == None:
break
# 3. 遍历每个Cell
print(format_row(row))
print(header_line(len(fields)))
print("{} {} in set\n".format(
cursor.get_rows_count(),
"row" if cursor.get_rows_count() == 1 else "rows"
))
elif cursor.get_cursor_status() == wf.MYSQL_STATUS_OK:
print("OK. {} {} affected. {} warnings. insert_id={}. {}".format(
cursor.get_affected_rows(),
"row" if cursor.get_affected_rows() == 1 else "rows",
cursor.get_warnings(),
cursor.get_insert_id(),
str(cursor.get_info())
))
if cursor.next_result_set() == False:
break
# ...
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。