概述
import collections
from random import choice
import array
import bisect,sys
import random
import re
from random import random
from operator import itemgetter#根据元组某个字段排序
from functools import reduce#以函数为参数对序列进行操作
from operator import mul#求积
from operator import add#加法运算
from dis import dis#反汇编函数
from unicodedata import name#获取字符名字
from types import MappingProxyType#只读封装类
from time import perf_counter as pc #导入高性能计时器
from inspect import signature#签名
Card = collections.namedtuple(‘Card’,[‘rank’,‘suit’])#构建只有少数属性没有方法的对象
class FrenchDeck:
ranks = [str(n) for n in range(2,11)]+list(‘JQKA’)
suits = ‘spades diamonds clubs hearts’.split()
def __init__(self):
self._cards = [Card(rank,suit) for suit in self.suits for rank in self.ranks]
def __len__(self):
return len(self._cards)
def __getitem__(self, position):
return self._cards[position]
#猴子补丁,使序列支持可变
def set_card(deck,position,card):
deck._cards[position] = card
FrenchDeck.setitem = set_card
deck = FrenchDeck()
suit_values = dict(spades=3, hearts=2, diamonds=1, clubs=0)#字典化
def spades_high(card):#提供对象比较标准
rank_value = FrenchDeck.ranks.index(card.rank)
return rank_value*len(suit_values)+suit_values[card.suit]
#for card in sorted(deck,key=spades_high):
print(card)
symboys =’%&$#%%^&*’
new = list(filter(lambda c:c>12,map(ord,symboys)))
#笛卡儿积
colors =[‘black’,‘white’]
sizes = [‘S’,‘M’,‘L’]
tshirts = [(color,size) for color in colors for size in sizes]#列表解析
yshirts = (ord(symbol) for symbol in symboys)#元组解析
sshirts = array.array(‘I’,(ord(symbol)for symbol in symboys))#数组解析
#for tshirt in (’%s %s’%(c,s)for c in colors for s in sizes):
print(tshirts)
a,b,*t = range(5) #*前缀获取不确定数量参数
#元组具名
City = collections.namedtuple(‘City’,‘name county population coordinates’)#创建具名元组
tokyo = City(name=‘Tokoy’,county=‘JP’,population=‘36.933’,coordinates=(35.689722,139.691667))
aad = tokyo._fields#包含类所有字段名称的元组
#创建空列表
board = [[’_’]*3 for i in range(3)]
#bisect模块搜索 bisect(haystack,needle)在干草堆里搜索针 haystack必须是升序序列
HAYSTACK = [1, 4, 5, 6, 8, 12, 15, 20, 21, 23, 23, 26, 29, 30]
NEEDLES = [0, 1, 2, 5, 8, 10, 22, 23, 29, 30, 31]
row_fmt = ‘{0:2d}@{1:2d} {2}{0:<2d}’
def demo(bisect_fn):
for needle in reversed(NEEDLES):
position = bisect_fn(HAYSTACK,needle)
offset = position*’ |’
#print(row_fmt.format(needle,position,offset))
if name’main’:
if sys.argv[-1]‘left’:
bisect_fn = bisect.bisect_left#新元素插入在原列表相等元素左边
else:
bisect_fn = bisect.bisect#右边
#print('DEMO:', bisect_fn.__name__)
#print('haystack ->', ' '.join('%2d' % n for n in HAYSTACK))
demo(bisect_fn)
#等级
def grade(score,breakpoints=[60,70,80,90],grades=‘FDCBA’):
i = bisect.bisect(breakpoints,score)
return grades[i]
sort = [grade(score) for score in [33,99,77,70,89,90,100]]
#print(sort)
#保持升序插入新元素
‘’‘SIZE =7
random.seed(1729)#使生产的随机数可预测
my_list =[]
for i in range(SIZE):
new_item = random.randrange(SIZE*2)
bisect.insort(my_list,new_item)
#print(’%2d ->’% new_item,my_list)’’’
#数组
floats = array.array(‘d’,(random() for i in range(10**7)))
#双向队列
dp = collections.deque(range(10),maxlen=10)
dp.rotate(3)#右边三个元素旋转到最左边
dp.appendleft(-1)#添加首元素 删除末元素
dp.extend([11,12,13])#添加末元素 删除首元素
#字典
DIAL_CODES = [(86, ‘China’),(91, ‘India’),(1, ‘United States’),(62, ‘Indonesia’), (55, ‘Brazil’),(92, ‘Pakistan’),(880, ‘Bangladesh’),(234, ‘Nigeria’),(7, ‘Russia’), (81,‘Japan’),]
county_code = {county:code for code,county in DIAL_CODES}#字典解析
word_re = re.compile(r’w+’)
index = {}
‘’‘with open(sys.argv[1],encoding=‘utf-8’) as fp:
for line_no,line in enumerate(fp,1):
for match in word_re.finditer(line):
word = match.group()
column_no = match.start()+1
location = (line_no,column_no)
occurrences = index.get(word,[])
occurrences.append(location)
index[word] =occurrences
for word in sorted(index,key=str. upper):
print(word,index[word])’’’
#collections.Counter
ct = collections.Counter(‘afhnknfw’)#记录字母出现次数 返回counter对象字典
ct.update(‘dawffawa’)#添加
ct.most_common(2)#返回数量最相近的两个键值
class StrKeyDict(collections.UserDict):
def missing(self, key):
if isinstance(key,str):#判断是否为同一类型
raise KeyError(key)
return self[str(key)]
def __contains__(self,key):
return str(key) in self.data
def __setitem__(self, key, item):
self.data[str(key)]=item
#不可更改映射MappingProxyType
d = {1:‘A’}
d_proxy = MappingProxyType(d)
d_1 =hash(str(d))#返回字符串或者数值的散列值
#可以通过对d的修改来修改d_proxy 不可以修改d_proxy
#集
l = [‘spam’,‘apam’,‘spam’,‘eggs’]
p = [‘das’,‘spam’,‘adsqw’]
l_1 = set(l)
p_1 = set§
m = list(set(l))#集合去重 列表化
m_1 = p_1 | l_1#求合集
m_2 = p_1 & l_1#求交集
m_3 = l_1-p_1#求差集
m_4 =len(p_1&l_1)#求重
m_4 = len(p_1.intersection(l_1))#同操作
m_5 = l_1^p_1#求对称差集
#集判断
c = p_1 <= l_1#p_1是否为l_1子集
c_1 = p_1 < l_1# p_1是否为l_1真子集
c_2 = p_1.isdisjoint(l_1)#是否有共同元素
q = {1}#创建集合 字面量句法
q_1 = frozenset(range(10))#创建frozenset集合(不可变类型)
q_2 = {chr(i) for i in range(32,256) if ‘SIGN’ in name(chr(i),’ ')}#集构造
#集操作
p_1.add(3)#添加元素
p_1.clear()#清除所有元素
p_1.copy()#浅复制
p_1.discard(3)#若有传入元素便删除该元素
p_1.iter()#返回迭代器
p_1.len()#len(p_1)
p_1.pop()#从集中抛出元素并返回值
p_1.remove(3)#从集中移除元素
#编码与解码
s =‘cafe’
b = s.encode(‘utf8’)#使用UTF-8编码
b_1 = b.decode(‘utf8’)#使用UTF-8解码
#bytes类型(不可变二进制序列)和 bytearray类型(可变类型)
cafe = bytes(‘cafe’,encoding=‘utf_8’)#不可变类型
cafe_1 = bytearray(cafe)#可变类型
cafe_2 = bytes.fromhex(‘31 4B CE A9’)#解析16进制数字对,构建二进制序列
#函数
def factorial(n):#定义阶乘函数
return 1 if n<2 else n*factorial(n-1)
fact = factorial#把函数作参数
b_2 = map(factorial,range(11))#返回可迭代对象所有元素作为函数参数得到的值
b_3 = reduce(add,range(100))#以函数为参数对序列进行操作
class BingoCage:#可参数化
def init(self,items):
self._items = list(items)
random.shuffle(self._items)
def pick(self):
try:
return self._items.pop()
except IndexError:
raise LookupError('pick from empty BingoCage')
def __call__(self):
return self.pick()
def clip(text,max_len=80):
‘’‘在max_len前后的第一空格处截断文本’’’
end = None
if len(text)>max_len:
space_before = text.rfind(’ ‘,0,max_len)#从区间寻找给定字符,返回最后一次出现位置
if space_before>=0:
end = space_before
else:
space_after = text.rfind(’ ',max_len)
if space_after >=0:
end = space_after
if end is None:
end = len(text)
return text[:end].rstrip()
sig = signature(clip)#返回函数参数
#for name,param in sig.parameters.items():#获取参数属性,参数名称,参数默认值
#print(param.kind,’:’,name,’=’,param.default)
metro_data = [(‘Tokyo’, ‘JP’, 36.933, (35.689722, 139.691667)),
(‘Delhi NCR’, ‘IN’, 21.935, (28.613889, 77.208889)),
(‘Mexico City’, ‘MX’, 20.142, (19.433333, -99.133333)),
(‘New York-Newark’, ‘US’, 20.104, (40.808611, -74.020386)),
(‘Sao Paulo’, ‘BR’, 19.649, (-23.547778, -46.635833)), ]
#itemgetter
#for city in sorted(metro_data,key=itemgetter(1)):#按元组第一字段排序
print(city)
cc = itemgetter(1,0)
#for city in metro_data:#返回第一字段和第零字段的元组
#print(cc(city))
#attrgetter
LatLong = collections.namedtuple(‘LatLong’,‘lat long’)
Metropolis = collections.namedtuple(‘Metropolis’,‘name cc pop coord’)
metro__areas = [Metropolis(name_1,cc_1,pop,LatLong(lat,long))for name_1,cc_1,pop,(lat,long)in metro_data]
from operator import attrgetter#根据名称提取属性
name_lat = attrgetter(‘name’,‘coord.lat’)
#for city in sorted(metro__areas,key=attrgetter(‘coord.lat’)):#按纬度排序
#print(name_lat(city))
#methodcaller
from operator import methodcaller
s = ‘the time has come’
hiphe = methodcaller(‘replace’,’ ‘,’-’)#选取函数部分功能(冻结部分参数)
s_1 =hiphe(s)
#functools.partial
from functools import partial
tripe = partial(mul,3)#冻结部分参数,简短参数,(自行设置)
tripe(7)#3*7
#策略
from abc import ABC,abstractmethod
Customer = collections.namedtuple(‘Customer’,‘name fidelity’)
class LineItem:
def init(self,product,quantity,pride):
self.product = product
self.quantity = quantity
self.pride = pride
def total(self):
return self.pride*self.quantity
class Order:#上下文
def init(self,customer,cart,promotion = None):
self.customer = customer
self.cart = list(cart)
self.promotion = promotion
def total(self):
if not hasattr(self,'_total'):
self.__total = sum(item.total()for item in self.cart)
return self.__total
def due(self):
if self.promotion is None:
discount = 0
else:
discount = self.promotion.discount(self)
return self.total()-discount
def __repr__(self):
fmt = '<Order total: {:.2f} due: {:.2f}>'
return fmt.format(self.total(),self.due())
class Promotion(ABC):#策略:抽象基类
@abstractmethod
def discount(self,order):
‘’‘返回折扣金额’’’
class FidelityPromo(Promotion):#第一个策略
‘’‘为积分100以上的顾客折扣5/100’’’
def discount(self,order):
return order.total()*.05 if order.customer.fidelity>=1000 else 0
class BulkItemPromo(Promotion):#第二策略
‘’‘单个商品为20个或者以上时提供10/100折扣’’’
def discount(self,order):
discount = 0
for item in order.cart:
if item.quantity >=20:
discount += item.total()*.1
return discount
class LargeOrderPromo(Promotion):#第三策略
‘’‘订单中的不同商品达到10个或者以上时提供7/100折扣’’’
def discount(self,order):
distinct_items = {item.product for item in order.cart}
if len(distinct_items)>=10:
return order.total()*0.07
return 0
joe = Customer(‘John Doe’,0)#创建消费者对象 名字和积分
ann = Customer(‘Ann Smith’,1100)
cart = [LineItem(‘banana’,4,.5),
LineItem(‘apple’,10,1.5),
LineItem(‘watermellon’,5,5.0)]#创建购买列表
Order(joe,cart,FidelityPromo())#创建消费策略
Order(ann,cart,FidelityPromo())
#使用Order类和函数简化上面策略
Customer_new = collections.namedtuple(‘Customer_new’,‘name fidelity’)
class LineItem_new:
def init(self,product,quantity,price):
self.product = product
self.quantity = quantity
self.price = price
def total(self):
return self.price*self.quantity
class Order:#上下文
def init(self,customer,cart,promotion):
self.customer = customer
self.cart = cart
self.promotion = promotion
def total(self):
if not hasattr(self,'__total'):
self.__total = sum(item.total()for item in self.cart)
return self.__total
def due(self):
if self.promotion is None:
discount = 0
else:
discount = self.promotion(self)#调用函数
return self.total()-discount
def __repr__(self):
fmt ='<Order total: {:.2f} due: {:.2f}>'
return fmt.format(self.total(),self.due())
def fidelity_promo(order):
return order.total()*.05 if order.customer.fidelity>=1000 else 0
def bulk_item_promo(order):
discount = 0
for item in order.cart:
if item.quantity >= 20:
discount += item.total() * .1
return discount
def large_order_promo(order):
distinct_items = {item.product for item in order.cart}
if len(distinct_items) >= 10:
return order.total() * .07
return 0
joe = Customer_new(‘John Doe’,0)
ann = Customer_new(‘Ann Smith’,1100)
cart = [LineItem(‘banana’,4,.5),
LineItem(‘apple’,10,1.5),
LineItem(‘watermellon’,5,5.0)]#创建购买列表
Order(joe,cart,fidelity_promo)
#最佳策略,globals()返回字典表示当前全局符号表
promos = [globals()[name]for name in globals()
if name.endswith(’_promo’)
and name!=‘best_promo’]#构建策略列表
def best_promo(order):
return max(promo(order)for promo in promos)
#命令模式
class MacroCommand:
‘’‘一个执行一组命令的命令’’’
def init(self,commands):
self.commands = list(commands)
def __call__(self):
for command in self.commands:
command()
#装饰器
#特性一:把被装饰的函数替换成其他函数
def deco(func):
def inner():
print(‘1’)
return inner
@deco
def target():
print(‘2’)#deco(target) 返回inner
#特性二 装饰器在被装饰的函数定义后立即运行 而被装饰的函数只有在明确调用时才执行
registry = []
def register(func):
print(‘running register(%s)’%func)
registry.append(func)
return func
@register
def f1():
print(‘1’)
@register
def f2():
print(‘2’)
def f3():
print(‘3’)
def main():
print(‘main’)
print(‘registry->’,registry)
f1()
f2()
f3()
if name==‘main’:
main()
#策略改进 用promotion装饰器填充promos列表的值
def promotion(promo_func):
promos.append(promo_func)
return promo_func
@promotion#定义后直接执行装饰 把函数放入列表
def fidelity_promo(order):
return order.total()*.05 if order.customer.fidelity>=1000 else 0
@promotion
def bulk_item_promo(order):
discount = 0
for item in order.cart:
if item.quantity >= 20:
discount += item.total() * .1
return discount
@promotion
def large_order_promo(order):
distinct_items = {item.product for item in order.cart}
if len(distinct_items) >= 10:
return order.total() * .07
return 0
def best_promo(order):
return max(promo(order)for promo in promos)
#闭包
def make_averager():
series = []#自由变量 与下面函数形成闭包 绑定
def averager(new_value):
series.append(new_value)
total = sum(series)
return total/len(series)
return averager
#闭包更新 nonlocal关键字声明自由变量
def make_averager_new():
count = 0
total = 0
def averager(new_value):
nonlocal count,total
count += 1
total += new_value
return total/count
return averager
avg = make_averager()#avg指向averager
avg(10)
avg(11)
avg.code.co_varnames#闭包局部变量
avg.code.co_freevars#闭包自由变量
avg.closure#自由变量保存位置
avg.closure[0].cell_contents#真正值位置
#实现简单的装饰器
#典型行为:把被装饰的函数替换成新的函数 二者接受相同参数 新函数返回被装饰函数本该返回的值 同时进行额外操作
import time
def clock(func):
def clocked(*args):
t0 = time.perf_counter()
result = func(*args)
elapsed = time.perf_counter()-t0
name_2 = func.name
arg_str = ‘,’.join(repr(arg)for arg in args)
print(’[%0.8fs]%s(%s)->%r’%(elapsed,name_2,arg_str,result))
return result
return clocked
@clock
def snooze(seconds):#函数被替换成了colcked(snooze是对clocked的引用,调用snooze(seconds),实际调用clocked(seconds))
time.sleep(seconds)
@clock
def factorial(n):
return 1 if n<2 else n*factorial(n-1)
‘’‘if name==‘main’:
print(’*‘40,‘Calling snooze(.123)’)
snooze(.123)
print(’’*40,‘Calling factorial(6)’)
print(‘6!=’,factorial(6))’’’
#改进装饰器 使其能支持关键字参数
import functools
def clock_new(func):
@functools.wraps(func)#将func相关属性复制到clocked,防止重写修改名字和注释文档
def clocked(*args,**kwargs):
t0 = time.time()
result = func(*args,**kwargs)
elapsed = time.time()-t0
name_2 = func.name
arg_lst =[]
if args:
arg_lst.append(’,’.join(repr(arg)for arg in args))
if kwargs:
pairs = [’%s=%r’%(k,m)for k,m in sorted(kwargs.items())]
arg_lst.append(’,’.join(pairs))
arg_str =’,’.join(arg_lst)
print(’[%0.8fs]%s(%s)->%r’%(elapsed,name_2,arg_str,result))
return result
return clocked
#备忘装饰器 functools.lru_cache
@clock
def fibonacci(n):
if n<2:
return n
return fibonacci(n-2)+fibonacci(n-1)
‘’‘if name==‘main’:#调用25次
print(fibonacci(6))
‘’’
@functools.lru_cache()#优化递归算法 使每个n值只调用一次 接受参数maxsize= 指定多少储存结果typed =是否把不同类型参数分开
@clock#返回的函数应用到lru_cache()上
def fibonacci(n):
if n<2:
return n
return fibonacci(n-2)+fibonacci(n-1)
‘’‘if name==‘main’:#调用7次
print(fibonacci(6))
‘’’
#functools.singledispatch
from functools import singledispatch#泛化函数
from collections import abc
import numbers
import html
@singledispatch
def htmlize(obj):#标记处理对象的基函数
content = html.escape(repr(obj))
return ‘
{}’.format(content)
@htmlize.register(str)#装饰专门函数
def _(text):
content = html.escape(text).replace(’n’,’
n’)
return ‘
{0}>|
’.format(content)@htmlize.register(numbers.Integral)
def _(n):
return ‘
{0}(0x{0:x})’.format(n)
@htmlize.register(tuple)
@htmlize.register(abc.MutableSequence)
def _(seq):
inner = ‘n
- ’.join(htmlize(item) for item in seq)
return ‘- n
- ’ + inner + ‘
- n
-
#装饰器工厂
registry_1 = set()
def register(active = True):#装饰器工厂 返回装饰器
def decorate(func):#装饰器
print(‘running register(active=%s)->decorate(%s)’% (active, func))
if active:
registry_1.add(func)
else:
registry_1.discard(func)
return func
return decorate@register(active=False)#调用装饰器decorate
def f1():
print(‘running f1()’)@register()#调用装饰器decorate
def f2():
print(‘running f2()’)def f3():
print(‘running f3()’)#参数化clock装饰器
DEFAULT_FMT = ‘[{elapsed:0.8f}s] {name_2}({args}) -> {result}’
def clock(fmt = DEFAULT_FMT):#装饰工厂
def decorate(func):#真正装饰器
def clocked(_args):#包装被修饰的函数
t0 = time.time()
_result = func(_args)
elapsed = time.time()-t0
name_2 = func.name
args = ', '.join(repr(arg)for arg in _args)
result = repr(_result)
print(fmt.format(**locals()))#返回局部变量
return _result
return clocked
return decorate‘’‘if name==‘main’:
@clock(’{name_2}:{elapsed}s’)#自定义格式
def snooze(seconds):
time.sleep(seconds)
for i in range(3):
snooze(.123)
‘’’
#元组的相对不变性
#元组值可以随着引用的可变对象的变化变化,但元素的标识(地址)不变
t1 = (1,2,[30,40])
t2 = (1,2,[30,40])
t1[-1].append(50)
#浅复制
t3 = [1,2,3]
t4 = t3[:]#浅复制问题
l1 = [1,[1,2,3],(1,2)]
l2 = list(l1)#l1 l2 指向不同对象
l1[1].append(24)#l2列表发生改变 l1 l2引用同一个列表元组
l1[1]+=[4,5]#就地修改列表 l1 l2发生改变
l1[2]+=(3,4)#创建新元组 l1 l2元组指向不同对象#了解深复制
class Bus:
‘’‘公交车模拟’’’
def init(self,passengers=None):
if passengers is None:
self.passengers = []
else:
#self.passengers = passengers#两个变量引用同一对象,不符合设计规范
self.passengers = list(passengers)
def pick(self,name):
self.passengers.append(name)def drop(self,name): self.passengers.remove(name)
#弱引用
#WeakValueDictionary 常用于缓存
class Cheese:
def init(self,kind):
self.kind = kinddef __repr__(self): return 'Cheese(%r)'%self.kind
import weakref
stock = weakref.WeakValueDictionary()
catalog = [Cheese(‘Red Leicester’),Cheese(‘Tilsit’),Cheese(‘Brie’),Cheese(‘Parmesan’)]
for cheese in catalog:
stock[cheese.kind]=cheese#向量类
from array import array
import math
class Vector2d:
#__slots__属性
slots = (’__x’,"__y")#将所有实例属性储存在元组里 节省内存
typecode = ‘d’
def init(self,x,y):
self.__x = float(x)#使用两个前导下划线,将属性标记为私有,只读
self.__y = float(y)@property#把读值方法标记为特性 def x(self):#读值方法与公开属性同名 return self.__x @property def y(self): return self.__y def __iter__(self):#迭代器 return (i for i in(self.x,self.y)) def __repr__(self): class_name = type(self).__name__ return '{}({!r},{!r})'.format(class_name,*self)#*self提供x,y def __str__(self): return str(tuple(self))#从实例中获取元组 def __bytes__(self): return (bytes([ord(self.typecode)])+bytes(array(self.typecode,self))) def __eq__(self,other): return tuple(self)==tuple(other) def __abs__(self):#求模 return math.hypot(self.x,self.y) def __bool__(self): return bool(abs(self)) def __hash__(self):#获取散列值 return hash(self.x)^hash(self.y) @classmethod#定义备选构造方法 def frombytes(cls,octets): typecode = chr(octets[0]) memv = memoryview(octets[1:]).cast(typecode) return cls(*memv) def __format__(self, fmt_spec=' '):#自定义格式支持 # components = (format(c,fmt_spec)for c in self) #return '({},{})'.format(*components) #更新 if fmt_spec.endswith('p'):#格式符‘p’以极坐标显示向量 即<r,0> fmt_spec = fmt_spec[:-1]#去除末尾p coords = (abs(self),self.angle()) outer_fmt = '<{},{}>' else: coords = self outer_fmt ='({},{})' components = (format(c, fmt_spec) for c in coords) return outer_fmt.format(*components) def angle(self):#求角 return math.atan2(self.y,self.x)
v1 = Vector2d(3,4)
v1.dict#储存属性值
v1._Vector2d__x =7#为私有变量赋值class Vector2d_1(Vector2d):
typecode = ‘f’#通过创建子类覆盖超类属性#Vector
import reprlib
import operator
import itertools
import numbers
import functools
class Vector:
typecode = ‘d’
def init(self,components):
self.components = array(self.typecode,components)#…受保护的实例属性def __iter__(self):#迭代器 return iter(self._components) def __add__(self, other):#支持向量加法 try: pairs = itertools.zip_longest(self,other,fillvalue=0.0)#生成(a,b)形式元组,a来自self,b来自other return Vector(a+b for a,b in pairs) except TypeError: return NotImplemented def __radd__(self, other):#支持反向相加 return self+other def __mul__(self, scalar):#支持向量乘法 if isinstance(scalar,numbers.Real):#检查类型 return Vector(n*scalar for n in self) else: return NotImplemented def __rmul__(self, scalar):#支持反向 return self*scalar def __matmul__(self, other):#支持矩阵乘法 try: return sum(a*b for a,b in zip(self,other)) except TypeError: return NotImplemented def __rmatmul__(self, other): return self@other def __repr__(self): components = reprlib.repr(self._components)#字符串显示处理 components = components[components.find('['):-1] return 'Vector({})'.format(components) def __str__(self): return str(tuple(self)) def __bytes__(self,other): return (bytes([ord(self.typecode)]+bytes(self._components)))
#支持散列
def eq(self, other):
#return tuple(self)==tuple(other)
#效率提升
‘’‘if len(self)!=len(other):
return False
for a,b in zip(self,other):#将可迭代对象元素组成一个个元组
if a!=b:
return False
return True’’’
#升级
if isinstance(other,Vector):
return len(self)len(other)and all(ab for a,b in zip(self,other))#all判断可迭代对象所有元素是否都是True
else:
return NotImplemented#尝试进行参数调换def __hash__(self): hashs =(hash(x)for x in self._components) return functools.reduce(operator.xor,hashs,0)#a^b运算 0为初始值
#模运算
def abs(self):
return math.sqrt(sum(x*x for x in self))def __bool__(self): return bool(abs(self)) @classmethod def frombytes(cls,octets): typecode = chr(octets[0]) memv = memoryview(octets[1:].cast(typecode)) return cls(memv) #支持序列协议 def __len__(self): return len(self._components) def __getitem__(self, index): #支持切片 cls = type(self) if isinstance(index,slice):#如果index参数值是slice对象 return cls(self._components[index])#使用切片构建新实例 elif isinstance(index,numbers.Integral):#如果是整数对象 return self._components[index]#返回相应元素 else: msg = '{cls.__name__} indices must be integers' raise TypeError(msg.format(cls=cls)) #处理未定义的虚拟属性 shortcut_names ='xyz' def __getattr__(self, name): cls =type(self) if len(name)==1: pos = cls.shortcut_names.find(name) if 0<= pos<len(self._components): return self._components[pos] msg = '{.__name__!r} object has no attribute{!r}' raise AttributeError(msg.format(cls,name)) # 处理虚拟属性赋值 def __setattr__(self, name, value): cls = type(self) if len(name)==1: if name in cls.shortcut_names: error = 'readonly attribute{attr_name!r}' elif name.islower(): error = "can't set attributer 'a' to 'z' in {cls_name!r}" else: error = ' ' if error: msg = error.format(cls_name=cls.__name__,attr_name=name) raise AttributeError(msg) super().__setattr__(name,value)#默认情况调用超类 提供标准行为 #求角 def angle(self,n): r = math.sqrt(sum(x*x for x in self[n:])) a = math.atan2(r,self[n-1]) if (n==len(self)-1)and (self[-1]<0): return math.pi*2-a else: return a def angles(self): return (self.angle(n)for n in range(1,len(self))) def __format__(self, fmt_spec=' '): if fmt_spec.endswith('h'):#超球坐标 fmt_spec = fmt_spec[:-1] coords = itertools.chain([abs(self)],self.angles()) outer_fmt = '<{}>' else: coords =self outer_fmt = '({})' components = (format(c,fmt_spec)for c in coords) return outer_fmt.format(','.join(components))
#使用抽象基类定义子类
Card_new = collections.namedtuple(‘Card_new’,[‘rank’,‘suit’])
class FrenchDeck2(collections.MutableSequence):
ranks = [str(n)for n in range(2,11)]+list(‘JQKA’)
suits = ‘spades diamonds clubs hearts’.split()def __init__(self): self._cards = [Card(rank,suit)for suit in self.suits for rank in self.ranks] def __len__(self): return len(self._cards) def __getitem__(self, position): return self._cards[position] #支持动态序列 def __setitem__(self, position, value): self._cards[position]=value def __delitem__(self, position):#继承Muta...抽象类必须实现的方法 del self._cards[position] def insert(self, position,value): self._cards.insert(position,value)
#抽象基类的实现
import abc
class Tombola(abc.ABC):#抽象基类继承abc.ABC或者其他抽象基类
@abc.abstractmethod#用于声明属性和描述符的抽象方法
def load(self,iterable):
‘’‘添加元素’’’@abc.abstractmethod def pick(self): '''随机删除元素并返回值''' def loaded(self): '''是否至少有一个元素''' return bool(self.inspect()) def inspect(self):#抽象基类中的具体方法只能依赖抽象基类定义的接口 # (即只能使用 抽象基类中的其他具体方法、抽象方法或特性)。 '''返回一个有序元组''' items = [] while True: try: items.append(self.pick()) except LookupError: break self.load(items) return tuple(sorted(items))
#建立子类
class BingoCage(Tombola):#子类扩展
def init(self,items):
self._randomizer = random.SystemRandom()#生成适用于加密的随机字节序列
self._items = []
self.load(items)def load(self,items):#实现抽象方法load self._items.extend(items)#列表扩展 self._randomizer.shuffle(self._items) def pick(self):#实现抽象方法pick try: return self._items.pop() except IndexError: raise LookupError('pick from empty BingoCage') def __call__(self):#增加额外方法call self.pick()
class AddableBingoCage(BingoCage):#支持+ +=
def add(self, other):
if isinstance(other,Tombola):
return AddableBingoCage(self.inspect()+other.inspect())
else:
return NotImplementeddef __iadd__(self, other): if isinstance(other,Tombola): other_iterable = other.inspect() else: try: other_iterable =iter(other) except TypeError: self_cls =type(self).__name__ msg = 'right operand in += must be {!r} or an iterable' raise TypeError(msg.format(self_cls)) self.load(other_iterable) return self
#子类
class LotteryBlower(Tombola):
def init(self,iterable):
self._balls = list(iterable)def load(self,iterable): self._balls.extend(iterable) def pick(self): try: position = random.randrange(len(self._balls)) except ValueError: raise LookupError('pick from empty LotteryBlower') return self._balls.pop(position) def loaded(self):#方法覆写 return bool(self._balls) def inspect(self):#方法覆写 return tuple(sorted(self._balls))
#虚拟子类 不要继承也能实现子类 使类型检查变得容易
#常用注册方法
#Sequence.register(str) 把str类型注册为Sequence的虚拟子类
@Tombola.register#注册虚拟子类
class TomboList(list):#是list的子类 Tombola的虚拟子类
‘’‘覆写虚拟超类所有方法’’’
def pick(self):
if self:
position = random.randrange(len(self))
return self.pop(position)
else:
raise LookupError(‘pop from empty Tombolist’)load = list.extend def loaded(self): return bool(self) def inspect(self): return tuple(sorted(self))
#迭代器
import re
import reprlib
Re_WORD = re.compile(’w+’)#构建可迭代对象
class Sentence:
def init(self,text):
self.text = text
self.words = Re_WORD.findall(text)#寻找相应的所有字符段def __getitem__(self, index):#支持索引 return self.words[index] def __len__(self): return len(self.words) def __iter__(self):#支持迭代 return SentenceIterator(self.words)#返回迭代器 def __repr__(self):#支持字符串简略表达 return 'Sentence(%s)'%reprlib.repr(self.text)
#迭代器
class SentenceIterator:
def init(self,words):
self.words = words
self.index = 0def __next__(self):#实现迭代 try: word = self.words[self.index] except IndexError: raise StopIteration() self.index+=1 return word def __iter__(self): return self
#用生成器函数简化Sentence,使其支持迭代
class Sentence_1:
def init(self,text):
self.text = text
self.words = Re_WORD.findall(text)def __repr__(self): return 'Sentence(%s)'%reprlib.repr(self.text) def __iter__(self): for word in self.words: #调用生成器产出元素 为空时自动退出 yield word return
#惰性实现
class Sentence_2:
def init(self,text):
self.text = textdef __repr__(self): return 'Sentence(%s)'%reprlib.repr(self.text) def __iter__(self): for match in Re_WORD.finditer(self.text):#惰性寻找 只有在需要时才寻找 yield match.group()#返回匹配的正则表达的具体文本 #使用生成器表达式优化 #return (match.group()for match in Re_WORD.finditer(self.text))
#系统自定义生成器函数
import itertools
gen = itertools.count(1,.5)#产生起始值为1,间隔为0.5的无穷等差数列
gen_1 = itertools.takewhile(lambda n:n<3,itertools.count(1,.5))#在指定条件false时停止
suits = ‘spades hearts diamonds clubs’.split()
card = list(itertools.product(‘jqka’,suits))#惰性计算笛卡尔积
#传统的用生成器函数产出另一个生成器的值
def chain(*iterable):
for it in iterable:
for i in it:
yield i
#新句法yield from
def chain_1(*iterable):
for i in iterable:
yield from i#将工作委托给第三方
#iter函数进行哨岗操作
def d6():
return random.randint(1,6)d6_iter = iter(d6,1)#摇到1停止
#上下文管理器的实现
class LookingGlass:
def enter(self):
import sys
self.original_write = sys.stdout.write#print方法的实现调用sys.stdout.write
sys.stdout.write = self.reverse_write#打入猴子补兵,修改方法
return ‘JABBERWOCKY’def reverse_write(self,text): self.original_write(text[::-1])#将打印的文本反向 def __exit__(self, exc_type, exc_value, traceback): import sys sys.stdout.write = self.original_write#还原方法 if exc_type is ZeroDivisionError: print('please DO NOT divide by zero!') return True
#使用生成器实现
import contextlib
@contextlib.contextmanager#使用装饰器生成上下文管理器
def looking_glass():
import sys
original_write = sys.stdout.writedef reverse_write(text): original_write(text[::-1]) sys.stdout.write = reverse_write msg = ' ' try: yield 'JABBERWOCKY'#产出值绑定到with as 后目标 函数暂停直到跳出with块 执行后面的代码 except ZeroDivisionError: msg = 'please DO NOT divide by zero!' finally: sys.stdout.write = original_write#无论是否出错 结束时都撤销猴子补丁 if msg: print(msg)
#用作协程的生成器的基本行为
def simple_coroutine():
print(‘start’)
x = yield #从调用方接收值,默认为none
print('received: ',x)my_coro = simple_coroutine()
next(my_coro)#调用迭代器使生成器启动并停在yield处
my_coro.send(4)#从调用方接收值并通过生成器产出,隐式调用next
import inspect
inspect.getgeneratorstate(my_coro)#查看生成器状态#yield关键字 =右边的代码在赋值前先执行
def simple_coro2(a):
print(‘start a=’,a)
b = yield a #产出a,暂停等待调用者为b赋值
print(‘received : b=’,b)
c = yield a+b#产出a+b,暂停等待调用者为c赋值
print(‘rececied :c=’,c)#使用协程计算移动平均值
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield average#产出平均值,暂停等待赋值
total += term
count += 1
average = total/count#使用装饰器进行预激协程
from functools import wraps
def coroutine(func):
‘’‘装饰器,进行预激’’’
@wraps(func)
def primer(*args,**kwargs):#将被装饰的生成器函数替换
gen = func(*args,**kwargs)#调用被装饰的函数
next(gen)#预激生成器
return gen#返回生成器
return primer#使用装饰器优化计算移动平均值
@coroutine
def averager_new():
total = 0.0
count = 0
average = None
while True:
term = yield average
total += term
count += 1
average = total/count#在协程中处理异常类型
class DemoException(Exception):
‘’‘异常类型’’’def demo_exc_handling():
print(’–>coroutine started’)
while True:
try:
x = yield
except DemoException:#争对处理异常
print(’*** DemoException handled.Continuing…’)
else:#没有异常执行
print(’->corotine received:{!r}’.format(x))
#raise RuntimeError(‘this line should never run’)#永远不会执行
finally:#协程退出时执行
print(’->coroutine ending’)exc = demo_exc_handling()
next(exc)#预激
exc.send(11)#正常运行
exc.throw(DemoException)#处理异常,协程继续#让协程返回值
#子生成器
from collections import namedtuple
Result = namedtuple(‘Result’,‘count average’)
def average_new():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None:#为了返回值,协程判断终止
break
total += term
count +=1
average = total/count
return Result(count,average)#返回的结果成为yield from表达式的值#委派生成器(管道)
def grouper(results,key):
while True:#每次循环新建一个average_new实例
results[key]=yield from average_new()#发送的值委派给yield from处理 通过管道
#传递给avera实例 grouper暂停 等待客户端发送值 average实例运行完毕后 返回的值绑定在
# results[key]上#客户端(调用方)
def main(data):
results = {}
for key,values in data.items():
group = grouper(results,key)#传递存储和相关键
next(group)#预激,在调用子生成器后在yield from处停止
for value in values:
group.send(value)#把value传递给grouper,再由grouper传递给average的term = yield,grouper
#无法知道传入的值
group.send(None)#传入None,导致当前实例停止,grouper继续运行,创建新实例
report(results)#输出
def report(results):
for key,result in sorted(results.items()):
group,unit = key.split(’;’)
print(’{:2}{:5} averaging {:.2f}{}’.format(result.count,group,result.average,unit))data = {‘girls;kg’:
[40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
‘girls;m’:
[1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
‘boys;kg’:[39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
‘boys;m’:[1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}#离散事件仿真——用协程处理并发事件
Event = namedtuple(‘Event’,‘time proc action’)
def taxi_process(ident,trips,start_time=0):
‘’‘改变状态时创建事件,把控制权给仿真器’’’
time = yield Event(start_time,ident,‘leave garage’)#产出第一个状态,暂停等待更新
for i in range(trips):#基于行程数循环
time = yield Event(time,ident,‘pick up passenger’)#产出第二个状态等待更新
time = yield Event(time,ident,‘drop off passenger’)#产出第三个状态等待更新
yield Event(time,ident,‘going home’)taxi = taxi_process(1,2)
next(taxi)
taxi.send(10)
taxi.send(_.time+10)#更新实例属性
#taxis = {i:taxi_process(i,(i+1)2,iDEPARTURE_INTERVAL)for i in range(num_taxis)}
#sim = Simulator(taxis)
#离散事件仿真类
‘’'class Simulator:
def init(self,procs_map):
self.event = queue.PriorityQueue()
self.procs = dict(procs_map)def run(self,end_time): #排定并显示事件到时间结束 for _,proc in sorted(self.procs.items()): first_event = next(proc) self.events.put(first_event) #仿真主循环 sim_time = 0 while sim_time<end_time: if self.event.empty(): print('*** end of events ***') break current_event = self.events.get() sim_time,proc_id,previous_action = current_event print('taxi:',proc_id,proc_id*' ',current_event) active_proc = self.procs[proc_id] next_time = sim_time+compute_duration(previous_action) try: next_event = active_proc.send(next_time) except StopIteration: del self.procs[proc_id] else: self.events.put(next_event) else: msg = '*** end of simulation time: {} evtnts pending ***' print(msg.format(self.events.qsize()))
‘’’
#使用刊物(异步处理)处理并发
#依次下载模型
import os
import time
import sys
import requests
POP20_CC = (‘CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR’).split()
BASE_URL =‘http://flupy.org/data/flags’#获取国徽图像的网站
DEST_DIR =‘downloads/’#图像保存目录
def save_flag(img,filename):#把图像的字节序列保存
path = os.path.join(DEST_DIR,filename)
with open(path,‘wb’)as fp:
fp.write(img)def get_flag(cc):
‘’‘指定国家代码构建url,下载并返回响应的二进制内容’’’
url = ‘{}/{cc}/{cc}.gif’.format(BASE_URL,cc=cc.lower())
resp = requests.get(url)
return resp.contentdef show(text):
‘’‘显示字符串,刷新sys。stdout显示进度’’’
print(text,end=’ ')
sys.stdout.flush()def download_many(cc_list):
for cc in sorted(cc_list):#迭代国家代码,得到图像
image = get_flag(cc)
show(cc)
save_flag(image,cc.lower()+’.gif’)
return len(cc_list)def main(download_many):#计算耗时
t0 = time.time()
count = download_many(POP20_CC)
elapsed = time.time()-t0
msg = ‘n{}falgs downloaded in {:.2f}s’
print(msg.format(count,elapsed))#if name == ‘main’:
main(download_many)
#使用concurrent.futures模块进行并列下载
from concurrent import futures
MAX_WORKERS = 20
def download_one(cc):
image = get_flag(cc)
show(cc)
save_flag(image,cc.lower()+’.gif’)
return ccdef download_new_many(cc_list):
workers = min(MAX_WORKERS,len(cc_list))#设置线程数量
with futures.ThreadPoolExecutor(workers) as executor:#使用工作的线程实例化ThreadPoolExecuter类
res = executor.map(download_one,sorted(cc_list))#多线程并发调用map,返回生成器
#迭代器的__next__方法调用各个期物的.result方法
return len(list(res))#返回获取结果数量#窥探,等效与上面函数
def download_common_many(cc_list):
cc_list = cc_list[:5]
with futures.ThreadPoolExecutor(max_workers = 3)as executor:#把msx_workers硬编码为三,可用最大线程为3
#等效futures.ProcessPoolExecutor()as executor:可用线程数为cpu数量
to_do =[]
for cc in sorted(cc_list):#按顺序迭代
future = executor.submit(download_one,cc)#排定可调用对象的执行时间,返回一个期物,表示待执行操作
to_do.append(future)#储存期物
msg = ‘Scheduled for {}:{}’
print(msg.format(cc,future))#打印国家代码和对应期物
results = []
for future in futures.as_completed(to_do):#在期物运行结束后产出期物
res = future.result()#获取期物结果
msg = ‘{}result:{!r}’
print(msg.format(future,res))
results.append(res)
return len(results)#if name == ‘main’:
main(download_many)
#ThreadPoolExecutor类的map方法
from time import sleep,strftime
from concurrent import futures
def display(*args):
print(strftime(’[%H:%M:%S]’),end=’ ')#打印时间戳
print(*args)def loiter(n):
msg = ‘{}loiter({}):doing nothing for {}s…’
display(msg.format(’t’*n,n,n))
sleep(n)#函数暂停时释放GIL,切换线程
msg = ‘{}loiter({}):done.’
display(msg.format(’t’n,n))
return n10def main_2():
display(‘Script starting.’)
executor = futures.ThreadPoolExecutor(max_workers=3)#创建实例,提供三个线程
results = executor.map(loiter,range(5))#分配三个线程执行,不会阻塞
display(‘results:’,results)#该函数返回一个生成器
display(‘Waiting for individual results:’)
for i,result in enumerate(results):#隐式调用生成器next方法,函数会阻塞,等待上一期物完成,再在期物上调用.result()方法
display(‘result{}:{}’.format(i,result))import time
from tqdm import tqdm#动画显示进度条
for i in tqdm(range(1000)):
time.sleep(.01)#处理异常
def get_flag(base_url,cc):
url = ‘{}/{cc}/{cc}.gif’.format(base_url,cc=cc.lower())
resp = requests.get(url)
if resp.status_code !=200:#当请求不成功时抛出异常
resp.raise_for_status()
return resp.contentHTTPStatus = namedtuple(‘Status’,‘ok not_found error’)
def download_one(cc,base_url,verbose=False):
try:
image = get_flag(base_url,cc)
except requests.exceptions.HTTPError as exc:#处理HTTP404异常
res = exc.response
if res.status_code ==404:
status = HTTPStatus.not_found
msg = ‘not found’
else:#重新抛出其他异常
raise
else:
save_flag(image,cc.lower()+’.gif’)
status = HTTPStatus.ok
msg =‘OK’
if verbose:
print(cc,msg)
return Result(status,cc)def download_many(cc_list,base_url,verbose,max_req):
counter =collections.Counter()#统计不同的下载状态
cc_iter = sorted(cc_list)
if not verbose:
cc_list = tqdm.tqdm(cc_iter)#返回迭代器,产出元素和进度条动画
for cc in cc_iter:
try:
res = download_one(cc,base_url,verbose)#执行下载
except requests.exceptions.HTTPError as exc:
error_msg = ‘HTTP error {res.status_code}-{res.reason}’
error_msg = error_msg.format(res=exc.response)
except requests.exceptions.ConnectionError as exc:
error_msg =‘Connection error’
else:
error_msg=’ ’
status =res.status
if error_msg:#如果有错误,改变状态
status =HTTPStatus.error
counter[status]+=1#记录错误
if verbose and error_msg:
print(’*** Error for {}:{}’.format(cc,error_msg))
return counter#使用asyncio包处理并发
import threading
import itertools
import time
import sys
class Signal:
‘’‘定义可变对象外部控制线程’’’
go =Truedef spin(msg,signal):
write,flush =sys.stdout.write,sys.stdout.flush
for char in itertools.cycle(’|/-’):#无限循环
status = char+’ ‘+msg
write(status)
flush()
write(’x08’*len(status))#使用退格符移回光标
time.sleep(1)
if not signal.go:#判断是否退出
break
write(’ ‘*len(status)+’x08’*len(status))def slow_function():
‘’‘模拟等待I/O’’’
time.sleep(3)#调用sleep阻塞主线程,释放GIL,创建从属线程
return 42def supervisor():
‘’‘设置从属线程,显示线程对象,计算耗时,最后杀死线程’’’
signal = Signal()
spinner = threading.Thread(target=spin,args=(‘thinking!’,signal))
print(‘spinner object:’,spinner)
spinner.start()#启动从属线程
result = slow_function()#阻塞主线程,从属线程以动画形式显式旋转指针
signal.go = False
spinner.join()#等待线程结束
return resultdef main():
result = supervisor()
print(‘Answer:’,result)if name==‘main’:
main()#通过协程形式完成以上行为
import asyncio
import itertools
import sys
@asyncio.coroutine#用装饰器装饰要交给asyncio处理的协程
def spin(msg):
write,flush=sys.stdout.write,sys.stdout.flush
for char in itertools.cycle(’|/-’):
status = char+’ ‘+msg
write(status)
flush()
write(’x08’*len(status))
try:
yield from asyncio.sleep(.1)#代替time.sleep(.1),这样休眠不阻塞事件循环
except asyncio.CancelledError:#发出取消请求,退出循环
break
write(’ ‘*len(status)+’x08’*len(status))@asyncio.coroutine
def slow_function():
‘’‘模拟等待,使用yield from继续执行事件循环’’’
yield from asyncio.sleep(3)#将控制权交给主循环,在休眠结束后恢复
return 42@asyncio.coroutine
def supervisor():#异步执行
spinner = asyncio.create_task(spin(‘thinking!’))#排定协程运行时间
print(‘spinner object:’,spinner)
result =yield from slow_function()#结束后获取返回值,同时继续运行循环
spinner.cancel()#杀死
return resultdef main():
loop =asyncio.get_event_loop()#获取事件循环的引用
result = loop.run_until_complete(supervisor())#驱动协程直到运行完毕
loop.close()
print(‘Answer:’,result)if name ==‘main’:
main()#使用asyncio和aiohttp实现异步下载
import asyncio
import aiohttpasync def get_flag(cc):
url = ‘{}/{cc}/{cc}.gif’.format(BASE_URL,cc=cc.lower())
resp = await aiohttp.request(‘GET’,url)#通过协程实现阻塞,客户代码通过yield from 把职责委托给协程
image = await resp.read()#异步执行读取响应内容
return imageasync def download_one(cc):
image = await get_flag(cc)
show(cc)
save_flag(image,cc.lower()+’.gif’)
return ccdef download_many(cc_list):
loop = asyncio.get_event_loop()#获取事件循环引用
to_do = [download_one(cc)for cc in sorted(cc_list)]
wait_coro = asyncio.wait(to_do)#等待传给的参数中所有协程的结束后完毕
res,_ = loop.run_until_complete(wait_coro)#返回两个元素 一个是已经完成期物,第二个是未完成期物
loop.close()
return len(res)if name==‘main’:
main(download_many)#使用asyncio包实现完整版
import asyncio
import collections
import aiohttp
from aiohttp import web
import tqdm
DEFAULT_CONCUR_REQ =5
MAX_CONCUR_REQ = 1000
class FetchError(Exception):
def init(self,country_code):
self.country_code = country_codeasync def get_flag(base_url,cc):
url = ‘{}/{cc}/{cc}.gif’.format(base_url,cc=cc.lower())
resp = await aiohttp.request(‘GET’,url)
if resp.status == 200:
image = await resp.read()
return image
elif resp.status == 404:
raise web.HTTPNotFound()
else:
raise aiohttp.HttpProcessinfError(code=resp.status,message=resp.reason,headers=resp.headers)async def download_one(cc,base_url,semaphore,verbose):
try:
with(await semaphore):
image = await get_flag(base_url,cc)
except web.HTTPNotFound:
status = HTTPStatus.not_found
msg = ‘not found’
except Exception as exc:
raise FetchError(cc) from exc
else:
#save_flag(image,cc.lower()+’.gif’)
loop = asyncio.get_event_loop()#获取事件循环引用
loop.run_in_executor(None,image,cc.lower()+’.gif’)#第一个参数为Executor实例,如果为none,就默认为ThreadPoolExecutor实例
status = HTTPStatus.ok
msg = ‘OK’
if verbose and msg:
print(cc,msg)
return Result(status,cc)async def downloader_coro(cc_list,base_url,verbose,concur_req):
counter = collections.Counter()
semaphore = asyncio.Semaphore(concur_req)#创建实例,最多允许激活concur_req个使用这个计数器的协程
to_do = [download_one(cc,base_url,semaphore,verbose)for cc in sorted(cc_list)]#创建协程对象列表
to_do_iter = asyncio.as_completed(to_do)#获取迭代器,在期物运行结束后返回期物
if not verbose:
to_do_iter = tqdm.tqdm(to_do_iter,total=len(cc_list))#显示进度
for future in to_do_iter:
try:
res = await future#获取asyncio.Future结果
except FetchError as exc:
county_code = exc.country_code#获取错误的国家代码
try:
error_msg = exc.cause.args[0]#尝试从原来错误中获取信息
except IndexError:
error_msg = exc.cause.class.name#检索不到错误信息,把异常连接的类名当成错误消息
if verbose and error_msg:
msg = ‘***Error for {}:{}’
print(msg.format(county_code,error_msg))
status = HTTPStatus.error
else:
status = res.status
counter[status] += 1
return counterdef download_many(cc_list,base_url,verbose,concur_req):
loop = asyncio.get_event_loop()
coro = downloader_coro(cc_list,base_url,verbose,concur_req)
counts = loop.run_until_complete(coro)
loop.close()
return countsif name==‘main’:
main(download_many,DEFAULT_CONCUR_REQ,MAX_CONCUR_REQ)#使用异步一次发送多个请求
async def http_get(url):
res = await aiohttp.request(‘GET’,url)
if res.status == 200:
ctype = res.headers.get(‘Content-type’,’ ').lower()
if 'json’in ctype or url.endswith(‘json’):
data = await res.json()#如果包含json,调用json方法解析,返回字典
else:
data = await res.read()#没有则使用read读取原始文本
return data
elif res.status == 404:
raise web.HTTPNotFound()
else:
raise aiohttp.errors.HttpaProcessingError(code=res.status,message=res.reason,herders=res.headers)async def get_country(base_url,cc):
url = ‘{}/{cc}/metadata.json’.format(base_url,cc=cc.lower())
metadata = await http_get(url)#得到一个由json构建的字典
return metadata[‘country’]async def get_flag(base_url,cc):
url = ‘{}/{cc}/{cc}.gif’.format(base_url,cc=cc.lower())
return (await http_get(url))#必须加上括号不然报错async def download_one(cc,base_url,semaphore,verbose):
try:
with(await semaphore):
image = await get_flag(base_url,cc)
with(await semaphore):
country = await get_country(base_url,cc)
except web.HTTPNotFound:
status = HTTPStatus.not_found
msg = ‘not found’
except Exception as exc:
raise FetchError(cc) from exc
else:
country = country.replace(’ ‘,’_’)
filename = ‘{}-{}.gif’.format(country,cc)
loop = asyncio.get_event_loop()
loop.run_in_executor(None,save_flag,image,filename)
status = HTTPStatus.ok
msg = ‘OK’
if verbose and msg:
print(cc,msg)
return Result(status,cc)#元编程
#下载osconfeed.json
from urllib.request import urlopen
import warnings
import os
import json
URL = ‘http://www.oreilly.com/pub/sc/osconfeed’
JSON =‘data/osconfeed.json’
def load():
if not os.path.exists(JSON):
msg = ‘downloading{}to{}’.format(URL,JSON)
warnings.warn(msg)#如果需要下载就发出提醒
with urlopen(URL)as remote,open(JSON,‘wb’,encoding=‘utf-8’)as local:#打开两个上下文管理器,分别读取和保存
local.write(remote.read())
with open(JSON)as fp:
return json.load(fp)#解析JSON,返回python数据类型#使用自定义类进心
from collections import abc
import keyword
class FrozenJSON:
‘’‘一个只读接口,使用属性表示访问JSON对象’’’
def init(self,mapping):#使用传入对象创建一个字典副本
self.__data = {}
for key,value in mapping.items():
if keyword.iskeyword(key):
key += ‘_’#防止属性名与关键字冲突
self.__data[key]=value
def getattr(self, name):#仅当没有指定名称时才调用
if hasattr(self.__data,name):
return getattr(self.__data,name)#如果neme是实例属性data的属性,返回那个属性
else:
return FrozenJSON.build(self.__data[name])#否则获取键值,返回调用结果
@classmethod
def build(cls,obj):
if isinstance(obj,abc.Mapping):#如果obj是一个映射,构建FrozenJSON对象
return cls(obj)
elif isinstance(obj,abc.MutableSequence):#若果是列表,构建列表
return [cls.build(item)for item in obj]
else:#如果既不是字典也不是列表,返回原元素
return obj#使用实例方法实现
class FrozenJSON_new:
‘’‘与上方法等效’’’
def new(cls,arg):
if isinstance(arg,abc.Mapping):
return super().new(cls)
elif isinstance(arg,abc.MutableSequence):
return [cls(item)for item in arg]
else:
return argdef __init__(self,mapping): self.__data = {} for key,value in mapping.items(): if keyword.iskeyword(key): key += '_' self.__data[key]=value def __getattr__(self,name): if hasattr(self.__data,name): return getattr(self.__data,name) else: return FrozenJSON(self.__data[name])
#访问保存在shelve.Shelf对象里的数据
import warnings
DB_NAME=‘data/schedule1_db’
CONFERENCE = ‘conference.115’class Record:
def init(self,**kwargs):
self.dict.update(kwargs)#构建实例,快速创建一堆属性技巧def load_db(db):
#raw_data = load()#如果本地没有副本,从网络下载
#raw_data = open(‘D:新建文件夹 (2)dataosconfeed.json’)
warnings.warn(‘loading’+DB_NAME)
with open(‘D:新建文件夹 (2)dataosconfeed.json’)as raw_data:
raw_data = eval(raw_data)
for collections,rec_list in raw_data[‘Schedule’].items():
record_type = collections[:-1]#去掉尾部字符
for record in rec_list:
key = ‘{}.{}’.format(record_type,record[‘serial’])#重新定义键
record[‘serial’] = key#把值设为键
db[key]=Record(**record)#储存在键下List item
最后
以上就是阔达曲奇为你收集整理的python学习print(card)print(tshirts)print(city)main(download_many)main(download_many)的全部内容,希望文章能够帮你解决python学习print(card)print(tshirts)print(city)main(download_many)main(download_many)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复