在Python Flask程序中发起request请求时卡死的问题

当前的Python Flask程序部署到包含三个pod的一个deployment中,业务中需要一个消息通知到三个pod,目前不想引入消息队列组件,所以调用现有的kubernetes python库,直接取出三个pod的IP,直接访问三遍。方法略显粗糙,虽然不优雅,但是轻量级的解决了问题。

1
2
3
resp = requests.post(url,
data=json.dumps(data_info, sort_keys=True, default=str),
headers={'Content-Type': 'application/json'})

但是在调试中发现,在一个API代码中一旦发起新的request请求,系统直接卡住,甚至所有的POD IP都无法支持访问了。
这个问题,一开始的研究思路是以为循环调用一个API导致的无限循环,但是加入debug log以后并没有打出log来,看起来并不是。
最后突然想起,之前把flask的app启动方式换成了gevent.pywsgi.WSGIServer, 会不会是线程无法启动的问题。
最后查到了如下内容:

1
2
3
4
5
6
7
8
9
10
11
12
gevent is a coroutine -based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libev or libuv event loop.

Features include:
- Fast event loop based on libev or libuv.
- Lightweight execution units based on greenlets.
- API that re-uses concepts from the Python standard library (for examples there are events and queues).
- Cooperative sockets with SSL support
- Cooperative DNS queries performed through a threadpool, dnspython, or c-ares.
- Monkey patching utility to get 3rd party modules to become cooperative
- TCP/UDP/HTTP servers
- Subprocess support (through gevent.subprocess)
- Thread pools

所以,gevent是协程级别的,那么自然就阻塞了IO,那如何解决呢?
https://www.gevent.org/api/gevent.monkey.html#module-gevent.monkey

官方提供了一个patch库。总结起来就是这样:

gevent 是一个基于协程的 Python 网络库,它使用 Greenlet 库提供了一种高效的协程实现。协程是一种轻量级的线程,允许并发执行,但没有真正的并行性。协程可 以在遇到 I/O 操作时自动地切换到其他任务,从而提高程序的并发能力。
然而,在标准的 Python 线程模型中,当一个线程遇到 I/O 操作时,它会被阻塞,直到 I/O 操作完成。这意味着在传统的多线程模型下,一个线程在等待 I/O 完成时 会占用一个线程资源,而其他线程则无法被调度执行。
monkey.patch_all() 是 gevent 提供的一个函数,用于实现对标准库的自动补丁,以便与协程一起使用。这个函数会对一些常见的阻塞式 I/O 操作进行替换,使其在 遇到阻塞时能够自动地切换到其他协程任务。
当你在执行 monkey.patch_all() 后,gevent 会修改 Python 的内置库,例如 socket、threading、time 等,以便在这些库中的 I/O 操作发生时进行协程切换。 这样一来,当使用 gevent 的 WSGIServer 启动一个服务器时,它会在遇到阻塞的 I/O 操作时自动切换到其他协程,而不会阻塞整个服务器。
总结起来,执行 monkey.patch_all() 会对 Python 的内置库进行补丁,使得在使用 gevent 的 WSGIServer 启动服务器时,能够自动地在 I/O 操作发生时切换到其 他协程,从而实现多线程的并发处理能力。这使得 gevent.pywsgi.WSGIServer 能够更高效地处理并发请求。

所以在整个flask最开始的地方加上这样一段,问题就解决了。

1
2
3
# noinspection PyUnresolvedReferences
from gevent import monkey
monkey.patch_all()

同时,借这个机会,再次重温了一些线程(Process),进程(Thread)和协程(Coroutine)。这篇文章讲的挺好。
https://juejin.cn/post/7027998293351202853

有两个重要的的点:

  1. 线程是程序执行中一个单一的顺序控制流程,是程序执行流的最小单元,是处理器调度和分派的基本单位。
  2. 协程进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序。

打印的python的dict或list对象用FEHelper查看

