Openresty动态负载均衡

说明:

使用OpenResty的 "balancer_by_lua" 指令配合lua-resty-checkups模块来实现动态负载均衡
并通过lua实现服务器列表管理逻辑,也可以使用consul来管理

安装模块:

checkups模块地址(使用详情):

https://github.com/upyun/lua-resty-checkups

安装Lua包管理器:

apt install luarocks

使用luarocks安装模块:

luarocks install lua-resty-checkups

# 查看安装路径:luarocks path
# 如果路径不对,配置lua_package_path 

使用

配置checkups的配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
-- config.lua

_M = {}

_M.global = {
checkup_timer_interval = 15,
default_heartbeat_enable = true,
checkup_shd_sync_enable = true,
shd_config_timer_interval = 1,
}

_M.up_http = {
enable = true,
typ = "http",
timeout = 2,
read_timeout = 15,
send_timeout = 15,
cluster = {
{
servers = {
{ host = "127.0.0.1", port = 8081, weight=10, max_fails=3, fail_timeout=10 },
{ host = "127.0.0.1", port = 8082, weight=11, max_fails=3, fail_timeout=10 },
}
},
},
}

return _M

nginx.conf中http完整配置示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176

http {

# 定义checkups需要的共享内存
lua_shared_dict state 10m;
lua_shared_dict mutex 1m;
lua_shared_dict locks 1m;
lua_shared_dict config 10m;


keepalive_timeout 60;
lua_package_path '$prefix/lua/?.lua;;';
lua_package_cpath "$prefix/src/lualib/?.lua;;";

#初始化checkups
init_by_lua_block {
local config = require "config"
local checkups = require "resty.checkups.api"
checkups.init(config)
}
#准备和创建心跳定时器和上游同步定时器
init_worker_by_lua_block {
local config = require "config"
local checkups = require "resty.checkups.api"
checkups.prepare_checker(config)
checkups.create_checker()
}

# 动态上游集群
upstream binghe_server{
server 0.0.0.1; # 占位用,无实际意义

# 执行负载均衡的 Lua 代码
balancer_by_lua_block{
local checkups = require "resty.checkups.api"
local balancer = require "ngx.balancer";
balancer.set_timeouts(1, 0.5, 0.5) -- 后端的连接和读写超时时间
balancer.set_more_tries(2) -- 连接失败后最多在重试 2 次

local status, code = balancer.get_last_failure() --获取上次失败信息
if status == "failed" then
local last_peer = ngx.ctx.last_peer
ngx.log(ngx.ERR, "get_last_failure: ", last_peer);
-- 标记上次失败server
checkups.feedback_status("up_http", last_peer.host, last_peer.port, true)
end
local peer,err = checkups.select_peer("up_http") --根据算法从服务器列表中获取一个服务地址
ngx.ctx.last_peer = peer --记录这次连接server
balancer.set_current_peer(peer.host, peer.port)
}
keepalive 10; # 需在 balancer 指令之后
}

server {
listen 8090;
server_name localhost;
charset utf-8;

location / {
proxy_pass http://binghe_server;
}

# 动态更新upstream集群,同时更新checkups配置文件
# checkups.update_upstream接口是覆盖集群
# 这里改为添加server到集群
# 可根据情况自行更改
location = /update_upstream {
access_log off;
allow 127.0.0.1;
deny all;
default_type text/plain;
content_by_lua_block {
local host = ngx.req.get_uri_args()["host"]
local port = ngx.req.get_uri_args()["port"]
local weight = ngx.req.get_uri_args()["weight"]

if host == nil or port == nil then
ngx.say("usage: /update_upstream?host=x.x.x.x&port=x")
return
end
if type(port) == "string" then
port= tonumber(port)
end
if weight == nil then
weight = 10
end
local checkups = require "resty.checkups.api"
local save_config = require "save_config"

local ok = save_config.save_server(host,port,weight)
if not ok then ngx.say("save server info error") end

local upstream_data = checkups.get_upstream("up_http")
local new_server = { host = host, port = port, weight=weight, max_fails=3, fail_timeout=10 }
table.insert(upstream_data.cluster[1].servers,new_server)
local ok, err = checkups.update_upstream("up_http", upstream_data)
if err then ngx.say(err) end

ngx.say("update upstream ok")
}
}

# 动态删除upstream集群,同时更新checkups配置文件
# checkups.delete_upstream接口是删除整个集群
# 这里改为删除集群中指定server或者整个集群
# 可根据情况自行更改
location = /delete_upstream {
access_log off;
allow 127.0.0.1;
deny all;
default_type text/plain;
content_by_lua_block {
local host = ngx.req.get_uri_args()["host"]
local port = ngx.req.get_uri_args()["port"]
local skey = ngx.req.get_uri_args()["skey"]

if (host == nil or port == nil) and skey == nil then
ngx.say("usage: /delete_upstream?host=x.x.x.x&port=x \nor\nusage: /delete_upstream?skey=x")
return
end

local checkups = require "resty.checkups.api"
if skey ~= nil then
local ok, err = checkups.delete_upstream(skey)
if err then
ngx.say(err)
else
ngx.say("ok")
end
return
end

if type(port) == "string" then
port= tonumber(port)
end

local upstream_data = checkups.get_upstream("up_http")

for i,v in ipairs(upstream_data.cluster[1].servers) do
if v.host == host and v.port == port then
table.remove(upstream_data.cluster[1].servers, i)
local ok, err = checkups.update_upstream("up_http", upstream_data)
if err then
ngx.say(err)
else
local save_config = require "save_config"
local ok = save_config.delete_server(port)
if not ok then ngx.say("delete server info error") end

ngx.say("delete upstream ok")
end

return
end
end

ngx.say("failed")
}
}

# 查看所有集群运行状态
# 可根据情况自行更改
location = /status {
access_log off;
allow 127.0.0.1;
deny all;
default_type application/json;
content_by_lua_block {
local cjson = require "cjson"
local checkups = require "resty.checkups.api"
ngx.say(cjson.encode(checkups.get_status()))
}
}
}
}

