БС
Есть код, который в нормальном случае должен работать асинхронно. (В моем случае это обновление нескольких полей, которые берутся из thirdparty внешней системы, и таймауты там огромные могут быть, но эти поля не влияют на основной функционал, поэтому я могу позволить их обновить позже. ) Пишу вот так:
asyncio.ensure_future(xxx())
Но вот в этой xxx могут быть ошибки, и мне желательно в unittestах их все поймать, то есть чтобы тест сразу упал надо делать синхронный вызов. Получается код аля:
coro = xxx()
if test:
await coro
else:
asyncio.ensure_fututre(coro)
А это как то не красиво. Понятно что можно написать обертку аля:
def ensure_future(coro):
if test:
await coro
else:
asyncio.ensure_future(coro_
ensure_future(xxx())
Это красивее, но получается что в юниттестах тестируется только один бранч в этой функции. То есть асинхронный вызов не тестируется.
Кто нить с подобным сталкивался и красиво решал?