利用Python的kubernetes库,可以查询出来某个namespace里下的pod,但是返回的结果集过于复杂,想找到其中的pod IP, 需要费点时间。

1
2
3
all_pods = client.CoreV1Api().list_namespaced_pod(
CURRENT_RUNNING_NAMESPACE
).to_dict()["items"]

all_pods直接打的结果不是json,里面的None和datetime.datetime是无法被FEHelper识别的。
json直接dump的时候会报错误:TypeError: Object of type datetime is not JSON serializable。
这个时候只需要简单的加个参数就可以了。

1
json.dumps(all_pods, sort_keys=True, default=str)

打印出来的结果,拷贝到FEHelper然后去掉头尾的引号就可以识别了, 顺利找出pod IP的路径,拼出来想要的结果。

1
[s['status']['pod_ip'] for s in all_pods]

在K8S环境中构建基于python flask架构的websocket

业务上需要搭建一个能推送消息的架构,现有的服务端是基于python-flask构建的。

代码部分

服务端

服务端python需要集成socketio,参考这里https://github.com/miguelgrinberg/python-socketio/blob/main/examples/server/wsgi/app.py
但是需要记得async_mode改成gevent。

前端

前端测试的代码, 需要替换一下Server IP。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
<!DOCTYPE HTML>
<html>
<head>
<title>Flask-SocketIO Test</title>
<script type="text/javascript" src="//code.jquery.com/jquery-2.1.4.min.js"></script>
<script type="text/javascript" src="//cdnjs.cloudflare.com/ajax/libs/socket.io/3.0.3/socket.io.min.js"></script>
<script type="text/javascript" charset="utf-8">
$(document).ready(function(){
var socket = io.connect("http://<Server IP>:5000/");

socket.on('connect', function() {
socket.emit('my_event', {data: 'I\'m connected!'});
});
socket.on('disconnect', function() {
$('#log').append('<br>Disconnected');
});
socket.on('nuke_response', function(msg) {
$('#log').append('<br>Received: ' + msg.data);
});

// event handler for server sent data
// the data is displayed in the "Received" section of the page
// handlers for the different forms in the page
// these send data to the server in a variety of ways
$('form#emit').submit(function(event) {
socket.emit('my_event', {data: $('#emit_data').val()});
return false;
});
$('form#broadcast').submit(function(event) {
socket.emit('my_broadcast_event', {data: $('#broadcast_data').val()});
return false;
});
$('form#join').submit(function(event) {
socket.emit('join', {room: $('#join_room').val()});
return false;
});
$('form#leave').submit(function(event) {
socket.emit('leave', {room: $('#leave_room').val()});
return false;
});
$('form#send_room').submit(function(event) {
socket.emit('my_room_event', {room: $('#room_name').val(), data: $('#room_data').val()});
return false;
});
$('form#close').submit(function(event) {
socket.emit('close_room', {room: $('#close_room').val()});
return false;
});
$('form#disconnect').submit(function(event) {
socket.emit('disconnect_request');
return false;
});
});
</script>
</head>
<body>
<h1>Flask-SocketIO Test</h1>
<h2>Send:</h2>
<form id="emit" method="POST" action='#'>
<input type="text" name="emit_data" id="emit_data" placeholder="Message">
<input type="submit" value="Echo">
</form>
<form id="broadcast" method="POST" action='#'>
<input type="text" name="broadcast_data" id="broadcast_data" placeholder="Message">
<input type="submit" value="Broadcast">
</form>
<form id="join" method="POST" action='#'>
<input type="text" name="join_room" id="join_room" placeholder="Room Name">
<input type="submit" value="Join Room">
</form>
<form id="leave" method="POST" action='#'>
<input type="text" name="leave_room" id="leave_room" placeholder="Room Name">
<input type="submit" value="Leave Room">
</form>
<form id="send_room" method="POST" action='#'>
<input type="text" name="room_name" id="room_name" placeholder="Room Name">
<input type="text" name="room_data" id="room_data" placeholder="Message">
<input type="submit" value="Send to Room">
</form>
<form id="close" method="POST" action="#">
<input type="text" name="close_room" id="close_room" placeholder="Room Name">
<input type="submit" value="Close Room">
</form>
<form id="disconnect" method="POST" action="#">
<input type="submit" value="Disconnect">
</form>
<h2>Receive:</h2>
<div><p id="log"></p></div>
</body>
</html>

