protocol buffer

protocol buffers 简介

protocol buffers 是由 google 开发的一款数据交换格式,可用于进行通信协议,数据存储。说人话就是类似于 XML、JSON 的一种用于序列化数据的东西,那么为什么不直接用 XML、JSON,而要重新开发 protocol buffers 呢?那还用说,当然是因为它又小又快。

简单概括:占容量小、速度快、平台无关、语言无关

protobuf 的简单使用

protobuf 的使用方法很简单,它比较类似于定义一个结构体,但是只有属性,没有方法。

另外,protobuf 目前有两个版本,一个是 proto2,另一个是 proto3,虽然 proto3 看上去比 proto2 新,但是在一些处理上其实还不如 proto2,比如说默认值和未定义的字段的处理就不如 proto2。但是 proto3 确实也修复了不少的问题和新增了 feature,所以一般情况下都会选用 proto3。

附Proto3 区别于 Proto2 的使用情况

  • 在第一行非空非注释行,必须写:syntax = “proto3”;
  • 字段规则移除 「required」,并把 「optional」改为 「singular」
  • 「repeated」字段默认使用 paced 编码
  • 移除 default 选项
  • 枚举类型的第一个字段必须要为 0
  • 移除对扩展的支持,新增 Any 类型, Any 类型是用来替代 proto2 中的扩展的
  • 增加了 JSON 映射特性

值得注意的是,如果要使用 proto3,那么在定义的时候第一行必须填写 syntax = "proto3"否则,将按照 proto2 的语法进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
// filename: login.proto
syntax = "proto3";
package pb;
message LoginReq {
string username = 1;
string password = 2;
}

message LoginResp {
string code = 1;
string msg = 2;
}

对于 JSON 数据,我们可以通过语言层面去直接解析,但是 protobuf 不行,在使用之前,我们需要把文件使用工具编译一下

编译器可以在 https://github.com/protocolbuffers/protobuf 官方的 release 找到并进行安装,安装之后,在当前文件目录下执行 protoc --go_out=. login.proto 后,会生成 login.pb.go 文件,该文件生成以后,我们就可以通过引用来直接使用了。

1
2
3
4
5
6
7
8
9
10
func main(){
req := &pb.LoginReq{
username: "test",
password: "123"
}

resp = &pb.LoginResp{
...
}
}

使用场景

作为 RPC 的数据交换相对来说用得比较多,grpc 默认也是与 protocol buffer 搭配一起使用

编码原理

ProtocolBuffers 使用 Varint 进行编码。

Varint 是一种紧凑的表示数字的方法。它用一个或多个字节来表示一个数字,值越小的数字使用越少的字节数。这能减少用来表示数字的字节数。

Varint 中的每个字节(最后一个字节除外)都设置了最高有效位(msb),这一位表示还会有更多字节出现。每个字节的低 7 位用于以 7 位组的形式存储数字的二进制补码表示,最低有效组首位。

优缺点

优点

首先我们来了解一下 XML 的封解包过程。XML 需要从文件中读取出字符串,再转换为 XML 文档对象结构模型。之后,再从 XML 文档对象结构模型中读取指定节点的字符串,最后再将这个字符串转换成指定类型的变量。这个过程非常复杂,其中将 XML 文件转换为文档对象结构模型的过程通常需要完成词法文法分析等大量消耗 CPU 的复杂计算。

反观 Protobuf,它只需要简单地将一个二进制序列,按照指定的格式读取到 C++ 对应的结构类型中就可以了。

所以结论当然是:容量小,速度快,跨平台

缺点

通用性来说的话,如果面向开放的 api 的话,还是 JSON 比较通用,为什么呢?一方面是因为大家都熟悉这套玩法了,另一方面就是省事,不用每次都编译 proto 文件

参考链接

https://developers.google.com/protocol-buffers/docs/proto3
https://halfrost.com/protobuf_encode/#proto3message
https://www.ibm.com/developerworks/cn/linux/l-cn-gpb/index.html
https://www.cnblogs.com/makor/p/protobuf-and-grpc.html

Sidekiq源码学习

在公司项目里面,我们是用 sidekiq 来做消息队列的,但是都仅仅是停留在用的阶段,前几天有空看了下源码才知道内部结构原来是这么回事

下面我从使用的过程一直走到源码内部来看看整个过程是怎样的,请坐好扶稳

