RagFlow优化代码解析(一)

引子

前文写到RagFlow的环境搭建&推理测试,感兴趣的童鞋可以移步(RagFlow环境搭建&推理测试-CSDN博客)。前文也写过RagFLow参数配置&测试的文档,详见(RagFlow环境搭建&推理测试-CSDN博客)。很少写关于具体代码的blog,这个不涉密,OK,那我们开始吧。

一、RagFlow检索优化--ES替换为Infitity

在上一篇文章中,我尝试新建了两个知识库,一个知识库中有两个文档,其中一个比较大,另外一个知识库上传的时QA对的excel表格。我在聊天设置中选择了两个知识库,我提出文档,到得到答案差不多要3mins,这个。。。呃,需要排查下。那我们就采用控制变量法来找到问题原因吧,既然时两个知识库,那我们先删除一个QA对知识库。。。好家伙,提出一个问题,依然需要3mins才出答案。那我们继续,剩下的一个知识库中两个文档,那我们先禁用一个长的文档,来看看效果。呃。。时间略快一点,还是要2mins。

根据上一篇博客里所讲,我这里选择了reranker模型,那么在混合查询中的向量相似度部分将被rerank打分代替。那就去掉rerank,我们再测试下,emm 。。。时间略有缩短,可以看到显示搜索中这个过程十分耗时。当然有可能是我的机器配置比较差的缘故,但进一步分析,目前设置是使用关键词相似度与向量余弦相似度相结合的混合查询方式。采用的是ES数据库的查询结果计算的。

刚好看到RagFlow中配置文档中有替换Infinity的部分,那就先来了解下Infinity到底是什么。

开源 AI 原生数据库 Infinity,23年12月 正式开源发布,提供了 2 种新数据类型:稀疏向量 Sparse Vector 和 张量 Tensor,在此前的全文搜索和向量搜索之外, Infinity 提供了更多的召回手段,如下图所示,用户可以采用任意 N 路召回(N ≥ 2)进行混合搜索,这是目前功能最强大的 RAG 专用数据库。

我们知道,仅仅依靠向量搜索(默认情况下,它用来特指稠密向量)并不总能提供令人满意的结果。当用户问题中的特定关键词与存储的数据不准确匹配时,这种问题尤为明显。这是因为向量本身不具备精确语义表征能力:一个词,一句话,乃至一篇文章,都可以只用一个向量来表示,这时向量本质上表达的是这段文字的“语义”,也就是这段文字跟其他文字在一个上下文窗口内共同出现概率的压缩表示 ,因此向量天然无法表示精确的查询。例如如果用户询问“2024 年 3 月我们公司财务计划包含哪些组合”,那么很可能得到的结果是其他时间段的数据,或者得到运营计划,营销管理等其他类型的数据。

因此,在一种好的解决方案是,利用基于关键词的全文搜索提供精确查询,它跟向量搜索共同工作,这就是全文搜索 + 向量搜索 的 2 路召回,又被称为混合搜索(hybrid search)。

多了 那么多我们来看下ES和Infinity执行效率上的对比吧。如下图:

我们可以看到infinity的执行效率是Especially的40倍左右,那我们就替换下试试。

关闭docker容器

docker compose -f docker-compose-base.yml -f docker-compose-gpu.yml down -v

修改参数

vi .env

重启容器

docker compose -f docker-compose-base.yml -f docker-compose-gpu.yml up -d

会去拉取infinity的docker镜像,更新之后,速度果然大幅度提升,几秒内响应。

二、代码解析

1、整体架构

