赞
踩
wget -c https://mirrors.tuna.tsinghua.edu.cn/anaconda/miniconda/Miniconda3-latest-Linux-x86\_64.sh
bash Miniconda3-latest-Linux-x86\_64.sh
注: -c 代表自动断点续传
管理环境命令,建议直接写入~/.bashrc中,这样开启terminal后会自动进入py385环境
conda create -n py385 python=3.8 #创建环境
conda activate py385 #激活环境
conda remove -n py385 --all #删除环境
conda config --show #查看conda配置
建立conda环境,建议根据所安装python版本命名,比如所安装python为python3.8.5
注: conda会建立新目录~/.conda/envs/py385,并在其中建立一份独立的python可执行程序,与系统自带的/usr/bin/下那份无关
添加国内镜像
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/fastai/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/pytorch/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/bioconda/
conda管理package
conda list
conda install xxx
conda install xxx=版本号
conda update xxx
conda remove xxx
pip管理package
pip list
pip install xxx
pip uninstall xxx
类型 | 举例 | 获取元素 |
---|---|---|
list | x=[1,2,3] | x[0] |
tuple | x=(1,2,3) | x[2] |
dict | x={‘a’:1,‘b’:2,‘c’:3} | x[‘a’]或x.get(‘a’,-1),后者不存在则返回-1 |
set | x=set(list(x)) | list(x),需要转换为list才可以访问其中元素 |
list获取偶数项元素: x[::2]
list获取奇数项元素: x[1::2]
list倒序: x[::-1]
如果列表元素是简单类型,去重可用set()
如果列表元素是复合类型,去重可用dict结构,因为key不会重复
xdict={}
for x in xlist:
xdict.setdefault(x[0],[]).append(x)
reduce(f, [x1, x2, x3, x4]) = f(f(f(x1,x2),x3),x4)
>>> sorted([36, 5, -12, 9, -21],key=abs)
[5, 9, -12, -21, 36]
>>> L = [('Bob', 75), ('Adam', 92), ('Bart', 66), ('Lisa', 88)]
def by_score(t):
return(t[1])
>>> L2 = sorted(L, key=by_score)
[('Bart', 66), ('Bob', 75), ('Lisa', 88), ('Adam', 92)]
def log(func):
def wrapper(*args, **kw):
print('call %s():' % func.__name__)
return func(*args, **kw)
return wrapper
@log
def now():
print('hello')
>>> now()
call now():
hello
class Student(object):
@property
def score(self):
return self._score
@score.setter
def score(self, value):
if not isinstance(value, int):
raise ValueError('score must be an integer!')
if value < 0 or value > 100:
raise ValueError('score must between 0 ~ 100!')
self._score = value
with open(filename,'r') as f:
while True:
line = f.readline()
if line == '':
break
print(line.strip()) #如果不用strip()函数,会输出额外的行
from io import StringIO, BytesIO f = StringIO() g = StringIO('Hello!\nHi!\nGoodbye!') while True: # 如同操作文件一样 s=g.readline() if s == '': break f.write(s.strip()) print(f.getvalue()) # getvalue()方法用于获得写入后的str h = BytesIO() h.write('中文'.encode('utf-8')) print(h.getvalue()) #输出为b'\xe4\xb8\xad\xe6\x96\x87' k = BytesIO(b'\xe4\xb8\xad\xe6\x96\x87') m = k.read() print(str(m, encoding="utf-8") bytes.decode(m) #两种方法都可得到'中文'
>>> import pickle
>>> d = dict(name='Bob', age=20, score=88)
>>> pickle.dumps(d)
b'\x80\x03}q\x00(X\x03\x00\x00\x00ageq\x01K\x14X\x05\x00\x00\x00scoreq\x02KXX\x04\x00\x00\x00nameq\x03X\x03\x00\x00\x00Bobq\x04u.'
>>> f = open('dump.txt','wb') #将dumps内容写入文件,注意前面使用的是dumps(),这里使用的是dump()
>>> pickle.dump(d,f)
>>> f.close()
>>> g = open('dump.txt','rb') #将dumps内容读回来
>>> h = pickle.load(g)
>>> g.close()
>>> h
{'age': 20, 'score': 88, 'name': 'Bob'}
from multiprocessing import Process import os # 子进程要执行的代码 def run_proc(name): print('Run child process %s (%s)...' % (name, os.getpid())) # 获取进程号用os命令getpid if __name__=='__main__': print('Parent process %s.' % os.getpid()) p = Process(target=run_proc, args=('test',)) # Process()创建进程,注意如何指定子进程要执行的函数和对应参数 print('Child process will start.') p.start() # 子进程开始执行 p.join() # 等待子进程结束,用于进程间同步,强制进程结束用p.terminate(),比如子进程执行死循环 print('Child process end.') # 进程池,用于启动大量子进程情况 from multiprocessing import Pool ... p = Pool(4) # 这里是声明进程池 for i in range(5): p.apply_async(run_proc, args=(i,)) # 这里指定子进程执行函数及参数,随后即进入调度队列 p.close() # 调用join前要关闭进程池 p.join()
# task_master.py,要先启动master再启动worker,不然worker的连接请求会被拒绝 import random, time, queue from multiprocessing.managers import BaseManager # 由master分配发送任务队列和接收结果队列: task_queue = queue.Queue() result_queue = queue.Queue() # 从BaseManager继承的QueueManager: class QueueManager(BaseManager): pass # 把两个Queue都注册到网络上, callable参数关联了Queue对象: # 相当于给队列重新起了别名,这样后续对队列读写都必须通过QueueManager对象提供的接口实现 QueueManager.register('get_task_queue', callable=lambda: task_queue) QueueManager.register('get_result_queue', callable=lambda: result_queue) # 绑定端口5000, 设置验证码'abc': manager = QueueManager(address=('', 5000), authkey=b'abc') # 启动Queue: manager.start() # 获得通过网络访问的Queue对象: # 对比前面的task_queue和result_queue,这里已经是通过QueueManager操作队列了 task = manager.get_task_queue() result = manager.get_result_queue() # 放几个任务进去: for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n) # 从result队列读取结果: print('Try get results...') # 队列没有结果会停在这里 for i in range(10): r = result.get(timeout=10) print('Result: %s' % r) # 关闭: manager.shutdown() print('master exit.')
# task_worker.py import time, sys, queue from multiprocessing.managers import BaseManager # 创建类似的QueueManager: class QueueManager(BaseManager): pass # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字: QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 连接到服务器,也就是运行task_master.py的机器: # 如果是本机运行就设为127.0.0.1 server_addr = '172.31.226.31' print('Connect to server %s...' % server_addr) # 端口和验证码注意保持与task_master.py设置的完全一致: m = QueueManager(address=(server_addr, 5000), authkey=b'abc') # 从网络连接: # 如果master未启动,这里会抛出异常 m.connect() # 获取Queue的对象: task = m.get_task_queue() result = m.get_result_queue() # 从task队列取任务,并把结果写入result队列: for i in range(10): try: n = task.get(timeout=1) print('run task %d * %d...' % (n, n)) r = '%d * %d = %d' % (n, n, n*n) time.sleep(1) result.put(r) except Queue.Empty: print('task queue is empty.') # 处理结束: print('worker exit.')
from collections import namedtuple
Point = namedtuple('Point',['x','y'])
p=Point(x,y)
p.x,p.y
>>> from collections import Counter
>>> c = Counter()
>>> for ch in 'programming':
... c[ch] = c[ch] + 1
...
>>> c
Counter({'g': 2, 'm': 2, 'r': 2, 'a': 1, 'i': 1, 'o': 1, 'n': 1, 'p': 1})
>>> c.update('hello') # 也可以一次性update
>>> c
Counter({'r': 2, 'o': 2, 'g': 2, 'm': 2, 'l': 2, 'p': 1, 'a': 1, 'i': 1, 'n': 1, 'h': 1, 'e': 1})
class ClassificationTrainingApp: def __init__(self, sys_argv=None): if sys_argv is None: sys_argv = sys.argv[1:] parser = argparse.ArgumentParser() parser.add_argument('--batch-size', help='Batch size to use for training', default=24, type=int, ) parser.add_argument('--malignant', help="Train the model to classify nodules as benign or malignant.", action='store_true', #代表一旦有这个参数,就将其值设为True,等于不用另外加值了 default=False, ) parser.add_argument('comment', help="Comment suffix for Tensorboard run.", nargs='?', #为?代表如果命令行参数出现comment,则给comment赋以默认值dlwpt default='dlwpt', ) ... self.cli_args = parser.parse_args(sys_argv) #不加参数默认也是用sys.argv获取 def initTrain(self): batch_size = self.cli_args.batch_size #注意使用时要将-换成_ ...
>>> for key, group in itertools.groupby('AaaBBbcCAAa', lambda c: c.upper()):
... print(key, list(group)) # 用lambda函数表示挑选相同元素时忽略大小写
A ['A', 'a', 'a'] # 但返回结果还是原来的样子
B ['B', 'B', 'b']
C ['c', 'C']
A ['A', 'A', 'a']
分类 | HTTP Request |
---|---|
Request Line | Method sp URL sp Version cr cl |
Header Lines | Header Name : sp Value cr cl |
… | |
Header Name : sp Value cr cl | |
Blank Lines | cr cl |
Body Lines | Variable Numbers of Lines |
(Present only in some messages) |
注:cr cl代表/r/n, sp代表空格
METHOD: GET/HEAD/POST/PUT/TRACE/CONNECT/DELETE/OPTIONS
Header Name: User-agent/Accept/Host/Date/Upgrade/Cookie/If-Modified-Since/…
分类 | HTTP Response |
---|---|
Status Line | Version sp Status Code sp Phrase cr cl |
Header Lines | Header Name : sp Value cr cl |
… | |
Header Name : sp Value cr cl | |
Blank Lines | cr cl |
Body Lines | Variable Numbers of Lines |
(Present only in some messages) |
注:Status Code:100/101/200/400/404/…
Phrase: Continue/Switching/OK/Bad request/Not Found/…
Header Name: Server/Set-Cookie/Content-Length/Location/Upgrade/Last-Modified/…
import socket # 创建socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 建立连接,参数是个tuple s.connect(('www.sina.com.cn',80)) # 发送数据 s.send(b'GET / HTTP/1.1\r\nHost: www.sina.com.cn\r\nConnection: close\r\n\r\n') # 接收数据 buffer = [] while True: # 每次最多接收1k字节 d = s.recv(1024) if d: buffer.append(d) else: break data = b''.join(buffer) # 关闭连接 s.close() # 分离HTTP header、HTTP body header,html = data.split(b'\r\n\r\n',1) # 打印头部 print(header.decode('utf-8')) # 写入文件 with open('sina.html','wb') as f: f.write(html)
# server端程序 import socket import threading import time def tcplink(sock, addr): sock.send(b'Welcome!') while True: data=sock.recv(1024) time.sleep(1) if not data or data.decode('utf-8') == 'exit': break sock.send(('Hello, %s!' % data.decode('utf-8')).encode('utf-8')) sock.close() print('Connection from %s:%s closed.' % addr) # 创建socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 绑定端口 s.bind(('0.0.0.0',9999)) # 监听端口,参数指定等待连接的最大数量 s.listen(5) print("Waiting for connection...") # 永久循环接受客户端连接 # 每个连接都必须创建新线程(或进程处理),否则处理连接过程中无法接受其他客户端连接 while True: # 接受一个新连接: sock, addr = s.accept() # 创建新线程来处理TCP连接: t = threading.Thread(target=tcplink, args=(sock, addr)) t.start()
import socket import os s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 建立连接: s.connect(('172.31.226.31', 9999)) # 接收欢迎消息: print(s.recv(1024).decode('utf-8')) #注意这里是如何将bytes转换为str的 for data in ['Michael', 'Tracy', 'Sarah']: # 发送数据: data=data+'_'+str(os.getpid()) data=bytes(data,encoding='utf-8') #注意这里是如何将str转换为bytes的 s.send(data) print(s.recv(1024).decode('utf-8')) s.send(b'exit') #注意这里的字节对象定义方式 s.close()
import socket
# SOCK_DGRAM表明这是udp类型
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for data in [b'Michael', b'Tracy', b'Sarah']:
# 发送数据:
s.sendto(data, ('172.31.226.31', 9999))
# 接收数据:
print(s.recv(1024).decode('utf-8'))
s.close()
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 绑定端口:
s.bind(('0.0.0.0', 9999))
print('Bind UDP on 9999...')
while True:
# 接收数据,recvfrom返回数据和客户端地址与端口,这样就可以直接通过sendto就可以发送数据出去
data, addr = s.recvfrom(1024)
# addr是tuple
print('Received from %s:%s.' % addr)
s.sendto(b'Hello, %s!' % data, addr) #两个参数,第一个是数据,第二个是客户端地址与端口
import sqlite3 # 开 conn = sqlite3.connect('test.db') #没有会新创建一个 cursor = conn.cursor() #创建一个Cursor # 增 cursor.execute('create table user (id varchar(20) primary key, name varchar(20))') #创建表 cursor.execute('insert into user (id, name) values (\'1\', \'Michael\')') #插入记录 cursor.execute('insert into user (id, name) values (\'2\', \'Jack\')') #插入记录 cursor.rowcount #获取行数,2 # 查 cursor.execute('select * from user where id=?', ('1',)) #?代表通过参数传入id值 values = cursor.fetchall() #获取查询结果集 print(values) # [('1', 'Michael')] # 改 cursor.execute('update user set name=\'Alice\' where id=\'1\') #修改记录 cursor.execute('select * from user') #获取所有行 values = cursor.fetchall() #获取查询结果集 print(values) # [('1', 'Alice'),('2','Jack')] # 删 cursor.execute('delete from user where id=\'1\'') #删除记录 cursor.rowcount #获取行数,1 # 关 cursor.close() conn.commit() conn.close()
loop = get_event_loop()
while True:
event = loop.get_event()
process_event(event)
import asyncio
# 定义协程
async def task():
...
# await后面也必须是协程或异步操作
await
...
loop = asyncio.get_event_loop()
tasks = [task, task]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。