Python Notes

本文记录 python 语法和 一些 module 的使用

语法

@property

Python内置的@property装饰器就是负责把一个方法变成属性调用的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
class Student(object):
    """Student whose ``score`` attribute is validated on every assignment."""

    @property
    def score(self):
        """Return the score stored by the most recent valid assignment."""
        return self._score

    @score.setter
    def score(self, value):
        # Reject non-integers first, then anything outside the 0..100 range.
        if not isinstance(value, int):
            raise ValueError('score must be an integer!')
        if not 0 <= value <= 100:
            raise ValueError('score must between 0 ~ 100!')
        self._score = value
1
2
3
4
5
6
7
8
>>> s = Student()
>>> s.score = 60 # OK,实际转化为s.set_score(60)
>>> s.score # OK,实际转化为s.get_score()
60
>>> s.score = 9999
Traceback (most recent call last):
  ...
ValueError: score must between 0 ~ 100!

定义只读属性,只定义getter方法,不定义setter方法就是一个只读属性:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
class Student(object):
    """Student with a writable ``birth`` and a derived, read-only ``age``."""

    @property
    def birth(self):
        """Return the year of birth as last assigned."""
        return self._birth

    @birth.setter
    def birth(self, value):
        self._birth = value

    @property
    def age(self):
        """Return the age relative to 2014; no setter exists, so it is read-only."""
        reference_year = 2014
        return reference_year - self._birth

@staticmethod

staticmethod 装饰器同样是用于类中的方法,这表示这个方法将会是一个静态方法,意味着该方法可以直接被调用无需实例化,但同样意味着它没有 self 参数,也无法访问实例化后的对象。

1
2
3
4
5
6
class XiaoMing:
    @staticmethod
    def say_hello():
        print('同学你好')

XiaoMing.say_hello()

@classmethod

classmethod 依旧是用于类中的方法,这表示这个方法将会是一个类方法,意味着该方法可以直接被调用无需实例化,但同样意味着它没有 self 参数,也无法访问实例化后的对象。相对于 staticmethod 的区别在于它会接收一个指向类本身的 cls 参数。

1
2
3
4
5
6
7
8
9
class XiaoMing:
    name = '小明'

    @classmethod
    def say_hello(cls):
        print('同学你好, 我是' + cls.name)
        print(cls)

XiaoMing.say_hello()

属性和方法绑定

正常情况下,当我们定义了一个class,创建了一个class的实例后,我们可以给该实例绑定任何属性和方法,这就是动态语言的灵活性

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
>>> class Student(object):
...     pass
...

>>> s = Student()
>>> s.name = 'Michael' # 动态给实例绑定一个属性
>>> print(s.name)
Michael

>>> def set_age(self, age): # 定义一个函数作为实例方法
...     self.age = age
...
>>> from types import MethodType
>>> s.set_age = MethodType(set_age, s) # 给实例绑定一个方法
>>> s.set_age(25) # 调用实例方法
>>> s.age # 测试结果
25

需要注意给一个实例绑定的方法,对另一个实例是不起作用的。为了给所有实例都绑定方法,可以给class绑定方法:

1
2
3
4
>>> def set_score(self, score):
...     self.score = score
...
>>> Student.set_score = set_score

动态绑定允许我们在程序运行的过程中动态给class加上功能

但是,如果我们想要限制class的属性怎么办?比如,只允许对Student实例添加name和age属性。

为了达到限制的目的,Python允许在定义class的时候,定义一个特殊的__slots__变量,来限制该class能添加的属性:

1
2
3
>>> class Student(object):
...     __slots__ = ('name', 'age') # 用tuple定义允许绑定的属性名称
...

使用__slots__要注意,__slots__定义的属性仅对当前类起作用,对继承的子类是不起作用的

Decorators

函数嵌套

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def hi(name="yasoob"):
    """Show that functions defined inside ``hi`` are callable only from within it.

    Prints an entry message, the strings returned by the two nested
    functions, and an exit message. ``name`` is accepted but not used.
    """
    def greet():
        return "now you are in the greet() function"

    def welcome():
        return "now you are in the welcome() function"

    print("now you are inside the hi() function")
    # Call the nested helpers in definition order.
    for nested in (greet, welcome):
        print(nested())
    print("now you are back in the hi() function")

hi()
#output:now you are inside the hi() function
#       now you are in the greet() function
#       now you are in the welcome() function
#       now you are back in the hi() function

# 上面展示了无论何时你调用hi(), greet()和welcome()将会同时被调用。
# 然后greet()和welcome()函数在hi()函数之外是不能访问的,比如:

greet()
#outputs: NameError: name 'greet' is not defined

装饰器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def a_new_decorator(a_func):
    """Wrap ``a_func`` so a message is printed before and after each call.

    Args:
        a_func: The callable to decorate.

    Returns:
        A wrapper that forwards every positional and keyword argument to
        ``a_func``, returns its result, and keeps its name and docstring.
    """
    # Imported locally so the snippet stays self-contained.
    from functools import wraps

    @wraps(a_func)
    def wrapTheFunction(*args, **kwargs):
        print("I am doing some boring work before executing a_func()")

        # Forward the arguments and keep the result so decorated functions
        # with parameters or return values still work.
        result = a_func(*args, **kwargs)

        print("I am doing some boring work after executing a_func()")
        return result

    return wrapTheFunction

def a_function_requiring_decoration():
    print("I am the function which needs some decoration to remove my foul smell")

a_function_requiring_decoration()
#outputs: "I am the function which needs some decoration to remove my foul smell"

a_function_requiring_decoration = a_new_decorator(a_function_requiring_decoration)
#now a_function_requiring_decoration is wrapped by wrapTheFunction()

a_function_requiring_decoration()
#outputs:I am doing some boring work before executing a_func()
#        I am the function which needs some decoration to remove my foul smell
#        I am doing some boring work after executing a_func()

A simple way

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
@a_new_decorator
def a_function_requiring_decoration():
    """Hey you! Decorate me!"""
    print("I am the function which needs some decoration to "
          "remove my foul smell")

a_function_requiring_decoration()
#outputs: I am doing some boring work before executing a_func()
#         I am the function which needs some decoration to remove my foul smell
#         I am doing some boring work after executing a_func()

#the @a_new_decorator is just a short way of saying:
a_function_requiring_decoration = a_new_decorator(a_function_requiring_decoration)

decorator with parameters:

1
2
3
@decorator(params)
def func_name():
    ''' Function implementation'''

The above code is equivalent to

1
2
3
4
5
def func_name():
    ''' Function implementation'''

func_name = (decorator(params))(func_name)

example:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
from functools import wraps

def logit(func):
    """Decorate ``func`` so its name is printed every time it is called.

    The returned wrapper passes all arguments through and returns the
    result of ``func``; ``wraps`` copies the metadata of ``func`` onto it.
    """
    def with_logging(*args, **kwargs):
        message = func.__name__ + " was called"
        print(message)
        return func(*args, **kwargs)

    # Equivalent to decorating with_logging with @wraps(func).
    return wraps(func)(with_logging)

@logit
def addition_func(x):
   """Do some math."""
   return x + x


result = addition_func(4)
# Output: addition_func was called
1
2
3
4
5
6
7
def decorator(func):
    """Skeleton of a decorator: call ``func`` unchanged and return its result."""
    def wrapper(*args, **kwargs):
        # Hook: work to do before the wrapped function runs goes here.
        outcome = func(*args, **kwargs)
        # Hook: work to do after the wrapped function has run goes here.
        return outcome

    return wrapper
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
def decorator_func(x, y):
    """Build a decorator that prints a greeting and ``x + y`` before each call.

    Args:
        x: First operand of the printed sum.
        y: Second operand of the printed sum.

    Returns:
        A decorator. The function it produces prints the two messages,
        calls the wrapped function and returns that function's result.
    """
    # Imported locally so the snippet stays self-contained.
    from functools import wraps

    def Inner(func):

        @wraps(func)
        def wrapper(*args, **kwargs):
            print("I like Geeksforgeeks")
            print("Summation of values - {}".format(x+y) )

            # Return the wrapped function's result instead of dropping it.
            return func(*args, **kwargs)

        return wrapper
    return Inner


# Not using decorator
def my_fun(*args):
    for ele in args:
        print(ele)

# another way of using decorators
decorator_func(12, 15)(my_fun)('Geeks', 'for', 'Geeks')