这样就基本完成了正常环境下的websocket下的代码部分的功能。

架构部分

架构图

arch

如图所示,几条重点:

  1. 系统发布在一个K8S环境中,一个application的deployment里包含三个pod和一个service。
  2. K8S Sevice用NodePort的方式进行服务暴漏。
  3. K8S系统外面是用一个HAProxy进行代理,域名解析到HAProxy所在的虚拟机的IP上。
  4. HAProxy和K8S service形成了两层的LoadBalance。

目的是让客户端Client A, B, C访问到一个POD后,以后就一直绑定到这个pod上。
需要改动的配置:

  1. HAProxy的balance策略改成source
    这样就让HAProxy进行转发的时候根据客户端ip进行选择目的IP。
  2. K8S Service的配置里增加sessionAffinity: ClientIP
    K8S Service进行转发的时候根据客户端ip进行选择目的IP
  3. K8S Service的配置里增加externalTrafficPolicy: Local
    如果不加这一条,那么所有的ClientIP都会被认为是来自HAProxy的那个IP,加上这个配置后,会根据请求的header里的X-Forwarded-For里的客户端IP进行判断。

消息队列

正常情况下,客户端A和B链接到第一个pod,客户端C链接到第三个pod,如果有一个时间发生在第二个POD,或者Job Pod上,是无法直接发送消息给所有的pod上的。
所以需要一个消息总线,可以选择Redis/MQ/kube-event来实现,三个API pod侦听消息队列的某个时间,所有需要发送给客户的消息直接发送给消息队列,然后消息队列转发给三个API pod,
API pod收到消息队列的提醒后,然后推送给自己链接的客户端,这样就完成了一个整体回路。

pip安装是的时候遇到Microsoft Visual C++ 14.0 or greater is required

pip安装部分包的时候,会遇到下面的错误提示,这是因为缺少的C++的编译环境。

1
2
error: Microsoft Visual C++ 14.0 or greater is required. Get it with "Microsoft C++ Build Tools": https://visualstudio.microsoft.com/visual-cpp-build-tools/
[end of output]

网上找到的许多信息看起来已经过期了,目前能解决的办法是,打开这个错误提示里的网站。

https://visualstudio.microsoft.com/visual-cpp-build-tools/

下载一个“Download Build Tools”, 然后安装,从里面选择“Windows”里的“Desktop development with C++”, 然后确认安装,等待就可以了。

查看kubectl config文件中的权限信息

有时候调试问题的时候,只有一份kubeconfig文件,可以根据这个文件获取当前用户的权限,方便下一步的调试。

把config文件中的client-certificate-data内容,或者.crt文件拿出来用openssl解析一下。

