Streamlit应用开发中如何实现后台任务的异步处理?

streamlit是一个用于快速构建web可化化界面的python库,主要可以用于快速验证算法或者模型的效果。尽管它提供了快速构建应用的便利,但在一些复杂的应用场景中,我们可能需要在同一个程序中实现后台任务处理,例如启动一个异步 worker 进程,对于该任务的启动方式streamlit官方推荐启动方式是使用命令行streamlit进行启动,这种方法并不能很好地解决当前问题。因此,为了解决该问题,本文将深入探讨如何在Streamlit 应用中有效地集成后台异步进程,希望能够帮助你克服在使用 Streamlit 进行复杂应用开发时遇到的挑战。

一、解决方式

首先我翻阅了streamlit的相关文档,并未发现自定义启动方式的相关文章,最好查阅streamlit开源项目的源代码,找到了streamlit的启动方法。

  第一层启动方法如下

def _main_run(
    file,
    args: Optional[List[str]] = None,
    flag_options: Optional[Dict[str, Any]] = None,
) -> None:
    if args is None:
        args = []

    if flag_options is None:
        flag_options = {}

    is_hello = _get_command_line_as_string() == "streamlit hello"

    check_credentials()

    bootstrap.run(file, is_hello, args, flag_options)

然后依次查看后面的相关方法,进入bootstrap的相关包,找到对应的run方法。

def run(
    main_script_path: str,
    is_hello: bool,
    args: List[str],
    flag_options: Dict[str, Any],
) -> None:
    """Run a script in a separate thread and start a server for the app.    This starts a blocking asyncio eventloop.    """_fix_sys_path(main_script_path)
    _fix_matplotlib_crash()
    _fix_tornado_crash()
    _fix_sys_argv(main_script_path, args)
    _fix_pydeck_mapbox_api_warning()
    _fix_pydantic_duplicate_validators_error()
    _install_config_watchers(flag_options)
    _install_pages_watcher(main_script_path)

    # Create the server. It won't start running yet.
    server = Server(main_script_path, is_hello)

    async def run_server() -> None:
        # Start the server
        await server.start()
        _on_server_start(server)

        # Install a signal handler that will shut down the server
        # and close all our threads
        _set_up_signal_handler(server)

        # Wait until `Server.stop` is called, either by our signal handler, or
        # by a debug websocket session.
        await server.stopped

    # Run the server. This function will not return until the server is shut down.
    asyncio.run(run_server())

成功在这个方法中找到了异步启动的相关方法,可以观察到,在该方法中streamlit是通过asyncio的run方法进行启动,就是调用了系统默认的事件循环。

所以最终解决方式,就是导出bootstrap包中的run方法,然后使用自定义的事件循环进行启动。

import signal
import sys
import worker
import asyncio
from streamlit.web.bootstrap import _on_server_start,_set_up_signal_handler
from streamlit.web.server import Server

    
server = Server('1_主页.py', False)

async def run_web():

    async def run_server() -> None:
        # Start the server
        await server.start()
        _on_server_start(server)
        _set_up_signal_handler()
        await server.stopped



    await run_server()

loop = asyncio.new_event_loop()

async def main():
    task1 = asyncio.create_task(run_web())
    task2 = asyncio.create_task(worker.run())
    asyncio.set_event_loop(loop=loop)
    await asyncio.gather(task1,task2)


if __name__ == '__main__':
    try:
        loop.run_until_complete(main())
    except Exception as e:
        print(e)

结果运行程序发现,程序正常结束后,自定义的后台worker进程并未结束,然后查看相关代码发现,在_set_up_signal_handler方法中,监听了系统的sign结束的事件,仅结束了streamlit的web服务,自定义的服务并未结束。

最终解决方案,删除streamlit的监听,实现统一的监听,在监听方法中停止调自建的事件循环。

def signal_handler(signal_number, stack_frame):
    # The server will shut down its threads and exit its loop.
    server.stop()
    loop.stop()

signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
if sys.platform == "win32":
    signal.signal(signal.SIGBREAK, signal_handler)
else:
    signal.signal(signal.SIGQUIT, signal_handler)

二、思考

在探索开源项目如 Streamlit 时,我们难免会遇到官方文档未覆盖的复杂问题。面对这种情况,直接阅读和理解源代码不仅是一种解决问题的手段,也是一次深入了解项目内部工作机制的机会。通过这种方式,我们不仅能够提升自己的编程技能和问题解决能力,还能增强对开源项目架构和设计哲学的理解。

咨询方案 预约演示                        
(1)
研发专家-江山研发专家-江山
上一篇 2024年5月13日
下一篇 2024年5月15日

相关推荐