概述
在有很多SQL文件,或者很多SQL语句的时候,如果手动每个文件执行会很麻烦;另外有的时候SQL语句的执行会影响用户的使用效果,需要在晚上没有用户使用网站的情况下执行SQL语句,也就是晚上定时执行SQL文件;
需要实现的功能:
1.弹框选择数据库环境
2.展示展示SQL文件列表
3.可以选择SQL文件,选中后,替换默认展示的SQL文件,并选中当前SQL文件
4.确定执行SQL文件,并提示是否执行
5.执行完SQL文件后,弹框提示执行成功,和失败条数
6.并且记录错误日志,错误日志包含时间,SQL文件地址,SQL语句,SQL错误详情
源码(包含窗体部分,以及改动后的可以做定时任务的脚本,pip需要安装的包含[有些包没有用到,可以删除]):
SQL文件执行工具
实现效果图:
当前存在一个问题,就是这个窗体没有用到多线程,导致在执行SQL文件的时候,窗体控件无法操作;
另外在此基础上进行一些改动,改动后的python脚本可以定时执行SQL文件;
关于定时执行任务设置可以参考这篇文章:https://blog.csdn.net/wwy11/article/details/51100432
代码分为两部分,也可以下载源码:
第一部分:初始化窗体,以及各种需要用到的空间
#自定义一个窗口类MyFrame
class MyFrame(wx.Frame):
path = os.path.split(os.path.realpath(__file__))[0] + "/SQL"
def __init__(self):
super().__init__(parent=None, title="选择服务器环境执行SQL语句", size=(800, 600))
self.SetBackgroundColour(wx.Colour(224, 224, 224))
self.Center() #设置窗口居中
#放一个面板,用于布局其他控件
panel = wx.Panel(parent=self)
hbox1 = wx.BoxSizer(wx.HORIZONTAL)
#创建静态文本
statictext = wx.StaticText(
panel,
label='选择执行环境:',
# size=(150, 111),
)
list1 = [
'开发环境', '外网测试环境', "外网正式环境"
]
self.ch1 = wx.ComboBox(
panel,
# -1,
value='开发环境',
pos=(50, 170),
size=(150, -1),
choices=list1,
style=wx.CB_READONLY)
#添加事件处理
self.Bind(wx.EVT_COMBOBOX, self.on_combobox, self.ch1)
hbox1.Add(
statictext,
# 1,
flag=wx.LEFT | wx.RIGHT | wx.FIXED_MINSIZE,
border=5)
hbox1.Add(self.ch1,
1,
flag=wx.LEFT | wx.RIGHT | wx.FIXED_MINSIZE,
border=5)
but1 = wx.Button(panel, 1, '确 定')
self.Bind(wx.EVT_BUTTON, self.OnSubmit, but1, id=1)
hbox1.Add(but1,
1,
flag=wx.LEFT | wx.RIGHT | wx.FIXED_MINSIZE,
border=5)
hbox13 = wx.BoxSizer(wx.HORIZONTAL)
text11 = wx.StaticText(
panel,
label='选择SQL文件:',
# size=(100, 50),
)
hbox13.Add(text11, 1, flag=wx.ALIGN_LEFT, border=3)
butFile = wx.Button(panel, 1, u"选择要执行的SQL文件")
self.Bind(wx.EVT_BUTTON, self.OnSubmit, butFile, id=2)
hbox13.Add(butFile, 9, flag=wx.ALIGN_LEFT, border=3)
hbox3 = wx.BoxSizer(wx.HORIZONTAL)
text1 = wx.StaticText(
panel,
label='SQL文件:',
# size=(150, 111),
)
hbox3.Add(
text1,
# 1,
flag=wx.LEFT | wx.RIGHT | wx.FIXED_MINSIZE,
border=5)
text2 = wx.StaticText(
panel,
label='(',
# size=(150, 111),
)
hbox3.Add(
text2,
# 1,
flag=wx.LEFT | wx.RIGHT | wx.FIXED_MINSIZE,
border=5)
#复选框(是否全选)
self.cb3 = wx.CheckBox(panel, label='全选', pos=(10, 70))
self.Bind(wx.EVT_CHECKBOX, self.on_chkboxall, self.cb3)
hbox3.Add(
self.cb3,
# 1,
flag=wx.LEFT | wx.RIGHT | wx.FIXED_MINSIZE,
border=5)
text3 = wx.StaticText(
panel,
label=')',
# size=(150, 111),
)
hbox3.Add(
text3,
# 1,
flag=wx.LEFT | wx.RIGHT | wx.FIXED_MINSIZE,
border=0)
# sql文件列表
files = os.listdir(self.path)
fileList = []
for f in files:
# 添加文件
if os.path.splitext(f)[1].lower() == ".sql":
fileList.append(self.path + "\" + f)
self.listBox = wx.CheckListBox(panel, -1, (20, 20), (180, 1520),
fileList, wx.LB_MULTIPLE)
self.Bind(wx.EVT_LISTBOX, self.One_Play, self.listBox)
hbox2 = wx.BoxSizer(wx.HORIZONTAL)
hbox2.Add(self.listBox,
1,
flag=wx.LEFT | wx.RIGHT | wx.FIXED_MINSIZE,
border=5)
#添加事件处理
# self.Bind(wx.EVT_CHOICE,self.on_choice,ch2)
vbox = wx.BoxSizer(wx.VERTICAL)
vbox.Add(hbox1, proportion=0, flag=wx.ALL | wx.EXPAND, border=5)
vbox.Add(hbox13, proportion=0, flag=wx.ALL | wx.EXPAND, border=5)
vbox.Add(hbox3, proportion=0, flag=wx.ALL | wx.EXPAND, border=5)
vbox.Add(hbox2, proportion=0, flag=wx.ALL | wx.EXPAND, border=3)
panel.SetSizer(vbox)
# 选择SQL文件
def OnButton1(self, event):
# filesFilter = "Dicom (*.sql)|*.sql|" "All files (*.*)|*.*"
filesFilter = "sqlserver (*.sql)|*.sql"
fileDialog = wx.FileDialog(self,
message="多文件选择",
wildcard=filesFilter,
style=wx.FD_OPEN | wx.FD_MULTIPLE)
dialogResult = fileDialog.ShowModal()
if dialogResult != wx.ID_OK:
return
paths = fileDialog.GetPaths()
self.listBox.Clear()
for path in paths:
self.listBox.Append(path)
# 选择的文件默认全部选中
self.cb3.SetValue(True)
itemsLen = self.listBox.GetCount()
for i in range(itemsLen):
self.listBox.Check(i, check=True)
# 全选
def on_chkboxall(self, event):
checkBoxSelected = event.GetEventObject()
ischk = checkBoxSelected.IsChecked()
itemsLen = self.listBox.GetCount()
for i in range(itemsLen):
self.listBox.Check(i, check=ischk)
def One_Play(self, event):
listbox = event.GetEventObject()
seleIndex = event.Selection
ischk = self.listBox.IsChecked(seleIndex)
if ischk == True:
self.listBox.Check(seleIndex, check=False)
else:
self.listBox.Check(seleIndex, check=True)
def on_combobox(self, event):
print("选择{0}".format(event.GetString()))
def OnSubmit(self, event):
btn = event.GetEventObject()
txt = btn.LabelText
if txt == "选择要执行的SQL文件":
filesFilter = "sqlserver (*.sql)|*.sql"
fileDialog = wx.FileDialog(self,
message="多文件选择",
wildcard=filesFilter,
style=wx.FD_OPEN | wx.FD_MULTIPLE)
dialogResult = fileDialog.ShowModal()
if dialogResult != wx.ID_OK:
return
paths = fileDialog.GetPaths()
self.listBox.Clear()
for path in paths:
self.listBox.Append(path)
# 选择的文件默认全部选中
self.cb3.SetValue(True)
itemsLen = self.listBox.GetCount()
for i in range(itemsLen):
self.listBox.Check(i, check=True)
elif txt == "确 定":
chvalue = self.ch1.GetValue()
if chvalue == "开发环境":
self.msg = SQLServer(server="127.0.0.1",
port=0,
user="test",
password="Test321",
database="test")
elif chvalue == "外网测试环境":
self.msg = SQLServer(server="127.0.0.1",
port=0,
user="test",
password="Test321",
database="test")
elif chvalue == "外网正式环境":
self.msg = SQLServer(server="127.0.0.1",
port=0,
user="test",
password="Test321",
database="test")
# selectCount = len(self.checkListBox.GetChecked())
# sqlfiles = self.listBox.GetSelections()
chkItems = self.listBox.CheckedItems
# loop = asyncio.get_event_loop()
for item in chkItems:
# 获取SQL文件路径
seleStr = self.listBox.Items[item]
self.msg.GetSqlInfo(seleStr)
# loop.run_until_complete(self.msg.GetSqlInfo(seleStr))
# t = multiprocessing.Process(target=self.msg.GetSqlInfo,
# args=(seleStr,))
# t.daemon=True
# t.start()
# th = threading.Thread(target=self.msg.GetSqlInfo,
# args=(seleStr, ))
# th.start()
# wx.CallAfter(self.msg.GetSqlInfo, seleStr)
time.sleep(1) #每次执行一个文件后暂停1秒
# print(seleStr)
#打印错误日志
# print(self.msg.errMsg)
if len(self.msg.errMsg)>0:
box = wx.MessageDialog(
None, '有%s条SQL语句执行错误,具体情况请查看错误日志' % len(self.msg.errMsg),
'提示', wx.OK | wx.STAY_ON_TOP)
answer=box.ShowModal()
box.SetWindowStyle(wx.OK | wx.STAY_ON_TOP)
box.Destroy()
else:
box = wx.MessageDialog(None, '所有SQL语句执行成功', '提示',
wx.OK | wx.STAY_ON_TOP)
answer = box.ShowModal()
box.SetWindowStyle(wx.OK | wx.STAY_ON_TOP)
box.Destroy()
# time.sleep(2)
wx.Exit()
# def on_choice(self,event):
# print("选择{0}".format(event.GetString()))
第二部分:读取SQL文件,并且执行SQL文件
class SQLServer:
errMsg = []
# filename 指定日志存放文件,level 指定logging级别
logging.basicConfig(filename=os.path.split(os.path.realpath(__file__))[0] +"/sqlErrorInfo.log", level=logging.INFO)
def __init__(self, server,port, user, password, database):
# 类的构造函数,初始化DBC连接信息
self.server = server
self.port = port
self.user = user
self.password = password
self.database = database
def __GetConnect(self):
# 得到数据库连接信息,返回conn.cursor()
if not self.database:
raise (NameError, "没有设置数据库信息")
self.conn = pymssql.connect(server=self.server,
port=self.port,
user=self.user,
password=self.password,
database=self.database)
cur = self.conn.cursor()
if not cur:
raise (NameError, "连接数据库失败") # 将DBC信息赋值给cur
else:
return cur
def ExecQuery(self, sql):
'''
执行查询语句
返回一个包含tuple的list,list是元素的记录行,tuple记录每行的字段数值
'''
cur = self.__GetConnect()
cur.execute(sql) # 执行查询语句
result = cur.fetchall() # fetchall()获取查询结果
# 查询完毕关闭数据库连接
self.conn.close()
return result
def Exec(self, sql,filename):
try:
cur = self.__GetConnect()
cur.execute(
sql) # I have experienced problems when executing utf-8
self.conn.close()
except Exception as e:
errorMsg = '[当前时间:%s]SQL文件地址:%s nSQL语句:%s n错误信息:%s n' % (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),filename, sql, e)
self.errMsg.append(errorMsg)
logging.info(errorMsg)
logging.info("------------------------------------------------------------------n")
# 获取SQL文件中的SQL语句
def GetSqlInfo(self, filename):
with open(filename, 'r', encoding='gbk', errors='ignore') as f:
script = f.read() # or whatever its encoding is
script = re.sub(r'/*.*?*/', '', script,
flags=re.DOTALL) # remove multiline comment
script = re.sub(r'--.*$', '', script,
flags=re.MULTILINE) # remove single line comment
sql = []
do_execute = False
for line in script.split(u'n'):
line = line.strip()
if not line:
continue
elif line.upper() == u'GO':
do_execute = True
else:
sql.append(line)
do_execute = line.endswith(u';')
if do_execute and filter(None,
sql): # ignore if only blank lines
sqlInfo = u'n'.join(sql) #.encode("cp1252")
# sql.append(sqlInfo)
self.Exec(
sqlInfo, filename
) # I have experienced problems when executing utf-8
do_execute = False
sql = []
转载于:https://www.cnblogs.com/Health/p/python.html
最后
以上就是丰富花卷为你收集整理的python 选择不同数据库环境执行SQL文件的全部内容,希望文章能够帮你解决python 选择不同数据库环境执行SQL文件所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复