我们从官方的架构图入手,我们可以看到从左往右,是我们在实际在线应用的时候的流程架构,从右到左,是知识库离线生成的流程架构。很明显,这张图中把知识库部分画的很大,彰显它在整个RagFlow项目中的核心地位。于此同时,最右侧详细介绍了文件解析的各种手段,比如 OCR, Document Layout Analyze 等,这些在常规的 RAG 中可能会作为一个不起眼的 Unstructured Loader 包含进去,可以猜到 RagFlow 的一个核心能力在于文件的解析环节。在官方文档中也反复强调 Quality in, quality out, 反映出 RAGFlow 的独到之处在于细粒度文档解析。另外文档中提到其没有使用任何 RAG 中间件,而是完全重新研发了一套智能文档理解系统,并以此为依托构建 RAG 任务编排体系,也可以理解文档的解析是其 RagFlow 的核心亮点。

2、代码结构

我们来看看代码结构(版本:v0.18.0)

agent:RagFlow新增的一个模块,即工作流(注:实际上工作流和agent不是一个概念,agent可以作为workflow的一部分),通过“Graph"是一个由节点和边组成的数学概念。它被用来构建复杂的工作流或代理。

agentic_reasoning:代理推理

api:后端的 API

conf :配置信息

deepdoc: 文件解析模块

docker:docker配置安装启动部署文件

docs:文档

example:案例

graphrag:图rag

helm:打包管理工具

intergreations:集成插件工具

mcp:模型上下文协议

web:对应的是前端页面,TypeScript 开发

其他的一些技术中间件

Web 服务:Flask

业务数据库:Mysql

向量数据库: ElasticSearch (常规关键词搜索用的也是它),前文已经替换 infinity 

文件存储: MinIO,支持分布式存储

缓存中间件:valkey/valkey:8 是从 Redis 7.2.4 fork 而来,旨在作为 Redis 的开源替代品,特别是在 Redis Labs 更改了 Redis 的源码使用协议之后。它保持了与 Redis 的兼容性,同时引入了许多性能和功能上的改进。在网络应用中,Valkey 可以用作缓存、消息队列、会话存储等多种用途,适用于需要快速数据访问和低延迟的场景。

3、源码解析

(1)加载文件

常规的 RAG 服务都是在上传时进行文件的加载和解析,但是 RAGFlow 的上传仅仅包含上传至 MinIO,需要手工点击触发文件的解析。根据实际体验,RAGFlow 的解析相当慢,资源开销也比较大,所以这就是采取二次手工确认的产品方案的原因吧。

实际的文件解析通过接口 /v1/document/run 进行触发的,实际的处理是在 api/db/services/task_service.py 中的 queue_tasks() 中完成的,此方法会根据文件创建一个或多个异步任务,方便异步执行。实现如下所示:

def queue_tasks(doc: dict, bucket: str, name: str, priority: int):def new_task():return {"id": get_uuid(), "doc_id": doc["id"], "progress": 0.0, "from_page": 0, "to_page": 100000000}parse_task_array = []# pdf 文件的解析,根据不同的类型设置单个任务最多处理的页数# 默认单个任务处理 12 页 pdf,paper 类型的 pdf 一个任务处理 22 页,其他 pdf 不分页if doc["type"] == FileType.PDF.value:file_bin = STORAGE_IMPL.get(bucket, name)do_layout = doc["parser_config"].get("layout_recognize", "DeepDOC")pages = PdfParser.total_page_number(doc["name"], file_bin)page_size = doc["parser_config"].get("task_page_size", 12)if doc["parser_id"] == "paper":page_size = doc["parser_config"].get("task_page_size", 22)if doc["parser_id"] in ["one", "knowledge_graph"] or do_layout != "DeepDOC":page_size = 10 ** 9page_ranges = doc["parser_config"].get("pages") or [(1, 10 ** 5)]for s, e in page_ranges:s -= 1s = max(0, s)e = min(e - 1, pages)for p in range(s, e, page_size):task = new_task()task["from_page"] = ptask["to_page"] = min(p + page_size, e)parse_task_array.append(task)# 表格数据单个任务处理 3000 行elif doc["parser_id"] == "table":file_bin = STORAGE_IMPL.get(bucket, name)rn = RAGFlowExcelParser.row_number(doc["name"], file_bin)for i in range(0, rn, 3000):task = new_task()task["from_page"] = itask["to_page"] = min(i + 3000, rn)parse_task_array.append(task)else:parse_task_array.append(new_task())chunking_config = DocumentService.get_chunking_config(doc["id"])# 任务插入 Redis 消息队列,方便异步处理for task in parse_task_array:hasher = xxhash.xxh64()for field in sorted(chunking_config.keys()):if field == "parser_config":for k in ["raptor", "graphrag"]:if k in chunking_config[field]:del chunking_config[field][k]hasher.update(str(chunking_config[field]).encode("utf-8"))for field in ["doc_id", "from_page", "to_page"]:hasher.update(str(task.get(field, "")).encode("utf-8"))task_digest = hasher.hexdigest()task["digest"] = task_digesttask["progress"] = 0.0task["priority"] = priorityprev_tasks = TaskService.get_tasks(doc["id"])ck_num = 0if prev_tasks:for task in parse_task_array:ck_num += reuse_prev_task_chunks(task, prev_tasks, chunking_config)TaskService.filter_delete([Task.doc_id == doc["id"]])chunk_ids = []for task in prev_tasks:if task["chunk_ids"]:chunk_ids.extend(task["chunk_ids"].split())if chunk_ids:settings.docStoreConn.delete({"id": chunk_ids}, search.index_name(chunking_config["tenant_id"]),chunking_config["kb_id"])DocumentService.update_by_id(doc["id"], {"chunk_num": ck_num})bulk_insert_into_db(Task, parse_task_array, True)DocumentService.begin2parse(doc["id"])unfinished_task_array = [task for task in parse_task_array if task["progress"] < 1.0]for unfinished_task in unfinished_task_array:assert REDIS_CONN.queue_product(get_svr_queue_name(priority), message=unfinished_task), "Can't access Redis. Please check the Redis' status."

从上面的实现来看,文件的解析是根据内容拆分为多个任务,通过 Redis 消息队列进行暂存(生产者),之后就可以离线异步处理。直接查看对应的消息队列的消费模块(消费者),对应在 rag/svr/task_executor.py 中的 main() 方法中。实现如下所示:

async def main():logging.info(r"""______           __      ______                     __/_  __/___ ______/ /__   / ____/  _____  _______  __/ /_____  _____/ / / __ `/ ___/ //_/  / __/ | |/_/ _ \/ ___/ / / / __/ __ \/ ___// / / /_/ (__  ) ,<    / /____>  </  __/ /__/ /_/ / /_/ /_/ / /
/_/  \__,_/____/_/|_|  /_____/_/|_|\___/\___/\__,_/\__/\____/_/""")logging.info(f'TaskExecutor: RAGFlow version: {get_ragflow_version()}')settings.init_settings()print_rag_settings()if sys.platform != "win32":signal.signal(signal.SIGUSR1, start_tracemalloc_and_snapshot)signal.signal(signal.SIGUSR2, stop_tracemalloc)TRACE_MALLOC_ENABLED = int(os.environ.get('TRACE_MALLOC_ENABLED', "0"))if TRACE_MALLOC_ENABLED:start_tracemalloc_and_snapshot(None, None)signal.signal(signal.SIGINT, signal_handler)signal.signal(signal.SIGTERM, signal_handler)threading.Thread(name="RecoverPendingTask", target=recover_pending_tasks).start()async with trio.open_nursery() as nursery:nursery.start_soon(report_status)while not stop_event.is_set():async with task_limiter:nursery.start_soon(handle_task)logging.error("BUG!!! You should not reach here!!!")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.tpcf.cn/web/82756.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

永磁同步电机控制算法--模糊PI转速控制器

一、原理介绍 在常规的PID控制系统的基础上提出了一种模糊PID以及矢量变换方法相结合的控制系统&#xff0c;经过仿真分析对比证明&#xff1a; 模糊PID控制系统能够有效的提高永磁同步电机的转速响应速度&#xff0c;降低转矩脉动&#xff0c;增强了整体控制系统的抗干扰能力…

