1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
|
import pymysql import psycopg2 import psycopg2.extras import time import csv import random import sys from Crypto import Random import binascii from Crypto.PublicKey import RSA from Crypto.Cipher import PKCS1_v1_5 import threading
thread = 6
hostName = 'http://test-web' mysql = {'host':'localhost','user':'root','pwd':'','db':'test'} pgsql = {'host':'127.0.0.1','database':'sys','user':'root','password':'root','port':'5432'}
def conMysql(type='dict'): ''' mysql连接 :param type: 获取数据类型 默认为字典类型 :return:游标 ''' if type=='dict': conn = pymysql.connect(mysql['host'],mysql['user'],mysql['pwd'],mysql['db'],charset='utf8',cursorclass=pymysql.cursors.DictCursor) else: conn = pymysql.connect(mysql['host'],mysql['user'],mysql['pwd'],mysql['db'],charset='utf8') return conn
def conPgsql(): ''' pgsql数据库连接 :return: 游标 ''' conn = psycopg2.connect(database=pgsql['database'],user=pgsql['user'],password=pgsql['password'],host=pgsql['host'],port=pgsql['port']) return conn
def exportCsv(title,data): ''' 生成csv文件 :param title: 表头 :param data: 数据 :return: {'status':任务执行状况, 'fileName':文件名} ''' for i in range(len(data)): dataList = list(data[i].values()) dataList.insert(0, i+1) data[i] = dataList
fileName = '/Public/Temp/' + time.strftime('%Y%m%d%H%M',time.localtime(time.time())) + '_'+ str(int(round((time.time()) * 1000))) + ".csv" title = title.split(',') with open(fileName,"w",newline='',encoding='gb18030') as csvfile: writer = csv.writer(csvfile) try: writer.writerow(title) writer.writerows(data) except: return {'status':False, 'msg':'文件写入失败!'} return {'status':True, 'fileName':hostName + fileName}
def getCsvDate(sql): ''' 获取csv数据 :param sql: 获取csv数据的sql语句 :return:{'status':sql执行状况, 'data':数据} ''' conn = conPgsql() cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
try: cur.execute(sql) rows = cur.fetchall() except: cur.close() conn.close() return {'status':False, 'msg':'sql执行失败!'}
conn.commit() cur.close() conn.close() return {'status':True, 'data':rows}
def dealWork(): ''' 任务处理 :return: None ''' conn = conMysql() cursor = conn.cursor() sql = "select count(*) count from box_download_task where status='1';" cursor.execute(sql) re = cursor.fetchone() rowsNum = re['count'] if rowsNum > thread: return
sql = "select count(*) count from box_download_task where status='0' and type<>'4' order by id asc;" cursor.execute(sql) re = cursor.fetchone() if re['count']==0: return else: sql = "select * from box_download_task where status='0' order by id asc;" cursor.execute(sql) re = cursor.fetchone() workId = re['id'] sql = 'update box_download_task set status="%s" where id=%d;' % ('1',workId) cursor.execute(sql) conn.commit()
sql = re['query'] title = re['title'] res = getCsvDate(sql) if res['status']: file = exportCsv(title, res['data']) if file['status']: sql = 'update box_download_task set status="%s", filepath="%s" where id=%d;' % ( '2', file['fileName'], workId) cursor.execute(sql) conn.commit() else: sql = 'update box_download_task set status="%s", msg="%s" where id=%d;' % ('3', file['msg'], workId) cursor.execute(sql) conn.commit() else: sql = 'update box_download_task set status="%s", msg="%s" where id=%d;' % ('3', res['msg'], workId) cursor.execute(sql) conn.commit() return False
conn.close() return False
if __name__ == "__main__":
while True: for i in range(thread): t = threading.Thread(target=dealWork, args=()) t.start() time.sleep(1)
t.join() print('执行了') time.sleep(5)
|