atproto
atproto is a wrapper library that supports the endpoints defined in Lexicon for com.atproto.*.
All major endpoints of the AT Protocol API are supported, making it easy to manipulate data independent of specific services. If you want to use the Bluesky API, use bluesky instead.
Features ⭐
- ✅ Zero Dependency
- ✅ Supports Powerful Built-In Retry using Exponential BackOff And Jitter
- ✅ Supports All Major Endpoints for com.atproto.*
- ✅ Well Documented and Well Tested
- ✅ Supports Powerful Firehose API
- ✅ 100% Null Safety
- ✅ Applicable to services other than Bluesky
See API Supported Matrix for a list of endpoints supported by atproto.
Getting Started 💪
Install
See the Install Package section for more details on how to install a package in your Dart and Flutter app.
With Dart:
dart pub add atproto
dart pub get
With Flutter:
flutter pub add atproto
flutter pub get
Import
With the following one-line import, you can use all endpoints provided by atproto.
import 'package:atproto/atproto.dart';
Instantiate ATProto
You need an ATProto object to access most of the features supported by atproto. There are two ways to instantiate one.
As shown in the following examples, the key point when instantiating an ATProto object is whether the endpoint you wish to use requires authentication.
See API Supported Matrix for whether or not authentication is required for each endpoint.
If authentication is required, first create a session with the ATP server using your credentials with the .createSession function.
The credentials passed to the .createSession function should be your handle or email address as identifier and your password or app password as password.
Your credentials are sent securely to the ATP server when you execute the .createSession function, which returns a Session object with an access token once authentication is complete.
You do not need to be aware of the contents of the retrieved Session object; just pass it to the .fromSession constructor of ATProto to create an instance of the ATProto object.
import 'package:atproto/atproto.dart' as atp;
Future<void> main() async {
// Let's authenticate here.
final session = await atp.createSession(
identifier: 'YOUR_HANDLE_OR_EMAIL', // Like "shinyakato.dev"
password: 'YOUR_PASSWORD',
);
print(session);
// Just pass created session data.
final atproto = atp.ATProto.fromSession(
session.data,
);
}
If authentication is not required, simply use the .anonymous() constructor.
import 'package:atproto/atproto.dart';
Future<void> main() async {
// Just call anonymous constructor.
final atproto = ATProto.anonymous();
}
See Session Management for more details about authentication.
Supported Services
atproto supports the following services.
Once an instance of the ATProto object has been created, service endpoints can be used by accessing the property corresponding to each service as follows.
import 'package:atproto/atproto.dart';
Future<void> main() async {
final atproto = ATProto.anonymous();
// Use `resolveHandle` via the `identity` property.
final did = await atproto.identity.resolveHandle(
handle: 'shinyakato.dev',
);
}
See API Supported Matrix for a list of endpoints supported by atproto.
Let's Implement
Okay then, let's try some endpoints!
The following example first authenticates the user against bsky.social, sends a post to Bluesky, and then deletes it using a reference to the created record.
import 'package:atproto/atproto.dart' as atp;
Future<void> main() async {
final session = await atp.createSession(
identifier: 'YOUR_HANDLE_OR_EMAIL', // Like "shinyakato.dev"
password: 'YOUR_PASSWORD',
);
final atproto = atp.ATProto.fromSession(session.data);
// Create a record to specific service like Bluesky.
final strongRef = await atproto.repo.createRecord(
collection: atp.NSID.create(
'feed.bsky.app',
'post',
),
record: {
'text': 'Hello, Bluesky!',
"createdAt": DateTime.now().toUtc().toIso8601String(),
},
);
// And delete it.
await atproto.repo.deleteRecord(
uri: strongRef.data.uri,
);
}
See API Supported Matrix for all supported endpoints.
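The `collection` argument in the example above is meant to identify the Bluesky post collection, `app.bsky.feed.post`. The sketch below shows how a domain-style authority and a name could be combined into such an identifier. It is an assumption about how `NSID.create` behaves, inferred from the example, and not a copy of the library's implementation; `buildNsid` is a hypothetical helper.

```dart
/// Hypothetical helper (not part of atproto): builds an NSID string from a
/// domain-style authority and a name by reversing the authority segments.
String buildNsid(String authority, String name) {
  final reversed = authority.split('.').reversed;
  return [...reversed, name].join('.');
}

void main() {
  print(buildNsid('feed.bsky.app', 'post')); // app.bsky.feed.post
  print(buildNsid('graph.bsky.app', 'follow')); // app.bsky.graph.follow
}
```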
More Tips 🏄
Session Management
When using the AT Protocol API, some endpoints require user authentication. The access token created when a user is authenticated is represented as a Session.
The most important factor here is how to create a session.
First, use the .createSession function to create the most primitive session as follows.
import 'package:atproto/atproto.dart' as atp;
Future<void> main() async {
final session = await atp.createSession(
identifier: 'HANDLE_OR_EMAIL', // Like shinyakato.dev
password: 'PASSWORD', // App Password is recommended
);
print(session);
}
Then you can create an ATProto object from the authenticated session.
import 'package:atproto/atproto.dart' as atp;
Future<void> main() async {
final session = await atp.createSession(
identifier: 'HANDLE_OR_EMAIL', // Like shinyakato.dev
password: 'PASSWORD', // App Password is recommended
);
print(session);
// You can create ATProto object from authenticated session.
final atproto = atp.ATProto.fromSession(session.data);
// Do something with atproto
final did = await atproto.identity.resolveHandle(handle: session.data.handle);
}
App Password
App passwords have most of the same abilities as the user's account password; however, they are restricted from destructive actions such as account deletion or account migration. They are also restricted from creating additional app passwords.
App passwords are of the form xxxx-xxxx-xxxx-xxxx.
For this reason, it is strongly recommended to use an App Password for login to AT Protocol services.
Given this, an application may want to determine whether the password given by the user is an App Password.
With atproto, you can easily determine if a password is in App Password format by using the .isValidAppPassword function.
import 'package:atproto/atproto.dart' as atp;
Future<void> main() async {
atp.isValidAppPassword('xxxx-xxxx-xxxx-xxxx'); // => true
atp.isValidAppPassword('xxxxxxxxxxxxxxxxxxx'); // => false
}
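For readers curious what such a format check might look like, here is a minimal sketch. It assumes the format is four hyphen-separated groups of four lowercase letters or digits; the character set is an assumption, and `looksLikeAppPassword` is a hypothetical stand-in, not the library's `isValidAppPassword`.

```dart
/// Assumed pattern: four groups of four lowercase alphanumerics joined by
/// hyphens. The real library may accept a different character set.
final _appPasswordPattern = RegExp(r'^[a-z0-9]{4}(-[a-z0-9]{4}){3}$');

/// Hypothetical stand-in for a format-only check (no server round trip).
bool looksLikeAppPassword(String password) =>
    _appPasswordPattern.hasMatch(password);

void main() {
  print(looksLikeAppPassword('xxxx-xxxx-xxxx-xxxx')); // true
  print(looksLikeAppPassword('xxxxxxxxxxxxxxxxxxx')); // false
}
```

A check like this only validates the shape of the string; it cannot tell whether the password is actually registered for the account.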
Other Than bsky.social
The endpoints provided by atproto always access bsky.social by default. But as you know, services built on the AT Protocol, such as Bluesky, are distributed. In other words, there must be a way to access services other than bsky.social as needed.
You can specify any service as follows.
import 'package:atproto/atproto.dart' as atp;
Future<void> main() async {
final session = await atp.createSession(
// Add this.
service: 'boobee.blue',
identifier: 'YOUR_HANDLE_OR_EMAIL',
password: 'YOUR_PASSWORD',
);
final atproto = atp.ATProto.fromSession(
session.data,
// Add this, or resolve dynamically based on session.
service: 'boobee.blue',
);
}
De/Serialize
All objects representing JSON objects returned from the API provided by atproto are generated using freezed and json_serializable. This allows easy JSON-based de/serialization of these model objects through the common contract of the fromJson and toJson methods.
For example, if you have the following code:
import 'package:atproto/atproto.dart';
Future<void> main() async {
final atproto = ATProto.anonymous();
// Just find the DID of `shinyakato.dev`
final did = await atproto.identity.resolveHandle(
handle: 'shinyakato.dev',
);
}
Then you can serialize the DID object to JSON with toJson as follows:
print(did.toJson()); // => {did: did:plc:iijrtk7ocored6zuziwmqq3c}
And you can deserialize JSON into a DID object with fromJson as follows:
final json = did.toJson();
final deserializedDID = DID.fromJson(json);
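To make the fromJson/toJson contract concrete, here is a hand-written sketch of the same round trip. `ResolvedHandle` is a hypothetical class written for illustration; atproto's real models are generated by freezed and json_serializable and may differ in detail.

```dart
import 'dart:convert';

/// Hypothetical model for illustration only.
class ResolvedHandle {
  const ResolvedHandle({required this.did});

  /// Deserialization: builds a model object from decoded JSON.
  factory ResolvedHandle.fromJson(Map<String, dynamic> json) =>
      ResolvedHandle(did: json['did'] as String);

  final String did;

  /// Serialization: converts the model object into a JSON-compatible map.
  Map<String, dynamic> toJson() => {'did': did};
}

void main() {
  const original = ResolvedHandle(did: 'did:plc:iijrtk7ocored6zuziwmqq3c');

  // Object -> Map -> String.
  final encoded = jsonEncode(original.toJson());
  print(encoded); // {"did":"did:plc:iijrtk7ocored6zuziwmqq3c"}

  // String -> Map -> Object.
  final restored = ResolvedHandle.fromJson(
    jsonDecode(encoded) as Map<String, dynamic>,
  );
  print(restored.did == original.did); // true
}
```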
Thrown Exceptions
The following exceptions may be thrown as AT Protocol-related errors when using atproto. The specification of these exceptions conforms to the official AT Protocol documentation.
Exception | Description | Retriable |
---|---|---|
XRPCException | Parent class of all the following exception classes. | ❌ |
UnauthorizedException | Thrown when a status code of 401 is returned from the ATP server. Indicating authentication failure. | ❌ |
RateLimitExceededException | Thrown when a status code of 429 is returned from the ATP server. Indicating rate limits exceeded. | ❌ |
XRPCNotSupportedException | Thrown when a status code of 1xx or 3xx is returned from the ATP server. Indicating unsupported error. | ❌ |
InvalidRequestException | Thrown when a status code of 4xx is returned from the ATP server. Indicating client error. | ❌ |
InternalServerErrorException | Thrown when a status code of 5xx is returned from the ATP server. Indicating server error. | ✅ |
Also, the following exceptions may be thrown due to temporary network failures.
Exception | Description | Retriable |
---|---|---|
SocketException | Thrown when a socket operation fails. | ✅ |
TimeoutException | Thrown when a scheduled timeout happens while waiting for an async result. | ✅ |
Exceptions with Retriable set to ✅ are subject to automatic retry. Exceptions with ❌ cannot be retried.
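The status-code rows of the first table above can be read as a small decision procedure. The sketch below is one possible ordering of those checks, with the specific codes (401, 429) tested before the broader 4xx range. `exceptionNameFor` and `isRetriableStatus` are hypothetical helpers that return names rather than throwing; how atproto orders these checks internally is an assumption.

```dart
/// Hypothetical helper mirroring the table: returns the name of the
/// exception class associated with [statusCode], or null for 2xx success.
String? exceptionNameFor(int statusCode) {
  if (statusCode >= 200 && statusCode < 300) return null;
  if (statusCode == 401) return 'UnauthorizedException';
  if (statusCode == 429) return 'RateLimitExceededException';
  if (statusCode >= 400 && statusCode < 500) return 'InvalidRequestException';
  if (statusCode >= 500 && statusCode < 600) {
    return 'InternalServerErrorException';
  }
  // Remaining cases (1xx and 3xx) are treated as unsupported.
  return 'XRPCNotSupportedException';
}

/// Per the table, only server errors (5xx) are retried automatically.
bool isRetriableStatus(int statusCode) =>
    statusCode >= 500 && statusCode < 600;

void main() {
  for (final code in [200, 302, 401, 404, 429, 503]) {
    print('$code -> ${exceptionNameFor(code)} '
        '(retriable: ${isRetriableStatus(code)})');
  }
}
```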
Rate Limits
As with Web APIs in general, there is a rate limit for the AT Protocol API. The main purpose of setting a rate limit for the API is to prevent excessive requests to the server due to API abuse and to discourage spammy behavior.
Rate limits in the AT Protocol are defined in a common specification for the protocol, and you can easily access this information as follows.
import 'package:atproto/atproto.dart' as atp;
Future<void> main() async {
final session = await atp.createSession(
service: 'SERVICE_NAME',
identifier: 'YOUR_HANDLE_OR_EMAIL',
password: 'YOUR_PASSWORD',
);
final atproto = atp.ATProto.fromSession(session.data);
final response = await atproto.repo.createRecord(
collection: atp.NSID.create(
'feed.bsky.app',
'post',
),
record: {
'text': 'Hello!',
'createdAt': DateTime.now().toUtc().toIso8601String(),
},
);
// This is rate limit!
print(response.rateLimit);
final rateLimit = response.rateLimit;
// Available properties.
print(rateLimit.limitCount);
print(rateLimit.remainingCount);
print(rateLimit.resetAt);
print(rateLimit.policy);
// When you need to handle rate limits.
print(rateLimit.isExceeded);
print(rateLimit.isNotExceeded);
// It waits until the rate limit is reset based on resetAt.
// If the rate limit is not exceeded, return immediately.
await rateLimit.waitUntilReset();
}
As in the example above, rate limits are always accessible from XRPCResponse when using atproto.
In more detail, rate limit information is read from the HTTP response headers returned by the ATP server and can be accessed as a RateLimit object via the rateLimit property of the XRPCResponse.
The RateLimit object exposes the following properties, which an application needs to determine its rate limit status.
Property | HTTP Header | Description |
---|---|---|
limitCount | RateLimit-Limit | Maximum number of allowed requests. |
remainingCount | RateLimit-Remaining | Number of requests that can still be made. |
resetAt | RateLimit-Reset | The time when the rate limit will reset. |
policy | RateLimit-Policy | The rate limit policy being applied, like 100;w=300. |
The rate limit properties in the table above map directly to their HTTP headers.
The resetAt property differs slightly from the HTTP header information: it is the date and time at which the rate limit resets, not the number of seconds until the reset.
The value of resetAt is calculated from the date field of the HTTP response headers, which holds the date and time in GMT at which the response was created.
In other words, the value of the resetAt property is the sum of date + RateLimit-Reset given in the HTTP response headers.
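The date + RateLimit-Reset calculation can be sketched in a few lines. This is an illustration of the arithmetic described above, not atproto's internal code; `computeResetAt` is a hypothetical helper, and the header values in `main` are made-up sample data.

```dart
import 'dart:io' show HttpDate;

/// Hypothetical helper: derives the reset time from the `date` and
/// `RateLimit-Reset` response headers, following the description above.
DateTime computeResetAt({
  required String dateHeader,
  required String resetHeader,
}) {
  // HttpDate.parse understands the GMT format used by HTTP and returns UTC.
  final respondedAt = HttpDate.parse(dateHeader);
  final secondsUntilReset = int.parse(resetHeader);
  return respondedAt.add(Duration(seconds: secondsUntilReset));
}

void main() {
  final resetAt = computeResetAt(
    dateHeader: 'Wed, 21 Oct 2015 07:28:00 GMT',
    resetHeader: '300',
  );
  print(resetAt.toIso8601String()); // 2015-10-21T07:33:00.000Z
}
```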
With all this out of the way, you can easily handle rate limits in the following way.
final rateLimit = response.rateLimit;
if (rateLimit.isExceeded) {
// It waits until the rate limit is reset based on resetAt.
await rateLimit.waitUntilReset();
}
Rate limits per endpoint must be handled properly. If a request is sent again while the rate limit is exceeded, the HTTP status will always be 429 Too Many Requests and a RateLimitExceededException will be thrown.
Some API endpoints have rate limits enabled, while others do not. This depends on the authentication method and the characteristics of each endpoint, but XRPCResponse always sets the RateLimit object, even for endpoints that do not have rate limiting enabled.
So what happens if you read the .isExceeded property of a RateLimit taken from an endpoint that has no rate limit in effect?
Nothing to worry about. The RateLimit object internally has a flag that indicates whether the rate limit is enabled.
That is, a RateLimit object returned from an endpoint with no rate limit is set as unlimited, and its isExceeded property always returns false.
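The behavior described above can be pictured with a small model. This is a sketch under the assumption that the internal flag is a simple boolean and that "exceeded" means no requests remain; `RateLimitSketch` and its members are hypothetical and are not atproto's actual RateLimit class.

```dart
/// Hypothetical model of a rate limit with an "enabled" flag.
class RateLimitSketch {
  /// A rate limit reported by the server.
  RateLimitSketch({required this.remainingCount}) : isEnabled = true;

  /// Used when the endpoint sends no rate limit information.
  RateLimitSketch.unlimited()
      : remainingCount = 0,
        isEnabled = false;

  final int remainingCount;
  final bool isEnabled;

  /// Never true for unlimited endpoints, whatever the counters say.
  bool get isExceeded => isEnabled && remainingCount <= 0;

  bool get isNotExceeded => !isExceeded;
}

void main() {
  print(RateLimitSketch(remainingCount: 0).isExceeded); // true
  print(RateLimitSketch(remainingCount: 5).isExceeded); // false
  print(RateLimitSketch.unlimited().isExceeded); // false
}
```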
Union Types
Since AT Protocol's Lexicon supports the Union type, there are several endpoints where JSON objects of different structures are returned at once. However, since Dart does not currently support Union as a language feature, marshaling JSON for this Union structure has been difficult.
atproto solves this problem by using freezed to represent a pseudo-Union type, which is also type safe. All Union types provided by atproto have a .when(...) method to handle them cleanly.
See, for example, Firehose API in the next section.
All Union types provided by atproto always have the property unknown. This is because Union types not supported by atproto cannot be converted to specific model objects when returned from a particular endpoint.
When an unknown event occurs, a raw JSON object that has not been marshalled into a specific model object is passed to the callback. This allows Union types to be handled safely with atproto even if new ones are suddenly added officially, and also allows for more customization.
Alternatively, you can handle these union objects more easily using the pattern matching supported by Dart 3.
For example, pattern matching can replace the .when processing used with the Firehose API.
All union objects have class names prefixed with U.
So, if you want to handle only Commit and Handle events from the Firehose API, you can use the USubscribedRepoCommit and USubscribedRepoHandle objects for pattern matching as follows.
import 'package:atproto/atproto.dart' as atp;
Future<void> main() async {
final atproto = atp.ATProto.anonymous();
final subscription = await atproto.sync.subscribeRepos();
await for (final event in subscription.data.stream) {
// No need to use `.when` method.
switch (event) {
// Specify a union object prefixed with `U` as the case.
case atp.USubscribedRepoCommit():
print(event.data.ops);
case atp.USubscribedRepoHandle():
print(event.data.handle);
}
}
}
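The idea of a union with an unknown fallback can also be sketched without any code generation, using Dart 3 sealed classes. Everything in this block (`RepoEvent`, its subclasses, the `$type` values, and `describe`) is hypothetical and simplified for illustration; atproto's generated types differ.

```dart
/// Hypothetical event hierarchy for illustration only.
sealed class RepoEvent {
  const RepoEvent();

  /// Picks a variant from the `$type` discriminator. Anything unrecognized
  /// is kept as raw JSON instead of failing.
  factory RepoEvent.fromJson(Map<String, dynamic> json) {
    return switch (json[r'$type']) {
      'commit' => CommitEvent((json['ops'] as List).cast<String>()),
      'handle' => HandleEvent(json['handle'] as String),
      _ => UnknownEvent(json),
    };
  }
}

final class CommitEvent extends RepoEvent {
  const CommitEvent(this.ops);
  final List<String> ops;
}

final class HandleEvent extends RepoEvent {
  const HandleEvent(this.handle);
  final String handle;
}

final class UnknownEvent extends RepoEvent {
  const UnknownEvent(this.raw);
  final Map<String, dynamic> raw;
}

/// The compiler checks that every variant of the sealed class is covered.
String describe(RepoEvent event) => switch (event) {
      CommitEvent(:final ops) => 'commit with ${ops.length} op(s)',
      HandleEvent(:final handle) => 'handle changed to $handle',
      UnknownEvent(:final raw) => 'unknown event: $raw',
    };

void main() {
  final events = [
    RepoEvent.fromJson({r'$type': 'commit', 'ops': ['create', 'delete']}),
    RepoEvent.fromJson({r'$type': 'handle', 'handle': 'alice.test'}),
    RepoEvent.fromJson({r'$type': 'somethingNew'}),
  ];
  events.map(describe).forEach(print);
}
```

Keeping the raw JSON in the fallback variant means newly introduced event types can still be inspected by the caller.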
Firehose API
atproto makes it easy to work with the powerful Firehose API.
The Firehose API in AT Protocol allows you to get all events that occur on a specific service, such as bsky.social, in real time. This powerful and long-lived API can be used to calculate statistics using real-time data, develop interesting interactive bots, etc.
Using atproto to access the Firehose API is very simple: just execute the subscribeRepos method provided by the SyncService as shown in the following example. User authentication is not required to access the Firehose API.
import 'package:atproto/atproto.dart';
Future<void> main() async {
// Authentication is not required.
final atproto = ATProto.anonymous();
final subscription = await atproto.sync.subscribeRepos();
// Get events in real time.
await for (final event in subscription.data.stream) {
event.when(
// Occurs when an account commits records, such as Post and Like in Bluesky.
commit: (data) {
// A single commit may contain multiple records.
for (final op in data.ops) {
switch (op.action) {
case RepoAction.create:
case RepoAction.update:
// Created/Updated AT URI and specific record.
print(op.uri);
print(op.record);
break;
case RepoAction.delete:
// Deleted AT URI.
print(op.uri);
break;
}
}
},
// Occurs when an account changes its handle.
handle: (data) {
// Updated handle.
print(data.handle);
print(data.did);
},
migrate: print,
tombstone: print,
info: print,
unknown: print,
);
}
}
The above example may seem a bit difficult, but the SubscribedRepo retrieved in real time from the Stream is a Union type, so the .when(...) method can be used to easily handle each event.
For more details about the Firehose API, you can check this article.
Timeout Duration
When using the ATProto object for API communication, the default timeout period is 30 seconds. This means that by default, a TimeoutException will be thrown if API communication takes longer than 30 seconds.
However, depending on system requirements, you may need a timeout shorter or longer than 30 seconds.
In that case, the timeout period can be specified when creating an instance of the ATProto object as follows.
import 'package:atproto/atproto.dart';
Future<void> main() async {
final atproto = ATProto.anonymous(
// Add this.
timeout: Duration(seconds: 20),
);
}
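To show what a timeout means at the Dart level, the sketch below applies `Future.timeout` to a simulated slow call and catches the resulting TimeoutException. How atproto applies the timeout internally is not shown in this document, so treat this as an analogy; `slowCall` and `callWithTimeout` are hypothetical.

```dart
import 'dart:async';

/// Hypothetical stand-in for an API call that takes [latency] to complete.
Future<String> slowCall(Duration latency) =>
    Future.delayed(latency, () => 'ok');

/// Returns the call's result, or 'timed out' if [timeout] elapses first.
Future<String> callWithTimeout(Duration latency, Duration timeout) async {
  try {
    return await slowCall(latency).timeout(timeout);
  } on TimeoutException {
    return 'timed out';
  }
}

Future<void> main() async {
  // Fast call, generous timeout.
  print(await callWithTimeout(
    const Duration(milliseconds: 10),
    const Duration(milliseconds: 200),
  )); // ok

  // Slow call, short timeout.
  print(await callWithTimeout(
    const Duration(milliseconds: 200),
    const Duration(milliseconds: 10),
  )); // timed out
}
```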
Advanced Built-In Retry
atproto has an advanced retry feature using the Exponential BackOff And Jitter algorithm.
Retry processing is important in API communication because communicating with external servers always carries the possibility of temporary errors, such as network failures. In the event of a network failure, waiting for a while and retrying will often work, rather than throwing an exception and terminating the process.
The retry algorithm also matters. If it simply waits and retries at a fixed interval, multiple retry requests will hit the server simultaneously before the temporary failure is resolved, putting even more load on a server that is already struggling.
The Exponential BackOff And Jitter used by atproto solves this problem. atproto uses an algorithm with random numbers to distribute the load on the server while improving the probability of success on retries.
Jitter means a random number.
You can use this retry feature as follows.
import 'package:atproto/atproto.dart';
Future<void> main() async {
final atproto = ATProto.anonymous(
// Add this.
retryConfig: RetryConfig(
// Required.
// You can set count of attempts.
maxAttempts: 3,
// Optional.
// Jitter can be specified as you want.
jitter: Jitter(
maxInSeconds: 10,
minInSeconds: 5,
),
// Optional.
// You can define the events that occur when Retry is executed.
onExecute: (event) => print(
'Retry after ${event.intervalInSeconds} seconds...'
'[${event.retryCount} times]',
),
),
);
}
Then it retries:
- When the status code of the response returned from the ATP server is 5xx
- When the network is temporarily lost and a SocketException is thrown
- When communication times out temporarily and a TimeoutException is thrown
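As a rough illustration of exponential backoff with jitter, the sketch below computes a wait of 2^retryCount plus a random value and uses it in a generic retry loop. The exact formula atproto uses is an assumption here, and `backoffSeconds` and `retry` are hypothetical helpers. Unlike the list above, this sketch retries on any error to keep the example short.

```dart
import 'dart:math';

/// Hypothetical helper: exponential part (2^retryCount) plus a random
/// jitter between [minJitter] and [maxJitter], inclusive.
int backoffSeconds(
  int retryCount, {
  int minJitter = 0,
  int maxJitter = 4,
  Random? random,
}) {
  final rng = random ?? Random();
  final exponential = pow(2, retryCount).toInt();
  final jitter = minJitter + rng.nextInt(maxJitter - minJitter + 1);
  return exponential + jitter;
}

/// Runs [action], retrying up to [maxAttempts] times after a failure.
Future<T> retry<T>(
  Future<T> Function() action, {
  required int maxAttempts,
  required Duration Function(int retryCount) delayFor,
}) async {
  for (var retryCount = 0;; retryCount++) {
    try {
      return await action();
    } catch (_) {
      // Give up once the retry budget is spent.
      if (retryCount >= maxAttempts) rethrow;
      await Future<void>.delayed(delayFor(retryCount));
    }
  }
}

Future<void> main() async {
  var calls = 0;
  final result = await retry(
    () async {
      calls++;
      if (calls < 3) throw StateError('temporary failure');
      return 'succeeded after $calls calls';
    },
    maxAttempts: 3,
    // Milliseconds instead of seconds so the demo finishes quickly.
    delayFor: (retryCount) =>
        Duration(milliseconds: backoffSeconds(retryCount)),
  );
  print(result); // succeeded after 3 calls
}
```

Because each client adds its own random offset, clients that failed at the same moment are unlikely to retry at the same moment.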
Lexicon/Object IDs
Some objects returned from AT Protocol's API are identified by IDs defined in Lexicon. The IDs defined in Lexicon are also very important when sending a request to the API server.
atproto provides all the IDs defined in Lexicon for com.atproto.* as constants, which can be easily used from package:atproto/ids.dart as follows.
import 'package:atproto/ids.dart' as ids;
void main() {
// `blob`
ids.blob;
// `com.atproto.sync.subscribeRepos#commit`
ids.comAtprotoSyncSubscribeReposCommit;
}
These ID constants are automatically maintained when a new Lexicon is officially added. See script.
Pagination
Pagination in the AT Protocol is designed to be performed using cursor. cursor is a string indicating the beginning of the next page, and is returned by the ATP server if the next page exists.
For more details about the design of pagination and cursor in the AT Protocol, see the official documentation.
atproto also follows the common design of the AT Protocol and allows paging by using cursor. It can be easily implemented as in the following example.
import 'package:atproto/atproto.dart' as atp;
Future<void> main() async {
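// `_session` is assumed to be a helper defined elsewhere that returns
// the data of an authenticated session.
final atproto = atp.ATProto.fromSession(await _session);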
// Pagination is performed on a per-cursor basis.
String? nextCursor;
do {
final records = await atproto.repo.listRecords(
repo: 'shinyakato.dev',
collection: atp.NSID.create(
'graph.bsky.app',
'follow',
),
cursor: nextCursor, // If null, it is ignored.
);
for (final record in records.data.records) {
print(record);
}
// Update pagination cursor.
nextCursor = records.data.cursor;
} while (nextCursor != null); // If there is no next page, it ends.
}
Endpoints that can be paged can be seen in this matrix.
This example is a very simple implementation, but it shows how pagination works with atproto.
Whenever a method corresponding to a pagination-available endpoint is executed, the cursor is always present in the root of the response data, like records.data.cursor above.
If the next page does not exist, cursor is basically null.
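The cursor loop can be factored into a reusable helper. The sketch below does this against an in-memory stand-in so it runs without a server; `Page`, `paginate`, and `fakeFetch` are hypothetical names, and the fake endpoint's use of a list index as its cursor is an arbitrary choice for the demo.

```dart
/// Hypothetical page type: a slice of items plus the cursor for the next page.
class Page<T> {
  const Page(this.items, this.cursor);
  final List<T> items;
  final String? cursor;
}

/// Lazily walks every page, passing the previous cursor to each request
/// and stopping when the returned cursor is null.
Stream<T> paginate<T>(
  Future<Page<T>> Function(String? cursor) fetchPage,
) async* {
  String? cursor;
  do {
    final page = await fetchPage(cursor);
    for (final item in page.items) {
      yield item;
    }
    cursor = page.cursor;
  } while (cursor != null);
}

/// In-memory stand-in for a paged endpoint; the cursor is the next index.
Future<Page<int>> fakeFetch(String? cursor) async {
  const data = [1, 2, 3, 4, 5];
  const pageSize = 2;
  final start = cursor == null ? 0 : int.parse(cursor);
  final end = start + pageSize < data.length ? start + pageSize : data.length;
  final next = end < data.length ? '$end' : null;
  return Page(data.sublist(start, end), next);
}

Future<void> main() async {
  print(await paginate(fakeFetch).toList()); // [1, 2, 3, 4, 5]
}
```

Exposing the pages as a Stream lets callers stop early, for example with `take(n)`, without fetching the remaining pages.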