1
2
3
4
5
6
# 直接处理内容
$ echo "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURJVENDQWdtZ0F3SUJBZ0lJWkVMQmxCUDl4dmN3RFFZSktvWklodmNOQVFFTEJRQXdGVEVUTUJFR0ExVUUKQXhNS2EzVmlaWEp1WlhSbGN6QWVGdzB5TXpBeU1EZ3dOak0xTWpKYUZ3MHlOREF6TURVd016QXdNREZhTURReApGekFWQmdOVkJBb1REbk41YzNSbGJUcHRZWE4wWlhKek1Sa3dGd1lEVlFRREV4QnJkV0psY201bGRHVnpMV0ZrCmJXbHVNSUlCSWpBTkJna3Foa2lHOXcwQkFRRUZBQU9DQVE4QU1JSUJDZ0tDQVFFQXRubG93TUYxeXVHS1c2R2QKVVp3M09OQ3pqVGcyUDRpbU00SHZlU29ZNFdHRmRMQitnQ2tPSmtCTUpYYU92c2UrYStwOVhMYVJjVUwwN2RWdQo5NUVQYStIUWx3VE5UYVRZRXRtNGFHZHhOMUY5L0pxejZGejVXeUZKSTBnOEVCNDVEVGw1WVhuV0E0YzlXcHBDCmF3Y0N2c0lvK0RoRWowSjFMeFlMTHhDSTVha05pWWdKYlJ5NDhiallSengrdDBoUjJEUjZCRmtibUg4a1Z6NmwKNFduSDRHZGhJRWNhV1g5UUxaQnY1QmNDRDRoN3J0V0J1WFd5NVRtUGViRE1nTC9aQmVERVBUTGFRRUVZSDNkbwpGM1dFcGV5QXFsMHc4UHVzWnllRnpIcUY3c2lzYnZWaFAxN2xhSzNtRmloSDNkVG0rN3RieWNtRnQzTk1RVzZjCmtwbkVrd0lEQVFBQm8xWXdWREFPQmdOVkhROEJBZjhFQkFNQ0JhQXdFd1lEVlIwbEJBd3dDZ1lJS3dZQkJRVUgKQXdJd0RBWURWUjBUQVFIL0JBSXdBREFmQmdOVkhTTUVHREFXZ0JRaWN3TzROU2hyeU1ib0VVVUNCTjNVU1RMZgo4akFOQmdrcWhraUc5dzBCQVFzRkFBT0NBUUVBVDF5MEtFa1Y5Sm05aU5TRklpeE5raXRjTFVwamJCS2FOSVNNCjI3T2hLWFZWRXdORzRuaERHanQrZFZNa2RhTTFvVlpqRVg2M09CeFdxYnFZNUhSWnZGZi9PUkpGaXo2cUxlbmIKSlJxMk5zSWswdHdpdTZXeFB5WERHOFcxenEvZHdxcmFQOVFlZGd1SFRQMGgrSHd6TFVzalJFSE9SanNOYzUvawpUbWM4andnN2VSckNUamU2Ym41RlNuam1CU0ZOaGM0TGlFZXllVXVpREViOVI2ZG96U1RrdFZHWEFldnlHT1BzCk1PcWJTRTRsT3FsbnZWcTVFd3J0N3ZhZEdmYzRNRDdCWnFLdzBNdkhPckFnb251T1BnM1UrWDlUODNhQ2NZVkgKcmxKaWdybkowc3ZkYjI2WEhKWDZiU3ZnWFpZb3c2S1pGY1liOTdJQURUVDBma0pNZUE9PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg==" | base64 -d | openssl x509 -noout -text

# 处理.crt文件

$ openssl x509 -in cluster.crt -noout -text

openssl输出的内容里包含如下一段:

1
Subject: O=system:masters, CN=kubernetes-admin

这里面的O=对应的就是K8S里的Group,CN=对应的就是user, 剩下的就是cluster里去找对应rolebinding, clusterrolebinding, role和clusterrole就可以了。

用sed替换掉特定字符串后的指定内容

工作中遇到了这么一个需求,有一段没有格式的json字符串,需要把其中的key为“name”的值改成“new-webhook-config”, 用sed可以完成这个工作。

1
{"apiVersion":"admissionregistration.k8s.io/v1beta1","kind":"MutatingWebhookConfiguration","metadata":{"annotations":{},"name":"webhook-config"}}
1
sed -ri 's/(\"name\":\")[^"]*/\1new-webhook-config/g' 1.txt

重点的是其中的\1,这里表示的第一个子串,如果没有这个\1,那么就会全部替换掉。
可以参考这里:
https://www.cnblogs.com/maxincai/p/5146338.html

调用Jira API统计一段时间每个账户内ticket被resolved的数量