MySQL基本操作(续)

第3章&#xff1a;MySQL基本操作&#xff08;续&#xff09; 3.3 表操作 表是关系型数据库中存储数据的基本结构&#xff0c;由行和列组成。在MySQL中&#xff0c;表操作包括创建表、查看表结构、修改表和删除表等。本节将详细介绍这些操作。 3.3.1 创建表 在MySQL中&#…

探索未知惊喜,盲盒抽卡机小程序系统开发新启航

在消费市场不断追求新鲜感与惊喜体验的当下&#xff0c;盲盒抽卡机以其独特的魅力&#xff0c;迅速成为众多消费者热衷的娱乐与消费方式。我们紧跟这一潮流趋势&#xff0c;专注于盲盒抽卡机小程序系统的开发&#xff0c;致力于为商家和用户打造一个充满趣味与惊喜的数字化平台…

89.实现添加收藏的功能的后端实现

实现完查看收藏列表之后&#xff0c;实现的是添加收藏的功能 我的设想是&#xff1a;在对话界面中&#xff0c;如果用户认为AI的回答非常好&#xff0c;可以通过点击该回答对应的气泡中的图标&#xff0c;对该内容进行添加 所以后端实现为&#xff1a; service类中添加&…

OD 算法题 B卷【猴子吃桃】

文章目录 猴子吃桃 猴子吃桃 猴子喜欢吃桃&#xff0c;桃园有N棵桃树&#xff0c;第i棵桃树上有Ni个桃&#xff0c;看守将在H(>N)小时后回来&#xff1b;猴子可以决定吃桃的速度K(个/小时)&#xff0c;每个小时他会选择一棵桃树&#xff0c;从中吃掉K个桃&#xff0c;如果这…

ubuntu 端口复用

需求描述&#xff1a;复用服务器的 80端口&#xff0c;同时处理 ssh 和 http 请求&#xff0c;也就是 ssh 连接和 http 访问服务器的时候都可以指定 80 端口&#xff0c;然后服务器可以正确分发请求给 ssh 或者 http。 此时&#xff0c;ssh 监听的端口为 22&#xff0c;而 htt…

Hive中ORC存储格式的优化方法

优化Hive中的ORC(Optimized Row Columnar)存储格式可显著提升查询性能、降低存储成本。以下是详细的优化方法,涵盖参数配置、数据组织、写入优化及监控调优等维度: 一、ORC核心参数优化 1. 存储与压缩参数 SET orc.block.size=268435456; -- 块大小(默认256MB)…

Vim 设置搜索高亮底色

在 Vim 中&#xff0c;默认搜索命中会高亮显示&#xff0c;方便用户快速定位关键字。但有些用户希望自定义搜索匹配的底色或前景色&#xff0c;以适应不同的配色方案或提高可读性。本文将详细介绍如何修改 Vim 的搜索高亮颜色。 一、Vim 搜索高亮机制 Vim 用内置的高亮组&…

【计算机网络】非阻塞IO——poll实现多路转接

&#x1f525;个人主页&#x1f525;&#xff1a;孤寂大仙V &#x1f308;收录专栏&#x1f308;&#xff1a;计算机网络 &#x1f339;往期回顾&#x1f339;&#xff1a;【计算机网络】非阻塞IO——select实现多路转接 &#x1f516;流水不争&#xff0c;争的是滔滔不息 一、…

vscode使用系列之快速生成html模板

一.欢迎来到我的酒馆 vscode&#xff0c;yyds! 目录 一.欢迎来到我的酒馆二.vscode下载安装1.关于vscode你需要知道2.开始下载安装 三.vscode快速创建html模板 二.vscode下载安装 1.关于vscode你需要知道 Q&#xff1a;为什么使用vscode? A&#xff1a;使用vscode写…

【C/C++】不同防止头文件重复包含的措施

