Ray_raylet Note

This note documents the raylet module of the Ray framework.

raylet

main.cc

raylet is a standalone program written in C++; its source lives under /src/ray/raylet, and execution starts in main.cc.

The key code in main.cc that parses python_worker_command:

  if (!python_worker_command.empty()) {
    node_manager_config.worker_commands.emplace(
        make_pair(ray::Language::PYTHON, ParseCommandLine(python_worker_command)));
  }
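
ParseCommandLine itself lives elsewhere in the Ray tree; conceptually it turns the flag string into an argv-style token list. A standalone sketch of that shape (whitespace splitting only; the real helper presumably also handles quoting):

// Illustration only: approximates what ParseCommandLine returns, namely the
// worker command tokenized into a std::vector<std::string>.
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

std::vector<std::string> SplitOnWhitespace(const std::string &cmd) {
  std::vector<std::string> tokens;
  std::istringstream iss(cmd);
  for (std::string tok; iss >> tok;) {
    tokens.push_back(tok);
  }
  return tokens;
}

int main() {
  for (const auto &tok : SplitOnWhitespace("/usr/bin/python default_worker.py")) {
    std::cout << tok << std::endl;
  }
  return 0;
}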

After a series of assignments populate node_manager_config, it is passed to the Raylet, which is then started:

// main.cc 256
// Initialize the node manager.
raylet.reset(new ray::raylet::Raylet(
    main_service, raylet_socket_name, node_ip_address, redis_address, redis_port,
    redis_password, node_manager_config, object_manager_config, gcs_client,
    metrics_export_port));

// Initialize event framework.
if (RayConfig::instance().event_log_reporter_enabled() && !log_dir.empty()) {
  ray::RayEventInit(ray::rpc::Event_SourceType::Event_SourceType_RAYLET,
                    {{"node_id", raylet->GetNodeId().Hex()}}, log_dir,
                    RayConfig::instance().event_level());
};

raylet->Start();

The Raylet constructor uses node_manager_config to create node_manager_ (an instance of NodeManager):

// raylet.cc 57
Raylet::Raylet(instrumented_io_context &main_service, const std::string &socket_name,
               const std::string &node_ip_address, const std::string &redis_address,
               int redis_port, const std::string &redis_password,
               const NodeManagerConfig &node_manager_config,
               const ObjectManagerConfig &object_manager_config,
               std::shared_ptr<gcs::GcsClient> gcs_client, int metrics_export_port)
    : main_service_(main_service),
      self_node_id_(
          !RayConfig::instance().OVERRIDE_NODE_ID_FOR_TESTING().empty()
              ? NodeID::FromHex(RayConfig::instance().OVERRIDE_NODE_ID_FOR_TESTING())
              : NodeID::FromRandom()),
      gcs_client_(gcs_client),
      node_manager_(main_service, self_node_id_, node_manager_config,
                    object_manager_config, gcs_client_),
      socket_name_(socket_name),
      acceptor_(main_service, ParseUrlEndpoint(socket_name)),
      socket_(main_service) {

In the NodeManager constructor, the config is used to construct worker_pool_:

// node_manager.cc 174
NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self_node_id,
                         const NodeManagerConfig &config,
                         const ObjectManagerConfig &object_manager_config,
                         std::shared_ptr<gcs::GcsClient> gcs_client)
    : self_node_id_(self_node_id),
      io_service_(io_service),
      gcs_client_(gcs_client),
      worker_pool_(
          io_service, self_node_id_, config.node_manager_address,
          config.num_workers_soft_limit, config.num_initial_python_workers_for_first_job,
          config.maximum_startup_concurrency, config.min_worker_port,
          config.max_worker_port, config.worker_ports, gcs_client_,
          config.worker_commands, config.native_library_path,
          /*starting_worker_timeout_callback=*/
          [this] { cluster_task_manager_->ScheduleAndDispatchTasks(); },
          config.ray_debugger_external,
          /*get_time=*/[]() { return absl::GetCurrentTimeNanos() / 1e6; }),

The WorkerPool class inherits from two interfaces:

// worker_pool.h 141
/// The WorkerPool is responsible for managing a pool of Workers. Each Worker
/// is a container for a unit of work.
class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface
{
 public:
  /// Create a pool and asynchronously start at least the specified number of workers per
  /// language.
  /// Once each worker process has registered with an external server, the
  /// process should create and register the specified number of workers, and add them to
  /// the pool.
  ///
  /// \param node_id The id of the current node.
  /// \param node_address The address of the current node.
  /// \param num_workers_soft_limit The soft limit of the number of workers.
  /// \param num_initial_python_workers_for_first_job The number of initial Python
  /// workers for the first job.
  /// \param maximum_startup_concurrency The maximum number of worker processes
  /// that can be started in parallel (typically this should be set to the number of CPU
  /// resources on the machine).
  /// \param min_worker_port The lowest port number that workers started will bind on.
  /// If this is set to 0, workers will bind on random ports.
  /// \param max_worker_port The highest port number that workers started will bind on.
  /// If this is not set to 0, min_worker_port must also not be set to 0.
  /// \param worker_ports An explicit list of open ports that workers started will bind
  /// on. This takes precedence over min_worker_port and max_worker_port.
  /// \param worker_commands The commands used to start the worker process, grouped by
  /// language.
  /// \param native_library_path The native library path which includes the core
  /// libraries.
  /// \param starting_worker_timeout_callback The callback that will be triggered once
  /// it times out to start a worker.
  /// \param ray_debugger_external Ray debugger in workers will be started in a way
  /// that they are accessible from outside the node.
  /// \param get_time A callback to get the current time.
  WorkerPool(instrumented_io_context &io_service, const NodeID node_id,
             const std::string node_address, int num_workers_soft_limit,
             int num_initial_python_workers_for_first_job,
             int maximum_startup_concurrency, int min_worker_port, int max_worker_port,
             const std::vector<int> &worker_ports,
             std::shared_ptr<gcs::GcsClient> gcs_client,
             const WorkerCommandMap &worker_commands,
             const std::string &native_library_path,
             std::function<void()> starting_worker_timeout_callback,
             int ray_debugger_external, const std::function<double()> get_time);

The worker_commands parameter has type WorkerCommandMap, defined as follows:

using WorkerCommandMap =
    absl::flat_hash_map<Language, std::vector<std::string>, std::hash<int>>;   // <K, V, Hash>
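
Note the third template argument: a hasher must be supplied explicitly because Language is a protobuf-generated enum, and std::hash<int> works since its values convert to int. A standalone sketch of the same shape (stand-in types, not the real Ray ones):

// Sketch of the WorkerCommandMap shape: an enum key hashed via std::hash<int>,
// mapping each language to its argv-style worker command.
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>  // stand-in for absl::flat_hash_map
#include <vector>

enum Language { PYTHON = 0, JAVA = 1 };  // stand-in for ray::Language

using WorkerCommandMapSketch =
    std::unordered_map<Language, std::vector<std::string>, std::hash<int>>;

int main() {
  WorkerCommandMapSketch worker_commands;
  worker_commands.emplace(
      PYTHON, std::vector<std::string>{"/usr/bin/python", "default_worker.py"});
  auto it = worker_commands.find(PYTHON);
  if (it != worker_commands.end()) {
    std::cout << it->second[0] << std::endl;  // prints /usr/bin/python
  }
  return 0;
}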

Inside the WorkerPool constructor, worker_commands is iterated over:

// worker_pool.cc 96
for (const auto &entry : worker_commands) {
  // Initialize the pool state for this language.
  auto &state = states_by_lang_[entry.first];
  state.multiple_for_warning = maximum_startup_concurrency;
  // Set worker command for this language.
  state.worker_command = entry.second;
  RAY_CHECK(!state.worker_command.empty()) << "Worker command must not be empty.";
}

Here states_by_lang_ is a hash map, defined as follows:

  absl::flat_hash_map<Language, State, std::hash<int>> states_by_lang_;

And State is defined as follows:

/// An internal data structure that maintains the pool state per language.
  struct State {
    /// The commands and arguments used to start the worker process
    std::vector<std::string> worker_command;
    /// The pool of dedicated workers for actor creation tasks
    /// with dynamic worker options (prefix or suffix worker command.)
    absl::flat_hash_map<TaskID, std::shared_ptr<WorkerInterface>> idle_dedicated_workers;
    /// The pool of idle non-actor workers.
    std::unordered_set<std::shared_ptr<WorkerInterface>> idle;
    // States for io workers used for python util functions.
    IOWorkerState util_io_worker_state;
    // States for io workers used for spilling objects.
    IOWorkerState spill_io_worker_state;
    // States for io workers used for restoring objects.
    IOWorkerState restore_io_worker_state;
    /// All workers that have registered and are still connected, including both
    /// idle and executing.
    std::unordered_set<std::shared_ptr<WorkerInterface>> registered_workers;
    /// All drivers that have registered and are still connected.
    std::unordered_set<std::shared_ptr<WorkerInterface>> registered_drivers;
    /// All workers that have registered but is about to disconnect. They shouldn't be
    /// popped anymore.
    std::unordered_set<std::shared_ptr<WorkerInterface>> pending_disconnection_workers;
    /// A map from the startup tokens of worker processes, assigned by the raylet, to
    /// the extra information of the process. Note that the shim process PID is the
    /// same with worker process PID, except starting worker process in container.
    absl::flat_hash_map<StartupToken, StartingWorkerProcessInfo>
        starting_worker_processes;
    /// A map for looking up the task by the pid of starting worker process.
    absl::flat_hash_map<Process, TaskWaitingForWorkerInfo> starting_workers_to_tasks;
    /// A map for looking up the task with dynamic options by the pid of
    /// starting worker process. Note that this is used for the dedicated worker
    /// processes.
    absl::flat_hash_map<Process, TaskWaitingForWorkerInfo>
        starting_dedicated_workers_to_tasks;
    /// We'll push a warning to the user every time a multiple of this many
    /// worker processes has been started.
    int multiple_for_warning;
    /// The last size at which a warning about the number of registered workers
    /// was generated.
    int64_t last_warning_multiple;
  };
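
One detail in the constructor loop shown earlier: states_by_lang_[entry.first] default-constructs a State on first access, which the loop then fills in. A standalone sketch of that operator[] idiom (simplified State, std::unordered_map standing in for absl::flat_hash_map):

// The first lookup through operator[] inserts a default-constructed value.
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

struct DemoState {
  std::vector<std::string> worker_command;
  int multiple_for_warning = 0;
};

int main() {
  std::unordered_map<int, DemoState> states_by_lang;
  auto &state = states_by_lang[/*Language::PYTHON=*/0];  // inserts DemoState{}
  state.multiple_for_warning = 10;
  state.worker_command = {"/usr/bin/python", "default_worker.py"};
  std::cout << states_by_lang.size() << std::endl;  // prints 1
  return 0;
}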

Back in the NodeManager constructor, worker_pool_'s SetNodeManagerPort is called:

// node_manager.cc 363
  worker_pool_.SetNodeManagerPort(GetServerPort());

SetNodeManagerPort is declared and implemented as follows:

// worker_pool.h 188
  /// Set the node manager port.
  /// \param node_manager_port The port Raylet uses for listening to incoming connections.
  void SetNodeManagerPort(int node_manager_port);

// worker_pool.cc 148
// NOTE(kfstorm): The node manager cannot be passed via WorkerPool constructor because the
// grpc server is started after the WorkerPool instance is constructed.
void WorkerPool::SetNodeManagerPort(int node_manager_port) {
  node_manager_port_ = node_manager_port;
}

At the end of the NodeManager constructor, worker_pool_'s SetAgentManager is called:

// node_manager.cc 375
  agent_manager_ = std::make_shared<AgentManager>(
      std::move(options),
      /*delay_executor=*/
      [this](std::function<void()> task, uint32_t delay_ms) {
        return execute_after(io_service_, task, delay_ms);
      },
      /*runtime_env_agent_factory=*/
      [this](const std::string &ip_address, int port) {
        RAY_CHECK(!ip_address.empty() && port != 0)
            << "ip_address: " << ip_address << " port: " << port;
        return std::shared_ptr<rpc::RuntimeEnvAgentClientInterface>(
            new rpc::RuntimeEnvAgentClient(ip_address, port, client_call_manager_));
      });
  worker_pool_.SetAgentManager(agent_manager_);

Back in main.cc, raylet->Start() is called. The Raylet::Start implementation:

// raylet.cc 87
void Raylet::Start() {
  RAY_CHECK_OK(RegisterGcs());

  // Start listening for clients.
  DoAccept();
}

The DoAccept implementation:

// raylet.cc 131
void Raylet::DoAccept() {
  acceptor_.async_accept(socket_, boost::bind(&Raylet::HandleAccept, this,
                                              boost::asio::placeholders::error));
}

The acceptor_ member is defined as follows:

// raylet.h 98
  /// An acceptor for new clients.
  boost::asio::basic_socket_acceptor<local_stream_protocol> acceptor_;
  /// The socket to listen on for new clients.
  local_stream_socket socket_;

When an incoming connection completes the asynchronous accept, HandleAccept is invoked:

// raylet.cc 136
void Raylet::HandleAccept(const boost::system::error_code &error) {
  if (!error) {
    // TODO: typedef these handlers.
    ClientHandler client_handler = [this](ClientConnection &client) {
      node_manager_.ProcessNewClient(client);
    };
    MessageHandler message_handler = [this](std::shared_ptr<ClientConnection> client,
                                            int64_t message_type,
                                            const std::vector<uint8_t> &message) {
      node_manager_.ProcessClientMessage(client, message_type, message.data());
    };
    flatbuffers::FlatBufferBuilder fbb;
    protocol::DisconnectClientBuilder builder(fbb);
    builder.add_disconnect_type(static_cast<int>(rpc::WorkerExitType::SYSTEM_ERROR_EXIT));
    fbb.Finish(builder.Finish());
    std::vector<uint8_t> message_data(fbb.GetBufferPointer(),
                                      fbb.GetBufferPointer() + fbb.GetSize());
    // Accept a new local client and dispatch it to the node manager.
    auto new_connection = ClientConnection::Create(
        client_handler, message_handler, std::move(socket_), "worker",
        node_manager_message_enum,
        static_cast<int64_t>(protocol::MessageType::DisconnectClient), message_data);
  }
  // We're ready to accept another client.
  DoAccept();
}
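
HandleAccept follows the standard chained Boost.Asio accept loop: handle one connection, hand the socket off, then call DoAccept again. A self-contained sketch of the same pattern, with TCP and an arbitrary port standing in for raylet's local_stream_protocol (an assumption made purely for portability):

// Standalone sketch of the chained accept loop in Raylet::DoAccept /
// Raylet::HandleAccept.
#include <boost/asio.hpp>
#include <iostream>

using boost::asio::ip::tcp;

void DoAccept(tcp::acceptor &acceptor, tcp::socket &socket) {
  acceptor.async_accept(
      socket, [&acceptor, &socket](const boost::system::error_code &ec) {
        if (!ec) {
          // Moving from `socket` hands the connection off and leaves the
          // member reusable, just as Raylet moves socket_ into
          // ClientConnection::Create.
          tcp::socket client = std::move(socket);
          std::cout << "accepted a client" << std::endl;
        }
        DoAccept(acceptor, socket);  // ready to accept another client
      });
}

int main() {
  boost::asio::io_context io;
  tcp::acceptor acceptor(io, tcp::endpoint(tcp::v4(), 50051));
  tcp::socket socket(io);
  DoAccept(acceptor, socket);
  io.run();
  return 0;
}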

ClientConnection::Create here is a static factory method. NodeManager's ProcessNewClient and ProcessClientMessage are defined as follows:

// client_connection.h 185
  /// Allocate a new node client connection.
  ///
  /// \param new_client_handler A reference to the client handler.
  /// \param message_handler A reference to the message handler.
  /// \param socket The client socket.
  /// \param debug_label Label that is printed in debug messages, to identify
  /// the type of client.
  /// \param message_type_enum_names A table of printable enum names for the
  /// message types received from this client, used for debug messages.
  /// \param error_message_type the type of error message
  /// \param error_message_data the companion data to the error message type.
  /// \return std::shared_ptr<ClientConnection>.
  static std::shared_ptr<ClientConnection> Create(
      ClientHandler &new_client_handler, MessageHandler &message_handler,
      local_stream_socket &&socket, const std::string &debug_label,
      const std::vector<std::string> &message_type_enum_names, int64_t error_message_type,
      const std::vector<uint8_t> &error_message_data = _dummy_error_message_data);

// client_connection.cc 342
std::shared_ptr<ClientConnection> ClientConnection::Create(
    ClientHandler &client_handler, MessageHandler &message_handler,
    local_stream_socket &&socket, const std::string &debug_label,
    const std::vector<std::string> &message_type_enum_names, int64_t error_message_type,
    const std::vector<uint8_t> &error_message_data) {
  std::shared_ptr<ClientConnection> self(new ClientConnection(
      message_handler, std::move(socket), debug_label, message_type_enum_names,
      error_message_type, error_message_data));
  // Let our manager process our new connection.
  client_handler(*self);
  return self;
}

// node_manager.cc 961
void NodeManager::ProcessNewClient(ClientConnection &client) {
  // The new client is a worker, so begin listening for messages.
  client.ProcessMessages();
}

// node_manager.cc 965
void NodeManager::ProcessClientMessage(const std::shared_ptr<ClientConnection> &client,
                                       int64_t message_type,
                                       const uint8_t *message_data) {
  auto registered_worker = worker_pool_.GetRegisteredWorker(client);
  auto message_type_value = static_cast<protocol::MessageType>(message_type);
  RAY_LOG(DEBUG) << "[Worker] Message "
                 << protocol::EnumNameMessageType(message_type_value) << "("
                 << message_type << ") from worker with PID "
                 << (registered_worker
                         ? std::to_string(registered_worker->GetProcess().GetId())
                         : "nil");

  if (registered_worker && registered_worker->IsDead()) {
    // For a worker that is marked as dead (because the job has died already),
    // all the messages are ignored except DisconnectClient.
    if (message_type_value != protocol::MessageType::DisconnectClient) {
      // Listen for more messages.
      client->ProcessMessages();
      return;
    }
  }

  switch (message_type_value) {
  case protocol::MessageType::RegisterClientRequest: {
    ProcessRegisterClientRequestMessage(client, message_data);
  } break;
  case protocol::MessageType::AnnounceWorkerPort: {
    ProcessAnnounceWorkerPortMessage(client, message_data);
  } break;
  case protocol::MessageType::TaskDone: {
    HandleWorkerAvailable(client);
  } break;
  case protocol::MessageType::DisconnectClient: {
    ProcessDisconnectClientMessage(client, message_data);
    // We don't need to receive future messages from this client,
    // because it's already disconnected.
    return;
  } break;
  case protocol::MessageType::FetchOrReconstruct: {
    ProcessFetchOrReconstructMessage(client, message_data);
  } break;
  case protocol::MessageType::NotifyDirectCallTaskBlocked: {
    ProcessDirectCallTaskBlocked(client, message_data);
  } break;
  case protocol::MessageType::NotifyDirectCallTaskUnblocked: {
    std::shared_ptr<WorkerInterface> worker = worker_pool_.GetRegisteredWorker(client);
    HandleDirectCallTaskUnblocked(worker);
  } break;
  case protocol::MessageType::NotifyUnblocked: {
    // TODO(ekl) this is still used from core worker even in direct call mode to
    // finish up get requests.
    auto message = flatbuffers::GetRoot<protocol::NotifyUnblocked>(message_data);
    AsyncResolveObjectsFinish(client, from_flatbuf<TaskID>(*message->task_id()),
                              /*was_blocked*/ true);
  } break;
  case protocol::MessageType::WaitRequest: {
    ProcessWaitRequestMessage(client, message_data);
  } break;
  case protocol::MessageType::WaitForDirectActorCallArgsRequest: {
    ProcessWaitForDirectActorCallArgsRequestMessage(client, message_data);
  } break;
  case protocol::MessageType::PushErrorRequest: {
    ProcessPushErrorRequestMessage(message_data);
  } break;
  case protocol::MessageType::FreeObjectsInObjectStoreRequest: {
    auto message = flatbuffers::GetRoot<protocol::FreeObjectsRequest>(message_data);
    std::vector<ObjectID> object_ids = from_flatbuf<ObjectID>(*message->object_ids());
    // Clean up objects from the object store.
    object_manager_.FreeObjects(object_ids, message->local_only());
  } break;
  case protocol::MessageType::SubscribePlasmaReady: {
    ProcessSubscribePlasmaReady(client, message_data);
  } break;
  default:
    RAY_LOG(FATAL) << "Received unexpected message type " << message_type;
  }

  // Listen for more messages.
  client->ProcessMessages();
}
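
The handler uses the same chain-and-rearm discipline one level up: dispatch on the message type, then call client->ProcessMessages() to post the next read. A standalone sketch of type-based dispatch through a handler table (the numeric type values are made up):

// Dispatch a message by numeric type, then re-arm, mirroring the switch above.
#include <cstdint>
#include <functional>
#include <iostream>
#include <unordered_map>

int main() {
  std::unordered_map<int64_t, std::function<void()>> handlers = {
      {0, [] { std::cout << "RegisterClientRequest" << std::endl; }},
      {1, [] { std::cout << "AnnounceWorkerPort" << std::endl; }},
  };
  int64_t message_type = 1;  // would come off the wire
  auto it = handlers.find(message_type);
  if (it != handlers.end()) {
    it->second();
  } else {
    std::cerr << "Received unexpected message type " << message_type
              << std::endl;
  }
  // ... then listen for more messages, as client->ProcessMessages() does.
  return 0;
}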

AgentManager

At this point the trail goes cold, so the reading continues from agent_manager.cc. The AgentManager class has a StartAgent method containing the code that launches the agent process:

// agent_manager.cc 60
  // Launch the process to create the agent.
  std::error_code ec;
  std::vector<const char *> argv;
  for (const std::string &arg : options_.agent_commands) {
    argv.push_back(arg.c_str());
  }
  argv.push_back(NULL);
  // Set node id to agent.
  ProcessEnvironment env;
  env.insert({"RAY_NODE_ID", options_.node_id.Hex()});
  env.insert({"RAY_RAYLET_PID", std::to_string(getpid())});
  // Report the restart count to the agent so that we can decide whether or not
  // report the error message to drivers.
  env.insert({"RESTART_COUNT", std::to_string(agent_restart_count_)});
  env.insert({"MAX_RESTART_COUNT",
              std::to_string(RayConfig::instance().agent_max_restart_count())});
  Process child(argv.data(), nullptr, ec, false, env);

The Process constructor invoked here is declared and defined as follows:

// process.h 69
  /// Creates a new process.
  /// \param[in] argv The command-line of the process to spawn (terminated with NULL).
  /// \param[in] io_service Boost.Asio I/O service (optional).
  /// \param[in] ec Returns any error that occurred when spawning the process.
  /// \param[in] decouple True iff the parent will not wait for the child to exit.
  /// \param[in] env Additional environment variables to be set on this process besides
  /// the environment variables of the parent process.
  explicit Process(const char *argv[], void *io_service, std::error_code &ec,
                   bool decouple = false, const ProcessEnvironment &env = {});

// process.cc 342
Process::Process(const char *argv[], void *io_service, std::error_code &ec, bool decouple,
                 const ProcessEnvironment &env) {
  (void)io_service;
  ProcessFD procfd = ProcessFD::spawnvpe(argv, ec, decouple, env);
  if (!ec) {
    p_ = std::make_shared<ProcessFD>(std::move(procfd));
  }
}

The ProcessFD::spawnvpe static method it calls:

// process.cc 101
// Fork + exec combo. Returns -1 for the PID on failure.
  static ProcessFD spawnvpe(const char *argv[], std::error_code &ec, bool decouple,
                            const ProcessEnvironment &env) {
    ec = std::error_code();
    intptr_t fd;
    pid_t pid;
    ProcessEnvironment new_env;
    for (char *const *e = environ; *e; ++e) {
      RAY_CHECK(*e && **e != '\0') << "environment variable name is absent";
      const char *key_end = strchr(*e + 1 /* +1 is needed for Windows */, '=');
      RAY_CHECK(key_end) << "environment variable value is absent: " << e;
      new_env[std::string(*e, static_cast<size_t>(key_end - *e))] = key_end + 1;
    }
    for (const auto &item : env) {
      new_env[item.first] = item.second;
    }
    std::string new_env_block;
    for (const auto &item : new_env) {
      new_env_block += item.first + '=' + item.second + '\0';
    }
#ifdef _WIN32

    (void)decouple;  // Windows doesn't require anything particular for decoupling.
    std::vector<std::string> args;
    for (size_t i = 0; argv[i]; ++i) {
      args.push_back(argv[i]);
    }
    std::string cmds[] = {std::string(), CreateCommandLine(args)};
    if (GetFileName(args.at(0)).find('.') == std::string::npos) {
      // Some executables might be missing an extension.
      // Append a single "." to prevent automatic appending of extensions by the system.
      std::vector<std::string> args_direct_call = args;
      args_direct_call[0] += ".";
      cmds[0] = CreateCommandLine(args_direct_call);
    }
    bool succeeded = false;
    PROCESS_INFORMATION pi = {};
    for (int attempt = 0; attempt < sizeof(cmds) / sizeof(*cmds); ++attempt) {
      std::string &cmd = cmds[attempt];
      if (!cmd.empty()) {
        (void)cmd.c_str();  // We'll need this to be null-terminated (but mutable) below
        TCHAR *cmdline = &*cmd.begin();
        STARTUPINFO si = {sizeof(si)};
        RAY_UNUSED(
            new_env_block.c_str());  // Ensure there's a final terminator for Windows
        char *const envp = &new_env_block[0];
        if (CreateProcessA(NULL, cmdline, NULL, NULL, FALSE, 0, envp, NULL, &si, &pi)) {
          succeeded = true;
          break;
        }
      }
    }
    if (succeeded) {
      CloseHandle(pi.hThread);
      fd = reinterpret_cast<intptr_t>(pi.hProcess);
      pid = pi.dwProcessId;
    } else {
      ec = std::error_code(GetLastError(), std::system_category());
      fd = -1;
      pid = -1;
    }
#else
    std::vector<char *> new_env_ptrs;
    for (size_t i = 0; i < new_env_block.size(); i += strlen(&new_env_block[i]) + 1) {
      new_env_ptrs.push_back(&new_env_block[i]);
    }
    new_env_ptrs.push_back(static_cast<char *>(NULL));
    char **envp = &new_env_ptrs[0];

    // TODO(mehrdadn): Use clone() on Linux or posix_spawnp() on Mac to avoid duplicating
    // file descriptors into the child process, as that can be problematic.
    int pipefds[2];  // Create pipe to get PID & track lifetime
    if (pipe(pipefds) == -1) {
      pipefds[0] = pipefds[1] = -1;
    }
    pid = pipefds[1] != -1 ? fork() : -1;
    if (pid <= 0 && pipefds[0] != -1) {
      close(pipefds[0]);  // not the parent, so close the read end of the pipe
      pipefds[0] = -1;
    }
    if (pid != 0 && pipefds[1] != -1) {
      close(pipefds[1]);  // not the child, so close the write end of the pipe
      pipefds[1] = -1;
    }
    if (pid == 0) {
      // Child process case. Reset the SIGCHLD handler.
      signal(SIGCHLD, SIG_DFL);
      // If process needs to be decoupled, double-fork to avoid zombies.
      if (pid_t pid2 = decouple ? fork() : 0) {
        _exit(pid2 == -1 ? errno : 0);  // Parent of grandchild; must exit
      }
      // This is the spawned process. Any intermediate parent is now dead.
      pid_t my_pid = getpid();
      if (write(pipefds[1], &my_pid, sizeof(my_pid)) == sizeof(my_pid)) {
        execvpe(argv[0], const_cast<char *const *>(argv),
                const_cast<char *const *>(envp));
      }
      _exit(errno);  // fork() succeeded and exec() failed, so abort the child
    }
    if (pid > 0) {
      // Parent process case
      if (decouple) {
        int s;
        (void)waitpid(pid, &s, 0);  // can't do much if this fails, so ignore return value
      }
      int r = read(pipefds[0], &pid, sizeof(pid));
      (void)r;  // can't do much if this fails, so ignore return value
    }
    // Use pipe to track process lifetime. (The pipe closes when process terminates.)
    fd = pipefds[0];
    if (pid == -1) {
      ec = std::error_code(errno, std::system_category());
    }
#endif
    return ProcessFD(pid, fd);
  }
};

Here, worker child processes are spawned via the fork() system call, and each child then runs default_worker.py by calling execvpe. An observed example of the argv and env passed in:

argv: /usr/bin/python /data/ray/ray/python/ray/workers/default_worker.py --node-ip-address=10.186.115.19 --node-manager-port=44361 --object-store-name=/tmp/ray/session_2021-12-15_15-05-48_694099_43976/sockets/plasma_store --raylet-name=/tmp/ray/session_2021-12-15_15-05-48_694099_43976/sockets/raylet --redis-address=10.186.115.19:6379 --temp-dir=/tmp/ray --metrics-agent-port=39805 --logging-rotate-bytes=536870912 --logging-rotate-backup-count=5 --redis-password=5241590000000000 --startup-token=0


env: CONDA_EXE=/root/anaconda3/bin/conda CONDA_PYTHON_EXE=/root/anaconda3/bin/python CONDA_SHLVL=0 HOME=/root LANG=C.UTF-8 LC_ADDRESS=C.UTF-8 LC_IDENTIFICATION=C.UTF-8 LC_MEASUREMENT=C.UTF-8 LC_MONETARY=C.UTF-8 LC_NAME=C.UTF-8 LC_NUMERIC=C.UTF-8 LC_PAPER=C.UTF-8 LC_TELEPHONE=C.UTF-8 LC_TIME=C.UTF-8 LD_LIBRARY_PATH=/usr/local/lib/python3.6/dist-packages/cv2/../../lib64::/usr/lib/x86_64-linux-gnu/ LESSCLOSE=/usr/bin/lesspipe %s %s LESSOPEN=| /usr/bin/lesspipe %s LOGNAME=root LS_COLORS=rs=0:di=01;34:ln=01;36:mh=00:pi=40;33:so=01;35:do=01;35:bd=40;33;01:cd=40;33;01:or=40;31;01:mi=00:su=37;41:sg=30;43:ca=30;41:tw=30;42:ow=34;42:st=37;44:ex=01;32:*.tar=01;31:*.tgz=01;31:*.arc=01;31:*.arj=01;31:*.taz=01;31:*.lha=01;31:*.lz4=01;31:*.lzh=01;31:*.lzma=01;31:*.tlz=01;31:*.txz=01;31:*.tzo=01;31:*.t7z=01;31:*.zip=01;31:*.z=01;31:*.Z=01;31:*.dz=01;31:*.gz=01;31:*.lrz=01;31:*.lz=01;31:*.lzo=01;31:*.xz=01;31:*.zst=01;31:*.tzst=01;31:*.bz2=01;31:*.bz=01;31:*.tbz=01;31:*.tbz2=01;31:*.tz=01;31:*.deb=01;31:*.rpm=01;31:*.jar=01;31:*.war=01;31:*.ear=01;31:*.sar=01;31:*.rar=01;31:*.alz=01;31:*.ace=01;31:*.zoo=01;31:*.cpio=01;31:*.7z=01;31:*.rz=01;31:*.cab=01;31:*.wim=01;31:*.swm=01;31:*.dwm=01;31:*.esd=01;31:*.jpg=01;35:*.jpeg=01;35:*.mjpg=01;35:*.mjpeg=01;35:*.gif=01;35:*.bmp=01;35:*.pbm=01;35:*.pgm=01;35:*.ppm=01;35:*.tga=01;35:*.xbm=01;35:*.xpm=01;35:*.tif=01;35:*.tiff=01;35:*.png=01;35:*.svg=01;35:*.svgz=01;35:*.mng=01;35:*.pcx=01;35:*.mov=01;35:*.mpg=01;35:*.mpeg=01;35:*.m2v=01;35:*.mkv=01;35:*.webm=01;35:*.ogm=01;35:*.mp4=01;35:*.m4v=01;35:*.mp4v=01;35:*.vob=01;35:*.qt=01;35:*.nuv=01;35:*.wmv=01;35:*.asf=01;35:*.rm=01;35:*.rmvb=01;35:*.flc=01;35:*.avi=01;35:*.fli=01;35:*.flv=01;35:*.gl=01;35:*.dl=01;35:*.xcf=01;35:*.xwd=01;35:*.yuv=01;35:*.cgm=01;35:*.emf=01;35:*.ogv=01;35:*.ogx=01;35:*.aac=00;36:*.au=00;36:*.flac=00;36:*.m4a=00;36:*.mid=00;36:*.midi=00;36:*.mka=00;36:*.mp3=00;36:*.mpc=00;36:*.ogg=00;36:*.ra=00;36:*.wav=00;36:*.oga=00;36:*.opus=00;36:*.spx=00;36:*.xspf=00;36: MAIL=/var/mail/root OLDPWD=/data/ray OMP_NUM_THREADS=1 PATH=/root/anaconda3/condabin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/root/anaconda3/bin PWD=/data/ray/ray/python QT_QPA_FONTDIR=/usr/local/lib/python3.6/dist-packages/cv2/qt/fonts QT_QPA_PLATFORM_PLUGIN_PATH=/usr/local/lib/python3.6/dist-packages/cv2/qt/plugins RAY_CLIENT_MODE=0 RAY_JOB_ID=01000000 SHELL=/bin/bash SHLVL=1 SPT_NOENV=1 SSH_CLIENT=172.16.11.137 39188 22 SSH_CONNECTION=172.16.11.137 39188 10.186.115.19 22 SSH_TTY=/dev/pts/0 TERM=screen.xterm-256color TZ=Asia/Shanghai USER=root XDG_DATA_DIRS=/usr/local/share:/usr/share:/var/lib/snapd/desktop XDG_RUNTIME_DIR=/run/user/0 XDG_SESSION_ID=20784 _=/usr/bin/python _CE_CONDA= _CE_M=
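
The pipe created before fork() serves double duty: the child writes its final PID through it (important with decouple, where the direct child exits after a second fork), and the read end then tracks the spawned process's lifetime, because the pipe only reaches EOF once that process terminates and its write end closes. A minimal POSIX sketch of the trick:

// Standalone sketch of the pipe trick in spawnvpe: receive the child PID,
// then detect exit via EOF on the read end.
#include <sys/wait.h>
#include <unistd.h>

#include <cstdio>

int main() {
  int pipefds[2];
  if (pipe(pipefds) == -1) return 1;
  pid_t pid = fork();
  if (pid == 0) {  // child: report PID, then exit (closing its write end)
    close(pipefds[0]);
    pid_t my_pid = getpid();
    write(pipefds[1], &my_pid, sizeof(my_pid));
    _exit(0);
  }
  close(pipefds[1]);  // parent keeps only the read end
  pid_t child_pid = -1;
  read(pipefds[0], &child_pid, sizeof(child_pid));
  printf("spawned child pid: %d\n", static_cast<int>(child_pid));
  char buf;
  while (read(pipefds[0], &buf, 1) > 0) {
  }  // read() returning 0 (EOF) means the child has exited
  waitpid(pid, nullptr, 0);
  printf("child exited\n");
  return 0;
}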

The AgentManager class also has a CreateRuntimeEnv method, defined as follows:

// agent_manager.cc 130
void AgentManager::CreateRuntimeEnv(
    const JobID &job_id, const std::string &serialized_runtime_env,
    const std::string &serialized_allocated_resource_instances,
    CreateRuntimeEnvCallback callback) {
  // If the agent cannot be started, fail the request.
  if (!should_start_agent_) {
    RAY_LOG(ERROR) << "Not all required Ray dependencies for the runtime_env "
                      "feature were found. To install the required dependencies, "
                   << "please run `pip install \"ray[default]\"`.";
    // Execute the callback after the currently executing callback finishes.  Otherwise
    // the task may be erased from the dispatch queue during the queue iteration in
    // ClusterTaskManager::DispatchScheduledTasksToWorkers(), invalidating the iterator
    // and causing a segfault.
    delay_executor_(
        [callback] {
          callback(/*successful=*/false, /*serialized_runtime_env_context=*/"");
        },
        0);
    return;
  }

  if (runtime_env_agent_client_ == nullptr) {
    // If the agent cannot be restarted anymore, fail the request.
    if (agent_restart_count_ >= RayConfig::instance().agent_max_restart_count()) {
      RAY_LOG(WARNING) << "Runtime environment " << serialized_runtime_env
                       << " cannot be created on this node because the agent is dead.";
      delay_executor_(
          [callback, serialized_runtime_env] {
            callback(/*successful=*/false,
                     /*serialized_runtime_env_context=*/serialized_runtime_env);
          },
          0);
      return;
    }

    RAY_LOG(INFO)
        << "Runtime env agent is not registered yet. Will retry CreateRuntimeEnv later: "
        << serialized_runtime_env;
    delay_executor_(
        [this, job_id, serialized_runtime_env, serialized_allocated_resource_instances,
         callback] {
          CreateRuntimeEnv(job_id, serialized_runtime_env,
                           serialized_allocated_resource_instances, callback);
        },
        RayConfig::instance().agent_manager_retry_interval_ms());
    return;
  }
  rpc::CreateRuntimeEnvRequest request;
  request.set_job_id(job_id.Hex());
  request.set_serialized_runtime_env(serialized_runtime_env);
  request.set_serialized_allocated_resource_instances(
      serialized_allocated_resource_instances);
  runtime_env_agent_client_->CreateRuntimeEnv(
      request,
      [this, job_id, serialized_runtime_env, serialized_allocated_resource_instances,
       callback](const Status &status, const rpc::CreateRuntimeEnvReply &reply) {
        if (status.ok()) {
          if (reply.status() == rpc::AGENT_RPC_STATUS_OK) {
            callback(true, reply.serialized_runtime_env_context());
          } else {
            RAY_LOG(ERROR) << "Failed to create runtime env: " << serialized_runtime_env
                           << ", error message: " << reply.error_message();
            callback(false, reply.serialized_runtime_env_context());
          }

        } else {
          RAY_LOG(ERROR)
              << "Failed to create the runtime env: " << serialized_runtime_env
              << ", status = " << status
              << ", maybe there are some network problems, will retry it later.";
          delay_executor_(
              [this, job_id, serialized_runtime_env,
               serialized_allocated_resource_instances, callback] {
                CreateRuntimeEnv(job_id, serialized_runtime_env,
                                 serialized_allocated_resource_instances, callback);
              },
              RayConfig::instance().agent_manager_retry_interval_ms());
        }
      });
}

This issues a CreateRuntimeEnv RPC to the agent. When running the plot_pong example this function was never invoked, so its exact role is still unclear.
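
What the method does show clearly is its retry discipline: nothing blocks, and when the agent is not yet registered the call simply re-posts itself through delay_executor_ after agent_manager_retry_interval_ms. A standalone sketch of that re-post-until-ready pattern (the 100 ms interval and the readiness predicate are made up):

// Retry by re-posting onto the event loop after a delay until a precondition
// holds, instead of blocking the thread.
#include <boost/asio.hpp>

#include <chrono>
#include <functional>
#include <iostream>
#include <memory>

void RetryUntilReady(boost::asio::io_context &io, std::function<bool()> ready,
                     std::function<void()> callback) {
  if (ready()) {
    callback();
    return;
  }
  auto timer = std::make_shared<boost::asio::steady_timer>(
      io, std::chrono::milliseconds(100));
  timer->async_wait(
      [&io, timer, ready, callback](const boost::system::error_code &) {
        RetryUntilReady(io, ready, callback);
      });
}

int main() {
  boost::asio::io_context io;
  int polls = 0;
  RetryUntilReady(
      io, [&polls] { return ++polls >= 3; },  // "agent registered" stand-in
      [] { std::cout << "runtime env created" << std::endl; });
  io.run();
  return 0;
}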

cluster_task_manager

cluster_task_manager.h defines a Work class:

// cluster_task_manager.h 67
class Work {
 public:
  RayTask task;
  rpc::RequestWorkerLeaseReply *reply;
  std::function<void(void)> callback;
  std::shared_ptr<TaskResourceInstances> allocated_instances;
  Work(RayTask task, rpc::RequestWorkerLeaseReply *reply,
       std::function<void(void)> callback, WorkStatus status = WorkStatus::WAITING)
      : task(task),
        reply(reply),
        callback(callback),
        allocated_instances(nullptr),
        status_(status){};
  Work(const Work &Work) = delete;
  Work &operator=(const Work &work) = delete;
  ~Work() = default;

  /// Set the state as waiting with the cause.
  void SetStateWaiting(const UnscheduledWorkCause &cause) {
    status_ = WorkStatus::WAITING;
    unscheduled_work_cause_ = cause;
  }

  /// Set the state as waiting for workers, meaning it is waiting for workers to start.
  void SetStateWaitingForWorker() { status_ = WorkStatus::WAITING_FOR_WORKER; }

  /// Set the state as cancelled, meaning this task has to be unqueued from the node.
  void SetStateCancelled() { status_ = WorkStatus::CANCELLED; }

  WorkStatus GetState() const { return status_; }

  UnscheduledWorkCause GetUnscheduledCause() const { return unscheduled_work_cause_; }

 private:
  WorkStatus status_ = WorkStatus::WAITING;
  UnscheduledWorkCause unscheduled_work_cause_ =
      UnscheduledWorkCause::WAITING_FOR_RESOURCE_ACQUISITION;
};

Work contains a RayTask task member. RayTask is defined as follows:

// task.h 33
/// \class RayTask
///
/// A RayTask represents a Ray task and a specification of its execution (e.g.,
/// resource demands). The task's specification contains both immutable fields,
/// determined at submission time, and mutable fields, determined at execution
/// time.
class RayTask {
 public:
  /// Construct an empty task. This should only be used to pass a task
  /// as an out parameter to a function or method.
  RayTask() {}

  /// Construct a `RayTask` object from a protobuf message.
  ///
  /// \param message The protobuf message.
  explicit RayTask(const rpc::Task &message);

  /// Construct a `RayTask` object from a `TaskSpecification` and a
  /// `TaskExecutionSpecification`.
  RayTask(TaskSpecification task_spec, TaskExecutionSpecification task_execution_spec);

  /// Get the mutable specification for the task. This specification may be
  /// updated at runtime.
  ///
  /// \return The mutable specification for the task.
  const TaskExecutionSpecification &GetTaskExecutionSpec() const;

  /// Get the immutable specification for the task.
  ///
  /// \return The immutable specification for the task.
  const TaskSpecification &GetTaskSpecification() const;

  /// Increment the number of times this task has been forwarded.
  void IncrementNumForwards();

  /// Get the task's object dependencies. This comprises the immutable task
  /// arguments and the mutable execution dependencies.
  ///
  /// \return The object dependencies.
  const std::vector<rpc::ObjectReference> &GetDependencies() const;

  /// Update the dynamic/mutable information for this task.
  /// \param task RayTask structure with updated dynamic information.
  void CopyTaskExecutionSpec(const RayTask &task);

  std::string DebugString() const;

 private:
  void ComputeDependencies();

  /// RayTask specification object, consisting of immutable information about this
  /// task determined at submission time. Includes resource demand, object
  /// dependencies, etc.
  TaskSpecification task_spec_;
  /// RayTask execution specification, consisting of all dynamic/mutable
  /// information about this task determined at execution time.
  TaskExecutionSpecification task_execution_spec_;
  /// A cached copy of the task's object dependencies, including arguments from
  /// the TaskSpecification and execution dependencies from the
  /// TaskExecutionSpecification.
  std::vector<rpc::ObjectReference> dependencies_;
};

The message argument of this RayTask constructor carries the task_execution_spec information, as shown below:

// task.cc 21
RayTask::RayTask(const rpc::Task &message)
    : task_spec_(message.task_spec()),
      task_execution_spec_(message.task_execution_spec()) {
  ComputeDependencies();
}

The Task protobuf message is defined as follows:

// common.proto 367
// Represents a task, including task spec, and task execution spec.
message Task {
  TaskSpec task_spec = 1;
  TaskExecutionSpec task_execution_spec = 2;
}

// common.proto 357
// The task execution specification encapsulates all mutable information about
// the task. These fields may change at execution time, converse to the
// `TaskSpec` is determined at submission time.
message TaskExecutionSpec {
  // The last time this task was received for scheduling.
  double last_timestamp = 1;
  // The number of times this task was spilled back by raylets.
  uint64 num_forwards = 2;
}
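
As a small usage sketch tying the two together (assuming the surrounding Ray headers; the mutators are the standard protobuf-generated accessors for the two fields above):

// Populate the protobuf message, then construct the RayTask, which copies
// both specs and calls ComputeDependencies().
rpc::Task message;
message.mutable_task_execution_spec()->set_last_timestamp(0.0);
message.mutable_task_execution_spec()->set_num_forwards(0);
RayTask task(message);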

_raylet.pyx

_raylet.pyx implements the execute_task function as follows:

# _raylet.pyx 444
cdef execute_task(
        CTaskType task_type,
        const c_string name,
        const CRayFunction &ray_function,
        const unordered_map[c_string, double] &c_resources,
        const c_vector[shared_ptr[CRayObject]] &c_args,
        const c_vector[CObjectReference] &c_arg_refs,
        const c_vector[CObjectID] &c_return_ids,
        const c_string debugger_breakpoint,
        c_vector[shared_ptr[CRayObject]] *returns,
        c_bool *is_application_level_error,
        # This parameter is only used for actor creation task to define
        # the concurrency groups of this actor.
        const c_vector[CConcurrencyGroup] &c_defined_concurrency_groups,
        const c_string c_name_of_concurrency_group_to_execute):

    is_application_level_error[0] = False

    worker = ray.worker.global_worker
    manager = worker.function_actor_manager
    actor = None

    cdef:
        dict execution_infos = manager.execution_infos
        CoreWorker core_worker = worker.core_worker
        JobID job_id = core_worker.get_current_job_id()
        TaskID task_id = core_worker.get_current_task_id()
        CFiberEvent task_done_event

    # Automatically restrict the GPUs available to this task.
    ray._private.utils.set_cuda_visible_devices(ray.get_gpu_ids())

    # Helper method used to exit current asyncio actor.
    # This is called when a KeyboardInterrupt is received by the main thread.
    # Upon receiving a KeyboardInterrupt signal, Ray will exit the current
    # worker. If the worker is processing normal tasks, Ray treat it as task
    # cancellation from ray.cancel(object_ref). If the worker is an asyncio
    # actor, Ray will exit the actor.
    def exit_current_actor_if_asyncio():
        if core_worker.current_actor_is_asyncio():
            error = SystemExit(0)
            error.is_ray_terminate = True
            raise error

    function_descriptor = CFunctionDescriptorToPython(
        ray_function.GetFunctionDescriptor())

    if <int>task_type == <int>TASK_TYPE_ACTOR_CREATION_TASK:
        actor_class = manager.load_actor_class(job_id, function_descriptor)
        actor_id = core_worker.get_actor_id()
        actor = actor_class.__new__(actor_class)
        worker.actors[actor_id] = actor
        if (<int>task_type == <int>TASK_TYPE_ACTOR_CREATION_TASK):
            # Record the actor class via :actor_name: magic token in the log.
            #
            # (Phase 1): this covers code run before __init__ finishes.
            # We need to handle this separately because `__repr__` may not be
            # runnable until after `__init__` (e.g., if it accesses fields
            # defined in the constructor).
            actor_magic_token = "{}{}".format(
                ray_constants.LOG_PREFIX_ACTOR_NAME, actor_class.__name__)
            # Flush to both .out and .err
            print(actor_magic_token)
            print(actor_magic_token, file=sys.stderr)

        # Initial eventloops for asyncio for this actor.
        if core_worker.current_actor_is_asyncio():
            core_worker.initialize_eventloops_for_actor_concurrency_group(
                c_defined_concurrency_groups)

    execution_info = execution_infos.get(function_descriptor)
    if not execution_info:
        execution_info = manager.get_execution_info(
            job_id, function_descriptor)
        execution_infos[function_descriptor] = execution_info

    function_name = execution_info.function_name
    extra_data = (b'{"name": ' + function_name.encode("ascii") +
                  b' "task_id": ' + task_id.hex().encode("ascii") + b'}')

    task_name = name.decode("utf-8")
    name_of_concurrency_group_to_execute = \
        c_name_of_concurrency_group_to_execute.decode("ascii")
    title = f"ray::{task_name}"

    if <int>task_type == <int>TASK_TYPE_NORMAL_TASK:
        next_title = "ray::IDLE"
        function_executor = execution_info.function
        # Record the task name via :task_name: magic token in the log file.
        # This is used for the prefix in driver logs `(task_name pid=123) ...`
        task_name_magic_token = "{}{}".format(
            ray_constants.LOG_PREFIX_TASK_NAME, task_name.replace("()", ""))
        # Print on both .out and .err
        print(task_name_magic_token)
        print(task_name_magic_token, file=sys.stderr)
    else:
        actor = worker.actors[core_worker.get_actor_id()]
        class_name = actor.__class__.__name__
        next_title = f"ray::{class_name}"
        pid = os.getpid()
        worker_name = f"ray_{class_name}_{pid}"
        if c_resources.find(b"object_store_memory") != c_resources.end():
            worker.core_worker.set_object_store_client_options(
                worker_name,
                int(ray_constants.from_memory_units(
                        dereference(
                            c_resources.find(b"object_store_memory")).second)))

        def function_executor(*arguments, **kwarguments):
            function = execution_info.function

            if core_worker.current_actor_is_asyncio():
                # Increase recursion limit if necessary. In asyncio mode,
                # we have many parallel callstacks (represented in fibers)
                # that's suspended for execution. Python interpreter will
                # mistakenly count each callstack towards recusion limit.
                # We don't need to worry about stackoverflow here because
                # the max number of callstacks is limited in direct actor
                # transport with max_concurrency flag.
                increase_recursion_limit()

                if inspect.iscoroutinefunction(function.method):
                    async_function = function
                else:
                    # Just execute the method if it's ray internal method.
                    if function.name.startswith("__ray"):
                        return function(actor, *arguments, **kwarguments)
                    async_function = sync_to_async(function)

                return core_worker.run_async_func_in_event_loop(
                    async_function, function_descriptor,
                    name_of_concurrency_group_to_execute, actor,
                    *arguments, **kwarguments)

            return function(actor, *arguments, **kwarguments)

    with core_worker.profile_event(b"task", extra_data=extra_data):
        try:
            task_exception = False
            if not (<int>task_type == <int>TASK_TYPE_ACTOR_TASK
                    and function_name == "__ray_terminate__"):
                worker.memory_monitor.raise_if_low_memory()

            with core_worker.profile_event(b"task:deserialize_arguments"):
                if c_args.empty():
                    args, kwargs = [], {}
                else:
                    metadata_pairs = RayObjectsToDataMetadataPairs(c_args)
                    object_refs = VectorToObjectRefs(c_arg_refs)

                    if core_worker.current_actor_is_asyncio():
                        # We deserialize objects in event loop thread to
                        # prevent segfaults. See #7799
                        async def deserialize_args():
                            return (ray.worker.global_worker
                                    .deserialize_objects(
                                        metadata_pairs, object_refs))
                        args = core_worker.run_async_func_in_event_loop(
                            deserialize_args, function_descriptor,
                            name_of_concurrency_group_to_execute)
                    else:
                        args = ray.worker.global_worker.deserialize_objects(
                            metadata_pairs, object_refs)

                    for arg in args:
                        raise_if_dependency_failed(arg)
                    args, kwargs = ray._private.signature.recover_args(args)

            if (<int>task_type == <int>TASK_TYPE_ACTOR_CREATION_TASK):
                actor = worker.actors[core_worker.get_actor_id()]
                class_name = actor.__class__.__name__
                actor_title = f"{class_name}({args!r}, {kwargs!r})"
                core_worker.set_actor_title(actor_title.encode("utf-8"))
            # Execute the task.
            with core_worker.profile_event(b"task:execute"):
                task_exception = True
                try:
                    is_existing = core_worker.is_exiting()
                    if is_existing:
                        title = f"{title}::Exiting"
                        next_title = f"{next_title}::Exiting"
                    with ray.worker._changeproctitle(title, next_title):
                        if debugger_breakpoint != b"":
                            ray.util.pdb.set_trace(
                                breakpoint_uuid=debugger_breakpoint)
                        outputs = function_executor(*args, **kwargs)
                        next_breakpoint = (
                            ray.worker.global_worker.debugger_breakpoint)
                        if next_breakpoint != b"":
                            # If this happens, the user typed "remote" and
                            # there were no more remote calls left in this
                            # task. In that case we just exit the debugger.
                            ray.experimental.internal_kv._internal_kv_put(
                                "RAY_PDB_{}".format(next_breakpoint),
                                "{\"exit_debugger\": true}")
                            ray.experimental.internal_kv._internal_kv_del(
                                "RAY_PDB_CONTINUE_{}".format(next_breakpoint)
                            )
                            ray.worker.global_worker.debugger_breakpoint = b""
                    task_exception = False
                except AsyncioActorExit as e:
                    exit_current_actor_if_asyncio()
                except KeyboardInterrupt as e:
                    raise TaskCancelledError(
                            core_worker.get_current_task_id())
                except Exception as e:
                    is_application_level_error[0] = True
                    if core_worker.get_current_task_retry_exceptions():
                        logger.info("Task failed with retryable exception:"
                                    " {}.".format(
                                        core_worker.get_current_task_id()),
                                    exc_info=True)
                    raise e
                if c_return_ids.size() == 1:
                    outputs = (outputs,)
            if (<int>task_type == <int>TASK_TYPE_ACTOR_CREATION_TASK):
                # Record actor repr via :actor_name: magic token in the log.
                #
                # (Phase 2): after `__init__` finishes, we override the
                # log prefix with the full repr of the actor. The log monitor
                # will pick up the updated token.
                if (hasattr(actor_class, "__ray_actor_class__") and
                        "__repr__" in
                        actor_class.__ray_actor_class__.__dict__):
                    actor_magic_token = "{}{}".format(
                        ray_constants.LOG_PREFIX_ACTOR_NAME, repr(actor))
                    # Flush on both stdout and stderr.
                    print(actor_magic_token)
                    print(actor_magic_token, file=sys.stderr)
            # Check for a cancellation that was called when the function
            # was exiting and was raised after the except block.
            if not check_signals().ok():
                task_exception = True
                raise TaskCancelledError(
                            core_worker.get_current_task_id())
            if (c_return_ids.size() > 0 and
                    len(outputs) != int(c_return_ids.size())):
                raise ValueError(
                    "Task returned {} objects, but num_returns={}.".format(
                        len(outputs), c_return_ids.size()))
            # Store the outputs in the object store.
            with core_worker.profile_event(b"task:store_outputs"):
                core_worker.store_task_outputs(
                    worker, outputs, c_return_ids, returns)
        except Exception as error:
            # If the debugger is enabled, drop into the remote pdb here.
            if "RAY_PDB" in os.environ:
                ray.util.pdb.post_mortem()

            backtrace = ray._private.utils.format_error_message(
                traceback.format_exc(), task_exception=task_exception)

            # Generate the actor repr from the actor class.
            actor_repr = repr(actor) if actor else None

            if isinstance(error, RayTaskError):
                # Avoid recursive nesting of RayTaskError.
                failure_object = RayTaskError(function_name, backtrace,
                                              error.cause, proctitle=title,
                                              actor_repr=actor_repr)
            else:
                failure_object = RayTaskError(function_name, backtrace,
                                              error, proctitle=title,
                                              actor_repr=actor_repr)
            errors = []
            for _ in range(c_return_ids.size()):
                errors.append(failure_object)
            core_worker.store_task_outputs(
                worker, errors, c_return_ids, returns)
            ray._private.utils.push_error_to_driver(
                worker,
                ray_constants.TASK_PUSH_ERROR,
                str(failure_object),
                job_id=worker.current_job_id)
            if (<int>task_type == <int>TASK_TYPE_ACTOR_CREATION_TASK):
                raise RayActorError.from_task_error(failure_object)

    if execution_info.max_calls != 0:
        # Reset the state of the worker for the next task to execute.
        # Increase the task execution counter.
        manager.increase_task_counter(function_descriptor)
        # If we've reached the max number of executions for this worker, exit.
        task_counter = manager.get_task_counter(function_descriptor)
        if task_counter == execution_info.max_calls:
            exit = SystemExit(0)
            exit.is_ray_terminate = True
            raise exit

One key line in it:

# 531
function_executor = execution_info.function

execution_info comes from:

# _raylet.pyx 514
execution_info = execution_infos.get(function_descriptor)

execution_infos comes from:

# _raylet.pyx 466
    cdef:
        dict execution_infos = manager.execution_infos

manager comes from:

# _raylet.pyx 462
    worker = ray.worker.global_worker
    manager = worker.function_actor_manager