Jira是不能分组统计的,只能用dashboard里用二维表格统计,但是如果查询语句是“status changed to resolved by”,统计表格里是不能显示出来“resolved by”这个用户名的。
之前同事是手动做了特别多的查询,然后把这些查询放到confluence里进行统计的,所以就想用jira api分账户统计一下,代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import json
from functools import reduce

import requests
from prettytable import PrettyTable

token = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' # 从JIRA的Porta里profile页面创建一个Personal Access Tokens
headers = {
"Authorization": "Bearer %s" % token,
"Accept": "application/json, text/plain, */*",
"Content-Type": "application/json;charset=UTF-8",
}
duiring_time = "2022-01-01, 2023-01-01"
jira_user_accounts = [
'zhangsan',
'lisi',
'wangwu'
]


if __name__ == '__main__':
requests.packages.urllib3.disable_warnings()
total_result = []
for jira_user in jira_user_accounts:
params = {
"startAt": 0,
"maxResults": 2000,
"jql": "project = CSFS AND status changed to resolved by (%s) during (%s)" % (jira_user, duiring_time),
"fields": ['customfield_40669']
}
res = requests.post("https://jiradc2.ext.net.nokia.com/rest/api/2/search", headers=headers,
data=json.dumps(params, ensure_ascii=False).encode("utf-8"), verify=False)
issues = res.json()['issues']
if len(issues) > 0:
display_name = "Unknown"
# 想办法找到所有的display name
try:
for issue in issues:
resolve_users = issue['fields']["customfield_40669"]
for resolve_user in resolve_users:
# 这个ticket可能会被多个人resolve,取最后一个resolver
if resolve_user['name'] == jira_user:
display_name = resolve_user["displayName"]
raise StopIteration
except StopIteration:
total_result.append({"name": jira_user, 'count': len(issues), "display_name": display_name})
else:
total_result.append({"name": jira_user, 'count': 0})

total_result.sort(key=lambda u: u["count"], reverse=True)

total_count = reduce(lambda x, y: x + y["count"], total_result, 0)
print("total: " + str(total_count))
myTable = PrettyTable(["user", "display name", "count"])
for result in total_result:
if 'display_name' in result:
myTable.add_row([result['name'], result['display_name'], result['count']])
else:
myTable.add_row([result['name'], '', result['count']])
print(myTable)

查询后的结果:

1
2
3
4
5
6
7
+----------+-----------------------+-------+
| user | dispaly | count |
+----------+-----------------------+-------+
| zhangsa | San Zhang (Company) | 302 |
| lisi | Si Li (Company) | 113 |
| wangwu | Wu Wang (Company) | 93 |
+----------+-----------------------+-------+

效果不错,值得分享一下。

学习Python的列表生成器以及Map-Reduce

最近写了很多的接口,需要反复处理json数据,抽离数据,重组格式。
花时间研究了一些python的列表生成,以及更高级的Map/Reduce, 并给大家分享了一下,记录一下share的过程写下的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from functools import reduce

datas = [
{
"date": "22日星期四",
"sunrise": "06:17",
"high": "高温 17.0℃",
"low": "低温 1.0℃",
"sunset": "18:27",
"aqi": 98,
"fx": "西南风",
"fl": "<3级",
"type": "晴",
"notice": "愿你拥有比阳光明媚的心情"
},
{
"date": "23日星期五",
"sunrise": "06:16",
"high": "高温 18.0℃",
"low": "低温 5.0℃",
"sunset": "18:28",
"aqi": 118,
"fx": "无持续风向",
"fl": "<3级",
"type": "多云",
"notice": "阴晴之间,谨防紫外线侵扰"
},
{
"date": "24日星期六",
"sunrise": "06:14",
"high": "高温 21.0℃",
"low": "低温 7.0℃",
"sunset": "18:29",
"aqi": 52,
"fx": "西南风",
"fl": "<3级",
"type": "晴",
"notice": "愿你拥有比阳光明媚的心情"
},
{
"date": "25日星期日",
"sunrise": "06:13",
"high": "高温 22.0℃",
"low": "低温 7.0℃",
"sunset": "18:30",
"aqi": 71,
"fx": "西南风",
"fl": "<3级",
"type": "晴",
"notice": "愿你拥有比阳光明媚的心情"
},
{
"date": "26日星期一",
"sunrise": "06:11",
"high": "高温 21.0℃",
"low": "低温 8.0℃",
"sunset": "18:31",
"aqi": 97,
"fx": "西南风",
"fl": "<3级",
"type": "多云",
"notice": "阴晴之间,谨防紫外线侵扰"
}
]


