Trying To Implement 2 "threads" Using `asyncio` Module
Solution 1:
Why current code is not working:
You're running event loop until
self._supervisor()
is complete.self._supervisor()
creates task (it happens immediately) and finishes immediately.You're trying to run event loop until
_supervisor
complete, but how and when are you going start server? I think event loop should be running until server stopped._supervisor
or other stuff can be added as task (to same event loop).aiohttp
already has function to start server and event loop -web.run_app
, but we can do it manually.
Your questions:
Your server will run until you stop it. You can start/stop different coroutines while your server working.
You need only one event loop for different coroutines.
I think you don't need
supervisor
.More general question:
asyncio
helps you to run different functions parallel in single thread in single process. That's why asyncio is so cool and fast. Some of your sync code with threads you can rewrite using asyncio and it's coroutines. Moreover: asyncio can interact with threads and processes. It can be useful in case you still need threads and processes: here's example.
Useful notes:
- It's better to use term
coroutine
instead ofthread
while we talk about asyncio coroutines that are not threads - If you use Python 3.5, you can use
async
/await
syntax instead ofcoroutine
/yield from
I rewrote your code to show you idea. How to check it: run program, see console, open http://localhost:8080/stop
, see console, open http://localhost:8080/start
, see console, type CTRL+C.
import asyncio
import random
from contextlib import suppress
from aiohttp import web
class aiotest():
def __init__(self):
self._webapp = None
self._d_task = None
self.init_server()
# SERVER:
def init_server(self):
app = web.Application()
app.router.add_route('GET', '/start', self.start)
app.router.add_route('GET', '/stop', self.stop)
app.router.add_route('GET', '/kill_server', self.kill_server)
self._webapp = app
def run_server(self):
# Create server:
loop = asyncio.get_event_loop()
handler = self._webapp.make_handler()
f = loop.create_server(handler, '0.0.0.0', 8080)
srv = loop.run_until_complete(f)
try:
# Start downloader at server start:
asyncio.async(self.start(None)) # I'm using controllers here and below to be short,
# but it's better to split controller and start func
# Start server:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
# Stop downloader when server stopped:
loop.run_until_complete(self.stop(None))
# Cleanup resources:
srv.close()
loop.run_until_complete(srv.wait_closed())
loop.run_until_complete(self._webapp.shutdown())
loop.run_until_complete(handler.finish_connections(60.0))
loop.run_until_complete(self._webapp.cleanup())
loop.close()
@asyncio.coroutine
def kill_server(self, request):
print('Server killing...')
loop = asyncio.get_event_loop()
loop.stop()
return web.Response(body=b"Server killed")
# DOWNLOADER
@asyncio.coroutine
def start(self, request):
if self._d_task is None:
print('Downloader starting...')
self._d_task = asyncio.async(self._downloader())
return web.Response(body=b"Downloader started")
else:
return web.Response(body=b"Downloader already started")
@asyncio.coroutine
def stop(self, request):
if (self._d_task is not None) and (not self._d_task.cancelled()):
print('Downloader stopping...')
self._d_task.cancel()
# cancel() just say task it should be cancelled
# to able task handle CancelledError await for it
with suppress(asyncio.CancelledError):
yield from self._d_task
self._d_task = None
return web.Response(body=b"Downloader stopped")
else:
return web.Response(body=b"Downloader already stopped or stopping")
@asyncio.coroutine
def _downloader(self):
while True:
print('Downloading and verifying file...')
# Dummy sleep - to be replaced by actual code
yield from asyncio.sleep(random.randint(1, 2))
# Wait a predefined nr of seconds between downloads
yield from asyncio.sleep(1)
if __name__ == '__main__':
t = aiotest()
t.run_server()
Post a Comment for "Trying To Implement 2 "threads" Using `asyncio` Module"