spacepaste

  1.  
  2. #!/usr/bin/env python3
  3. import asyncio
  4. import random
  5. async def fetch_aio(i):
  6. await asyncio.sleep(random.randint(1, 5) / 5.0)
  7. return {'aio_arg': str(i)}
  8. def aio_map(coro, iterable, loop=None):
  9. if loop is None:
  10. loop = asyncio.get_event_loop()
  11. async def wrapped_coro(coro, value):
  12. return await coro(value)
  13. coroutines = (wrapped_coro(coro, v) for v in iterable)
  14. coros = asyncio.gather(*coroutines, return_exceptions=True, loop=loop)
  15. if not loop.is_running():
  16. loop.run_until_complete(coros)
  17. else: # problem starts here
  18. # If we run loop.run_until_complete(coros) as well,
  19. # we get 'RuntimeError: Event loop is running.'
  20. asyncio.wait(coros)
  21. # But this way, we get 'asyncio.futures.InvalidStateError: Result is not ready.'
  22. return coros.result()
  23. def run_aio_map_fron_non_async():
  24. results = list(aio_map(fetch_aio, range(5)))
  25. assert(results == [{'aio_arg': '0'}, {'aio_arg': '1'}, {'aio_arg': '2'}, {'aio_arg': '3'}, {'aio_arg': '4'}]), results
  26. print('Assert ok!')
  27. async def main_loop(loop):
  28. results = list(aio_map(fetch_aio, range(5)))
  29. assert(results == [{'aio_arg': '0'}, {'aio_arg': '1'}, {'aio_arg': '2'}, {'aio_arg': '3'}, {'aio_arg': '4'}]), results
  30. print('Assert ok!')
  31. def run_aio_map_fron_async():
  32. loop = asyncio.get_event_loop()
  33. close_after_run = not loop.is_running()
  34. loop.run_until_complete(main_loop(loop))
  35. if close_after_run:
  36. loop.close()
  37. if __name__ == '__main__':
  38. print('run_aio_map_fron_non_async:')
  39. run_aio_map_fron_non_async()
  40. print('run_aio_map_fron_async:')
  41. run_aio_map_fron_async()
  42. print('done.')
  43.