MongoDB, MySQL和Redis的区别和使用场景
- MySQL是关系型数据库,支持事件
- MongoDB和Redis是非关系型数据库,不支持事件
- 如何快速选择
- 希望速度快的时候,使用MongoDB或者Redis
- 数据量过大的时候,可以将频繁使用的数据存入Redis,其他的存入MongoDB
- MongoDB不用提前建表建数据库,使用方便,字段数量不确定的时候使用MongoDB
- 后续需要用到数据之间的关系,可以考虑MySQL
数据库常用操作
查询所有的数据库
show dbs
切换到某个数据库
use databaseName
Notice:
- 当该数据库存在时,会切换到该数据库
- 当数据库不存在时,会自动创建该数据库,并切换到该数据库
查看当前所处的数据库
db
集合常用操作
创建一个新的集合
db.createCollection(name,{capped,autoIndexId,size,max,...})
查询数据
db.collection.find({cretiria}):
返回所有满足过滤条件的文档
db.collection.findOne({cretiria}):
返回满足过滤条件的第一个文档
pretty():
将输出文档进行格式化
索引文档嵌套字段
对于文档:
{
"address": {
"city": "Los Angeles",
"state": "California",
"pincode": "123"
},
"tags": [
"music",
"cricket",
"blogs"
],
"name": "Tom Benzamin"
}
假设我们需要通过city,state,pincode字段来检索文档
db.users.find({'address.city':'Los Angeles'})
逻辑运算符
and :{key1:value1,key2:value2}
or :{$or:[{key1:value1},{key2:value2}]}
and ,or: {key1:value1,$or:[{key2:value2},{key3:value3}]}
范围运算符
$in: 等于数组内某个值,{age:{$in:[18,20]}}
$nin:不等于数组内任何一个值,{age:{$nin:[18,20]}}
投影
db.collection.find({},{field1:value,field2:value}):
- 0:表示不显示
- 1:表示显示
Notice: -
_id字段默认显示; - 括号内除
_id字段,包括项(1)和排除项(0)不能同时使用
MongoDB 的更新操作
以下语句会将 name 为
xiaowang的整个文档替换为{'name':'xiaozhao'}
db.collection.updateOne({'name':'xiaowang'},{'name':'xiaozhao'})
db.collection.replaceOne({'name':'xiaowang'},{'name':'xiaozhao'})以下语句会将 name 为
xiaowang的这个文档中的 name 字段更新为 xiaozhao
db.collection.updateOne({'name':'xiaowang'},{$set:{'name':'xiaozhao'}})如果希望所有满足条件的数据都得到更新,可以使用
db.collection.updateMany({'name':'xiaowang'},{$set:{'name':'xiaozhao'}})如果将参数
upsert设置为 true,那么当没有找到满足条件的文档时,插入的文档将作为新文档插入
db.collection.updateOne({'name':'xiaowang'},{'name':'xiaozhao'},{'upsert':true})
MongoDB 删除操作
删除满足条件的第一条数据
db.collection.deleteOne({criterion})删除满足条件的所有数据
db.collection.deleteMany({critetion})
统计个数
方法count()用于统计结果集合中文档的数量
db.collectionName.find({criteria}).count()
Notice:在新版本4.0中建议使用countDocuments(criteria,limit,skip,...)代替,上面的语句等价于:
db.collectionName.countDocuments({criteria})
相当于:
db.collection.aggregate([
{$match:<query>},
{$group:{_id:null,n:{$sum:1}}},
])
消除重复
distinct()返回文档特定字段的不同值
db.collectionName.distinct('去重字段',{条件})
db.stu.distinct(hometown,{age:{$gt:18}})
skip(), limit(), sort() 三个函数在一天语句中的时候,执行的顺序是 sort(), skip(), 最后是limit().
sort({field1:num1, field2:num2}): 1表示升序排列,-1表示降序排列
建立索引
MongoDB使用createIndex()函数来创建索引,3.0版本之前使用db.collection.ensureIndex() 函数创建索引。
默认情况下,索引字段是可以有重复的
-
如果想要创建唯一值索引,可以使用:
db.collection.createIndex({field:1},{unique:true})
-
创建联合索引(对多个字段同时创建索引)
db.collection.createIndex({field1:1,field2:-1})
-
查看集合内文档的所有索引
db.collection.getIndexes()
-
删除索引:
-
db.collection.dropIndexes()## 删除所有索引 -
db.collection.dropIndex(index_name)## 删除指定索引
-
数据库的备份和恢复
数据库备份语句:
mongodump -h dbhost -d dbname -o dbdirectory
-
dbhost:服务器地址, -
dbname:要备份的数据库 -
dbdirectory:存放备份数据库的路径
Notice:如果要备份本地数据库,可直接使用:
mongodump -o dbdirectory ## 会把所有的数据库文档进行备份
mongodump -d databaseName -o dbdirectory ## 会把指定的数据库进行备份
数据库恢复语句:
mongorestore -h dbhost -d dbname --dir directory
-
dbhost:服务器地址, -
dbname:用于存放数据的数据库名称 -
directory:存放备份数据库集合的路径
Notice:如果要恢复到本地数据库,可直接使用:
mongorestore -d dbname --dir directory
Notice:这里的directory指的是集合所在的路径,是备份数据库时所指定的路径dbdirectory的下一级文件夹
数据聚合操作
聚合(aggregate)是基于数据处理的聚合管道,每个文档通过一个由多个阶段(stage)组成的管道,可以对每个阶段的管道进行分组、过滤等功能,然后经过一系列的处理,输出相应的结果。
常用的管道:
-
$match:过滤数据,只输出满足条件的文档 -
$group:将集合中的文档分组,用于统计结果 -
$project:修改输入文档的结构,如重命名、增加、删除字段、创建计算结果 -
$sort:将输入文档排序后输出 -
$skip:跳过指定数量的文档,并返回余下的文档 -
$limit:限制通过该阶段的文档数 -
$unwind:将数组类型的字段拆分
$group会根据_id后面的字段进行分组,取字段的值的时候需要用$运算符
db.collection.aggregate([
{`$match`:{`field`:value}},
{`$group`:{'_id':field,count:{`$sum`:1},'avg_age':{'$avg':'$age'}}},
{`$project`:{'cate':'_id','counter':'$count','_id':0,'avg_age':1,}},
])
$group还可以根据多个字段进行分组
db.collection.aggregate([
{$group:{'_id':{'country':'$country','city':'$city'}}},
## 分组之后的结果是{_id:{country:value,city:value}}
## 取字典嵌套的字段的值,可以用`_id.country`,`_id.city`
{$group:{'_id':{'country':'$_id.country','city':'$_id.city'},'count':{$sum:1}}},
{$project:{'country':'$_id.country','city':'$_id.city','_id':0}}
])
$group的另一种用法:用于统计所有文档
db.collection.aggregate([
{`$match`:{`field`:value}},
{`$group`:{'_id':null,count:{`$sum`:1},'avg_age':{'$avg':'$age'}}},
])
$sort可对多个字段进行排序
db.collection.aggregate([
{$sort:{field1:1,field2:-1}}
])
$unwind将文档中某数组类型的字段拆分成多条,每条包含数组中的一个值
db.t.insertOne({'_id':1,'item':'t-shirt',`size`:['S','M','L']})
db.t.aggregate([
{$unwind:'$size'},
])
结果为:
{'_id':1,'item':'t-shirt','size':'S'}
{'_id':1,'item':'t-shirt','size':'M'}
{'_id':1,'item':'t-shirt','size':'L'}
Notice:在默认情况下,使用$unwind对数组字段展开时,会自动忽略null和空数组,
如果想要保留null和空数组,可以设置参数preserveNullAndEmptyArrays为true。
db.collection.aggregate({$unwind:{path:'$field',preserveNullAndEmptyArrays:true}})
explain():可用于查看命令执行的详细情况,输出的信息详细级别依次为:queryPlanner(默认),executionStats和allPlanExecution
db.collection.explain('executionStats').find()
MongoDB 原子操作常用命令
$set: 用来指定一个键并更新键值,若该键不存在,就创建该键
{$set:{field:value}}
$inc:对文档的某个数字类的键进行增减操作
{$inc:{field:value}}
$push:把value追加到field里面,field一定要是数组类型才行,如果field不存在,会新增加一个数组类型进去。
{$push:{field:value}}
$pushAll:同$push,只是一次可以追加多个值到一个数组字段内。
{$pushAll:{field:value_array}}
$pull:从数组field内删除一个等于value的值
{$pull:{field:_value}}
$pop:删除数组的第一个或最后一个元素
{$pop:{field:1}}
$rename:修改字段名称
{$rename:{old_field_name:new_field_name}}
正则表达式
MongoDB使用$regex操作符来设置匹配字符串的正则表达式。
使用正则表达式之前不需要做任何配置。
考虑以下文档:
{
"post_text": "enjoy the mongodb articles on runoob",
"tags": [
"mongodb",
"runoob"
]
}
使用正则表达式查找 post_text 包含 Runoob 字符串的文章
db.posts.find({post_text:{$regex:'Runoob'}})
db.posts.find({post_text:/Runoob/})
## 查找以enjoy开头
db.posts.find({post_text:/^enjoy/})
## 查找以runoob结尾
db.posts.find({post_text:/Runoob$/})
如果需要检索不区分大小写,可以设置$options为$i
以下命令将查找不区分大小写的字符串runoob
db.posts.find({post_text:{$regex:"runoob",$options:"$i"}})
优化正则表达式查询
- 如果文档中设置了索引,那么使用索引相比于正则表达式匹配查找所有的数据查询速度更快。
- 如果正则表达式是前缀表达式,所有匹配的数据将以指定的前缀字符串开始;
MongoDB 固定集合 (Capped Collection)
MongoDB 固定集合是性能出色,且有着固定大小的集合,就像一个环形队列,当集合空间用完之后,再插入的元素就会覆盖最初始的元素。
Notice: 固定集合无法使用remove()函数删除部分文档,但是可以删除全部文档
创建固定集合
db.createCollection('cappedCollection',{capped:true,size:maxSize,max:maxNum})
其中:size是整个集合空间的大小,单位是KB;
max是集合文档个数的上限,单位是'个';
如果空间大小达到上限,插入新文档的时候,会覆盖第一个文档;
如果文档个数到达上限,插入新文档的时候,同样会覆盖第一个文档。
判断一个集合是否为固定集合:
db.collection.isCapped()
如果需要将一个已经存在的集合转换为固定集合,可以使用:
db.runCommand({'convertToCapped':collection,size:value})
固定集合的属性
- 对固定集合进行插入数据,速度极快
- 按照插入顺序的查询,输出速度极快
- 能够在插入最新数据时,淘汰最早的数据
常用用途
- 存储日志信息
- 缓存一些少量的文档
启动 MongoDB 容器:
docker run -d --network host --name mongodb -e MONGO_INITDB_ROOT_USERNAME=admin -e MONGO_INITDB_ROOT_PASSWORD=admin mongo
-
从
MongoDB中删除集合, 有2种方式:-
db.coll.remove({}): 删除文件之后, 文件原本占用的物理空间不会被回收; 是删除符合条件的数据, 不会删除其他结构(如索引等); -
db.coll.drop(): 删除文件之后, 文件原本占用的物理空间会进行回收;
针对物理空间没有进行回收的情况, 可以使用
compact命令释放这些空间, 具体命令为db.runCommand({"compact":"collection_name"}). -
-
主币交易要求交易
hash(数据库中的_id)和代币交易的交易hash log_idxblock_number(transaction_hash,block_number,log_index)的组合是唯一值. 在使用pymongo批量插入时, 需要:try: eth_trans_coll.insert_many( trans, ordered=False, bypass_document_validation=True) except pymongo.errors.BulkWriteError as e: logging.debug(e)备注:
orderd=False时, 会并行插入数据库, 可以大幅提高插入效率; 同时, 即便部分数据插入时出错, 也不会影响其他数据的插入.bypass_document_validation=True可以绕过文件验证环节, 提高插入效率.
索引
删除索引
- 批量删除索引:
db.coll_name.dropIndexes([idx_name_1,idx_name_2])
explain 支持 queryPlanner(仅给出执行计划)、executionStats(给出执行计划并执行)和 allPlansExecution(前两种的结合)三种分析模式,默认使用的是 queryPlanner:
Mongo 会通过优化分析选择其中一种更好的方案放置到 winningPlan,最终的执行计划是 winningPlan 所描述的方式。其它稍次的方案则会被放置到 rejectedPlans 中,仅供参考。
所以 queryPlanner 的关注点是 winningPlan,如果希望排除其它杂项的干扰,可以直接只返回 winningPlan 即可:
db.coll_name.find().explain().queryPlanner.winningPlan
winningPlan 中,总执行流程分为若干个 stage(阶段),一个 stage 的分析基础可以是其它 stage 的输出结果。从这个案例来说,首先是通过 IXSCAN(索引扫描)的方式获取到初步结果(索引得到的结果是所有符合查询条件的文档在磁盘中的位置信息),再通过 FETCH 的方式提取到磁盘上各个位置所对应的完整文档。这是一种很常见的索引查询计划(explain 返回的是一个树形结构,实际先执行的是子 stage,再往上逐级执行父 stage)
除了 queryPlanner 之外,还有一种非常有用的 executionStats 模式:
db.coll_name.find().explain('executionStats').executionStats
总体上大同小异,一些关键字段可以了解一下:
-
nReturned:执行返回的文档数 -
executionTimeMillis: 执行时间(ms) -
totalKeysExamined:索引扫描条数 -
totalDocsExamined:文档扫描条数 -
executionStages:执行步骤
MongoBD的索引采用B树结构, 相较于时间复杂度为O(1)的哈希结构, B树虽然效率为O(logN), 但它允许范围查询, 这是哈希树无法做到的. 但是以下语句无法使用索引:
- 正则表达式,及'非'操作符, 如
$nin,$not等; - 算术运算符, 如
$mod等; -
$where子句;
覆盖索引
常见的执行步骤为:
- 在内存中扫描索引(
IXSCAN), 得到需要的结果(完整文档在磁盘上的位置); - 从磁盘上根据索引结果, 拉取特定位置的文档(
FETCH); - 执行字段映射, 得到需要的字段;
当查询的字段和要返回的字段都在一个组合索引中, 数据库就不需要取磁盘抓取完整的文档了. _id这个字段默认会返回, 但是如果不需要的化, 可以不让其返回.
文件排序
索引的2个主要用途就是分组和排序, 使用索引筛选, 也是建立在排序的基础上. 所以可以在排序字段和筛选字段上建立组合索引,
MongoDB返回的文档顺序和查询时的扫描顺序一致, 扫描顺序又分为集合扫描(COLLSCAN)和索引扫描(IXSCAN);
- 集合中没有索引, 或者索引失效时,
MongoDB会扫描全部文档, 扫描的顺序和该集合中的文档在磁盘上的存储顺序相同; - 当扫描索引时, 会在内存中对索引值进行匹配, 得到所有等匹配到的文档后, 再根据情况决定是否到磁盘拉取相应的文档, 或进行下一阶段. 因为索引本身就是排好序的, 所以扫描顺序就取决于索引本身的顺序;
常见的几个阶段:
-
IXSCAN:通过索引, 扫描出相关文档的位置; -
FETCH: 根据前面扫描到的位置到磁盘上抓取相应的完整文档; -
SORT_KEY_GENERATOR: 获取每一个文档排序所用的健值; -
SORT: 进行内存排序, 返回最终结果;(在内存中排序, 效率非常低, 当结果集大于32MB时, 还会返回错误).
批量插入/更新
BTC 的主币交易和代币交易都是使用 transaction_hash 作为 _id, 插入重复数据时, 可以直接使用 updateone 和 bulk_write;
其他币种的代币交易在transaction_hash, block_number, log_index上建有唯一值索引, 需要根据这三个字段筛选数据, 有两种方式
# 逐个指定需要更新字段
tasks = [UpdateOne({'transaction_hash': token_transfer['transaction_hash'],
'block_number': token_transfer['block_number'],
'log_index': token_transfer['log_index']},
{'$set':{'from_address':token_transfer['from_address'],
'to_address':token_transfer['to_address']}}, upsert=True)
for token_transfer in token_trans]
trans_coll.bulk_write(
tasks, ordered=False, bypass_document_validation=True)
# 把重复数据删除 再批量插入
tasks = [DeleteOne({'transaction_hash': token_transfer['transaction_hash'],
'block_number': token_transfer['block_number'],
'log_index': token_transfer['log_index'], })
for token_transfer in token_trans]
trans_coll.bulk_write(
tasks, ordered=False, bypass_document_validation=True)
token_transfer_coll.insert_many(
token_trans, ordered=False, bypass_document_validation=True)
# 错误的方式 整体更新会更变_id, 而MongoDB中 _id 不能更改,所以会报错
tasks = [UpdateOne({'transaction_hash': token_transfer['transaction_hash'],
'block_number': token_transfer['block_number'],
'log_index': token_transfer['log_index']},
{'$set':token_transfer, upsert=True)
for token_transfer in token_trans]
trans_coll.bulk_write(
tasks, ordered=False, bypass_document_validation=True)
TIPS
-
$OR: 很可能会导致索引失效, 最好能用$IN代替, 如:# 存在索引 "province_1_city_1" db.collection.find({ $or: [{ province: "广东省" }, { province: "福建省" }], city: "深圳市", }) # 建议使用 db.collection.find({ province: { $in: ["广东省", "福建省"] }, city: "深圳市", }) -
在数组中遍历:
数据样例:{ "_id" : "1d8149eb8d8475b98113b5011cf70e0b7a4dccff71286d28b8b4b641f94f1e46", "block_number" : 700000, "block_timestamp" : ISODate("2021-09-11T04:14:32Z"), "value" : NumberDecimal("640388640"), "is_coinbase" : true, "inputs" : [ [ "0000000000000000000000000000000000000000000000000000000000000000", NumberLong("4294967295") ] ], "outputs" : [ [ "bc1q7wedv4zdu5smt3shljpu5mgns48jn299mukymc", 640388640 ], [ ], [ ] ] } # 需要查询outputs中包含指定地址的交易 db.trans_test.find({'outputs':{'$elemMatch':{'0':{'$eq':'bc1q7wedv4zdu5smt3shljpu5mgns48jn299mukymc'}}}}) db.trans_test.find({'outputs':{'$elemMatch':{'0':'bc1q7wedv4zdu5smt3shljpu5mgns48jn299mukymc'}}})
优化案例
- 在
time和value上创建组合索引,db.usdt_token_trans.createIndex({'time':1,'value':1}): - 分别在
time和value上创建索引:
> db.usdt_token_trans.find({$and:[{time:{$gte:ISODate("2021-05-01T18:11:00Z")}},
{time:{$lte:ISODate("2021-05-01T19:11:00Z")}},
{value:{$gte:NumberLong("2998999900")}},
{value:{$lte:NumberLong("2999000100")}}]}).explain("executionStats")
# 创建组合索引时:
"executionStats" : {
"executionSuccess" : true,
"nReturned" : 520,
"executionTimeMillis" : 39,
"totalKeysExamined" : 5411,
"totalDocsExamined" : 520,
"indexBounds" : {
"time" : [
"[new Date(1619892660000), new Date(1619896260000)]"
],
"value" : [
"[49999999900, inf.0]"
]
# 在 time 和 value 上单独创建索引时
"executionStats" : {
"executionSuccess" : true,
"nReturned" : 520,
"executionTimeMillis" : 85,
"totalKeysExamined" : 33226,
"totalDocsExamined" : 33226,
"indexBounds" : {
"time" : [
"[new Date(1619892660000), new Date(1619896260000)]"
]
},
-
由上面的例子可以看出, 创建组合索引时, 查询时需要遵守
最佳左前缀法则(先使用前面的字段, 然后再使用后面的字段, 否则就是全文扫描);- 组合索引前面的字段是范围查询时, 后面字段的索引依然可以使用;
-
在多个子段上分别建立索引时, 如果查询条件只有该字段时, 会使用索引扫描, 速度比组合索引稍快;
但如果查询条件是多个字段时, 也只会选择一个索引, 速度比组合索引慢很多.
# 创建组合索引"value_1_time_1"
"executionStages" : {
"stage" : "FETCH",
"nReturned" : 13,
"executionTimeMillisEstimate" : 5,
"indexName" : "value_1_time_1",
"indexBounds" : {
"value" : [
"[2998999900, 2999000100]"
],
"time" : [
"[new Date(1619892660000), new Date(1619896260000)]"
]
},
# 创建组合索引 "time_1_value_1"
"executionStats" : {
"executionSuccess" : true,
"nReturned" : 13,
"executionTimeMillis" : 58,
"indexName" : "time_1_value_1",
"indexBounds" : {
"time" : [
"[new Date(1619892660000), new Date(1619896260000)]"
],
"value" : [
"[2998999900, 2999000100]"
]
},
- 两者都是使用组合索引, 只是索引中字段的顺序不一样, 查询速度竟然差别这么大. 不明白为什么? 可能的原因是:
-
time和value的索引空间不同,value的差异性比较小, 索引占用的空间也比较小, 所以更快;
-
查询非空集合
-
MongoDB查询非空集合,并比较几种用法的效率
db.testcollection.find({"field1.0":{$exists: true}})
db.testcollection.find({"field1":{'$elemMatch':{'$ne':null}}})
db.testcollection.find({$where:"this.field1.length>0"})
db.testcollection.find({"field1":{$gt: []}})
# 在管道中可以这样使用,效率依次下降
{'$match':{'outputs':{'$ne':[]}}},
{'$match':{'outputs':{'$elemMatch':{'ne':null}}}},
{'$match':{'outputs.0':{'$exists':true}}},
{'$match':{'outputs':{'$gt':['$size',0]}}},
allowDiskUse
当数据集过大时, 使用 MongoDB进行查询会突破内存上线,报错信息为‘'QueryExceededMemoryLimitNoDiskUseAllowed'’, 这时可以在 aggregate()加上一个参数 {allowDiskUse:true}以便可以使用磁盘进行缓存, 即:db.coll.aggregate([{}...],{'allowDiskUse':true}). 调用pymongo模块时,需要写成addr_tag_testdata.aggregate([{}...], allowDiskUse=True)的形式.
Decimal格式
-
在
Python中使用decimal格式可以使用decimal模块:import decimal # 可以查看 decimal 的一些设置参数 decimal.getcontext() # 以下为输出: Context(prec=28, rounding=ROUND_HALF_EVEN, Emin=-999999, Emax=999999, capitals=1, clamp=0, flags=[], traps=[Overflow, DivisionByZero, InvalidOperation]) # 修改确定精度最后一位的方式 decimal.getcontext().rounding = ROUND_UP # 'ROUND_HALF_EVEN':5以下舍弃,6以上加1;'ROUND_UP':向上取;'ROUND_DOWN':向下取 # 指定精度为6位(指总位数 包括整数位), 默认为28位; decimal.getcontext().prec=6 decimal.Decimal(arg) # 把科学计算法表示的数字转换成普通方法表示的数字, 可以用以下方法 decimal.Decimal(eval(sci_number))-
Decimal()的参数可以是整型, 也可以是字符串, 但不能是浮点型. - 该类型可以进行四则运算;
- 无法直接存入
MongoDB中; -
建议: 在平常使用时, 通过
Decimal()或to_decimal()转化为Decimal类型; 需要存到MongoDB中时, 再转化成bson.decimal128.Decimal128()类型, 从MongoDB取出之后, 再通过to_decimal()转化为Decimal类型.
-
-
bson.decimal128.Decima128()可以直接存到MongoDB中, 具体方法为:from bson.decimal128 import Decimal128 from decimal import Decimal Decimal128('1') Decimal128(Decimal(1)) # 在MongoDB中显示为: NumberDecimal("1")-
Decimal128()只能接受decimal.Decimal()类型和字符串类型作为参数, 不接受整型和浮点型; -
Decimal128()类型无法进行四则运算, 需要通过Decimal128().to_decimal()转换成Python自带的Decimal类型才可以. - 在
MongoDB的shell中, 可以使用NumberDecimal('111')直接创建Decimal类型;toDecimal()可以将其他类型转化为Decimal格式; 最大可以容纳34位的十进制数值. -
注意: 任何类型转化为浮点型后, 都可能会发生变化, 如字符串或整型
1转化为浮点型后可能变成1.00000000000000456; 如果需要非常精确地表示, 需要使用decimal.Decimal('1'), 这样不论结果扩大多少倍, 后面依然是0.
-
Session
如果客户端和MongoDB服务器在30分钟内没有互动, 服务器会自动释放客户端连接, 解决方案有2个:
-
创建 session 时, 指定 session_id, 并且每5分钟刷新一次该 session_id (官方推荐方案);
# 可以使用该方法获取 session_id with client.start_session() as session: session_id = session.session_id['id'] # 但是没有在 pymongo api 中找到刷新 session 的方法在 mongo shell 中可通过以下方式实现: 参考文档
# 创建新的 session, 并获取对应的 session_id var session = db.getMongo().startSession() var sessionId = session.getSessionId().id var cursor = session.getDatabase("examples").getCollection("data").find().noCursorTimeout() # 每隔5分钟刷新一次该 session 的连接 var refreshTimestamp = new Date() // take note of time at operation start while (cursor.hasNext()) { // Check if more than 5 minutes have passed since the last refresh if ( (new Date()-refreshTimestamp)/1000 > 300 ) { print("refreshing session") db.adminCommand({"refreshSessions" : [sessionId]}) refreshTimestamp = new Date() } // process cursor normally } 对集合操作时, 指定 session, 参考文档
with client.start_session() as session:
db.coll.find({}, session=session)
-
查询数据时, 通过
batch_size指定每个 batch 的大小, 保证该 batch 在30分钟内肯定能执行完;# 依然会返回全部数据, 但是每个 batch 只返回30条 迭代的每个 document 也一样 for document in db.collection.find().batch_size(30) print(document) import time import pymongo # 当 batch_size = 2 时, 耗时:4.8 S # 当 batch_size = 100 时, 耗时:0.42 S # 当 batch_size = 10000 时, 耗时:0.25 S # 不指定 batch_size 时, 耗时: 0.255 S # counter 最终都是 13929 with pymongo.MongoClient('192.168.1.7', 27017)as client: db = client['btc'] block_info_coll = db['block_info'] result = block_info_coll.find({}).batch_size(10) t0 = time.time() counter = 0 for block_info in result: counter += 1 print(counter) t1 = time.time() print(f'共耗时:{t1-t0} S')
Date
Date()会返回当前时间 (字符串类型)
new Date() 会返回当前时间 (ISODate 类型)
也可以通过传递参数生成指定日期的时间. 可接受的参数类型有以下几种:
-
new Date('2022-01-01'): 返回 ISODate("2022-01-01T00:00:00Z")(UTC 时区) -
new Date('2022-01-01T08:00:00'): 返回 ISODate("2022-01-01T00:00:00Z") (会自动转换成UTC时区), 如果MongoDB的服务器时区就是UTC时区, 会返回 ISODate("2022-01-01T08:00:00Z") -
new Date('2022-01-01T08:00:00Z'): 返回 ISODate("2022-01-01T08:00:00Z") -
new Date(<integer>): 以毫秒为单位的 UNIX 时间戳;
数据导出/导入
将 MongoDB 中指定的集合导出为文件的命令:
mongoexport -d btc_b -c transaction_60w --type csv -o transaction_60w.csv --limit 10 --sort '{block_number:1}' -f "_id,block_number,block_timestamp,is_coinbase,value,inputs,outputs,new_inputs"
遇到的问题: 怎么在导入时指定字段的类型, 如 block_number.int32(), 对应的命令为:
mongoimport -d btc -c transaction_60w -f 'block_number.int32()' --type cs
v --columnsHaveTypes --drop --mode upsert transaction_60w.csv
使用该命令的难点在于: 导入时, 没有找到指定各个字段的类型.
可以导出为 json格式, 会自带类型, 但是会将二进制导出为字符串, 在导入时没有找到将其类型指定为二进制的方法.
# 导出命令
mongoexport -d btc_b -c transaction_60w --type json -o transaction_60w.json --limit 10 -f "_id,block_number,block_timestamp,is_coinbase,value,inputs,outputs,new_inputs"
# 导入命令
mongoimport -d btc -c transaction_60w --type json --jsonArray --drop --mode upsert transaction_60w.json
使用 mongodump导出指定的集合, 具体命令为:
# 导出指定集合
mongodump -d btc_b -c transaction_70w -o transaction_70w.bson
# 导入指定集合
mongorestore -d btc -c transaction_70w --drop transaction_70w.bson
