1、使用async定义协程函数
普通函数
def regular_function():
return "I'm regular_function"
协程函数(加上 async 关键字)
async def coroutine_function():
return "I'm a coroutine_function"
调用对比
print(regular_function()) # 输出: I'm regular_function
print(coroutine_function()) # 输出: <coroutine object ...> ← 返回协程对象!
注意:调用协程函数不会立即执行,而是返回一个协程对象,需要用 await 或 asyncio.run() 来执行。
2、使用await等待完成
await coroutine_function()
3、开启事件循环
# 方式 1:推荐(Python 3.7+)
asyncio.run(main())
# 方式 2:手动管理(更多控制)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
finally:
loop.close()
# 方式 3:获取当前运行的循环(在协程内部使用)
async def inside_coroutine():
loop = asyncio.get_running_loop()
print(f"当前循环: {loop}")
4、task任务
async def worker(name, delay):
print(f"Worker {name} 开始")
await asyncio.sleep(delay)
print(f"Worker {name} 完成")
return f"result_{name}"
async def main():
# 创建 Task(会立即开始调度)
task1 = asyncio.create_task(worker("A", 2))
task2 = asyncio.create_task(worker("B", 1))
task3 = asyncio.create_task(worker("C", 3))
4、gather
async def main():
# 并发执行,按顺序返回结果
results = await asyncio.gather(
fetch_data(1),
fetch_data(2),
fetch_data(3),
return_exceptions=True # 异常不会中断其他任务
)
print(results) # ['data_1', 'data_2', 'data_3']
6、使用Semaphore
async with sem: # 自动获取和释放信号量
print(f"任务 {task_id} 开始执行")
await asyncio.sleep(1)
print(f"任务 {task_id} 执行完成")
return task_id