赞
踩
日常生活中常会遇到一些小任务,如果人工处理会很麻烦。
用python做些小脚本处理,能够提高不少效率。或者可以把python当工具使用,辅助提高一下办公效率。(比如我常拿python当计算器,计算和字符转换用)
以下总结下个人用到的一些python小脚本留作备忘。
用途:通信报文中的hex数据不好看,可以打印为16进制的字符串显示出来。
- #coding=utf-8
- #name: myutil.py
def print_hex1(s, prev='0x'):
    """Print each character of ``s`` as two hex digits prefixed by ``prev``.

    All items go on one line, separated by single spaces and terminated by
    a newline (for example ``0x01 0xab``).
    """
    items = ['%s%02x' % (prev, ord(ch)) for ch in s]
    print(' '.join(items))
def print_hex(s):
    """Print each character of ``s`` as two lowercase hex digits.

    The digits are written on one line separated by single spaces,
    followed by a newline.
    """
    digits = ['%02x' % ord(ch) for ch in s]
    print(' '.join(digits))
- print 'myutil'
-
def print_hex3(s, prev='0x'):
    """Print a string of hex digits as prefixed, comma-terminated byte items.

    ``s`` is consumed two characters at a time, so ``'01ab'`` is printed as
    ``0x01, 0xab,``.  A trailing odd character becomes a one-digit item.

    s    -- text made of hex digits (it is sliced, not validated)
    prev -- prefix written in front of every two-character item
    """
    # Step by two.  Looping once per character while slicing two at a time
    # ran past the end of the string and padded the output with empty
    # ``0x,`` items.
    pairs = [s[i:i + 2] for i in range(0, len(s), 2)]
    print(' '.join('%s%s,' % (prev, pair) for pair in pairs))
之前搞单片机时,生成的hex应用程序文件不能直接烧写到单片机里,需要先和IAP程序合并成一个文件才能烧写。每次手工合并很麻烦,于是做了个脚本来处理:
- #path='C:\\Users\\test\\IAP_CZ_v204w.hex'
- #file=open(path,'r')
- #for ll in file.readlines()
- # print ll
- #coding=gb18030
- import time
- import os
def prr():
    """Announce on stdout that the file merge is about to start."""
    banner = 'file combination begin..'
    print(banner)
-
# Merge the IAP image found in the current directory with an application
# image chosen by the user into one time-stamped .hex file.
path0 = os.getcwd()
print(path0)

# IAP image; it is expected next to the script.
path = path0 + '\\IAP_CZ_v204w.hex'
print(path)

# Application image to append.
s = raw_input('enter file path:')
path1 = s
print(path1)

# Output file: <cwd>\<name>_<yymmddHHMMSS>.hex
s = raw_input('enter file name:')
path2 = path0 + '\\' + s + time.strftime('_%y%m%d%H%M%S') + '.hex'
print(path2)

prr()
try:
    with open(path, 'r') as f1:
        iap_lines = f1.readlines()
    # ``with`` closes every file that was actually opened, including when a
    # later step fails; the former ``finally`` closed names that might never
    # have been bound and left the output file open on errors.
    with open(path1, 'r') as f2, open(path2, 'w') as f3:
        # Everything except the last line of the IAP image is copied
        # (presumably the Intel HEX end-of-file record - confirm), then the
        # whole application image follows.
        f3.writelines(iap_lines[:-1])
        f3.writelines(f2.readlines())
    print('combination success!')
except Exception as ex:
    print('excettion occured!')
    print(ex)

# A single prompt keeps the console window open in both outcomes.
s = raw_input('press any key to continue...')
网上好看的动漫图集,如果手工下载太费时了。简单分析下网页地址规律,写个多线程脚本搞定。
- #!/usr/bin/python
- # -*- coding: utf-8 -*-
- # filename: paxel.py
-
- '''It is a multi-thread downloading tool
-
- It was developed follow axel.
- Author: volans
- E-mail: volansw [at] gmail.com
- '''
-
- import sys
- import os
- import time
- import urllib
- from threading import Thread
-
- local_proxies = {'http': 'http://131.139.58.200:8080'}
-
class AxelPython(Thread, urllib.FancyURLopener):
    '''Thread that downloads one byte range of a URL into a part file.

    run() appends to the part file, so a download that was interrupted
    resumes after the bytes that are already on disk.
    '''

    def __init__(self, threadname, url, filename, ranges=0, proxies=None):
        '''Store the job description.

        threadname -- name given to the thread
        url        -- address to download from
        filename   -- part file receiving this range
        ranges     -- (first_byte, last_byte) pair, both ends inclusive
        proxies    -- proxy mapping handed to FancyURLopener; None (the
                      default) behaves like the former ``{}`` default
        '''
        Thread.__init__(self, name=threadname)
        urllib.FancyURLopener.__init__(self, {} if proxies is None else proxies)
        self.name = threadname
        self.url = url
        self.filename = filename
        self.ranges = ranges
        self.downloaded = 0

    def run(self):
        '''Fetch the missing part of the range and append it to the file.'''
        try:
            self.downloaded = os.path.getsize(self.filename)
        except OSError:
            # No part file yet: nothing has been downloaded.
            self.downloaded = 0

        # Resume right after the bytes already stored.
        self.startpoint = self.ranges[0] + self.downloaded

        # ranges[1] is the last byte wanted, so the part is complete only
        # once the start point has moved past it (``>=`` dropped that byte).
        if self.startpoint > self.ranges[1]:
            print('Part %s has been downloaded over.' % self.filename)
            return

        self.oneTimeSize = 16384  # bytes requested per read
        print('task %s will download from %d to %d'
              % (self.name, self.startpoint, self.ranges[1]))

        self.addheader("Range", "bytes=%d-%d" % (self.startpoint, self.ranges[1]))
        self.urlhandle = self.open(self.url)
        try:
            # Open the part file once instead of once per chunk.
            with open(self.filename, 'ab+') as filehandle:
                data = self.urlhandle.read(self.oneTimeSize)
                while data:
                    filehandle.write(data)
                    self.downloaded += len(data)
                    data = self.urlhandle.read(self.oneTimeSize)
        finally:
            self.urlhandle.close()
-
def GetUrlFileSize(url, proxies=None):
    '''Return the Content-Length announced for ``url``, or 0 when absent.

    url     -- address to query
    proxies -- proxy mapping for urllib.urlopen; None (the default) behaves
               like the former ``{}`` default

    Raises ValueError when the header value is not an integer.
    '''
    urlHandler = urllib.urlopen(url, proxies={} if proxies is None else proxies)
    try:
        # Case-insensitive lookup of the one header that is meant; the old
        # substring test matched any header line containing "Length".
        length = urlHandler.info().getheader('Content-Length')
    finally:
        urlHandler.close()
    if not length:
        return 0
    return int(length.strip())
-
def SpliteBlocks(totalsize, blocknumber):
    '''Split ``totalsize`` bytes into ``blocknumber`` inclusive byte ranges.

    Returns a list of (first_byte, last_byte) tuples.  All blocks have
    totalsize // blocknumber bytes except the last one, which also takes
    the remainder.

    Raises ValueError when ``blocknumber`` is smaller than 1.
    '''
    if blocknumber < 1:
        raise ValueError('blocknumber must be at least 1')
    # Floor division keeps the offsets integral regardless of how ``/``
    # behaves for the running interpreter.
    blocksize = totalsize // blocknumber
    ranges = [(i * blocksize, (i + 1) * blocksize - 1)
              for i in range(blocknumber - 1)]
    ranges.append((blocksize * (blocknumber - 1), totalsize - 1))
    return ranges
def islive(tasks):
    '''Return True when at least one task in ``tasks`` is still running.'''
    return any(task.isAlive() for task in tasks)
