# `_raylet.pyx`, line 444
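`execute_task` is the Cython routine a Ray worker runs for every task it is assigned: it restricts GPU visibility, resolves the function or actor method to call, deserializes the arguments, executes the user code under profiling events, stores the return values (or a `RayTaskError`) in the object store, and finally enforces the `max_calls` worker-recycling policy. For orientation, here is a minimal driver-side sketch whose execution path ends in this function on a worker process (the task itself is my own example, not part of the Ray source):

```python
import ray

ray.init()

@ray.remote
def add(a, b):
    # On the worker process, execute_task deserializes (a, b), calls this
    # function body, and stores the return value in the object store.
    return a + b

print(ray.get(add.remote(1, 2)))  # -> 3
```

The first half of the function, below, covers setup and resolving what to run.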
```python
cdef execute_task(
        CTaskType task_type,
        const c_string name,
        const CRayFunction &ray_function,
        const unordered_map[c_string, double] &c_resources,
        const c_vector[shared_ptr[CRayObject]] &c_args,
        const c_vector[CObjectReference] &c_arg_refs,
        const c_vector[CObjectID] &c_return_ids,
        const c_string debugger_breakpoint,
        c_vector[shared_ptr[CRayObject]] *returns,
        c_bool *is_application_level_error,
        # This parameter is only used for actor creation task to define
        # the concurrency groups of this actor.
        const c_vector[CConcurrencyGroup] &c_defined_concurrency_groups,
        const c_string c_name_of_concurrency_group_to_execute):
    is_application_level_error[0] = False

    worker = ray.worker.global_worker
    manager = worker.function_actor_manager
    actor = None
    cdef:
        dict execution_infos = manager.execution_infos
        CoreWorker core_worker = worker.core_worker
        JobID job_id = core_worker.get_current_job_id()
        TaskID task_id = core_worker.get_current_task_id()
        CFiberEvent task_done_event

    # Automatically restrict the GPUs available to this task.
    ray._private.utils.set_cuda_visible_devices(ray.get_gpu_ids())
    # Helper method used to exit the current asyncio actor.
    # This is called when a KeyboardInterrupt is received by the main thread.
    # Upon receiving a KeyboardInterrupt signal, Ray will exit the current
    # worker. If the worker is processing normal tasks, Ray treats it as a
    # task cancellation from ray.cancel(object_ref). If the worker is an
    # asyncio actor, Ray will exit the actor.
    def exit_current_actor_if_asyncio():
        if core_worker.current_actor_is_asyncio():
            error = SystemExit(0)
            error.is_ray_terminate = True
            raise error
    function_descriptor = CFunctionDescriptorToPython(
        ray_function.GetFunctionDescriptor())

    if <int>task_type == <int>TASK_TYPE_ACTOR_CREATION_TASK:
        actor_class = manager.load_actor_class(job_id, function_descriptor)
        actor_id = core_worker.get_actor_id()
        actor = actor_class.__new__(actor_class)
        worker.actors[actor_id] = actor

    if (<int>task_type == <int>TASK_TYPE_ACTOR_CREATION_TASK):
        # Record the actor class via :actor_name: magic token in the log.
        #
        # (Phase 1): this covers code run before __init__ finishes.
        # We need to handle this separately because `__repr__` may not be
        # runnable until after `__init__` (e.g., if it accesses fields
        # defined in the constructor).
        actor_magic_token = "{}{}".format(
            ray_constants.LOG_PREFIX_ACTOR_NAME, actor_class.__name__)
        # Flush to both .out and .err
        print(actor_magic_token)
        print(actor_magic_token, file=sys.stderr)

        # Initialize event loops for this asyncio actor.
        if core_worker.current_actor_is_asyncio():
            core_worker.initialize_eventloops_for_actor_concurrency_group(
                c_defined_concurrency_groups)

    execution_info = execution_infos.get(function_descriptor)
    if not execution_info:
        execution_info = manager.get_execution_info(
            job_id, function_descriptor)
        execution_infos[function_descriptor] = execution_info

    function_name = execution_info.function_name
    extra_data = (b'{"name": ' + function_name.encode("ascii") +
                  b' "task_id": ' + task_id.hex().encode("ascii") + b'}')

    task_name = name.decode("utf-8")
    name_of_concurrency_group_to_execute = \
        c_name_of_concurrency_group_to_execute.decode("ascii")
    title = f"ray::{task_name}"

    if <int>task_type == <int>TASK_TYPE_NORMAL_TASK:
        next_title = "ray::IDLE"
        function_executor = execution_info.function
        # Record the task name via :task_name: magic token in the log file.
        # This is used for the prefix in driver logs `(task_name pid=123) ...`
        task_name_magic_token = "{}{}".format(
            ray_constants.LOG_PREFIX_TASK_NAME, task_name.replace("()", ""))
        # Print on both .out and .err
        print(task_name_magic_token)
        print(task_name_magic_token, file=sys.stderr)
    else:
        actor = worker.actors[core_worker.get_actor_id()]
        class_name = actor.__class__.__name__
        next_title = f"ray::{class_name}"

        pid = os.getpid()
        worker_name = f"ray_{class_name}_{pid}"
        if c_resources.find(b"object_store_memory") != c_resources.end():
            worker.core_worker.set_object_store_client_options(
                worker_name,
                int(ray_constants.from_memory_units(
                    dereference(
                        c_resources.find(b"object_store_memory")).second)))
        def function_executor(*arguments, **kwarguments):
            function = execution_info.function

            if core_worker.current_actor_is_asyncio():
                # Increase the recursion limit if necessary. In asyncio mode,
                # we have many parallel callstacks (represented as fibers)
                # that are suspended for execution. The Python interpreter
                # will mistakenly count each callstack towards the recursion
                # limit. We don't need to worry about stack overflow here
                # because the max number of callstacks is limited in direct
                # actor transport by the max_concurrency flag.
                increase_recursion_limit()

                if inspect.iscoroutinefunction(function.method):
                    async_function = function
                else:
                    # Just execute the method if it's a Ray internal method.
                    if function.name.startswith("__ray"):
                        return function(actor, *arguments, **kwarguments)
                    async_function = sync_to_async(function)

                return core_worker.run_async_func_in_event_loop(
                    async_function, function_descriptor,
                    name_of_concurrency_group_to_execute, actor,
                    *arguments, **kwarguments)

            return function(actor, *arguments, **kwarguments)
```
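Up to this point, `execute_task` has only resolved *what* to run. For normal tasks, `function_executor` is simply `execution_info.function`; for actor tasks it is the wrapper defined above, which on asyncio actors dispatches coroutine methods onto the actor's event loops via `run_async_func_in_event_loop` (wrapping plain synchronous methods with `sync_to_async`), and otherwise calls the method directly. A small user-side sketch of the async-actor case this wrapper exists for (the class, sleep duration, and `max_concurrency` value are my own illustration, not taken from the Ray source):

```python
import asyncio

import ray

ray.init()

@ray.remote
class AsyncCounter:
    def __init__(self):
        self.value = 0

    async def incr(self):
        # Coroutine methods are not called directly; execute_task routes them
        # onto the actor's event loop, so many calls can be in flight at once.
        await asyncio.sleep(0.1)
        self.value += 1
        return self.value

counter = AsyncCounter.options(max_concurrency=10).remote()
# All ten calls overlap inside the single actor process.
print(sorted(ray.get([counter.incr.remote() for _ in range(10)])))
```

The second half of the function, below, deserializes the arguments, runs `function_executor`, stores the outputs or the failure, and handles worker exit.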
with core_worker.profile_event(b"task", extra_data=extra_data):
try:
task_exception = False
if not (<int>task_type == <int>TASK_TYPE_ACTOR_TASK
and function_name == "__ray_terminate__"):
worker.memory_monitor.raise_if_low_memory()
with core_worker.profile_event(b"task:deserialize_arguments"):
if c_args.empty():
args, kwargs = [], {}
else:
metadata_pairs = RayObjectsToDataMetadataPairs(c_args)
object_refs = VectorToObjectRefs(c_arg_refs)
if core_worker.current_actor_is_asyncio():
# We deserialize objects in event loop thread to
# prevent segfaults. See #7799
async def deserialize_args():
return (ray.worker.global_worker
.deserialize_objects(
metadata_pairs, object_refs))
args = core_worker.run_async_func_in_event_loop(
deserialize_args, function_descriptor,
name_of_concurrency_group_to_execute)
else:
args = ray.worker.global_worker.deserialize_objects(
metadata_pairs, object_refs)
for arg in args:
raise_if_dependency_failed(arg)
args, kwargs = ray._private.signature.recover_args(args)
            if (<int>task_type == <int>TASK_TYPE_ACTOR_CREATION_TASK):
                actor = worker.actors[core_worker.get_actor_id()]
                class_name = actor.__class__.__name__
                actor_title = f"{class_name}({args!r}, {kwargs!r})"
                core_worker.set_actor_title(actor_title.encode("utf-8"))
            # Execute the task.
            with core_worker.profile_event(b"task:execute"):
                task_exception = True
                try:
                    is_existing = core_worker.is_exiting()
                    if is_existing:
                        title = f"{title}::Exiting"
                        next_title = f"{next_title}::Exiting"
                    with ray.worker._changeproctitle(title, next_title):
                        if debugger_breakpoint != b"":
                            ray.util.pdb.set_trace(
                                breakpoint_uuid=debugger_breakpoint)
                        outputs = function_executor(*args, **kwargs)
                        next_breakpoint = (
                            ray.worker.global_worker.debugger_breakpoint)
                        if next_breakpoint != b"":
                            # If this happens, the user typed "remote" and
                            # there were no more remote calls left in this
                            # task. In that case we just exit the debugger.
                            ray.experimental.internal_kv._internal_kv_put(
                                "RAY_PDB_{}".format(next_breakpoint),
                                "{\"exit_debugger\": true}")
                            ray.experimental.internal_kv._internal_kv_del(
                                "RAY_PDB_CONTINUE_{}".format(next_breakpoint)
                            )
                            ray.worker.global_worker.debugger_breakpoint = b""
                    task_exception = False
                except AsyncioActorExit as e:
                    exit_current_actor_if_asyncio()
                except KeyboardInterrupt as e:
                    raise TaskCancelledError(
                        core_worker.get_current_task_id())
                except Exception as e:
                    is_application_level_error[0] = True
                    if core_worker.get_current_task_retry_exceptions():
                        logger.info("Task failed with retryable exception:"
                                    " {}.".format(
                                        core_worker.get_current_task_id()),
                                    exc_info=True)
                    raise e
                if c_return_ids.size() == 1:
                    outputs = (outputs,)
            if (<int>task_type == <int>TASK_TYPE_ACTOR_CREATION_TASK):
                # Record actor repr via :actor_name: magic token in the log.
                #
                # (Phase 2): after `__init__` finishes, we override the
                # log prefix with the full repr of the actor. The log monitor
                # will pick up the updated token.
                if (hasattr(actor_class, "__ray_actor_class__") and
                        "__repr__" in
                        actor_class.__ray_actor_class__.__dict__):
                    actor_magic_token = "{}{}".format(
                        ray_constants.LOG_PREFIX_ACTOR_NAME, repr(actor))
                    # Flush on both stdout and stderr.
                    print(actor_magic_token)
                    print(actor_magic_token, file=sys.stderr)
            # Check for a cancellation that was called when the function
            # was exiting and was raised after the except block.
            if not check_signals().ok():
                task_exception = True
                raise TaskCancelledError(
                    core_worker.get_current_task_id())
            if (c_return_ids.size() > 0 and
                    len(outputs) != int(c_return_ids.size())):
                raise ValueError(
                    "Task returned {} objects, but num_returns={}.".format(
                        len(outputs), c_return_ids.size()))
            # Store the outputs in the object store.
            with core_worker.profile_event(b"task:store_outputs"):
                core_worker.store_task_outputs(
                    worker, outputs, c_return_ids, returns)
        except Exception as error:
            # If the debugger is enabled, drop into the remote pdb here.
            if "RAY_PDB" in os.environ:
                ray.util.pdb.post_mortem()
            backtrace = ray._private.utils.format_error_message(
                traceback.format_exc(), task_exception=task_exception)

            # Generate the actor repr from the actor class.
            actor_repr = repr(actor) if actor else None

            if isinstance(error, RayTaskError):
                # Avoid recursive nesting of RayTaskError.
                failure_object = RayTaskError(function_name, backtrace,
                                              error.cause, proctitle=title,
                                              actor_repr=actor_repr)
            else:
                failure_object = RayTaskError(function_name, backtrace,
                                              error, proctitle=title,
                                              actor_repr=actor_repr)
            errors = []
            for _ in range(c_return_ids.size()):
                errors.append(failure_object)
            core_worker.store_task_outputs(
                worker, errors, c_return_ids, returns)
            ray._private.utils.push_error_to_driver(
                worker,
                ray_constants.TASK_PUSH_ERROR,
                str(failure_object),
                job_id=worker.current_job_id)
            if (<int>task_type == <int>TASK_TYPE_ACTOR_CREATION_TASK):
                raise RayActorError.from_task_error(failure_object)

    if execution_info.max_calls != 0:
        # Reset the state of the worker for the next task to execute.
        # Increase the task execution counter.
        manager.increase_task_counter(function_descriptor)

        # If we've reached the max number of executions for this worker, exit.
        task_counter = manager.get_task_counter(function_descriptor)
        if task_counter == execution_info.max_calls:
            exit = SystemExit(0)
            exit.is_ray_terminate = True
            raise exit
```
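Two things at the tail of the listing map directly onto user-facing options. The `except Exception` branch sets `is_application_level_error` and consults `core_worker.get_current_task_retry_exceptions()`, which is what `retry_exceptions` on `@ray.remote` feeds; and the final `max_calls` block raises `SystemExit(0)` with `is_ray_terminate = True` once the function has executed that many times on this worker, so the next invocation gets a fresh process. A driver-side sketch of both options (my own illustration, not from the Ray source; the failure rate and counts are arbitrary):

```python
import random

import ray

ray.init()

# retry_exceptions=True marks application-level exceptions as retryable, so
# the task is resubmitted (up to max_retries times) instead of failing hard.
@ray.remote(retry_exceptions=True, max_retries=3)
def flaky():
    if random.random() < 0.3:
        raise RuntimeError("transient failure")
    return "ok"

# max_calls=1 recycles the worker after every call: execute_task raises
# SystemExit with is_ray_terminate=True once the per-function counter hits
# max_calls, and a new worker picks up the next task.
@ray.remote(max_calls=1)
def leaky(i):
    return i * i

print(ray.get(flaky.remote()))
print(ray.get([leaky.remote(i) for i in range(3)]))  # [0, 1, 4]
```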