Ray Notes

These are my notes on the Ray framework.

Basics

  1. Ray runtime:

The Ray runtime consists of multiple services/processes started in the background for communication, data transfer, scheduling, and more. The Ray runtime can be started on a laptop, a single server, or multiple servers.

  2. Creating an actor

An actor is essentially a stateful worker (or a service). When a new actor is instantiated, a new worker is created, and methods of the actor are scheduled on that specific worker and can access and mutate the state of that worker.

import ray

@ray.remote
class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

    def get_counter(self):
        return self.value

counter_actor = Counter.remote()

This is equivalent to the following:

class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

    def get_counter(self):
        return self.value

Counter = ray.remote(Counter)

counter_actor = Counter.remote()

Calling methods:

@ray.remote
class Foo(object):

    # Any method of the actor can return multiple object refs.
    @ray.method(num_returns=2)
    def bar(self):
        return 1, 2

f = Foo.remote()

obj_ref1, obj_ref2 = f.bar.remote()
assert ray.get(obj_ref1) == 1
assert ray.get(obj_ref2) == 2
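
To build some intuition for "an actor is a stateful worker", here is a minimal sketch in plain Python that does not use Ray. MiniActor, its call() method, and the use of a thread in place of a worker process are all hypothetical simplifications of mine, not Ray's implementation. The point is only that one dedicated worker owns the state and runs the method calls one after another.

```python
import queue
import threading
from concurrent.futures import Future


class MiniActor:
    """Hypothetical stand-in for an actor: one worker thread owns one object."""

    def __init__(self, cls, *args, **kwargs):
        self._calls = queue.Queue()
        # The wrapped instance is the actor's private state.
        self._instance = cls(*args, **kwargs)
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        # Execute queued method calls one at a time, in submission order.
        while True:
            item = self._calls.get()
            if item is None:  # sentinel pushed by stop()
                break
            future, name, args, kwargs = item
            try:
                future.set_result(getattr(self._instance, name)(*args, **kwargs))
            except Exception as exc:
                future.set_exception(exc)

    def call(self, name, *args, **kwargs):
        """Queue a method call and return a Future, loosely like an object ref."""
        future = Future()
        self._calls.put((future, name, args, kwargs))
        return future

    def stop(self):
        self._calls.put(None)
        self._thread.join()


class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value


actor = MiniActor(Counter)
futures = [actor.call("increment") for _ in range(3)]
results = [f.result() for f in futures]
actor.stop()
print(results)  # [1, 2, 3]
```

Because every call goes through the same queue and the same worker, the three increments see each other's updates, which is the behaviour the Counter actor above relies on.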

Run Test

  1. test 1
import ray
ray.init()

@ray.remote(num_cpus=3)
class Counter(object):
    def __init__(self):
        self.n = 0

    def increment(self):
        print("run increment")
        # Intentional busy loop: keeps this actor's worker at 100% CPU.
        while True:
            self.n = 1
        self.n += 1  # never reached

    def read(self):
        return self.n

counter = Counter.remote()
futures = counter.increment.remote()
counter = Counter.remote()
futures = counter.increment.remote()

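Result: only one CPU's worth of work is running, at 100% utilization, yet many ray::IDLE processes appear, so applying @ray.remote(num_cpus=3) directly does not seem to have any visible effect. Also, the 100% CPU usage of the busy loop is not confined to a single CPU: it is spread over several CPUs, and which CPUs are used is not fixed and keeps changing.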
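
The observation that the busy loop is not tied to one CPU fits my understanding that num_cpus is a logical quantity used for scheduling, not a CPU affinity setting. That is an assumption worth checking against the Ray documentation. The sketch below does not use Ray. It only shows which CPUs the operating system may schedule the current process on, which is what decides where a busy loop actually runs. allowed_cpus() is a helper name I made up.

```python
import os


def allowed_cpus():
    """Return the CPU indices this process may be scheduled on."""
    if hasattr(os, "sched_getaffinity"):
        # Linux: the affinity mask of the current process (pid 0 means self).
        return sorted(os.sched_getaffinity(0))
    # Other platforms: assume every CPU is allowed.
    return list(range(os.cpu_count() or 1))


cpus = allowed_cpus()
print(f"this process may run on {len(cpus)} CPU(s): {cpus}")
```

If the list contains more than one CPU, the kernel is free to move a single-threaded loop between them, which would look like the changing CPU usage described above.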

  2. test 2
@ray.remote(num_cpus=10)
class Counter(object):
    def __init__(self):
        self.n = 0

    def increment(self):
        # thread1 = myThread(1, "thread1", 1)
        # thread1.start()
        print("run increment")
        # Intentional busy loop: keeps this actor's worker at 100% CPU.
        while True:
            self.n = 1
        self.n += 1  # never reached

    def read(self):
        return self.n

counters = [Counter.remote() for i in range(4)]
futures = [c.increment.remote() for c in counters]
ray.get(futures)

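This shows that the worker processes that execute tasks are spawned by the raylet process, and that the number of worker processes spawned equals the number of CPU cores on the node. The num_cpus argument of ray.init() controls how many worker processes are spawned. It also shows that even an IDLE worker takes up roughly 73 MB of memory.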

  3. test 3
import os

print("task1", " pid", os.getpid())
counter = Counter.remote()
future = counter.increment.remote()
#ray.get(future)
print("task2", " pid", os.getpid())

counter = Counter.remote()
future = counter.increment.remote()
#ray.get(future)
print("task3", " pid", os.getpid())

counters = [Counter.remote() for i in range(4)]
futures = [c.increment.remote() for c in counters]
print(ray.get(futures))

The output shows that the process running the whole program is python (python test.py). Its child processes are:

  • raylet: runs the raylet program under /tmp/ray/session**/sockets/ (that directory also contains a plasma_store program)
  • python_2: runs /python/ray/_private/log_monitor.py
  • python_3: runs /python/ray/autoscaler/_private/monitor.py
  • gcs_server
  • redis-server
  • redis-server

From the above, the process relationship diagram is:

  4. plot_pong_example.py

This example uses 10 actors. Analyzing the log files that Ray writes yields the following information:

Source Code Note

worker.py

A worker has three modes:

  • SCRIPT_MODE: should be used if this Worker is a driver that is being run as a Python script or interactively in a shell. It will print information about task failures.
  • WORKER_MODE: should be used if this Worker is not a driver. It will not print information about tasks.
  • LOCAL_MODE: should be used if this Worker is a driver and if you want to run the driver in a manner equivalent to serial Python for debugging purposes. It will not send remote function calls to the scheduler and will instead execute them in a blocking fashion.
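
The difference between LOCAL_MODE and the other modes can be sketched with a toy dispatcher. Everything here is hypothetical and written for illustration: the Mode enum values, the submit() helper, and the thread pool standing in for the scheduler are not taken from worker.py.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from enum import Enum


class Mode(Enum):
    # Names mirror the three modes above; the values are illustrative only.
    SCRIPT_MODE = "script"
    WORKER_MODE = "worker"
    LOCAL_MODE = "local"


_pool = ThreadPoolExecutor(max_workers=2)  # stand-in for a scheduler


def submit(mode, fn, *args):
    """Run fn inline in LOCAL_MODE, otherwise hand it to the pool."""
    if mode is Mode.LOCAL_MODE:
        # Blocking, serial execution: the call finishes before we return.
        done = Future()
        done.set_result(fn(*args))
        return done
    return _pool.submit(fn, *args)


def square(x):
    return x * x


local_future = submit(Mode.LOCAL_MODE, square, 3)
local_done_immediately = local_future.done()
remote_result = submit(Mode.SCRIPT_MODE, square, 4).result()
_pool.shutdown()
print(local_done_immediately, local_future.result(), remote_result)  # True 9 16
```

In the local branch the result already exists when submit() returns, which is what makes that style of execution convenient for debugging.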

Important functions:

remote(*args, **kwargs)

@PublicAPI
def remote(*args, **kwargs):
    """Defines a remote function or an actor class.

    This can be used with no arguments to define a remote function or actor as
    follows:

    .. code-block:: python

        @ray.remote
        def f():
            return 1

        @ray.remote
        class Foo:
            def method(self):
                return 1

    It can also be used with specific keyword arguments as follows:

    .. code-block:: python

        @ray.remote(num_gpus=1, max_calls=1, num_returns=2)
        def f():
            return 1, 2

        @ray.remote(num_cpus=2, resources={"CustomResource": 1})
        class Foo:
            def method(self):
                return 1

    Remote task and actor objects returned by @ray.remote can also be
    dynamically modified with the same arguments as above using
    ``.options()`` as follows:

    .. code-block:: python

        @ray.remote(num_gpus=1, max_calls=1, num_returns=2)
        def f():
            return 1, 2
        g = f.options(num_gpus=2, max_calls=None)

        @ray.remote(num_cpus=2, resources={"CustomResource": 1})
        class Foo:
            def method(self):
                return 1
        Bar = Foo.options(num_cpus=1, resources=None)

    Running remote actors will be terminated when the actor handle to them
    in Python is deleted, which will cause them to complete any outstanding
    work and then shut down. If you want to kill them immediately, you can
    also call ``ray.kill(actor)``.

    Args:
        num_returns (int): This is only for *remote functions*. It specifies
            the number of object refs returned by
            the remote function invocation.
        num_cpus (float): The quantity of CPU cores to reserve
            for this task or for the lifetime of the actor.
        num_gpus (int): The quantity of GPUs to reserve
            for this task or for the lifetime of the actor.
        resources (Dict[str, float]): The quantity of various custom resources
            to reserve for this task or for the lifetime of the actor.
            This is a dictionary mapping strings (resource names) to floats.
        accelerator_type: If specified, requires that the task or actor run
            on a node with the specified type of accelerator.
            See `ray.accelerators` for accelerator types.
        max_calls (int): Only for *remote functions*. This specifies the
            maximum number of times that a given worker can execute
            the given remote function before it must exit
            (this can be used to address memory leaks in third-party
            libraries or to reclaim resources that cannot easily be
            released, e.g., GPU memory that was acquired by TensorFlow).
            By default this is infinite.
        max_restarts (int): Only for *actors*. This specifies the maximum
            number of times that the actor should be restarted when it dies
            unexpectedly. The minimum valid value is 0 (default),
            which indicates that the actor doesn't need to be restarted.
            A value of -1 indicates that an actor should be restarted
            indefinitely.
        max_task_retries (int): Only for *actors*. How many times to
            retry an actor task if the task fails due to a system error,
            e.g., the actor has died. If set to -1, the system will
            retry the failed task until the task succeeds, or the actor
            has reached its max_restarts limit. If set to `n > 0`, the
            system will retry the failed task up to n times, after which the
            task will throw a `RayActorError` exception upon :obj:`ray.get`.
            Note that Python exceptions are not considered system errors
            and will not trigger retries.
        max_retries (int): Only for *remote functions*. This specifies
            the maximum number of times that the remote function
            should be rerun when the worker process executing it
            crashes unexpectedly. The minimum valid value is 0,
            the default is 4 (default), and a value of -1 indicates
            infinite retries.
        runtime_env (Dict[str, Any]): Specifies the runtime environment for
            this actor or task and its children. See
            :ref:`runtime-environments` for detailed documentation. This API is
            in beta and may change before becoming stable.
        retry_exceptions (bool): Only for *remote functions*. This specifies
            whether application-level errors should be retried
            up to max_retries times.
    """
    worker = global_worker

    if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
        # This is the case where the decorator is just @ray.remote.
        return make_decorator(worker=worker)(args[0])

    # Parse the keyword arguments from the decorator.
    valid_kwargs = [
        "num_returns",
        "num_cpus",
        "num_gpus",
        "memory",
        "object_store_memory",
        "resources",
        "accelerator_type",
        "max_calls",
        "max_restarts",
        "max_task_retries",
        "max_retries",
        "runtime_env",
        "retry_exceptions",
        "placement_group",
        "concurrency_groups",
    ]
    error_string = ("The @ray.remote decorator must be applied either "
                    "with no arguments and no parentheses, for example "
                    "'@ray.remote', or it must be applied using some of "
                    f"the arguments in the list {valid_kwargs}, for example "
                    "'@ray.remote(num_returns=2, "
                    "resources={\"CustomResource\": 1})'.")
    assert len(args) == 0 and len(kwargs) > 0, error_string
    for key in kwargs:
        assert key in valid_kwargs, error_string

    num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs else None
    num_gpus = kwargs["num_gpus"] if "num_gpus" in kwargs else None
    resources = kwargs.get("resources")
    if not isinstance(resources, dict) and resources is not None:
        raise TypeError("The 'resources' keyword argument must be a "
                        f"dictionary, but received type {type(resources)}.")
    if resources is not None:
        assert "CPU" not in resources, "Use the 'num_cpus' argument."
        assert "GPU" not in resources, "Use the 'num_gpus' argument."

    accelerator_type = kwargs.get("accelerator_type")

    # Handle other arguments.
    num_returns = kwargs.get("num_returns")
    max_calls = kwargs.get("max_calls")
    max_restarts = kwargs.get("max_restarts")
    max_task_retries = kwargs.get("max_task_retries")
    memory = kwargs.get("memory")
    object_store_memory = kwargs.get("object_store_memory")
    max_retries = kwargs.get("max_retries")
    runtime_env = kwargs.get("runtime_env")
    placement_group = kwargs.get("placement_group", "default")
    retry_exceptions = kwargs.get("retry_exceptions")
    concurrency_groups = kwargs.get("concurrency_groups")

    return make_decorator(
        num_returns=num_returns,
        num_cpus=num_cpus,
        num_gpus=num_gpus,
        memory=memory,
        object_store_memory=object_store_memory,
        resources=resources,
        accelerator_type=accelerator_type,
        max_calls=max_calls,
        max_restarts=max_restarts,
        max_task_retries=max_task_retries,
        max_retries=max_retries,
        runtime_env=runtime_env,
        placement_group=placement_group,
        worker=worker,
        retry_exceptions=retry_exceptions,
        concurrency_groups=concurrency_groups or [])

This function does nothing important by itself: it just parses the arguments and calls make_decorator().
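
The two call forms that remote() has to tell apart, @ray.remote and @ray.remote(...), can be reproduced with a much smaller stand-in. This is a simplified sketch, not Ray's code: VALID_KWARGS is shortened, and this make_decorator() only records the options on the decorated object instead of building a remote function or an actor.

```python
VALID_KWARGS = {"num_cpus", "num_returns"}  # shortened, illustrative list


def make_decorator(**options):
    def decorator(function_or_class):
        # Stand-in for RemoteFunction / make_actor: just record the options.
        function_or_class.remote_options = options
        return function_or_class
    return decorator


def remote(*args, **kwargs):
    if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
        # Bare form: @remote. Python passes the decorated object as args[0].
        return make_decorator()(args[0])
    # Parenthesized form: @remote(num_cpus=2). Only keyword arguments allowed.
    if args or not kwargs:
        raise TypeError("use @remote or @remote(key=value, ...)")
    unknown = set(kwargs) - VALID_KWARGS
    if unknown:
        raise TypeError(f"unknown options: {sorted(unknown)}")
    return make_decorator(**kwargs)


@remote
def f():
    return 1


@remote(num_cpus=2)
class Foo:
    pass


print(f.remote_options, Foo.remote_options)  # {} {'num_cpus': 2}
```

In the bare form the decorated object arrives directly as args[0]. In the parenthesized form remote() first returns a decorator, and Python then calls that decorator with the decorated object.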

make_decorator()

def make_decorator(num_returns=None,
                   num_cpus=None,
                   num_gpus=None,
                   memory=None,
                   object_store_memory=None,
                   resources=None,
                   accelerator_type=None,
                   max_calls=None,
                   max_retries=None,
                   max_restarts=None,
                   max_task_retries=None,
                   runtime_env=None,
                   placement_group="default",
                   worker=None,
                   retry_exceptions=None,
                   concurrency_groups=None):
    def decorator(function_or_class):
        if (inspect.isfunction(function_or_class)
                or is_cython(function_or_class)):
            # Set the remote function default resources.
            if max_restarts is not None:
                raise ValueError("The keyword 'max_restarts' is not "
                                 "allowed for remote functions.")
            if max_task_retries is not None:
                raise ValueError("The keyword 'max_task_retries' is not "
                                 "allowed for remote functions.")
            if num_returns is not None and (not isinstance(num_returns, int)
                                            or num_returns < 0):
                raise ValueError(
                    "The keyword 'num_returns' only accepts 0 or a"
                    " positive integer")
            if max_retries is not None and (not isinstance(max_retries, int)
                                            or max_retries < -1):
                raise ValueError(
                    "The keyword 'max_retries' only accepts 0, -1 or a"
                    " positive integer")
            if max_calls is not None and (not isinstance(max_calls, int)
                                          or max_calls < 0):
                raise ValueError(
                    "The keyword 'max_calls' only accepts 0 or a positive"
                    " integer")
            return ray.remote_function.RemoteFunction(
                Language.PYTHON, function_or_class, None, num_cpus, num_gpus,
                memory, object_store_memory, resources, accelerator_type,
                num_returns, max_calls, max_retries, retry_exceptions,
                runtime_env, placement_group)

        if inspect.isclass(function_or_class):
            if num_returns is not None:
                raise TypeError("The keyword 'num_returns' is not "
                                "allowed for actors.")
            if max_retries is not None:
                raise TypeError("The keyword 'max_retries' is not "
                                "allowed for actors.")
            if retry_exceptions is not None:
                raise TypeError("The keyword 'retry_exceptions' is not "
                                "allowed for actors.")
            if max_calls is not None:
                raise TypeError("The keyword 'max_calls' is not "
                                "allowed for actors.")
            if max_restarts is not None and (not isinstance(max_restarts, int)
                                             or max_restarts < -1):
                raise ValueError(
                    "The keyword 'max_restarts' only accepts -1, 0 or a"
                    " positive integer")
            if max_task_retries is not None and (not isinstance(
                    max_task_retries, int) or max_task_retries < -1):
                raise ValueError(
                    "The keyword 'max_task_retries' only accepts -1, 0 or a"
                    " positive integer")
            return ray.actor.make_actor(
                function_or_class, num_cpus, num_gpus, memory,
                object_store_memory, resources, accelerator_type, max_restarts,
                max_task_retries, runtime_env, concurrency_groups)

        raise TypeError("The @ray.remote decorator must be applied to "
                        "either a function or to a class.")

    return decorator

This function defines a nested function, decorator(), which checks whether its parameter function_or_class is a function or a class:

  • if it is a function, it validates the function-only options and returns a ray.remote_function.RemoteFunction object;
  • if it is a class, it validates the actor-only options and returns the result of ray.actor.make_actor().

make_decorator() itself just returns the nested decorator() function.
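
The function-versus-class dispatch described above can be tried out in isolation. In this reduced sketch the return values are plain tuples of my own invention, where the real code returns a RemoteFunction or an actor class, and only two of the option checks are kept.

```python
import inspect


def make_decorator(num_returns=None, max_restarts=None):
    def decorator(function_or_class):
        if inspect.isfunction(function_or_class):
            if max_restarts is not None:
                raise ValueError("'max_restarts' is not allowed for functions")
            return ("remote_function", function_or_class.__name__)
        if inspect.isclass(function_or_class):
            if num_returns is not None:
                raise TypeError("'num_returns' is not allowed for actors")
            return ("actor_class", function_or_class.__name__)
        raise TypeError("must be applied to a function or a class")
    return decorator


def f():
    return 1


class Foo:
    pass


kinds = [make_decorator()(f), make_decorator()(Foo)]
print(kinds)  # [('remote_function', 'f'), ('actor_class', 'Foo')]

try:
    make_decorator(num_returns=2)(Foo)
    rejected = False
except TypeError:
    rejected = True
print(rejected)  # True
```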

Q: Where does function_or_class come from?

To find out, add print("Add by liudy", function_or_class) inside the nested function decorator().

  1. test code
import ray
ray.init()

Result: with nothing more than ray.init(), decorator() is called 10 times. The printed values include the following, shown here with their definitions:

<class 'ray.data.datasource.datasource._DesignatedBlockOwner'>

# Located in python/ray/data/datasource/datasource.py

@ray.remote(num_cpus=0, placement_group=None)
class _DesignatedBlockOwner:
    def ping(self):
        return "ok"

<function _hash at 0x7f3664d3c7b8>

# Located in python/ray/workflow/common.py
@ray.remote
def _hash(obj: Any) -> bytes:
    m = hashlib.sha256()
    m.update(cloudpickle.dumps(obj))
    return m.digest()

<function calculate_identifier at 0x7f3664d3cea0>

# Located in python/ray/workflow/common.py
@ray.remote
def calculate_identifier(obj: Any) -> str:
    """Calculate a url-safe identifier for an object."""

    # Step 1: Serialize the object.
    # Step 2: Calculate its sha256 hash.
    # Step 3: Get the url safe, base64 representation of it.

    # TODO (Alex): Ideally we should use the existing ObjectRef serializer to
    # avoid duplicate serialization passes and support nested object refs.
    m = hashlib.sha256()
    m.update(cloudpickle.dumps(obj))
    hash = m.digest()
    encoded = base64.urlsafe_b64encode(hash).decode("ascii")
    return encoded

<class 'ray.workflow.serialization.Manager'>

# Located in python/ray/workflow/serialization.py
@ray.remote(num_cpus=0)
class Manager:
    """
    Responsible for deduping the serialization/upload of object references.
    """

    def __init__(self, storage: storage.Storage):
        self._uploads: Dict[ray.ObjectRef, Upload] = {}
        self._storage = storage
        self._num_uploads = 0

    def ping(self) -> None:
        """
        Trivial function to ensure actor creation is successful.
        """
        return None

    async def save_objectref(
            self, ref_tuple: Tuple[ray.ObjectRef],
            workflow_id: "str") -> Tuple[List[str], ray.ObjectRef]:
        """Serialize and upload an object reference exactly once.

        Args:
            ref_tuple: A 1-element tuple which wraps the reference.

        Returns:
            A pair. The first element is the paths the ref will be uploaded to.
            The second is an object reference to the upload task.
        """
        ref, = ref_tuple
        # Use the hex as the key to avoid holding a reference to the object.
        key = (ref.hex(), workflow_id)

        if key not in self._uploads:
            # TODO(Alex): We should probably eventually free these refs.
            identifier_ref = common.calculate_identifier.remote(ref)
            upload_task = _put_helper.remote(identifier_ref, ref, workflow_id,
                                             self._storage)
            self._uploads[key] = Upload(
                identifier_ref=identifier_ref, upload_task=upload_task)
            self._num_uploads += 1

        info = self._uploads[key]
        identifer = await info.identifier_ref
        paths = obj_id_to_paths(workflow_id, identifer)
        return paths, info.upload_task

    async def export_stats(self) -> Dict[str, Any]:
        return {"num_uploads": self._num_uploads}

  2. test code
import ray
ray.init()
@ray.remote
def f(x):
  return x * x
futures = f.remote(3)

result:

ray.init()

Located in worker.py

gcs_server is a precompiled program located at /ray/python/ray/core/src/ray/gcs. It is started as a child process using subprocess.Popen.

def start_gcs_server(redis_address,
                     log_dir,
                     stdout_file=None,
                     stderr_file=None,
                     redis_password=None,
                     config=None,
                     fate_share=None,
                     gcs_server_port=None,
                     metrics_agent_port=None,
                     node_ip_address=None):
    """Start a gcs server.
    Args:
        redis_address (str): The address that the Redis server is listening on.
        log_dir (str): The path of the dir where log files are created.
        stdout_file: A file handle opened for writing to redirect stdout to. If
            no redirection should happen, then this should be None.
        stderr_file: A file handle opened for writing to redirect stderr to. If
            no redirection should happen, then this should be None.
        redis_password (str): The password of the redis server.
        config (dict|None): Optional configuration that will
            override defaults in RayConfig.
        gcs_server_port (int): Port number of the gcs server.
        metrics_agent_port(int): The port where metrics agent is bound to.
        node_ip_address(str): IP Address of a node where gcs server starts.
    Returns:
        ProcessInfo for the process that was started.
    """
    gcs_ip_address, gcs_port = redis_address.split(":")
    redis_password = redis_password or ""
    config_str = serialize_config(config)
    if gcs_server_port is None:
        gcs_server_port = 0

    command = [
        GCS_SERVER_EXECUTABLE,
        f"--redis_address={gcs_ip_address}",
        f"--redis_port={gcs_port}",
        f"--log_dir={log_dir}",
        f"--config_list={config_str}",
        f"--gcs_server_port={gcs_server_port}",
        f"--metrics-agent-port={metrics_agent_port}",
        f"--node-ip-address={node_ip_address}",
    ]
    if redis_password:
        command += [f"--redis_password={redis_password}"]
    process_info = start_ray_process(
        command,
        ray_constants.PROCESS_TYPE_GCS_SERVER,
        stdout_file=stdout_file,
        stderr_file=stderr_file,
        fate_share=fate_share)
    return process_info
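
As a rough illustration of how a command list like the one built in start_gcs_server() can be handed to subprocess.Popen, here is a standalone sketch. start_process() is a hypothetical simplification of mine and does not reproduce what start_ray_process() does. The Python interpreter stands in for the compiled gcs_server binary so that the example can run anywhere.

```python
import subprocess
import sys


def start_process(command, stdout_file=None, stderr_file=None):
    """Launch command as a child process and return the Popen handle."""
    return subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file)


port = 0  # placeholder value, only echoed back by the child below
command = [
    sys.executable,  # stand-in for a compiled binary such as gcs_server
    "-c",
    "import sys; print(sys.argv[1])",
    f"--gcs_server_port={port}",
]
proc = start_process(command, stdout_file=subprocess.PIPE)
output, _ = proc.communicate()
print(proc.returncode, output.decode().strip())  # 0 --gcs_server_port=0
```

Passing the command as a list avoids shell quoting issues, and the optional stdout/stderr handles play the same role as stdout_file and stderr_file in the functions shown here.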

The Redis server is started from the redis-server program located in /data/ray/ray/python/ray/core/src/ray/thirdparty/redis/src.

def start_redis(node_ip_address,
                redirect_files,
                resource_spec,
                port=None,
                redis_shard_ports=None,
                num_redis_shards=1,
                redis_max_clients=None,
                redirect_worker_output=False,
                password=None,
                fate_share=None,
                external_addresses=None,
                port_denylist=None):
    """Start the Redis global state store.

    Args:
        node_ip_address: The IP address of the current node. This is only used
            for recording the log filenames in Redis.
        redirect_files: The list of (stdout, stderr) file pairs.
        resource_spec (ResourceSpec): Resources for the node.
        port (int): If provided, the primary Redis shard will be started on
            this port.
        redis_shard_ports: A list of the ports to use for the non-primary Redis
            shards.
        num_redis_shards (int): If provided, the number of Redis shards to
            start, in addition to the primary one. The default value is one
            shard.
        redis_max_clients: If this is provided, Ray will attempt to configure
            Redis with this maxclients number.
        redirect_worker_output (bool): True if worker output should be
            redirected to a file and false otherwise. Workers will have access
            to this value when they start up.
        password (str): Prevents external clients without the password
            from connecting to Redis if provided.
        port_denylist (set): A set of denylist ports that shouldn't
            be used when allocating a new port.

    Returns:
        A tuple of the address for the primary Redis shard, a list of
            addresses for the remaining shards, and the processes that were
            started.
    """
    processes = []

    if external_addresses is not None:
        primary_redis_address = external_addresses[0]
        [primary_redis_ip, port] = primary_redis_address.split(":")
        port = int(port)
        redis_address = address(primary_redis_ip, port)
        primary_redis_client = create_redis_client(
            "%s:%s" % (primary_redis_ip, port), password=password)
        # Deleting the key to avoid duplicated rpush.
        primary_redis_client.delete("RedisShards")
    else:
        if len(redirect_files) != 1 + num_redis_shards:
            raise ValueError(
                "The number of redirect file pairs should be equal "
                "to the number of redis shards (including the "
                "primary shard) we will start.")
        if redis_shard_ports is None:
            redis_shard_ports = num_redis_shards * [None]
        elif len(redis_shard_ports) != num_redis_shards:
            raise RuntimeError(
                "The number of Redis shard ports does not match "
                "the number of Redis shards.")
        redis_executable = REDIS_EXECUTABLE

        redis_stdout_file, redis_stderr_file = redirect_files[0]
        # If no port is given, fallback to default Redis port for the primary
        # shard.
        if port is None:
            port = ray_constants.DEFAULT_PORT
            num_retries = 20
        else:
            num_retries = 1
        # Start the primary Redis shard.
        port, p = _start_redis_instance(
            redis_executable,
            port=port,
            password=password,
            redis_max_clients=redis_max_clients,
            num_retries=num_retries,
            # Below we use None to indicate no limit on the memory of the
            # primary Redis shard.
            redis_max_memory=None,
            stdout_file=redis_stdout_file,
            stderr_file=redis_stderr_file,
            fate_share=fate_share,
            port_denylist=port_denylist,
            listen_to_localhost_only=(node_ip_address == "127.0.0.1"))
        processes.append(p)
        redis_address = address(node_ip_address, port)
        primary_redis_client = redis.StrictRedis(
            host=node_ip_address, port=port, password=password)

    # Register the number of Redis shards in the primary shard, so that clients
    # know how many redis shards to expect under RedisShards.
    primary_redis_client.set("NumRedisShards", str(num_redis_shards))

    # Put the redirect_worker_output bool in the Redis shard so that workers
    # can access it and know whether or not to redirect their output.
    primary_redis_client.set("RedirectOutput", 1
                             if redirect_worker_output else 0)

    # Init job counter to GCS.
    primary_redis_client.set("JobCounter", 0)

    # Store version information in the primary Redis shard.
    _put_version_info_in_redis(primary_redis_client)

    # Calculate the redis memory.
    assert resource_spec.resolved()
    redis_max_memory = resource_spec.redis_max_memory

    # Start other Redis shards. Each Redis shard logs to a separate file,
    # prefixed by "redis-<shard number>".
    redis_shards = []
    # If Redis shard ports are not provided, start the port range of the
    # other Redis shards at a high, random port.
    last_shard_port = new_port(denylist=port_denylist) - 1
    for i in range(num_redis_shards):
        if external_addresses is not None:
            shard_address = external_addresses[i + 1]
        else:
            redis_stdout_file, redis_stderr_file = redirect_files[i + 1]
            redis_executable = REDIS_EXECUTABLE
            redis_shard_port = redis_shard_ports[i]
            # If no shard port is given, try to start this shard's Redis
            # instance on the port right after the last shard's port.
            if redis_shard_port is None:
                redis_shard_port = last_shard_port + 1
                num_retries = 20
            else:
                num_retries = 1

            redis_shard_port, p = _start_redis_instance(
                redis_executable,
                port=redis_shard_port,
                password=password,
                redis_max_clients=redis_max_clients,
                num_retries=num_retries,
                redis_max_memory=redis_max_memory,
                stdout_file=redis_stdout_file,
                stderr_file=redis_stderr_file,
                fate_share=fate_share,
                port_denylist=port_denylist,
                listen_to_localhost_only=(node_ip_address == "127.0.0.1"))
            processes.append(p)

            shard_address = address(node_ip_address, redis_shard_port)
            last_shard_port = redis_shard_port

        redis_shards.append(shard_address)
        # Store redis shard information in the primary redis shard.
        primary_redis_client.rpush("RedisShards", shard_address)

    return redis_address, redis_shards, processes

The monitor is started by running python monitor.py as a command.

def start_monitor(redis_address,
                  logs_dir,
                  stdout_file=None,
                  stderr_file=None,
                  autoscaling_config=None,
                  redis_password=None,
                  fate_share=None,
                  max_bytes=0,
                  backup_count=0,
                  monitor_ip=None):
    """Run a process to monitor the other processes.

    Args:
        redis_address (str): The address that the Redis server is listening on.
        logs_dir(str): The path to the log directory.
        stdout_file: A file handle opened for writing to redirect stdout to. If
            no redirection should happen, then this should be None.
        stderr_file: A file handle opened for writing to redirect stderr to. If
            no redirection should happen, then this should be None.
        autoscaling_config: path to autoscaling config file.
        redis_password (str): The password of the redis server.
        max_bytes (int): Log rotation parameter. Corresponding to
            RotatingFileHandler's maxBytes.
        backup_count (int): Log rotation parameter. Corresponding to
            RotatingFileHandler's backupCount.
        monitor_ip (str): IP address of the machine that the monitor will be
            run on. Can be excluded, but required for autoscaler metrics.
    Returns:
        ProcessInfo for the process that was started.
    """
    monitor_path = os.path.join(RAY_PATH, AUTOSCALER_PRIVATE_DIR, "monitor.py")
    command = [
        sys.executable, "-u", monitor_path, f"--logs-dir={logs_dir}",
        f"--redis-address={redis_address}",
        f"--logging-rotate-bytes={max_bytes}",
        f"--logging-rotate-backup-count={backup_count}"
    ]
    if autoscaling_config:
        command.append("--autoscaling-config=" + str(autoscaling_config))
    if redis_password:
        command.append("--redis-password=" + redis_password)
    if monitor_ip:
        command.append("--monitor-ip=" + monitor_ip)
    process_info = start_ray_process(
        command,
        ray_constants.PROCESS_TYPE_MONITOR,
        stdout_file=stdout_file,
        stderr_file=stderr_file,
        fate_share=fate_share)
    return process_info
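
The pattern used above (a fixed base command plus flags that are appended only when a value is set) can be tried in isolation. This is a minimal stand-alone sketch rather than Ray's own code: build_monitor_command and the example path are hypothetical, and only the flag names are taken from the excerpt.

```python
import sys


def build_monitor_command(monitor_path, redis_address, logs_dir,
                          max_bytes=0, backup_count=0,
                          autoscaling_config=None, redis_password=None,
                          monitor_ip=None):
    """Assemble an argv list the way start_monitor does (illustrative only)."""
    command = [
        sys.executable, "-u", monitor_path, f"--logs-dir={logs_dir}",
        f"--redis-address={redis_address}",
        f"--logging-rotate-bytes={max_bytes}",
        f"--logging-rotate-backup-count={backup_count}",
    ]
    # Optional flags are appended only when a truthy value was supplied.
    if autoscaling_config:
        command.append(f"--autoscaling-config={autoscaling_config}")
    if redis_password:
        command.append(f"--redis-password={redis_password}")
    if monitor_ip:
        command.append(f"--monitor-ip={monitor_ip}")
    return command


# The path below is a made-up placeholder, not a real Ray location.
cmd = build_monitor_command("/hypothetical/monitor.py", "127.0.0.1:6379",
                            "/tmp/ray/logs", monitor_ip="127.0.0.1")
print(cmd)
```

Because unset options never reach the argv list, the child process falls back to its own defaults for them.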

start ray client server: launched as a subprocess that runs python

def start_ray_client_server(
        redis_address,
        ray_client_server_port,
        stdout_file=None,
        stderr_file=None,
        redis_password=None,
        fate_share=None,
        metrics_agent_port=None,
        server_type: str = "proxy",
        serialized_runtime_env_context: Optional[str] = None):
    """Run the server process of the Ray client.

    Args:
        ray_client_server_port (int): Port the Ray client server listens on.
        stdout_file: A file handle opened for writing to redirect stdout to. If
            no redirection should happen, then this should be None.
        stderr_file: A file handle opened for writing to redirect stderr to. If
            no redirection should happen, then this should be None.
        redis_password (str): The password of the redis server.
        server_type (str): Whether to start the proxy version of Ray Client.
        serialized_runtime_env_context (str|None): If specified, the serialized
            runtime_env_context to start the client server in.

    Returns:
        ProcessInfo for the process that was started.
    """
    root_ray_dir = Path(__file__).resolve().parents[1]
    setup_worker_path = os.path.join(root_ray_dir, "workers",
                                     ray_constants.SETUP_WORKER_FILENAME)

    command = [
        sys.executable,
        setup_worker_path,
        "-m",
        "ray.util.client.server",
        f"--redis-address={redis_address}",
        f"--port={ray_client_server_port}",
        f"--mode={server_type}",
        f"--language={Language.Name(Language.PYTHON)}",
    ]
    if redis_password:
        command.append(f"--redis-password={redis_password}")
    if serialized_runtime_env_context:
        command.append(
            f"--serialized-runtime-env-context={serialized_runtime_env_context}"  # noqa: E501
        )
    if metrics_agent_port:
        command.append(f"--metrics-agent-port={metrics_agent_port}")
    process_info = start_ray_process(
        command,
        ray_constants.PROCESS_TYPE_RAY_CLIENT_SERVER,
        stdout_file=stdout_file,
        stderr_file=stderr_file,
        fate_share=fate_share)
    return process_info

start redis client: the redis-cli program located under /data/ray/ray/python/ray/core/src/ray/thirdparty/redis/src/

def create_redis_client(redis_address, password=None):
    """Create a Redis client.

    Args:
        The IP address, port, and password of the Redis server.

    Returns:
        A Redis client.
    """
    if not hasattr(create_redis_client, "instances"):
        create_redis_client.instances = {}
    else:
        cli = create_redis_client.instances.get(redis_address)
        if cli is not None:
            try:
                cli.ping()
                return cli
            except Exception:
                create_redis_client.instances.pop(redis_address)

    _, redis_ip_address, redis_port = validate_redis_address(redis_address)
    # For this command to work, some other client (on the same machine
    # as Redis) must have run "CONFIG SET protected-mode no".
    create_redis_client.instances[redis_address] = redis.StrictRedis(
        host=redis_ip_address, port=int(redis_port), password=password)

    return create_redis_client.instances[redis_address]
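
The caching scheme in create_redis_client (one client per address, stored on the function object, revalidated with ping() and replaced when the ping fails) can be exercised without a Redis server. The sketch below assumes a hypothetical FakeClient in place of redis.StrictRedis and a hypothetical get_client function; it only illustrates the control flow.

```python
class FakeClient:
    """Hypothetical stand-in for redis.StrictRedis; only ping() is modeled."""

    def __init__(self, address):
        self.address = address
        self.alive = True

    def ping(self):
        if not self.alive:
            raise ConnectionError(f"{self.address} is unreachable")
        return True


def get_client(address):
    """Return a cached client for address, replacing it if ping() fails."""
    # The cache lives on the function object itself, as in the excerpt.
    cache = get_client.__dict__.setdefault("instances", {})
    client = cache.get(address)
    if client is not None:
        try:
            client.ping()
            return client          # cached client is still healthy
        except Exception:
            cache.pop(address)     # stale client: drop it and reconnect
    cache[address] = FakeClient(address)
    return cache[address]


first = get_client("127.0.0.1:6379")
second = get_client("127.0.0.1:6379")   # served from the cache
first.alive = False                      # simulate a dropped connection
third = get_client("127.0.0.1:6379")    # ping fails, so a new client is made
print(first is second, third is first)
```

Storing the dictionary as a function attribute avoids a module-level global while still sharing one cache across all callers.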

start raylet (combined local scheduler and object manager): runs the raylet binary located in ray/python/ray/core/src/ray/raylet/. The command line used to invoke raylet is:

 ['/data/ray/ray/python/ray/core/src/ray/raylet/raylet', '--raylet_socket_name=/tmp/ray/session_2021-12-07_14-51-20_860909_63031/sockets/raylet', '--store_socket_name=/tmp/ray/session_2021-12-07_14-51-20_860909_63031/sockets/plasma_store', '--object_manager_port=0', '--min_worker_port=0', '--max_worker_port=0', '--node_manager_port=0', '--node_ip_address=10.186.115.19', '--redis_address=10.186.115.19', '--redis_port=6379', '--maximum_startup_concurrency=10', '--static_resource_list=node:10.186.115.19,1.0,CPU,10,memory,54792238695,object_store_memory,27396119347', '--python_worker_command=/usr/bin/python /data/ray/ray/python/ray/workers/setup_worker.py /data/ray/ray/python/ray/workers/default_worker.py --node-ip-address=10.186.115.19 --node-manager-port=RAY_NODE_MANAGER_PORT_PLACEHOLDER --object-store-name=/tmp/ray/session_2021-12-07_14-51-20_860909_63031/sockets/plasma_store --raylet-name=/tmp/ray/session_2021-12-07_14-51-20_860909_63031/sockets/raylet --redis-address=10.186.115.19:6379 --temp-dir=/tmp/ray --metrics-agent-port=41628 --logging-rotate-bytes=536870912 --logging-rotate-backup-count=5 RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER --redis-password=5241590000000000', '--java_worker_command=', '--cpp_worker_command=/data/ray/ray/python/ray/cpp/default_worker --ray_plasma_store_socket_name=/tmp/ray/session_2021-12-07_14-51-20_860909_63031/sockets/plasma_store --ray_raylet_socket_name=/tmp/ray/session_2021-12-07_14-51-20_860909_63031/sockets/raylet --ray_node_manager_port=RAY_NODE_MANAGER_PORT_PLACEHOLDER --ray_address=10.186.115.19:6379 --ray_redis_password=5241590000000000 --ray_session_dir=/tmp/ray/session_2021-12-07_14-51-20_860909_63031 --ray_logs_dir=/tmp/ray/session_2021-12-07_14-51-20_860909_63031/logs --ray_node_ip_address=10.186.115.19 RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER', '--native_library_path=/data/ray/ray/python/ray/cpp/lib', '--redis_password=5241590000000000', '--temp_dir=/tmp/ray', '--session_dir=/tmp/ray/session_2021-12-07_14-51-20_860909_63031', 
'--log_dir=/tmp/ray/session_2021-12-07_14-51-20_860909_63031/logs', '--resource_dir=/tmp/ray/session_2021-12-07_14-51-20_860909_63031/runtime_resources', '--metrics-agent-port=41628', '--metrics_export_port=53756', '--object_store_memory=27396119347', '--plasma_directory=/dev/shm', '--ray-debugger-external=0', '--num_initial_python_workers_for_first_job=10', '--agent_command='

The part of that command that launches the worker processes is shown below. How many workers may be started in parallel is capped by --maximum_startup_concurrency=10.

--python_worker_command=/usr/bin/python /data/ray/ray/python/ray/workers/setup_worker.py /data/ray/ray/python/ray/workers/default_worker.py --node-ip-address=10.186.115.19 --node-manager-port=RAY_NODE_MANAGER_PORT_PLACEHOLDER --object-store-name=/tmp/ray/session_2021-12-07_14-51-20_860909_63031/sockets/plasma_store --raylet-name=/tmp/ray/session_2021-12-07_14-51-20_860909_63031/sockets/raylet --redis-address=10.186.115.19:6379 --temp-dir=/tmp/ray --metrics-agent-port=41628 --logging-rotate-bytes=536870912 --logging-rotate-backup-count=5 RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER --redis-password=5241590000000000

The start_raylet function that assembles this command line:

def start_raylet(redis_address,
                 node_ip_address,
                 node_manager_port,
                 raylet_name,
                 plasma_store_name,
                 worker_path,
                 setup_worker_path,
                 temp_dir,
                 session_dir,
                 resource_dir,
                 log_dir,
                 resource_spec,
                 plasma_directory,
                 object_store_memory,
                 min_worker_port=None,
                 max_worker_port=None,
                 worker_port_list=None,
                 object_manager_port=None,
                 redis_password=None,
                 metrics_agent_port=None,
                 metrics_export_port=None,
                 dashboard_agent_listen_port=None,
                 use_valgrind=False,
                 use_profiler=False,
                 stdout_file=None,
                 stderr_file=None,
                 config=None,
                 huge_pages=False,
                 fate_share=None,
                 socket_to_use=None,
                 start_initial_python_workers_for_first_job=False,
                 max_bytes=0,
                 backup_count=0,
                 ray_debugger_external=False,
                 env_updates=None):
    """Start a raylet, which is a combined local scheduler and object manager.

    Args:
        redis_address (str): The address of the primary Redis server.
        node_ip_address (str): The IP address of this node.
        node_manager_port(int): The port to use for the node manager. If it's
            0, a random port will be used.
        raylet_name (str): The name of the raylet socket to create.
        plasma_store_name (str): The name of the plasma store socket to connect
             to.
        worker_path (str): The path of the Python file that new worker
            processes will execute.
        setup_worker_path (str): The path of the Python file that will set up
            the environment for the worker process.
        temp_dir (str): The path of the temporary directory Ray will use.
        session_dir (str): The path of this session.
        resource_dir(str): The path of resource of this session .
        log_dir (str): The path of the dir where log files are created.
        resource_spec (ResourceSpec): Resources for this raylet.
        object_manager_port: The port to use for the object manager. If this is
            None, then the object manager will choose its own port.
        min_worker_port (int): The lowest port number that workers will bind
            on. If not set, random ports will be chosen.
        max_worker_port (int): The highest port number that workers will bind
            on. If set, min_worker_port must also be set.
        redis_password: The password to use when connecting to Redis.
        metrics_agent_port(int): The port where metrics agent is bound to.
        metrics_export_port(int): The port at which metrics are exposed to.
        use_valgrind (bool): True if the raylet should be started inside
            of valgrind. If this is True, use_profiler must be False.
        use_profiler (bool): True if the raylet should be started inside
            a profiler. If this is True, use_valgrind must be False.
        stdout_file: A file handle opened for writing to redirect stdout to. If
            no redirection should happen, then this should be None.
        stderr_file: A file handle opened for writing to redirect stderr to. If
            no redirection should happen, then this should be None.
        tracing_startup_hook: Tracing startup hook.
        config (dict|None): Optional Raylet configuration that will
            override defaults in RayConfig.
        max_bytes (int): Log rotation parameter. Corresponding to
            RotatingFileHandler's maxBytes.
        backup_count (int): Log rotation parameter. Corresponding to
            RotatingFileHandler's backupCount.
        ray_debugger_external (bool): True if the Ray debugger should be made
            available externally to this node.
        env_updates (dict): Environment variable overrides.

    Returns:
        ProcessInfo for the process that was started.
    """
    assert node_manager_port is not None and type(node_manager_port) == int

    if use_valgrind and use_profiler:
        raise ValueError("Cannot use valgrind and profiler at the same time.")

    assert resource_spec.resolved()
    static_resources = resource_spec.to_resource_dict()

    # Limit the number of workers that can be started in parallel by the
    # raylet. However, make sure it is at least 1.
    num_cpus_static = static_resources.get("CPU", 0)
    maximum_startup_concurrency = max(
        1, min(multiprocessing.cpu_count(), num_cpus_static))

    # Format the resource argument in a form like 'CPU,1.0,GPU,0,Custom,3'.
    resource_argument = ",".join(
        ["{},{}".format(*kv) for kv in static_resources.items()])

    gcs_ip_address, gcs_port = redis_address.split(":")

    has_java_command = False
    if shutil.which("java") is not None:
        has_java_command = True

    ray_java_installed = False
    try:
        jars_dir = get_ray_jars_dir()
        if os.path.exists(jars_dir):
            ray_java_installed = True
    except Exception:
        pass

    include_java = has_java_command and ray_java_installed
    if include_java is True:
        java_worker_command = build_java_worker_command(
            redis_address,
            plasma_store_name,
            raylet_name,
            redis_password,
            session_dir,
            node_ip_address,
            setup_worker_path,
        )
    else:
        java_worker_command = []

    if os.path.exists(DEFAULT_WORKER_EXECUTABLE):
        cpp_worker_command = build_cpp_worker_command(
            "", redis_address, plasma_store_name, raylet_name, redis_password,
            session_dir, log_dir, node_ip_address)
    else:
        cpp_worker_command = []

    # Create the command that the Raylet will use to start workers.
    # TODO(architkulkarni): Pipe in setup worker args separately instead of
    # inserting them into start_worker_command and later erasing them if
    # needed.
    start_worker_command = [
        sys.executable,
        setup_worker_path,
        worker_path,
        f"--node-ip-address={node_ip_address}",
        "--node-manager-port=RAY_NODE_MANAGER_PORT_PLACEHOLDER",
        f"--object-store-name={plasma_store_name}",
        f"--raylet-name={raylet_name}",
        f"--redis-address={redis_address}",
        f"--temp-dir={temp_dir}",
        f"--metrics-agent-port={metrics_agent_port}",
        f"--logging-rotate-bytes={max_bytes}",
        f"--logging-rotate-backup-count={backup_count}",
        "RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER",
    ]
    if redis_password:
        start_worker_command += [f"--redis-password={redis_password}"]

    # If the object manager port is None, then use 0 to cause the object
    # manager to choose its own port.
    if object_manager_port is None:
        object_manager_port = 0

    if min_worker_port is None:
        min_worker_port = 0

    if max_worker_port is None:
        max_worker_port = 0

    if not ray._private.utils.check_dashboard_dependencies_installed():
        # An empty agent command will cause the raylet not to start it.
        agent_command = []
    else:
        agent_command = [
            sys.executable,
            "-u",
            os.path.join(RAY_PATH, "dashboard", "agent.py"),
            f"--node-ip-address={node_ip_address}",
            f"--redis-address={redis_address}",
            f"--metrics-export-port={metrics_export_port}",
            f"--dashboard-agent-port={metrics_agent_port}",
            f"--listen-port={dashboard_agent_listen_port}",
            "--node-manager-port=RAY_NODE_MANAGER_PORT_PLACEHOLDER",
            f"--object-store-name={plasma_store_name}",
            f"--raylet-name={raylet_name}",
            f"--temp-dir={temp_dir}",
            f"--session-dir={session_dir}",
            f"--runtime-env-dir={resource_dir}",
            f"--log-dir={log_dir}",
            f"--logging-rotate-bytes={max_bytes}",
            f"--logging-rotate-backup-count={backup_count}",
        ]

        if redis_password is not None and len(redis_password) != 0:
            agent_command.append("--redis-password={}".format(redis_password))

    command = [
        RAYLET_EXECUTABLE,
        f"--raylet_socket_name={raylet_name}",
        f"--store_socket_name={plasma_store_name}",
        f"--object_manager_port={object_manager_port}",
        f"--min_worker_port={min_worker_port}",
        f"--max_worker_port={max_worker_port}",
        f"--node_manager_port={node_manager_port}",
        f"--node_ip_address={node_ip_address}",
        f"--redis_address={gcs_ip_address}",
        f"--redis_port={gcs_port}",
        f"--maximum_startup_concurrency={maximum_startup_concurrency}",
        f"--static_resource_list={resource_argument}",
        f"--python_worker_command={subprocess.list2cmdline(start_worker_command)}",  # noqa
        f"--java_worker_command={subprocess.list2cmdline(java_worker_command)}",  # noqa
        f"--cpp_worker_command={subprocess.list2cmdline(cpp_worker_command)}",  # noqa
        f"--native_library_path={DEFAULT_NATIVE_LIBRARY_PATH}",
        f"--redis_password={redis_password or ''}",
        f"--temp_dir={temp_dir}",
        f"--session_dir={session_dir}",
        f"--log_dir={log_dir}",
        f"--resource_dir={resource_dir}",
        f"--metrics-agent-port={metrics_agent_port}",
        f"--metrics_export_port={metrics_export_port}",
        f"--object_store_memory={object_store_memory}",
        f"--plasma_directory={plasma_directory}",
        f"--ray-debugger-external={1 if ray_debugger_external else 0}",
    ]
    if worker_port_list is not None:
        command.append(f"--worker_port_list={worker_port_list}")
    if start_initial_python_workers_for_first_job:
        command.append("--num_initial_python_workers_for_first_job={}".format(
            resource_spec.num_cpus))
    command.append("--agent_command={}".format(
        subprocess.list2cmdline(agent_command)))
    if huge_pages:
        command.append("--huge_pages")
    if socket_to_use:
        socket_to_use.close()
    process_info = start_ray_process(
        command,
        ray_constants.PROCESS_TYPE_RAYLET,
        use_valgrind=use_valgrind,
        use_gdb=False,
        use_valgrind_profiler=use_profiler,
        use_perftools_profiler=("RAYLET_PERFTOOLS_PATH" in os.environ),
        stdout_file=stdout_file,
        stderr_file=stderr_file,
        fate_share=fate_share,
        env_updates=env_updates)

    return process_info
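
Three small computations inside start_raylet explain most of the command line shown earlier: the startup concurrency cap, the flattened static_resource_list, and the nested worker command. The sketch below reproduces them with the standard library only; the resource values are copied from the example command line, and the short worker_command list is a shortened, hypothetical stand-in for the real one.

```python
import multiprocessing
import subprocess

# Resource values copied from the example raylet command line above.
static_resources = {
    "node:10.186.115.19": 1.0,
    "CPU": 10,
    "memory": 54792238695,
    "object_store_memory": 27396119347,
}

# At least 1, at most the smaller of the machine's cores and the CPU resource.
num_cpus_static = static_resources.get("CPU", 0)
maximum_startup_concurrency = max(
    1, min(multiprocessing.cpu_count(), num_cpus_static))

# Flatten the dict into "name,value,name,value,...".
resource_argument = ",".join(
    "{},{}".format(*kv) for kv in static_resources.items())

# A nested command is passed to the raylet as a single string argument.
worker_command = ["python", "default_worker.py", "--temp-dir=/tmp/ray"]
flag = f"--python_worker_command={subprocess.list2cmdline(worker_command)}"

print(maximum_startup_concurrency)
print(resource_argument)
print(flag)
```

The printed resource string matches the --static_resource_list value in the captured command, which suggests the 10 seen in --maximum_startup_concurrency comes from the CPU resource on that machine.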

start head processes (node.py)

def start_head_processes(self):
    """Start head processes on the node."""
    logger.debug(f"Process STDOUT and STDERR is being "
                  f"redirected to {self._logs_dir}.")
    assert self._redis_address is None
    # If this is the head node, start the relevant head node processes.
    self.start_redis()

    self.start_gcs_server()

    if not self._ray_params.no_monitor:
        self.start_monitor()

    if self._ray_params.ray_client_server_port:
        self.start_ray_client_server()

    if self._ray_params.include_dashboard:
        self.start_dashboard(require_dashboard=True)
    elif self._ray_params.include_dashboard is None:
        self.start_dashboard(require_dashboard=False)
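
The branching in start_head_processes is easier to see when it returns a plan instead of starting processes. In this sketch plan_head_processes is a hypothetical helper that mirrors only the conditions of the excerpt; note the three-way handling of include_dashboard (True, None, False).

```python
def plan_head_processes(no_monitor=False, ray_client_server_port=None,
                        include_dashboard=None):
    """Return the ordered list of steps start_head_processes would take.

    Hypothetical helper: it mirrors only the branching of the excerpt above.
    """
    plan = ["redis", "gcs_server"]          # always started, in this order
    if not no_monitor:
        plan.append("monitor")
    if ray_client_server_port:
        plan.append("ray_client_server")
    if include_dashboard:
        plan.append("dashboard(require_dashboard=True)")
    elif include_dashboard is None:
        plan.append("dashboard(require_dashboard=False)")
    # include_dashboard=False falls through: no dashboard is started.
    return plan


print(plan_head_processes())
print(plan_head_processes(ray_client_server_port=10001,
                          include_dashboard=False))
```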

start redis_servers (node.py)

def start_redis(self):
    """Start the Redis servers."""
    assert self._redis_address is None
    redis_log_files = []
    if self._ray_params.external_addresses is None:
        redis_log_files = [self.get_log_file_handles("redis", unique=True)]
        for i in range(self._ray_params.num_redis_shards):
            redis_log_files.append(
                self.get_log_file_handles(f"redis-shard_{i}", unique=True))

    (self._redis_address, redis_shards,
      process_infos) = ray._private.services.start_redis(
          self._node_ip_address,
          redis_log_files,
          self.get_resource_spec(),
          port=self._ray_params.redis_port,
          redis_shard_ports=self._ray_params.redis_shard_ports,
          num_redis_shards=self._ray_params.num_redis_shards,
          redis_max_clients=self._ray_params.redis_max_clients,
          redirect_worker_output=True,
          password=self._ray_params.redis_password,
          fate_share=self.kernel_fate_share,
          external_addresses=self._ray_params.external_addresses,
          port_denylist=self._ray_params.reserved_ports)
    assert (
        ray_constants.PROCESS_TYPE_REDIS_SERVER not in self.all_processes)
    self.all_processes[ray_constants.PROCESS_TYPE_REDIS_SERVER] = (
        process_infos)

core_worker.h

class CoreWorker: the constructor does the following:

  1. Initialize the task receivers and the raylet client
  2. Start an RPC server
  3. Initialize gcs client
  4. Initialize profiler
  5. Start IO thread

TaskSpecification class

location: task_spec.h

FunctionDescriptorInterface class

  • function_descriptor.h
class FunctionDescriptorInterface : public MessageWrapper<rpc::FunctionDescriptor> {
 public:
  virtual ~FunctionDescriptorInterface() {}

  /// Construct an empty FunctionDescriptor.
  FunctionDescriptorInterface() : MessageWrapper() {}

  /// Construct from a protobuf message object.
  /// The input message will be **copied** into this object.
  ///
  /// \param message The protobuf message.
  FunctionDescriptorInterface(rpc::FunctionDescriptor message)
      : MessageWrapper(std::move(message)) {}

  ray::FunctionDescriptorType Type() const {
    return message_->function_descriptor_case();
  }

  virtual size_t Hash() const = 0;

  // DO NOT define operator==() or operator!=() in the base class.
  // Let the derived classes define and implement.
  // This is to avoid unexpected behaviors when comparing function descriptors of
  // different declard types, as in this case, the base class version is invoked.

  virtual std::string ToString() const = 0;

  // A one-word summary of the function call site (e.g., __main__.foo).
  virtual std::string CallSiteString() const { return CallString(); }

  // The function or method call, e.g. "foo()" or "Bar.foo()". This does not include the
  // module/library.
  virtual std::string CallString() const = 0;

  // The default name for a task that executes this function.
  virtual std::string DefaultTaskName() const { return CallString() + "()"; }

  template <typename Subtype>
  Subtype *As() {
    return reinterpret_cast<Subtype *>(this);
  }
};

PythonFunctionDescriptor class: the rpc::FunctionDescriptor message received over RPC carries four fields: module_name, class_name, function_name, and function_hash.

message PythonFunctionDescriptor {
  string module_name = 1;
  string class_name = 2;
  string function_name = 3;
  string function_hash = 4;
}

The corresponding C++ wrapper class:

class PythonFunctionDescriptor : public FunctionDescriptorInterface {
 public:
  /// Construct from a protobuf message object.
  /// The input message will be **copied** into this object.
  ///
  /// \param message The protobuf message.
  explicit PythonFunctionDescriptor(rpc::FunctionDescriptor message)
      : FunctionDescriptorInterface(std::move(message)) {
    RAY_CHECK(message_->function_descriptor_case() ==
              ray::FunctionDescriptorType::kPythonFunctionDescriptor);
    typed_message_ = &(message_->python_function_descriptor());
  }

  virtual size_t Hash() const {
    return std::hash<int>()(ray::FunctionDescriptorType::kPythonFunctionDescriptor) ^
           std::hash<std::string>()(typed_message_->module_name()) ^
           std::hash<std::string>()(typed_message_->class_name()) ^
           std::hash<std::string>()(typed_message_->function_name()) ^
           std::hash<std::string>()(typed_message_->function_hash());
  }

  inline bool operator==(const PythonFunctionDescriptor &other) const {
    if (this == &other) {
      return true;
    }
    return this->ModuleName() == other.ModuleName() &&
           this->ClassName() == other.ClassName() &&
           this->FunctionName() == other.FunctionName() &&
           this->FunctionHash() == other.FunctionHash();
  }

  inline bool operator!=(const PythonFunctionDescriptor &other) const {
    return !(*this == other);
  }

  virtual std::string ToString() const {
    return "{type=PythonFunctionDescriptor, module_name=" +
           typed_message_->module_name() +
           ", class_name=" + typed_message_->class_name() +
           ", function_name=" + typed_message_->function_name() +
           ", function_hash=" + typed_message_->function_hash() + "}";
  }

  virtual std::string CallSiteString() const {
    return typed_message_->module_name() + "." + CallString();
  }

  virtual std::string CallString() const {
    const std::string &class_name = typed_message_->class_name();
    const std::string &function_name = typed_message_->function_name();
    if (class_name.empty()) {
      return function_name.substr(function_name.find_last_of(".") + 1);
    } else {
      return class_name.substr(class_name.find_last_of(".") + 1) + "." +
             function_name.substr(function_name.find_last_of(".") + 1);
    }
  }

  const std::string &ModuleName() const { return typed_message_->module_name(); }

  const std::string &ClassName() const { return typed_message_->class_name(); }

  const std::string &FunctionName() const { return typed_message_->function_name(); }

  const std::string &FunctionHash() const { return typed_message_->function_hash(); }

 private:
  const rpc::PythonFunctionDescriptor *typed_message_;
};
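
To make the naming rules of CallString, CallSiteString, and DefaultTaskName concrete, here is a rough Python mirror of the C++ class. PyFunctionDescriptor is a hypothetical name used only for this sketch. A frozen dataclass supplies field-wise equality and hashing, which plays the role of operator== and Hash() above (the C++ version XORs the per-field hashes instead).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PyFunctionDescriptor:
    """Hypothetical Python mirror of the C++ PythonFunctionDescriptor."""
    module_name: str
    class_name: str
    function_name: str
    function_hash: str

    def call_string(self) -> str:
        # rfind returns -1 when there is no ".", so +1 keeps the whole name,
        # the same trick as find_last_of(".") + 1 in the C++ code.
        func = self.function_name[self.function_name.rfind(".") + 1:]
        if not self.class_name:
            return func
        cls = self.class_name[self.class_name.rfind(".") + 1:]
        return f"{cls}.{func}"

    def call_site_string(self) -> str:
        return f"{self.module_name}.{self.call_string()}"

    def default_task_name(self) -> str:
        return self.call_string() + "()"


task = PyFunctionDescriptor("__main__", "", "foo", "abc123")
method = PyFunctionDescriptor("__main__", "Counter", "increment", "")
print(task.call_site_string())
print(method.call_string())
print(method.default_task_name())
```

Because instances are hashable and compare by value, they can serve as dictionary keys, which is how function descriptors are used in the execution_infos lookup later in these notes.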

notes

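  • function_descriptor.pxi contains the PythonFunctionDescriptor class, which wraps Python functions and classes
  • _raylet.pyx contains the implementation of CoreWorker and the code that assigns it
  • Definition of FunctionDescriptor: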
# _raylet.pxd
cdef class FunctionDescriptor:
    cdef:
        CFunctionDescriptor descriptor
  • The code that executes a Python task:
# _raylet.pyx
cdef execute_task(
        CTaskType task_type,
        const c_string name,
        const CRayFunction &ray_function,
        const unordered_map[c_string, double] &c_resources,
        const c_vector[shared_ptr[CRayObject]] &c_args,
        const c_vector[CObjectReference] &c_arg_refs,
        const c_vector[CObjectID] &c_return_ids,
        const c_string debugger_breakpoint,
        c_vector[shared_ptr[CRayObject]] *returns,
        c_bool *is_application_level_error,
        # This parameter is only used for actor creation task to define
        # the concurrency groups of this actor.
        const c_vector[CConcurrencyGroup] &c_defined_concurrency_groups,
        const c_string c_name_of_concurrency_group_to_execute):

Fetching the Python task

# _raylet.pyx 529
    if <int>task_type == <int>TASK_TYPE_NORMAL_TASK:
        next_title = "ray::IDLE"
        function_executor = execution_info.function     # the Python task function is fetched here
        # Record the task name via :task_name: magic token in the log file.
        # This is used for the prefix in driver logs `(task_name pid=123) ...`
        task_name_magic_token = "{}{}".format(
            ray_constants.LOG_PREFIX_TASK_NAME, task_name.replace("()", ""))
        # Print on both .out and .err
        print(task_name_magic_token)
        print(task_name_magic_token, file=sys.stderr)

Here execution_info is looked up in a dictionary named execution_infos:

# _raylet.pyx 514
execution_info = execution_infos.get(function_descriptor)

# _raylet.pyx 467
dict execution_infos = manager.execution_infos

# _raylet.pyx 462
worker = ray.worker.global_worker
manager = worker.function_actor_manager

So what is this function_actor_manager?

# worker.py 113
self.function_actor_manager = FunctionActorManager(self)

So it is an instance of the FunctionActorManager class. (FunctionActorManager serializes actor classes and functions with pickle and stores them in Redis.)

# function_manager.py 40
class FunctionActorManager:
    """A class used to export/load remote functions and actors.
    Attributes:
        _worker: The associated worker that this manager related.
        _functions_to_export: The remote functions to export when
            the worker gets connected.
        _actors_to_export: The actors to export when the worker gets
            connected.
        _function_execution_info: The function_id
            and execution_info.
        _num_task_executions: The function
            execution times.
        imported_actor_classes: The set of actor classes keys (format:
            ActorClass:function_id) that are already in GCS.
    """

In this class, execution_infos starts out as an empty dictionary:

# function_manager.py 79
self.execution_infos = {}

So what exactly does execution_infos end up holding?

# _raylet.pyx 514
execution_info = execution_infos.get(function_descriptor)
if not execution_info:
    execution_info = manager.get_execution_info(
        job_id, function_descriptor)
    execution_infos[function_descriptor] = execution_info
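
This get-or-load shape is a plain dictionary cache in front of a slower lookup. The following sketch isolates it; FakeManager and lookup are hypothetical names, and the string returned by get_execution_info merely stands in for a real FunctionExecutionInfo.

```python
class FakeManager:
    """Hypothetical stand-in for FunctionActorManager; counts slow lookups."""

    def __init__(self):
        self.execution_infos = {}   # starts empty, filled on first use
        self.loads = 0

    def get_execution_info(self, job_id, function_descriptor):
        self.loads += 1             # pretend this is an expensive load
        return f"info for {function_descriptor} (job {job_id})"


def lookup(manager, job_id, function_descriptor):
    """Same get-or-load shape as the _raylet.pyx excerpt above."""
    execution_infos = manager.execution_infos
    execution_info = execution_infos.get(function_descriptor)
    if not execution_info:
        execution_info = manager.get_execution_info(
            job_id, function_descriptor)
        execution_infos[function_descriptor] = execution_info
    return execution_info


manager = FakeManager()
a = lookup(manager, 1, "__main__.foo")
b = lookup(manager, 1, "__main__.foo")   # second call hits the dictionary
print(manager.loads, a is b)
```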

This shows that it stores the return values of get_execution_info:

# function_manager.py 248
def get_execution_info(self, job_id, function_descriptor):
    """Get the FunctionExecutionInfo of a remote function.
    Args:
        job_id: ID of the job that the function belongs to.
        function_descriptor: The FunctionDescriptor of the function to get.
    Returns:
        A FunctionExecutionInfo object.
    """
    ...
    # If the function has already been loaded,
    # There's no need to load again
    if function_id in self._function_execution_info:
        return self._function_execution_info[function_id]
    ...

_function_execution_info is defined as follows:

# function_manager.py 63

# This field is a dictionary that maps function IDs
# to a FunctionExecutionInfo object. This should only be used on
# workers that execute remote functions.
self._function_execution_info = defaultdict(lambda: {})

...

try:
    function = pickle.loads(serialized_function)
...
else:
    function.__module__ = module
    self._function_execution_info[function_id] = (
        FunctionExecutionInfo(
            function=function,
            function_name=function_name,
            max_calls=max_calls))
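
The export and load round trip described in these notes (pickle the function, store the bytes, unpickle on the worker, wrap the result in a FunctionExecutionInfo) can be sketched with the standard library alone. Everything here is an assumption made for illustration: a plain dict named fake_store plays the role of Redis, and export_function and load_function are hypothetical helpers, not Ray APIs.

```python
import operator
import pickle
from collections import namedtuple

FunctionExecutionInfo = namedtuple(
    "FunctionExecutionInfo", ["function", "function_name", "max_calls"])

# A plain dict plays the role of Redis in this sketch.
fake_store = {}


def export_function(function_id, function, max_calls=0):
    """Serialize a function and save it under function_id."""
    fake_store[function_id] = (pickle.dumps(function), function.__name__,
                               max_calls)


def load_function(function_id):
    """Deserialize a stored function into a FunctionExecutionInfo."""
    serialized_function, function_name, max_calls = fake_store[function_id]
    function = pickle.loads(serialized_function)
    return FunctionExecutionInfo(function=function,
                                 function_name=function_name,
                                 max_calls=max_calls)


# operator.add is used because the stdlib pickle stores functions by
# reference, so the function must be importable on the loading side.
export_function("fn-1", operator.add)
info = load_function("fn-1")
print(info.function_name, info.function(2, 3))
```

The function attribute of the resulting tuple is the object that execute_task later picks up as function_executor.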

FunctionExecutionInfo is a namedtuple:

FunctionExecutionInfo = namedtuple("FunctionExecutionInfo",
                                   ["function", "function_name", "max_calls"])
"""FunctionExecutionInfo: A named tuple storing remote function information."""

gym

import gym
env = gym.make('CartPole-v0')
env.reset()
for _ in range(1000):
    env.render()
    env.step(env.action_space.sample()) # take a random action
env.close()

The step function returns four values:

  • observation (object): an environment-specific object representing your observation of the environment. For example, pixel data from a camera, joint angles and joint velocities of a robot, or the board state in a board game.
  • reward (float): amount of reward achieved by the previous action. The scale varies between environments, but the goal is always to increase your total reward.
  • done (boolean): whether it’s time to reset the environment again. Most (but not all) tasks are divided up into well-defined episodes, and done being True indicates the episode has terminated. (For example, perhaps the pole tipped too far, or you lost your last life.)
  • info (dict): diagnostic information useful for debugging. It can sometimes be useful for learning (for example, it might contain the raw probabilities behind the environment’s last state change). However, official evaluations of your agent are not allowed to use this for learning.
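The four return values can be used to drive an episode loop that stops when done becomes True. The sketch below avoids importing gym so that it runs anywhere. ToyEnv is a hypothetical environment that only mimics the four-value step interface described above, with a made-up horizon parameter and a constant reward.

```python
import random


class ToyEnv:
    """Hypothetical environment that mimics the four-value step interface."""

    def __init__(self, horizon=5):
        self.horizon = horizon
        self.t = 0

    def reset(self):
        self.t = 0
        return [0.0]  # initial observation

    def step(self, action):
        self.t += 1
        observation = [float(self.t)]
        reward = 1.0
        done = self.t >= self.horizon  # episode ends after `horizon` steps
        info = {"t": self.t}
        return observation, reward, done, info


env = ToyEnv(horizon=5)
observation = env.reset()
total_reward = 0.0
steps = 0
done = False
while not done:
    action = random.choice([0, 1])  # random action, like action_space.sample()
    observation, reward, done, info = env.step(action)
    total_reward += reward
    steps += 1
```

With a real gym environment the loop has the same shape: call reset once, then keep calling step until done is True before resetting again.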

Every environment comes with an action_space and an observation_space. These attributes are of type Space, and they describe the format of valid actions and observations:

import gym
env = gym.make('CartPole-v0')
print(env.action_space)
#> Discrete(2)
print(env.observation_space)
#> Box(4,)

The Discrete space allows a fixed range of non-negative numbers, so in this case valid actions are either 0 or 1. The Box space represents an n-dimensional box, so valid observations will be an array of 4 numbers. We can also check the Box’s bounds:

print(env.observation_space.high)
#> array([ 2.4       ,         inf,  0.20943951,         inf])
print(env.observation_space.low)
#> array([-2.4       ,        -inf, -0.20943951,        -inf])
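As a rough illustration of what these two space types express, the sketch below defines two small membership checks. ToyDiscrete and ToyBox are hypothetical classes written for this note, not gym's own implementations; the bounds are copied from the printed high and low arrays.

```python
import math


class ToyDiscrete:
    """Hypothetical stand-in for a Discrete(n) space: integers 0 .. n-1."""

    def __init__(self, n):
        self.n = n

    def contains(self, x):
        return isinstance(x, int) and 0 <= x < self.n


class ToyBox:
    """Hypothetical stand-in for a Box space with per-dimension bounds."""

    def __init__(self, low, high):
        assert len(low) == len(high)
        self.low = list(low)
        self.high = list(high)

    def contains(self, x):
        # Valid if the length matches and every value lies within its bounds.
        return len(x) == len(self.low) and all(
            lo <= v <= hi for lo, v, hi in zip(self.low, x, self.high))


action_space = ToyDiscrete(2)
# Bounds copied from the printed high/low arrays above.
observation_space = ToyBox(
    low=[-2.4, -math.inf, -0.20943951, -math.inf],
    high=[2.4, math.inf, 0.20943951, math.inf])
```

The infinite bounds on the second and fourth dimensions mean any finite value is accepted there, while the first and third dimensions are bounded on both sides.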

Reference

  1. Ray 1.0 Architecture
  2. Ownership: A Distributed Futures System for Fine-Grained Tasks
Updated: 2022-04-01