// core_worker.cc:402
CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_id)
: options_(options),
get_call_site_(RayConfig::instance().record_ref_creation_sites()
? options_.get_lang_stack
: nullptr),
worker_context_(options_.worker_type, worker_id, GetProcessJobID(options_)),
io_work_(io_service_),
client_call_manager_(new rpc::ClientCallManager(io_service_)),
periodical_runner_(io_service_),
task_queue_length_(0),
num_executed_tasks_(0),
resource_ids_(new ResourceMappingType()),
grpc_service_(io_service_, *this),
task_execution_service_work_(task_execution_service_) {
RAY_LOG(DEBUG) << "Constructing CoreWorker, worker_id: " << worker_id;
// Initialize task receivers.
if (options_.worker_type == WorkerType::WORKER || options_.is_local_mode) {
RAY_CHECK(options_.task_execution_callback != nullptr);
auto execute_task = std::bind(&CoreWorker::ExecuteTask, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3,
std::placeholders::_4, std::placeholders::_5);
direct_task_receiver_ = std::make_unique<CoreWorkerDirectTaskReceiver>(
worker_context_, task_execution_service_, execute_task,
[this] { return local_raylet_client_->TaskDone(); });
}
// Initialize raylet client.
// NOTE(edoakes): the core_worker_server_ must be running before registering with
// the raylet, as the raylet will start sending some RPC messages immediately.
  // TODO(zhijunfu): currently RayletClient will crash in its constructor if it
  // cannot connect to the raylet after a number of retries. This could be changed
  // later so that the worker (Java/Python, etc.) can retrieve and handle the error
  // instead of crashing.
  // Connect to the NodeManager (raylet) over RPC.
auto grpc_client = rpc::NodeManagerWorkerClient::make(
options_.raylet_ip_address, options_.node_manager_port, *client_call_manager_);
Status raylet_client_status;
NodeID local_raylet_id;
int assigned_port;
std::string serialized_job_config = options_.serialized_job_config;
  // Construct a client that connects to the local raylet.
local_raylet_client_ = std::make_shared<raylet::RayletClient>(
io_service_, std::move(grpc_client), options_.raylet_socket, GetWorkerID(),
options_.worker_type, worker_context_.GetCurrentJobID(), options_.runtime_env_hash,
options_.language, options_.node_ip_address, &raylet_client_status,
&local_raylet_id, &assigned_port, &serialized_job_config, options_.worker_shim_pid,
options_.startup_token);
if (!raylet_client_status.ok()) {
// Avoid using FATAL log or RAY_CHECK here because they may create a core dump file.
RAY_LOG(ERROR) << "Failed to register worker " << worker_id << " to Raylet. "
<< raylet_client_status;
// Quit the process immediately.
QuickExit();
}
connected_ = true;
RAY_CHECK(assigned_port >= 0);
// Parse job config from serialized string.
job_config_.reset(new rpc::JobConfig());
job_config_->ParseFromString(serialized_job_config);
// Start RPC server after all the task receivers are properly initialized and we have
// our assigned port from the raylet.
  // Start the RPC server that accepts tasks.
core_worker_server_ = std::make_unique<rpc::GrpcServer>(
WorkerTypeString(options_.worker_type), assigned_port,
options_.node_ip_address == "127.0.0.1");
core_worker_server_->RegisterService(grpc_service_);
core_worker_server_->Run();
// Set our own address.
RAY_CHECK(!local_raylet_id.IsNil());
rpc_address_.set_ip_address(options_.node_ip_address);
rpc_address_.set_port(core_worker_server_->GetPort());
rpc_address_.set_raylet_id(local_raylet_id.Binary());
rpc_address_.set_worker_id(worker_context_.GetWorkerID().Binary());
RAY_LOG(INFO) << "Initializing worker at address: " << rpc_address_.ip_address() << ":"
<< rpc_address_.port() << ", worker ID " << worker_context_.GetWorkerID()
<< ", raylet " << local_raylet_id;
// Begin to get gcs server address from raylet.
gcs_server_address_updater_ = std::make_unique<GcsServerAddressUpdater>(
options_.raylet_ip_address, options_.node_manager_port,
[this](std::string ip, int port) {
absl::MutexLock lock(&gcs_server_address_mutex_);
gcs_server_address_.first = ip;
gcs_server_address_.second = port;
});
// Initialize gcs client.
  // The synchronous and asynchronous Redis contexts are not used by this GCS
  // client, so we avoid opening those connections by setting `enable_sync_conn`
  // and `enable_async_conn` to false.
gcs::GcsClientOptions gcs_options = gcs::GcsClientOptions(
options_.gcs_options.server_ip_, options_.gcs_options.server_port_,
options_.gcs_options.password_,
/*enable_sync_conn=*/false, /*enable_async_conn=*/false,
/*enable_subscribe_conn=*/true);
gcs_client_ = std::make_shared<gcs::GcsClient>(
gcs_options, [this](std::pair<std::string, int> *address) {
absl::MutexLock lock(&gcs_server_address_mutex_);
if (gcs_server_address_.second != 0) {
address->first = gcs_server_address_.first;
address->second = gcs_server_address_.second;
return true;
}
return false;
});
RAY_CHECK_OK(gcs_client_->Connect(io_service_));
RegisterToGcs();
// Register a callback to monitor removed nodes.
auto on_node_change = [this](const NodeID &node_id, const rpc::GcsNodeInfo &data) {
if (data.state() == rpc::GcsNodeInfo::DEAD) {
OnNodeRemoved(node_id);
}
};
RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeToNodeChange(on_node_change, nullptr));
// Initialize profiler.
profiler_ = std::make_shared<worker::Profiler>(
worker_context_, options_.node_ip_address, io_service_, gcs_client_);
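  // A shared pool of gRPC clients for calling into other core workers;
  // the components below reuse connections from this pool.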
core_worker_client_pool_ =
std::make_shared<rpc::CoreWorkerClientPool>(*client_call_manager_);
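  // Pub/sub endpoints for object events. This worker publishes object
  // eviction, reference-removal, and object-location updates on the channels
  // listed below, and the matching subscriber consumes the same channels
  // from other workers.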
object_info_publisher_ = std::make_unique<pubsub::Publisher>(
/*channels=*/std::vector<
rpc::ChannelType>{rpc::ChannelType::WORKER_OBJECT_EVICTION,
rpc::ChannelType::WORKER_REF_REMOVED_CHANNEL,
rpc::ChannelType::WORKER_OBJECT_LOCATIONS_CHANNEL},
/*periodical_runner=*/&periodical_runner_,
/*get_time_ms=*/[]() { return absl::GetCurrentTimeNanos() / 1e6; },
/*subscriber_timeout_ms=*/RayConfig::instance().subscriber_timeout_ms(),
/*publish_batch_size_=*/RayConfig::instance().publish_batch_size());
object_info_subscriber_ = std::make_unique<pubsub::Subscriber>(
/*subscriber_id=*/GetWorkerID(),
/*channels=*/
std::vector<rpc::ChannelType>{rpc::ChannelType::WORKER_OBJECT_EVICTION,
rpc::ChannelType::WORKER_REF_REMOVED_CHANNEL,
rpc::ChannelType::WORKER_OBJECT_LOCATIONS_CHANNEL},
/*max_command_batch_size*/ RayConfig::instance().max_command_batch_size(),
/*get_client=*/
[this](const rpc::Address &address) {
return core_worker_client_pool_->GetOrConnect(address);
},
/*callback_service*/ &io_service_);
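  // The reference counter tracks ownership and borrower information for
  // ObjectRefs, using the publisher/subscriber pair above to exchange
  // reference updates with other workers.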
reference_counter_ = std::make_shared<ReferenceCounter>(
rpc_address_,
/*object_info_publisher=*/object_info_publisher_.get(),
/*object_info_subscriber=*/object_info_subscriber_.get(),
RayConfig::instance().lineage_pinning_enabled(), [this](const rpc::Address &addr) {
return std::shared_ptr<rpc::CoreWorkerClient>(
new rpc::CoreWorkerClient(addr, *client_call_manager_));
});
if (options_.worker_type == WorkerType::WORKER) {
periodical_runner_.RunFnPeriodically(
[this] { CheckForRayletFailure(); },
RayConfig::instance().raylet_death_check_interval_milliseconds());
}
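  // Providers for the two object stores: the shared-memory plasma store and
  // the in-process memory store used for direct-call results.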
plasma_store_provider_.reset(new CoreWorkerPlasmaStoreProvider(
options_.store_socket, local_raylet_client_, reference_counter_,
options_.check_signals,
/*warmup=*/
(options_.worker_type != WorkerType::SPILL_WORKER &&
options_.worker_type != WorkerType::RESTORE_WORKER),
/*get_current_call_site=*/boost::bind(&CoreWorker::CurrentCallSite, this)));
memory_store_.reset(new CoreWorkerMemoryStore(
reference_counter_, local_raylet_client_, options_.check_signals,
[this](const RayObject &obj) {
// Run this on the event loop to avoid calling back into the language runtime
// from the middle of user operations.
io_service_.post(
[this, obj]() {
if (options_.unhandled_exception_handler != nullptr) {
options_.unhandled_exception_handler(obj);
}
},
"CoreWorker.HandleException");
}));
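  // The internal heartbeat periodically drains to_resubmit_ (populated by the
  // retry callback below) and resubmits tasks whose retry delay has elapsed.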
periodical_runner_.RunFnPeriodically([this] { InternalHeartbeat(); },
kInternalHeartbeatMillis);
auto check_node_alive_fn = [this](const NodeID &node_id) {
auto node = gcs_client_->Nodes().Get(node_id);
return node != nullptr;
};
auto reconstruct_object_callback = [this](const ObjectID &object_id) {
io_service_.post(
[this, object_id]() {
RAY_CHECK(object_recovery_manager_->RecoverObject(object_id));
},
"CoreWorker.ReconstructObject");
};
auto push_error_callback = [this](const JobID &job_id, const std::string &type,
const std::string &error_message, double timestamp) {
return PushError(job_id, type, error_message, timestamp);
};
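  // The task manager tracks submitted tasks and their return objects, and
  // drives retries: failed tasks are either re-queued after a delay or
  // resubmitted immediately via the callback below.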
task_manager_.reset(new TaskManager(
memory_store_, reference_counter_,
/*put_in_local_plasma_callback=*/
[this](const RayObject &object, const ObjectID &object_id) {
RAY_CHECK_OK(PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true));
},
/* retry_task_callback= */
[this](TaskSpecification &spec, bool delay) {
if (delay) {
// Retry after a delay to emulate the existing Raylet reconstruction
// behaviour. TODO(ekl) backoff exponentially.
          uint32_t delay_ms = RayConfig::instance().task_retry_delay_ms();
          RAY_LOG(INFO) << "Will resubmit task after a " << delay_ms
                        << "ms delay: " << spec.DebugString();
          absl::MutexLock lock(&mutex_);
          to_resubmit_.push_back(std::make_pair(current_time_ms() + delay_ms, spec));
} else {
RAY_LOG(INFO) << "Resubmitting task that produced lost plasma object: "
<< spec.DebugString();
if (spec.IsActorTask()) {
auto actor_handle = actor_manager_->GetActorHandle(spec.ActorId());
actor_handle->SetResubmittedActorTaskSpec(spec, spec.ActorDummyObject());
RAY_CHECK_OK(direct_actor_submitter_->SubmitTask(spec));
} else {
RAY_CHECK_OK(direct_task_submitter_->SubmitTask(spec));
}
}
},
check_node_alive_fn, reconstruct_object_callback, push_error_callback));
// Create an entry for the driver task in the task table. This task is
// added immediately with status RUNNING. This allows us to push errors
// related to this driver task back to the driver. For example, if the
// driver creates an object that is later evicted, we should notify the
// user that we're unable to reconstruct the object, since we cannot
// rerun the driver.
if (options_.worker_type == WorkerType::DRIVER) {
TaskSpecBuilder builder;
const TaskID task_id = TaskID::ForDriverTask(worker_context_.GetCurrentJobID());
builder.SetDriverTaskSpec(task_id, options_.language,
worker_context_.GetCurrentJobID(),
TaskID::ComputeDriverTaskId(worker_context_.GetWorkerID()),
GetCallerId(), rpc_address_);
std::shared_ptr<rpc::TaskTableData> data = std::make_shared<rpc::TaskTableData>();
data->mutable_task()->mutable_task_spec()->CopyFrom(builder.Build().GetMessage());
SetCurrentTaskId(task_id);
}
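  // Factory for raylet clients to remote nodes, used by the direct task
  // submitter (to lease workers on other nodes) and by the object recovery
  // manager.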
  auto raylet_client_factory = [this](const std::string &ip_address, int port) {
auto grpc_client =
rpc::NodeManagerWorkerClient::make(ip_address, port, *client_call_manager_);
return std::shared_ptr<raylet::RayletClient>(
new raylet::RayletClient(std::move(grpc_client)));
};
auto on_excess_queueing = [this](const ActorID &actor_id, int64_t num_queued) {
auto timestamp = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
std::ostringstream stream;
stream << "Warning: More than " << num_queued
<< " tasks are pending submission to actor " << actor_id
<< ". To reduce memory usage, wait for these tasks to finish before sending "
"more.";
RAY_CHECK_OK(
PushError(options_.job_id, "excess_queueing_warning", stream.str(), timestamp));
};
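  // Actor creation is registered through the GCS, while actor tasks are sent
  // directly to the actor's worker over RPC; the submitter fires the
  // excess-queueing warning above when too many tasks pile up for one actor.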
actor_creator_ = std::make_shared<DefaultActorCreator>(gcs_client_);
direct_actor_submitter_ = std::shared_ptr<CoreWorkerDirectActorTaskSubmitter>(
new CoreWorkerDirectActorTaskSubmitter(*core_worker_client_pool_, *memory_store_,
*task_manager_, *actor_creator_,
on_excess_queueing));
auto node_addr_factory = [this](const NodeID &node_id) {
absl::optional<rpc::Address> addr;
if (auto node_info = gcs_client_->Nodes().Get(node_id)) {
rpc::Address address;
address.set_raylet_id(node_info->node_id());
address.set_ip_address(node_info->node_manager_address());
address.set_port(node_info->node_manager_port());
addr = address;
}
return addr;
};
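  // Lease policy: if locality-aware leasing is enabled, worker leases are
  // requested from the node holding most of a task's dependencies; otherwise
  // every lease request goes to the local raylet.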
auto lease_policy = RayConfig::instance().locality_aware_leasing_enabled()
? std::shared_ptr<LeasePolicyInterface>(
std::make_shared<LocalityAwareLeasePolicy>(
reference_counter_, node_addr_factory, rpc_address_))
: std::shared_ptr<LeasePolicyInterface>(
std::make_shared<LocalLeasePolicy>(rpc_address_));
direct_task_submitter_ = std::make_unique<CoreWorkerDirectTaskSubmitter>(
rpc_address_, local_raylet_client_, core_worker_client_pool_, raylet_client_factory,
std::move(lease_policy), memory_store_, task_manager_, local_raylet_id,
RayConfig::instance().worker_lease_timeout_milliseconds(), actor_creator_,
RayConfig::instance().max_tasks_in_flight_per_worker(),
boost::asio::steady_timer(io_service_),
RayConfig::instance().max_pending_lease_requests_per_scheduling_category());
auto report_locality_data_callback =
[this](const ObjectID &object_id, const absl::flat_hash_set<NodeID> &locations,
uint64_t object_size) {
reference_counter_->ReportLocalityData(object_id, locations, object_size);
};
future_resolver_.reset(new FutureResolver(memory_store_, reference_counter_,
std::move(report_locality_data_callback),
core_worker_client_pool_, rpc_address_));
// Unfortunately the raylet client has to be constructed after the receivers.
if (direct_task_receiver_ != nullptr) {
task_argument_waiter_.reset(new DependencyWaiterImpl(*local_raylet_client_));
direct_task_receiver_->Init(core_worker_client_pool_, rpc_address_,
task_argument_waiter_);
}
actor_manager_ = std::make_unique<ActorManager>(gcs_client_, direct_actor_submitter_,
reference_counter_);
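  // Object location lookup used during recovery: resolve an object ID to the
  // addresses of nodes believed to hold a copy, skipping nodes that are
  // already dead.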
std::function<Status(const ObjectID &object_id, const ObjectLookupCallback &callback)>
object_lookup_fn;
object_lookup_fn = [this, node_addr_factory](const ObjectID &object_id,
const ObjectLookupCallback &callback) {
std::vector<rpc::Address> locations;
const absl::optional<absl::flat_hash_set<NodeID>> object_locations =
reference_counter_->GetObjectLocations(object_id);
if (object_locations.has_value()) {
locations.reserve(object_locations.value().size());
for (const auto &node_id : object_locations.value()) {
absl::optional<rpc::Address> addr = node_addr_factory(node_id);
if (addr.has_value()) {
locations.push_back(addr.value());
} else {
// We're getting potentially stale locations directly from the reference
// counter, so the location might be a dead node.
RAY_LOG(DEBUG) << "Location " << node_id
<< " is dead, not using it in the recovery of object "
<< object_id;
}
}
}
callback(object_id, locations);
return Status::OK();
};
object_recovery_manager_ = std::make_unique<ObjectRecoveryManager>(
rpc_address_, raylet_client_factory, local_raylet_client_, object_lookup_fn,
task_manager_, reference_counter_, memory_store_,
[this](const ObjectID &object_id, rpc::ErrorType reason, bool pin_object) {
RAY_LOG(DEBUG) << "Failed to recover object " << object_id << " due to "
<< rpc::ErrorType_Name(reason);
RAY_CHECK_OK(Put(RayObject(reason),
/*contained_object_ids=*/{}, object_id,
/*pin_object=*/pin_object));
});
// Start the IO thread after all other members have been initialized, in case
// the thread calls back into any of our members.
io_thread_ = std::thread([this]() { RunIOService(); });
// Tell the raylet the port that we are listening on.
// NOTE: This also marks the worker as available in Raylet. We do this at the
// very end in case there is a problem during construction.
if (options.connect_on_start) {
RAY_CHECK_OK(
local_raylet_client_->AnnounceWorkerPort(core_worker_server_->GetPort()));
}
  // Used to decide whether an object lives in the plasma store: objects larger
  // than this threshold are put in plasma rather than passed inline.
max_direct_call_object_size_ = RayConfig::instance().max_direct_call_object_size();
  // If periodic asio event-stats printing is enabled, print the event loop
  // stats at the configured interval.
const auto event_stats_print_interval_ms =
RayConfig::instance().event_stats_print_interval_ms();
if (event_stats_print_interval_ms != -1 && RayConfig::instance().event_stats()) {
periodical_runner_.RunFnPeriodically(
[this] {
RAY_LOG(INFO) << "Event stats:\n\n" << io_service_.StatsString() << "\n\n";
},
event_stats_print_interval_ms);
}
// Set event context for current core worker thread.
RayEventContext::Instance().SetEventContext(
ray::rpc::Event_SourceType::Event_SourceType_CORE_WORKER,
{{"worker_id", worker_id.Hex()}});
}