模拟场景:异步发送邮件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 定义发送邮件的 worker
class EmailWorker
include Sidekiq::Worker
def perform(email, content)
Email.send(email, content) # 发送邮件
end
end

# 调用端
class EmailController
def welcome
EmailWorker.perform_async("xxx@xx.com", "welcome to my home")
end
end

定义了 EmailWorker 以后,在 controller 层就可以直接调用 这个 worker,为什么不是 EmailWorker.perform 而是 perform_async? 因为 perform 是实例方法,而 perform_async 是类方法,所以,真正工作的是 perform_async,OK,那下面来看看 perform_async 方法做了什么处理,需要注意的是,perform_async 所接收的参数是跟 perform 是一致的

Sidekiq 是如何生产数据的?(生产者)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
为了排版以下内容只列出关键方法,均有删减
def perform_async(*args)
# 1. 把类对象和参数组成 hash 后传递给 client_push
client_push("class" => self, "args" => args)
end

def client_push(item)
# 2. 实例化一个 client 把 item 参数传递给 push 方法
Sidekiq::Client.new(pool).push(item)
end
def push(item)
normed = normalize_item(item) # 格式化 hash
payload = process_single(item["class"], normed) # 中间件方法,后面插曲有提到

if payload
raw_push([payload]) # 关键方法
payload["jid"]
end
end

def raw_push(payloads)
@redis_pool.with do |conn|
conn.multi do
atomic_push(conn, payloads)
end
end
true
end

# 最终处理的方法
def atomic_push(conn, payloads)
# 是否定时任务,如果是定时任务,则按照定时任务的逻辑来处理字符串后放入队列
if payloads.first["at"]
conn.zadd("schedule", payloads.map { |hash|
at = hash.delete("at").to_s
[at, Sidekiq.dump_json(hash)]
})
else
# 非定时任务,记录入队时间,把它放入 redis 的列表
queue = payloads.first["queue"]
now = Time.now.to_f
to_push = payloads.map { |entry|
entry["enqueued_at"] = now
Sidekiq.dump_json(entry)
}
conn.sadd("queues", queue)
conn.lpush("queue:#{queue}", to_push)
end
end

完。

以上,就是 EmailWorker.perform_async("xxx@xx.com", "welcome to my home") 所做的处理,其实归根结底,生产者所做的事情都比较简单,就是把类名和参数格式化一下,然后存储到 redis 的队列里面去

插曲(中间件)

在客户端代码中,process_single 所做的事情就是调用已注册的中间件,具体可参考以下这个文件

基于此中间件,你可以定义在消息塞入队列前后的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
# Sidekiq.configure_client do |config|
# config.client_middleware do |chain|
# chain.insert_after ActiveRecord, MyClientHook
# end
# end

# class MyClientHook
# def call(worker_class, msg, queue, redis_pool)
# puts "Before push"
# result = yield
# puts "After push"
# result
# end

如何进行消费?(消费者)

消费者比较复杂,但是仔细看代码还是能看明白的,sidekiq的代码真的写得非常棒,基本上不需要怎么看注释,很自然而然的就让下读,读的很舒服。

当在命令行敲击 bundle exec sidekiq 的时候,实际上运行的就是以下 3 行代码

1
2
3
cli = Sidekiq::CLI.instance
cli.parse
cli.run

当然了,这三行代码只是一个入口,我们可以慢慢往下看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
以下内容有删减
# 首先,打开了管道
self_read, self_write = IO.pipe
# 注册信号量
sigs.each do |sig|
trap sig do
self_write.write("#{sig}\n")
end
end

launcher.run # 主要方法

# 死循环一直读取管道,直到发送了信号量才进行 handle_signal,可参考后面的信号量说明
while (readable_io = IO.select([self_read]))
signal = readable_io.first[0].gets.strip
handle_signal(signal)
end

注意到第一步,只是简单的进行一个死循环,等待用户给信号量,比如 kill -USR2 等信息,主要方法在于 launcher.run,以下,是 launcher.run 方法,也仅仅是做了 3 件事,第一:注册心跳包,定时检测状态;第二:不知道,往下看;第三:也不知道,往下看;

1
2
3
4
5
def run
@thread = safe_thread("heartbeat", &method(:start_heartbeat))
@poller.start
@manager.start
end