-
def paxel(url, output, blocks=6, proxies=local_proxies):
    '''Download ``url`` into ``output`` using ``blocks`` parallel threads.

    The file is split into byte ranges, each range is fetched by an
    AxelPython thread into ``tmpfile_<i>``, progress is written to stdout
    while the threads run, and finally the parts are concatenated into
    ``output`` and removed.

    url     -- address to download
    output  -- path of the resulting file
    blocks  -- number of ranges / threads
    proxies -- proxy mapping used for the size query and for every thread
    '''
    size = GetUrlFileSize(url, proxies)
    ranges = SpliteBlocks(size, blocks)

    threadname = ["thread_%d" % i for i in range(0, blocks)]
    filename = ["tmpfile_%d" % i for i in range(0, blocks)]

    tasks = []
    for i in range(0, blocks):
        # Forward ``proxies``; it used to be dropped here, so the workers
        # ignored the proxy the size query had gone through.
        task = AxelPython(threadname[i], url, filename[i], ranges[i], proxies)
        task.setDaemon(True)
        task.start()
        tasks.append(task)

    time.sleep(2)
    while islive(tasks):
        downloaded = sum([task.downloaded for task in tasks])
        process = downloaded / float(size) * 100
        show = u'\rFilesize:%d  Downloaded:%d  Completed:%.2f%%' % (size, downloaded, process)
        sys.stdout.write(show)
        sys.stdout.flush()
        time.sleep(0.5)

    # Concatenate the parts in order; ``with`` closes the handles on errors.
    with open(output, 'wb+') as filehandle:
        for part in filename:
            with open(part, 'rb') as f:
                filehandle.write(f.read())
            try:
                os.remove(part)
            except OSError:
                # Removing a part file is best effort only.
                pass
-
if __name__ == '__main__':
    # Sample run: one picture, four threads, no proxy.
    output = '001.jpg'
    url = "http://xz1.mm667.com/xz84/images/001.jpg"
    paxel(url, output, blocks=4, proxies={})