checkups的配置文件保存示例(根据情况自行更改)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
-- save_config.lua

local _M = {
file_name = "./nginx-projects/lua/config.lua"
}


function _M.save_server(host,port,weight)

local file = io.open(_M.file_name, "r+")
if file == nil then return false end

local buffer = file:read("*a")

local find_begin,find_end = string.find(buffer,'servers = {')
if find_begin == nil then
file:close()
return false
end


file:seek("set",find_end)

local end_buff = file:read("*a")

file:seek("set",find_end)

local new_server = "\n\t\t\t\t".."{ host = '"..host.."', port = "..tostring(port)..
", weight="..tostring(weight)..", max_fails=3, fail_timeout=10 }"..","..end_buff

file:write(new_server)

file:close()

ngx.say("save server info ok")
return true
end

function _M.delete_server(port)

local file = io.open(_M.file_name, "r")
if file == nil then return false end

local buffer = file:read("*a")

local find_begin,find_end = string.find(buffer,'\t\t\t\t{.-'..tostring(port)..'.-},')
if find_begin == nil then
file:close()
return false
end
ngx.say("find:",find_begin,"--",find_end)

file:seek("set")

local begin_buffer = file:read(find_begin)

file:seek("set",find_end)

local end_buff = file:read("*a")

file:close()

file = io.open(_M.file_name, "w")
if file == nil then return false end

file:write(begin_buffer..end_buff)

file:close()

ngx.say("delete server info ok")
return true

end

return _M

服务器注册示例(调用此接口自动注册)

http://127.0.0.1:8090/update_upstream?host=127.0.0.1&port=8083

服务器关闭示例(调用此接口自动移除)

http://127.0.0.1:8090/delete_upstream?host=127.0.0.1&port=8083

注意

此模块与lua_code_cache配置冲突,关闭lua代码缓存会后导致prepare_checker()失败