ElasticSearch迁移至openGauss

Elasticsearch 作为一种高效的全文搜索引擎,广泛应用于实时搜索、日志分析等场景。而 openGauss,作为一款企业级关系型数据库,强调事务处理与数据一致性。那么,当这两者的应用场景和技术架构发生交集时,如何实现它们之间的平滑迁移呢?

本文将探讨 Elasticsearch 基础数据数据迁移至 openGauss 的解决方案,在此,我们首先根据等价实例来看一下 Elasticsearch 和关系型数据库(如 openGauss)的基础数据结构:

关系型数据库操作

CREATE TABLE products (    id INT PRIMARY KEY,    name VARCHAR(100),    price DECIMAL(10,2));
INSERT INTO products VALUES (1, 'Laptop', 999.99);

Elasticsearch等价操作

PUT /products{  "mappings": {    "properties": {      "id": { "type": "integer" },      "name": { "type": "text" },      "price": { "type": "double" }    }  }}
POST /products/_doc/1{  "id": 1,  "name": "Laptop",  "price": 999.99}

数据组织层级

  • 关系型数据库:

    Database → Table → Row/Column

  • Elasticsearch:

    6.x之前:Index → Type → Document (类似Database → Table → Row)

    7.x之后:Index → Document (Type被移除,强化了Index≈Table的对应关系)

Elasticsearch 概念

关系型数据库(如openGauss)概念

说明

索引(Index)库-表(Table)

对应关系

类型(Type)

(已弃用,7.x后无对应)

早期版本中类似表分区

文档(Document)

行(Row)

一条记录

字段(Field)

列(Column)

数据属性

映射(Mapping)

表结构定义(Schema)

定义字段类型等

索引别名(Alias)

视图(View)

虚拟索引/表

分片(Shard)

分区(Partition)

数据水平拆分

检索方式

1、向量检索

Elasticsearch 向量检索

# 1. 创建包含向量字段的索引PUT /image_vectors{  "mappings": {    "properties": {      "image_name": {        "type": "text"      },      "image_vector": {        "type": "dense_vector",        "dims": 512      }    }  }}
# 2. 插入向量数据POST /image_vectors/_doc{  "image_name": "sunset.jpg",  "image_vector": [0.12, 0.34, ..., 0.56]  // 512维向量}
# 3. 精确向量检索 (script_score)GET /image_vectors/_search{  "query": {    "script_score": {      "query": {"match_all": {}},      "script": {        "source": "cosineSimilarity(params.query_vector, 'image_vector') + 1.0",        "params": {          "query_vector": [0.23, 0.45, ..., 0.67]  // 查询向量        }      }    }  }}
# 4. 近似最近邻搜索 (kNN search)GET /image_vectors/_search{  "knn": {    "field": "image_vector",    "query_vector": [0.23, 0.45, ..., 0.67],    "k": 10,    "num_candidates": 100  }}

openGauss 向量检索(openGauss 从 7.0 版本开始支持向量检索功能)

# 1. 创建包含向量字段的表
-- 创建表CREATE TABLE image_vectors (  id SERIAL PRIMARY KEY,  image_name TEXT,  image_vector VECTOR(512)  -- 512维向量);#2. 插入向量数据INSERT INTO image_vectors (image_name, image_vector) VALUES ('sunset.jpg', '[0.12, 0.34, ..., 0.56]');
# 3. 精确向量检索 (余弦相似度)-- 使用余弦相似度SELECT id, image_name,        1 - (image_vector <=> '[0.23, 0.45, ..., 0.67]') AS cosine_similarityFROM image_vectorsORDER BY cosine_similarity DESCLIMIT 10;
# 4. 近似最近邻搜索 (使用IVFFLAT索引)-- 创建IVFFLAT索引CREATE INDEX idx_image_vector ON image_vectors USING IVFFLAT(image_vector) WITH (lists = 100);
-- 近似最近邻查询SELECT id, image_name,        image_vector <=> '[0.23, 0.45, ..., 0.67]' AS distanceFROM image_vectorsORDER BY distanceLIMIT 10;
2、全文检索

