Files
2025-03-01 14:52:43 +00:00
..
2025-03-01 14:52:43 +00:00

def sync(obj):
    """Make asynchronous code callable from plain synchronous code.

    What happens depends on what ``obj`` is:

    * a coroutine object -- it is scheduled on the current event loop and the
      loop is driven by hand until the coroutine has finished; its result is
      returned (or its exception re-raised);
    * a coroutine function -- a blocking wrapper carrying the same metadata
      is returned; calling it runs the coroutine as described above;
    * anything else -- every attribute of ``obj`` that is a coroutine
      function is replaced *in place* by such a blocking wrapper, and ``obj``
      itself is returned.

    The loop is driven re-entrantly, so this also works when called from
    inside a task of an already running loop (e.g. in a notebook cell).

    NOTE: this relies on private CPython asyncio APIs (``loop._run_once``,
    ``asyncio.tasks._enter_task`` / ``_leave_task``).
    """
    import asyncio
    import functools

    def run(coro):
        """Drive the event loop until ``coro`` completes; return its result."""
        loop = asyncio.get_event_loop()
        future = asyncio.ensure_future(coro, loop=loop)
        # The task we are being called from, or None outside of any task.
        outer = asyncio.current_task(loop)
        while not future.done():
            # asyncio refuses to step a task while another one is marked as
            # current, so step out of the calling task for one iteration.
            if outer is not None:
                asyncio.tasks._leave_task(loop, outer)
            try:
                # One full loop iteration: waits for I/O or the next timer
                # (no busy-polling), queues due timers, runs ready callbacks.
                loop._run_once()
            finally:
                # Restore the caller as current task even on errors; when
                # there was no caller, nothing is written back at all.
                if outer is not None:
                    asyncio.tasks._enter_task(loop, outer)
        return future.result()

    def wrap(func):
        """Return a blocking stand-in for the coroutine function ``func``."""
        @functools.wraps(func)
        def blocking(*args, **kwargs):
            return run(func(*args, **kwargs))
        return blocking

    if asyncio.iscoroutine(obj):
        return run(obj)
    if asyncio.iscoroutinefunction(obj):
        return wrap(obj)
    for attr in dir(obj):
        # dir() may list names whose lookup raises AttributeError; skip them.
        member = getattr(obj, attr, None)
        if asyncio.iscoroutinefunction(member):
            setattr(obj, attr, wrap(member))
    return obj

async def foo():
    """Minimal coroutine used by the demo below; simply returns 42."""
    return 42


# Exercise two entry points of sync(): first a coroutine object, then a
# coroutine function whose blocking wrapper is called immediately.
print(
    sync(foo()),
    sync(foo)(),
)

from playwright.async_api import async_playwright

# Start Playwright, then launch Firefox; each coroutine is driven to
# completion by sync() before the next step uses its result.
playwright_driver = sync(async_playwright().start())
browser = sync(playwright_driver.firefox.launch())

# First sync() resolves the new_page() coroutine into a page object; the
# second sync() swaps that object's coroutine methods for blocking wrappers.
page = sync(browser.new_page())
page = sync(page)

# Expose screenshot() under the `_repr_png_` name
# (presumably so a notebook renders the page as an image -- confirm).
page._repr_png_ = page.screenshot

page.goto('https://naver.com')
page
def sync(coro):
    """Decorator turning a coroutine function into a blocking function.

    Anything that is not a coroutine function is returned unchanged.  The
    returned wrapper schedules the coroutine on the current event loop and
    drives that loop by hand until the coroutine has finished, then returns
    its result (or re-raises its exception).  The loop is driven
    re-entrantly, so a wrapped function may be called from inside another
    coroutine that is itself being run this way.

    NOTE: this relies on private CPython asyncio APIs (``loop._run_once``,
    ``asyncio.tasks._enter_task`` / ``_leave_task``).
    """
    import asyncio
    import functools

    if not asyncio.iscoroutinefunction(coro):
        return coro

    @functools.wraps(coro)
    def wrapper(*args, **kwargs):
        loop = asyncio.get_event_loop()
        future = asyncio.ensure_future(coro(*args, **kwargs), loop=loop)
        # The task we are being called from, or None outside of any task.
        outer = asyncio.current_task(loop)
        while not future.done():
            # asyncio refuses to step a task while another one is marked as
            # current, so step out of the calling task for one iteration.
            if outer is not None:
                asyncio.tasks._leave_task(loop, outer)
            try:
                # One full loop iteration: waits for I/O or the next timer
                # (no busy-polling), queues due timers, runs ready callbacks.
                loop._run_once()
            finally:
                # Restore the caller as current task even on errors; when
                # there was no caller, nothing is written back at all.
                if outer is not None:
                    asyncio.tasks._enter_task(loop, outer)
        return future.result()

    return wrapper

@sync
async def hello():
    """Demo: call a sync-wrapped coroutine from within another one."""

    @sync
    async def world():
        print('hello')

    # Run the inner wrapped coroutine first; print the second word only when
    # that call yields a falsy value (world() has no return statement).
    if not world():
        print('world')


hello()
# Expected output of the call above:
"""
hello
world
"""