equal to

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
def decorator_func(x, y):
    """Build a decorator that prints a greeting and ``x + y`` before each call.

    Args:
        x: First operand of the printed sum.
        y: Second operand of the printed sum.

    Returns:
        A decorator. The function it produces prints the two messages,
        calls the wrapped function and returns that function's result.
    """
    # Imported locally so the snippet stays self-contained.
    from functools import wraps

    def Inner(func):

        @wraps(func)
        def wrapper(*args, **kwargs):
            print("I like Geeksforgeeks")
            print("Summation of values - {}".format(x+y) )

            # Return the wrapped function's result instead of dropping it.
            return func(*args, **kwargs)

        return wrapper
    return Inner

@decorator_func(12, 15)
def my_fun(*args):
    for ele in args:
        print(ele)

my_fun('Geeks', 'for', 'Geeks')

解决原函数的元信息 (name) 替换问题

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
from functools import wraps

def decorator(func):
    """Print ``123`` before each call while keeping the metadata of ``func``.

    ``wraps`` copies the name and docstring of ``func`` onto the wrapper,
    so the wrapper's own docstring below is not what callers see.
    """
    def wrapper(*args, **kwargs):
        """doc of wrapper"""
        print('123')
        outcome = func(*args, **kwargs)
        return outcome

    # Equivalent to decorating wrapper with @wraps(func).
    return wraps(func)(wrapper)

@decorator
def say_hello():
    """doc of say hello"""
    print('同学你好')

print(say_hello.__name__)
print(say_hello.__doc__)

类装饰器

类能实现装饰器的功能, 是由于当我们调用一个对象时,实际上调用的是它的 __call__ 方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
class Decorator:
    """Class-based decorator that prints ``123`` before delegating to a function."""

    def __init__(self, func):
        # Keep the decorated callable so __call__ can delegate to it.
        self.func = func

    def __call__(self, *args, **kwargs):
        """Print the marker, then call the stored function and return its result."""
        print('123')
        target = self.func
        return target(*args, **kwargs)

@Decorator
def say_hello():
    print('同学你好')

say_hello()

用函数就能实现的功能,为什么还需要用类来实现?

因为通过类我们可以将执行过程拆解到各函数中,降低代码的复杂度,甚至可以通过类属性实现一些复杂功能。

运算符重载

()

1
2
3
class A(object):
    """Minimal example of overloading the call operator ``()``."""

    def __call__(self):
        """Run when an instance is called like a function; body left as a placeholder."""
        ...

python 导入与路径管理

Python3 中有内建模块(built-in)、第三方库(site-packages)以及自定义库三种可以 import 的模块。在 import 模块时,Python 解释器的搜索顺序是先搜索 built-in 模块,然后搜索 sys.path 这个路径列表中的模块。

1
2
import sys
print(sys.builtin_module_names)

sys.path是一个路径列表,里面保存了解释器可以索引的所有路径。

  • 当前脚本路径
  • PYTHONPATH路径
  • 虚拟环境路径
  • site-packages路径

注意

__

  1. 在类内要小心使用以 __(两个下划线)开头的命名方式,因为 python 解释器会对其进行名称改写,在前面加上 _类名(例如类 A 中的 __x 会变成 _A__x)。

sys.path

通过设置 sys.path 我们可以给 python 添加模块搜索路径

sys.path.append('/home/...')

sys.path 是一个列表

获取文件路径

import os
import sys

print(sys.argv[0]) # path of the main script, as given on the command line
print(__file__) # path of the file containing this line

# Absolute path of the main script and the directory that contains it.
path = os.path.abspath(sys.argv[0])
dir_path = os.path.split(path)[0]

循环引用问题

1
2
3
4
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Only static type checkers treat this branch as taken; at runtime it is
    # skipped. Replace the placeholder names with the import that is needed
    # for annotations only.
    from some_module import SomeClass

该方法只能解决“仅为类型注解而导入”造成的循环引用,并不能解决运行时的循环引用:TYPE_CHECKING is a special constant that is assumed to be True by static type checkers and is False at runtime,因此该分支中的 import 在运行时不会执行。

解决循环引用的方法,可能需要将 import module 语句放到函数内部去。

全局变量

  1. 不同文件直接 import 一个变量不能够共享
1
2
# a.py
var = 10
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
# b.py
import numpy as np
import time
from a import var
import c


def func():
    var = 100

if __name__ == '__main__':
    func()
    c.cfunc()
1
2
3
4
5
# c.py
from a import var

def cfunc():
    print(var)
  2. 专门为全局变量定义一个全局变量管理模块
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# a.py
def set_var(n):
    """Rebind the module-level variable ``var`` to ``n``."""
    global var
    var = n


def get_var():
    """Return the current value of the module-level variable ``var``."""
    # Reading a global needs no ``global`` statement.
    return var


def init():
    """Create the shared module-level variable with its initial value 10."""
    set_var(10)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# b.py
import numpy as np
import time
from a import *
import c


def func():
    set_var(100)

if __name__ == '__main__':
    func()
    c.cfunc()

1
2
3
4
from a import *

def cfunc():
    print(get_var())
  3. 对于类变量是可以直接多文件 import 共享
1
2
3
4
5
# a.py
class A:
    def __init__(self):
        self.val = 10
var = A()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
# b.py
import numpy as np
import time
from a import *
import c


def func():
    var.val = 100

if __name__ == '__main__':
    func()
    c.cfunc()
1
2
3
4
5
# c.py
from a import *

def cfunc():
    print(var.val)

设计模式

单例模式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
class Single(object):
    """Singleton implemented in ``__new__``: every construction returns one object.

    Each class in the hierarchy keeps its own instance, so a subclass never
    receives an object that was created for its parent.
    """
    _instance = None

    def __new__(cls, *args, **kw):
        # Look only in cls's own namespace; an attribute lookup would find
        # the instance already stored on a parent class.
        if cls.__dict__.get('_instance') is None:
            # object.__new__ rejects extra arguments when __new__ is
            # overridden, so constructor arguments are left to __init__.
            cls._instance = super(Single, cls).__new__(cls)
        return cls._instance

    def __init__(self):
        pass

single1 = Single()
single2 = Single()
print(id(single1) == id(single2))
1
2
3
4
5
6
class Singleton(object):
    """Base class that makes itself and every subclass a singleton.

    The instance is cached per class, so each subclass gets exactly one
    instance of its own type.
    """

    def __new__(cls, *args, **kw):
        # hasattr() would also see an instance cached on a parent class;
        # checking cls.__dict__ keeps the caches separate.
        if '_instance' not in cls.__dict__:
            cls._instance = super(Singleton, cls).__new__(cls)
        return cls._instance

凡是继承该类的,都为单例类

dict

remove item

Remove all items: clear()

1
2
3
4
5
d = {'k1': 1, 'k2': 2, 'k3': 3}

d.clear()
print(d)
# {}

Remove an item by a key and return a value: pop()

1
2
3
4
5
6
7
8
d = {'k1': 1, 'k2': 2, 'k3': 3}

removed_value = d.pop('k1')
print(d)
# {'k2': 2, 'k3': 3}

print(removed_value)
# 1

Remove an item and return a key and value: popitem()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
d = {'k1': 1, 'k2': 2}

k, v = d.popitem()
print(k)
print(v)
print(d)
# k2
# 2
# {'k1': 1}

k, v = d.popitem()
print(k)
print(v)
print(d)
# k1
# 1
# {}

# k, v = d.popitem()
# KeyError: 'popitem(): dictionary is empty'

Remove an item by a key: del

1
2
3
4
5
d = {'k1': 1, 'k2': 2, 'k3': 3}

del d['k2']
print(d)
# {'k1': 1, 'k3': 3}

add item

Add and update an item to the dictionary by specifying the key

1
dict_object[key] = new_value

Use the setdefault() method. If the key specified as the first argument already exists, the existing item remains unchanged as the original, no matter what value is specified as the second argument.

1
2
3
4
5
6
7
8
9
d = {'k1': 1, 'k2': 2}

d.setdefault('k3', 3)
print(d)
# {'k1': 1, 'k2': 2, 'k3': 3}

d.setdefault('k1', 100)
print(d)
# {'k1': 1, 'k2': 2, 'k3': 3}

静态变量

在类中定义在函数外面的变量是类变量,不属于类的实例。利用它可以实现静态变量。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
class Foo(object):
    """Keeps a counter in a class variable that all instances share."""

    # Private class variable: the name is mangled to _Foo__count, so
    # Foo.__count raises AttributeError outside the class body.
    __count = 0

    @classmethod
    def get_count(cls):
        """Return the shared counter."""
        return cls.__count

    @classmethod
    def set_count(cls, num):
        """Set the shared counter to ``num``."""
        cls.__count = num