es全文检索 相当于 openGauss的LIKE和正则表达式

​​​​​​​​​​​​​​

# es 全文检索GET /products/_search{  "query": {    "match": {      "description": "search term"    }  }}
# openGauss 模糊查询SELECT * FROM products WHERE description LIKE '%search term%';
# openGauss 正则表达式匹配SELECT * FROM logs WHERE message ~ 'error|warning';

因此,根据数据层级及检索方式分析,迁移时将es的索引迁移到openGauss的一张表里。

环境准备

  • 已部署7.3 及以上(支持向量)版本的ElasticSearch实例

  • 已部署7.0.0-RC1 及以上版本(支持向量)的openGauss实例

  • 已安装3.8 及以上版本的Python环境

  • 已安装涉及的Python库

pip3 install psycopg2pip3 install requests pip3 install pyOpenSSL
#如果安装失败,可以考虑在一个新的虚拟环境中重新安装所需的库,执行以下命令:python3 -m venv venvsource venv/bin/activatepip install requests pyOpenSSL

前置条件

远程连接权限:

openGauss端:

​​​​​​​

#修改openGauss配置文件。将迁移脚本所在机器IP地址加入白名单,修改openGauss监听地址。# 执行以下命令gs_guc set -D {DATADIR} -c " listen_addresses = '\*'"
gs_guc set -D {DATADIR} -h "host all all x.x.x.x/32 sha256"
# 修改完毕后重启openGauss。
gs_ctl restart -D {DATADIR}

elasticsearch端:

​​​​​​​

vim /path/to/your_elasticsearch/config/elasticsearch.yml#修改network.hostnetwork.host: 0.0.0.0

openGauss端创建普通用户(赋权)、迁移的目标数据库:

​​​​​​​

 create user mig_test identified by 'Simple@123';
 grant all privileges to mig_test;
 create database es_to_og with owner mig_test;

迁移操作

1、根据本地部署的elasticsearch与openGauss对脚本进行配置修改,需要修改的内容如下:

​​​​​​​

# Elasticsearch 配置信息es_url = 'http://ip:port'  # Elasticsearch 服务器地址es_index = 'your_es_index'  # Elasticsearch 索引名
# openGauss 配置信息db_host = '127.0.0.1'   # openGauss服务器地址db_port = 5432          # openGauss 端口号db_name = 'your_opengauss_db' # 迁移到openGauss的数据库名称db_user = 'user_name'    # 连接openGauss的普通用户db_password = 'xxxxxx'   # 连接openGauss的用户密码

elasticsearchToOpenGauss.py迁移脚本如下:

​​​​​​​