多线程下载图片并存储到指定目录中,若目录不存在则自动创建。
- # -*- coding: UTF-8 -*-
- '''
- import re
- import urllib
- urls='http://xz5.mm667.com/xz82/images/01.jpg'
- def getHtml(url):
- page = urllib.urlopen(url)
- html = page.read()
- return html
- def getImg(html):
- reg = r'src="(.+?\.jpg)" pic_ext'
- imgre = re.compile(reg)
- imglist = imgre.findall(html)
- x = 0
- for imgurl in imglist:
- urllib.urlretrieve(imgurl,'%s.jpg' % x)
- x = x + 1
- html = getHtml("http://tieba.baidu.com/p/2460150866")
- getImg(html)
- '''
- import re
- import urllib
- import threading
- import time
- import socket
- socket.setdefaulttimeout(30)
# Build the 80 album base URLs.  The host number ``j`` runs 1..5 and moves
# on after every four albums; the album number ``i`` runs 1..80.
urls = []
j = 0
for i in range(1, 81):
    j = ((i - 1) // 4) % 5 + 1
    site = 'http://xz%d.mm667.com/xz%02d/images/' % (j, i)
    urls.append(site)
    print(site)
- #print urls
- '''
- urls.append('http://xz1.mm667.com/xz01/images/')
- urls.append('http://xz1.mm667.com/xz02/images/')
- urls.append('http://xz1.mm667.com/xz03/images/')
- urls.append('http://xz1.mm667.com/xz04/images/')
- urls.append('http://xz1.mm667.com/xz84/images/')
- urls.append('http://xz2.mm667.com/xz85/images/')
- urls.append('http://xz3.mm667.com/xz86/images/')
- urls.append('http://xz1.mm667.com/s/')
- urls.append('http://xz1.mm667.com/p/')
- '''
def mkdir(path):
    """Create directory ``path`` (including parents) if it does not exist.

    Surrounding whitespace and trailing backslashes are stripped from
    ``path`` first.  A message is printed in both cases.

    Returns True when the directory was created, False when the path
    already existed.
    """
    import os

    path = path.strip().rstrip("\\")
    if os.path.exists(path):
        print(path + u' 目录已存在')
        return False
    # Create first and report afterwards, so that success is not announced
    # when makedirs raises.
    os.makedirs(path)
    print(path + u' 创建成功')
    return True
-
def cbk(a, b, c):
    """Progress callback: print the completed percentage.

    a -- number of blocks transferred so far
    b -- size of one block
    c -- total size of the remote file

    Nothing is printed when ``c`` is not positive, because no percentage
    can be computed from it.
    """
    if c <= 0:
        # Avoids a ZeroDivisionError / negative percentage for 0 or -1.
        return
    per = min(100.0 * a * b / c, 100)
    print('%.2f%%' % per)
-
-
- #url = 'http://www.sina.com.cn'
- local = 'd:\\mysite\\pic1\\'
- d=0
- mutex = threading.Lock()
- # mutex1 = threading.Lock()
class MyThread(threading.Thread):
    """Thread that downloads one URL to one local file, retrying once."""

    def __init__(self, url, name):
        """Remember the source ``url`` and the destination path ``name``."""
        threading.Thread.__init__(self)
        self.url = url
        self.name = name

    def run(self):
        # Announce under the lock so messages of different threads do not
        # interleave; ``with`` releases the lock even if printing fails.
        with mutex:
            print('')
            print('down from %s' % self.url)
            time.sleep(1)
        try:
            urllib.urlretrieve(self.url, self.name)
        except Exception as e:
            print(e)
            time.sleep(1)
            # One retry; report a second failure instead of letting it
            # escape from the thread as a traceback.
            try:
                urllib.urlretrieve(self.url, self.name)
            except Exception as e:
                print(e)
- threads=[]
-
# Index of the first entry of ``urls`` to download.
# NOTE(review): the list generated above holds 80 entries, so 84 selects
# nothing and no download is started - lower it to the album to start from.
START_INDEX = 84

for u in urls[START_INDEX:]:
    d += 1
    # One numbered directory per album.
    local = 'd:\\mysite\\pic1\\%d\\' % d
    mkdir(local)
    print('download begin...')
    for i in range(40):
        # Pictures are named 000.jpg .. 039.jpg on both sides.
        url = u + '%03d.jpg' % i
        lcal = local + '%03d.jpg' % i
        th = MyThread(url, lcal)
        threads.append(th)
        th.start()

# Wait for every download before claiming that they are finished.
for t in threads:
    t.join()
print('over! download finished')
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- """
- Python爬虫,抓取一卡通相关企业信息
- Anthor: yangyongzhen
- Version: 0.0.2
- Date: 2014-12-14
- Language: Python2.7.5
- Editor: Sublime Text2
- """
-
- import urllib2, re, string
- import threading, Queue, time
- import sys
- import os
- from bs4 import BeautifulSoup
- #from pprint import pprint
-
- reload(sys)
- sys.setdefaultencoding('utf8')
- _DATA = []
- FILE_LOCK = threading.Lock()
- SHARE_Q = Queue.Queue() #构造一个不限制大小的的队列
- _WORKER_THREAD_NUM = 3 #设置线程的个数
-
- _Num = 0 #总条数
class MyThread(threading.Thread):
    """Thread whose work is the zero-argument callable passed in."""

    def __init__(self, func, num):
        """Keep the callable ``func`` and the thread number ``num``."""
        super(MyThread, self).__init__()
        self.thread_num = num
        self.func = func

    def run(self):
        """Invoke the stored callable."""
        self.func()
-
def worker():
    """Take URLs from SHARE_Q until it is empty and scrape each page."""
    global SHARE_Q
    while True:
        # A non-blocking get replaces ``empty()`` followed by ``get()``:
        # another thread could take the last URL between the two calls and
        # leave this one blocked forever.
        try:
            url = SHARE_Q.get_nowait()
        except Queue.Empty:
            break
        try:
            my_page = get_page(url)
            find_data(my_page)
            time.sleep(1)
        finally:
            # Always acknowledge the task so SHARE_Q.join() cannot hang.
            SHARE_Q.task_done()
-
def get_page(url):
    """
    Download ``url`` and return its HTML as unicode.

    Args:
        url: address of the page to fetch
    Returns:
        The page decoded as GBK (undecodable bytes are ignored), or an
        empty unicode string when the request failed.
    """
    try:
        html = urllib2.urlopen(url).read()
    except urllib2.URLError as e:
        if hasattr(e, "code"):
            print("The server couldn't fulfill the request.")
            print("Error code: %s" % e.code)
        elif hasattr(e, "reason"):
            print("We failed to reach a server. Please check your url and read the Reason")
            print("Reason: %s" % e.reason)
        # Returning the unbound ``my_page`` here raised UnboundLocalError.
        return u''
    return html.decode("gbk", 'ignore')
-
def find_data(my_page):
    """
    Extract the company entries of one result page.

    For every matching entry <div> the link text, the text of the inner
    12px <div> elements and the first href are printed and collected.  The
    collected list is appended to _DATA as one element per page and _Num is
    increased by the number of entries.

    Args:
        my_page: HTML text of the page
    """
    global _Num
    href_pat = re.compile(r'href="([^"]*)"')  # compiled once per page
    temp_data = []
    found = 0
    items = BeautifulSoup(my_page).find_all("div", style="width:96%;margin:10px;border-bottom:1px #CCC dashed;padding-bottom:10px;")
    for item in items:
        # Company name.  ``.string`` is None when the link has several
        # children, which used to raise AttributeError on ``.encode``.
        if item.a and item.a.string is not None:
            data = item.a.string.encode("gbk", "ignore")
            print(data)
            temp_data.append(data)

        # Products and contact details.
        for detail in item.find_all("div", style="font-size:12px;"):
            data = detail.get_text().encode("gbk", "ignore")
            temp_data.append(data)
            print(data)

        # Link address.
        h = href_pat.search(str(item))
        if h:
            href = h.group(1)
            print(href)
            temp_data.append(href)

        found += 1

    # Several worker threads call this function; update the shared totals
    # under the module lock because ``+=`` on a global is not atomic.
    with FILE_LOCK:
        _Num += found
        _DATA.append(temp_data)
-
- #headers = {'User-Agent':"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"}##浏览器请求头(大部分网站没有这个请求头会报错、请务必加上哦)
- #all_url = 'http://www.mzitu.com/all' ##开始的URL地址
- #start_html = requests.get(all_url, headers=headers) ##使用requests中的get方法来获取all_url(就是:http://www.mzitu.com/all这个地址)的内容 headers为上面设置的请求头、请务必参考requests官方文档解释
- #print(start_html.text) ##打印出start_html (请注意,concent是二进制的数据,一般用于下载图片、视频、音频、等多媒体内容是才使用concent, 对于打印网页内容请使用text)
-
- def main() :
- global SHARE_Q
- threads = []
- start = time.clock()
- douban_url = "http://company.yktworld.com/comapny_search.asp?page={page}"
- #向队列中放入任务, 真正使用时, 应该设置为可持续的放入任务
- for index in xrange(20) :
- SHARE_Q.put(douban_url.format(page = index * 1))
- for i in xrange(_WORKER_THREAD_NUM) :
- thread = MyThread(worker,i)
- thread.start() #线程开始处理任务
-
- threads.append(thread)
- for thread in threads :
- thread.join()
- SHARE_Q.join()
- i = 0
- with open("down.txt", "w+") as my_file :
- for page in _DATA :
- i += 1
- for name in page:
- my_file.write(name + "\n")
-
- print "Spider Successful!!!"
- end = time.clock()
- print u'抓取完成!'
- print u'总页数:',i
- print u'总条数:',_Num
- print u'一共用时:',end-start,u'秒'
-
- if __name__ == '__main__':
- main()
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- """
- Python爬虫
- Anthor: yangyongzhen
- Version: 0.0.2
- Date: 2014-12-14
- Language: Python2.7.8
- Editor: Sublime Text2
- """
-
- import urllib2, re, string
- import threading, Queue, time
- import sys
- import os
- from bs4 import BeautifulSoup
-
- reload(sys)
- sys.setdefaultencoding('utf8')
- _DATA = []
- FILE_LOCK = threading.Lock()
- SHARE_Q = Queue.Queue() #构造一个不限制大小的的队列
- _WORKER_THREAD_NUM = 3 #设置线程的个数
-
- rootpath = os.getcwd()+u'/抓取的内容/'
-
def makedir(path):
    """Create ``path`` (with parents) unless it already is a directory."""
    if os.path.isdir(path):
        return
    os.makedirs(path)
-
- #创建抓取的根目录
- #makedir(rootpath)
- #显示下载进度
def Schedule(a, b, c):
    """Progress callback: print the completed percentage.

    a -- number of blocks transferred so far
    b -- size of one block
    c -- total size of the remote file

    Nothing is printed when ``c`` is not positive, because no percentage
    can be computed from it.
    """
    if c <= 0:
        # Avoids a ZeroDivisionError / negative percentage for 0 or -1.
        return
    per = min(100.0 * a * b / c, 100)
    print('%.2f%%' % per)
class MyThread(threading.Thread):
    """Thread whose work is the zero-argument callable passed in."""

    def __init__(self, func):
        """Keep the callable ``func`` to be executed by run()."""
        super(MyThread, self).__init__()
        self.func = func

    def run(self):
        """Invoke the stored callable."""
        self.func()
-
def worker():
    """Take URLs from SHARE_Q until it is empty and scrape each page."""
    print('work thread start...\n')
    global SHARE_Q
    while True:
        # A non-blocking get replaces ``empty()`` followed by ``get()``:
        # another thread could take the last URL between the two calls and
        # leave this one blocked forever.
        try:
            url = SHARE_Q.get_nowait()
        except Queue.Empty:
            break
        try:
            my_page = get_page(url)
            find_title(my_page)
            time.sleep(1)
        finally:
            # Always acknowledge the task so SHARE_Q.join() cannot hang.
            SHARE_Q.task_done()
-
def get_page(url):
    """
    Download ``url`` and return its HTML as unicode.

    Args:
        url: address of the page to fetch
    Returns:
        The page decoded as UTF-8, or an empty unicode string when the
        request failed.
    Raises:
        UnicodeDecodeError: the body is not valid UTF-8
    """
    try:
        html = urllib2.urlopen(url).read()
    except urllib2.URLError as e:
        if hasattr(e, "code"):
            print("The server couldn't fulfill the request.")
            print("Error code: %s" % e.code)
        elif hasattr(e, "reason"):
            print("We failed to reach a server. Please check your url and read the Reason")
            print("Reason: %s" % e.reason)
        # Returning the unbound ``my_page`` here raised UnboundLocalError.
        return u''
    return html.decode("utf8")
-
def find_title(my_page):
    """
    Collect the link address and link text of every <h1> on the page.

    The collected values are appended to _DATA as one list per page.

    Args:
        my_page: HTML text of the page
    """
    href_pat = re.compile(r'href="([^"]*)"')  # compiled once per page
    temp_data = []
    for item in BeautifulSoup(my_page).findAll('h1'):
        h = href_pat.search(str(item))
        if h:
            href = h.group(1)
            print(href)
            temp_data.append(href)
        # ``.string`` is None when the link has several children; storing
        # None made ``movie_name + "\n"`` fail when main() writes the file.
        if item.a and item.a.string is not None:
            temp_data.append(item.a.string)
    _DATA.append(temp_data)
-
-
- def main() :
- global SHARE_Q
- threads = []
- start = time.clock()
- douban_url = "http://movie.misszm.com/page/{page}"
- #向队列中放入任务, 真正使用时, 应该设置为可持续的放入任务
- for index in xrange(5) :
- SHARE_Q.put(douban_url.format(page = index * 1))
- for i in xrange(_WORKER_THREAD_NUM) :
- thread = MyThread(worker)
- thread.start() #线程开始处理任务
- threads.append(thread)
- for thread in threads :
- thread.join()
- SHARE_Q.join()
- with open("movie.txt", "w+") as my_file :
- for page in _DATA :
- for movie_name in page:
- my_file.write(movie_name + "\n")
- print "Spider Successful!!!"
- end = time.clock()
- print u'抓取完成!'
- print u'一共用时:',end-start,u'秒'
-
- if __name__ == '__main__':
- main()
- #coding=utf-8
- #author:yangyongzhen
- #QQ:534117529
- #'CardTest TcpServer - Simple Test Card Tool 1.00'
-
- import sys,threading,time;
- import serial;
- import binascii,encodings;
- import re;
- import os;
- from socket import *
- from struct import *;
- #from myutil import *;
- #name: myutil.py
-
- mylock = threading.RLock()
-
# Address of the remote TCP server.  Both values are replaced by the
# operator's input when the script is run directly.
Server_IP = ''
Server_Port = ''
# Former, misspelled name of Server_Port; kept as an alias.
Srever_Port = Server_Port
-
def print_hex1(s, prev='0x'):
    """Print each character of ``s`` as two hex digits prefixed by ``prev``.

    All items go on one line, separated by single spaces and terminated by
    a newline (for example ``0x01 0xab``).
    """
    items = ['%s%02x' % (prev, ord(ch)) for ch in s]
    print(' '.join(items))
def print_hex(s):
    """Print each character of ``s`` as two lowercase hex digits.

    The digits are written on one line separated by single spaces,
    followed by a newline.
    """
    digits = ['%02x' % ord(ch) for ch in s]
    print(' '.join(digits))
-
def hexto_str(s):
    """Return the characters of ``s`` as one string of two-digit hex values."""
    return ''.join('%02x' % ord(ch) for ch in s)
def strto_hex(s):
    """Return the raw bytes described by the hex-digit string ``s``."""
    return binascii.unhexlify(s)
- #''代表服务器为localhost
-
- #在一个非保留端口号上进行监听
-
-
class ComThread:
    """Relay between a local serial port and a remote TCP server.

    Four daemon threads cooperate through two one-slot buffers guarded by
    ``mylock``: ``snddata`` holds bytes read from the serial port that are
    to be sent over TCP, ``rcvdata`` holds bytes received over TCP that are
    to be written to the serial port.
    """

    def __init__(self, Port=0):
        """Prepare an unopened relay for serial port index ``Port``."""
        self.l_serial = None
        self.alive = False
        self.waitEnd = None
        self.port = Port

        # Worker threads; created by start().
        self.thread_read = None
        self.thread_write = None
        self.thread_TcpClient = None
        self.thread_TcpSend = None

        # TCP side
        self.connection = None
        # Hand-over buffers
        self.snddata = ''
        self.rcvdata = ''

    def waiting(self):
        """Block until the end event is set; return at once if not started."""
        if self.waitEnd is not None:
            self.waitEnd.wait()

    def SetStopEvent(self):
        """Set the end event (if any) and stop the relay."""
        if self.waitEnd is not None:
            self.waitEnd.set()
        self.alive = False
        self.stop()

    def _spawn(self, target):
        """Start ``target`` in a daemon thread and return the thread."""
        worker = threading.Thread(target=target)
        worker.setDaemon(1)
        worker.start()
        return worker

    def start(self):
        """Open the serial port at 115200 baud and start the four threads.

        Returns True when the port is open, False otherwise.
        """
        self.l_serial = serial.Serial()
        self.l_serial.port = self.port
        self.l_serial.baudrate = 115200
        self.l_serial.timeout = 2  # seconds
        self.l_serial.open()

        if not self.l_serial.isOpen():
            return False

        self.waitEnd = threading.Event()
        self.alive = True
        print('open serial port %d ok!\n' % (self.port + 1))
        print('baudrate:115200 \n')
        self.thread_read = self._spawn(self.FirstReader)
        self.thread_write = self._spawn(self.FirstWriter)
        # TCP side
        self.thread_TcpClient = self._spawn(self.TcpClient)
        self.thread_TcpSend = self._spawn(self.TcpSend)
        return True

    def FirstReader(self):
        """Poll the serial port and queue whatever arrived for TCP."""
        while self.alive:
            time.sleep(0.1)  # polling interval
            try:
                n = self.l_serial.inWaiting()
                if n:
                    data = self.l_serial.read(n)
                    print(u'->请求:')
                    print(data)
                    with mylock:
                        self.snddata = data
            except Exception as ex:
                print(str(ex))

        self.waitEnd.set()
        self.alive = False

    def FirstWriter(self):
        """Write data received over TCP to the serial port."""
        while self.alive:
            time.sleep(0.1)  # polling interval
            try:
                # Take and clear the buffer in one locked step, so bytes
                # stored by the TCP thread in between cannot be wiped.
                with mylock:
                    data, self.rcvdata = self.rcvdata, ''
                if data != '':
                    self.l_serial.write(data)
                    print(u'-<应答:')
                    print(data)
            except Exception as ex:
                print(str(ex))
        self.waitEnd.set()
        self.alive = False

    def TcpClient(self):
        """Connect to the server and store what it sends; reconnect after
        the server closes the connection."""
        try:
            while True:
                time.sleep(0.1)
                self.connection = socket(AF_INET, SOCK_STREAM)
                self.connection.connect((Server_IP, int(Server_Port)))
                print('Connect to Server OK!')
                with mylock:
                    self.snddata = ''
                    self.rcvdata = ''
                while True:
                    data = self.connection.recv(1024)
                    # An empty read means the server closed the connection.
                    if not data:
                        break
                    with mylock:
                        self.snddata = ''
                        self.rcvdata = data
                self.connection.close()
        except Exception as ex:
            print(str(ex))
        finally:
            # This shutdown code used to sit after the endless loop and was
            # never reached; now a failed connection ends waiting().
            self.waitEnd.set()
            self.alive = False

    def TcpSend(self):
        """Send data read from the serial port to the server."""
        while True:
            time.sleep(0.1)
            if self.connection is None:
                continue
            # Take and clear the buffer in one locked step (see FirstWriter).
            with mylock:
                data, self.snddata = self.snddata, ''
                if data != '':
                    self.rcvdata = ''
            if data == '':
                continue
            try:
                self.connection.send(data)
            except Exception as ex:
                # Report instead of silently ignoring the failure.
                print(str(ex))

    def stop(self):
        """Stop the reader thread and close the serial port.

        Safe to call on a relay that was never started.
        """
        self.alive = False
        if self.thread_read is not None:
            self.thread_read.join()
        if self.l_serial is not None and self.l_serial.isOpen():
            self.l_serial.close()
-
-
- #测试用部分
if __name__ == '__main__':
    # Banner
    for line in ('Serial to Tcp Tool 1.00\n',
                 'Author:yangyongzhen\n',
                 'QQ:534117529\n',
                 'Copyright (c) **cap 2015-2016.\n'):
        print(line)

    # Ask for the server address and the serial port number.
    Server_IP = raw_input('please enter ServerIP:')
    print('Server_IP: %s' % (Server_IP))
    Server_Port = raw_input('please enter ServerPort:')
    print('Server_Port: %s' % (Server_Port))
    com = raw_input('please enter com port(1-9):')

    rt = ComThread(int(com) - 1)
    try:
        # Run until one of the worker threads signals the end.
        if rt.start():
            rt.waiting()
            rt.stop()
    except Exception as se:
        print(str(se))

    if rt.alive:
        rt.stop()
    os.system("pause")

    print('')
    print('End OK .')
    del rt
很早之前做过一个远程读卡器工具,原理就是在现场客服电脑上装个python做的tcpserver服务端,操控现场的读卡器。在公司内部做个客户端连接过去,这样实现在公司调试现场的卡片业务。
这个就是服务端工具的实现:
- #coding=utf-8
- #author:yangyongzhen
- #QQ:534117529
- #'CardTest TcpServer - Simple Test Card Tool 1.00'
-
- import sys,threading,time;
- import serial;
- import binascii,encodings;
- import re;
- import os;
- from socket import *
- from struct import *;
- #from myutil import *;
- #name: myutil.py
-
- mylock = threading.RLock()
-
def print_hex1(s, prev='0x'):
    """Print each character of ``s`` as two hex digits prefixed by ``prev``.

    All items go on one line, separated by single spaces and terminated by
    a newline (for example ``0x01 0xab``).
    """
    items = ['%s%02x' % (prev, ord(ch)) for ch in s]
    print(' '.join(items))
def print_hex(s):
    """Print each character of ``s`` as two lowercase hex digits.

    The digits are written on one line separated by single spaces,
    followed by a newline.
    """
    digits = ['%02x' % ord(ch) for ch in s]
    print(' '.join(digits))
-
def hexto_str(s):
    """Return the characters of ``s`` as one string of two-digit hex values."""
    return ''.join('%02x' % ord(ch) for ch in s)
def strto_hex(s):
    """Return the raw bytes described by the hex-digit string ``s``."""
    return binascii.unhexlify(s)
- #''代表服务器为localhost
-
- #在一个非保留端口号上进行监听
-
-
class ComThread:
    """Relay between a local serial port and one TCP client at a time.

    Four daemon threads cooperate through two one-slot buffers guarded by
    ``mylock``: ``snddata`` holds bytes read from the serial port that are
    to be sent to the TCP client, ``rcvdata`` holds bytes received from the
    client that are to be written to the serial port.
    """

    def __init__(self, Port=0):
        """Prepare an unopened relay for serial port index ``Port``."""
        self.l_serial = None
        self.alive = False
        self.waitEnd = None
        self.port = Port

        # Worker threads; created by start().
        self.thread_read = None
        self.thread_write = None
        self.thread_TcpServer = None
        self.thread_TcpSend = None

        # TCP side: listen on every interface, port 5050.
        self.myHost = ''
        self.myPort = 5050
        self.sockobj = socket(AF_INET, SOCK_STREAM)
        self.connection = None
        # Hand-over buffers
        self.snddata = ''
        self.rcvdata = ''

    def waiting(self):
        """Block until the end event is set; return at once if not started."""
        if self.waitEnd is not None:
            self.waitEnd.wait()

    def SetStopEvent(self):
        """Set the end event (if any) and stop the relay."""
        if self.waitEnd is not None:
            self.waitEnd.set()
        self.alive = False
        self.stop()

    def _spawn(self, target):
        """Start ``target`` in a daemon thread and return the thread."""
        worker = threading.Thread(target=target)
        worker.setDaemon(1)
        worker.start()
        return worker

    def start(self):
        """Open the serial port at 115200 baud and start the four threads.

        Returns True when the port is open, False otherwise.
        """
        self.l_serial = serial.Serial()
        self.l_serial.port = self.port
        self.l_serial.baudrate = 115200
        self.l_serial.timeout = 2  # seconds
        self.l_serial.open()

        if not self.l_serial.isOpen():
            return False

        self.waitEnd = threading.Event()
        self.alive = True
        print('open serial port %d ok!\n' % (self.port + 1))
        print('baudrate:115200 \n')
        self.thread_read = self._spawn(self.FirstReader)
        self.thread_write = self._spawn(self.FirstWriter)
        # TCP side
        self.thread_TcpServer = self._spawn(self.TcpServer)
        self.thread_TcpSend = self._spawn(self.TcpSend)
        return True

    def FirstReader(self):
        """Poll the serial port and queue whatever arrived for TCP."""
        while self.alive:
            time.sleep(0.1)  # polling interval
            try:
                n = self.l_serial.inWaiting()
                if n:
                    data = self.l_serial.read(n)
                    print('serial recv:')
                    print(data)
                    with mylock:
                        self.snddata = data
            except Exception as ex:
                print(str(ex))

        self.waitEnd.set()
        self.alive = False

    def FirstWriter(self):
        """Write data received over TCP to the serial port."""
        while self.alive:
            time.sleep(0.1)  # polling interval
            try:
                # Take and clear the buffer in one locked step, so bytes
                # stored by the TCP thread in between cannot be wiped.
                with mylock:
                    data, self.rcvdata = self.rcvdata, ''
                if data != '':
                    self.l_serial.write(data)
                    print('serial send:')
                    print(data)
            except Exception as ex:
                print(str(ex))
        self.waitEnd.set()
        self.alive = False

    def TcpServer(self):
        """Accept clients one after another and store what they send."""
        try:
            self.sockobj.bind((self.myHost, self.myPort))
            self.sockobj.listen(10)
            print('TcpServer listen at 5050 oK!\n')
            print('Waiting for connect...\n')
            while True:
                time.sleep(0.1)
                self.connection, address = self.sockobj.accept()
                print('Server connected by %s' % (address,))
                with mylock:
                    self.snddata = ''
                    self.rcvdata = ''
                try:
                    while True:
                        data = self.connection.recv(1024)
                        # An empty read means the client disconnected.
                        if not data:
                            break
                        with mylock:
                            self.snddata = ''
                            self.rcvdata = data
                    self.connection.close()
                except Exception as ex:
                    self.connection.close()
        except Exception as ex:
            print(str(ex))
        finally:
            # This shutdown code used to sit after the endless loop and was
            # never reached; now a failed bind/accept ends waiting().
            self.waitEnd.set()
            self.alive = False

    def TcpSend(self):
        """Send data read from the serial port to the connected client."""
        while True:
            time.sleep(0.1)
            if self.connection is None:
                continue
            # Take and clear the buffer in one locked step (see FirstWriter).
            with mylock:
                data, self.snddata = self.snddata, ''
                if data != '':
                    self.rcvdata = ''
            if data == '':
                continue
            try:
                self.connection.send(data)
            except Exception as ex:
                # Report instead of silently ignoring the failure.
                print(str(ex))

    def stop(self):
        """Stop the reader thread and close the serial port.

        Safe to call on a relay that was never started.
        """
        self.alive = False
        if self.thread_read is not None:
            self.thread_read.join()
        if self.l_serial is not None and self.l_serial.isOpen():
            self.l_serial.close()
-
-
- #测试用部分
if __name__ == '__main__':
    # Banner
    for line in ('CardTest TcpServer - Simple Test Card Tool 1.00\n',
                 'Author:yangyongzhen\n',
                 'QQ:534117529\n',
                 'Copyright (c) **** 2015-2016.\n'):
        print(line)

    com = raw_input('please enter com port(1-9):')
    rt = ComThread(int(com) - 1)
    try:
        # Run until one of the worker threads signals the end.
        if rt.start():
            rt.waiting()
            rt.stop()
    except Exception as se:
        print(str(se))

    if rt.alive:
        rt.stop()
    os.system("pause")

    print('')
    print('End OK .')
    del rt
- # -*- coding: utf-8 -*-
-
- '''
- filename:rtcp.py
- @desc:
- 利用python的socket端口转发,用于远程维护
- 如果连接不到远程,会sleep 36s,最多尝试200次(即两小时)
- @usage:
- ./rtcp.py stream1 stream2
- stream为:l:port或c:host:port
- l:port表示监听指定的本地端口
- c:host:port表示连接远程指定的主机和端口
- @author: watercloud, zd, knownsec team
- @web: www.knownsec.com, blog.knownsec.com
- @date: 2009-7
- '''
-
- import socket
- import sys
- import threading
- import time
-
- streams = [None, None] # 存放需要进行数据转发的两个数据流(都是SocketObj对象)
- debug = 1 # 调试状态 0 or 1
-
def print_hex(s):
    '''
    Print each character of s as two lowercase hex digits on one line,
    separated by single spaces and followed by a newline.
    '''
    digits = ['%02x' % ord(ch) for ch in s]
    print(' '.join(digits))
- def _usage():
- print 'Usage: ./rtcp.py stream1 stream2\nstream : L:port or C:host:port'
-
- def _get_another_stream(num):
- '''
- 从streams获取另外一个流对象,如果当前为空,则等待
- '''
- if num == 0:
- num = 1
- elif num == 1:
- num = 0
- else:
- raise "ERROR"
-
- while True:
- if streams[num] == 'quit':
- print("can't connect to the target, quit now!")
- sys.exit(1)
-
- if streams[num] != None:
- return streams[num]
- else:
- time.sleep(1)
-
def _xstream(num, s1, s2):
    '''
    Copy everything received on s1 to s2 until s1 is closed or an error
    occurs, then shut down and close both sockets and clear both slots of
    streams.

    num is the number of the calling side; it only labels the messages.
    '''
    try:
        while True:
            # recv blocks until data arrives or the peer closes.
            buff = s1.recv(1024)
            if debug > 0:
                print('%s recv' % num)
            if len(buff) == 0:  # peer closed the connection
                print('%s one closed' % num)
                break
            s2.sendall(buff)
            if debug > 0:
                print('%s sendall' % num)
                print_hex(buff)
    except Exception as ex:
        # Catch ordinary errors only and show what went wrong.
        print('%s one connect closed.' % num)
        print(ex)

    # Closing is best effort: a socket may already be shut down.
    for sock in (s1, s2):
        try:
            sock.shutdown(socket.SHUT_RDWR)
            sock.close()
        except socket.error:
            pass

    streams[0] = None
    streams[1] = None
    print('%s CLOSED' % num)
-
def _server(port, num):
    '''
    Listen on the local port and relay each accepted connection to the
    stream of the other side.  num is this side's stream number (0 or 1).
    '''
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(('0.0.0.0', port))
    listener.listen(1)
    while True:
        conn, addr = listener.accept()
        print("connected from: %s" % (addr,))
        # Publish this side's socket, then wait for the other side's one.
        streams[num] = conn
        peer = _get_another_stream(num)
        _xstream(num, conn, peer)
-
def _connect(host, port, num):
    '''
    Connect to host:port and relay the connection to the stream of the
    other side.  num is this side's stream number (0 or 1).

    After a failed attempt the function sleeps 36 seconds; after 200
    failed attempts it stores 'quit' in streams[num] and returns None.
    '''
    not_connet_time = 0
    wait_time = 36
    try_cnt = 199
    while True:
        if not_connet_time > try_cnt:
            streams[num] = 'quit'
            print('not connected')
            return None

        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            conn.connect((host, port))
        except Exception:
            print('can not connect %s:%s!' % (host, port))
            # Release the unused socket; a new one is made per attempt.
            conn.close()
            not_connet_time += 1
            time.sleep(wait_time)
            continue

        print("connected to %s:%i" % (host, port))
        # Publish this side's socket, then wait for the other side's one.
        streams[num] = conn
        s2 = _get_another_stream(num)
        _xstream(num, conn, s2)
-
-
if __name__ == '__main__':
    # Banner
    for line in ('Tcp to Tcp Tool 1.00\n',
                 'Author:yangyongzhen\n',
                 'QQ:534117529\n',
                 'Copyright (c) Newcapec 2015-2016.\n'):
        print(line)

    Server_IP = raw_input('please enter Server IP:')
    print('Server_IP: %s' % (Server_IP))
    Server_Port = raw_input('please enter Server Port:')
    print('Server_Port: %s' % (Server_Port))
    com = raw_input('please enter Local Port:')

    # Stream 0 listens locally, stream 1 connects to the server.
    tlist = [
        threading.Thread(target=_server, args=(int(com), 0)),
        threading.Thread(target=_connect, args=(Server_IP, int(Server_Port), 1)),
    ]
    for t in tlist:
        t.start()
    for t in tlist:
        t.join()
    sys.exit(0)
- # -*- coding:utf8 -*-
- from ctypes import *
- from binascii import unhexlify as unhex
- import os
dll = cdll.LoadLibrary('mydll.dll')

print('begin load mydll..')
# key
str1 = unhex('0000556677222238')
# data
str2 = unhex('002d2000000100015566772222383CD881604D0D286A556677222238000020141214181427')
# initial content of the output buffer
str3 = '\x12\x34\x56\x78\x12\x34\x56\x78'

pstr1 = c_char_p(str1)
pstr2 = c_char_p(str2)
# The DLL writes its result here, so a mutable buffer is required; a
# c_char_p points at the immutable Python string itself.
pstr3 = create_string_buffer(str3, 8)

dll.CurCalc_DES_MAC64(805306481, pstr1, 0, pstr2, 13, pstr3)
print(pstr1)
print(pstr2)
print(pstr3)
# ``.raw`` returns all 8 bytes; ``.value`` stops at the first NUL byte and
# would truncate a result containing 0x00.
stro = pstr3.raw
print(stro)
strtemp = ''
for c in stro:
    print("%02x" % (ord(c)))
    strtemp += "{0:02x}".format(ord(c))
print(strtemp)
# NOTE(review): os.execlp replaces the current process, so the prompt
# below is only reached when the call fails - confirm that is intended.
os.execlp("E:\\RSA.exe", '')
s = raw_input('press any key to continue...')
- # -*- coding: utf-8 -*-
- import socket
- from myutil import *
- from binascii import unhexlify as unhex
- from ctypes import *
dll = cdll.LoadLibrary('mydll.dll')
print('begin load mydll..')
HOST, PORT = "192.168.51.28", 5800
# First packet, as a string of hex digits.
sd = "1234567812345678"
# NOTE(review): the second packet was used below without ever being
# defined in this snippet; set it to a hex-digit string to send it.
sd2 = None
# Create a socket (SOCK_STREAM means a TCP socket)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    # Connect to server and send data
    sock.connect((HOST, int(PORT)))  # closing parenthesis was missing
    # The packet was reported as sent without being sent.
    sock.send(unhex(sd))
    print("Sent1 OK:")
    print(sd)
    # Receive the answer and dump it
    received = sock.recv(1024)
    print("Received:")
    print_hex(received)
    print('received len is 0x%02x' % (len(received)))
    print('received data analysis...')
    # The answer is dumped as four consecutive fields.
    for first, last in ((0, 4), (4, 6), (6, 10), (10, 16)):
        re1 = received[first:last]
        print_hex(re1)

    # pack2 send
    if sd2 is not None:
        sock.send(unhex(sd2))
        print("Sent2 OK:")
        print(sd2)
        received1 = sock.recv(1024)
        print("Received1:")
        print_hex(received1)
        print('received1 len is 0x%02x' % (len(received1)))

finally:
    sock.close()

s = raw_input('press any key to continue...')
- # -*- coding: gb2312 -*-
- import socket
- from myutil import *
- from binascii import unhexlify as unhex
- from ctypes import *
dll = cdll.LoadLibrary('mydll.dll')
print('begin load mydll..')
# key
key = '\xF1\xE2\xD3\xC4\xF1\xE2\xD3\xC4'
# initial MAC / data content
mac = '\x00' * 8
data = '\x00' * 8
pkey = c_char_p(key)
pdata = c_char_p(data)
# The DLL writes the MAC here, so a mutable 8-byte buffer is required; a
# c_char_p points at the immutable Python string itself.
pmac = create_string_buffer(mac, 8)


# pack1
class pack:
    """Attribute holder for the hex-digit fields of the first packet."""
    pass


pk = pack()
pk.len = '00000032'
pk.ID = '0001'
pk.slnum = '00000004'
pk.poscode = '123456781234'
pk.rand = '1122334455667788'
pk.psam = '313233343536'
pk.kind = '0000'
pk.ver = '000001'
pk.time = '20140805135601'
pk.mac = '06cc571e6d96e12d'

# Everything except the MAC, converted to raw bytes.
data = unhex(pk.len + pk.ID + pk.slnum + pk.poscode + pk.rand + pk.psam
             + pk.kind + pk.ver + pk.time)
pdata.value = data
# calc MAC over the whole payload (42 bytes for the fields above)
dll.CurCalc_DES_MAC64(805306481, pkey, 0, pdata, len(data), pmac)
# ``.raw`` returns all 8 bytes; ``.value`` stops at the first NUL byte and
# would truncate a MAC containing 0x00.
stro = pmac.raw
strtemp = ''.join("{0:02x}".format(ord(c)) for c in stro)
pk.mac = strtemp
# data to send
sd = (pk.len + pk.ID + pk.slnum + pk.poscode + pk.rand + pk.psam
      + pk.kind + pk.ver + pk.time + pk.mac)
print('send1 len is 0x%02x' % (len(sd) // 2))
print(sd)
# pack2
class pack2:
    """Attribute holder for the hex-digit fields of the second packet."""
    pass


pk2 = pack2()
pk2.len = '0000006E'
pk2.ID = '0012'
pk2.slnum = '00000005'
pk2.fatCode = '00'
pk2.cardASN = '0000000000000000'
pk2.cardType = '00'
pk2.userNO = '0000000000000000'

pk2.fileName1 = '00000000000000000000000000000015'
pk2.dataLen1 = '00'
pk2.dataArea1 = '00000000000000319999990800FB2014080620240806FFFFFFFFFFFFFFFFFFFF'
pk2.fileName2 = '00000000000000000000000000000016'
pk2.dataLen2 = '00'
pk2.dataArea2 = '000003E800FFFF16'
pk2.mac = '06cc571e6d96e12d'

# Everything except the MAC, as hex digits and as raw bytes.
body2 = (pk2.len + pk2.ID + pk2.slnum + pk2.fatCode + pk2.cardASN
         + pk2.cardType + pk2.userNO + pk2.fileName1 + pk2.dataLen1
         + pk2.dataArea1 + pk2.fileName2 + pk2.dataLen2 + pk2.dataArea2)
data2 = unhex(body2)

pdata.value = data2
# The DLL writes the MAC into this buffer, so it must be mutable.
pmac2 = create_string_buffer(8)
# calc MAC over the whole payload (102 bytes for the fields above)
dll.CurCalc_DES_MAC64(805306481, pkey, 0, pdata, len(data2), pmac2)
# ``.raw`` returns all 8 bytes; ``.value`` stops at the first NUL byte and
# would truncate a MAC containing 0x00.
stro = pmac2.raw
strtemp = ''.join("{0:02x}".format(ord(c)) for c in stro)
pk2.mac = strtemp
# data to send
sd2 = body2 + pk2.mac

print('send2 len is 0x%02x' % (len(sd2) // 2))
print(sd2)
-
-
- #PORT="192.168.60.37"
- #PORT="localhost"
HOST, PORT = "192.168.51.28", 5800

# Create a socket (SOCK_STREAM means a TCP socket)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    # Connect to server and send the first frame.  sendall() keeps writing
    # until the whole frame is out; plain send() may stop after a part of it.
    sock.connect((HOST, int(PORT)))
    sock.sendall(unhex(sd))
    print("Sent1 OK:")
    print(sd)

    # Receive the reply to frame 1.
    # NOTE(review): a single recv() may return only part of the reply.
    received = sock.recv(1024)
    print("Received:")
    print_hex(received)
    print('received len is 0x%02x' % (len(received)))
    print('received data analysis...')
    # Dump the leading fields one by one: 4, 2, 4 and 6 bytes
    # (presumably mirroring the len/ID/slnum/poscode layout of the request
    # - confirm against the protocol description).
    for begin, end in ((0, 4), (4, 6), (6, 10), (10, 16)):
        print_hex(received[begin:end])

    #pack2 send
    sock.sendall(unhex(sd2))
    print("Sent2 OK:")
    print(sd2)

    # Receive the reply to frame 2.
    received1 = sock.recv(1024)
    print("Received1:")
    print_hex(received1)
    print('received1 len is 0x%02x' % (len(received1)))
finally:
    # Always release the socket, even when connect/send/recv failed.
    sock.close()

# Keep the console window open until the user reacts.
s = raw_input('press any key to continue...')
- # -*- coding: utf-8 -*-
- from myutil import *
- from binascii import unhexlify as unhex
- import os
- path=os.getcwd()  # Purpose of this script: dump fixed-offset fields of a record read from rec04.bin in the current working directory.
- 
- path+='\\rec04.bin'  # Windows-style separator is hard-coded.
- #print path
- print "begin ans......"
- 
- f1=open(path,'rb')  # Binary mode; the file is consumed in 280-byte chunks below.
- for i in range(1,35):  # i runs 1..34. NOTE(review): indentation was lost in this listing, so where the loop body ends cannot be told from here - confirm.
- s=f1.read(280)  # Next 280-byte chunk; a short or empty string if the file ends early.
- print "data:",i
- print_hex(s)  # print_hex comes from myutil (star import above).
- 
- print 'read data is:'
- print_hex(s)
- 
- recstatadd = 187  # Base offset used by three of the slices below; the other slices use absolute offsets into s.
- print "终端编号:"  # Label: terminal number.
- print_hex(s[recstatadd:recstatadd+10])  # 10 bytes at offset 187.
- print "卡号长度:"  # Label: card number length.
- print_hex(s[10])  # Single byte at offset 10.
- print "卡号: "  # Label: card number.
- print_hex(s[11:11+10])  # 10 bytes at offset 11.
- print "持卡序号1+所属地城市代码2+交易地城市代码2"  # Label lists three fields of 1+2+2 bytes.
- print_hex(s[recstatadd+22:recstatadd+22+5])  # 5 bytes at offset 209, matching the 1+2+2 in the label.
- print "应用交易计数器"  # Label: application transaction counter.
- print_hex(s[92:92+2])  # 2 bytes at offset 92.
- print "交易前余额4,交易金额3"  # Label lists two fields of 4+3 bytes.
- print_hex(s[recstatadd+29:recstatadd+29+7])  # 7 bytes at offset 216, matching the 4+3 in the label.
- print "交易日期:"  # Label: transaction date.
- print_hex(s[99:99+3])  # 3 bytes at offset 99.
- print "交易时间:"  # Label: transaction time.
- print_hex(s[44:44+3])  # 3 bytes at offset 44.
- print "终端编号"  # Label: terminal number again. NOTE(review): a different slice than the one printed at offset 187 - confirm which is intended.
- print_hex(s[21:21+8])  # 8 bytes at offset 21.
- print "商户编号"  # Label: merchant number.
- print_hex(s[21+8:21+8+15])  # 15 bytes directly after the previous field (offset 29).
- print "批次号"  # Label: batch number.
- print_hex(s[5:5+3])  # 3 bytes at offset 5.
- print "应用密文"  # Label: application cryptogram.
- print_hex(s[47:47+8])  # 8 bytes at offset 47.
- print "授权金额"  # Label: authorized amount.
- print_hex(s[103:103+6])  # 6 bytes at offset 103.
- print "其他金额"  # Label: other amount.
- print_hex(s[115:115+6])  # 6 bytes at offset 115.
- print "终端验证结果"  # Label: terminal verification result.
- print_hex(s[94:5+94])  # 5 bytes at offset 94.
- print "应用交易计数器"  # Same label as above, but 4 bytes this time. NOTE(review): overlaps the 5 bytes at offset 94 just printed - confirm the intended length.
- print_hex(s[92:92+4])  # 4 bytes at offset 92.
- print "卡片验证结果"  # Label: card verification result.
- print_hex(s[56:56+32])  # 32 bytes at offset 56.
- print "卡片序列号:"  # Label: card sequence number.
- print_hex(s[131])  # Single byte at offset 131.
- f1.close()  # Not reached if anything above raises; the handle is then left to the interpreter.
-
-
- # -*- coding:utf8 -*-
- # 2013.12.36 19:41
- # 抓取 baozoumanhua.com(暴走漫画)的图片。
-
- from bs4 import BeautifulSoup
- import os, sys, urllib2,time,random
-
- # 创建文件夹
- path = os.getcwd() # 获取此脚本所在目录
- new_path = os.path.join(path,u'暴走漫画')
- if not os.path.isdir(new_path):
- os.mkdir(new_path)
-
-
def page_loop(page=1, max_pages=None):
    """Download the images of consecutive "hot" listing pages.

    page:      number of the first listing page to fetch.
    max_pages: optional upper bound on how many pages are processed;
               ``None`` (the default) means "until a page has no images".

    Every ``<img>`` found inside a ``div.img-wrap`` element is fetched and
    written into the u'暴走漫画' folder (relative to the current working
    directory), named after the last 11 characters of its URL.

    The pages are walked in a loop.  The earlier version called itself once
    per page and never stopped on its own, so a long run ended in a
    recursion-depth error; it also crashed on a wrapper without an
    ``<img>`` tag.  Network errors from ``urllib2.urlopen`` still propagate
    to the caller.
    """
    page = int(page)
    done = 0
    while max_pages is None or done < max_pages:
        url = 'http://baozoumanhua.com/all/hot/page/%s?sv=1389537379' % page
        content = urllib2.urlopen(url)
        soup = BeautifulSoup(content)

        wrappers = soup.find_all('div', class_='img-wrap')
        if not wrappers:
            # A page without image wrappers: nothing left to download.
            break

        for wrapper in wrappers:
            img = wrapper.find('img')
            flink = img.get('src') if img is not None else None
            if not flink:
                # Wrapper without a usable image address - skip it.
                continue
            print(flink)
            content2 = urllib2.urlopen(flink).read()

            with open(u'暴走漫画' + '/' + flink[-11:], 'wb') as code:
                code.write(content2)

        done += 1
        page += 1
        print(u'开始抓取下一页')
        print('the %s page' % page)
-
- page_loop()
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # by yangyongzhen
- # 2016-12-06
-
- from bs4 import BeautifulSoup
- import urllib,urllib2,os,time
- import re
-
- rootpath = os.getcwd()+u'/抓取的模板/'
-
def makedir(path):
    """Create directory *path*, including missing parents, if needed.

    An already existing directory is left untouched.  Any other failure
    (for example *path* exists but is a regular file, or permission is
    denied) is raised as ``OSError``, exactly as ``os.makedirs`` reports it.
    """
    try:
        os.makedirs(path)
    except OSError:
        # Try first, check afterwards: a separate "does it exist?" test
        # before creating can be overtaken by another process creating the
        # same directory in between.  Only swallow the error when the
        # directory really is there now.
        if not os.path.isdir(path):
            raise
-
- #创建抓取的根目录
- makedir(rootpath)
- #显示下载进度
def Schedule(a, b, c):
    """Print download progress; used as the ``urlretrieve`` report hook.

    a: number of data blocks transferred so far
    b: size of one data block in bytes
    c: total size of the remote file in bytes; zero or negative when the
       server did not report a size

    With a known size the progress is printed as a percentage capped at
    100.  With an unknown or zero size a percentage cannot be computed
    (it would divide by zero or come out negative), so the number of bytes
    received so far is printed instead.
    """
    if c <= 0:
        print('%d bytes' % (a * b))
        return
    # The last block is usually only partly filled, so a * b can overshoot c.
    per = min(100.0 * a * b / c, 100)
    print('%.2f%%' % per)
-
def grabHref(url, listhref, localfile):
    """Collect the addresses referenced by the page at *url*.

    url:       address of the page to scan; also used as the prefix for
               addresses that do not contain 'http'.
    listhref:  list that receives every address found (appended in place).
    localfile: name of the text file the addresses are written to, one per
               line (terminated with '\\r\\n').

    Three kinds of tags are scanned, in this order: ``<link href=...>``,
    ``<script src=...>`` and ``<a href=...>``.  Each address is also printed.
    Tags without the attribute are skipped; previously this guard was
    missing for ``<link>`` tags, and a ``<link>`` without ``href`` raised
    AttributeError.  The output file is closed even when scanning fails.
    """
    html = urllib2.urlopen(url).read()
    # The page is read as GB2312 and re-encoded to UTF-8; bytes that cannot
    # be converted are dropped.
    html = unicode(html, 'gb2312', 'ignore').encode('utf-8', 'ignore')
    soup = BeautifulSoup(html)  # parse once, reuse for all three tag kinds

    # (tag name, pattern extracting the address from the tag's text form)
    targets = (
        ('link', re.compile(r'href="([^"]*)"')),
        ('script', re.compile(r'src="([^"]*)"')),
        ('a', re.compile(r'href="([^"]*)"')),
    )
    # An address containing 'http' is taken as already complete.
    pat2 = re.compile(r'http')

    myfile = open(localfile, 'w')
    try:
        for tag, pat in targets:
            for item in soup.findAll(tag):
                h = pat.search(str(item))
                if not h:
                    continue
                href = h.group(1)
                if pat2.search(href):
                    ans = href
                else:
                    ans = url + href
                listhref.append(ans)
                myfile.write(ans)
                myfile.write('\r\n')
                print(ans)
    finally:
        myfile.close()
-
def main():
    """Mirror every file referenced by the template page into ``rootpath``.

    The addresses are collected with ``grabHref`` (and saved to
    'ahref.txt'), de-duplicated, and each one is downloaded into a local
    folder structure that repeats the path part of its URL.

    Differences from the earlier version:
    * an address ending in '/' has no file name; it is now stored as
      'index.html' instead of trying to write to the directory itself;
    * a download that fails with IOError is reported and skipped instead of
      aborting the remaining downloads;
    * the elapsed time is measured with ``time.time()``, because
      ``time.clock()`` reports CPU time rather than wall-clock time on Unix.
    """
    url = "http://192.168.72.140/qdkj/"  # address of the page to harvest
    listhref = []                        # collected addresses
    localfile = 'ahref.txt'              # file the addresses are saved to
    grabHref(url, listhref, localfile)
    listhref = list(set(listhref))       # drop duplicate addresses

    start = time.time()
    for item in listhref:
        parts = item.split('/')
        # For 'http://host/a/b/file' the first three parts are 'http:', ''
        # and the host; everything between them and the file name becomes
        # the local sub-folder.
        name = parts[-1] or 'index.html'
        curpath = rootpath
        for i in parts[3:-1]:
            curpath += i
            curpath += '/'
        print(curpath)
        makedir(curpath)
        local = curpath + name
        try:
            urllib.urlretrieve(item, local, Schedule)  # download to disk
        except IOError as ex:
            print('skip %s: %s' % (item, ex))

    end = time.time()
    print(u'模板抓取完成!')
    print(u'%s %s %s' % (u'一共用时:', end - start, u'秒'))


if __name__ == "__main__":
    main()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。