acquiring the lock before starting coroutines lock acquired: True waiting for coroutines `coro1` waiting for the lock `coro2` waiting for the lock callback releasing lock `coro1` acquired lock `coro1` released lock `coro2` acquired lock `coro2` released lock
asyncdefproducer(q, num_workers): print('producer: starting') # 向队列中添加一些数据 for i in range(num_workers * 3): await q.put(i) print(f'producer: added task {i} to the queue') # 传入终止信号 print('producer: adding stop signals to the queue') for i in range(num_workers): await q.put(None) print('producer: waiting for queue to empty') await q.join() print('producer: ending')
consumer 0: starting consumer 0: waiting for item consumer 1: starting consumer 1: waiting for item producer: starting producer: added task 0 to the queue producer: added task 1 to the queue consumer 0: has item 0 consumer 1: has item 1 producer: added task 2 to the queue producer: added task 3 to the queue consumer 0: waiting for item consumer 0: has item 2 producer: added task 4 to the queue consumer 1: waiting for item consumer 1: has item 3 producer: added task 5 to the queue producer: adding stop signals to the queue consumer 0: waiting for item consumer 0: has item 4 consumer 1: waiting for item consumer 1: has item 5 producer: waiting for queue to empty consumer 0: waiting for item consumer 0: has item None consumer 0: ending consumer 1: waiting for item consumer 1: has item None consumer 1: ending producer: ending
用 Protocol 抽象类实现异步 I / O
到目前为止,这些示例都避免了将并发和 I / O 操作混合在一起,一次只关注一个概念。但是,在 I / O 阻塞时切换上下文是 asyncio 的主要使用情形之一。在已经介绍的并发概念的基础上,本节将实现简单的 echo 服务器程序和客户端程序。客户端可以连接到服务器,发送一些数据,然后接收与响应相同的数据。每次启动 I / O 操作时,执行代码都会放弃对事件循环的控制,从而允许其他任务运行,直到 I / O 操作就绪。
factory = event_loop.create_server(EchoServer, *SERVER_ADDRESS) server = event_loop.run_until_complete(factory) log.debug('starting up on {} port {}'.format(*SERVER_ADDRESS))
虽然看起来数据都是立即发送的,但实际上 transport 对象缓冲传出的数据,并在当 socket 的缓冲区准备好接收数据时设置回调来进行实际的传输。所有这些都是透明处理的,因此可以编写应用程序代码,就好像 I / O 操作正在立即发生一样:
1 2 3 4 5 6 7 8 9 10 11 12
defconnection_made(self, transport): self.transport = transport self.address = transport.get_extra_info('peername') self.log.debug('connecting to {} port {}'.format(*self.address))
# 这里可以是 transport.writelines() # 但这会使显示要发送的消息的每个部分变得更加困难 for msg in self.messages: transport.write(msg) self.log.debug(f'sending {msg!r}') if transport.can_write_eof(): transport.write_eof()
log.debug('waiting for client to complete') try: event_loop.run_until_complete(factory_coroutine) event_loop.run_until_complete(client_completed) finally: log.debug('closing event loop') event_loop.close()
输出
在一个窗口中运行服务器而在另一个窗口中运行客户端。
客户端将产生以下输出:
1 2 3 4 5 6 7 8 9 10
asyncio: Using selector: KqueueSelector main: waiting for client to complete EchoClient: connecting to ::1 port 10000 EchoClient: sending b'This is the message. ' EchoClient: sending b'It will be sent ' EchoClient: sending b'in parts.' EchoClient: received b'This is the message. It will be sent in parts.' EchoClient: received EOF EchoClient: server closed connection main: closing event loop
asyncio: Using selector: KqueueSelector main: starting up on localhost port 10000 EchoServer_::1_55307: connection accepted EchoServer_::1_55307: received b'This is the message. It will be sent in part s.' EchoServer_::1_55307: sent b'This is the message. It will be sent in parts.' EchoServer_::1_55307: received EOF EchoServer_::1_55307: closing
1 2 3 4 5 6 7
EchoServer_::1_55309: connection accepted EchoServer_::1_55309: received b'This is the message. ' EchoServer_::1_55309: sent b'This is the message. ' EchoServer_::1_55309: received b'It will be sent in parts.' EchoServer_::1_55309: sent b'It will be sent in parts.' EchoServer_::1_55309: received EOF EchoServer_::1_55309: closing