f1 = Foo()
f2 = Foo()
Foo.set_count(1)
print(f1.get_count(), f2.get_count())

析构函数

1
2
3
class C():
  def __del__(self):
    pass

delete class member variable

1
2
3
4
5
6
class A():
    def __init__(self):
        self.data = 1

a = A()
del(a.data)

check attributes

1
2
3
4
5
6
class Foo(object):
    """Example of using ``hasattr`` to create a shared instance lazily."""

    @classmethod
    def singleton(cls):
        """Return the shared instance, creating it on the first call.

        The instance is built from ``cls``, so calling this on a subclass
        yields an object of that subclass.
        """
        if not hasattr(cls, 'instance'):
            cls.instance = cls()
        return cls.instance

继承

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
class Animal(object):
    """Base class holding a name and an age."""

    def __init__(self, name, age):
        self.name = name
        self.age = age

    def call(self):
        """Print the generic sound line for this animal."""
        print(self.name, '会叫')


class Cat(Animal):
    """Animal with an extra ``sex`` attribute and its own sound line."""

    def __init__(self, name, age, sex):
        # Let the base class store name and age.
        super().__init__(name, age)
        self.sex = sex

    def call(self):
        """Print the cat-specific sound line."""
        print(self.name, '会“喵喵”叫')


class Dog(Animal):
    """Animal with an extra ``sex`` attribute and its own sound line."""

    def __init__(self, name, age, sex):
        # Let the base class store name and age.
        super().__init__(name, age)
        self.sex = sex

    def call(self):
        """Print the dog-specific sound line."""
        print(self.name, '会“汪汪”叫')

函数参数:按值传递和按引用传递

根据具体情况,Python的函数参数既支持按值传递也支持按引用传递。

实际上,解释器会查看对象引用(内存地址)指示的那个值的类型,如果变量指示一个可变的值,就会按引用调用语义。如果所指示的数据的类型是不可变的,则会应用按值调用语义

  • 列表 字典 集合 class 总是会按引用传入函数
  • 字符串 整数 元组 总是会按值传入函数

但需要注意:即使参数是按引用传入的,在函数内对该对象本身的修改可以在函数外部看到,但是在函数内把参数名重新绑定到一个新对象,不会改变函数外部的任何内容。

1
2
3
4
5
6
7
def func(a):
    b = [1, 23, 4]
    a = b

a = [1, 1, 1]
func(a)
print(a) # output [1, 1, 1]

class copy

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import copy


class C():
    """Small class used to contrast plain assignment with ``copy.deepcopy``."""
    data = 6

    def func(self):
        """Print this object's ``data`` through a nested helper function."""
        def work():
            print(self.data)
        work()

if __name__ == "__main__":

    c1 = C()
    c2 = c1  # plain assignment: c1 and c2 refer to the same object
    c1.data = 9
    print(c2.data) # print 9

    c3 = copy.deepcopy(c1)  # independent copy of c1
    c1.data = 5
    print(c3.data) # print 9

一般的 class 实例赋值只是让两个变量指向同一个对象(并不产生新的拷贝),深拷贝需要调用 copy.deepcopy

GC

Standard CPython's garbage collector has two components, the reference counting collector and the generational garbage collector, known as gc module.

The reference counting algorithm is incredibly efficient and straightforward, but it cannot detect reference cycles. That is why Python has a supplemental algorithm called generational cyclic GC. It deals with reference cycles only

generations

In order to limit the time each garbage collection takes, the GC uses a popular optimization: generations. The main idea behind this concept is the assumption that most objects have a very short lifespan and can thus be collected shortly after their creation. This has proven to be very close to the reality of many Python programs as many temporary objects are created and destroyed very fast. The older an object is the less likely it is that it will become unreachable.

To take advantage of this fact, all container objects are segregated into three spaces/generations. Every new object starts in the first generation (generation 0). The previous algorithm is executed only over the objects of a particular generation and if an object survives a collection of its generation it will be moved to the next one (generation 1), where it will be surveyed for collection less often. If the same object survives another GC round in this new generation (generation 1) it will be moved to the last generation (generation 2) where it will be surveyed the least often.

Generations are collected when the number of objects that they contain reaches some predefined threshold, which is unique for each generation and is lower the older the generations are. These thresholds can be examined using the gc.get_threshold function:

1
2
3
>>> import gc
>>> gc.get_threshold()
(700, 10, 10)

arbitrary number of arguments

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
def cheeseshop(kind, *arguments, **keywords):
    """Print a short dialogue showing extra positional and keyword arguments.

    Args:
        kind: The cheese being asked for.
        *arguments: Extra lines, printed one per line.
        **keywords: Extra ``name : value`` pairs, printed after a separator.
    """
    print("-- Do you have any", kind, "?")
    print("-- I'm sorry, we're all out of", kind)
    # Guard the empty case: print() without arguments would emit a blank line.
    if arguments:
        print(*arguments, sep="\n")
    print("-" * 40)
    for key, value in keywords.items():
        print(key, ":", value)

cheeseshop("Limburger", "It's very runny, sir.",
           "It's really very, VERY runny, sir.",
           shopkeeper="Michael Palin",
           client="John Cleese",
           sketch="Cheese Shop Sketch")

output:

-- Do you have any Limburger ?
-- I'm sorry, we're all out of Limburger
It's very runny, sir.
It's really very, VERY runny, sir.
----------------------------------------
shopkeeper : Michael Palin
client : John Cleese
sketch : Cheese Shop Sketch

resource

该模块提供了测量和控制程序使用的系统资源的基本机制。符号常量用于指定特定的系统资源,并请求有关当前进程或其子进程的使用信息。

  • setrlimit() 每个资源由一对限制控制:软限制和硬限制。软限制是当前生效的限制,进程可以随时间降低或提高它。软限制不能超过硬限制。硬限制可以降低到任何大于软限制的值,但不能提高。(只有具有超级用户有效uid的进程才能提高硬限制。)
  • resource.RLIM_INFINITY 常量,用于表示无限资源的限制
  • resource.getrlimit(resource) 返回元组 (soft, hard),表示资源 resource 当前的软限制和硬限制。如果指定的资源无效则引发 ValueError;如果底层系统调用意外失败则引发 error。
  • resource.setrlimit(resource, limits) 为资源 resource 设置新的消耗限制。limits 参数必须是由两个整数组成的元组 (soft, hard),用于描述新的限制。可以用 RLIM_INFINITY 表示不设上限。
  • resource.getrusage(who) 返回一个对象,描述当前进程或其子进程所消耗的资源,具体由 who 参数指定。who 参数应使用下文所述的 RUSAGE_* 常量。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from resource import *
import time

# a non CPU-bound task
time.sleep(3)
print(getrusage(RUSAGE_SELF))

# a CPU-bound task
for i in range(10 ** 8):
   _ = 1 + 1
print(getrusage(RUSAGE_SELF))
  • resource.getpagesize() 返回系统页中的字节数。

colorama

Python的Colorama模块,可以跨多终端,显示字体不同的颜色和背景

Fore是针对字体颜色,Back是针对字体背景颜色,Style是针对字体格式

  • Fore: BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, RESET.
  • Back: BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, RESET.
  • Style: DIM, NORMAL, BRIGHT, RESET_ALL

psutil

psutil是一个跨平台库能够轻松实现获取系统运行的进程和系统利用率(包括CPU、内存、磁盘、网络等)信息。它主要用来做系统监控,性能分析,进程管理。它实现了同等命令行工具提供的功能,如ps、top、lsof、netstat、ifconfig、who、df、kill、free、nice、ionice、iostat、iotop、uptime、pidof、tty、taskset、pmap等。

获取CPU信息

1
2
3
4
5
6
>>> import psutil
>>> psutil.cpu_count() # CPU逻辑数量
4
>>> psutil.cpu_count(logical=False) # CPU物理核心
2
# 2说明是双核超线程, 4则是4核非超线程

统计CPU的用户/系统/空闲时间:

1
2
>>> psutil.cpu_times()
scputimes(user=10963.31, nice=0.0, system=5138.67, idle=356102.45)