沿着 @poller.start 的调用栈一直往下走,最后可以发现实际上执行的是以下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Enq
def enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = SETS)
# A job's "score" in Redis is the time at which it should be processed.
# Just check Redis for the set of jobs with a timestamp before now.
Sidekiq.redis do |conn|
sorted_sets.each do |sorted_set|
# Get the next item in the queue if it's score (time to execute) is <= now.
# We need to go through the list one at a time to reduce the risk of something
# going wrong between the time jobs are popped from the scheduled queue and when
# they are pushed onto a work queue and losing the jobs.
while (job = conn.zrangebyscore(sorted_set, "-inf", now, limit: [0, 1]).first)

# Pop item off the queue and add it to the work queue. If the job can't be popped from
# the queue, it's because another process already popped it so we can move on to the
# next one.
if conn.zrem(sorted_set, job)
Sidekiq::Client.push(Sidekiq.load_json(job))
Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" }
end
end
end
end
end
end

其实也比较好理解,就是排序列表后,取出定时任务,然后放入到队列里面去执行,比如说你设置了今晚 10 点的发送邮件的任务,你在早上8点就已经调用了并且启动了 sidekiq,那么 sidekiq 会检测定时任务的列表,直到晚上 10 点,sidekiq 会把这个任务的相关信息取出来,然后发送到处理任务的队列里面去,也就是 Sidekiq::Client.push(Sidekiq.load_json(job))

OK,看完 poller.start 方法的处理逻辑以后,可以继续往下看 @manager.start 方法,是的,这个就是真正进行消费的代码逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def initialize(options = {})
logger.debug { options.inspect }
@options = options
@count = options[:concurrency] || 10
@done = false
@workers = Set.new
# 根据 count 的设置,来决定实例化多少个 `Processor` 对象,默认是 10 个
@count.times do
@workers << Processor.new(self)
end
@plock = Mutex.new
end

# Processor.new(self) 执行 options[:concurrency] 次 start
def start
@workers.each do |x|
x.start
end
end

# 继续进入 Processor 类的 start 方法看

def start
@thread ||= safe_thread("processor", &method(:run))
end

def run
process_one until @done
@mgr.processor_stopped(self)
rescue Sidekiq::Shutdown
@mgr.processor_stopped(self)
rescue Exception => ex
@mgr.processor_died(self, ex)
end

看到 start 方法大概就已经明白了,其实就是起 N 个线程来跑 run 方法,而继续往下看 run 方法实际上就是不断的从队列里面阻塞的取数据,下面就是取数据的关键方法

1
2
3
4
def retrieve_work
work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
UnitOfWork.new(*work) if work
end

消费者的大概流程

image

存在的问题

  1. sidekiq是能保证顺序的,但是因为从队列里面取数据的时候是阻塞的取的,所以造成了尽管有 20 个线程,某个线程从 redis 取数据的时候,其它 19 个线程是处于等待的状态的,并不能实现完美的并发消费。
  2. 基于 redis 的队列使得结构较为单一,意思就是队列只有一个,但是线程太多了,无法同时进行处理。不过,sidekiq 里面可以设置不同的队列名称,使得可以并发的执行不同的队列名
  3. 如何保证消息的可靠性?因为线程从队列里面拿出来以后,这条消息就相当于被消费了,那么如果线程拿出来后就死掉的话,这条消息是不是就丢了呢?

信号量(插曲)

在看到处理服务端的的时候,有一段代码是关于处理信号量的

1
2
3
4
5
6
7
8
9
sigs = %w[INT TERM TTIN TSTP]
...
sigs.each do |sig|
trap sig do
self_write.write("#{sig}\n")
end
rescue ArgumentError
puts "Signal #{sig} not supported"
end

这段代码是为了发出信号的时候,通知管道的另一端处理,那么信号量是怎么一回事? 看看下面代码

1
2
3
4
5
6
7
8
9
puts "I have PID #{Process.pid}"

Signal.trap("USR1") {puts "prodded me"}

loop do
sleep 5
puts "doing stuff"
end

首先,可以先 ruby example.rb ,然后呢,怎么触发信号量? 通过 ps -ef | grep 找到对应的进程,再次执行 kill -USR1 xxx,就会触发 puts prodded me

解释信号的一篇文章

小 Demo 测试队列的示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
require 'sidekiq'

