概述
最近所做的项目利用flask的post接口接收base64图片数据,搭建我们人脸融合算法工程,环境依赖为:
python3.6、redis: 统一缓存、opencv 4.1、flask 、gunicorn、meinheld。
好了直接上代码吧!!!
# encoding: utf-8
import base64
import json
from meinheld import server
import flask
from flask import request, Flask
import time
from app.Fusion import *
from core.valid_judge import *
from core.image_detect import *
import redis
import logging
from logging.handlers import TimedRotatingFileHandler
app = Flask(__name__)
def setLog():
log_fmt = '%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s'
formatter = logging.Formatter(log_fmt)
fh = TimedRotatingFileHandler(
filename="log/run_facefusion_server" + str(time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())) + ".log",
when="H", interval=1,
backupCount=72)
fh.setFormatter(formatter)
logging.basicConfig(level=logging.INFO)
log = logging.getLogger()
log.addHandler(fh)
setLog()
# 创建链接到redis数据库的对象
pool = redis.ConnectionPool(host=config.redisHost, port=config.redisPort, password=config.redisPassword,
max_connections=config.maxConnections)
redisDb = redis.Redis(connection_pool=pool)
fusion = FaceFusion(redisDb)
@app.route("/ai/v1/DetectValid", methods=['POST'])
def detect_valid():
try:
start_time = time.time()
resParm = flask.request.data
# 转字符串
resParm = str(resParm, encoding="utf-8")
resParm = eval(resParm)
requestId = resParm.get('requestId')
# 服务鉴权
token = resParm.get('token')
if not token:
res = {'code': 3, 'msg': 'token fail'}
logging.error("code: 3 msg:
token fail ")
return json.dumps(res)
# 按照debase64进行处理
modelImg_base64 = resParm.get("modelImage_2")
destImg_base64 = resParm.get("image_2")
if not modelImg_base64 and not destImg_base64:
res = {'code': 4, 'msg': ' picture param invalid'}
logging.error("code: 4
msg:
picture param invalid")
return json.dumps(res)
model_landmarkInfo, model_eyestatus, model_headpose, model_res_faces = face_detect(modelImg_base64)
dest_landmarkInfo, dest_eyestatus, dest_headpose, dest_res_faces = face_detect(destImg_base64)
if angle_valid(model_headpose) == False or angle_valid(dest_headpose) == False:
res = {'code': 4, 'msg': ' picture angle invalid'}
logging.error("code: 4
msg:
picture angle invalid")
return json.dumps(res)
if occlusion_valid(model_eyestatus) == False or occlusion_valid(dest_eyestatus) == False:
res = {'code': 4, 'msg': ' eye occlusion invalid'}
logging.error("code: 4
msg:
eye occlusion invalid")
return json.dumps(res)
if has_dark_glass(model_eyestatus) == True or has_dark_glass(dest_eyestatus) == False:
res = {'code': 4, 'msg': ' eye has dark glass invalid'}
logging.error("code: 4
msg:
eye has dark glass invalid")
return json.dumps(res)
modelImg = base64.b64decode(modelImg_base64)
destImg = base64.b64decode(destImg_base64)
recv_time = time.time()
logging.info(f"recv image cost time:
{str(recv_time - start_time)}")
modelImg_data = np.fromstring(modelImg, np.uint8)
modelImg_data_1 = cv2.imdecode(modelImg_data, cv2.IMREAD_COLOR)
destImg_data = np.fromstring(destImg, np.uint8)
destImg_data_1 = cv2.imdecode(destImg_data, cv2.IMREAD_COLOR)
# 判定图片尺寸
if (modelImg_data_1.shape[0] > config.size or modelImg_data_1.shape[1] > config.size
or destImg_data_1.shape[0] > config.size or destImg_data_1.shape[1] > config.size):
res = {'code': 5, 'msg': ' picture size invalid'}
logging.error("code: 5 msg: picture size invalid")
return json.dumps(res)
# 扩展图片,防止旋转的时候出界
# modelImg_data_2 = cv2.copyMakeBorder(modelImg_data_1, config.cmb, config.cmb, config.cmb, config.cmb,
#
cv2.BORDER_CONSTANT,
#
value=[255, 255, 255])
# destImg_data_2 = cv2.copyMakeBorder(destImg_data_1, config.cmb, config.cmb, config.cmb, config.cmb,
#
cv2.BORDER_CONSTANT,
#
value=[255, 255, 255])
# modelImg_data_2_ret, modelImg_data_2_buf = cv2.imencode(".jpg", modelImg_data_2)
# destImg_data_2_ret, destImg_data_2_buf = cv2.imencode(".jpg", destImg_data_2)
# modelImgExpand64 = base64.b64encode(modelImg_data_2_buf)
# destImgExpand64 = base64.b64encode(destImg_data_2_buf)
if model_res_faces is not None:
redisOper.setLandmarkToRedis(redisDb, modelImg_data_1, model_res_faces)
# 需要确定redis值的数据格式
if dest_res_faces is not None:
redisOper.setLandmarkToRedis(redisDb, destImg_data_1, dest_res_faces)
# 需要确定redis值的数据格式
res = {'code': 0, 'msg': 'valid image'}
logging.info("code: 0 msg: picture is valid")
return json.dumps(res)
except Exception as x:
logging.exception(x)
res = {'code': 6, 'msg': 'request exception'}
return json.dumps(res)
# 定义路由
@app.route("/ai/v1/FaceFusion", methods=['POST'])
def get_frame():
try:
start_time = time.time()
resParm = flask.request.data
# 转字符串
resParm = str(resParm, encoding="utf-8")
resParm = eval(resParm)
requestId = resParm.get('requestId')
# 服务鉴权
token = resParm.get('token')
if not token:
res = {'code': 3, 'msg': 'token fail'}
logging.error("code: 3 msg:
token fail ")
return json.dumps(res)
# 按照debase64进行处理
modelImg_base64 = resParm.get("modelImage_2")
destImg_base64 = resParm.get("image_2")
if not modelImg_base64 and not destImg_base64:
res = {'code': 4, 'msg': ' picture param invalid'}
logging.error("code: 4
msg:
picture param invalid")
return json.dumps(res)
modelImg = base64.b64decode(modelImg_base64)
destImg = base64.b64decode(destImg_base64)
recv_time = time.time()
logging.info(f"recv image cost time:
{str(recv_time - start_time)}")
modelImg_data = np.fromstring(modelImg, np.uint8)
modelImg_data_1 = cv2.imdecode(modelImg_data, cv2.IMREAD_COLOR)
destImg_data = np.fromstring(destImg, np.uint8)
destImg_data_1 = cv2.imdecode(destImg_data, cv2.IMREAD_COLOR)
# 判定图片尺寸
if (modelImg_data_1.shape[0] > config.size or modelImg_data_1.shape[1] > config.size
or destImg_data_1.shape[0] > config.size or destImg_data_1.shape[1] > config.size):
res = {'code': 5, 'msg': ' picture size invalid'}
logging.error("code: 5 msg: picture size invalid")
return json.dumps(res)
logging.info(f" modelImg_data_1
shape:
{str(modelImg_data_1.shape)}
size:
{str(modelImg_data_1.size)}")
logging.info(f" destImg_data_1
shape:
{str(destImg_data_1.shape)}
size:
{str(destImg_data_1.size)}")
# 扩展图片,防止旋转的时候出界
modelImg_data_2 = cv2.copyMakeBorder(modelImg_data_1, config.cmb, config.cmb, config.cmb, config.cmb,
cv2.BORDER_CONSTANT,
value=[255, 255, 255])
destImg_data_2 = cv2.copyMakeBorder(destImg_data_1, config.cmb, config.cmb, config.cmb, config.cmb,
cv2.BORDER_CONSTANT,
value=[255, 255, 255])
modelImg_data_2_ret, modelImg_data_2_buf = cv2.imencode(".jpg", modelImg_data_2)
destImg_data_2_ret, destImg_data_2_buf = cv2.imencode(".jpg", destImg_data_2)
modelImgExpand64 = base64.b64encode(modelImg_data_2_buf)
destImgExpand64 = base64.b64encode(destImg_data_2_buf)
process_img_time = time.time()
logging.info(f"process img cost time:
{str(process_img_time - recv_time)}")
# 融合
mopherImg = fusion.morpherBy64(modelImgExpand64, destImgExpand64, modelImg_data_2, destImg_data_2,
config.width, config.height, fusion.redisDb)
if mopherImg is None or not mopherImg.data or len(modelImg) < 1:
res = {'code': 1, 'msg': ' not detect faces'}
logging.error("code: 1 msg: not detect faces")
return json.dumps(res)
cv2.imwrite("res.jpg", mopherImg)
ret, buf = cv2.imencode(".jpg", mopherImg)
picStr = base64.b64encode(buf)
timeUsed = time.time() - start_time
data = {'requestId': requestId, 'image': str(picStr, encoding="utf-8"), 'timeUsed': timeUsed}
res = {'code': 0, 'msg': 'success', 'data': data}
logging.info(f"code:0
msg:success
FaceFusion cost Time is: {str(timeUsed)} ")
return json.dumps(res)
except Exception as x:
logging.exception(x)
res = {'code': 6, 'msg': 'request exception'}
return json.dumps(res)
if __name__ == "__main__":
# setLog()
# 创建链接到redis数据库的对象
# pool = redis.ConnectionPool(host=config.redisHost, port=config.redisPort, password=config.redisPassword,
#
max_connections=config.maxConnections)
# redisDb = redis.Redis(connection_pool=pool)
# fusion = FaceFusion(redisDb)
logging.info('Starting the server...')
# app.run(host='0.0.0.0', port=fusion.servicePort, threaded=True)
server.listen(("0.0.0.0", fusion.servicePort))
server.run(app)
最后
以上就是狂野楼房为你收集整理的利用flask的post接口接收base64图片数据,搭建人脸融合工程实现的全部内容,希望文章能够帮你解决利用flask的post接口接收base64图片数据,搭建人脸融合工程实现所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复