Python实现将MongoDB中的数据导入到MySQL
作者:小小鸟爱吃辣条 发布时间:2024-01-21 04:41:01
标签:Python,MongoDB,MySQL
本文主要介绍了一个将 MongoDB 中的数据导入到 MySQL 中的 Python 工具类 MongoToMysql。该工具类实现了获取 MongoDB 数据类型、创建 MySQL 表结构以及将数据从 MongoDB 推送到 MySQL 等功能。
通过该工具类,用户可以轻松地将 MongoDB 中的数据导入到 MySQL 中,实现数据的转移和使用。
使用该工具类,用户需要传入相应的参数,包括 MongoDB 的连接信息,MySQL 的连接信息,以及表名、是否设置最大长度、批处理大小和表描述等信息。具体使用可以参考代码中的注释。
实现代码
import pymysql
from loguru import logger
class MongoToMysql:
def __init__(self, mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port, mysql_user,
mysql_password, mysql_db,table_name=None,set_max_length=False,batch_size=10000,table_description=''):
self.mongo_host = mongo_host
self.mongo_port = mongo_port
self.mongo_db = mongo_db
self.mongo_collection = mongo_collection
self.mysql_host = mysql_host
self.mysql_port = mysql_port
self.mysql_user = mysql_user
self.mysql_password = mysql_password
self.mysql_db = mysql_db
self.table_name = table_name
self.set_max_length = set_max_length
self.batch_size = batch_size
self.table_description = table_description
self.data_types = self.get_mongo_data_types()
self.create_mysql_table(self.data_types,set_max_length= self.set_max_length,table_description=self.table_description)
self.push_data_to_mysql(self.batch_size)
def get_mongo_data_types(self):
logger.debug('正在获取mongo中字段的类型!')
client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port)
db = client[self.mongo_db]
collection = db[self.mongo_collection]
data_types = {}
for field in collection.find_one().keys():
data_types[field] = type(collection.find_one()[field]).__name__
return data_types
def check_mysql_table_exists(self):
logger.debug('检查是否存在该表,有则删之!')
conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user,
password=self.mysql_password, db=self.mysql_db)
cursor = conn.cursor()
sql = f"DROP TABLE IF EXISTS {self.mongo_collection}"
cursor.execute(sql)
conn.commit()
conn.close()
def get_max_length(self, field):
logger.debug(f'正在获取字段 {field} 最大长度......')
client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port)
db = client[self.mongo_db]
collection = db[self.mongo_collection]
max_length = 0
for item in collection.find({},{field:1,'_id':0}):
value = item.get(field)
if isinstance(value, str) and len(value) > max_length:
max_length = len(value)
return max_length
def create_mysql_table(self, data_types,set_max_length,table_description):
logger.debug('正在mysql中创建表结构!')
self.check_mysql_table_exists()
conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user,
password=self.mysql_password, db=self.mysql_db)
cursor = conn.cursor()
if self.table_name:
table_name = self.table_name
else:
table_name = self.mongo_collection
fields = []
for field, data_type in data_types.items():
if data_type == 'int':
fields.append(f"{field} INT")
elif data_type == 'float':
fields.append(f"{field} FLOAT")
elif data_type == 'bool':
fields.append(f"{field} BOOLEAN")
else:
if set_max_length:
fields.append(f"{field} TEXT)")
else:
max_length = self.get_max_length(field)
fields.append(f"{field} VARCHAR({max_length + 200})")
fields_str = ','.join(fields)
sql = f"CREATE TABLE {table_name} (id INT PRIMARY KEY AUTO_INCREMENT,{fields_str}) COMMENT='{table_description}'"
cursor.execute(sql)
conn.commit()
conn.close()
def push_data_to_mysql(self, batch_size=10000):
logger.debug('--- 正在准备从mongo中每次推送10000条数据到mysql ----')
client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port)
db = client[self.mongo_db]
collection = db[self.mongo_collection]
conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user,
password=self.mysql_password, db=self.mysql_db)
cursor = conn.cursor()
if self.table_name:
table_name = self.table_name
else:
table_name = self.mongo_collection
# table_name = self.mongo_collection
data = []
count = 0
for item in collection.find():
count += 1
row = []
for field, data_type in self.data_types.items():
value = item.get(field)
if value is None:
row.append(None)
elif data_type == 'int':
row.append(str(item.get(field, 0)))
elif data_type == 'float':
row.append(str(item.get(field, 0.0)))
elif data_type == 'bool':
row.append(str(item.get(field, False)))
else:
row.append(str(item.get(field, '')))
data.append(row)
if count % batch_size == 0:
placeholders = ','.join(['%s'] * len(data[0]))
sql = f"INSERT INTO {table_name} VALUES (NULL,{placeholders})"
cursor.executemany(sql, data)
conn.commit()
data = []
logger.debug(f'--- 已完成推送:{count} 条数据! ----')
if data:
placeholders = ','.join(['%s'] * len(data[0]))
sql = f"INSERT INTO {table_name} VALUES (NULL,{placeholders})"
cursor.executemany(sql, data)
conn.commit()
logger.debug(f'--- 已完成推送:{count} 条数据! ----')
conn.close()
if __name__ == '__main__':
"""MySQL"""
mongo_host = '127.0.0.1'
mongo_port = 27017
mongo_db = 'db_name'
mongo_collection = 'collection_name'
"""MongoDB"""
mysql_host = '127.0.0.1'
mysql_port = 3306
mysql_user = 'root'
mysql_password = '123456'
mysql_db = 'mysql_db'
table_description = '' # 表描述
mongo_to_mysql = MongoToMysql(mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port,
mysql_user, mysql_password, mysql_db,table_description=table_description)
#
# table_name = None # 默认为None 则MySQL中的表名跟Mongo保持一致
# set_max_length = False # 默认为False 根据mongo中字段最大长度 加200 来设置字段的VARCHART长度 , 否则定义TEXT类型
# batch_size = 10000 # 控制每次插入数据量的大小
# table_description = '' # 表描述
# mongo_to_mysql = MongoToMysql(mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port,
# mysql_user, mysql_password, mysql_db,table_name,set_max_length,batch_size,table_description)
代码使用了 PyMongo、PyMySQL 和 Loguru 等 Python 库,并封装了一个 MongoToMysql 类。在类的初始化时,会自动获取 MongoDB 中字段的类型,并根据字段类型创建 MySQL 表结构。在将数据从 MongoDB 推送到 MySQL 时,还可以控制每次插入数据量的大小,以避免一次性插入大量数据造成系统崩溃或性能下降。
需要注意的是,在创建 MySQL 表结构时,如果用户选择了设置最大长度,则会创建 TEXT 类型的字段,否则会根据 MongoDB 中字段的最大长度加上200来设置 VARCHAR 类型的字段长度。
总之,本文介绍的 MongoToMysql 工具类非常方便实用,对于需要将 MongoDB 数据迁移到 MySQL 的用户来说,是一种很好的解决方案。
来源:https://juejin.cn/post/7229152434801721402


