Cloudpickle Note

本文记录 cloudpickle 的相关笔记

基础

基本用法

server.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import redis
import cloudpickle as pk

# Constant offset that func adds to its argument.
C = 40

def func(data):
  """Return data shifted upward by the module-level constant C."""
  shifted = data + C
  return shifted

class A:
  """Small counter whose increment step is delegated to func."""

  def __init__(self):
    # Every instance starts counting from 3.
    self.count = 3

  def increase(self):
    """Replace count with the value func returns for it."""
    bumped = func(self.count)
    self.count = bumped

  def show(self):
    """Print the current count."""
    print("count = ", self.count)

def to_redis():
  """Serialize class A with cloudpickle and store the bytes in Redis under key 'A'."""
  payload = pk.dumps(A)
  # Local Redis instance on the default port.
  redis.Redis(host='127.0.0.1', port=6379).set('A', payload)

def to_file():
  """Serialize class A with cloudpickle into the binary file dump.txt."""
  # The context manager closes the file even when pk.dump raises.
  with open("dump.txt", 'wb') as fout:
    # pk.dump writes to the file and returns None, so nothing is kept.
    pk.dump(A, fout)

if __name__ == '__main__':
  # Script entry point: the Redis transport is left disabled and the pickled
  # class is written to dump.txt instead.
  #to_redis()
  to_file()

client.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import redis
import cloudpickle as pk

def from_redis():
  """Fetch the pickled class stored under key 'A' in Redis and exercise it.

  Prints the counter before and after a single increase() call.
  """
  print("from redis")
  conn = redis.Redis(host="127.0.0.1", port=6379)
  blob = conn.get('A')
  # NOTE: unpickling runs code embedded in the payload; only use a trusted Redis.
  cls = pk.loads(blob)
  obj = cls()
  obj.show()
  obj.increase()
  obj.show()

def from_file():
  """Load the pickled class from dump.txt, instantiate it and exercise it.

  Prints the counter before and after a single increase() call.
  """
  print("from file")
  # The context manager closes the handle even when unpickling fails.
  # NOTE: pk.load runs code embedded in the file; only load trusted dumps.
  with open("dump.txt", 'rb') as fin:
    A = pk.load(fin)
  a = A()
  a.show()
  a.increase()
  a.show()

if __name__ == "__main__":
  # Script entry point: read the class back from dump.txt
  # (from_redis() is the alternative transport defined above).
  from_file()

pickle by value

Since cloudpickle 2.0.0, one can explicitly specify modules for which serialization by value should be used, using the register_pickle_by_value(module) / unregister_pickle_by_value(module) API:

1
2
3
4
5
6
>>> import cloudpickle
>>> import my_module
>>> cloudpickle.register_pickle_by_value(my_module)
>>> cloudpickle.dumps(my_module.my_function)  # my_function is pickled by value
>>> cloudpickle.unregister_pickle_by_value(my_module)
>>> cloudpickle.dumps(my_module.my_function)  # my_function is pickled by reference

使用 register_pickle_by_value 确实可以实现在 worker 端直接 loads 后就执行,但是这个 register 不会递归生效(不能嵌套):被调用到的每个文件模块都需要分别进行 register。

注意

server 多文件

如果在 server 端使用了 import 其它文件的函数,pickle 后进行 unpickle 会出现找不到该模块的错误。

server.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import redis
import cloudpickle as pk
import model

# Constant offset that func adds to its argument.
C = 40

def func(data):
  """Return data shifted upward by the module-level constant C."""
  shifted = data + C
  return shifted

class A:
  """Counter used to demonstrate pickling a class that depends on another module.

  increase() relies on func from this file; increase2() relies on
  model_func from the imported model module.
  """

  def __init__(self):
    # Every instance starts counting from 3.
    self.count = 3

  def increase(self):
    """Replace count with func(count)."""
    self.count = func(self.count)

  def increase2(self):
    """Replace count with model.model_func(count)."""
    # Only `import model` is in scope in this file, so the function has to be
    # reached through the module; a bare model_func would raise NameError.
    self.count = model.model_func(self.count)

  def show(self):
    """Print the current count."""
    print("count = ", self.count)