再实现类似top命令的CPU使用率,每秒刷新一次,累计10次:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
>>> for x in range(10):
...     print(psutil.cpu_percent(interval=1, percpu=True))
...
[14.0, 4.0, 4.0, 4.0]
[12.0, 3.0, 4.0, 3.0]
[8.0, 4.0, 3.0, 4.0]
[12.0, 3.0, 3.0, 3.0]
[18.8, 5.1, 5.9, 5.0]
[10.9, 5.0, 4.0, 3.0]
[12.0, 5.0, 4.0, 5.0]
[15.0, 5.0, 4.0, 4.0]
[19.0, 5.0, 5.0, 4.0]
[9.0, 3.0, 2.0, 3.0]

使用psutil获取物理内存和交换内存信息,分别使用:

1
2
3
4
>>> psutil.virtual_memory()
svmem(total=8589934592, available=2866520064, percent=66.6, used=7201386496, free=216178688, active=3342192640, inactive=2650341376, wired=1208852480)
>>> psutil.swap_memory()
sswap(total=1073741824, used=150732800, free=923009024, percent=14.0, sin=10705981440, sout=40353792)

可以通过psutil获取磁盘分区、磁盘使用率和磁盘IO信息(psutil.disk_partitions()、psutil.disk_usage('/')、psutil.disk_io_counters());下面的示例获取的是网络读写和网络接口信息:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
>>> psutil.net_io_counters() # 获取网络读写字节/包的个数
snetio(bytes_sent=3885744870, bytes_recv=10357676702, packets_sent=10613069, packets_recv=10423357, errin=0, errout=0, dropin=0, dropout=0)
>>> psutil.net_if_addrs() # 获取网络接口信息
{
  'lo0': [snic(family=<AddressFamily.AF_INET: 2>, address='127.0.0.1', netmask='255.0.0.0'), ...],
  'en1': [snic(family=<AddressFamily.AF_INET: 2>, address='10.0.1.80', netmask='255.255.255.0'), ...],
  'en0': [...],
  'en2': [...],
  'bridge0': [...]
}
>>> psutil.net_if_stats() # 获取网络接口状态
{
  'lo0': snicstats(isup=True, duplex=<NicDuplex.NIC_DUPLEX_UNKNOWN: 0>, speed=0, mtu=16384),
  'en0': snicstats(isup=True, duplex=<NicDuplex.NIC_DUPLEX_UNKNOWN: 0>, speed=0, mtu=1500),
  'en1': snicstats(...),
  'en2': snicstats(...),
  'bridge0': snicstats(...)
}

要获取当前网络连接信息(获取网络连接信息需要root权限),使用net_connections():

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
$ sudo python3
Password: ******
Python 3.8 ... on darwin
Type "help", ... for more information.
>>> import psutil
>>> psutil.net_connections()
[
    sconn(fd=83, family=<AddressFamily.AF_INET6: 30>, type=1, laddr=addr(ip='::127.0.0.1', port=62911), raddr=addr(ip='::127.0.0.1', port=3306), status='ESTABLISHED', pid=3725),
    sconn(fd=84, family=<AddressFamily.AF_INET6: 30>, type=1, laddr=addr(ip='::127.0.0.1', port=62905), raddr=addr(ip='::127.0.0.1', port=3306), status='ESTABLISHED', pid=3725),
    sconn(fd=93, family=<AddressFamily.AF_INET6: 30>, type=1, laddr=addr(ip='::', port=8080), raddr=(), status='LISTEN', pid=3725),
    sconn(fd=103, family=<AddressFamily.AF_INET6: 30>, type=1, laddr=addr(ip='::127.0.0.1', port=62918), raddr=addr(ip='::127.0.0.1', port=3306), status='ESTABLISHED', pid=3725),
    sconn(fd=105, family=<AddressFamily.AF_INET6: 30>, type=1, ..., pid=3725),
    sconn(fd=106, family=<AddressFamily.AF_INET6: 30>, type=1, ..., pid=3725),
    sconn(fd=107, family=<AddressFamily.AF_INET6: 30>, type=1, ..., pid=3725),
    ...
    sconn(fd=27, family=<AddressFamily.AF_INET: 2>, type=2, ..., pid=1)
]

通过psutil可以获取到所有进程的详细信息:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
>>> psutil.pids() # 所有进程ID
[3865, 3864, 3863, 3856, 3855, 3853, 3776, ..., 45, 44, 1, 0]
>>> p = psutil.Process(3776) # 获取指定进程ID=3776,其实就是当前Python交互环境
>>> p.name() # 进程名称
'python3.6'
>>> p.exe() # 进程exe路径
'/Users/michael/anaconda3/bin/python3.6'
>>> p.cwd() # 进程工作目录
'/Users/michael'
>>> p.cmdline() # 进程启动的命令行
['python3']
>>> p.ppid() # 父进程ID
3765
>>> p.parent() # 父进程
<psutil.Process(pid=3765, name='bash') at 4503144040>
>>> p.children() # 子进程列表
[]
>>> p.status() # 进程状态
'running'
>>> p.username() # 进程用户名
'michael'
>>> p.create_time() # 进程创建时间
1511052731.120333
>>> p.terminal() # 进程终端
'/dev/ttys002'
>>> p.cpu_times() # 进程使用的CPU时间
pcputimes(user=0.081150144, system=0.053269812, children_user=0.0, children_system=0.0)
>>> p.memory_info() # 进程使用的内存
pmem(rss=8310784, vms=2481725440, pfaults=3207, pageins=18)
>>> p.open_files() # 进程打开的文件
[]
>>> p.connections() # 进程相关网络连接
[]
>>> p.num_threads() # 进程的线程数量
1
>>> p.threads() # 所有线程信息
[pthread(id=1, user_time=0.090318, system_time=0.062736)]
>>> p.environ() # 进程环境变量
{'SHELL': '/bin/bash', 'PATH': '/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:...', 'PWD': '/Users/michael', 'LANG': 'zh_CN.UTF-8', ...}
>>> p.terminate() # 结束进程
Terminated: 15 <-- 自己把自己结束了

collections

这个模块实现了特定目标的容器,以提供Python标准内建容器 dict , list , set , 和 tuple 的替代选择。

errno

该模块实现标准的 errno 系统符号,每一个对应于一个整数

1
2
3
4
5
6
7
8
import os
import errno

# Use a separate name for the error code so the errno module stays usable.
code = errno.E2BIG
print(os.strerror(code))

# Output (exact wording comes from the platform's C library):
# Arg list too long

下面所列的 errno 可用于 errno.errorcode.keys():