import requestsimport psycopg2import jsonimport refrom typing import List, Dict, Any, Optional, Union
# Elasticsearch 配置信息es_url = 'http://192.168.0.114:9200'  # Elasticsearch 服务器地址es_index = 'my_dynamic_index'  # Elasticsearch 索引名
# openGauss 配置信息db_host = '192.168.0.219'   # openGauss服务器地址db_port = 15620          # openGauss 端口号db_name = 'es_to_og' # 迁移到openGauss的数据库名称db_user = 'mig_test'    # 连接openGauss的普通用户db_password = 'xxxxxx'   # 连接openGauss的用户密码
RESERVED_KEYWORDS = {    "select", "insert", "update", "delete", "drop", "table", "from", "where", "group",    "by", "having", "order", "limit", "join", "inner", "left", "right", "full", "union",    "all", "distinct", "as", "on", "and", "or", "not", "null", "true", "false", "case",    "when", "then", "else", "end", "exists", "like", "in", "between", "is", "like",    "references", "foreign", "primary", "key", "unique", "check", "default", "constraint",    "index", "unique", "varchar", "text", "int", "bigint", "smallint", "boolean", "timestamp"}
# 从 Elasticsearch 获取数据def fetch_data_from_es():    query = {        "query": {            "match_all": {}        },        "_source": True  # 获取所有字段    }    response = requests.get(f'{es_url}/{es_index}/_search', json=query)    if response.status_code == 200:        return response.json()['hits']['hits']    else:        raise Exception(f"Failed to fetch data from Elasticsearch: {response.status_code}, {response.text}")# 获取索引映射信息def fetch_mapping(es_url, es_index):    response = requests.get(f'{es_url}/{es_index}/_mapping')    if response.status_code == 200:        return response.json()    else:        raise Exception(f"Failed to fetch mapping: {response.status_code}, {response.text}")def get_field_type(es_url: str, es_index: str, field_name: str) -> str:    """ 获取 Elasticsearch 字段的类型 """    mappings = fetch_mapping(es_url, es_index)    print(f"Field name: {field_name}")    print(f"map: {mappings}")    # 获取 properties 字段    properties = mappings.get(es_index, {}).get('mappings', {}).get('properties', {})    # 遍历并查找字段的类型    field_type = 'text'  # 默认类型为 'text'    if field_name in properties:        field_type = properties[field_name].get('type', 'text')    elif 'fields' in properties.get(field_name, {}):        # 如果字段有子字段(比如 keyword),获取 'keyword' 类型        field_type = properties[field_name]['fields'].get('keyword', {}).get('type', 'text')    return field_type
def convert_dict_to_jsonb(value):    # 如果 value 是字典类型,递归调用该函数处理其中的每个元素    if isinstance(value, dict):        return json.dumps({k: convert_dict_to_jsonb(v) for k, v in value.items()})    # 如果 value 是列表类型,递归处理其中的每个元素    elif isinstance(value, list):        return json.dumps([convert_dict_to_jsonb(v) for v in value])    # 如果是其他类型(如字符串、数字),直接返回该值    else:        return value
# 映射 Elasticsearch 数据类型到 openGauss 类型def map_to_opengauss_type(es_type: str, dim: Optional[int] = None) -> str:    """Map Elasticsearch types to openGauss types"""    if isinstance(es_type, (dict, list)):  # 如果 es_type 是字典类型,则需要特殊处理        return 'JSONB'    type_map = {        "long": "BIGINT",  # 大整数        "integer": "INTEGER",  # 整数        "short": "SMALLINT",  # 小整数        "byte": "SMALLINT",  # 小字节        "float": "REAL",  # 浮点数        "double": "DOUBLE PRECISION",  # 双精度浮点数        "boolean": "BOOLEAN",  # 布尔值        "keyword": "VARCHAR",  # 关键字(字符串类型)        "text": "TEXT",  # 长文本        "date": "TIMESTAMP",  # 日期类型        "binary": "BYTEA",  # 二进制数据        "geo_point": "POINT",  # 地理坐标(经纬度)        "geo_shape": "GEOMETRY",  # 复杂地理形状        "nested": "JSONB",  # 嵌套对象        "object": "JSONB",  # 对象        "ip": "INET",  # IP 地址        "scaled_float": "REAL",  # 扩展浮动类型(带缩放的浮动)        "float_vector": f"VECTOR({dim})" if dim else "VECTOR",  # 浮动向量类型        "dense_vector": f"VECTOR({dim})" if dim else "VECTOR",  # 稠密向量类型        "binary_vector": f"BIT({dim})" if dim else "BIT",  # 二进制向量类型        "half_float": "REAL",  # 半精度浮动        "unsigned_long": "BIGINT",  # 无符号长整数        "date_nanos": "TIMESTAMP",  # 高精度日期时间        "alias": "TEXT",  # 别名(通常是字段的别名)    }
    # 如果 es_type 在映射表中,直接返回映射后的类型    if es_type in type_map:        print(f"es_type:{es_type} ----- og_type: {type_map[es_type]}")        return type_map[es_type]    else:        print(f"Warning: Unsupported Elasticsearch type '{es_type}', defaulting to 'TEXT'")        return 'TEXT'  # 默认使用 TEXT 类型# 函数:将非法字符替换为下划线def sanitize_name(field_name: str) -> str:    """处理字段名,确保不会与保留字冲突,且将非字母数字字符替换为下划线"""    # 将所有非字母数字字符替换为下划线    sanitized_name = re.sub(r'[^a-zA-Z0-9_]', '_', field_name)
    # 如果是保留字,则加双引号    if sanitized_name.lower() in RESERVED_KEYWORDS:        return f'"{sanitized_name}"'
    return sanitized_name# 创建 openGauss 表def create_table_in_opengauss(es_url, es_index, table_name):    columns_definition = ['id VARCHAR PRIMARY KEY']  # 增加 id 主键字段    seen_fields = set()  # 用于记录已经处理过的字段名
    # 获取 properties 字段    properties = fetch_mapping(es_url, es_index).get(es_index, {}).get('mappings', {}).get('properties', {})
    # 遍历每个字段    for field, field_info in properties.items():        # 如果该字段已经处理过,跳过        if field in seen_fields:            continue        # 获取字段的类型        es_type = field_info.get('type', 'text')        dim = field_info.get('dims', 0) if isinstance(field_info, dict) else 0        field_type = map_to_opengauss_type(es_type, dim)        sanitized_field_name = sanitize_name(field)        seen_fields.add(field)        columns_definition.append(f"{sanitized_field_name} {field_type}")    # 生成表创建 SQL    columns_str = ", ".join(columns_definition)    create_table_sql = f"DROP TABLE IF EXISTS {sanitize_name(table_name)}; CREATE TABLE {sanitize_name(table_name)} ({columns_str});"    try:        # 建立数据库连接并执行创建表 SQL        connection = psycopg2.connect(            host=db_host,            port=db_port,            dbname=db_name,            user=db_user,            password=db_password        )        cursor = connection.cursor()        cursor.execute(create_table_sql)        connection.commit()        print(f"Table {sanitize_name(table_name)} created successfully.")    except Exception as e:        print(f"Error while creating table {sanitize_name(table_name)}: {e}")    finally:        if connection:            cursor.close()            connection.close()
