当前位置:   article > 正文

MySQL表结构转换为ES索引Mapping_将mysql的表转换成es的索引

将mysql的表转换成es的索引

背景

日常开发过程中肯定会存在MySQL表数据迁移至ES的情况,以canal为例,数据迁移时需要提前在ES中创建索引Mapping,但是如果碰到字段特别的表时,创建Mapping将是一件耗费心神的事情。为了解决这些重复工作,我使用Python编写了一个脚本,自动将MySQL中的表结构同步到ES中,本脚本只同步表结构,并不同步表数据,如需同步数据可以采用canal或者logstash等方式进行同步

脚本内容

如果不需要直接同步到es中,需要注释脚本最后一行,该脚本会将转换后的mapping信息打印到控制台中

在这里插入图片描述

import mysql.connector
import requests
import json

# MySQL连接配置
mysql_config = {
    'host': '127.0.0.1',
    'port': '3306',
    'user': 'root',
    'password': '123456',
    'database': 'test'
}

# Elasticsearch配置
es_host = '127.0.0.1'
es_port = '9200'
es_index = 'order1'


# 新版本es不需求type字段
# es_type = '_doc'

def fetch_mysql_table_fields(mysql_config):
    connection = mysql.connector.connect(**mysql_config)
    cursor = connection.cursor()

    # 获取MySQL表字段信息,指定需要转换得表名
    cursor.execute(f"DESCRIBE {"`order`"}")
    fields = cursor.fetchall()

    cursor.close()
    connection.close()

    return fields


def generate_es_mapping(fields):
    mapping = {
        "mappings": {
            "properties": {}
        }
    }

    for field in fields:
        field_name = field[0]
        field_type = field[1]

        # 根据MySQL字段类型设置Elasticsearch映射类型
        es_field_type = "text"  # 默认为文本类型
        if "int" in field_type:
            es_field_type = "integer"
        elif "bigint" in field_type:
            es_field_type = "long"
        elif "tinyint" in field_type:
            es_field_type = "short"
        elif "float" in field_type:
            es_field_type = "float"
        elif "double" in field_type:
            es_field_type = "double"
        elif "decimal" in field_type:
            es_field_type = "double"
        elif "date" in field_type or "datetime" in field_type or "timestamp" in field_type or "time" in field_type:
            es_field_type = "date"
        elif "json" in field_type:
            es_field_type = "object"

        # 这里可以根据需要添加更多类型的映射

        mapping["mappings"]["properties"][field_name] = {
            "type": es_field_type
        }

    return mapping


def print_es_mapping(mapping):
    print(json.dumps(mapping, indent=2))


def create_es_index_mapping(es_host, es_port, es_index, mapping):
    url = f"http://{es_host}:{es_port}/{es_index}"
    headers = {'Content-Type': 'application/json'}
    payload = json.dumps(mapping)

    response = requests.put(url, headers=headers, data=payload)

    if response.status_code == 200:
        print(f"Elasticsearch index mapping created for index '{es_index}'")
    else:
        print(f"Failed to create Elasticsearch index mapping. Status code: {response.status_code}")
        print(response.text)


if __name__ == "__main__":
    # 获取MySQL表字段信息
    table_fields = fetch_mysql_table_fields(mysql_config)

    # 生成Elasticsearch Mapping
    es_mapping = generate_es_mapping(table_fields)

    # 打印Elasticsearch Mapping到控制台
    print_es_mapping(es_mapping)

    # 创建Elasticsearch Index Mapping
    create_es_index_mapping(es_host, es_port, es_index, es_mapping)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/235751
推荐阅读
相关标签
  

闽ICP备14008679号