errno.EPERM (值:(int)1)无权操作 
errno.ENOENT 没有该文件或目录(2)  
errno.ESRCH 无此进程(3)  
errno.EINTR 系统调用被打断(4)  
errno.EIO I/O 错误(5)  
errno.ENXIO 无此设备或地址(6)  
errno.E2BIG 参数列表过长(7)  
errno.ENOEXEC Exec 格式错误  
errno.EBADF 文件描述符错误(Bad file descriptor)  
errno.ECHILD 没有子进程(10)  
errno.EAGAIN 重试(11)  
errno.ENOMEM 内存不足(Out of memory)(12)  
errno.EACCES 访问权限不足(13)    
errno.EFAULT 地址错误(14)  
errno.ENOTBLK 需要块设备  
errno.EBUSY 设备或资源正忙  
errno.EEXIST 文件已存在  
errno.EXDEV 跨设备链接(Cross-device link)  
errno.ENODEV 没有这个设备  
errno.ENOTDIR 不是一个目录  
errno.EISDIR 这是一个目录  
errno.EINVAL 参数无效  
errno.ENFILE 文件表溢出  
errno.EMFILE 打开文件过多  
errno.ENOTTY 与文件描述符关联的不是终端  
errno.ETXTBSY 文本文件正忙  
errno.EFBIG 文件过大 
errno.ENOSPC 设备空间不足  
errno.ESPIPE Illegal seek  
errno.EROFS 只读文件系统  
errno.EMLINK 链接过多  
errno.EPIPE 管道损坏  
errno.EDOM 数学参数超过函数的值域  
errno.ERANGE 数学结果不可表示  
errno.EDEADLK 资源死锁  
errno.ENAMETOOLONG 文件名过长  
errno.ENOLCK 无可用记录锁  
errno.ENOSYS 函数未实现  
errno.ENOTEMPTY 目录非空  
errno.ELOOP 太多符号链接  
errno.EWOULDBLOCK 操作将会阻塞  
errno.ENOMSG 没有目标类型的消息  
errno.EIDRM 标识符被删除  
errno.ECHRNG Channel 数越界  
errno.EL2NSYNC Level 2 未同步  
errno.EL3HLT Level 3 停止  
errno.EL3RST Level 3 重置  
errno.ELNRNG Link 数越界  
errno.EUNATCH Protocol driver not attached  
errno.ENOCSI 没有CSI 结构可用  
errno.EL2HLT Level 2 中止  
errno.EBADE 无效的 exchange  
errno.EBADR 无效的请求标识符  
errno.EXFULL exchange 已满  
errno.ENOANO No anode  
errno.EBADRQC 无效请求码  
errno.EBADSLT 无效 slot  
errno.EDEADLOCK File locking deadlock error  
errno.EBFONT 字体文件格式不合法  
errno.ENOSTR 非流设备  
errno.ENODATA 无可用数据  
errno.ETIME 定时器过期    
errno.ENOSR 无可用流资源  
errno.ENONET 机器不在网络上  
errno.ENOPKG 包未安装  
errno.EREMOTE 远程对象  
errno.ENOLINK Link has been severed  
errno.EADV 广播错误
errno.ESRMNT Srmount error  
errno.ECOMM Communication error on send  
errno.EPROTO 协议错误  
errno.EMULTIHOP 尝试进行多跳(Multihop)  
errno.EDOTDOT RFS specific error
errno.EBADMSG 不是数据消息  
errno.EOVERFLOW 值过大  
errno.ENOTUNIQ 在网络上的名称不唯一  
errno.EBADFD 文件描述字异常  
errno.EREMCHG 远程地址已变  
errno.ELIBACC 无法访问一个需要的公共库  
errno.ELIBBAD 正在访问一个受损的公共库  
errno.ELIBSCN a.out 文件中的 .lib 端受损  
errno.ELIBMAX 尝试链接太多的公共库  
errno.ELIBEXEC 无法直接执行一个公共库    
errno.EILSEQ 非法字节序  
errno.ERESTART 被打断的系统调用应该重启  
errno.ESTRPIPE 流管道错误Streams pipe error  
errno.EUSERS 用户过多  
errno.ENOTSOCK 在非套接字上进行套接字操作  
errno.EDESTADDRREQ 需要目的地地址    
errno.EMSGSIZE 消息过长  
errno.EPROTOTYPE Protocol wrong type for socket  
errno.ENOPROTOOPT 协议不可用  
errno.EPROTONOSUPPORT 协议不支持  
errno.ESOCKTNOSUPPORT 套接字类型不支持  
errno.EOPNOTSUPP Operation not supported on transport endpoint  
errno.EPFNOSUPPORT 协议族不支持  
errno.EAFNOSUPPORT 协议不支持地址族  
errno.EADDRINUSE 地址正在使用中  
errno.EADDRNOTAVAIL 无法指定请求的地址  
errno.ENETDOWN 网络已宕  
errno.ENETUNREACH 网络不可达  
errno.ENETRESET Network dropped connection because of reset  
errno.ECONNABORTED 软件造成的连接中止  
errno.ECONNRESET 连接由 peer 重置  
errno.ENOBUFS 无可用缓冲空间  
errno.EISCONN Transport endpoint is already connected  
errno.ENOTCONN Transport endpoint is not connected  
errno.ESHUTDOWN Cannot send after transport endpoint shutdown  
errno.ETOOMANYREFS 引用过多: cannot splice  
errno.ETIMEDOUT 连接超时  
errno.ECONNREFUSED 拒绝连接  
errno.EHOSTDOWN 主机宕机  
errno.EHOSTUNREACH 没有到达主机的路由  
errno.EALREADY 操作已经进行  
errno.EINPROGRESS 操作现在进行    
errno.ESTALE Stale NFS file handle  
errno.EUCLEAN 结构体需要清洗   
errno.ENOTNAM 不是一个 XENIX具名文件  
errno.ENAVAIL 没有 XENIX 信号量可用  
errno.EISNAM 是一个具名文件  
errno.EREMOTEIO 远程 I/O 错误
errno.EDQUOT 超过限额

多线程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import threading
import time

exitFlag = 0