def add(x):
return x + 'Z'


def sum(x, y):
return x + y


if __name__ == '__main__':
zz = []
for data in datas:
zz.append(data['date'] + data['high'] + data['low'])
print(zz)

p = 9
a = 10
zz = p if p > a else a
print(zz)

zz = [data['date'] + data['high'] + data['low'] for data in datas if data["aqi"] > 90]
print(zz)

zz = ['不好' if data["aqi"] > 90 else '不好' for data in datas]
print(zz)

zz = [{data['date']: data['high']} for data in datas]
print(zz)

# zz = {data['date']: data['high'],data['sunrise']: data['sunset'] for data in datas}
# print(zz)

# MAP
list_a = ['A', 'B', 'C']

list_b = [1, 2, 3]
# map -- ['A+','B+','C+']
# reduce -- D= 'A+'+'B+'+'C+'

zz = list(map(add, list_a))
print(zz)

zz = reduce(sum, list_b, 6)
print(zz)

zz = list(map(lambda x: x + 'Z', list_a))
print(zz)

zz = reduce(lambda x, y: x + y, list_b, 6)
print(zz)

zz = list(
map(lambda x: {x['date']: x['fx'], x['aqi']: x['sunset'], 'status': '好天气' if x['aqi'] < 90 else '坏天气'},
datas))
print(zz)

zz = []
for x in datas:
d = {x['date']: x['fx'], x['aqi']: x['sunset']}
if x['aqi'] < 90:
d['status'] = '好天气'
else:
d['status'] = '坏天气'
zz.append(d)
print(zz)

zz = reduce(lambda d, k: {**d, **{k['date']: k['fx'], k['sunrise']: k['sunset']}}, datas, {})
print(zz)

a = {'22日星期四': '西南风', 98: '18:27', 'status1': '坏天气'}
b = {'23日星期五': '无持续风向', 118: '18:28', 'status2': '天气'}

print(a.update(b))
print({**a, **b})

datas = [(3, 5), (6, 8), (1, 9), (4, 8)]

zz = reduce(lambda d, k: (d[0] + k[0], d[1] + k[1]), datas, (0, 0))
print(zz)

通过docker registry的文件夹获取到所有的镜像

工作需要获取docker registry里所有的镜像和tag,但是系统在磁盘被占满的情况下出现无法工作的情况,无法通过dockers registry的API去获取。
所以尝试通过,registry的文件夹目录结构中取出想要的结果,最后证明可行。

1
2
3
4
5
6
export docker_registry_dir=/opt/bcmt/storage/docker-registry/ # docker registry 的安装目录。

cd ${docker_registry_dir}docker/registry/v2/repositories/; ls -R | grep tags/ | grep -v "current"| grep -v "index" | sort | awk '{a=index($0,"./");b=index($0,"/_manifests/tags/");c=index($0,":"); print substr($0,a+2,b-a-2)":"substr($0,b+17,c-b-17)}'

# 多行换成一行,用逗号分隔
cd ${docker_registry_dir}docker/registry/v2/repositories/; ls -R | grep tags/ | grep -v "current"| grep -v "index" | sort | awk '{a=index($0,"./");b=index($0,"/_manifests/tags/");c=index($0,":"); print substr($0,a+2,b-a-2)":"substr($0,b+17,c-b-17)}' | tr '\n' ','

之前都是用notepad++手动替换处理,这次发现结合这几个命令awk, print, substr, tr就可以搞定,不错!