Sidekiq.configure_client do |config|
config.redis = { db: 1 }
end

Sidekiq.configure_server do |config|
config.redis = { db: 1 }
end

Sidekiq.configure_client do |config|
class MyClientHook
def call(worker_class, msg, queue, redis_pool)
puts "Before push"
result = yield
puts "After push"
puts result
result
end
end
config.client_middleware do |chain|
chain.add MyClientHook
end
end

class OurWorker
include Sidekiq::Worker

def perform(msg)
puts "hello #{msg}"
end
end
# 启动服务端
# bundle exec sidekiq -r ./woker.rb
# 启动客户端
# bundle exec irb -r ./woker.rb
# OurWorker.perform_async("abc") # 生产一条数据
# 输出: {"class"=>"OurWorker", "args"=>["abc"], "retry"=>true, "queue"=>"default", "jid"=>"966a57511c1d2b8d314b1318", "created_at"=>1563630849.305185}

Rails 的 ActiveSupport::Concern

相信使用过 rails 的朋友们都经常会看到或者会使用到 ActiveSupport::Concern 这个模块,但是有没有想过为什么要使用这个模块呢?

因为以下内容会涉及上篇文章的内容,如果还不了解 ruby 的 include 和 extend 关键字的话,可先看看 关于 Ruby 的 include 和 extend

在上一篇文章的末尾我们说到可以通过 include 的钩子方法 included 来进行引入类方法,在看了 ActiveSupport::Concern 之后,我相信你会有新的领悟。

源码地址: https://github.com/rails/rails/blob/master/activesupport/lib/active_support/concern.rb

总的来说,Concern 模块所做的方法就是封装了 include 的钩子,之后以一种更友好的方式来引入模块,此话怎讲?看看示例吧

通常来说,按照上一篇文章,这段代码还这么写..

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 示例代码01
module Skill
def self.included(base)
base.extend(ClassMethod)
base.class_eval do
def fly
puts 'I can fly'
end
end
end

module ClassMethod
def run
puts 'Everybody can run'
end
end
end

class User
include Skill
end

user = User.new
user.fly # 输出 I can fly

使用了 ActiveSupport::Concern 改进后

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 示例代码02
require 'active_support/concern'
module Skill
extend ActiveSupport::Concern

included do
def fly
puts 'I can fly'
end
end

class_methods do
def run
puts 'Everybody can run'
end
end
end

class User
include Skill
end

user = User.new
user.fly # 输出 I can fly

仔细对比一下会发现,示例代码01 和 02 的区别在于原来的 ClassMethod 使用 class_methods 代码块来代替,而原来的 include 回调方法也使用了 included 代码块来代替

对应的查看一下源码你就会发现,其实 concern 模块只是帮我们再次封装了一层而已,我们分两点来查看

class_methods 方法做了什么处理?

1
2
3
4
5
6
7
8
9
10
11
# 源代码01
# Define class methods from given block.
# You can define private class methods as well.
# ....
def class_methods(&class_methods_module_definition)
mod = const_defined?(:ClassMethods, false) ?
const_get(:ClassMethods) :
const_set(:ClassMethods, Module.new)

mod.module_eval(&class_methods_module_definition)
end

通过 class_methods 的源码和注释可以知道,class_methods 所做的处理无非就是把代码块的内容整合到名为 ClassMethods 的代码块中

included 方法做了什么处理?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 源码02
# Evaluate given block in context of base class,
# so that you can write class macros here.
# When you define more than one +included+ block, it raises an exception.
def included(base = nil, &block)
if base.nil?
if instance_variable_defined?(:@_included_block)
if @_included_block.source_location != block.source_location
raise MultipleIncludedBlocks
end
else
@_included_block = block
end
else
super
end
end

咋看好像这个方法也没干啥啊,只是简单的变量赋值,好像也没有说像 base.extend 这样的代码?其实源文件也就几个方法,再仔细看看,发现一个“可疑的家伙” - append_features 方法,方法倒是有这么句 base.extend const_get(:ClassMethods) if const_defined?(:ClassMethods) 代码,这个操作有点像之前在 def self.included(base) 里面所做的操作,而且也没有显式的进行调用,所以我推断这个方法必有蹊跷

append_features 钩子