class myThread (threading.Thread):
    """Thread that prints the current time a fixed number of times.

    Args:
        threadID: Identifier stored on the instance.
        name: Thread name, also used as the label in the printed lines.
        counter: Stored on the instance and used by ``run`` as the delay
            in seconds between printed lines.
    """
    def __init__(self, threadID, name, counter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter

    def run(self):
        """Print a start line, five timestamps, then an exit line."""
        print ("开始线程:" + self.name)
        # Do a bounded amount of work so the thread finishes and join()
        # in the main thread can return.
        print_time(self.name, self.counter, 5)
        print ("退出线程:" + self.name)

def print_time(threadName, delay, counter):
    """Print ``threadName`` with the current time ``counter`` times.

    Args:
        threadName: Label printed at the start of each line.
        delay: Seconds to sleep before each line.
        counter: Number of lines to print.

    Stops early, without printing, as soon as the module-level ``exitFlag``
    is truthy.
    """
    while counter:
        if exitFlag:
            # The original called threadName.exit(), which fails for the
            # string names used in this example; returning ends the work.
            return
        time.sleep(delay)
        print ("%s: %s" % (threadName, time.ctime(time.time())))
        counter -= 1

# 创建新线程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)

# 开启新线程
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print ("退出主线程")

condition variables

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import threading
import time
import random

class subclass:
  """Shared list that a producer appends to and a consumer takes from."""

  def __init__(self):
    # The shared resource: items waiting to be consumed, oldest first.
    self.x = []

  def produce_item(self, x_item):
    """Append ``x_item`` to the end of the shared list."""
    print("Producer adding an item to the list")
    self.x.append(x_item)

  def consume_item(self):
    """Print and remove the oldest item; raises IndexError when the list is empty."""
    print("Consuming from the list")
    oldest = self.x[0]
    print("Consumed item: ", oldest)
    # Drop the element that was just read from the front of the list.
    del self.x[0]

def producer(subclass_obj, condition_obj):
    """Produce items into ``subclass_obj`` and notify ``condition_obj`` after each one.

    Args:
        subclass_obj: Object providing ``produce_item(item)``.
        condition_obj: Condition object providing ``acquire``, ``notify``
            and ``release``.
    """
    # Select a random number from 1 to 3
    r = random.randint(1,3)
    print("Random number selected was:", r)

    # NOTE(review): range(1, r) yields r - 1 items (none when r == 1),
    # not r items -- confirm which count is intended.
    for i in range(1, r):
      print("Producing an item, time it will take(seconds): " + str(i))
      time.sleep(i)

      print("Producer acquiring the lock")
      condition_obj.acquire()
      try:
        # Produce an item
        subclass_obj.produce_item(i)
        # Notify that an item has been produced
        condition_obj.notify()
      finally:
        # Release the lock after producing
        condition_obj.release()

def consumer(subclass_obj, condition_obj):
    """Consume items from ``subclass_obj`` until none arrives within 10 seconds.

    Args:
        subclass_obj: Object providing ``consume_item()``, which raises
            IndexError when there is nothing to consume.
        condition_obj: Condition object providing ``acquire``, ``wait`` and
            ``release``.

    Any exception other than IndexError from ``consume_item()`` propagates
    to the caller; the lock is released in every case.
    """
    condition_obj.acquire()
    try:
        while True:
            try:
                # Consume the item
                subclass_obj.consume_item()
            except IndexError:
                # Only an empty list means "nothing to consume yet"; other
                # errors are real failures and must not be hidden.
                print("No item to consume, list empty")
                print("Waiting for 10 seconds")
                # wait with a maximum timeout of 10 sec
                if condition_obj.wait(10):
                    print("Item produced notified")
                else:
                    print("Waiting timeout")
                    break
    finally:
        # Release the lock after consuming, even on an unexpected error
        condition_obj.release()

if __name__=='__main__':

  # Initialising a condition class object
  condition_obj = threading.Condition()
  # subclass object
  subclass_obj = subclass()

  # Producer thread
  pro = threading.Thread(target=producer, args=(subclass_obj,condition_obj,))
  pro.start()

  # consumer thread
  con = threading.Thread(target=consumer, args=(subclass_obj,condition_obj,))
  con.start()

  pro.join()
  con.join()
  print("Producer Consumer code executed")

threading.Condition() 支持赋值,且赋值后 notify 依旧有效

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
import random
import threading
import time
import copy

def consumer(cv):
    cv.acquire()
    cv.wait()
    cv.release()
    print("consumer completed.")

if __name__ == "__main__":
    cv = threading.Condition()
    con = threading.Thread(target=consumer, args=(cv,))
    con.start()
    time.sleep(1)
    copy_cv = threading.Condition()
    copy_cv = cv
    copy_cv.acquire()
    copy_cv.notify()
    copy_cv.release()
    con.join()

logging

logging 模块

1
2
# print to stdout
logging.basicConfig(level=logging.DEBUG)
1
2
3
4
5
6
import logging
logging.basicConfig(filename='example.log', encoding='utf-8', level=logging.DEBUG)
logging.debug('This message should go to the log file')
logging.info('So should this')
logging.warning('And this, too')
logging.error('And non-ASCII stuff, too, like Øresund and Malmö')

从多个模块记录日志

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# myapp.py
import logging
import mylib

def main():
    logging.basicConfig(filename='myapp.log', level=logging.INFO)
    logging.info('Started')
    mylib.do_something()
    logging.info('Finished')

if __name__ == '__main__':
    main()
1
2
3
4
5
# mylib.py
import logging

def do_something():
    logging.info('Doing something')

如果你运行 myapp.py ,你应该在 myapp.log 中看到:

INFO:root:Started
INFO:root:Doing something
INFO:root:Finished

日志级别大小关系为: CRITICAL > ERROR > WARNING > INFO > DEBUG > NOTSET

multiprocessing

multiprocessing 是一个支持使用与 threading 模块类似的 API 来产生进程的包。 multiprocessing 包同时提供了本地和远程并发操作,通过使用子进程而非线程有效地绕过了 全局解释器锁。 因此,multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。 它在 Unix 和 Windows 上均可运行。

multiprocessing 模块还引入了在 threading 模块中没有的API。一个主要的例子就是 Pool 对象,它提供了一种快捷的方法,赋予函数并行化处理一系列输入值的能力,可以将输入数据分配给不同进程处理(数据并行)。下面的例子演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个数据并行的基本例子使用了 Pool :

1
2
3
4
5
6
7
8
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

多进程示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

根据不同的平台, multiprocessing 支持三种启动进程的方法:

  • spawn : 父进程会启动一个全新的 python 解释器进程。 子进程将只继承那些运行进程对象的 run() 方法所必需的资源。 特别地,来自父进程的非必需文件描述符和句柄将不会被继承。 使用此方法启动进程相比使用 fork 或 forkserver 要慢上许多。可在 Unix 和 Windows 上使用。 Windows 上的默认设置。

  • fork : 父进程使用 os.fork() 来产生 Python 解释器分叉。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全分叉多线程进程是棘手的。 只存在于Unix。Unix中的默认值。

  • forkserver : 程序启动并选择 forkserver 启动方法时,将启动服务器进程。从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程。分叉服务器进程是单线程的,因此使用 os.fork() 是安全的。没有不必要的资源被继承。

可在Unix平台上使用,支持通过Unix管道传递文件描述符

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import multiprocessing as mp

def foo(q):
    """Put the string 'hello' on the queue q."""
    greeting = 'hello'
    q.put(greeting)


if __name__ == '__main__':
    # Choose the start method for the whole program before creating anything else.
    mp.set_start_method('spawn')
    channel = mp.Queue()
    worker = mp.Process(target=foo, args=(channel,))
    worker.start()
    print(channel.get())
    worker.join()

在程序中 set_start_method() 不应该被多次调用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import multiprocessing as mp

def foo(q):
    """Place the single message 'hello' on q."""
    payload = 'hello'
    q.put(payload)


if __name__ == '__main__':
    # The context object offers the module's API bound to one start method.
    spawn_ctx = mp.get_context('spawn')
    mailbox = spawn_ctx.Queue()
    child = spawn_ctx.Process(target=foo, args=(mailbox,))
    child.start()
    print(mailbox.get())
    child.join()

可以使用 get_context() 来获取上下文对象。上下文对象与 multiprocessing 模块具有相同的API,并允许在同一程序中使用多种启动方法。

进程之间交换对象

multiprocessing 支持进程之间的两种通信通道:队列(Queue)和管道(Pipe)。

队列

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from multiprocessing import Process, Queue

def f(q):
    """Put the list [42, None, 'hello'] on the queue q as one item."""
    item = [42, None, 'hello']
    q.put(item)


if __name__ == '__main__':
    shared_queue = Queue()
    producer = Process(target=f, args=(shared_queue,))
    producer.start()
    print(shared_queue.get())    # prints "[42, None, 'hello']"
    producer.join()

管道

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
from multiprocessing import Process, Pipe

def f(conn):
    """Send [42, None, 'hello'] over conn, then close this end of the pipe."""
    message = [42, None, 'hello']
    conn.send(message)
    conn.close()


if __name__ == '__main__':
    receiving_end, sending_end = Pipe()
    sender = Process(target=f, args=(sending_end,))
    sender.start()
    print(receiving_end.recv())   # prints "[42, None, 'hello']"
    sender.join()

Pipe() 函数返回一对由管道连接的连接对象,默认情况下是双工(双向)的。Pipe() 返回的两个连接对象分别表示管道的两端。每个连接对象都有 send() 和 recv() 方法(以及其他方法)。请注意,如果两个进程(或线程)同时尝试读取或写入管道的 同一 端,则管道中的数据可能会损坏。当然,在不同进程中同时使用管道的不同端的情况下不存在损坏的风险。

进程间同步

multiprocessing 包含来自 threading 的所有同步原语的等价物。例如,可以使用锁来确保一次只有一个进程打印到标准输出:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
from multiprocessing import Process, Lock

def f(l, i):
    """Print 'hello world <i>' while holding lock l; the lock is released even if print fails."""
    with l:
        print('hello world', i)


if __name__ == '__main__':
    lock = Lock()

    # Every child gets the same lock object.
    for num in range(10):
        worker = Process(target=f, args=(lock, num))
        worker.start()

进程间共享状态

共享内存

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
from multiprocessing import Process, Value, Array

def f(n, a):
    """Store 3.1415927 in n.value and negate every element of a in place."""
    n.value = 3.1415927
    for idx in range(len(a)):
        current = a[idx]
        a[idx] = -current


if __name__ == '__main__':
    # 'd' is a double-precision float, 'i' a signed int.
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    child = Process(target=f, args=(num, arr))
    child.start()
    child.join()

    # Both prints show the values written by the child process.
    print(num.value)
    print(arr[:])

可以使用 Value 或 Array 将数据存储在共享内存映射中。为了更灵活地使用共享内存,可以使用 multiprocessing.sharedctypes 模块,该模块支持创建从共享内存分配的任意 ctypes 对象

服务进程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from multiprocessing import Process, Manager

def f(d, l):
    """Insert three fixed entries into mapping d and reverse list l in place."""
    d.update({1: '1', '2': 2, 0.25: None})
    l.reverse()


if __name__ == '__main__':
    with Manager() as manager:
        shared_dict = manager.dict()
        shared_list = manager.list(range(10))

        child = Process(target=f, args=(shared_dict, shared_list))
        child.start()
        child.join()

        print(shared_dict)
        print(shared_list)

由 Manager() 返回的管理器对象控制一个服务进程,该进程保存Python对象并允许其他进程使用代理操作它们。Manager() 返回的管理器支持类型: list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array . 使用服务进程的管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络由不同计算机上的进程共享。但是,它们比使用共享内存慢。

需要注意的是基于 manager 进行数据共享,其实是进行了数据的拷贝的。并且每次进行 manager dict 的操作都很慢,所以可以采用如下的方式进行优化

1
2
3
4
5
6
7
8
# Fill an ordinary in-process dict first.
# NOTE(review): Job and the multiprocessing import are not defined in this fragment — presumably provided by surrounding code.
jobs = {}
job = Job()
jobs[job.name] = job
# insert other jobs in the normal dictionary

# Then copy the whole mapping into the manager-backed dict with one update() call.
mgr = multiprocessing.Manager()
mgr_jobs = mgr.dict()
mgr_jobs.update(jobs)
1
2
3
4
5
>>> d1 = {1: 1, 2: 2}
>>> d2 = {2: 'ha!', 3: 3}
>>> d1.update(d2)
>>> d1
{1: 1, 2: 'ha!', 3: 3}

example code

主进程判断

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from multiprocessing import Process, current_process, parent_process
import os

def info(title):
    """Print title, the current and parent process objects, and flag the main process.

    parent_process() returns None only when called from the main process,
    so an identity test against None is used to detect it.
    """
    print(title)
    cur_process = current_process()
    print(cur_process)
    parent = parent_process()
    print(parent)
    if parent is None:
        print("this is main process")


def f(name):
    """Process target: report the process info, then greet name."""
    info('function f')
    print('hello', name)


if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()  # wait for the child, as the other examples do

inspect 模块

inspect模块主要提供了四种用处:

  • 对是否是模块,框架,函数等进行类型检查。
  • 获取源码
  • 获取类或函数的参数的信息
  • 解析堆栈
1
2
3
4
5
6
7
8
inspect.signature(obj, *, follow_wrapped=True)
#    Returns a Signature object describing all parameters of the callable.
inspect.signature(fun).parameters
#    Returns an ordered mapping; each key is a parameter name (str).
inspect.getmembers(module)
#    Returns the (name, object) pairs of all members of the module.
inspect.isclass(obj)
#    Returns a bool telling whether the object is a class.

unwrap

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from inspect import unwrap
from functools import wraps

def test_decorator(num=1):
    """Decorator factory: prints num; the decorator it returns hands func back unchanged."""
    print(f"num = {num}")

    def wrapper(func):
        # Runs once, at decoration time, and returns the very same function object.
        print('test_decorator')
        return func

    return wrapper


@test_decorator(num=4)
def spam():
    """Print 'spam1' followed by an extra newline."""
    print('spam1', '\n')


# spam carries no __wrapped__ attribute, so unwrap() returns spam itself.
unwrap_spam = unwrap(spam)

unwrap 用于将被装饰的函数逐层解包装:它沿着 @wraps(func) 设置的 __wrapped__ 属性链逐层向内,直到对象不再带有 __wrapped__ 属性,或者 inspect.unwrap(func, *, stop=None) 中的回调函数 stop 返回 True 为止。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from inspect import unwrap
from functools import wraps

def test_decorator(num=1):
    """Decorator factory whose wrapper records the decorated function via @wraps.

    @wraps must receive the function being decorated (not num): it copies
    the function's metadata onto the wrapper and stores the original in
    wrapper.__wrapped__, the attribute inspect.unwrap() follows.
    """
    print("num =", num)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            print('test_decorator')
            return func(*args, **kwargs)
        return wrapper

    return decorator


@test_decorator(num=4)
def spam():
    print('spam1', '\n')


# spam is now the wrapper; unwrap() follows __wrapped__ back to the original function.
unwrap_spam = unwrap(spam)

其中传给 @wraps() 的对象会被记录在被装饰对象的 __wrapped__ 属性中,unwrap 返回的就是这个对象。

pytest 模块

只需要按照下面的规则:

  • 测试文件以test_开头(以_test结尾也可以)
  • 测试类以Test开头,并且不能带有 __init__ 方法
  • 测试函数以test_开头
  • 断言使用基本的assert即可

fixture 的 scope 参数:常用的 scope 有四种,分别是 'function'、'class'、'module'、'session'(此外还有 'package'),默认为 function。

  • function:每个 test 都运行,默认是 function 的 scope
  • class:每个 class 的所有test只运行一次
  • module:每个module的所有 test 只运行一次
  • session:每个 session 只运行一次
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import pytest

@pytest.fixture(scope='function')
def setup_function(request):
    """Function-scoped fixture: prints on setup and registers a printing teardown."""
    def teardown_function():
        print("teardown_function called.")
    request.addfinalizer(teardown_function)  # the nested function performs the teardown work
    print('setup_function called.')

@pytest.fixture(scope='module')
def setup_module(request):
    """Module-scoped fixture: prints on setup and registers a printing teardown."""
    def teardown_module():
        print("teardown_module called.")
    request.addfinalizer(teardown_module)
    print('setup_module called.')

@pytest.mark.website
def test_1(setup_function):
    print('Test_1 called.')

def test_2(setup_module):
    print('Test_2 called.')

def test_3(setup_module):
    print('Test_3 called.')
    assert 2==1+1              # a plain assert checks that the result matches the expectation

setup和teardown操作

  • setup,在测试函数或类之前执行,完成准备工作,例如数据库链接、测试数据、打开文件等
  • teardown,在测试函数或类之后执行,完成收尾工作,例如断开数据库链接、回收内存资源等

在 test 函数里面添加的 print 默认是不会打印在终端的,需要在终端打印输出使用 pytest -s .

使用 logging 模块默认 pytest 也不会有输出,需要增加一个配置文件 pytest.ini 放在项目目录下即可

[pytest]
log_cli = true
log_cli_level = DEBUG
log_file = test.log

argparse

1
2
3
4
5
6
7
import argparse
parser = argparse.ArgumentParser(description='manual to this script')
parser.add_argument('--gpus', type=str, default=None)
parser.add_argument('--batch-size', type=int, default=32)
args = parser.parse_args()
# print is a function in Python 3; the '--batch-size' option is read as args.batch_size.
print(args.gpus)
print(args.batch_size)

functools 模块

partial 函数: 和装饰器一样,它可以扩展函数的功能,但又不完全等价于装饰器。通常应用的场景是当我们要频繁调用某个函数时,其中某些参数是已知的固定值

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from functools import partial

def add(*args):
    """Return the sum of all positional arguments (0 when none are given)."""
    total = sum(args)
    return total


# partial() pre-binds the first positional argument; callers supply the rest.
add_100, add_101 = (partial(add, base) for base in (100, 101))
print(add_100(1, 2, 3))  # 106
print(add_101(1, 2, 3))  # 107

decrease reference count of an object

1
2
3
4
5
6
7
import ctypes

# Bind the C-API function Py_DecRef exposed through ctypes.pythonapi.
_decref = ctypes.pythonapi.Py_DecRef
# The single argument is passed as a Python object reference; nothing is returned.
_decref.argtypes = [ctypes.py_object]
_decref.restype = None

# a_python_object is a placeholder for the object whose reference count is lowered.
# NOTE(review): an unbalanced decrement can release an object that is still in use — confirm ownership before calling.
_decref(a_python_object)

ifaddr

获取 ip 地址

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import re
import ifaddr

adapters = ifaddr.get_adapters()

# Raw string avoids the invalid "\." escape sequence; compiled once, outside the loops.
ipv4_pattern = re.compile(r'[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+')

for adapter in adapters:
    print("IPs of network adapter " + adapter.nice_name)
    for ip in adapter.ips:
        # Only addresses whose text form starts with a dotted quad are printed.
        if ipv4_pattern.match(str(ip.ip)):
            print(ip.ip)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import re
import ifaddr

host_ip = None  # module-level cache, filled by the first successful lookup

_IPV4_RE = re.compile(r'[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+')


def get_host_ip():
    """Return the first non-loopback IPv4 address found, or None if there is none.

    The address is stored in the module-level host_ip, so later calls return
    the cached value without scanning the adapters again.
    """
    global host_ip
    if host_ip:
        return host_ip

    for adapter in ifaddr.get_adapters():
        for ip in adapter.ips:
            candidate = str(ip.ip)
            # Skip anything that is not a dotted quad, and the 127.x loopback range.
            if _IPV4_RE.match(candidate) and not candidate.startswith("127."):
                host_ip = candidate
                return host_ip
    return None


print(get_host_ip())

sys

sys.getsizeof

该函数获得的并不是真实的内存占用:sys.getsizeof 只计算对象本身的大小(浅层大小),不包含它所引用的其他对象。下面两个字典的值长度相差很大,但 getsizeof 的结果相同:

1
2
3
4
import sys

# Both dicts hold three entries, so their own (shallow) sizes are equal even
# though d2's values are 100,000 characters each: getsizeof() does not follow
# references into the contained objects.
d1 = {"a": "a", "b": "b", "c": "c"}
d2 = {"a": "a"*100_000, "b": "b"*100_000, "c": "c"*100_000}
print(sys.getsizeof(d1))
print(sys.getsizeof(d2))

json

输出中文

1
2
3
4
5
import json
msg = {}
msg["天气"] = "晴天"
# ensure_ascii=False writes the characters unescaped, so open the file with an
# explicit UTF-8 encoding instead of relying on the platform default.
with open("test.json", "w", encoding="utf-8") as f:
    json.dump(msg, f, ensure_ascii=False)

openpyxl 模块

Create A New Excel Workbook

1
2
3
from openpyxl import Workbook
wb = Workbook()
wb.save("demo.xlsx")

Write Data To The Excel File

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from openpyxl import load_workbook
wb = load_workbook("demo.xlsx")
# Sheet is the SheetName where the data has to be entered
sheet = wb["Sheet"]
# Enter into 1st row and Ath column
sheet['A1'] = 'Software Testing Help'
# Similarly you can enter in the below shown fashion
sheet.cell(row=2, column=1).value = 'OpenPyxl Tutorial'
sheet['B1'] = 10
sheet.cell(row=2, column=2).value =13.4
wb.save("demo.xlsx")

There are 2 ways to enter the data in the Excel file. These are as follows:

1. Directly use the column-row combination, e.g. [A1], where A is the column and 1 is the row.
2. Use the row and column numbers, e.g. row=4, column=2.

Add Sheets To The Excel File

1
2
3
from datetime import datetime

# strftime() needs a datetime instance; use the current local time.
now = datetime.now()
sheetname = "Day2 Result " + now.strftime("%d.%m.%Y")
# Add new sheet using the index and title fields
wb.create_sheet(index=1, title=sheetname)

create_sheet(title=None, index=None) Create a worksheet (at an optional index).

Parameters:

  • title (str): optional title of the sheet.
  • index (int): optional position at which the sheet will be inserted.

Appending Multiple Values Into Sheet

append(iterable) Where iterable is a list, tuple, range or generator of values, or a dict.

  • If it’s a list, all the values are added in order, starting from the first column.
  • If it’s a dict, the values are assigned to the columns indicated by the keys (numbers or letters).
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
from openpyxl import load_workbook
wb = load_workbook("demo.xlsx")
#Mention the sheet where the data can be entered,
sheet = wb["Day2 Result 27.07.2020"]
# Assign multiple values to data
data =[('Emp Id', 'Emp Name', 'Designation'),
       (1, 'XYZ', 'Manager'),
       (2, 'ABC', 'Consultant')]
#Append all rows
for i in data:
    sheet.append(i)
wb.save("demo.xlsx")

Delete A Sheet From Excel Workbook

You can use wb.remove(worksheet), which takes the worksheet object, or del wb[sheetname], which takes the sheet title.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import openpyxl
wb = openpyxl.load_workbook("DeleteSheet.xlsx")
print("The names of work sheets before deleting")
print(wb.sheetnames)
sheetDelete = wb["Sheet2"]
wb.remove(sheetDelete)  # Sheet2 will be deleted; remove() takes the worksheet object
# Alternative: del wb["Sheet2"]  (del is indexed by the sheet title, not the worksheet object)
print("The names of work sheets after deleting")
print(wb.sheetnames)
wb.save("DeleteSheet.xlsx")

Reading Data From The Excel File

directly use columnrow combination. Example [A1] or, use the row and column numbers. Example row=4, column=2.

sheet.cell().value command will help us retrieve the value from the specified cell.

1
2
3
4
5
6
7
from openpyxl import load_workbook
wb = load_workbook("demo.xlsx")
sheet = wb.active
dataAtA1 = sheet['A1']
dataAtA2 = sheet.cell(row=2, column=1)
print("Data at A1:", dataAtA1.value)
print("Data at A2:", dataAtA2.value)

Reading All Values In The File

  • max_row-This gives the maximum row index containing data (1-based)
  • max_column – The maximum column index containing data (1-based)
1
2
row_count = sheet.max_row
column_count = sheet.max_column
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from openpyxl import load_workbook
wb = load_workbook("demo.xlsx")
sheet = wb["Day2 Result 27.07.2020"]
row_count = sheet.max_row
column_count = sheet.max_column
for i in range(1, row_count + 1):
    for j in range(1, column_count + 1):
        data = sheet.cell(row=i, column=j).value
        print(data, end='   ')
    print('\n')

Getting Column Letter

get_column_letter this converts a column index into a column letter. Example 3 to C.

Inserting and deleting rows and columns, moving ranges of cells

example code

获取 python 对象内存占用大小

1
2
3
4
5
6
7
8
9
import numpy as np
import psutil
process = psutil.Process()
memory_info_before = process.memory_info().rss / (1024)  # KB

origin_result = np.ones((1024, 1024))

memory_info_after = process.memory_info().rss / (1024)  # KB
origin_result_size = memory_info_after - memory_info_before
1
2
3
4
import sys

import cloudpickle as pickle
d1 = {'a': 'a' * 10000}
pd = pickle.dumps(d1)
# Size of the resulting bytes object (its payload plus the object header), in bytes.
print(sys.getsizeof(pd)) # bytes

get local ip

1
2
3
4
5
6
7
import socket
address = "8.8.8.8:53"
ip_address, port = address.split(":")
# connect() on a UDP socket sends no packet; it only makes the OS pick the
# outgoing interface, whose address getsockname() then reports.
# The with-block closes the socket once the address has been read.
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
    s.connect((ip_address, int(port)))
    node_ip_address = s.getsockname()[0]
print(node_ip_address)

获得当前路径

1
2
current_directory = os.path.abspath(os.path.curdir)
cur_dir = os.getcwd()

更换 working 目录

1
os.chdir()

添加搜索路径

1
2
3
sys.path.append("/home/...")
# or
sys.path.insert(0, project_path)

filelock

filelock实现了一个平台无关的文件锁,可以用于进程间通信

1
2
3
4
5
6
from filelock import Timeout, FileLock

lock = FileLock("high_ground.txt.lock")
with lock:
    with open("high_ground.txt", "a") as f:
        f.write("You were the chosen one.")

Don’t use a FileLock to lock the file you want to write to, instead create a separate .lock file as shown above.

gym

Pong

pip install gym[accept-rom-license]
pip install pygame

LunarLander-v2

pip3 install Box2D
pip3 install box2d-py
pip3 install gym[all]
pip3 install gym[Box_2D]
1
2
import gym
env = gym.make("LunarLander-v2")

Socket Server

使用 gevent 实现高并发的 socket server。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#!/usr/bin/env python
#-*-coding:utf8-*-

import sys
import socket
import time
import gevent

from gevent import socket, monkey

monkey.patch_all()


def server(port):
    """Listen on all interfaces at port and start one greenlet per accepted client."""
    listener = socket.socket()
    listener.bind(('0.0.0.0', port))
    listener.listen(500)
    while True:
        # accept() yields the server-side connection object for the client and its address.
        client, _peer = listener.accept()
        # Pass the connection to handle_request in a new greenlet rather than a thread.
        gevent.spawn(handle_request, client)


def handle_request(conn):
    """Echo everything received on conn back to the peer until the peer stops sending.

    An empty read means the peer has closed its sending side: the write side
    is shut down and the loop ends. The connection is always closed on exit.
    """
    try:
        while True:
            data = conn.recv(1024)
            if not data:
                # EOF: nothing left to echo, so finish instead of looping on empty reads.
                conn.shutdown(socket.SHUT_WR)
                break
            print("recv:", data)
            # sendall() retries until the whole chunk is written; send() may write only part.
            conn.sendall(data)
    except Exception as ex:
        # Handler boundary: report the error and fall through to close().
        print(ex)
    finally:
        conn.close()


if __name__ == '__main__':
    server(8001)

格式化输出

表格

使用 tableprint 库

1
2
3
4
5
6
7
import tableprint
import numpy as np

data = np.random.randn(10,3)
headers = ['Column A', 'Column B', 'Column C']

tableprint.table(data, headers)

使用 prettytable 库

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import sys
from prettytable import PrettyTable
# Python 3 strings are Unicode already; the Python 2 reload(sys) /
# sys.setdefaultencoding('utf8') workaround no longer exists and is not needed.

table = PrettyTable(['编号','云编号','名称','IP地址'])
# Rows 1-9 follow one pattern: index, server0N, 服务器0N, 172.16.0.N.
for n in range(1, 10):
    table.add_row([str(n), f'server0{n}', f'服务器0{n}', f'172.16.0.{n}'])
print(table)

rich

Rich 是一个 Python 库,可以为您在终端中提供富文本和精美格式。

Rich API 可以很容易的在终端输出添加各种颜色和不同风格。Rich 还可以绘制漂亮的表格,进度条,markdown,突出显示语法的源代码及回溯等等。

Reference

  1. https://www.liaoxuefeng.com/wiki/897692888725344/923030542875328
  2. resource module
  3. collection module
  4. logging module
  5. multiprocessing module
  6. introduction-to-the-python-buffer-protocol
  7. python-garbage-collector
  8. python-memory-managment
  9. rich
updated 2022-10-12