Indexer Data and Integration
Building a custom indexer on Sui lets you take full control of data ingestion, storage, and processing. You can choose from multiple checkpoint data sources such as remote store, local files, or direct RPC API access, depending on whether you're indexing Mainnet data, testing against known checkpoints, or working on a local network.
For storage, you can either use the built-in IndexerCluster with PostgreSQL or implement your own Store and Connection traits to integrate a different database or storage backend. Once connected, you can wire up a manual indexer, add your custom pipelines, and handle watermark coordination to keep data in sync.
Finally, you need to deserialize Move events from raw BCS bytes into Rust structs, using bcs and serde, so that your pipelines can work with strongly typed data. This gives you a reproducible, end-to-end setup that you can tune for performance, reliability, and custom analytics.
Checkpoint data sources
The sui-indexer-alt-framework supports three data sources for accessing Sui blockchain data. For all data sources, the indexing framework polls for new checkpoints. You configure the data source through command-line arguments.
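To make the polling behavior concrete, here is a minimal, std-only sketch. It is not the framework's ingestion code: `poll_for_checkpoint`, its parameters, and the closure-based `fetch` are hypothetical names chosen for illustration, and a real client would fetch over the network instead of calling a closure.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Repeatedly ask `fetch` for checkpoint `seq` until it is available or
/// `max_attempts` polls have been made. Returns the checkpoint bytes and the
/// number of polls it took. (Hypothetical helper, not a framework API.)
pub fn poll_for_checkpoint<F>(
    mut fetch: F,
    seq: u64,
    max_attempts: u32,
    interval: Duration,
) -> Option<(Vec<u8>, u32)>
where
    F: FnMut(u64) -> Option<Vec<u8>>,
{
    for attempt in 1..=max_attempts {
        if let Some(bytes) = fetch(seq) {
            return Some((bytes, attempt));
        }
        // Not published yet: wait before asking again, except after the last attempt.
        if attempt < max_attempts {
            sleep(interval);
        }
    }
    None
}
```

A caller would pass a closure that returns `None` while the checkpoint is not yet published and `Some(bytes)` once it is.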
Remote store
The most direct stream source is a subscription to a remote checkpoint store. Sui provides the following endpoints:
- Testnet: https://checkpoints.testnet.sui.io
- Mainnet: https://checkpoints.mainnet.sui.io
$ cargo run -- --remote-store-url https://checkpoints.testnet.sui.io
Local path
Local checkpoint files are useful for development and testing scenarios:
$ cargo run -- --local-ingestion-path /path/to/checkpoints
Common use cases:
- Unit and integration testing with known checkpoint data
- Development workflows with reproducible datasets
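When preparing a local checkpoint directory for tests, a small helper for mapping sequence numbers to file paths can be handy. The sketch below assumes files are named `<sequence_number>.chk`; that naming is an assumption made here for illustration, so verify it against the files you actually have. `checkpoint_path` and `checkpoint_seq` are hypothetical helpers, not framework functions.

```rust
use std::path::{Path, PathBuf};

/// Build the path of checkpoint `seq` under `dir`, assuming `<seq>.chk` naming.
pub fn checkpoint_path(dir: &Path, seq: u64) -> PathBuf {
    dir.join(format!("{seq}.chk"))
}

/// Recover the sequence number from a path, or `None` if the file name does
/// not look like `<seq>.chk`.
pub fn checkpoint_seq(path: &Path) -> Option<u64> {
    if path.extension()?.to_str()? != "chk" {
        return None;
    }
    path.file_stem()?.to_str()?.parse().ok()
}
```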
RPC API
RPC API access queries full nodes directly using gRPC:
$ cargo run -- --rpc-api-url https://fullnode.testnet.sui.io:443
Endpoint format: https://fullnode.NETWORK.sui.io:443, where NETWORK is one of the available networks:
- testnet
- devnet
- mainnet
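The endpoint format can be captured in a few lines of Rust. This is only a sketch: the `Network` enum and its methods are hypothetical and exist purely to show how the NETWORK placeholder is substituted.

```rust
/// The networks named in the endpoint format above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Network {
    Testnet,
    Devnet,
    Mainnet,
}

impl Network {
    /// Parse a lowercase network name; unknown names yield `None`.
    pub fn parse(name: &str) -> Option<Self> {
        match name {
            "testnet" => Some(Self::Testnet),
            "devnet" => Some(Self::Devnet),
            "mainnet" => Some(Self::Mainnet),
            _ => None,
        }
    }

    pub fn as_str(self) -> &'static str {
        match self {
            Self::Testnet => "testnet",
            Self::Devnet => "devnet",
            Self::Mainnet => "mainnet",
        }
    }

    /// Fill the NETWORK placeholder of the documented endpoint format.
    pub fn rpc_api_url(self) -> String {
        format!("https://fullnode.{}.sui.io:443", self.as_str())
    }
}
```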
When to use RPC API:
- Networks without official remote store (devnet, localnet, custom networks)
- Development against local Sui networks
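The three options map naturally onto a single choice. The sketch below assumes that exactly one data source is meant to be selected per run. The parameter names mirror the command-line flags shown above, but `CheckpointSource` and `select_source` are hypothetical and do not describe how the framework validates its arguments.

```rust
/// One of the three checkpoint data sources described above.
#[derive(Debug, PartialEq, Eq)]
pub enum CheckpointSource {
    RemoteStore(String),
    LocalPath(String),
    RpcApi(String),
}

/// Pick the configured source, rejecting zero or multiple sources.
pub fn select_source(
    remote_store_url: Option<String>,
    local_ingestion_path: Option<String>,
    rpc_api_url: Option<String>,
) -> Result<CheckpointSource, String> {
    match (remote_store_url, local_ingestion_path, rpc_api_url) {
        (Some(url), None, None) => Ok(CheckpointSource::RemoteStore(url)),
        (None, Some(path), None) => Ok(CheckpointSource::LocalPath(path)),
        (None, None, Some(url)) => Ok(CheckpointSource::RpcApi(url)),
        (None, None, None) => Err("no checkpoint data source configured".to_string()),
        _ => Err("more than one checkpoint data source configured".to_string()),
    }
}
```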
Bring your own store (BYOS)
The IndexerCluster provides a convenient way to get started with PostgreSQL, but you might want to use a different database or storage system. This requires using the manual Indexer type and implementing the custom Store and Connection traits from sui-indexer-alt-framework-store-traits.
lib.rs in sui-indexer-alt-framework-store-traits
/// Represents a database connection that can be used by the indexer framework to manage watermark
/// operations, agnostic of the underlying store implementation.
#[async_trait]
pub trait Connection: Send {
/// Given a pipeline, return the committer watermark from the `Store`. This is used by the
/// indexer on startup to determine which checkpoint to resume processing from.
async fn committer_watermark(
&mut self,
pipeline: &'static str,
) -> anyhow::Result<Option<CommitterWatermark>>;
/// Given a pipeline, return the reader watermark from the database. This is used by the indexer
/// to determine the new `reader_lo` or inclusive lower bound of available data.
async fn reader_watermark(
&mut self,
pipeline: &'static str,
) -> anyhow::Result<Option<ReaderWatermark>>;
/// Get the bounds for the region that the pruner is allowed to prune, and the time in
/// milliseconds the pruner must wait before it can begin pruning data for the given `pipeline`.
/// The pruner is allowed to prune the region between the returned `pruner_hi` (inclusive) and
/// `reader_lo` (exclusive) after waiting until `pruner_timestamp + delay` has passed. This
/// minimizes the possibility for the pruner to delete data still expected by inflight read
/// requests.
async fn pruner_watermark(
&mut self,
pipeline: &'static str,
delay: Duration,
) -> anyhow::Result<Option<PrunerWatermark>>;
/// Upsert the high watermark as long as it raises the watermark stored in the database. Returns
/// a boolean indicating whether the watermark was actually updated or not.
async fn set_committer_watermark(
&mut self,
pipeline: &'static str,
watermark: CommitterWatermark,
) -> anyhow::Result<bool>;
/// Update the `reader_lo` of an existing watermark entry only if it raises `reader_lo`. Readers
/// will reference this as the inclusive lower bound of available data for the corresponding
/// pipeline.
///
/// If an update is to be made, some timestamp (i.e `pruner_timestamp`) should also be set on
/// the watermark entry to the current time. Ideally, this would be from the perspective of the
/// store. If this is not possible, then it should come from some other common source of time
/// between the indexer and its readers. This timestamp is critical to the indexer's operations,
/// as it determines when the pruner can safely begin pruning data. When `pruner_watermark` is
/// called by the indexer, it will retrieve this timestamp to determine how much longer to wait
/// before beginning to prune.
///
/// Returns a boolean indicating whether the watermark was actually updated or not.
async fn set_reader_watermark(
&mut self,
pipeline: &'static str,
reader_lo: u64,
) -> anyhow::Result<bool>;
/// Update the pruner watermark, returns true if the watermark was actually updated
async fn set_pruner_watermark(
&mut self,
pipeline: &'static str,
pruner_hi: u64,
) -> anyhow::Result<bool>;
}
/// A storage-agnostic interface that provides database connections for both watermark management
/// and arbitrary writes. The indexer framework accepts this `Store` implementation to manage
/// watermarks operations through its associated `Connection` type. This store is also passed to the
/// pipeline handlers to perform arbitrary writes to the store.
#[async_trait]
pub trait Store: Send + Sync + 'static + Clone {
type Connection<'c>: Connection
where
Self: 'c;
async fn connect<'c>(&'c self) -> Result<Self::Connection<'c>, anyhow::Error>;
}
When to use BYOS:
- Different database: MongoDB, CouchDB, or other non-PostgreSQL databases. This also applies if you prefer to use PostgreSQL but without the default Diesel ORM.
- Custom requirements: Specialized storage logic, partitioning, or performance optimizations.
Core implementation requirements
To implement BYOS, you need to:
- Define the Store and Connection structs that manage connections.
- Implement the Store trait for connection management.
- Implement the Connection trait for watermark operations.
- Use manual Indexer instead of IndexerCluster.
Step 1: Define your store structure
use sui_indexer_alt_framework::store::{Store, Connection};
use async_trait::async_trait;
#[derive(Clone)]
pub struct MyCustomStore {
    // Your database connection details
    connection_pool: MyDatabasePool,
    config: MyConfig,
}
pub struct MyCustomConnection<'a> {
    // A connection instance
    conn: MyDatabaseConnection<'a>,
}
Step 2: Implement the Store trait
The Store trait manages connection lifecycle:
#[async_trait]
impl Store for MyCustomStore {
    type Connection<'c> = MyCustomConnection<'c>;
    async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>> {
        // Acquire a connection from your pool and wrap it in MyCustomConnection
        todo!("Implement based on your storage system")
    }
}
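The `Connection<'c>` associated type lets a connection borrow from the store that created it. The following std-only sketch shows that shape with a synchronous stand-in trait. `SimpleStore`, `PooledStore`, and `PooledConnection` are hypothetical names, and the counter-based "pool" is an assumption made for illustration rather than how any real database client works.

```rust
use std::sync::Mutex;

/// Synchronous stand-in for the framework's async `Store` trait.
pub trait SimpleStore {
    type Connection<'c>
    where
        Self: 'c;
    fn connect<'c>(&'c self) -> Result<Self::Connection<'c>, String>;
}

/// Hypothetical store that hands out at most `capacity` connections at a time.
pub struct PooledStore {
    capacity: usize,
    in_use: Mutex<usize>,
}

/// A connection that borrows the store and frees its slot when dropped.
pub struct PooledConnection<'c> {
    store: &'c PooledStore,
}

impl PooledStore {
    pub fn new(capacity: usize) -> Self {
        Self { capacity, in_use: Mutex::new(0) }
    }

    /// Number of connections currently checked out.
    pub fn in_use(&self) -> usize {
        *self.in_use.lock().unwrap()
    }
}

impl SimpleStore for PooledStore {
    type Connection<'c> = PooledConnection<'c>;

    fn connect<'c>(&'c self) -> Result<Self::Connection<'c>, String> {
        let mut in_use = self.in_use.lock().unwrap();
        if *in_use >= self.capacity {
            return Err("connection pool exhausted".to_string());
        }
        *in_use += 1;
        Ok(PooledConnection { store: self })
    }
}

impl Drop for PooledConnection<'_> {
    fn drop(&mut self) {
        // Return the slot to the pool when the connection goes out of scope.
        *self.store.in_use.lock().unwrap() -= 1;
    }
}
```

Because the connection holds a reference to the store, the compiler guarantees that the store outlives every connection it hands out.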
Step 3: Implement the Connection trait
The Connection trait handles watermark operations for pipeline coordination:
#[async_trait]
impl Connection for MyCustomConnection<'_> {
    // Get the highest checkpoint processed by a pipeline
    async fn committer_watermark(
        &mut self,
        pipeline: &'static str,
    ) -> anyhow::Result<Option<CommitterWatermark>> {
        // Query your database for watermark data
        todo!("Implement based on your storage system")
    }
    // Get the lowest available checkpoint for readers
    async fn reader_watermark(
        &mut self,
        pipeline: &'static str,
    ) -> anyhow::Result<Option<ReaderWatermark>> {
        // Implementation depends on your database schema
        todo!("Implement based on your storage system")
    }
    // Implement other required methods...
}
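The setter methods share one rule from the trait documentation: a watermark is only written when it raises the stored value, and the return value reports whether a write happened. The in-memory sketch below illustrates that rule under simplifying assumptions: it is synchronous, tracks only a few fields, and takes the current time as a parameter. `InMemoryWatermarks` and `WatermarkRow` are hypothetical types, not part of the framework.

```rust
use std::collections::HashMap;

/// Simplified watermark row; the real watermark types carry more fields.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct WatermarkRow {
    pub checkpoint_hi_inclusive: u64,
    pub reader_lo: u64,
    pub pruner_timestamp_ms: u64,
}

#[derive(Default)]
pub struct InMemoryWatermarks {
    rows: HashMap<&'static str, WatermarkRow>,
}

impl InMemoryWatermarks {
    /// Upsert: insert a missing row, otherwise only raise the high watermark.
    /// Returns whether anything was written.
    pub fn set_committer_watermark(&mut self, pipeline: &'static str, hi: u64) -> bool {
        if let Some(row) = self.rows.get_mut(pipeline) {
            if row.checkpoint_hi_inclusive >= hi {
                return false;
            }
            row.checkpoint_hi_inclusive = hi;
            return true;
        }
        let row = WatermarkRow { checkpoint_hi_inclusive: hi, ..Default::default() };
        self.rows.insert(pipeline, row);
        true
    }

    /// Raise `reader_lo` on an existing row and stamp the pruner timestamp.
    pub fn set_reader_watermark(
        &mut self,
        pipeline: &'static str,
        reader_lo: u64,
        now_ms: u64,
    ) -> bool {
        match self.rows.get_mut(pipeline) {
            Some(row) if row.reader_lo < reader_lo => {
                row.reader_lo = reader_lo;
                row.pruner_timestamp_ms = now_ms;
                true
            }
            _ => false,
        }
    }

    pub fn get(&self, pipeline: &str) -> Option<WatermarkRow> {
        self.rows.get(pipeline).copied()
    }
}
```

Passing `now_ms` explicitly keeps the sketch deterministic; a database-backed store would normally take the timestamp from the database itself, as the trait documentation recommends.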
For a complete reference, study the sui-pg-db implementation of Connection:
#[async_trait]
impl store::Connection for Connection<'_> {
async fn committer_watermark(
&mut self,
pipeline: &'static str,
) -> anyhow::Result<Option<store::CommitterWatermark>> {
let watermark: Option<(i64, i64, i64, i64)> = watermarks::table
.select((
watermarks::epoch_hi_inclusive,
watermarks::checkpoint_hi_inclusive,
watermarks::tx_hi,
watermarks::timestamp_ms_hi_inclusive,
))
.filter(watermarks::pipeline.eq(pipeline))
.first(self)
.await
.optional()?;
if let Some(watermark) = watermark {
Ok(Some(store::CommitterWatermark {
epoch_hi_inclusive: watermark.0 as u64,
checkpoint_hi_inclusive: watermark.1 as u64,
tx_hi: watermark.2 as u64,
timestamp_ms_hi_inclusive: watermark.3 as u64,
}))
} else {
Ok(None)
}
}
async fn reader_watermark(
&mut self,
pipeline: &'static str,
) -> anyhow::Result<Option<store::ReaderWatermark>> {
let watermark: Option<(i64, i64)> = watermarks::table
.select((watermarks::checkpoint_hi_inclusive, watermarks::reader_lo))
.filter(watermarks::pipeline.eq(pipeline))
.first(self)
.await
.optional()?;
if let Some(watermark) = watermark {
Ok(Some(store::ReaderWatermark {
checkpoint_hi_inclusive: watermark.0 as u64,
reader_lo: watermark.1 as u64,
}))
} else {
Ok(None)
}
}
async fn pruner_watermark(
&mut self,
pipeline: &'static str,
delay: Duration,
) -> anyhow::Result<Option<store::PrunerWatermark>> {
// |---------- + delay ---------------------|
// |--- wait_for ---|
// |-----------------------|----------------|
// ^ ^
// pruner_timestamp NOW()
let wait_for = sql!(as BigInt,
"CAST({BigInt} + 1000 * EXTRACT(EPOCH FROM pruner_timestamp - NOW()) AS BIGINT)",
delay.as_millis() as i64,
);
let watermark: Option<(i64, i64, i64)> = watermarks::table
.select((wait_for, watermarks::pruner_hi, watermarks::reader_lo))
.filter(watermarks::pipeline.eq(pipeline))
.first(self)
.await
.optional()?;
if let Some(watermark) = watermark {
Ok(Some(store::PrunerWatermark {
wait_for_ms: watermark.0,
pruner_hi: watermark.1 as u64,
reader_lo: watermark.2 as u64,
}))
} else {
Ok(None)
}
}
async fn set_committer_watermark(
&mut self,
pipeline: &'static str,
watermark: store::CommitterWatermark,
) -> anyhow::Result<bool> {
// Create a StoredWatermark directly from CommitterWatermark
let stored_watermark = StoredWatermark {
pipeline: pipeline.to_string(),
epoch_hi_inclusive: watermark.epoch_hi_inclusive as i64,
checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive as i64,
tx_hi: watermark.tx_hi as i64,
timestamp_ms_hi_inclusive: watermark.timestamp_ms_hi_inclusive as i64,
reader_lo: 0,
pruner_timestamp: NaiveDateTime::UNIX_EPOCH,
pruner_hi: 0,
};
use diesel::query_dsl::methods::FilterDsl;
Ok(diesel::insert_into(watermarks::table)
.values(&stored_watermark)
// There is an existing entry, so only write the new `hi` values
.on_conflict(watermarks::pipeline)
.do_update()
.set((
watermarks::epoch_hi_inclusive.eq(stored_watermark.epoch_hi_inclusive),
watermarks::checkpoint_hi_inclusive.eq(stored_watermark.checkpoint_hi_inclusive),
watermarks::tx_hi.eq(stored_watermark.tx_hi),
watermarks::timestamp_ms_hi_inclusive
.eq(stored_watermark.timestamp_ms_hi_inclusive),
))
.filter(
watermarks::checkpoint_hi_inclusive.lt(stored_watermark.checkpoint_hi_inclusive),
)
.execute(self)
.await?
> 0)
}
async fn set_reader_watermark(
&mut self,
pipeline: &'static str,
reader_lo: u64,
) -> anyhow::Result<bool> {
Ok(diesel::update(watermarks::table)
.set((
watermarks::reader_lo.eq(reader_lo as i64),
watermarks::pruner_timestamp.eq(diesel::dsl::now),
))
.filter(watermarks::pipeline.eq(pipeline))
.filter(watermarks::reader_lo.lt(reader_lo as i64))
.execute(self)
.await?
> 0)
}
async fn set_pruner_watermark(
&mut self,
pipeline: &'static str,
pruner_hi: u64,
) -> anyhow::Result<bool> {
Ok(diesel::update(watermarks::table)
.set(watermarks::pruner_hi.eq(pruner_hi as i64))
.filter(watermarks::pipeline.eq(pipeline))
.execute(self)
.await?
> 0)
}
}
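The pruner arithmetic in `pruner_watermark` can be restated in plain Rust. This sketch computes the remaining wait on the client side purely for illustration; the implementation above deliberately computes it in SQL so that the database supplies the clock. `wait_for_ms` and `prunable_range` are hypothetical helper names.

```rust
use std::ops::Range;

/// Milliseconds the pruner still has to wait: `pruner_timestamp + delay - now`.
/// A zero or negative result means the delay has already elapsed.
pub fn wait_for_ms(pruner_timestamp_ms: i64, delay_ms: i64, now_ms: i64) -> i64 {
    delay_ms + (pruner_timestamp_ms - now_ms)
}

/// Checkpoints eligible for pruning: from `pruner_hi` (inclusive) up to
/// `reader_lo` (exclusive). Returns `None` when there is nothing to prune.
pub fn prunable_range(pruner_hi: u64, reader_lo: u64) -> Option<Range<u64>> {
    (pruner_hi < reader_lo).then(|| pruner_hi..reader_lo)
}
```

For example, with a `pruner_timestamp` of 1,000 ms, a delay of 500 ms, and a current time of 1,200 ms, the pruner waits another 300 ms.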
Step 4: Use manual indexer
Replace IndexerCluster with manual Indexer:
use sui_indexer_alt_framework::{Indexer, IndexerArgs};
use sui_indexer_alt_framework::ingestion::{ClientArgs, IngestionConfig};
use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Initialize your custom store (`MyCustomStore::new` and `config` are placeholders)
    let store = MyCustomStore::new(config).await?;
    // Configure indexer manually. The argument list of `Indexer::new` can differ between
    // framework versions, so check the version you depend on.
    let mut indexer = Indexer::new(
        store,
        IndexerArgs::default(),
        ClientArgs {
            remote_store_url: Some("https://checkpoints.testnet.sui.io".to_string()),
            local_ingestion_path: None,
            rpc_api_url: None,
            rpc_username: None,
            rpc_password: None,
        },
        IngestionConfig::default(),
        &prometheus::Registry::new(),
        tokio_util::sync::CancellationToken::new(),
    ).await?;
    // Add your pipelines (registration needs a mutable indexer)
    indexer.concurrent_pipeline(
        YourHandler::default(),
        ConcurrentConfig::default(),
    ).await?;
    // Start the indexer and wait for it to finish
    let handle = indexer.run().await?;
    handle.await?;
    Ok(())
}
Example: ClickHouse implementation
For a complete working example of BYOS with ClickHouse (a high-performance columnar database for analytics), see the example project in the Sui repo.
ClickHouse example README
ClickHouse Sui Indexer
A simple example of how to build a custom Sui indexer that writes transaction data to ClickHouse.
Quick Start
1. Start ClickHouse
docker run -d --name clickhouse-dev -p 8123:8123 -p 9000:9000 --ulimit nofile=262144:262144 clickhouse/clickhouse-server
2. Set up database user
docker exec clickhouse-dev clickhouse-client --query "CREATE USER IF NOT EXISTS dev IDENTIFIED WITH no_password"
docker exec clickhouse-dev clickhouse-client --query "GRANT CREATE, INSERT, SELECT, ALTER, UPDATE, DELETE ON *.* TO dev"
3. Run the indexer
cargo run -- --remote-store-url https://checkpoints.testnet.sui.io --last-checkpoint=10
That’s it! The indexer will:
- Create the necessary tables automatically
- Fetch checkpoints from the Sui testnet
- Write transaction data to ClickHouse
Verify Data
Check that data was written:
docker exec clickhouse-dev clickhouse-client --user=dev --query "SELECT COUNT(*) FROM transactions"
docker exec clickhouse-dev clickhouse-client --user=dev --query "SELECT * FROM transactions LIMIT 5"
Clean Up
Stop and remove the ClickHouse container:
docker stop clickhouse-dev && docker rm clickhouse-dev
What This Example Shows
- Custom Store Implementation: How to implement the Store trait for ClickHouse
- Concurrent Pipeline: Uses the concurrent pipeline for better pruning and watermark testing
- Watermark Management: Tracking indexer progress with committer, reader, and pruner watermarks
- Transaction Processing: Extracting and storing transaction digests from checkpoints
- Simple Setup: Minimal configuration for local development
Architecture
Sui Network → Checkpoints → Concurrent Pipeline → ClickHouse Store → ClickHouse DB
The indexer uses a concurrent pipeline that processes checkpoints out-of-order with separate reader, committer, and pruner components. This is ideal for testing watermark functionality and pruning behavior.
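Out-of-order processing is the reason a committer watermark is needed: the watermark can only advance over a gap-free prefix of finished checkpoints. The sketch below shows one way to track that prefix. It is an illustration of the idea, not the framework's committer, and `WatermarkTracker` is a hypothetical type.

```rust
use std::collections::BTreeSet;

/// Tracks the highest checkpoint below which everything has been written.
pub struct WatermarkTracker {
    /// Next checkpoint the watermark is waiting for.
    next: u64,
    /// Checkpoints that finished ahead of `next`.
    pending: BTreeSet<u64>,
}

impl WatermarkTracker {
    pub fn new(first_checkpoint: u64) -> Self {
        Self { next: first_checkpoint, pending: BTreeSet::new() }
    }

    /// Record that `checkpoint` was written. Returns the new inclusive high
    /// watermark if the gap-free prefix grew, otherwise `None`.
    pub fn complete(&mut self, checkpoint: u64) -> Option<u64> {
        if checkpoint < self.next {
            return None; // Already covered by the watermark.
        }
        self.pending.insert(checkpoint);
        let before = self.next;
        // Consume every checkpoint that is now contiguous with the prefix.
        while self.pending.remove(&self.next) {
            self.next += 1;
        }
        (self.next > before).then(|| self.next - 1)
    }
}
```

If checkpoints 2 and 1 finish before 0, the watermark stays put until 0 arrives and then jumps straight to 2.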
This example demonstrates:
- Custom store implementation using the ClickHouse Rust client.
- Watermark persistence with ClickHouse-specific SQL syntax.
- Transaction digest indexing similar to the built-in PostgreSQL handler.
The example includes three main components:
- store.rs - ClickHouseStore implementing Store and Connection traits.
use anyhow::Result;
use async_trait::async_trait;
use chrono::Utc;
use clickhouse::{Client, Row};
use scoped_futures::ScopedBoxFuture;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use sui_indexer_alt_framework_store_traits::{
CommitterWatermark, Connection, PrunerWatermark, ReaderWatermark, Store, TransactionalStore,
};
use url::Url;
#[derive(Clone)]
pub struct ClickHouseStore {
client: Client,
}
pub struct ClickHouseConnection {
pub client: Client,
}
/// Row structure for watermark table operations
#[derive(Row, Serialize, Deserialize, Debug, Default)]
struct WatermarkRow {
pipeline: String,
epoch_hi_inclusive: u64,
checkpoint_hi_inclusive: u64,
tx_hi: u64,
timestamp_ms_hi_inclusive: u64,
reader_lo: u64,
pruner_hi: u64,
pruner_timestamp: u64, // Unix timestamp in milliseconds
}
impl ClickHouseStore {
pub fn new(url: Url) -> Self {
let client = Client::default()
.with_url(url.as_str())
.with_user("dev") // Simple user for local development
.with_compression(clickhouse::Compression::Lz4);
Self { client }
}
/// Create tables if they don't exist
pub async fn create_tables_if_not_exists(&self) -> Result<()> {
// Create watermarks table for pipeline state management
self.client
.query(
"
CREATE TABLE IF NOT EXISTS watermarks
(
pipeline String,
epoch_hi_inclusive UInt64,
checkpoint_hi_inclusive UInt64,
tx_hi UInt64,
timestamp_ms_hi_inclusive UInt64,
reader_lo UInt64,
pruner_hi UInt64,
pruner_timestamp UInt64
)
ENGINE = MergeTree()
ORDER BY pipeline
",
)
.execute()
.await?;
// Create transactions table for the actual indexing data
self.client
.query(
"
CREATE TABLE IF NOT EXISTS transactions
(
checkpoint_sequence_number UInt64,
transaction_digest String,
indexed_at DateTime64(3, 'UTC') DEFAULT now()
)
ENGINE = MergeTree()
ORDER BY checkpoint_sequence_number
",
)
.execute()
.await?;
Ok(())
}
}
#[async_trait]
impl Store for ClickHouseStore {
type Connection<'c> = ClickHouseConnection;
async fn connect<'c>(&'c self) -> Result<Self::Connection<'c>> {
Ok(ClickHouseConnection {
client: self.client.clone(),
})
}
}
#[async_trait]
impl TransactionalStore for ClickHouseStore {
async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
where
R: Send + 'a,
F: Send + 'a,
F: for<'r> FnOnce(
&'r mut Self::Connection<'_>,
) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>,
{
let mut conn = self.connect().await?;
f(&mut conn).await
}
}
#[async_trait]
impl Connection for ClickHouseConnection {
async fn committer_watermark(
&mut self,
pipeline: &'static str,
) -> Result<Option<CommitterWatermark>> {
let mut cursor = self
.client
.query(
"SELECT epoch_hi_inclusive, checkpoint_hi_inclusive, tx_hi, timestamp_ms_hi_inclusive
FROM watermarks
WHERE pipeline = ?
ORDER BY pruner_timestamp DESC
LIMIT 1"
)
.bind(pipeline)
.fetch::<(u64, u64, u64, u64)>()?;
let row: Option<(u64, u64, u64, u64)> = cursor.next().await?;
Ok(row.map(
|(epoch_hi, checkpoint_hi, tx_hi, timestamp_hi)| CommitterWatermark {
epoch_hi_inclusive: epoch_hi,
checkpoint_hi_inclusive: checkpoint_hi,
tx_hi,
timestamp_ms_hi_inclusive: timestamp_hi,
},
))
}
async fn reader_watermark(
&mut self,
pipeline: &'static str,
) -> Result<Option<ReaderWatermark>> {
let mut cursor = self
.client
.query(
"SELECT checkpoint_hi_inclusive, reader_lo
FROM watermarks
WHERE pipeline = ?
ORDER BY pruner_timestamp DESC
LIMIT 1",
)
.bind(pipeline)
.fetch::<(u64, u64)>()?;
let row: Option<(u64, u64)> = cursor.next().await?;
Ok(row.map(|(checkpoint_hi, reader_lo)| ReaderWatermark {
checkpoint_hi_inclusive: checkpoint_hi,
reader_lo,
}))
}
async fn pruner_watermark(
&mut self,
pipeline: &'static str,
delay: Duration,
) -> Result<Option<PrunerWatermark>> {
// Follow PostgreSQL pattern: calculate wait_for_ms on database side
// We do this so that we can rely on the database to keep a consistent sense of time.
// Using own clocks can potentially be subject to some clock skew.
let delay_ms = delay.as_millis() as i64;
let mut cursor = self
.client
.query(
"SELECT reader_lo, pruner_hi,
toInt64(? + (pruner_timestamp - toUnixTimestamp64Milli(now64()))) as wait_for_ms
FROM watermarks
WHERE pipeline = ?
ORDER BY pruner_timestamp DESC
LIMIT 1"
)
.bind(delay_ms)
.bind(pipeline)
.fetch::<(u64, u64, i64)>()?;
let row: Option<(u64, u64, i64)> = cursor.next().await?;
Ok(
row.map(|(reader_lo, pruner_hi, wait_for_ms)| PrunerWatermark {
wait_for_ms,
reader_lo,
pruner_hi,
}),
)
}
async fn set_committer_watermark(
&mut self,
pipeline: &'static str,
watermark: CommitterWatermark,
) -> Result<bool> {
// Follow PostgreSQL pattern: check if row exists, then UPDATE or INSERT accordingly
// First check if pipeline exists and get current checkpoint
let mut cursor = self
.client
.query("SELECT checkpoint_hi_inclusive FROM watermarks WHERE pipeline = ? LIMIT 1")
.bind(pipeline)
.fetch::<u64>()?;
let existing_checkpoint: Option<u64> = cursor.next().await?;
if let Some(existing_checkpoint) = existing_checkpoint {
// Row exists - only update if checkpoint advances
if existing_checkpoint < watermark.checkpoint_hi_inclusive {
self.client
.query(
"ALTER TABLE watermarks
UPDATE
epoch_hi_inclusive = ?,
checkpoint_hi_inclusive = ?,
tx_hi = ?,
timestamp_ms_hi_inclusive = ?
WHERE pipeline = ?",
)
.bind(watermark.epoch_hi_inclusive)
.bind(watermark.checkpoint_hi_inclusive)
.bind(watermark.tx_hi)
.bind(watermark.timestamp_ms_hi_inclusive)
.bind(pipeline)
.execute()
.await?;
} else {
// Watermark did not advance, so nothing was written
return Ok(false);
}
} else {
// No existing row - insert new one
let mut inserter = self.client.inserter("watermarks")?;
inserter.write(&WatermarkRow {
pipeline: pipeline.to_string(),
epoch_hi_inclusive: watermark.epoch_hi_inclusive,
checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive,
tx_hi: watermark.tx_hi,
timestamp_ms_hi_inclusive: watermark.timestamp_ms_hi_inclusive,
reader_lo: 0, // Will be updated by reader
pruner_hi: 0, // Will be updated by pruner
pruner_timestamp: Utc::now().timestamp_millis() as u64,
})?;
inserter.end().await?;
}
Ok(true)
}
async fn set_reader_watermark(
&mut self,
pipeline: &'static str,
reader_lo: u64,
) -> Result<bool> {
// Follow PostgreSQL pattern: simple UPDATE with timestamp update and advancement check
self.client
.query(
"ALTER TABLE watermarks
UPDATE reader_lo = ?, pruner_timestamp = toUnixTimestamp64Milli(now64())
WHERE pipeline = ? AND reader_lo < ?",
)
.bind(reader_lo)
.bind(pipeline)
.bind(reader_lo)
.execute()
.await?;
Ok(true)
}
async fn set_pruner_watermark(
&mut self,
pipeline: &'static str,
pruner_hi: u64,
) -> Result<bool> {
// Follow PostgreSQL pattern: simple UPDATE statement
self.client
.query(
"ALTER TABLE watermarks
UPDATE pruner_hi = ?
WHERE pipeline = ?",
)
.bind(pruner_hi)
.bind(pipeline)
.execute()
.await?;
Ok(true)
}
}
- handlers.rs - TxDigests handler processing checkpoint data.
use anyhow::Result;
use async_trait::async_trait;
use clickhouse::Row;
use serde::Serialize;
use std::sync::Arc;
use sui_indexer_alt_framework::{
pipeline::{concurrent::Handler, Processor},
FieldCount,
};
use sui_indexer_alt_framework_store_traits::Store;
use sui_types::full_checkpoint_content::CheckpointData;
use crate::store::ClickHouseStore;
/// Structure representing a transaction digest record in ClickHouse
/// Aligned with sui-indexer-alt's StoredTxDigest structure
#[derive(Row, Serialize, Clone, Debug, FieldCount)]
pub struct StoredTxDigest {
pub tx_sequence_number: i64,
pub tx_digest: Vec<u8>,
}
/// Handler that processes checkpoint data and extracts transaction digests
/// Named to align with sui-indexer-alt's TxDigests handler
#[derive(Clone, Default)]
pub struct TxDigests;
impl Processor for TxDigests {
const NAME: &'static str = "tx_digests";
type Value = StoredTxDigest;
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let CheckpointData {
transactions,
checkpoint_summary,
..
} = checkpoint.as_ref();
let first_tx = checkpoint_summary.network_total_transactions as usize - transactions.len();
Ok(transactions
.iter()
.enumerate()
.map(|(i, tx)| StoredTxDigest {
tx_sequence_number: (first_tx + i) as i64,
tx_digest: tx.transaction.digest().inner().to_vec(),
})
.collect())
}
}
#[async_trait]
impl Handler for TxDigests {
type Store = ClickHouseStore;
async fn commit<'a>(
values: &[Self::Value],
conn: &mut <Self::Store as Store>::Connection<'a>,
) -> Result<usize> {
let row_count = values.len();
if row_count == 0 {
return Ok(0);
}
// Use ClickHouse inserter for efficient bulk inserts
let mut inserter = conn.client.inserter("tx_digests")?;
for tx_digest in values {
inserter.write(tx_digest)?;
}
inserter.end().await?;
Ok(row_count)
}
}
- main.rs - Manual indexer setup with ClickHouse backend.
mod handlers;
mod store;
use anyhow::Result;
use clap::Parser;
use sui_indexer_alt_framework::{
ingestion::{ClientArgs, IngestionConfig},
pipeline::concurrent::ConcurrentConfig,
Indexer, IndexerArgs,
};
use url::Url;
use handlers::TxDigests;
use store::ClickHouseStore;
#[derive(clap::Parser, Debug, Clone)]
struct Args {
#[clap(flatten)]
pub indexer_args: IndexerArgs,
#[clap(flatten)]
pub client_args: ClientArgs,
}
#[tokio::main]
async fn main() -> Result<()> {
// Initialize crypto provider for HTTPS connections (needed for remote checkpoint fetching)
rustls::crypto::ring::default_provider()
.install_default()
.expect("Failed to install crypto provider");
// Parse command-line arguments
let args = Args::parse();
// ClickHouse connection (uses 'dev' user by default for local development)
let clickhouse_url = "http://localhost:8123".parse::<Url>()?;
println!("Connecting to ClickHouse at: {}", clickhouse_url);
// Create our custom ClickHouse store
let store = ClickHouseStore::new(clickhouse_url);
// Ensure the database tables are created before starting the indexer
store.create_tables_if_not_exists().await?;
// Manually build the indexer with our custom ClickHouse store
// This is the key difference from basic-sui-indexer which uses IndexerCluster::builder()
let mut indexer = Indexer::new(
store.clone(),
args.indexer_args,
args.client_args,
IngestionConfig::default(),
None, // No metrics prefix
&Default::default(), // Empty prometheus registry
tokio_util::sync::CancellationToken::new(),
)
.await?;
// Register our concurrent pipeline handler (better for testing pruning)
// This processes checkpoints with separate reader and pruner components
indexer
.concurrent_pipeline(
TxDigests,
// ConcurrentConfig default comes with no pruning.
ConcurrentConfig::default(),
)
.await?;
println!("Starting ClickHouse Sui indexer...");
// Start the indexer and wait for it to complete
let handle = indexer.run().await?;
// This will run until the indexer is stopped (e.g., by Ctrl+C)
handle.await?;
println!("Indexer stopped");
Ok(())
}
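The `tx_sequence_number` arithmetic in handlers.rs relies on `network_total_transactions` counting every transaction up to and including the current checkpoint. The following sketch isolates that calculation. `tx_sequence_numbers` is a hypothetical helper, and it uses a checked subtraction where the handler uses a plain one.

```rust
use std::ops::Range;

/// Sequence numbers of the transactions in one checkpoint, given the
/// network-wide total after the checkpoint and the number of transactions in it.
/// Returns `None` if the inputs are inconsistent (more transactions in the
/// checkpoint than on the whole network).
pub fn tx_sequence_numbers(
    network_total_transactions: u64,
    txs_in_checkpoint: usize,
) -> Option<Range<u64>> {
    let first_tx = network_total_transactions.checked_sub(txs_in_checkpoint as u64)?;
    Some(first_tx..network_total_transactions)
}
```

With a network total of 100 and three transactions in the checkpoint, the transactions get sequence numbers 97, 98, and 99.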
Deserializing Move events
When Move smart contracts execute on Sui, they can emit events using the sui::event module. These events are stored in checkpoints as BCS-serialized bytes that your indexer needs to deserialize to extract meaningful data.
Why deserialization is needed
Move contracts emit events like this:
// Move smart contract
use sui::event;
public fun transfer_balance(...) {
    event::emit(BalanceEvent {
        balance_manager_id: id,
        asset: asset_id,
        amount: 100,
        deposit: true
    });
}
But in checkpoint data, these events arrive as raw BCS bytes that need to be converted back to Rust structs for processing.
Step-by-step deserialization
- Add BCS dependency
[dependencies]
bcs = "0.1.6"
serde = { version = "1.0", features = ["derive"] }
- Define the Event struct in Rust
Define the same structure in Rust as declared in Move. You can do this manually or use move-binding to auto-generate it from on-chain packages.
use serde::Deserialize;
use sui_indexer_alt_framework::types::base_types::ObjectID;
#[derive(Deserialize, Debug)]
struct BalanceEvent {
    balance_manager_id: ObjectID,
    asset: ObjectID,
    amount: u64,
    deposit: bool,
}
Important: Field order and types must match the Move event exactly.
- Extract event bytes in your processor
impl Processor for YourHandler {
    fn process(&self, checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>> {
        let mut results = Vec::new();
        for transaction in &checkpoint.transactions {
            for event in &transaction.events {
                // Get the raw BCS bytes
                let event_bytes = &event.contents;
                // Deserialize to your Rust struct
                if let Ok(balance_event) = bcs::from_bytes::<BalanceEvent>(event_bytes) {
                    // Convert balance_event into Self::Value and push it onto results
                }
            }
        }
        Ok(results)
    }
}
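To see why field order and types matter, it helps to look at the byte layout that BCS produces for this event. The sketch below decodes the bytes by hand using only the standard library. It assumes both ID fields are 32-byte object IDs, so the payload is 32 + 32 + 8 + 1 = 73 bytes, with `amount` as a little-endian `u64` and `deposit` as a single 0 or 1 byte. It is meant as an illustration of the layout, not a replacement for the bcs crate, and `decode_balance_event` is a hypothetical function.

```rust
/// Rust mirror of the Move `BalanceEvent`, with IDs kept as raw 32-byte arrays.
#[derive(Debug, PartialEq, Eq)]
pub struct BalanceEvent {
    pub balance_manager_id: [u8; 32],
    pub asset: [u8; 32],
    pub amount: u64,
    pub deposit: bool,
}

/// Decode the event by hand, reading fields in declaration order.
pub fn decode_balance_event(bytes: &[u8]) -> Result<BalanceEvent, String> {
    // 32 + 32 + 8 + 1 bytes; fixed-size fields carry no length prefix.
    if bytes.len() != 73 {
        return Err(format!("expected 73 bytes, got {}", bytes.len()));
    }
    let mut balance_manager_id = [0u8; 32];
    balance_manager_id.copy_from_slice(&bytes[0..32]);
    let mut asset = [0u8; 32];
    asset.copy_from_slice(&bytes[32..64]);
    let mut amount_le = [0u8; 8];
    amount_le.copy_from_slice(&bytes[64..72]);
    // Booleans are a single byte that must be exactly 0 or 1.
    let deposit = match bytes[72] {
        0 => false,
        1 => true,
        other => return Err(format!("invalid bool byte {other}")),
    };
    Ok(BalanceEvent {
        balance_manager_id,
        asset,
        amount: u64::from_le_bytes(amount_le),
        deposit,
    })
}
```

Swapping two fields of the same size, such as the two IDs, would still decode without an error but yield wrong values, which is why a mismatch in field order can go unnoticed.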
Related links
The sui-indexer-alt-framework is a powerful Rust framework for building high-performance, custom blockchain indexers on Sui. It provides customizable, production-ready components for data ingestion, processing, and storage.
Establishing a custom indexer helps improve latency, allows pruning the data of your Sui full node, and provides efficient assemblage of checkpoint data.