Actor events & state persistence
During its runtime, the Actor receives Actor events sent by the Apify platform or generated by the Apify SDK itself.
Event types
A listener can optionally receive a single argument, a Pydantic model with the event's data. The following table lists the events, the type of that data object, and when each event is emitted.
| Event | Data | Description |
|---|---|---|
| SYSTEM_INFO | EventSystemInfoData | Emitted regularly to report the Actor's current resource usage. |
| MIGRATING | EventMigratingData | Emitted when the Actor running on the Apify platform is going to be migrated to another worker server soon. You can call Actor.reboot to reboot the Actor and trigger the migration immediately, to speed up the process. |
| ABORTING | EventAbortingData | When a user aborts an Actor run on the Apify platform, they can choose to abort gracefully to allow the Actor some time before getting killed. This graceful abort emits the ABORTING event. |
| PERSIST_STATE | EventPersistStateData | Emitted at regular intervals (by default 60 seconds) to notify the Actor that it should persist its state, in order to avoid repeating all work when the Actor restarts. This event is also emitted automatically when the MIGRATING event happens, in which case the is_migrating flag in the event data is True. The PERSIST_STATE event is provided merely for user convenience; you can achieve the same effect by persisting the state regularly in an interval and listening for the MIGRATING event. |
| EXIT | EventExitData | Emitted by the SDK (not the platform) when the Actor is about to exit. You can use this event to perform final cleanup tasks, such as closing external connections or sending notifications, before the Actor shuts down. |
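The PERSIST_STATE row says the same effect can be had by persisting on a timer and reacting to a migration. A minimal, stdlib-only sketch of that idea follows. It does not use the Apify SDK: the `save` callback, the `migrating` signal, and the function name `persist_until_migration` are all hypothetical stand-ins chosen for illustration.

```python
import asyncio
from collections.abc import Awaitable, Callable

# Hypothetical callback type: receives True when the save is triggered by a migration.
SaveFn = Callable[[bool], Awaitable[None]]


async def persist_until_migration(
    save: SaveFn,
    migrating: asyncio.Event,
    interval_secs: float = 60.0,
) -> None:
    """Call save(False) every interval_secs, then save(True) once migrating is set."""
    while True:
        try:
            # Wait for the migration signal, but no longer than one interval.
            await asyncio.wait_for(migrating.wait(), timeout=interval_secs)
        except asyncio.TimeoutError:
            # The interval elapsed without a migration: a routine save.
            await save(False)
        else:
            # Migration signalled: save one last time and stop.
            await save(True)
            return
```

In a real Actor, `save` would presumably wrap a call such as Actor.set_value, and a MIGRATING handler would call `migrating.set()`. Listening for PERSIST_STATE directly avoids this bookkeeping.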
Adding handlers to events
To add handlers to these events, you use the Actor.on method,
and to remove them, you use the Actor.off method.
import asyncio

from apify import Actor, Event, EventPersistStateData


async def main() -> None:
    async with Actor:
        total_items = 1000

        # Load the state if it's saved from some previous execution
        processed_items = 0
        actor_state = await Actor.get_value('STATE')
        if actor_state is not None:
            processed_items = actor_state

        # Save the state when the `PERSIST_STATE` event happens
        async def save_state(event_data: EventPersistStateData) -> None:
            nonlocal processed_items
            Actor.log.info(
                'Persisting Actor state (migrating=%s)', event_data.is_migrating
            )
            await Actor.set_value('STATE', processed_items)

        Actor.on(Event.PERSIST_STATE, save_state)

        # Do some fake work
        for i in range(processed_items, total_items):
            Actor.log.info(f'Processing item {i}...')
            processed_items = i
            await asyncio.sleep(0.1)

        # Suppose we can stop saving the state now
        Actor.off(Event.PERSIST_STATE, save_state)

        # Do some more fake work, this time something that can't be restarted,
        # so no point persisting the state
        for j in range(10):
            Actor.log.info(f'Processing item {j} of another kind...')
            await asyncio.sleep(1)


if __name__ == '__main__':
    asyncio.run(main())
Automatic state persistence
The example above shows how to manually persist state using the PERSIST_STATE event. For most use cases, you can use the Actor.use_state method instead, which handles state persistence automatically.
Actor.use_state returns a dictionary that is automatically saved to the default key-value store at regular intervals and whenever a migration or shutdown occurs. You can modify the dictionary in place, and changes are persisted without any manual set_value calls.
You can optionally specify a key (the key-value store key under which the state is stored) and a kvs_name (the name of the key-value store to use). By default, the state is stored in the default key-value store under a default key.
import asyncio

from apify import Actor


async def main() -> None:
    async with Actor:
        # Get or create an auto-persisted state dict.
        # On restart or migration, the state is loaded from the KVS.
        state = await Actor.use_state(default_value={'processed_items': 0})

        # Resume from the persisted state (stored as JSON, so narrow the type).
        start_index = state['processed_items']
        if not isinstance(start_index, int):
            start_index = 0
        Actor.log.info(f'Resuming from item {start_index}')

        # Do some work and update the state; it is persisted automatically
        for i in range(start_index, 100):
            Actor.log.info(f'Processing item {i}...')
            state['processed_items'] = i + 1
            await asyncio.sleep(0.1)


if __name__ == '__main__':
    asyncio.run(main())
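The comment in the example above notes that the state is stored as JSON, which is why the value read back is narrowed with isinstance. As a rough illustration, the following stdlib-only sketch round-trips a dictionary through the json module as a stand-in for saving to and reloading from the key-value store. The helper name `roundtrip_via_json` is hypothetical, and the SDK's actual serialization path may differ in detail.

```python
import json
from typing import Any


def roundtrip_via_json(state: dict[Any, Any]) -> Any:
    """Serialize to JSON text and parse it back, as a stand-in for save + reload."""
    return json.loads(json.dumps(state))


original = {'processed_items': 3, 'last_batch': (1, 2), 7: 'seven'}
restored = roundtrip_via_json(original)

# Plain ints and strings survive unchanged.
print(restored['processed_items'])  # 3
# Tuples come back as lists, and non-string keys come back as strings.
print(restored['last_batch'])  # [1, 2]
print('7' in restored, 7 in restored)  # True False
```

Keeping the state to JSON-native values (strings, numbers, booleans, lists, and string-keyed dicts) sidesteps these surprises.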
Conclusion
This page has described the events emitted during a run (SYSTEM_INFO, MIGRATING, ABORTING, PERSIST_STATE, and EXIT), how to handle them with Actor.on and Actor.off, and how to persist state automatically with Actor.use_state.
For more details on platform events and state persistence, see the system events and state persistence documentation on the Apify platform.