streamlit是一个用于快速构建web可化化界面的python库,主要可以用于快速验证算法或者模型的效果。尽管它提供了快速构建应用的便利,但在一些复杂的应用场景中,我们可能需要在同一个程序中实现后台任务处理,例如启动一个异步 worker 进程,对于该任务的启动方式streamlit官方推荐启动方式是使用命令行streamlit进行启动,这种方法并不能很好地解决当前问题。因此,为了解决该问题,本文将深入探讨如何在Streamlit 应用中有效地集成后台异步进程,希望能够帮助你克服在使用 Streamlit 进行复杂应用开发时遇到的挑战。
一、解决方式
首先我翻阅了streamlit的相关文档,并未发现自定义启动方式的相关文章,最好查阅streamlit开源项目的源代码,找到了streamlit的启动方法。
第一层启动方法如下
def _main_run(
file,
args: Optional[List[str]] = None,
flag_options: Optional[Dict[str, Any]] = None,
) -> None:
if args is None:
args = []
if flag_options is None:
flag_options = {}
is_hello = _get_command_line_as_string() == "streamlit hello"
check_credentials()
bootstrap.run(file, is_hello, args, flag_options)
然后依次查看后面的相关方法,进入bootstrap的相关包,找到对应的run方法。
def run(
main_script_path: str,
is_hello: bool,
args: List[str],
flag_options: Dict[str, Any],
) -> None:
"""Run a script in a separate thread and start a server for the app. This starts a blocking asyncio eventloop. """_fix_sys_path(main_script_path)
_fix_matplotlib_crash()
_fix_tornado_crash()
_fix_sys_argv(main_script_path, args)
_fix_pydeck_mapbox_api_warning()
_fix_pydantic_duplicate_validators_error()
_install_config_watchers(flag_options)
_install_pages_watcher(main_script_path)
# Create the server. It won't start running yet.
server = Server(main_script_path, is_hello)
async def run_server() -> None:
# Start the server
await server.start()
_on_server_start(server)
# Install a signal handler that will shut down the server
# and close all our threads
_set_up_signal_handler(server)
# Wait until `Server.stop` is called, either by our signal handler, or
# by a debug websocket session.
await server.stopped
# Run the server. This function will not return until the server is shut down.
asyncio.run(run_server())
成功在这个方法中找到了异步启动的相关方法,可以观察到,在该方法中streamlit是通过asyncio的run方法进行启动,就是调用了系统默认的事件循环。
所以最终解决方式,就是导出bootstrap包中的run方法,然后使用自定义的事件循环进行启动。
import signal
import sys
import worker
import asyncio
from streamlit.web.bootstrap import _on_server_start,_set_up_signal_handler
from streamlit.web.server import Server
server = Server('1_主页.py', False)
async def run_web():
async def run_server() -> None:
# Start the server
await server.start()
_on_server_start(server)
_set_up_signal_handler()
await server.stopped
await run_server()
loop = asyncio.new_event_loop()
async def main():
task1 = asyncio.create_task(run_web())
task2 = asyncio.create_task(worker.run())
asyncio.set_event_loop(loop=loop)
await asyncio.gather(task1,task2)
if __name__ == '__main__':
try:
loop.run_until_complete(main())
except Exception as e:
print(e)
结果运行程序发现,程序正常结束后,自定义的后台worker进程并未结束,然后查看相关代码发现,在_set_up_signal_handler方法中,监听了系统的sign结束的事件,仅结束了streamlit的web服务,自定义的服务并未结束。
最终解决方案,删除streamlit的监听,实现统一的监听,在监听方法中停止调自建的事件循环。
def signal_handler(signal_number, stack_frame):
# The server will shut down its threads and exit its loop.
server.stop()
loop.stop()
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
if sys.platform == "win32":
signal.signal(signal.SIGBREAK, signal_handler)
else:
signal.signal(signal.SIGQUIT, signal_handler)
二、思考
在探索开源项目如 Streamlit 时,我们难免会遇到官方文档未覆盖的复杂问题。面对这种情况,直接阅读和理解源代码不仅是一种解决问题的手段,也是一次深入了解项目内部工作机制的机会。通过这种方式,我们不仅能够提升自己的编程技能和问题解决能力,还能增强对开源项目架构和设计哲学的理解。
免费试用 更多热门智能应用