# 将数据插入到 openGauss 表中def insert_data_to_opengauss(table_name, es_source, es_id):    try:        # 建立数据库连接        connection = psycopg2.connect(            host=db_host,            port=db_port,            dbname=db_name,            user=db_user,            password=db_password        )        cursor = connection.cursor()
        # 动态生成插入 SQL 语句        sanitized_columns = ['id'] + [sanitize_name(col) for col in es_source.keys()]  # 清理列名        values = [es_id]
        # 处理每一列的数据类型,必要时进行转换        for column in es_source:            value = es_source[column]            if isinstance(value, (dict, list)):                # 如果是字典类型,转换为 JSONB                value = convert_dict_to_jsonb(value)            values.append(value)
        columns_str = ', '.join(sanitized_columns)        values_str = ', '.join(['%s'] * len(values))
        insert_sql = f"INSERT INTO {sanitize_name(table_name)} ({columns_str}) VALUES ({values_str})"        cursor.execute(insert_sql, values)
        # 提交事务        connection.commit()
    except Exception as e:        print(f"Error while inserting data into {table_name}: {e}")    finally:        if connection:            cursor.close()            connection.close()
# 主函数def main():    try:        es_data = fetch_data_from_es()        table_name = es_index  # 可以使用索引名作为表名
        create_table_in_opengauss(es_url, es_index, table_name)        for record in es_data:            es_source = record['_source']  # 获取 Elasticsearch 文档中的数据            es_id = record['_id']            insert_data_to_opengauss(table_name, es_source, es_id)        print(f"Successfully inserted data into table {table_name}.")    except Exception as e:        print(f"Migration failed: {e}")if __name__ == "__main__":    main()

2、执行脚本

python3 ./elasticsearchToOpenGauss.py

3、openGauss端查看数据​​​​​​​

#切换到迁移目标数据库openGauss=# \c es_to_og
#查看迁移的表es_to_og=# \d
#查看表结构es_to_og=# \d my_dynamic_index
#查看表数据es_to_og=# select c

