代码拉取完成,页面将自动刷新
[TOC]
$ pip install ugly-code
ugly_code.cmd.Command
)创建测试文件 cmd_debug.py
from ugly_code.cmd import Command
@Command
def main(x:int, y ,z=1023):
"""
测试一下
"""
print("{x} \t {y} \t {z}".format(**locals()))
if __name__ == '__main__':
globals()['main']()
执行该文件
$ python cmd_debug.py -x 1023 -y 333
1023 333 1023
有默认值的参数会被设置为可选参数,无默认值则设置为必选.
使用Type Hints的参数可自动进行类型检查.
编辑文件cmd_debug.py
from ugly_code.cmd import CMDHolder
@CMDHolder.command("test", "测试")
def main(x: int, b: str, c: int=1):
"""
测试一下
"""
print("{x} \t {y} \t {z}".format(**locals()))
@CMDHolder.command("echo", "echo")
def echo(words):
"""echo"""
print(words)
if __name__ == '__main__':
CMDHolder(__name__).execute()
执行该文件:
$ python3 cmd_debug.py echo -words "测试"
测试
$ python3 cmd_debug.py test
usage: Command line create by ugly-code.
测试一下
[-h] -x X -b B [-c C]
Command line create by ugly-code.
测试一下
: error: the following arguments are required: -x, -b
由示例可发现,CMDHolder可以持有多个命令行工具,根据不同的参数调用不同的命令行对象。而且还可以自定义命令行工具的名称与介绍。
ugly_code.runit
多任务管理工具。使用 Runner
和Worker
管理多进程程序。支持XML-RPC控制任务启停。
一个Worker对应一个进程,各进程使用tag区分。
使用方式: 注册任务到Runner,启动Runner即可自动\手动启动任务或使用XML-RPC接口对任务进行调度管理。
import threading
import time
from ugly_code.runit import Switch, Worker, Runner
class AWorker(Worker):
def serve(self):
while self.switch.on:
print(f"{self.switch.name} {time.time()}")
time.sleep(3.0)
def close_it(switch: Switch):
time.sleep(10)
switch.close()
if __name__ == '__main__':
st = Switch('Runner')
threading.Thread(target=close_it, args=(st,)).start()
Runner(st, (("a", AWorker), ('b', AWorker)), m=('127.0.0.1', 0)).run()
import threading
import time
from ugly_code.runit import Worker, Runner, RuntItMonitor
class AWorker(Worker):
def serve(self):
while self.switch.on:
print(f"{self.switch.name} {time.time()}")
time.sleep(3.0)
def start_and_close_task(m, tag):
time.sleep(10)
monitor = RuntItMonitor(m)
monitor.start(tag)
monitor.start(tag)
time.sleep(3)
print(monitor.stats())
monitor.close(tag, True)
time.sleep(3)
monitor.prune()
print(monitor.stats())
monitor.shutdown()
if __name__ == '__main__':
m = ('127.0.0.1', 65432)
runner = Runner(None, (("a", AWorker, 0, 0, False), ('b', AWorker)), m=m)
threading.Thread(target=start_and_close_task, args=("http://{}:{}".format(*m), "a",)).start()
runner.run()
from ugly_code.ex import ObjectProxy
obj = ObjectProxy(dict(a=1, b=2, c=3, d=dict(a=1, b=2)))
print(obj.d.a)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。