果然,一顿网络冲浪后,发现这个方法并不简单,原来 append_features 才是 “真正干活的回调方法”,还是看看代码比较好理解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
module A
def self.included(target)
v = target.instance_methods.include?(:method_name)
puts "in included: #{v}"
end

def self.append_features(target)
v = target.instance_methods.include?(:method_name)
puts "in append features before: #{v}"
super
v = target.instance_methods.include?(:method_name)
puts "in append features after: #{v}"
end

def method_name
end
end

class X
include A
end

# 以上代码的输出为
# in append features before: false
# in append features after: true
# in included: true

没错,append_features 的方法是先于 included 执行的回调方法,可以在引入模块的前后进行基本的变量设置等操作

OK,了解了 append_features 方法之后,再回到源码上看下面这段代码,你会发现,“卧槽,还是看不懂啊,他在干什么”

1
2
3
4
5
6
7
8
9
10
11
12
def append_features(base) #:nodoc:
if base.instance_variable_defined?(:@_dependencies)
base.instance_variable_get(:@_dependencies) << self
false
else
return false if base < self
@_dependencies.each { |dep| base.include(dep) }
super
base.extend const_get(:ClassMethods) if const_defined?(:ClassMethods)
base.class_eval(&@_included_block) if instance_variable_defined?(:@_included_block)
end
end

其实这段代码在处理一个依赖的问题,考虑下面的情景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
module Foo
def self.included(base)
base.class_eval do
def self.method_injected_by_foo
...
end
end
end
end

module Bar
def self.included(base)
base.method_injected_by_foo
end
end

class Host
# 我实际上只想引用 Bar 模块
# 引入 Foo 模块是因为 Bar 模块的某方法依赖于 Foo,所以我不得不在 Bar 之前引用一下 Foo
include Foo
include Bar
end

以上情景就导致了依赖的产生,我只是想使用 Bar 模块却把 Foo 模块也引进来了,这不符合 Ruby 的优雅哲学,所以是否可以这样做呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
module Foo
def self.included(base)
base.class_eval do
def self.method_injected_by_foo
...
end
end
end
end

module Bar
include Foo
def self.included(base)
base.method_injected_by_foo
end
end

class Host
include Bar
end

很遗憾,这种方式是会报错的,为什么?

因为我在终端试过是会报错的,报错信息

