{"id":14784,"date":"2024-12-04T18:42:14","date_gmt":"2024-12-04T10:42:14","guid":{"rendered":"https:\/\/www.aisharenet.com\/?p=14784"},"modified":"2024-12-04T18:42:49","modified_gmt":"2024-12-04T10:42:49","slug":"flow","status":"publish","type":"post","link":"https:\/\/www.kdjingpai.com\/en\/flow\/","title":{"rendered":"Flow\uff08Laminar\uff09\uff1a\u6784\u5efa\u667a\u80fd\u4f53\u7684\u8f7b\u91cf\u7ea7\u4efb\u52a1\u5f15\u64ce\uff0c\u7b80\u5316\u5e76\u7075\u6d3b\u7ba1\u7406\u4efb\u52a1"},"content":{"rendered":"<p>Flow\u662f\u4e00\u4e2a\u8f7b\u91cf\u7ea7\u7684\u4efb\u52a1\u5f15\u64ce\uff0c\u4e13\u4e3a\u6784\u5efaAI\u4ee3\u7406\u800c\u8bbe\u8ba1\uff0c\u5f3a\u8c03\u7b80\u6d01\u6027\u548c\u7075\u6d3b\u6027\u3002\u4e0e\u4f20\u7edf\u7684\u57fa\u4e8e\u8282\u70b9\u548c\u8fb9\u7684\u5de5\u4f5c\u6d41\u4e0d\u540c\uff0cFlow\u91c7\u7528\u52a8\u6001\u4efb\u52a1\u961f\u5217\u7cfb\u7edf\uff0c\u652f\u6301\u5e76\u884c\u6267\u884c\u3001\u52a8\u6001\u8c03\u5ea6\u548c\u667a\u80fd\u4f9d\u8d56\u7ba1\u7406\u3002\u5176\u6838\u5fc3\u7406\u5ff5\u662f\u901a\u8fc7\u5e76\u884c\u4efb\u52a1\u6267\u884c\u3001\u52a8\u6001\u5de5\u4f5c\u6d41\u548c\u6761\u4ef6\u5206\u652f\u63a7\u5236\uff0c\u4f7f\u590d\u6742\u7684\u5de5\u4f5c\u6d41\u53d8\u5f97\u7b80\u5355\u6613\u884c\u3002Flow\u65e0\u9700\u9884\u5b9a\u4e49\u8282\u70b9\u4e4b\u95f4\u7684\u8fb9\uff0c\u91c7\u7528\u52a8\u6001\u4efb\u52a1\u8c03\u5ea6\u67b6\u6784\uff0c\u5e2e\u52a9\u5f00\u53d1\u8005\u7f16\u5199\u66f4\u7b80\u6d01\u3001\u6613\u4e8e\u7406\u89e3\u7684\u4ee3\u7801\u3002Flow\u7531Laminar\u56e2\u961f\u7ef4\u62a4\uff0c\u652f\u6301\u81ea\u52a8\u5316\u8ffd\u8e2a\u548c\u72b6\u6001\u7ba1\u7406\uff0c\u9002\u7528\u4e8e\u5404\u79cdAI\u5e94\u7528\u573a\u666f\u3002<\/p>\n<p>&nbsp;<\/p>\n<h2>\u529f\u80fd\u5217\u8868<\/h2>\n<ul>\n<li>\u5e76\u884c\u4efb\u52a1\u6267\u884c\uff1a\u81ea\u52a8\u5e76\u884c\u8fd0\u884c\u4efb\u52a1\uff0c\u65e0\u9700\u663e\u5f0f\u7ebf\u7a0b\u4ee3\u7801\u3002<\/li>\n<li>\u52a8\u6001\u8c03\u5ea6\uff1a\u4efb\u52a1\u53ef\u4ee5\u5728\u8fd0\u884c\u65f6\u8c03\u5ea6\u65b0\u4efb\u52a1\u3002<\/li>\n<li>\u667a\u80fd\u4f9d\u8d56\u7ba1\u7406\uff1a\u4efb\u52a1\u53ef\u4ee5\u7b49\u5f85\u524d\u4e00\u4e2a\u64cd\u4f5c\u7684\u7ed3\u679c\u3002<\/li>\n<li>\u72b6\u6001\u7ba1\u7406\uff1a\u4fdd\u5b58\u548c\u52a0\u8f7d\u4efb\u52a1\u72b6\u6001\uff0c\u4ece\u7279\u5b9a\u4efb\u52a1\u5f00\u59cb\u6267\u884c\u3002<\/li>\n<li>\u6761\u4ef6\u5206\u652f\u548c\u63a7\u5236\u6d41\uff1a\u652f\u6301\u6761\u4ef6\u5206\u652f\u548c\u5faa\u73af\u63a7\u5236\u3002<\/li>\n<li>\u6d41\u5f0f\u4efb\u52a1\u6267\u884c\uff1a\u652f\u6301\u4efb\u52a1\u6267\u884c\u7684\u6d41\u5f0f\u5904\u7406\u3002<\/li>\n<li>\u81ea\u52a8\u5316\u8ffd\u8e2a\uff1a\u652f\u6301OpenTelemetry\u7684\u81ea\u52a8\u5316\u8ffd\u8e2a\uff0c\u4fbf\u4e8e\u8c03\u8bd5\u548c\u72b6\u6001\u91cd\u5efa\u3002<\/li>\n<li>\u8f7b\u91cf\u7ea7\u4e14\u65e0\u5916\u90e8\u4f9d\u8d56\uff1a\u8bbe\u8ba1\u7b80\u6d01\u3001\u7075\u6d3b\u4e14\u529f\u80fd\u5f3a\u5927\u3002<\/li>\n<\/ul>\n<p>&nbsp;<\/p>\n<h2>\u4f7f\u7528\u5e2e\u52a9<\/h2>\n<h3>\u5b89\u88c5\u6d41\u7a0b<\/h3>\n<p>\u8981\u5b89\u88c5Flow\uff0c\u53ea\u9700\u4f7f\u7528pip\u547d\u4ee4\uff1a<\/p>\n<pre><code>pip install lmnr-flow\r\n<\/code><\/pre>\n<h3>\u57fa\u672c\u4f7f\u7528<\/h3>\n<p>\u4ee5\u4e0b\u662f\u4e00\u4e2a\u7b80\u5355\u7684\u4f7f\u7528\u793a\u4f8b\uff1a<\/p>\n<pre><code>from concurrent.futures import ThreadPoolExecutor\r\nfrom lmnr_flow import Flow, TaskOutput, NextTask, Context\r\n# \u521b\u5efaFlow\u5b9e\u4f8b\r\nflow = Flow(thread_pool_executor=ThreadPoolExecutor(max_workers=4))\r\n# \u5b9a\u4e49\u4e00\u4e2a\u7b80\u5355\u4efb\u52a1\r\ndef my_task(context: Context) -&gt; TaskOutput:\r\nreturn TaskOutput(output=\"Hello World!\")\r\n# \u6dfb\u52a0\u4efb\u52a1\u5230Flow\r\nflow.add_task(\"greet\", my_task)\r\n# \u8fd0\u884c\u4efb\u52a1\r\nresult = flow.run(\"greet\")\r\nprint(result)  # \u8f93\u51fa: {\"greet\": \"Hello World!\"}\r\n<\/code><\/pre>\n<h3>\u4efb\u52a1\u94fe<\/h3>\n<p>\u4efb\u52a1\u53ef\u4ee5\u89e6\u53d1\u5176\u4ed6\u4efb\u52a1\uff1a<\/p>\n<pre><code>def task1(context: Context) -&gt; TaskOutput:\r\nreturn TaskOutput(output=\"result1\", next_tasks=[NextTask(\"task2\")])\r\ndef task2(context: Context) -&gt; TaskOutput:\r\nt1_result = context.get(\"task1\")\r\nreturn TaskOutput(output=\"result2\")\r\nflow.add_task(\"task1\", task1)\r\nflow.add_task(\"task2\", task2)\r\nflow.run(\"task1\")  # \u8f93\u51fa: {\"task2\": \"result2\"}\r\n<\/code><\/pre>\n<h3>\u5e76\u884c\u6267\u884c<\/h3>\n<p>\u591a\u4e2a\u4efb\u52a1\u53ef\u4ee5\u5e76\u884c\u6267\u884c\uff1a<\/p>\n<pre><code>def starter(context: Context) -&gt; TaskOutput:\r\nreturn TaskOutput(output=\"started\", next_tasks=[NextTask(\"slow_task1\"), NextTask(\"slow_task2\")])\r\ndef slow_task1(context: Context) -&gt; TaskOutput:\r\ntime.sleep(1)\r\nreturn TaskOutput(output=\"result1\")\r\ndef slow_task2(context: Context) -&gt; TaskOutput:\r\ntime.sleep(1)\r\nreturn TaskOutput(output=\"result2\")\r\nflow.add_task(\"starter\", starter)\r\nflow.add_task(\"slow_task1\", slow_task1)\r\nflow.add_task(\"slow_task2\", slow_task2)\r\nflow.run(\"starter\")  # \u4e24\u4e2a\u4efb\u52a1\u5e76\u884c\u6267\u884c\uff0c\u603b\u8017\u65f6\u7ea61\u79d2\r\n<\/code><\/pre>\n<h3>\u6d41\u5f0f\u7ed3\u679c<\/h3>\n<p>\u4efb\u52a1\u53ef\u4ee5\u6d41\u5f0f\u8fd4\u56de\u4e2d\u95f4\u7ed3\u679c\uff1a<\/p>\n<pre><code>def streaming_task(context: Context) -&gt; TaskOutput:\r\nstream = context.get_stream()\r\nfor i in range(3):\r\nstream.put(StreamChunk(\"streaming_task\", f\"interim_{i}\"))\r\nreturn TaskOutput(output=\"final\")\r\nflow.add_task(\"streaming_task\", streaming_task)\r\nfor task_id, output in flow.stream(\"streaming_task\"):\r\nprint(f\"{task_id}: {output}\")\r\n<\/code><\/pre>\n<h3>\u52a8\u6001\u5de5\u4f5c\u6d41<\/h3>\n<p>\u4efb\u52a1\u53ef\u4ee5\u6839\u636e\u6761\u4ef6\u52a8\u6001\u8c03\u5ea6\uff1a<\/p>\n<pre><code>def conditional_task(context: Context) -&gt; TaskOutput:\r\ncount = context.get(\"count\", 0)\r\nif count &gt;= 3:\r\nreturn TaskOutput(output=\"done\")\r\ncontext.set(\"count\", count + 1)\r\nreturn TaskOutput(output=f\"iteration_{count}\", next_tasks=[NextTask(\"conditional_task\")])\r\nflow.add_task(\"conditional_task\", conditional_task)\r\nflow.run(\"conditional_task\")  # \u4efb\u52a1\u5faa\u73af3\u6b21\u540e\u5b8c\u6210\r\n<\/code><\/pre>\n<h3>\u8f93\u5165\u53c2\u6570<\/h3>\n<p>\u4efb\u52a1\u53ef\u4ee5\u63a5\u6536\u8f93\u5165\u53c2\u6570\uff1a<\/p>\n<pre><code>def parameterized_task(context: Context) -&gt; TaskOutput:\r\nname = context.get(\"user_name\")\r\nreturn TaskOutput(output=f\"Hello {name}!\")\r\nflow.add_task(\"greet\", parameterized_task)\r\nresult = flow.run(\"greet\", inputs={\"user_name\": \"Alice\"})\r\nprint(result)  # \u8f93\u51fa: {\"greet\": \"Hello Alice!\"}\r\n<\/code><\/pre>\n<h3>\u52a8\u6001\u8def\u7531<\/h3>\n<p>\u4efb\u52a1\u53ef\u4ee5\u6839\u636e\u8f93\u5165\u52a8\u6001\u8def\u7531\uff1a<\/p>\n<pre><code>def router(context: Context) -&gt; TaskOutput:\r\ntask_type = context.get(\"type\")\r\nroutes = {\r\n\"process\": [NextTask(\"process_task\")],\r\n\"analyze\": [NextTask(\"analyze_task\")],\r\n\"report\": [NextTask(\"report_task\")]\r\n}\r\nreturn TaskOutput(output=f\"routing to {task_type}\", next_tasks=routes.get(task_type, []))\r\nflow.add_task(\"router\", router)\r\nflow.add_task(\"process_task\", lambda ctx: TaskOutput(\"processed data\"))\r\nflow.run(\"router\", inputs={\"type\": \"process\"})  # \u8f93\u51fa: {\"process_task\": \"processed data\"}\r\n<\/code><\/pre>\n<h3>\u72b6\u6001\u7ba1\u7406<\/h3>\n<p>\u4efb\u52a1\u72b6\u6001\u53ef\u4ee5\u4fdd\u5b58\u548c\u52a0\u8f7d\uff1a<\/p>\n<pre><code>context = Context()\r\ncontext.from_dict({\"task1\": \"result1\"})\r\nflow = Flow(context=context)\r\nflow.add_task(\"task2\", lambda ctx: TaskOutput(\"result2\"))\r\nflow.run(\"task2\")\r\nassert flow.context.get(\"task1\") == \"result1\"\r\nassert flow.context.get(\"task2\") == \"result2\"\r\n<\/code><\/pre>\n<h3>Map Reduce<\/h3>\n<p>\u4efb\u52a1\u53ef\u4ee5\u8fdb\u884cMap Reduce\u64cd\u4f5c\uff1a<\/p>\n<pre><code>def task1(ctx):\r\nctx.set(\"collector\", [])\r\nreturn TaskOutput(\"result1\", next_tasks=[NextTask(\"task2\", spawn_another=True) for _ in range(3)])\r\ndef task2(ctx):\r\ncollector = ctx.get(\"collector\")\r\ncollector.append(\"result2\")\r\nctx.set(\"collector\", collector)\r\nreturn TaskOutput(\"\", next_tasks=[NextTask(\"task3\")])\r\ndef task3(ctx):\r\ncollector = ctx.get(\"collector\")\r\nreturn TaskOutput(collector)\r\nflow.add_task(\"task1\", task1)\r\nflow.add_task(\"task2\", task2)\r\nflow.add_task(\"task3\", task3)\r\nresult = flow.run(\"task1\")\r\nassert result == {\"task3\": [\"result2\", \"result2\", \"result2\"]}\r\n<\/code><\/pre>\n<h3>LLM\u4ee3\u7406<\/h3>\n<p>\u52a8\u6001\u5de5\u5177\u9009\u62e9\u7684LLM\u4ee3\u7406\u793a\u4f8b\uff1a<\/p>\n<pre><code>from typing import List\r\nimport json\r\ndef llm_agent(context: Context) -&gt; TaskOutput:\r\nprompt = context.get(\"user_input\")\r\nllm_response = {\r\n\"reasoning\": \"Need to search database and format results\",\r\n\"tools\": [\"search_db\", \"format_results\"]\r\n}\r\nnext_tasks = [NextTask(tool) for tool in llm_response[\"tools\"]]\r\nreturn TaskOutput(output=\"LLM agent response\", next_tasks=next_tasks)\r\nflow.add_task(\"llm_agent\", llm_agent)\r\nflow.run(\"llm_agent\", inputs={\"user_input\": \"Find data\"})<\/code><\/pre>\n","protected":false},"excerpt":{"rendered":"<p>Flow\u662f\u4e00\u4e2a\u8f7b\u91cf\u7ea7\u7684\u4efb\u52a1\u5f15\u64ce\uff0c\u4e13\u4e3a\u6784\u5efaAI\u4ee3\u7406\u800c\u8bbe\u8ba1\uff0c\u5f3a\u8c03\u7b80\u6d01\u6027\u548c\u7075\u6d3b\u6027\u3002\u4e0e\u4f20\u7edf\u7684\u57fa\u4e8e\u8282\u70b9\u548c\u8fb9\u7684\u5de5\u4f5c\u6d41\u4e0d\u540c\uff0cFlow\u91c7\u7528\u52a8\u6001\u4efb\u52a1\u961f\u5217\u7cfb\u7edf\uff0c\u652f\u6301\u5e76\u884c\u6267\u884c\u3001\u52a8\u6001\u8c03\u5ea6\u548c\u667a\u80fd\u4f9d\u8d56\u7ba1\u7406\u3002\u5176\u6838\u5fc3\u7406\u5ff5\u662f\u901a\u8fc7\u5e76\u884c\u4efb\u52a1\u6267\u884c\u3001\u52a8\u6001\u5de5\u4f5c\u6d41\u548c\u6761\u4ef6\u5206\u652f\u63a7\u5236\uff0c\u4f7f\u590d\u6742&#8230;<\/p>\n","protected":false},"author":1,"featured_media":61365,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[20],"tags":[230,368],"class_list":["post-14784","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-tool","tag-aikaiyuanxiangmu","tag-workflow"],"_links":{"self":[{"href":"https:\/\/www.kdjingpai.com\/en\/wp-json\/wp\/v2\/posts\/14784","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.kdjingpai.com\/en\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.kdjingpai.com\/en\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.kdjingpai.com\/en\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.kdjingpai.com\/en\/wp-json\/wp\/v2\/comments?post=14784"}],"version-history":[{"count":0,"href":"https:\/\/www.kdjingpai.com\/en\/wp-json\/wp\/v2\/posts\/14784\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/www.kdjingpai.com\/en\/wp-json\/wp\/v2\/media\/61365"}],"wp:attachment":[{"href":"https:\/\/www.kdjingpai.com\/en\/wp-json\/wp\/v2\/media?parent=14784"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.kdjingpai.com\/en\/wp-json\/wp\/v2\/categories?post=14784"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.kdjingpai.com\/en\/wp-json\/wp\/v2\/tags?post=14784"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}