def to_redis():
  """Serialize class A with cloudpickle and store the bytes in Redis under key 'A'."""
  payload = pk.dumps(A)
  # Local Redis instance on the default port.
  redis.Redis(host='127.0.0.1', port=6379).set('A', payload)

def to_file():
  """Serialize class A with cloudpickle into the binary file dump.txt."""
  # The context manager closes the file even when pk.dump raises.
  with open("dump.txt", 'wb') as fout:
    # pk.dump writes to the file and returns None, so nothing is kept.
    pk.dump(A, fout)


if __name__ == '__main__':
  # Script entry point: the Redis transport is left disabled and the pickled
  # class is written to dump.txt instead.
  #to_redis()
  to_file()

client.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import redis
import cloudpickle as pk

def from_redis():
  """Fetch the pickled class stored under key 'A' in Redis and exercise it.

  Prints the counter initially, after increase() and after increase2().
  """
  print("from redis")
  conn = redis.Redis(host="127.0.0.1", port=6379)
  blob = conn.get('A')
  # NOTE: unpickling runs code embedded in the payload; only use a trusted Redis.
  cls = pk.loads(blob)
  obj = cls()
  obj.show()
  obj.increase()
  obj.show()
  obj.increase2()
  obj.show()


def from_file():
  """Load the pickled class from dump.txt, instantiate it and exercise it.

  Prints the counter before and after a single increase() call.
  """
  print("from file")
  # The context manager closes the handle even when unpickling fails.
  # NOTE: pk.load runs code embedded in the file; only load trusted dumps.
  with open("dump.txt", 'rb') as fin:
    A = pk.load(fin)
  a = A()
  a.show()
  a.increase()
  a.show()

if __name__ == "__main__":
  # Script entry point: read the class back from dump.txt
  # (from_redis() is the alternative transport defined above).
  from_file()


model.py
def model_func(data):
  """Announce the call on stdout, then return data incremented by one."""
  print("start modle")
  # Augmented assignment is kept so any in-place semantics stay unchanged.
  bumped = data
  bumped += 1
  return bumped

通过 from ... import ... 的方式导入(例如 from model import model_func),并且该文件(即 model.py)在 unpickle 环境中能够被检索到的情况下,可以正常执行。

server.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import redis
import cloudpickle as pk
from model import model_func

# Constant offset that func adds to its argument.
C = 40

def func(data):
  """Return data shifted upward by the module-level constant C."""
  shifted = data + C
  return shifted

class A:
  """Small counter with two increment steps: func and model_func."""

  def __init__(self):
    # Every instance starts counting from 3.
    self.count = 3

  def increase(self):
    """Replace count with the value func returns for it."""
    bumped = func(self.count)
    self.count = bumped

  def increase2(self):
    """Replace count with the value model_func returns for it."""
    bumped = model_func(self.count)
    self.count = bumped

  def show(self):
    """Print the current count."""
    print("count = ", self.count)

def to_redis():
  """Serialize class A with cloudpickle and store the bytes in Redis under key 'A'."""
  payload = pk.dumps(A)
  # Local Redis instance on the default port.
  redis.Redis(host='127.0.0.1', port=6379).set('A', payload)

def to_file():
  """Serialize class A with cloudpickle into the binary file dump.txt."""
  # The context manager closes the file even when pk.dump raises.
  with open("dump.txt", 'wb') as fout:
    # pk.dump writes to the file and returns None, so nothing is kept.
    pk.dump(A, fout)


if __name__ == '__main__':
  # Script entry point: the Redis transport is left disabled and the pickled
  # class is written to dump.txt instead.
  #to_redis()
  to_file()
updated 2022-04-21