1
undefined method `method_injected_by_foo' for Host:Class (NoMethodError)

原因在于:在 Bar 模块引用 Foo 的时候,Foo 模块的 included 的回调参数 base 的值不再是 Host,而是 Bar,也就是说 method_injected_by_foo 这个方法是属于 Bar 模块的,而不是 Host 类

换用 concern 模块来实现的话,是这样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
require 'active_support/concern'

module Foo
extend ActiveSupport::Concern
included do
def self.method_injected_by_foo
...
end
end
end

module Bar
extend ActiveSupport::Concern
include Foo

included do
self.method_injected_by_foo
end
end

class Host
include Bar # It works, now Bar takes care of its dependencies
end

为什么使用 concern 后变得可行?这就得益于 append_features 方法,在方法内部处理了模块引用之间的依赖的关系,现在回过头去看 append_features 方法,大概也就明白了为什么需要这么处理了吧。(什么??还不明白?多看几遍

完。

Redis的两种持久化

什么是持久化?

redis 为了提高获取数据的速度,把数据都存储在内存里面。但是快往往是需要付出代价的,每当程序崩溃或者系统意外重启的时候,存储在内存里面的数据就不复存在了,所以需要一种机制来保证数据可以在意外崩溃的时候恢复。

持久化的两种方式

  • RDB快照
  • AOF(append-only-file)

RDB快照

save 命令

执行 save 命令将会阻塞当前客户端的处理,此时,客户端所进行的工作仅仅只是把内存中的数据存储到 dump.rdb 文件里面去

save命令可以通过 save 60 10000 这种形式来设置 60秒内有10000次写入 就会触发一次 bgsave 命令

bgsave 命令

bgsave 命令与 save 命令的区别在于,bgsave 会 fork 一个新的进程来专门负责数据的备份,而主进程客户端将继续处理读写请求。

存在问题

当数据只有几个 GB 的时候,使用快照来存储自然是没有问题,但是当数据量剧增的时候,比方说当内存数据增加到100个G,bgsave 方式 fork 子进程所耗费的时间也越来越多,从而可能会导致系统性能底下等问题,甚至可能导致 Redis 暂时停顿数秒

如何解决

这种情况下可以考虑关闭自动保存快照的功能,转用手动执行。但是手动执行带来的风险就是可能会丢失数据,如果是对系统要求极为严格的程序,考虑使用 AOF 功能

AOF(append-only-file)

区别于 rdb 的快照存储方式,AOF 存储方式并不是存储内存里面的数据,而是存储写操作的命令。比如说:set message 1,快照的方式会存储 message 1 这条数据,而 aof 的方式会把 set message 1 这条命令记录到 appendfile.aof 这个文件中,为了让体积尽可能小,redis 不会直接明文存储这句命令,而是通过 redis 的通信协议(RESP)格式来进行存储

AOF 工作流程

命令追加(append)

在执行写命令的时候,redis 不会马上把命令写入到文件,而是先把命令写入到 AOF缓冲区,然后再由缓冲区写入到文件。因为如果直接写入到文件的话,大量的 硬盘IO 可能会造成系统的不可用

image

文件写入和文件同步

redis提供了多种机制来把缓冲区的数据写入到磁盘文件,这部分操作将由操作系统的 write 和 fsync 来完成

write 命令: 当用户调用 write 函数将数据写入到文件的时候,数据会先存储在内存缓冲区里,等到填满内存缓冲区或超过了指定时间,才真正的把数据写入到磁盘的文件

fsync 命令:这句命令会立即将缓存区的数据写入到磁盘的文件

所以,基于以上说明, redis 提供了几种选项来让用户选择 AOF缓冲区的同步策略

image

redis 同步策略

redis 的同步策略由配置文件中的 appendfsync 参数来决定,有以下三种参数

  • always: 每当数据写入到 AOF 缓冲区的时候,立马调用系统 fsync 函数,把缓冲区的数据同步到磁盘文件。这种方法会造成大量的磁盘 IO,影响 redis 甚至系统的正常使用。
  • no: 命令写入到缓冲区后,redis层面就不管了了,然后交由操作系统来把数据写入到内存缓冲区,再每个周期(30s)同步一次内存的数据到磁盘文件,这种方式相当于每过 30s 把文件存储到磁盘文件(实际上时间会有偏差,但基本是这个意思)。这种情况下还是可能会造成 30s 内的数据丢失
  • everysec: 命令写入 aof 缓冲区后,由线程来执行 write 函数,让他存储到内存缓冲区,然后,由另一个线程每隔一秒执行一次 fsync

everysec 是前两种策略的折中方案,这种方式能很好的平衡数据的完备和系统的安全性,将数据丢失降低到 1s

AOF重写机制

随着 AOF 的执行,文件的体积势必会越来越大,如果不加以控制,最终肯定会占满整个磁盘的空间。所以,也就有了 AOF 重写机制

需要说明的是,重写并不会对已有的 aof 文件进行操作!!!该过程仅仅只是把当前进程内的数据转换为写命令

为了说明这一点,举个简单的例子

1
2
3
4
5
6
# 假设服务器对键list执行了以下命令
127.0.0.1:6379> RPUSH list "A" "B"
(integer) 2
127.0.0.1:6379> RPUSH list "C"
(integer) 3
127.0.0.1:6379> RPUSH list "D" "E"

当前列表键list在数据库中的值就为[“A”, “B”, “C”, “D”, “E”]。要使用尽量少的命令来记录list键的状态,最简单的方式不是去读取和分析现有AOF文件的内容,而是直接读取list键在数据库中的当前值,然后用一条RPUSH list “A”, “B”, “C”, “D”, “E”代替前面的命令。

过程如下:

  1. 系统检测到目前符合重写的条件,则 fork 子进程来进行重写操作(因为如果由主线程来完成操作会造成操作阻塞)
  2. 由子进程来操作重写的过程中,中途服务器执行了写/删命令,这种情况下,如果不处理的话,会造成新的 aof 文件与现有数据内容不一致。
  3. 那么,就引入了新的缓存区,AOF重写缓存区,把这段时间内产生的操作都写入到这个缓存区,等到整个重写过程完成以后,再把这个缓冲区的内容写入到新的 aof 文件即可。

image

参考:

https://blog.csdn.net/hezhiqiang1314/article/details/69396887

https://www.cnblogs.com/kismetv/p/9137897.html