Build Your First Custom Indexer
Refer to Custom Indexing Framework and Indexer Pipeline Architecture for a conceptual overview of the indexer framework.
Refer to Access Sui Data for an overview of options to access Sui network data.
To build a complete custom indexer, you use the sui-indexer-alt-framework. The steps that follow demonstrate how to create a sequential pipeline that extracts transaction digests from Sui checkpoints and stores them in a local PostgreSQL database. You can find the source code for the framework in the Sui repo on GitHub.
While this example uses PostgreSQL with Diesel (a popular Rust ORM and query builder) for minimalism and out-of-the-box support, the sui-indexer-alt-framework is designed for flexible storage. You can use different databases (such as MongoDB, CouchDB, or similar) or other database clients if you prefer not to use Diesel. To achieve this, implement the framework's Store and Connection traits and define your database write logic directly within your Handler::commit() method.
Prerequisites
Before starting this exercise, ensure you have:
Required software
You need the following stack installed on your machine:
- Rust and Cargo (used to create and build the indexer project).
- PostgreSQL (the local database the indexer writes to).
- Diesel CLI with PostgreSQL support (used to manage database migrations).
If you're unsure whether your system has the necessary software properly installed, you can verify installation with the following commands.
$ psql --version
$ diesel --version
To test PostgreSQL functionality, you can create and delete a test table.
$ createdb test_db && dropdb test_db
If you receive a new prompt with no console output, the test was successful. If you receive a createdb error similar to
createdb: error: connection to server on socket "/tmp/.s.PGSQL.5432" failed: FATAL: role "username" does not exist
then you need to create the user (replace username with the name provided in your error message).
$ sudo -u postgres createuser --superuser username
Enter your password when prompted, then try the createdb && dropdb command again.
Recommended reading
Read the Custom Indexing Framework and Indexer Pipeline Architecture topics first to understand the overall architecture and concurrent pipeline concepts.
What you build
The following steps show how to create an indexer that:
- Connects to Sui Testnet: Uses the remote checkpoint store at https://checkpoints.testnet.sui.io.
- Processes checkpoints: Streams checkpoint data continuously.
- Extracts transaction data: Pulls transaction digests from each checkpoint.
- Stores in local PostgreSQL: Commits data to a local PostgreSQL database.
- Implements sequential pipeline: Uses in-order processing with batching for optimal consistency and performance.
In the end, you have a working indexer that demonstrates all core framework concepts and can serve as a foundation for more complex custom indexers.
Sui provides checkpoint stores for both Mainnet and Testnet.
- Testnet: https://checkpoints.testnet.sui.io
- Mainnet: https://checkpoints.mainnet.sui.io
Step 1: Project setup
First, open your console to the directory where you want to store your indexer project. Use the cargo new command to create a new Rust project and then navigate to its directory.
$ cargo new simple-sui-indexer
$ cd simple-sui-indexer
Step 2: Configure dependencies
Replace the contents of your Cargo.toml file with the following configuration and save.
[package]
name = "basic-sui-indexer"
version = "0.1.0"
edition = "2021"
[dependencies]
# Core framework dependencies
sui-indexer-alt-framework = { git = "https://github.com/MystenLabs/sui.git", branch = "testnet" }
# Async runtime
tokio = { version = "1.0", features = ["full"] }
# Error handling
anyhow = "1.0"
# Diesel PostgreSQL
diesel = { version = "2.0", features = ["postgres", "r2d2"] }
diesel-async = { version = "0.5", features = ["bb8", "postgres", "async-connection-wrapper"] }
diesel_migrations = "2.0"
# Async traits
async-trait = "0.1"
# URL parsing
url = "2.0"
# Use .env file
dotenvy = "0.15"
# Command line parsing
clap = { version = "4.0", features = ["derive"] }
The manifest now includes the following dependencies:
- sui-indexer-alt-framework: Core framework providing pipeline infrastructure.
- diesel / diesel-async: Type-safe database ORM with asynchronous support.
- tokio: Async runtime required by the framework.
- clap: Command-line argument parsing for configuration.
- anyhow: Error handling.
- async-trait: Async trait implementations.
- dotenvy: Loads the .env file that stores your PostgreSQL URL.
Step 3: Create database
Before configuring migrations, create and verify your local PostgreSQL database:
$ createdb sui_indexer
Get your connection details:
$ psql sui_indexer -c "\conninfo"
If successful, your console should display a message similar to the following:
You are connected to database "sui_indexer" as user "username" via socket in "/tmp" at port "5432".
You can now set a variable to your database URL, as it's used in the following commands. Make sure to change username to your actual username.
$ PSQL_URL=postgres://username@localhost:5432/sui_indexer
You can now test your connection with the following command:
$ psql $PSQL_URL -c "SELECT 'Connected';"
If successful, your console or terminal should respond with a message similar to the following:
?column?
-----------
Connected
(1 row)
Step 4: Database setup
Before you start coding, make sure you set up a local PostgreSQL database from the previous step. This is required for the indexer to store the extracted transaction data.
The following database setup steps have you:
- Create a database table to store the data.
- Use Diesel to manage the process.
- Generate Rust code that maps to the database table.
Step 4.1: Configure Diesel
First, create a diesel.toml file (within the same folder as Cargo.toml) to configure database migrations.
$ touch diesel.toml
Update and save the file with the following code:
[print_schema]
file = "src/schema.rs"
[migrations_directory]
dir = "migrations"
Step 4.2: Create database table using Diesel migrations
Diesel migrations are a way of creating and managing database tables using SQL files. Each migration has two files:
- up.sql: Creates and changes the table.
- down.sql: Removes and undoes the changes.
Use the diesel setup command to create the necessary directory structure, passing your database URL with the --database-url argument.
$ diesel setup --database-url $PSQL_URL
Next, use the diesel migration generate command at the root of your project to create the migration files.
$ diesel migration generate transaction_digests
You should now have a migrations folder in your project, containing a subdirectory with the name format YYYY-MM-DD-HHMMSS_transaction_digests. This subdirectory holds the up.sql and down.sql files.
Open the up.sql file in that subdirectory (the one with the actual timestamped name) and replace its contents with the following code:
CREATE TABLE IF NOT EXISTS transaction_digests (
tx_digest TEXT PRIMARY KEY,
checkpoint_sequence_number BIGINT NOT NULL
);
This example uses the TEXT data type for tx_digest, but best practice for a production indexer is to use the BYTEA data type.
The TEXT type makes the transaction digest easily readable and directly usable with external tools. Digests are Base58 encoded, and because PostgreSQL cannot natively display BYTEA data in this format, storing the digest as TEXT allows you to copy it from a query and paste it into an explorer like SuiScan.
For a production environment, however, BYTEA is strongly recommended. It offers superior storage and query efficiency by storing the raw byte representation, which is more compact and significantly faster for comparisons than a string. Refer to Binary data performance in PostgreSQL on the CYBERTEC website for more information.
Save up.sql, then open down.sql to edit. Replace the contents of the file with the following code and save it:
DROP TABLE IF EXISTS transaction_digests;
Step 4.3: Apply migration and generate Rust schema
From the root of your project, use the diesel migration run command to create the table.
$ diesel migration run --database-url $PSQL_URL
Then use the diesel print-schema command to generate the schema.rs file from the actual database.
$ diesel print-schema --database-url $PSQL_URL > src/schema.rs
Your src/schema.rs file should now look like the following:
// @generated automatically by Diesel CLI.
diesel::table! {
transaction_digests (tx_digest) {
tx_digest -> Text,
checkpoint_sequence_number -> Int8,
}
}
After running the previous commands, your project is set up for the next steps:
- PostgreSQL now has a transaction_digests table with the defined columns.
- src/schema.rs contains automatically generated Rust code that represents this table structure.
- You can now write type-safe Rust code that talks to this specific table.
Diesel's migration system evolves the database schema over time in a structured and version-controlled way. For a complete walkthrough, see the official Diesel Getting Started guide.
Step 5: Create data structure
To simplify writes to Diesel, you can define a struct that represents a record on the transaction_digests
table.
use diesel::prelude::*;
use sui_indexer_alt_framework::FieldCount;
use crate::schema::transaction_digests;
#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = transaction_digests)]
pub struct StoredTransactionDigest {
pub tx_digest: String,
pub checkpoint_sequence_number: i64,
}
Key annotations:
- FieldCount: Required by sui-indexer-alt-framework for memory optimization and batch processing efficiency. It limits the maximum size of a batch so that a single SQL statement doesn't exceed the PostgreSQL limit on the number of bind parameters.
- diesel(table_name = transaction_digests): Maps this Rust struct to the transaction_digests table, whose schema was generated in a previous step.
- Insertable: Allows this struct to be inserted into the database using Diesel.
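If you adopt the BYTEA column recommended in Step 4.2, the model changes shape as well. The following is a minimal sketch under those assumptions: the up.sql migration declares tx_digest as BYTEA and you regenerate schema.rs, which maps the column to a binary type. The conversion from a transaction digest to raw bytes in process() would also need to change, and is not shown here.
// Hedged sketch only: a BYTEA-backed variant of the model in this step.
// Assumes the migration defines tx_digest as BYTEA and that
// diesel print-schema was rerun so schema.rs maps the column to Binary.
use diesel::prelude::*;
use sui_indexer_alt_framework::FieldCount;
use crate::schema::transaction_digests;

#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = transaction_digests)]
pub struct StoredTransactionDigest {
    // Raw digest bytes instead of a Base58-encoded string.
    pub tx_digest: Vec<u8>,
    pub checkpoint_sequence_number: i64,
}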
Step 6: Define the Handler struct in handlers.rs
Create a handlers.rs file in your src directory.
$ touch ./src/handlers.rs
Open the file and define a concrete struct to implement the Processor and Handler traits:
pub struct TransactionDigestHandler;
Save the file but keep it open as the next steps add to its code.
Step 7: Implement the Processor
The Processor trait defines how to extract and transform data from checkpoints. The resulting data is then passed to Handler::commit.
Add the necessary dependencies at the top of the file.
use std::sync::Arc;
use anyhow::Result;
use sui_indexer_alt_framework::{
pipeline::Processor,
types::full_checkpoint_content::CheckpointData,
};
use crate::models::StoredTransactionDigest;
use crate::schema::transaction_digests::dsl::*;
After the TransactionDigestHandler struct, add the Processor code:
impl Processor for TransactionDigestHandler {
const NAME: &'static str = "transaction_digest_handler";
type Value = StoredTransactionDigest;
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let checkpoint_seq = checkpoint.checkpoint_summary.sequence_number as i64;
let digests = checkpoint.transactions.iter().map(|tx| {
StoredTransactionDigest {
tx_digest: tx.transaction.digest().to_string(),
checkpoint_sequence_number: checkpoint_seq,
}
}).collect();
Ok(digests)
}
}
Key concepts:
- NAME: Unique identifier for this processor, used in monitoring and logging.
- type Value: Defines what data flows through the pipeline, which ensures type safety.
- process(): Core logic that transforms checkpoint data into your custom data structure.
Save the handlers.rs file.
Processor trait definition
/// Implementors of this trait are responsible for transforming checkpoint into rows for their
/// table. The `FANOUT` associated value controls how many concurrent workers will be used to
/// process checkpoint information.
pub trait Processor {
/// Used to identify the pipeline in logs and metrics.
const NAME: &'static str;
/// How much concurrency to use when processing checkpoint data.
const FANOUT: usize = 10;
/// The type of value being inserted by the handler.
type Value: Send + Sync + 'static;
/// The processing logic for turning a checkpoint into rows of the table.
fn process(&self, checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>>;
}
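The FANOUT constant has a default, so the implementation in Step 7 doesn't set it. As a minimal sketch, the same implementation with an explicit override looks like the following; everything except the added constant is identical to the code above.
// Sketch: the Step 7 Processor implementation with an explicit FANOUT override.
impl Processor for TransactionDigestHandler {
    const NAME: &'static str = "transaction_digest_handler";
    // Process up to four checkpoints concurrently instead of the default ten.
    const FANOUT: usize = 4;
    type Value = StoredTransactionDigest;

    fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
        let checkpoint_seq = checkpoint.checkpoint_summary.sequence_number as i64;
        let digests = checkpoint.transactions.iter().map(|tx| {
            StoredTransactionDigest {
                tx_digest: tx.transaction.digest().to_string(),
                checkpoint_sequence_number: checkpoint_seq,
            }
        }).collect();
        Ok(digests)
    }
}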
Step 8: Implement the Handler
The Handler trait defines how to commit data to the database. Append the Handler dependencies to the bottom of the import list you created in the previous step.
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::{
postgres::{Connection, Db},
pipeline::sequential::Handler,
};
Add the logic for Handler after the Processor code. The complete code is available at the end of this step.
#[async_trait::async_trait]
impl Handler for TransactionDigestHandler {
type Store = Db;
type Batch = Vec<Self::Value>;
fn batch(batch: &mut Self::Batch, values: Vec<Self::Value>) {
batch.extend(values);
}
async fn commit<'a>(
batch: &Self::Batch,
conn: &mut Connection<'a>,
) -> Result<usize> {
let inserted = diesel::insert_into(transaction_digests)
.values(batch)
.on_conflict(tx_digest)
.do_nothing()
.execute(conn)
.await?;
Ok(inserted)
}
}
How sequential batching works:
- process() returns values for each checkpoint.
- batch() accumulates values from multiple checkpoints.
- commit() writes the batch when the framework reaches its limits (H::MAX_BATCH_CHECKPOINTS).
while batch_checkpoints < H::MAX_BATCH_CHECKPOINTS {
if !can_process_pending(next_checkpoint, checkpoint_lag, &pending) {
break;
}
let Some(entry) = pending.first_entry() else {
break;
};
match next_checkpoint.cmp(entry.key()) {
// Next pending checkpoint is from the future.
Ordering::Less => break,
// This is the next checkpoint -- include it.
Ordering::Equal => {
let indexed = entry.remove();
batch_rows += indexed.len();
batch_checkpoints += 1;
H::batch(&mut batch, indexed.values);
watermark = indexed.watermark;
next_checkpoint += 1;
}
// Next pending checkpoint is in the past, ignore it to avoid double
// writes.
Ordering::Greater => {
metrics
.total_watermarks_out_of_order
.with_label_values(&[H::NAME])
.inc();
let indexed = entry.remove();
pending_rows -= indexed.len();
}
}
}
You can override the default batch limits by defining the associated constants in your Handler implementation.
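As a minimal sketch (not part of the tutorial code), the Handler from this step with MAX_BATCH_CHECKPOINTS overridden might look like the following. MAX_BATCH_CHECKPOINTS is the limit referenced in the loop above; treat its default value, and the full set of tunable constants, as specific to the framework version you use.
// Sketch: identical to the Handler implemented in this step, plus an explicit
// override of the batch limit referenced in the loop above.
#[async_trait::async_trait]
impl Handler for TransactionDigestHandler {
    type Store = Db;
    type Batch = Vec<Self::Value>;

    // Commit after at most 100 checkpoints are folded into a single batch.
    const MAX_BATCH_CHECKPOINTS: usize = 100;

    fn batch(batch: &mut Self::Batch, values: Vec<Self::Value>) {
        batch.extend(values);
    }

    async fn commit<'a>(
        batch: &Self::Batch,
        conn: &mut Connection<'a>,
    ) -> Result<usize> {
        let inserted = diesel::insert_into(transaction_digests)
            .values(batch)
            .on_conflict(tx_digest)
            .do_nothing()
            .execute(conn)
            .await?;
        Ok(inserted)
    }
}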
The handlers.rs file is now complete. Save the file.
Complete handlers.rs file
use std::sync::Arc;
use anyhow::Result;
use sui_indexer_alt_framework::{
pipeline::Processor,
types::full_checkpoint_content::CheckpointData,
};
use crate::models::StoredTransactionDigest;
use crate::schema::transaction_digests::dsl::*;
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::{
postgres::{Connection, Db},
pipeline::sequential::Handler,
};
pub struct TransactionDigestHandler;
impl Processor for TransactionDigestHandler {
const NAME: &'static str = "transaction_digest_handler";
type Value = StoredTransactionDigest;
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let checkpoint_seq = checkpoint.checkpoint_summary.sequence_number as i64;
let digests = checkpoint.transactions.iter().map(|tx| {
StoredTransactionDigest {
tx_digest: tx.transaction.digest().to_string(),
checkpoint_sequence_number: checkpoint_seq,
}
}).collect();
Ok(digests)
}
}
#[async_trait::async_trait]
impl Handler for TransactionDigestHandler {
type Store = Db;
type Batch = Vec<Self::Value>;
fn batch(batch: &mut Self::Batch, values: Vec<Self::Value>) {
batch.extend(values);
}
async fn commit<'a>(
batch: &Self::Batch,
conn: &mut Connection<'a>,
) -> Result<usize> {
let inserted = diesel::insert_into(transaction_digests)
.values(batch)
.on_conflict(tx_digest)
.do_nothing()
.execute(conn)
.await?;
Ok(inserted)
}
}
Handler trait definition
/// Handlers implement the logic for a given indexing pipeline: How to process checkpoint data (by
/// implementing [Processor]) into rows for their table, and how to write those rows to the database.
///
/// The handler is also responsible for tuning the various parameters of the pipeline (provided as
/// associated values). Reasonable defaults have been chosen to balance concurrency with memory
/// usage, but each handle may choose to override these defaults, e.g.
///
/// - Handlers that produce many small rows may wish to increase their batch/chunk/max-pending
/// sizes).
/// - Handlers that do more work during processing may wish to increase their fanout so more of it
/// can be done concurrently, to preserve throughput.
///
/// Concurrent handlers can only be used in concurrent pipelines, where checkpoint data is
/// processed and committed out-of-order and a watermark table is kept up-to-date with the latest
/// checkpoint below which all data has been committed.
///
/// Back-pressure is handled through the `MAX_PENDING_SIZE` constant -- if more than this many rows
/// build up, the collector will stop accepting new checkpoints, which will eventually propagate
/// back to the ingestion service.
#[async_trait::async_trait]
pub trait Handler: Processor<Value: FieldCount> {
type Store: Store;
/// If at least this many rows are pending, the committer will commit them eagerly.
const MIN_EAGER_ROWS: usize = 50;
/// If there are more than this many rows pending, the committer applies backpressure.
const MAX_PENDING_ROWS: usize = 5000;
/// The maximum number of watermarks that can show up in a single batch.
/// This limit exists to deal with pipelines that produce no data for a majority of
/// checkpoints -- the size of these pipeline's batches will be dominated by watermark updates.
const MAX_WATERMARK_UPDATES: usize = 10_000;
/// Take a chunk of values and commit them to the database, returning the number of rows
/// affected.
async fn commit<'a>(
values: &[Self::Value],
conn: &mut <Self::Store as Store>::Connection<'a>,
) -> anyhow::Result<usize>;
/// Clean up data between checkpoints `_from` and `_to_exclusive` (exclusive) in the database, returning
/// the number of rows affected. This function is optional, and defaults to not pruning at all.
async fn prune<'a>(
&self,
_from: u64,
_to_exclusive: u64,
_conn: &mut <Self::Store as Store>::Connection<'a>,
) -> anyhow::Result<usize> {
Ok(0)
}
}
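The prune hook above defaults to doing nothing. If your pipeline needs pruning and its Handler exposes that hook, the deletion logic for this table could look like the following minimal sketch. The helper name is hypothetical, and it assumes use diesel::prelude::* is in scope for the filter methods, alongside the transaction_digests::dsl::* and Connection imports already used in handlers.rs.
// Hedged sketch: delete rows for checkpoints in the range [from, to_exclusive).
// prune_transaction_digests is a hypothetical helper, not a framework API;
// call it from a prune implementation if your Handler trait provides one.
async fn prune_transaction_digests<'a>(
    from: u64,
    to_exclusive: u64,
    conn: &mut Connection<'a>,
) -> Result<usize> {
    let deleted = diesel::delete(
        transaction_digests
            .filter(checkpoint_sequence_number.ge(from as i64))
            .filter(checkpoint_sequence_number.lt(to_exclusive as i64)),
    )
    .execute(conn)
    .await?;
    Ok(deleted)
}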
Step 9: Create .env file
The main function you create in the next step needs the value you stored in the shell variable $PSQL_URL. To make it available, create a .env file with that data.
- Bash/zsh:
echo "DATABASE_URL=$PSQL_URL" > .env
- fish:
echo "DATABASE_URL=$PSQL_URL" > .env
- PowerShell:
"DATABASE_URL=$env:PSQL_URL" | Out-File -Encoding UTF8 .env
After running the command for your environment, make sure the .env file exists at your project root with the correct data.
Step 10: Create main function
Now, to tie everything together in the main function, open your main.rs file. Replace the default code with the following and save the file:
mod models;
mod handlers;
use handlers::TransactionDigestHandler;
pub mod schema;
use anyhow::Result;
use clap::Parser;
use diesel_migrations::{embed_migrations, EmbeddedMigrations};
use sui_indexer_alt_framework::{
cluster::{Args, IndexerCluster},
pipeline::sequential::SequentialConfig,
};
use tokio;
use url::Url;
// Embed database migrations into the binary so they run automatically on startup
const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");
#[tokio::main]
async fn main() -> Result<()> {
// Load .env data
dotenvy::dotenv().ok();
// Local database URL created in step 3 above
let database_url = std::env::var("DATABASE_URL")
.expect("DATABASE_URL must be set in the environment")
.parse::<Url>()
.expect("Invalid database URL");
// Parse command-line arguments (checkpoint range, URLs, performance settings)
let args = Args::parse();
// Build and configure the indexer cluster
let mut cluster = IndexerCluster::builder()
.with_args(args) // Apply command-line configuration
.with_database_url(database_url) // Set up database URL
.with_migrations(&MIGRATIONS) // Enable automatic schema migrations
.build()
.await?;
// Register our custom sequential pipeline with the cluster
cluster.sequential_pipeline(
TransactionDigestHandler, // Our processor/handler implementation
SequentialConfig::default(), // Use default batch sizes and checkpoint lag
).await?;
// Start the indexer and wait for completion
let handle = cluster.run().await?;
handle.await?;
Ok(())
}
Key components explained:
- embed_migrations!: Includes your migration files in the binary so the indexer automatically updates the database schema on startup.
- Args::parse(): Provides command-line configuration, such as --first-checkpoint and --remote-store-url.
- IndexerCluster::builder(): Sets up the framework infrastructure (database connections, checkpoint streaming, monitoring).
- sequential_pipeline(): Registers a sequential pipeline that processes checkpoints in order with smart batching.
- SequentialConfig::default(): Uses framework defaults for batch sizes and checkpoint lag (how many checkpoints to batch together). See the sketch after this list for a non-default configuration.
- cluster.run(): Starts processing checkpoints and blocks until completion.
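If the defaults don't fit your workload, you can construct the configuration explicitly instead of calling SequentialConfig::default(). The following is a rough sketch of replacing the registration call in main.rs. The checkpoint_lag field name mirrors the variable used in the framework's batching loop shown earlier, but treat the struct's exact fields as assumptions to verify against your framework version.
// Hedged sketch only: register the pipeline with a non-default checkpoint lag.
// Field names are assumptions; confirm them against sui-indexer-alt-framework.
cluster.sequential_pipeline(
    TransactionDigestHandler,
    SequentialConfig {
        // Stay this many checkpoints behind the latest before committing.
        checkpoint_lag: 10,
        ..Default::default()
    },
).await?;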
Your indexer is now complete. The next steps walk you through running the indexer and checking its functionality.
Step 11: Run your indexer
Use the cargo run command to run your indexer against Testnet with remote checkpoint storage.
$ cargo run -- --remote-store-url https://checkpoints.testnet.sui.io
Allow incoming network requests if your operating system requests it for the basic-sui-indexer application.
If successful, your console informs you that the indexer is running.
Step 12: Verify results
Open a new terminal or console and connect to your database to check the results:
$ psql sui_indexer
After connecting, run a few queries to verify your indexer is working:
Check how many transaction digests are indexed:
SELECT COUNT(*) FROM transaction_digests;
View sample records:
SELECT * FROM transaction_digests LIMIT 5;
To confirm your data is accurate, copy any transaction digest from your database and verify it on SuiScan: https://suiscan.xyz/testnet/home
You've built a working custom indexer 🎉
The key concepts covered here apply to any custom indexer: define your data structure, implement the Processor and Handler traits, and let the framework handle the infrastructure.
Related links
- Sui Indexer Alt: The sui-indexer-alt crate in the Sui repo.
- Move Registry: The indexer that the Move Registry (MVR) implements.
- DeepBook Indexer: The indexer that DeepBook implements.
- Custom Indexing Framework: The sui-indexer-alt-framework is a powerful Rust framework for building high-performance, custom blockchain indexers on Sui, providing customizable, production-ready components for data ingestion, processing, and storage.
- Indexer Pipeline Architecture: The sui-indexer-alt-framework provides two distinct pipeline architectures. Understand the differences between the sequential and concurrent pipelines to decide which best suits your project needs.