Build Your First Custom Indexer

info

Refer to Custom Indexing Framework and Indexer Pipeline Architecture for a conceptual overview of the indexer framework.

Refer to Access Sui Data for an overview of options to access Sui network data.

To build a complete custom indexer, you use the sui-indexer-alt-framework. The steps that follow demonstrate how to create a sequential pipeline that extracts transaction digests from Sui checkpoints and stores them in a local PostgreSQL database. You can find the source code for the framework in the Sui repo on GitHub.

tip

While this example uses PostgreSQL with Diesel (a popular Rust ORM and query builder) for minimalism and out-of-the-box support, the sui-indexer-alt-framework is designed for flexible storage. You can use different databases (such as MongoDB, CouchDB, or similar) or utilize other database clients if you prefer not to use Diesel. To achieve this, implement the framework's Store and Connection traits and define your database write logic directly within your Handler::commit() method.

Prerequisites

Before starting this exercise, ensure you have:

Required software

You need the following stack installed on your machine: Rust and Cargo, PostgreSQL, and the Diesel CLI with PostgreSQL support.

If you're unsure whether your system has the necessary software properly installed, you can verify installation with the following commands.

$ psql --version
$ diesel --version

To test PostgreSQL functionality, you can create and delete a test database.

$ createdb test_db && dropdb test_db

If you receive a new prompt with no console output, the test was successful. If you receive a createdb error similar to

createdb: error: connection to server on socket "/tmp/.s.PGSQL.5432" failed: FATAL:  role "username" does not exist

then you need to create the user (replace username with the name provided in your error message).

$ sudo -u postgres createuser --superuser username

Enter your system (sudo) password if prompted, then try the createdb && dropdb command again.

Read the Custom Indexing Framework and Indexer Pipeline Architecture topics first to understand the overall architecture and concurrent pipeline concepts.

What you build

The following steps show how to create an indexer that:

  • Connects to Sui Testnet: Uses the remote checkpoint store at https://checkpoints.testnet.sui.io.
  • Processes checkpoints: Streams checkpoint data continuously.
  • Extracts transaction data: Pulls transaction digests from each checkpoint.
  • Stores in local PostgreSQL: Commits data to a local PostgreSQL database.
  • Implements sequential pipeline: Uses in-order processing with batching for optimal consistency and performance.

In the end, you have a working indexer that demonstrates all core framework concepts and can serve as a foundation for more complex custom indexers.

info

Sui provides checkpoint stores for both Mainnet and Testnet.

  • Testnet: https://checkpoints.testnet.sui.io
  • Mainnet: https://checkpoints.mainnet.sui.io

Step 1: Project setup

First, open your console to the directory you want to store your indexer project. Use the cargo new command to create a new Rust project and then navigate to its directory.

$ cargo new simple-sui-indexer
$ cd simple-sui-indexer

Step 2: Configure dependencies

Replace your Cargo.toml code with the following configuration and save.

[package]
name = "simple-sui-indexer"
version = "0.1.0"
edition = "2021"

[dependencies]
# Core framework dependencies
sui-indexer-alt-framework = { git = "https://github.com/MystenLabs/sui.git", branch = "testnet" }

# Async runtime
tokio = { version = "1.0", features = ["full"] }

# Error handling
anyhow = "1.0"

# Diesel PostgreSQL
diesel = { version = "2.0", features = ["postgres", "r2d2"] }
diesel-async = { version = "0.5", features = ["bb8", "postgres", "async-connection-wrapper"] }
diesel_migrations = "2.0"

# Async traits
async-trait = "0.1"

# URL parsing
url = "2.0"

# Use .env file
dotenvy = "0.15"

# Command line parsing
clap = { version = "4.0", features = ["derive"] }

The manifest now includes the following dependencies:

  • sui-indexer-alt-framework: Core framework providing pipeline infrastructure.
  • diesel/diesel-async: Type-safe database ORM with asynchronous support.
  • tokio: Async runtime required by the framework.
  • clap: Command-line argument parsing for configuration.
  • anyhow: Error handling.
  • async-trait: Async functions in trait implementations.
  • dotenvy: Loads the .env file that stores your PostgreSQL URL.

Step 3: Create database

Before configuring migrations, create and verify your local PostgreSQL database:

$ createdb sui_indexer

Get your connection details:

$ psql sui_indexer -c "\conninfo"

If successful, your console should display a message similar to the following:

You are connected to database "sui_indexer" as user "username" via socket in "/tmp" at port "5432".

You can now set a shell variable to your database URL, as it's used in the following commands. Make sure to change username to your actual username.

$ PSQL_URL=postgres://username@localhost:5432/sui_indexer

You can now test your connection with the following command:

$ psql $PSQL_URL -c "SELECT 'Connected';"

If successful, your console or terminal should respond with a message similar to the following:

?column?
-----------
Connected
(1 row)

Step 4: Database setup

Before you start coding, make sure you set up a local PostgreSQL database from the previous step. This is required for the indexer to store the extracted transaction data.

The following database setup steps have you:

  1. Create a database table to store the data.
  2. Use Diesel to manage the process.
  3. Generate Rust code that maps to the database table.

Step 4.1: Configure Diesel

First, create a diesel.toml file (in the same folder as Cargo.toml) to configure database migrations.

$ touch diesel.toml

Update and save the file with the following code:

[print_schema]
file = "src/schema.rs"

[migrations_directory]
dir = "migrations"

Step 4.2: Create database table using Diesel migrations

Diesel migrations are a way of creating and managing database tables using SQL files. Each migration has two files:

  • up.sql: Creates and changes the table.
  • down.sql: Removes and undoes the changes.

Use the diesel setup command to create the necessary directory structure, passing your database URL with the --database-url argument.

$ diesel setup --database-url $PSQL_URL

Then, at the root of your project, use the diesel migration generate command to generate the migration files.

$ diesel migration generate transaction_digests

You should now have a migrations folder in your project. There should be a subdirectory in this folder with the name format YYYY-MM-DD-HHMMSS_transaction_digests. This folder should contain the up.sql and down.sql files.

Open up.sql inside the timestamped migration folder and replace its contents with the following code:

CREATE TABLE IF NOT EXISTS transaction_digests (
    tx_digest TEXT PRIMARY KEY,
    checkpoint_sequence_number BIGINT NOT NULL
);
tip

This example uses the TEXT data type for tx_digest, but best practice for a production indexer is to use the BYTEA data type.

The TEXT type is used to make the transaction digest easily readable and directly usable with external tools. Digests are Base58 encoded, and because PostgreSQL cannot natively display BYTEA data in this format, storing it as TEXT allows you to copy the digest from a query and paste it into an explorer like SuiScan.

For a production environment, however, BYTEA is strongly recommended. It offers superior storage and query efficiency by storing the raw byte representation, which is more compact and significantly faster for comparisons than a string. Refer to Binary data performance in PostgreSQL on the CYBERTEC website for more information.
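
For reference, a BYTEA version of the same migration would look like the following sketch. On the Rust side, the struct field would then hold the digest's raw bytes as a Vec<u8> rather than a String (the exact accessor on the digest type isn't covered in this guide).

CREATE TABLE IF NOT EXISTS transaction_digests (
    tx_digest BYTEA PRIMARY KEY,
    checkpoint_sequence_number BIGINT NOT NULL
);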

Save up.sql, then open down.sql and replace its contents with the following code, which reverts the migration. Save the file.

DROP TABLE IF EXISTS transaction_digests;

Step 4.3: Apply migration and generate Rust schema

From the root of your project, use the diesel migration run command to apply the migration and create the table.

$ diesel migration run --database-url $PSQL_URL

Then use the diesel print-schema command to generate the schema.rs file from the actual database.

$ diesel print-schema --database-url $PSQL_URL > src/schema.rs

Your src/schema.rs file should now look like the following:

// @generated automatically by Diesel CLI.

diesel::table! {
    transaction_digests (tx_digest) {
        tx_digest -> Text,
        checkpoint_sequence_number -> Int8,
    }
}

After running the previous commands, your project is set up for the next steps:

  • PostgreSQL now has a transaction_digests table with the defined columns.
  • src/schema.rs contains automatically generated Rust code that represents this table structure.
  • You can now write type-safe Rust code that talks to this specific table; see the sketch after this list for an example.
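
For example, the following hypothetical helper (not part of the indexer itself) uses the generated schema with a plain Diesel connection to count indexed rows. It assumes the schema module is declared as described later in this guide and that the database URL points at sui_indexer.

use diesel::prelude::*;
use diesel::PgConnection;

use crate::schema::transaction_digests::dsl::*;

// Hypothetical helper: count the rows currently stored in transaction_digests.
pub fn count_indexed_digests(database_url: &str) -> anyhow::Result<i64> {
    let mut conn = PgConnection::establish(database_url)?;
    let total = transaction_digests.count().get_result(&mut conn)?;
    Ok(total)
}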

Diesel's migration system evolves the database schema over time in a structured, version-controlled way. For a complete walkthrough, see the official Diesel Getting Started guide.
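
To check that down.sql correctly reverts up.sql, you can roll the latest migration back and forward again:

$ diesel migration redo --database-url $PSQL_URL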

Step 5: Create data structure

To simplify database writes with Diesel, define a struct that represents a record in the transaction_digests table. Create a models.rs file in your src directory and add the following code:

use diesel::prelude::*;
use sui_indexer_alt_framework::FieldCount;
use crate::schema::transaction_digests;

#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = transaction_digests)]
pub struct StoredTransactionDigest {
    pub tx_digest: String,
    pub checkpoint_sequence_number: i64,
}

Key annotations:

  • FieldCount: Required by sui-indexer-alt-framework for memory optimization and batch-processing efficiency. It limits the maximum size of a batch so that a single SQL statement doesn't exceed the PostgreSQL limit on the number of bind parameters (see the arithmetic after this list).
  • diesel(table_name = transaction_digests): Maps this Rust struct to the transaction_digests table, whose schema is generated in a previous step.
  • Insertable: Allows this struct to be inserted into the database using Diesel.
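
As a rough calculation, PostgreSQL accepts at most 65,535 bind parameters per statement. With the two columns in StoredTransactionDigest, a single batched INSERT can therefore carry at most about 65,535 / 2 ≈ 32,767 rows, which is why the framework needs to know the field count for your type.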

Step 6: Define the Handler struct in handlers.rs

Create a handlers.rs file in your src directory.

$ touch ./src/handlers.rs

Open the file and define a concrete struct to implement the Processor and Handler traits:

pub struct TransactionDigestHandler;

Save the file but keep it open as the next steps add to its code.

Step 7: Implement the Processor

The Processor trait defines how to extract and transform data from checkpoints. The resulting data is then passed to Handler::commit.

Add the necessary dependencies at the top of the file.

use std::sync::Arc;
use anyhow::Result;
use sui_indexer_alt_framework::{
    pipeline::Processor,
    types::full_checkpoint_content::CheckpointData,
};

use crate::models::StoredTransactionDigest;
use crate::schema::transaction_digests::dsl::*;

After the TransactionDigestHandler struct, add the Processor code:

impl Processor for TransactionDigestHandler {
    const NAME: &'static str = "transaction_digest_handler";

    type Value = StoredTransactionDigest;

    fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
        let checkpoint_seq = checkpoint.checkpoint_summary.sequence_number as i64;

        let digests = checkpoint.transactions.iter().map(|tx| {
            StoredTransactionDigest {
                tx_digest: tx.transaction.digest().to_string(),
                checkpoint_sequence_number: checkpoint_seq,
            }
        }).collect();

        Ok(digests)
    }
}

Key concepts:

  • NAME: Unique identifier for this processor used in monitoring and logging.
  • type Value: Defines what data flows through the pipeline, which ensures type safety.
  • process(): Core logic that transforms checkpoint data into your custom data structure.

Save the handlers.rs file.

Processor trait definition

/// Implementors of this trait are responsible for transforming checkpoint into rows for their
/// table. The `FANOUT` associated value controls how many concurrent workers will be used to
/// process checkpoint information.
pub trait Processor {
    /// Used to identify the pipeline in logs and metrics.
    const NAME: &'static str;

    /// How much concurrency to use when processing checkpoint data.
    const FANOUT: usize = 10;

    /// The type of value being inserted by the handler.
    type Value: Send + Sync + 'static;

    /// The processing logic for turning a checkpoint into rows of the table.
    fn process(&self, checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>>;
}
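
As the trait definition shows, Processor also exposes a FANOUT constant (default 10) that controls how many checkpoints are processed concurrently. If your process() logic did heavier work, you could raise it. The following sketch is the Step 7 impl with only that constant added (a replacement for, not an addition to, the earlier impl):

impl Processor for TransactionDigestHandler {
    const NAME: &'static str = "transaction_digest_handler";

    // Process up to 20 checkpoints concurrently instead of the default 10.
    const FANOUT: usize = 20;

    type Value = StoredTransactionDigest;

    fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
        let checkpoint_seq = checkpoint.checkpoint_summary.sequence_number as i64;

        let digests = checkpoint.transactions.iter().map(|tx| {
            StoredTransactionDigest {
                tx_digest: tx.transaction.digest().to_string(),
                checkpoint_sequence_number: checkpoint_seq,
            }
        }).collect();

        Ok(digests)
    }
}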

Step 8: Implement the Handler

The Handler trait defines how to commit data to the database. Append the Handler dependencies to the bottom of the dependency list you created in the previous step.

use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::{
    postgres::{Connection, Db},
    pipeline::sequential::Handler,
};

Add the logic for Handler after the Processor code. The complete code is available at the end of this step.

#[async_trait::async_trait]
impl Handler for TransactionDigestHandler {
    type Store = Db;
    type Batch = Vec<Self::Value>;

    fn batch(batch: &mut Self::Batch, values: Vec<Self::Value>) {
        batch.extend(values);
    }

    async fn commit<'a>(
        batch: &Self::Batch,
        conn: &mut Connection<'a>,
    ) -> Result<usize> {
        let inserted = diesel::insert_into(transaction_digests)
            .values(batch)
            .on_conflict(tx_digest)
            .do_nothing()
            .execute(conn)
            .await?;

        Ok(inserted)
    }
}

How sequential batching works:

  1. process() returns values for each checkpoint.
  2. batch() accumulates values from multiple checkpoints.
  3. commit() writes the batch when the framework reaches its limits (such as H::MAX_BATCH_CHECKPOINTS), as the framework's batching loop below shows.

while batch_checkpoints < H::MAX_BATCH_CHECKPOINTS {
    if !can_process_pending(next_checkpoint, checkpoint_lag, &pending) {
        break;
    }

    let Some(entry) = pending.first_entry() else {
        break;
    };

    match next_checkpoint.cmp(entry.key()) {
        // Next pending checkpoint is from the future.
        Ordering::Less => break,

        // This is the next checkpoint -- include it.
        Ordering::Equal => {
            let indexed = entry.remove();
            batch_rows += indexed.len();
            batch_checkpoints += 1;
            H::batch(&mut batch, indexed.values);
            watermark = indexed.watermark;
            next_checkpoint += 1;
        }

        // Next pending checkpoint is in the past, ignore it to avoid double
        // writes.
        Ordering::Greater => {
            metrics
                .total_watermarks_out_of_order
                .with_label_values(&[H::NAME])
                .inc();

            let indexed = entry.remove();
            pending_rows -= indexed.len();
        }
    }
}
tip

You can override the default batch limits by implementing constants in your Handler.
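
For example, a variant of this step's Handler impl that commits smaller batches might look like the following sketch. MAX_BATCH_CHECKPOINTS is the constant the batching loop above checks; MIN_EAGER_ROWS appears in the Handler trait definition later in this guide. Treat the exact constant names and defaults as things to confirm against the framework source, and note this is a replacement for, not an addition to, the impl you already wrote.

#[async_trait::async_trait]
impl Handler for TransactionDigestHandler {
    type Store = Db;
    type Batch = Vec<Self::Value>;

    // Cap each committed batch at 50 checkpoints.
    const MAX_BATCH_CHECKPOINTS: usize = 50;

    // Commit eagerly once at least 100 rows are pending.
    const MIN_EAGER_ROWS: usize = 100;

    // batch() and commit() are unchanged from earlier in this step.
    fn batch(batch: &mut Self::Batch, values: Vec<Self::Value>) {
        batch.extend(values);
    }

    async fn commit<'a>(
        batch: &Self::Batch,
        conn: &mut Connection<'a>,
    ) -> Result<usize> {
        let inserted = diesel::insert_into(transaction_digests)
            .values(batch)
            .on_conflict(tx_digest)
            .do_nothing()
            .execute(conn)
            .await?;

        Ok(inserted)
    }
}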

The handlers.rs file is now complete. Save the file.

Complete handlers.rs file

use std::sync::Arc;
use anyhow::Result;
use sui_indexer_alt_framework::{
    pipeline::Processor,
    types::full_checkpoint_content::CheckpointData,
};

use crate::models::StoredTransactionDigest;
use crate::schema::transaction_digests::dsl::*;
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::{
    postgres::{Connection, Db},
    pipeline::sequential::Handler,
};

pub struct TransactionDigestHandler;

impl Processor for TransactionDigestHandler {
    const NAME: &'static str = "transaction_digest_handler";

    type Value = StoredTransactionDigest;

    fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
        let checkpoint_seq = checkpoint.checkpoint_summary.sequence_number as i64;

        let digests = checkpoint.transactions.iter().map(|tx| {
            StoredTransactionDigest {
                tx_digest: tx.transaction.digest().to_string(),
                checkpoint_sequence_number: checkpoint_seq,
            }
        }).collect();

        Ok(digests)
    }
}

#[async_trait::async_trait]
impl Handler for TransactionDigestHandler {
    type Store = Db;
    type Batch = Vec<Self::Value>;

    fn batch(batch: &mut Self::Batch, values: Vec<Self::Value>) {
        batch.extend(values);
    }

    async fn commit<'a>(
        batch: &Self::Batch,
        conn: &mut Connection<'a>,
    ) -> Result<usize> {
        let inserted = diesel::insert_into(transaction_digests)
            .values(batch)
            .on_conflict(tx_digest)
            .do_nothing()
            .execute(conn)
            .await?;

        Ok(inserted)
    }
}

Handler trait definition

/// Handlers implement the logic for a given indexing pipeline: How to process checkpoint data (by
/// implementing [Processor]) into rows for their table, and how to write those rows to the database.
///
/// The handler is also responsible for tuning the various parameters of the pipeline (provided as
/// associated values). Reasonable defaults have been chosen to balance concurrency with memory
/// usage, but each handle may choose to override these defaults, e.g.
///
/// - Handlers that produce many small rows may wish to increase their batch/chunk/max-pending
///   sizes).
/// - Handlers that do more work during processing may wish to increase their fanout so more of it
///   can be done concurrently, to preserve throughput.
///
/// Concurrent handlers can only be used in concurrent pipelines, where checkpoint data is
/// processed and committed out-of-order and a watermark table is kept up-to-date with the latest
/// checkpoint below which all data has been committed.
///
/// Back-pressure is handled through the `MAX_PENDING_SIZE` constant -- if more than this many rows
/// build up, the collector will stop accepting new checkpoints, which will eventually propagate
/// back to the ingestion service.
#[async_trait::async_trait]
pub trait Handler: Processor<Value: FieldCount> {
    type Store: Store;

    /// If at least this many rows are pending, the committer will commit them eagerly.
    const MIN_EAGER_ROWS: usize = 50;

    /// If there are more than this many rows pending, the committer applies backpressure.
    const MAX_PENDING_ROWS: usize = 5000;

    /// The maximum number of watermarks that can show up in a single batch.
    /// This limit exists to deal with pipelines that produce no data for a majority of
    /// checkpoints -- the size of these pipeline's batches will be dominated by watermark updates.
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Take a chunk of values and commit them to the database, returning the number of rows
    /// affected.
    async fn commit<'a>(
        values: &[Self::Value],
        conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize>;

    /// Clean up data between checkpoints `_from` and `_to_exclusive` (exclusive) in the database,
    /// returning the number of rows affected. This function is optional, and defaults to not
    /// pruning at all.
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize> {
        Ok(0)
    }
}

Step 9: Create .env file

The main function you create in the next step needs the value you stored in the shell variable $PSQL_URL. To make it available, create a .env file with that data.

$ echo "DATABASE_URL=$PSQL_URL" > .env

After running the command, make sure the .env file exists at your project root and contains the correct data.

Step 10: Create main function

Now, to tie everything together in the main function, open your main.rs file. Replace the default code with the following and save the file:

mod models;
mod handlers;

use handlers::TransactionDigestHandler;

pub mod schema;

use anyhow::Result;
use clap::Parser;
use diesel_migrations::{embed_migrations, EmbeddedMigrations};
use sui_indexer_alt_framework::{
    cluster::{Args, IndexerCluster},
    pipeline::sequential::SequentialConfig,
};
use tokio;
use url::Url;

// Embed database migrations into the binary so they run automatically on startup
const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");

#[tokio::main]
async fn main() -> Result<()> {
    // Load .env data
    dotenvy::dotenv().ok();

    // Local database URL created in step 3 above
    let database_url = std::env::var("DATABASE_URL")
        .expect("DATABASE_URL must be set in the environment")
        .parse::<Url>()
        .expect("Invalid database URL");

    // Parse command-line arguments (checkpoint range, URLs, performance settings)
    let args = Args::parse();

    // Build and configure the indexer cluster
    let mut cluster = IndexerCluster::builder()
        .with_args(args)                  // Apply command-line configuration
        .with_database_url(database_url)  // Set up database URL
        .with_migrations(&MIGRATIONS)     // Enable automatic schema migrations
        .build()
        .await?;

    // Register our custom sequential pipeline with the cluster
    cluster.sequential_pipeline(
        TransactionDigestHandler,     // Our processor/handler implementation
        SequentialConfig::default(),  // Use default batch sizes and checkpoint lag
    ).await?;

    // Start the indexer and wait for completion
    let handle = cluster.run().await?;
    handle.await?;

    Ok(())
}

Key components explained:

  • embed_migrations!: Includes your migration files in the binary so the indexer automatically updates the database schema on startup.
  • Args::parse(): Provides command-line configuration like --first-checkpoint, --remote-store-url, and so on.
  • IndexerCluster::builder(): Sets up the framework infrastructure (database connections, checkpoint streaming, monitoring).
  • sequential_pipeline(): Registers a sequential pipeline that processes checkpoints in order with smart batching.
  • SequentialConfig::default(): Uses framework defaults for batch sizes and checkpoint lag (how many checkpoints to batch together); see the sketch after this list for one way to override them.
  • cluster.run(): Starts processing checkpoints and blocks until completion.
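
A hedged sketch of overriding those defaults (field names such as checkpoint_lag are assumptions based on the description above; confirm them against the framework's SequentialConfig definition):

cluster.sequential_pipeline(
    TransactionDigestHandler,
    SequentialConfig {
        checkpoint_lag: 100, // stay roughly 100 checkpoints behind the network tip
        ..Default::default()
    },
).await?;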

Your indexer is now complete. The next steps walk you through running the indexer and checking its functionality.

Step 11: Run your indexer

Use the cargo run command to run your indexer against Testnet with remote checkpoint storage.

$ cargo run -- --remote-store-url https://checkpoints.testnet.sui.io
info

Allow incoming network requests if your operating system requests it for the simple-sui-indexer application.

If successful, your console informs you that the indexer is running.

Step 12: Verify results

Open a new terminal or console and connect to your database to check the results:

$ psql sui_indexer

After connecting, run a few queries to verify your indexer is working.

Check how many transaction digests are indexed:

SELECT COUNT(*) FROM transaction_digests;

View sample records:

SELECT * FROM transaction_digests LIMIT 5;

To confirm your data is accurate, copy any transaction digest from your database and verify it on SuiScan: https://suiscan.xyz/testnet/home
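
Optionally, you can also check how many transactions each recently indexed checkpoint contained:

SELECT checkpoint_sequence_number, COUNT(*) AS tx_count
FROM transaction_digests
GROUP BY checkpoint_sequence_number
ORDER BY checkpoint_sequence_number DESC
LIMIT 5;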

You've built a working custom indexer 🎉

The key concepts covered here apply to any custom indexer: define your data structure, implement the Processor and Handler traits, and let the framework handle the infrastructure.

Related links

  • Sui Indexer Alt: The sui-indexer-alt crate in the Sui repo.
  • Move Registry: The indexer that the Move Registry (MVR) implements.
  • DeepBook Indexer: The indexer that DeepBook implements.
  • Custom Indexing Framework: The sui-indexer-alt-framework is a powerful Rust framework for building high-performance, custom blockchain indexers on Sui, providing customizable, production-ready components for data ingestion, processing, and storage.
  • Indexer Pipeline Architecture: The sui-indexer-alt-framework provides two distinct pipeline architectures. Understand the differences between the sequential and concurrent pipelines that the sui-indexer-alt-framework provides to decide which best suits your project needs.