代码拉取完成,页面将自动刷新
#!/usr/bin/env python
# -*- encoding:utf-8 -*-
import re
import cookielib
import urllib
import urllib2
import optparse
import zlib
import json
import time
import random
import HTMLParser
import json
import traceback
import socket
import errno
import datetime,calendar
import sys, os
import getopt
import MySQLdb
from MySQLdb import DatabaseError
from multiprocessing import Process, Lock
import multiprocessing
DEBUG = None
TypeMapConstant = {
'cpid' : 'char',
'cpdjbm' : 'char',
'mjbz' : 'char',
'tzlxms' : 'char',
'cpqsrq' : 'char',
'yjkhzdnsyl' : 'double',
'yjkhzgnsyl' : 'double',
'qxms' : 'char',
'mjqsrq' : 'char',
'tzzlxms' : 'char',
'cpztms' : 'char',
'cpyjzzrq' : 'char',
'cpms' : 'char',
'fxjgms' : 'char',
'fxjgdm' : 'char',
'cpfxdj' : 'char',
'fxdjms' : 'char',
'cpqx' : 'int',
'mjjsrq' : 'char',
'qdxsje' : 'double',
'cplx' : 'char',
'cplxms' : 'char',
'cpsylx' : 'char',
'cpsylxms' : 'char',
'cpjz' : 'double',
'dqsjsyl' : 'double',
'csjz' : 'double',
'xsqy' : 'char'
}
''' 枚举的使用 '''
def enum(**enums):
return type('Enum', (), enums)
RSPSTATUS = enum(SUC =0, EMPTY = 1, TIMEOUT = 2, TOJSONERR = 3, URLLIBERR = 4, OTHERERR = 5)
def parseCmd():
global DEBUG
opts, args = getopt.getopt(sys.argv[1:], "-d")
for op, value in opts:
if op == "-d":
DEBUG = True
def F(val):
if val == None:
return 0.0
return val
def S(val):
if val == None:
return ''
return val
def I(val):
if val == None:
return 0
return val
def D(val):
if val == None:
return ''
plusOne = 0
plusTwo = 0
if len(val) == 10:
plusOne = 1
plusTwo = 2
y = val[0 : 4]
m = val[4 + plusOne : 6 + plusOne]
d = val[6 + plusTwo: 8 + plusTwo]
return (('%s-%s-%s') % (y, m, d))
def TypeUnionTransfer(k ,v):
res = None
if TypeMapConstant[k] == 'char':
res = S(v)
elif TypeMapConstant[k] == 'double' :
if v == None or v == '':
v = '0.0'
res = float(v)
elif TypeMapConstant[k] == 'int':
if v == None or v == '':
v = '0'
res = int(v)
return res
def getTime():
nowtime = datetime.datetime.now()
return datetime.datetime.strftime(nowtime, "%m%d.%H%M%S")
def getNowDateAndTime():
nowtime = datetime.datetime.now()
return (datetime.datetime.strftime(nowtime, "%Y%m%d"), datetime.datetime.strftime(nowtime, "%H:%M:%S"))
def getErrnoFromE(e):
if hasattr(e, 'errno'):
return e.errno
elif e.args:
return e.args[0]
else:
return None
def logger(content):
(date, time) = getNowDateAndTime()
filename = './log/lcspider%s.log' % date
fmtContent = getTime() + 'P[' + str(os.getpid()) + ' F[' + sys._getframe().f_back.f_code.co_filename + '] L[' + str(sys._getframe().f_back.f_lineno) + ']::' + content + '\n'
with open(filename, "a+") as file:
file.write(fmtContent)
if DEBUG == True:
sys.stdout.write(fmtContent)
class LcInfo(object):
def __init__(self,
cpid, cpdjbm, mjbz, tzlxms, cpqsrq, yjkhzdnsyl, yjkhzgnsyl, qxms,
mjqsrq, tzzlxms, cpztms, cpyjzzrq, cpms, fxjgms, fxjgdm, cpfxdj,
fxdjms, cpqx, mjjsrq, qdxsje, cplx, cplxms, cpsylx, cpsylxms, cpjz,
dqsjsyl, csjz, xsqy):
self.cpid = cpid
self.cpdjbm = cpdjbm
self.mjbz = mjbz
self.tzlxms = tzlxms
self.cpqsrq = cpqsrq
self.yjkhzdnsyl = yjkhzdnsyl
self.yjkhzgnsyl = yjkhzgnsyl
self.qxms = qxms
self.mjqsrq = mjqsrq
self.tzzlxms = tzzlxms
self.cpztms = cpztms
self.cpyjzzrq = cpyjzzrq
self.cpms = cpms
self.fxjgms = fxjgms
self.fxjgdm = fxjgdm
self.cpfxdj = cpfxdj
self.fxdjms = fxdjms
self.cpqx = cpqx
self.mjjsrq = mjjsrq
self.qdxsje = qdxsje
self.cplx = cplx
self.cplxms = cplxms
self.cpsylx = cpsylx
self.cpsylxms = cpsylxms
self.cpjz = cpjz
self.dqsjsyl = dqsjsyl
self.csjz = csjz
self.xsqy = xsqy
self.regdate, self.regtime = getNowDateAndTime()
self.valueTuple = (self.cpid, self.cpdjbm, self.mjbz, self.tzlxms, self.cpqsrq, self.yjkhzdnsyl, self.yjkhzgnsyl, self.qxms, self.mjqsrq, self.tzzlxms, self.cpztms, self.cpyjzzrq, self.cpms, self.fxjgms, self.fxjgdm, self.cpfxdj, self.fxdjms, self.cpqx, self.mjjsrq, self.qdxsje, self.cplx, self.cplxms, self.cpsylx, self.cpsylxms, self.cpjz, self.dqsjsyl, self.csjz, self.xsqy, self.regdate, self.regtime)
# RETURN(None:success ; Not None: error)
def insert(self, cur):
sqlCmdModel = "INSERT INTO lcinfo values(0, '%s', '%s', '%s', '%s', '%s', %0.2lf, %0.2lf, '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', %d, '%s', %0.2lf, '%s', '%s', '%s', '%s', %0.2lf, %0.2lf, %0.2lf, '%s', '%s', '%s')"
sqlCmd = (sqlCmdModel % self.valueTuple)
error = None
if cur != None:
try:
cur.execute(sqlCmd)
except DatabaseError, e:
error = e
return e
except Exception, e:
error = e
return e
finally:
if error != None and getErrnoFromE(error) != 1062:
logger('insert error' + str(error))
logger('DBACTION:' + sqlCmd)
# not update sales area
def update(self, cur):
a = self.valueTuple[2: len(self.valueTuple) - 3]
b = self.valueTuple[len(self.valueTuple) - 2 : ]
# cpdjbm
c = self.valueTuple[1:2]
updateValueTuple = a + b + c
sqlCmdModel = "UPDATE lcinfo set mjbz = '%s', tzlxms = '%s', cpqsrq = '%s', yjkhzdnsyl = %0.2lf, yjkhzgnsyl= %0.2lf, qxms = '%s', mjqsrq = '%s', tzzlxms = '%s', cpztms = '%s', cpyjzzrq = '%s', cpms = '%s', fxjgms = '%s', fxjgdm = '%s', cpfxdj = '%s', fxdjms = '%s', cpqx = %d, mjjsrq = '%s', qdxsje = %0.2lf, cplx = '%s', cplxms = '%s', cpsylx = '%s', cpsylxms = '%s', cpjz = %0.2lf, dqsjsyl = %0.2lf, csjz = %0.2lf, regdate = '%s', regtime = '%s' where cpdjbm = '%s'"
sqlCmd = sqlCmdModel % updateValueTuple
#logger('DBACTION:' + sqlCmd)
error = None
if cur != None:
try:
cur.execute(sqlCmd)
except DatabaseError, e:
error = e
return e
except Exception, e:
error = e
return e
finally:
if error != None:
logger('update error' + str(error))
def updateSalesArea(self, cur):
updateValueTuple = (self.xsqy, self.cpdjbm)
sqlCmdModel = "UPDATE lcinfo set xsqy = '%s' where cpdjbm = '%s'"
sqlCmd = sqlCmdModel % updateValueTuple
#logger('DBACTION-UPDATE AREA:' + sqlCmd)
error = None
if cur != None:
try:
cur.execute(sqlCmd)
except DatabaseError, e:
error = e
return e
except Exception, e:
error = e
return e
finally:
if error != None:
logger('update error' + str(error))
class DbUtils(object):
@staticmethod
def mysqlConnect(mhost, muser, mpasswd, mdb, mport):
try:
conn = MySQLdb.connect(host = mhost, user= muser, passwd=mpasswd, db = mdb, port = mport, charset='utf8')
cur = conn.cursor()
return (conn, cur)
except MySQLdb.Error,e:
logger("Mysql Error %d: %s" % (e.args[0], e.args[1]))
return (None, None)
class HttpRequstParamHelper(object):
commonHeaders = {
'Connection' : 'keep-alive',
'Accept' : 'application/json, text/javascript, */*; q=0.01',
'X-Requested-With' : 'XMLHttpRequest',
'Accept-Encoding' : 'gzip, deflate',
'Accept-Language' : 'zh-cn',
'Content-Type' : 'application/x-www-form-urlencoded; charset=UTF-8',
'User-Agent' : 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36 SE 2.X MetaSr 1.0',
'Pragma' : 'no-cache',
'Cache-Control' : 'no-cache',
'Origin' : 'http://www.chinawealth.com.cn',
};
@staticmethod
def makeSalesAreaParam(paramDict):
baseurl = 'http://www.chinawealth.com.cn/cpxsqyQuery.go'
cpid = paramDict['cpid']
fxjgdm = paramDict['fxjgdm']
headers = dict(HttpRequstParamHelper.commonHeaders)
headers['Referer'] = 'http://www.chinawealth.com.cn/zzlc/jsp/lccpDetail.jsp?cpid=%s&fxjgdm=%s' % (cpid, fxjgdm)
postDict = { 'tzzlxdm':"03", 'cpid':cpid, 'cpjglb':"", 'cpsylx':"", 'cpyzms':"", 'cpqx':"", 'cpzt':"", 'cpdjbm':"", 'cpmc':"", 'cpfxjg':"", 'mjqsrq':"", 'mjjsrq':'', 'pagenum':"" }
return (baseurl, headers, postDict)
@staticmethod
def makePageInfoParam(paramDict):
pagenum = paramDict['pagenum']
baseurl = 'http://www.chinawealth.com.cn/lccpAllProJzyServlet.go'
headers = dict(HttpRequstParamHelper.commonHeaders)
headers['Referer'] = 'http://www.chinawealth.com.cn/zzlc/jsp/lccp.jsp'
postDict = { 'cpjglb':"", 'cpsylx':"", 'cpyzms':"", 'cpfxdj':"", 'cpqx':"", 'cpzt':"02", 'cpdjbm':"", 'cpmc':"", 'cpfxjg':"", 'mjqsrq':"", 'mjjsrq':"", 'pagenum':'1', 'areacode':"", 'code':"", 'tzzlxdm':"03" };
if pagenum != None:
postDict['pagenum'] = str(pagenum)
return (baseurl, headers, postDict)
class HttpHelper(object):
def __init__(self, url, headersDict, postDict, timeout = 2, tryTimes = 1, sleepTimeBase = 10):
self.url = url
self.headersDict = headersDict
self.postDict = postDict
self.timeout = timeout
self.tryTimes = tryTimes
self.sleepTimeBase = sleepTimeBase
# RETURN< tuple >
def doPostRequest(self):
postData = urllib.urlencode(self.postDict);
response = None
try:
req = urllib2.Request(self.url, headers = self.headersDict, data = postData)
response = urllib2.urlopen(req, timeout = self.timeout)
page = response.read()
if page == None or page == '':
return (RSPSTATUS.EMPTY, None)
uhtml = page.decode('utf-8')
jsonObj = json.loads(uhtml)
return (RSPSTATUS.SUC, jsonObj)
except socket.timeout, e:
return (RSPSTATUS.TIMEOUT, None)
except urllib2.URLError, e:
if isinstance(e.reason, socket.timeout):
return (RSPSTATUS.TIMEOUT, None)
else:
return (RSPSTATUS.URLLIBERR, str(e))
except Exception, e:
return (RSPSTATUS.OTHERERR, traceback.format_exc())
finally:
if response != None:
response.close()
def doPostWithTimes(self):
res = ()
while self.tryTimes > 0:
res = self.doPostRequest()
if res != None and res[0] in (RSPSTATUS.TIMEOUT, RSPSTATUS.URLLIBERR):
sleepTime = random.randint(self.sleepTimeBase, self.sleepTimeBase + 5)
self.tryTimes = self.tryTimes - 1
logger(('timeout let me sleep and try again!left chance[%d] - sleeptime[%d]' % (self.tryTimes, sleepTime)))
time.sleep(sleepTime)
elif res != None and res[0] == RSPSTATUS.SUC:
if res[1].has_key('code') and res[1]['code'] == 'error':
sleepTime = random.randint(self.sleepTimeBase, self.sleepTimeBase + 5)
self.tryTimes = self.tryTimes - 1
logger(('code error let me sleep and try again!left chance[%d] - sleeptime[%d]' % (self.tryTimes, sleepTime)))
time.sleep(sleepTime)
else:
break
#error
else:
break
return res
def doPostOneTime(self):
oldTryTime = self.tryTimes
self.tryTimes = 1
res = doPostWithTimes(self)
self.tryTimes = oldTryTime
return res
class Spider(object):
pageInfosList = multiprocessing.Manager().list()
pageInfosDict = multiprocessing.Manager().dict()
listLock = Lock()
@staticmethod
def appendLcInfoToList(lc):
Spider.listLock.acquire()
if Spider.pageInfosDict.has_key(lc.cpdjbm) == False:
Spider.pageInfosList.append(lc)
Spider.pageInfosDict[lc.cpdjbm] = 'A'
Spider.listLock.release()
@staticmethod
def popLcInfoFromList():
lc = None
Spider.listLock.acquire()
if len(Spider.pageInfosList) > 0:
lc = Spider.pageInfosList.pop(0)
Spider.pageInfosDict.pop(lc.cpdjbm)
Spider.listLock.release()
return lc
@staticmethod
def isEmpty():
l = 0
Spider.listLock.acquire()
l = len(Spider.pageInfosList)
Spider.listLock.release()
return (True if l == 0 else False)
def doUpdateSalesArea(self, conn, cur):
while True:
lcInfo = Spider.popLcInfoFromList()
if lcInfo == None:
logger('update all record\'s sales record')
break
paramDictForSalesArea = {
'cpid' : lcInfo.cpid,
'fxjgdm' : lcInfo.fxjgdm,
};
(salesAreaUrl, salesAreaHeaders, salesAreaPostDict) = HttpRequstParamHelper.makeSalesAreaParam(paramDictForSalesArea)
salesAreaHttpHelper = HttpHelper(salesAreaUrl, salesAreaHeaders, salesAreaPostDict, tryTimes = 2, sleepTimeBase = 20)
logger('Update sales area[cpdjbm:%s cpid:%s fxjgdm:%s]' % (lcInfo.cpdjbm, lcInfo.cpid, lcInfo.fxjgdm))
resGetArea = salesAreaHttpHelper.doPostWithTimes()
if resGetArea != None and resGetArea[0] == RSPSTATUS.SUC:
areaJsonObj = resGetArea[1]
needUpdate = True
areaInfoList = []
try:
areaInfoList = areaJsonObj['List']
except Exception, e:
logger('get list in areainfo jsonobj error, obj = %s'% str(areaJsonObj))
needUpdate = False
if needUpdate == False:
continue
isFirst = True
for info in areaInfoList:
if isFirst:
lcInfo.xsqy = info['cpxsqy']
isFirst = False
else:
lcInfo.xsqy = lcInfo.xsqy + ',' + info['cpxsqy']
if DEBUG: logger('cpid:' + lcInfo.cpid + ', fxjgdm:' + lcInfo.fxjgdm + ' GetArea: ' + lcInfo.xsqy)
if cur != None:
lcInfo.updateSalesArea(cur)
conn.commit()
else:
logger('get sales area error, cpid = [' + cpid + '] fxjgdm = [' + fxjgdm + ']' )
# RETURN( tuple ) or None
def doOnePageSpider(self, pagenum, conn, cur):
pageInfo = []
paramDict = {
'pagenum' : pagenum,
};
# page info
(pageUrl, pageHeaders, pagePostDict) = HttpRequstParamHelper.makePageInfoParam(paramDict)
httpHelper = HttpHelper(pageUrl, pageHeaders, pagePostDict, tryTimes = 3)
res = httpHelper.doPostWithTimes()
if res != None and res[0] == RSPSTATUS.SUC:
jsonObj = res[1]
lcInfosList = []
try:
lcInfosList = jsonObj['List']
except Exception, e:
logger('get list in pageinfo jsonobj err' + str(e))
return (RSPSTATUS.TOJSONERR, str(e));
# {u'Count': 1312, u'List': []}
if len(lcInfosList) == 0:
return (RSPSTATUS.EMPTY, None)
for info in lcInfosList:
for k, v in info.items():
if TypeMapConstant.has_key(k):
info[k] = TypeUnionTransfer(k, v)
lc = LcInfo(info['cpid'], info['cpdjbm'], info['mjbz'], info['tzlxms'], D(info['cpqsrq']), info['yjkhzdnsyl'], info['yjkhzgnsyl'],
info['qxms'], D(info['mjqsrq']), info['tzzlxms'], info['cpztms'], D(info['cpyjzzrq']), info['cpms'], info['fxjgms'],
info['fxjgdm'], info['cpfxdj'], info['fxdjms'], info['cpqx'], D(info['mjjsrq']), info['qdxsje'], info['cplx'], info['cplxms'],
info['cpsylx'], info['cpsylxms'], info['cpjz'], info['dqsjsyl'], info['csjz'], info['xsqy'])
error = lc.insert(cur)
# Duplicate, need update
if error != None and getErrnoFromE(error) == 1062:
#TODO
res = lc.update(cur)
if DEBUG: logger('Update: ' + lc.cpdjbm)
if res == None:
if cur != None: conn.commit()
else:
logger('Update Error: ' + str(res))
#ignore update error, continue get new record
Spider.appendLcInfoToList(lc)
# error
elif error != None:
logger('Insert: ' + lc.cpdjbm + ' error')
return error
# success
else:
if cur != None:
Spider.appendLcInfoToList(lc)
conn.commit()
#getPageInfo error
else:
logger("ErrorCode: " + str(res[0]) + " ErrorDesc: " + str(res[1]))
return res
# control pagenum
def doSpider(self):
(conn, cur) = DbUtils.mysqlConnect(mhost = 'localhost', muser = 'root', mpasswd = 'root', mdb = 'lc', mport = 3306)
for pagenum in range(1, 10000):
logger('pagenum start: ' + str(pagenum))
res = self.doOnePageSpider(pagenum, conn, cur)
logger('len of pagelist:' + str(len(Spider.pageInfosList)))
# success
if res == None:
logger('pagenum end: ' + str(pagenum))
# update area
continue
elif res[0] == -1 or res[0] == RSPSTATUS.EMPTY:
logger('pagenum stop: ' + str(pagenum))
elif res[0] in (RSPSTATUS.TIMEOUT, RSPSTATUS.URLLIBERR):
logger('pagenum timeout or urlliberr: ' + str(res))
else:
logger('pagenum: ' + str(pagenum) + ' error: ' + str(res))
break
cur.close()
conn.close()
logger('total len of pagelist:' + str(len(Spider.pageInfosList)))
def spiderRecord(self):
while True:
'''
(d, t) = getNowDateAndTime()
if (t > '00:00:00' and t < '12:00:00') or (t > '23:00:00' and t < '24:00:00'):
logger('Not in worker time, Sleep!!')
time.sleep(5 * 60)
continue
'''
logger(' ---- <<<<<<<<<<<<<<<< spider start >>>>>>>>>>>>>> ---- ')
self.doSpider()
# 15 min
logger(' --- sleep 15 minutes --- ')
time.sleep(15 * 60)
logger(' ---- >>>>>>>>>>>>>>> spider end <<<<<<<<<<<<<<<<< ---- ')
def updateSalesArea(self):
while True:
'''
(d, t) = getNowDateAndTime()
if (t > '00:00:00' and t < '12:00:00') or (t > '23:00:00' and t < '24:00:00'):
logger('Not in worker time, Sleep!!')
time.sleep(5 * 60)
continue
'''
logger('get len of pagelist:' + str(len(Spider.pageInfosList)))
if Spider.isEmpty():
logger('No update area record, let me sleep!!')
time.sleep(60)
continue
(conn, cur) = DbUtils.mysqlConnect(mhost = 'localhost', muser = 'root', mpasswd = 'root', mdb = 'lc', mport = 3306)
logger(' ---- <<<<<<<<<<<<<<<< update sales area start >>>>>>>>>>>>>> ---- ')
self.doUpdateSalesArea(conn, cur)
logger(' ---- <<<<<<<<<<<<<<<< update sales area end >>>>>>>>>>>>>> ---- ')
cur.close()
conn.close()
def spiderMain(self):
try:
pid = os.fork()
if pid > 0:
logger('UpdateSalesArea pid=%s start' % str(os.getpid()))
self.updateSalesArea()
sys.exit(0)
elif pid == 0:
logger('SpiderRecord pid=%s start' % str(os.getpid()))
self.spiderRecord()
sys.exit(0)
else:
sys.exit(1)
except OSError, e:
sys.exit(1)
def testDB1():
lc = LcInfo('1079256', 'C1080316000941', '人民币(CNY)', '债券类', '2016-08-02', 4.3, 4.3, '1-3个月(含)', '2016/07/28',
'一般个人客户', '在售', '2016/09/19', '2016汇富计划698期', '天津银行股份有限公司', 'C10803', '02', '(中低)',
48, '2016/08/01', 50000, '02', '值型', '03', '收益', 0, 0, 0, '')
(conn, cur) = DbUtils.mysqlConnect(mhost = 'localhost', muser = 'root', mpasswd = 'root', mdb = 'lc', mport = 3306)
if conn == None or cur == None:
logger('db error')
lc.insert(cur)
res = lc.update(cur)
conn.commit()
def daemonize():
try:
pid = os.fork()
if pid > 0:
# in parent
sys.exit(0)
except OSError, e:
logger('fork error')
sys.exit(1)
os.umask(0)
os.setsid()
try:
pid = os.fork()
if pid > 0:
sys.exit(0)
except OSError, e:
logger('fork error')
sys.exit(1)
if __name__ == '__main__':
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
parseCmd()
#daemonize()
spider = Spider()
spider.spiderMain()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。