文章目录 #pragma once vs #ifndef 文件宏1 原理层面区别&#xff08;core&#xff09;2 关键区别与优缺点分析3 总结与最佳实践 #pragma once vs #ifndef 文件宏 在 C/C 中&#xff0c;#pragma once 和传统的文件宏守卫 (#ifndef HEADER_H #define HEADER_H ... #endif) 都用…

java-springboot文件上传校验之只允许上传excel文件,且检查不能是脚本或者有害文件或可行性文件

四重验证机制&#xff1a; 文件扩展名检查&#xff08;.xlsx/.xls&#xff09;MIME类型检查文件魔数验证&#xff08;真实文件类型&#xff09;可执行文件特征检测 防御措施&#xff1a; 使用try-with-resources确保流关闭限制文件大小防止DoS攻击使用Apache POI的FileMagic进…

不确定性分析在LEAP能源-环境系统建模中的整合与应用

本内容突出与实例结合&#xff0c;紧密结合国家能源统计制度及《省级温室气体排放编制指南》&#xff0c;深入浅出地介绍针对不同级别研究对象时如何根据数据结构、可获取性、研究目的&#xff0c;构建合适的能源生产、转换、消费、温室气体排放&#xff08;以碳排放为主&#…

高性能分布式消息队列系统(四)

八、客户端模块的实现 客户端实现的总体框架 在 RabbitMQ 中&#xff0c;应用层提供消息服务的核心实体是 信道&#xff08;Channel&#xff09;。 用户想要与消息队列服务器交互时&#xff0c;通常不会直接操作底层的 TCP 连接&#xff0c;而是通过信道来进行各种消息的发布…

QPair 类说明

QPair 类说明 QPair 是一个模板类&#xff0c;用于存储一对数据项。 头文件&#xff1a; cpp #include <QPair> qmake 配置&#xff1a; QT core 所有成员列表&#xff08;包括继承成员&#xff09; 公共类型 类型定义说明first_type第一个元素的类型&#xff…

4.大语言模型预备数学知识

大语言模型预备数学知识 复习一下在大语言模型中用到的矩阵和向量的运算&#xff0c;及概率统计和神经网络中常用概念。 矩阵的运算 矩阵 矩阵加减法 条件&#xff1a;行数列数相同的矩阵才能做矩阵加减法 数值与矩阵的乘除法 矩阵乘法 条件&#xff1a;矩阵A的列数 矩阵…

uniapp 设置手机不息屏

在使用 UniApp 开发应用时&#xff0c;有时需要在设备长时间未操作时实现息屏保护功能&#xff0c;以节省电量和保护屏幕。以下是如何在 UniApp 中实现这一功能的步骤。 示例一 // 保持屏幕常亮 uni.setKeepScreenOn({keepScreenOn: true });// 监听应用进入后台事件 uni.onH…

智能推荐系统:协同过滤与深度学习结合

智能推荐系统&#xff1a;协同过滤与深度学习结合 系统化学习人工智能网站&#xff08;收藏&#xff09;&#xff1a;https://www.captainbed.cn/flu 文章目录 智能推荐系统&#xff1a;协同过滤与深度学习结合摘要引言技术原理对比1. 协同过滤算法&#xff1a;基于相似性的推…

使用Python和OpenCV实现图像识别与目标检测

在计算机视觉领域&#xff0c;图像识别和目标检测是两个非常重要的任务。图像识别是指识别图像中的内容&#xff0c;例如判断一张图片中是否包含某个特定物体&#xff1b;目标检测则是在图像中定位并识别多个物体的位置和类别。OpenCV是一个功能强大的开源计算机视觉库&#xf…

《基于Apache Flink的流处理》笔记

思维导图 1-3 章 4-7章 8-11 章 参考资料 源码&#xff1a; https://github.com/streaming-with-flink 博客 https://flink.apache.org/bloghttps://www.ververica.com/blog 聚会及会议 https://flink-forward.orghttps://www.meetup.com/topics/apache-flink https://n…