图片

-END-

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.tpcf.cn/web/81970.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

品优购项目(HTML\CSS)

项目效果可访问 http://zhousunyu.3vdo.club 查看 主页 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><titl…

因泰立科技:镭眸T51激光雷达,打造智能门控新生态

在高端门控行业&#xff0c;安全与效率是永恒的追求。如今&#xff0c;随着科技的飞速发展&#xff0c;激光雷达与TOF相机技术的融合&#xff0c;为门控系统带来了前所未有的智能感知能力&#xff0c;开启了精准守护的新时代。因泰立科技的镭眸T51激光雷达&#xff0c;作为这一…

MyBatisPlus--快速入门

MyBatisPlus介绍 从名字中就可以感觉到MybatisPlus与MyBatis之间的渊源&#xff0c;而MyBatis是一个非常流行的持久层框架&#xff0c;主要来做数据库的增删改查&#xff0c;而MyBatisPlus这种命名方式让人不得不往MyBatis的升级版去联想&#xff0c;事实也确实如此&#xff0…

redis持久化策略

RDB 是通过生成数据快照来实现持久化的&#xff0c;相当于给内存中的数据拍一张"照片"保存到磁盘上。AOF 记录所有写操作命令&#xff0c;以Redis协议格式追加到文件末尾。 RDB 在满足特定条件时触发内存快照&#xff0c;生成新的RDB文件替换旧文件 AOF 先写入内…

Spring Boot中使用@JsonAnyGetter和@JsonAnySetter处理动态JSON属性

Spring Boot 中使用 @JsonAnyGetter 和 @JsonAnySetter 处理动态 JSON 属性 在实际的后端开发中,尤其是使用 Spring Boot 构建 API 时,我们经常会遇到需要处理动态 JSON 属性的场景。例如,前端传递过来的 JSON 数据结构不固定,或者业务需求变更频繁,导致实体类无法预先定…

拉取gitlab项目

一、下载nvm管理node 先下载配置好nvm,再用nvm下载node 下载链接&#xff1a;开始 下载nvm - nvm中文官网 情况&#xff1a;npm i 下载依赖缓慢&#xff0c;可能是node版本不对&#xff0c;可能node版本太高 可能得问题&#xff1a;使用nvm 下载低版本的node时&#xff0c;…

【解决办法】ubuntu重启不起来,输入用户名和密码进不去,又重新返回登录页。

项目场景&#xff1a; ubuntu重启不起来&#xff0c;输入用户名和密码进不去&#xff0c;又重新返回登录页。 问题描述 在华硕天选一代笔记本上面安装了ubuntu22.04.5桌面版&#xff0c;但是重启以后出现&#xff0c;输入了用户名和密码&#xff0c;等待一会还让输入用户名和…

# 云端大模型:智能时代的新引擎

云端大模型&#xff1a;智能时代的新引擎 在人工智能技术的迅猛发展中&#xff0c;云端大模型扮演着至关重要的角色。它们不仅推动了技术的边界&#xff0c;也为各行各业带来了前所未有的机遇。本文将结合一系列图片和代码示例&#xff0c;深入探讨云端大模型的功能、应用及其…

(1)pytest简介和环境准备

1. pytest简介 pytest是python的一种单元测试框架&#xff0c;与python自带的unittest测试框架类似&#xff0c;但是比unittest框架使用起来更简洁&#xff0c;效率更高。根据pytest的官方网站介绍&#xff0c;它具有如下特点&#xff1a; 非常容易上手&#xff0c;入门简单&a…

实验设计与分析(第6版,Montgomery)第5章析因设计引导5.7节思考题5.5 R语言解题

本文是实验设计与分析&#xff08;第6版&#xff0c;Montgomery著&#xff0c;傅珏生译) 第5章析因设计引导5.7节思考题5.5 R语言解题。主要涉及方差分析&#xff0c;正态假设检验&#xff0c;残差分析&#xff0c;交互作用图。 dataframe <-data.frame( wrapc(17,20,12,9,…