猜你喜欢
- 比较好的地方就是js数组的操作,不重复的数组id显示,完美实现。<script language="JavaScript&q
- 今天在设置input的readonly属性遇到问题,上网查到下面的内容,作个标记。今天系统需要使用javascript 动态设置textbo
- 1、流程控制流程控制在编程语言中是最伟大的发明了,因为有了它,你可以通过很简单的流程描述来表达很复杂的逻辑。流程控制包含分三大类:条件判断,
- 在学习OpenCV或者其他关于Python技术的时候,我们通常需要准备不同的Python环境,我选择了Anaconda作为我的Python环
- 高可用架构对于互联网服务基本是标配,无论是应用服务还是数据库服务都需要做到高可用。虽然互联网服务号称7*24小时不间断服务,但多多少少有一些
- 前言:大家跟我一起念,Python * 好,跟着本宝宝用Python抢火车票首先我们需要splinter安装:pip install spli
- 描述: 日志按日期、大小回滚代码:# -*- coding: utf-8 -*-import osimport logging.handle
- 无法打开用户默认数据库,登录失败,其原因是登录帐户的默认数据库被删除。 解决办
- 静态链表和动态链表区别静态链表和动态链表的共同点是,数据之间"一对一"的逻辑关系都是依靠指针(静态链表中称"游
- 本文实例讲述了CentOS7系统搭建LAMP及更新PHP版本操作。分享给大家供大家参考,具体如下:搭建LAMP环境 用yum安装安装Apac
- 前些天有位网友建议我在博客中添加RSS订阅功能,觉得挺好,所以自己抽空看了一下如何在Django中添加RSS功能,发现使用Django中的s
- 列表解析——用来动态地创建列表[expr for iter_var in iterable if cond_expr]例子一:map(lam
- 这篇文章主要介绍了python3 pathlib库Path类方法总结,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习
- 1.自定义线程池import threadingimport Queueimport timequeue = Queue.Queue()de
- 本意是为了和手写jdbc对照,不过不要和原来的手写连接重名。打开cmd,直接输入notepad就打开了记事本。jdk1.5之后不必配置cla
- 安装模块windows:pip install pymysqlubuntu:sudo pip3 install pymysqlpython操
- 一个项目开发完毕后总有一种想法,就是生成可执行文件,总不能一直用python xxx执行吧。以下操作同时适用于windows和Linux下的
- Python 条件语句Python条件语句是通过一条或多条语句的执行结果(True或者False)来决定执行的代码块。可以通过下图来简单了解
- 本文实例讲述了Yii2框架实现登陆添加验证码功能。分享给大家供大家参考,具体如下:models中LoginForm.phppublic $v
- 微信更新后出来了一块比较火的小游戏,要是一款不涉及到排行的游戏,可能 没人去关注这款游戏。最开自己一直苦练技术,想在微信排行上面装一装,练了