线程池的详细知识(含有工厂模式)

前言 下午学习了线程池的知识。重点探究了ThreadPoolExecutor里面的各种参数的含义。我详细了解了这部分的知识。其中有一个参数涉及工厂模式&#xff0c;我将这一部分知识分享给大家~ 线程池的详细介绍(含工厂模式) 结语 分享到此结束啦。byebye~

嵌入式开发学习(第二阶段 C语言笔记)

内存操作 我们对于内存操作需要依赖于string.h头文件中相关的函数库。 内存操作函数 内存填充 头文件&#xff1a;#include <string.h> 函数原型&#xff1a; void* memset(void *s,int c,size_t n)函数功能&#xff1a;将内存块s的前n个字节填充为c&#xff0c;一般…

C++学习-入门到精通【9】面向对象编程:继承

C学习-入门到精通【9】面向对象编程&#xff1a;继承 目录 C学习-入门到精通【9】面向对象编程&#xff1a;继承一、基类与派生类CommunityMember类的继承层次结构如何定义一个派生类呢 二、基类和派生类间的关系1.创建并使用类CommissionEmployee2.不使用继承创建类BasePlusCo…

黑马k8s(十七)

一&#xff1a;高级存储 1.高级存储-pv和pvc介绍 2.高级存储-pv 3.高级存储-pvc 最后一个改成5gi pvc3是没有来绑定成功的 pv3没有绑定 删除pod、和pvc&#xff0c;观察状态&#xff1a; 4.高级存储-pc和pvc的生命周期 二&#xff1a;配置存储 1.配置存储-ConfigMap 2.配…

cf每日刷题c++

目录 Simple Repetition&#xff08;1000&#xff09; Fashionable Array&#xff08;800&#xff09; Kevin and Arithmetic(800) Permutation Warm-Up(800) Game of Mathletes(900) LRC and VIP(800) Simple Repetition&#xff08;1000&#xff09; https://codeforc…

历年中国科学技术大学计算机保研上机真题

2025中国科学技术大学计算机保研上机真题 2024中国科学技术大学计算机保研上机真题 2023中国科学技术大学计算机保研上机真题 在线测评链接&#xff1a;https://pgcode.cn/school?classification1 拆分数字 题目描述 给定一个数字&#xff0c;拆分成若干个数字之和&#xff…

PHP学习笔记(十一)

类常量 可以把在类中始终保持不变的值定义为常量&#xff0c;类常量的默认可见性是public。 接口中也可以定义常量。 可以用一个变量来动态调用类&#xff0c;但该变量的值不能为关键字 需要注意的是类常量只为每个类分配一次&#xff0c;而不是为每个类的实例分配。 特殊的…

Nginx 性能优化全解析:从进程到安全的深度实践

一、进程优化&#xff1a;释放硬件性能潜力 Nginx 通过多工作进程处理请求&#xff0c;合理配置进程参数能充分利用 CPU 资源&#xff0c;避免资源浪费。 1.1 worker_processes 参数详解 worker_processes用于设置 Nginx 工作进程的数量&#xff0c;它直接影响 Nginx 对 CP…

中国移动咪咕助力第五届全国人工智能大赛“AI+数智创新”专项赛道开展

第五届全国人工智能大赛由鹏城实验室主办&#xff0c;新一代人工智能产业技术创新战略联盟承办&#xff0c;华为、中国移动、鹏城实验室科教基金会等单位协办&#xff0c;广东省人工智能与机器人学会支持。 大赛发布“AI图像编码”、“AI增强视频质量评价”、“AI数智创新”三大…

《 PyTorch 2.3革新:torch.compile自动生成CUDA优化内核全解》

CUDA作为NVIDIA推出的并行计算平台和编程模型&#xff0c;为GPU计算提供了强大的支持&#xff0c;但手动优化CUDA代码不仅需要深厚的专业知识&#xff0c;而且过程繁琐、耗时费力&#xff0c;torch.compile的出现&#xff0c;犹如一道曙光&#xff0c;为解决这一困境带来了全新…