Bring Your Own Store (BYOS)
The IndexerCluster provides a convenient way to get started with PostgreSQL, but you might want to use a different database or storage system. This requires using the manual Indexer class and implementing custom Store and Connection traits from sui-indexer-alt-framework-store-traits.
lib.rs in sui-indexer-alt-framework-store-traits
/// Represents a database connection that can be used by the indexer framework to manage watermark
/// operations, agnostic of the underlying store implementation.
#[async_trait]
pub trait Connection: Send {
/// If no existing watermark record exists, initializes it with `default_next_checkpoint`.
/// Returns the committer watermark `checkpoint_hi_inclusive`.
async fn init_watermark(
&mut self,
pipeline_task: &str,
default_next_checkpoint: u64,
) -> anyhow::Result<Option<u64>>;
/// Given a `pipeline_task` representing either a pipeline name or a pipeline with an associated
/// task (formatted as `{pipeline}{Store::DELIMITER}{task}`), return the committer watermark
/// from the `Store`. The indexer fetches this value for each pipeline added to determine which
/// checkpoint to resume processing from.
async fn committer_watermark(
&mut self,
pipeline_task: &str,
) -> anyhow::Result<Option<CommitterWatermark>>;
/// Given a pipeline, return the reader watermark from the database. This is used by the indexer
/// to determine the new `reader_lo` or inclusive lower bound of available data.
async fn reader_watermark(
&mut self,
pipeline: &'static str,
) -> anyhow::Result<Option<ReaderWatermark>>;
/// Get the bounds for the region that the pruner is allowed to prune, and the time in
/// milliseconds the pruner must wait before it can begin pruning data for the given `pipeline`.
/// The pruner is allowed to prune the region between the returned `pruner_hi` (inclusive) and
/// `reader_lo` (exclusive) after waiting until `pruner_timestamp + delay` has passed. This
/// minimizes the possibility for the pruner to delete data still expected by inflight read
/// requests.
async fn pruner_watermark(
&mut self,
pipeline: &'static str,
delay: Duration,
) -> anyhow::Result<Option<PrunerWatermark>>;
/// Upsert the high watermark for the `pipeline_task` - representing either a pipeline name or a
/// pipeline with an associated task (formatted as `{pipeline}{Store::DELIMITER}{task}`) - as
/// long as it raises the watermark stored in the database. Returns a boolean indicating whether
/// the watermark was actually updated or not.
async fn set_committer_watermark(
&mut self,
pipeline_task: &str,
watermark: CommitterWatermark,
) -> anyhow::Result<bool>;
/// Update the `reader_lo` of an existing watermark entry only if it raises `reader_lo`. Readers
/// will reference this as the inclusive lower bound of available data for the corresponding
/// pipeline.
///
/// If an update is to be made, some timestamp (i.e `pruner_timestamp`) should also be set on
/// the watermark entry to the current time. Ideally, this would be from the perspective of the
/// store. If this is not possible, then it should come from some other common source of time
/// between the indexer and its readers. This timestamp is critical to the indexer's operations,
/// as it determines when the pruner can safely begin pruning data. When `pruner_watermark` is
/// called by the indexer, it will retrieve this timestamp to determine how much longer to wait
/// before beginning to prune.
///
/// Returns a boolean indicating whether the watermark was actually updated or not.
async fn set_reader_watermark(
&mut self,
pipeline: &'static str,
reader_lo: u64,
) -> anyhow::Result<bool>;
/// Update the pruner watermark, returns true if the watermark was actually updated.
async fn set_pruner_watermark(
&mut self,
pipeline: &'static str,
pruner_hi: u64,
) -> anyhow::Result<bool>;
}
/// A storage-agnostic interface that provides database connections for both watermark management
/// and arbitrary writes. The indexer framework accepts this `Store` implementation to manage
/// watermarks operations through its associated `Connection` type. This store is also passed to the
/// pipeline handlers to perform arbitrary writes to the store.
#[async_trait]
pub trait Store: Send + Sync + 'static + Clone {
type Connection<'c>: Connection
where
Self: 'c;
/// Delimiter used to separate pipeline names from task identifiers when reading or writing the
/// committer watermark.
const DELIMITER: &'static str = "@";
async fn connect<'c>(&'c self) -> Result<Self::Connection<'c>, anyhow::Error>;
}
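The watermark contract described in these doc comments can be modeled without a database. The following is a minimal synchronous sketch, assuming an in-memory map in place of a real store. `MemWatermarks` and `pipeline_task_key` are hypothetical names for illustration and are not part of the framework. It shows the `{pipeline}{DELIMITER}{task}` key format, an `init_watermark` that creates no record when the default next checkpoint is 0, and a committer watermark that only moves forward.

```rust
use std::collections::HashMap;

/// Mirrors the default `Store::DELIMITER` shown above.
const DELIMITER: &str = "@";

/// Hypothetical helper: build the `pipeline_task` key for a pipeline and an optional task.
fn pipeline_task_key(pipeline: &str, task: Option<&str>) -> String {
    match task {
        Some(task) => format!("{}{}{}", pipeline, DELIMITER, task),
        None => pipeline.to_string(),
    }
}

/// Hypothetical in-memory stand-in for a watermark table: key -> checkpoint_hi_inclusive.
#[derive(Default)]
struct MemWatermarks {
    rows: HashMap<String, u64>,
}

impl MemWatermarks {
    /// Create the record if it is missing, then return the stored `checkpoint_hi_inclusive`.
    fn init_watermark(&mut self, key: &str, default_next_checkpoint: u64) -> Option<u64> {
        // Starting at checkpoint 0 means nothing is committed yet, so no record is created.
        let hi = match default_next_checkpoint.checked_sub(1) {
            Some(hi) => hi,
            None => return self.rows.get(key).copied(),
        };
        Some(*self.rows.entry(key.to_string()).or_insert(hi))
    }

    /// Upsert the watermark, but only if it raises the stored value.
    fn set_committer_watermark(&mut self, key: &str, hi: u64) -> bool {
        let current = self.rows.get(key).copied();
        if matches!(current, Some(stored) if stored >= hi) {
            return false;
        }
        self.rows.insert(key.to_string(), hi);
        true
    }
}
```

A real implementation performs the same checks inside the store itself (for example, in a conditional update) so that concurrent writers cannot lower the watermark.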
When to use BYOS:
- Different database: MongoDB, CouchDB, or other non-PostgreSQL databases. This also applies if you prefer to use PostgreSQL but without the default Diesel ORM.
- Custom requirements: Specialized storage logic, partitioning, or performance optimizations.
Core implementation requirements
To implement BYOS, you need to:
- Define your Store and Connection structs that manage connections.
- Implement the Store trait for connection management.
- Implement the Connection trait for watermark operations.
- Use the manual Indexer instead of IndexerCluster.
Step 1: Define your store structure
use sui_indexer_alt_framework::store::{Store, Connection};
use async_trait::async_trait;
#[derive(Clone)]
pub struct MyCustomStore {
// Your database connection details
connection_pool: MyDatabasePool,
config: MyConfig,
}
pub struct MyCustomConnection<'a> {
// A connection instance
conn: MyDatabaseConnection<'a>,
}
Step 2: Implement the Store trait
The Store trait manages the connection lifecycle:
#[async_trait]
impl Store for MyCustomStore {
type Connection<'c> = MyCustomConnection<'c>;
async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>> {
// Your implementation
}
}
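The lifetime on `Connection<'c>` exists because a connection can borrow from the store that created it. The following is a rough synchronous analogue using only the standard library. `MemStore` and `MemConnection` are hypothetical names, and a mutex guard stands in for what would typically be a pooled database connection in an async implementation.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, MutexGuard};

/// Hypothetical store: cheap to clone because all clones share the same state.
#[derive(Clone, Default)]
struct MemStore {
    state: Arc<Mutex<HashMap<String, u64>>>,
}

/// Hypothetical connection: borrows from the store for the lifetime `'c`.
struct MemConnection<'c> {
    rows: MutexGuard<'c, HashMap<String, u64>>,
}

impl MemStore {
    /// Synchronous analogue of `Store::connect`: hand out a connection tied to `&self`.
    fn connect<'c>(&'c self) -> Result<MemConnection<'c>, String> {
        let rows = self.state.lock().map_err(|e| e.to_string())?;
        Ok(MemConnection { rows })
    }
}

impl<'c> MemConnection<'c> {
    /// Write a value through the connection.
    fn write(&mut self, key: &str, value: u64) {
        self.rows.insert(key.to_string(), value);
    }

    /// Read a value through the connection.
    fn read(&self, key: &str) -> Option<u64> {
        self.rows.get(key).copied()
    }
}
```

Because the store is `Clone` and its clones share state, a value written through one handle is visible through another. The framework relies on the same property when it passes the store to several pipelines.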
Step 3: Implement the Connection trait
The Connection trait handles watermark operations for pipeline coordination:
use sui_indexer_alt_framework::store::{CommitterWatermark, ReaderWatermark};
#[async_trait]
impl Connection for MyCustomConnection<'_> {
// Get the highest checkpoint processed by a pipeline or pipeline task
async fn committer_watermark(
&mut self,
pipeline_task: &str,
) -> anyhow::Result<Option<CommitterWatermark>> {
// Query your database for watermark data
todo!("Implement based on your storage system")
}
// Get the lowest available checkpoint for readers
async fn reader_watermark(
&mut self,
pipeline: &'static str,
) -> anyhow::Result<Option<ReaderWatermark>> {
// Implementation depends on your database schema
todo!("Implement based on your storage system")
}
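// Implement the remaining required methods: init_watermark, pruner_watermark,
// set_committer_watermark, set_reader_watermark, and set_pruner_watermark.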
}
For a complete reference, see the sui-pg-db implementation of Connection:
#[async_trait]
impl store::Connection for Connection<'_> {
async fn init_watermark(
&mut self,
pipeline_task: &str,
default_next_checkpoint: u64,
) -> anyhow::Result<Option<u64>> {
let Some(checkpoint_hi_inclusive) = default_next_checkpoint.checked_sub(1) else {
// Do not create a watermark record with checkpoint_hi_inclusive = -1.
return Ok(self
.committer_watermark(pipeline_task)
.await?
.map(|w| w.checkpoint_hi_inclusive));
};
let stored_watermark = StoredWatermark {
pipeline: pipeline_task.to_string(),
epoch_hi_inclusive: 0,
checkpoint_hi_inclusive: checkpoint_hi_inclusive as i64,
tx_hi: 0,
timestamp_ms_hi_inclusive: 0,
reader_lo: default_next_checkpoint as i64,
pruner_timestamp: Utc::now().naive_utc(),
pruner_hi: default_next_checkpoint as i64,
};
use diesel::pg::upsert::excluded;
let checkpoint_hi_inclusive: i64 = diesel::insert_into(watermarks::table)
.values(&stored_watermark)
// If there is an existing entry, leave it unchanged and return its stored value.
.on_conflict(watermarks::pipeline)
// Use `do_update` instead of `do_nothing` to return the existing row with `returning`.
.do_update()
// When using `do_update`, at least one change needs to be set, so set the pipeline to itself (nothing changes).
// `excluded` is a virtual table containing the row proposed for insertion; its `pipeline` equals the existing row's.
.set(watermarks::pipeline.eq(excluded(watermarks::pipeline)))
.returning(watermarks::checkpoint_hi_inclusive)
.get_result(self)
.await?;
Ok(Some(checkpoint_hi_inclusive as u64))
}
async fn committer_watermark(
&mut self,
pipeline_task: &str,
) -> anyhow::Result<Option<store::CommitterWatermark>> {
let watermark: Option<(i64, i64, i64, i64)> = watermarks::table
.select((
watermarks::epoch_hi_inclusive,
watermarks::checkpoint_hi_inclusive,
watermarks::tx_hi,
watermarks::timestamp_ms_hi_inclusive,
))
.filter(watermarks::pipeline.eq(pipeline_task))
.first(self)
.await
.optional()?;
if let Some(watermark) = watermark {
Ok(Some(store::CommitterWatermark {
epoch_hi_inclusive: watermark.0 as u64,
checkpoint_hi_inclusive: watermark.1 as u64,
tx_hi: watermark.2 as u64,
timestamp_ms_hi_inclusive: watermark.3 as u64,
}))
} else {
Ok(None)
}
}
async fn reader_watermark(
&mut self,
pipeline: &'static str,
) -> anyhow::Result<Option<store::ReaderWatermark>> {
let watermark: Option<(i64, i64)> = watermarks::table
.select((watermarks::checkpoint_hi_inclusive, watermarks::reader_lo))
.filter(watermarks::pipeline.eq(pipeline))
.first(self)
.await
.optional()?;
if let Some(watermark) = watermark {
Ok(Some(store::ReaderWatermark {
checkpoint_hi_inclusive: watermark.0 as u64,
reader_lo: watermark.1 as u64,
}))
} else {
Ok(None)
}
}
async fn pruner_watermark(
&mut self,
pipeline: &'static str,
delay: Duration,
) -> anyhow::Result<Option<store::PrunerWatermark>> {
//     |---------- + delay ---------------------|
//                             |--- wait_for ---|
//     |-----------------------|----------------|
//     ^                       ^
//     pruner_timestamp        NOW()
let wait_for = sql!(as BigInt,
"CAST({BigInt} + 1000 * EXTRACT(EPOCH FROM pruner_timestamp - NOW()) AS BIGINT)",
delay.as_millis() as i64,
);
let watermark: Option<(i64, i64, i64)> = watermarks::table
.select((wait_for, watermarks::pruner_hi, watermarks::reader_lo))
.filter(watermarks::pipeline.eq(pipeline))
.first(self)
.await
.optional()?;
if let Some(watermark) = watermark {
Ok(Some(store::PrunerWatermark {
wait_for_ms: watermark.0,
pruner_hi: watermark.1 as u64,
reader_lo: watermark.2 as u64,
}))
} else {
Ok(None)
}
}
async fn set_committer_watermark(
&mut self,
pipeline_task: &str,
watermark: store::CommitterWatermark,
) -> anyhow::Result<bool> {
// Create a StoredWatermark directly from CommitterWatermark
let stored_watermark = StoredWatermark {
pipeline: pipeline_task.to_string(),
epoch_hi_inclusive: watermark.epoch_hi_inclusive as i64,
checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive as i64,
tx_hi: watermark.tx_hi as i64,
timestamp_ms_hi_inclusive: watermark.timestamp_ms_hi_inclusive as i64,
reader_lo: 0,
pruner_timestamp: DateTime::UNIX_EPOCH.naive_utc(),
pruner_hi: 0,
};
use diesel::query_dsl::methods::FilterDsl;
Ok(diesel::insert_into(watermarks::table)
.values(&stored_watermark)
// There is an existing entry, so only write the new `hi` values
.on_conflict(watermarks::pipeline)
.do_update()
.set((
watermarks::epoch_hi_inclusive.eq(stored_watermark.epoch_hi_inclusive),
watermarks::checkpoint_hi_inclusive.eq(stored_watermark.checkpoint_hi_inclusive),
watermarks::tx_hi.eq(stored_watermark.tx_hi),
watermarks::timestamp_ms_hi_inclusive
.eq(stored_watermark.timestamp_ms_hi_inclusive),
))
.filter(
watermarks::checkpoint_hi_inclusive.lt(stored_watermark.checkpoint_hi_inclusive),
)
.execute(self)
.await?
> 0)
}
async fn set_reader_watermark(
&mut self,
pipeline: &'static str,
reader_lo: u64,
) -> anyhow::Result<bool> {
Ok(diesel::update(watermarks::table)
.set((
watermarks::reader_lo.eq(reader_lo as i64),
watermarks::pruner_timestamp.eq(diesel::dsl::now),
))
.filter(watermarks::pipeline.eq(pipeline))
.filter(watermarks::reader_lo.lt(reader_lo as i64))
.execute(self)
.await?
> 0)
}
async fn set_pruner_watermark(
&mut self,
pipeline: &'static str,
pruner_hi: u64,
) -> anyhow::Result<bool> {
Ok(diesel::update(watermarks::table)
.set(watermarks::pruner_hi.eq(pruner_hi as i64))
.filter(watermarks::pipeline.eq(pipeline))
.execute(self)
.await?
> 0)
}
}
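The timing logic in `set_reader_watermark` and `pruner_watermark` is easier to see without SQL. The sketch below restates it as plain functions, assuming timestamps in Unix milliseconds and a caller-supplied `now_ms`. `Row`, `wait_for_ms`, and `prunable_range` are hypothetical names for illustration. As the trait documentation notes, a real store should ideally take the current time from the database.

```rust
use std::ops::Range;
use std::time::Duration;

/// Hypothetical, simplified watermark row; timestamps are Unix milliseconds.
struct Row {
    reader_lo: u64,
    pruner_hi: u64,
    pruner_timestamp_ms: i64,
}

/// Raise `reader_lo` and stamp the time of the change; returns whether anything changed.
fn set_reader_watermark(row: &mut Row, reader_lo: u64, now_ms: i64) -> bool {
    if reader_lo <= row.reader_lo {
        return false;
    }
    row.reader_lo = reader_lo;
    row.pruner_timestamp_ms = now_ms;
    true
}

/// Time left before pruning may start: delay + (pruner_timestamp - now).
/// A zero or negative result means the delay has already elapsed.
fn wait_for_ms(row: &Row, delay: Duration, now_ms: i64) -> i64 {
    delay.as_millis() as i64 + (row.pruner_timestamp_ms - now_ms)
}

/// Checkpoints the pruner may delete: from `pruner_hi` up to, but excluding, `reader_lo`.
fn prunable_range(row: &Row) -> Range<u64> {
    row.pruner_hi..row.reader_lo
}
```

With a 5 second delay and a reader watermark stamped at time 1,000, a call at time 2,000 yields 4,000 ms left to wait, and a call at time 7,000 yields -1,000, meaning pruning can begin.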
Step 4: Use manual indexer
Replace IndexerCluster with manual Indexer:
use sui_indexer_alt_framework::{Indexer, IndexerArgs};
use sui_indexer_alt_framework::ingestion::{
ClientArgs, IngestionConfig,
ingestion_client::IngestionClientArgs,
};
use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Initialize your custom store
let store = MyCustomStore::new(config).await?;
// Configure indexer manually
let mut indexer = Indexer::new(
store,
IndexerArgs::default(),
ClientArgs {
ingestion: IngestionClientArgs {
remote_store_url: Some("https://checkpoints.testnet.sui.io".to_string()),
..Default::default()
},
..Default::default()
},
IngestionConfig::default(),
&prometheus::Registry::new(),
tokio_util::sync::CancellationToken::new(),
).await?;
// Add your pipelines
indexer.concurrent_pipeline(
YourHandler::default(),
ConcurrentConfig::default(),
).await?;
// Start the indexer
indexer.run().await?;
Ok(())
}
Example: ClickHouse implementation
For a complete working example of BYOS with ClickHouse (a high-performance columnar database for analytics), see the example project in the Sui repo.
ClickHouse example README
ClickHouse Sui Indexer
A simple example of how to build a custom Sui indexer that writes transaction data to ClickHouse.
Quick Start
1. Start ClickHouse
docker run -d --name clickhouse-dev -p 8123:8123 -p 9000:9000 --ulimit nofile=262144:262144 clickhouse/clickhouse-server
2. Set up database user
docker exec clickhouse-dev clickhouse-client --query "CREATE USER IF NOT EXISTS dev IDENTIFIED WITH no_password"
docker exec clickhouse-dev clickhouse-client --query "GRANT CREATE, INSERT, SELECT, ALTER, UPDATE, DELETE ON *.* TO dev"
3. Run the indexer
cargo run -- --remote-store-url https://checkpoints.testnet.sui.io --last-checkpoint=10
That’s it! The indexer will:
- Create the necessary tables automatically
- Fetch checkpoints from the Sui testnet
- Write transaction data to ClickHouse
Verify Data
Check that data was written:
docker exec clickhouse-dev clickhouse-client --user=dev --query "SELECT COUNT(*) FROM transactions"
docker exec clickhouse-dev clickhouse-client --user=dev --query "SELECT * FROM transactions LIMIT 5"
Clean Up
Stop and remove the ClickHouse container:
docker stop clickhouse-dev && docker rm clickhouse-dev
What This Example Shows
- Custom Store Implementation: How to implement the Store trait for ClickHouse
- Concurrent Pipeline: Uses the concurrent pipeline for better pruning and watermark testing
- Watermark Management: Tracking indexer progress with committer, reader, and pruner watermarks
- Transaction Processing: Extracting and storing transaction digests from checkpoints
- Simple Setup: Minimal configuration for local development
Architecture
Sui Network → Checkpoints → Concurrent Pipeline → ClickHouse Store → ClickHouse DB
The indexer uses a concurrent pipeline that processes checkpoints out-of-order with separate reader, committer, and pruner components. This is ideal for testing watermark functionality and pruning behavior.
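Because checkpoints can be committed out of order, the committer watermark can only advance across a contiguous run of committed checkpoints. The following sketch shows one way to track that, assuming a simple ordered set of pending checkpoints. `WatermarkTracker` is a hypothetical type for illustration and not the framework's implementation.

```rust
use std::collections::BTreeSet;

/// Hypothetical tracker for a committer watermark under out-of-order commits.
struct WatermarkTracker {
    /// The next checkpoint the watermark is waiting for.
    next: u64,
    /// Committed checkpoints that are still ahead of the watermark.
    pending: BTreeSet<u64>,
}

impl WatermarkTracker {
    fn new(next: u64) -> Self {
        Self { next, pending: BTreeSet::new() }
    }

    /// Record a committed checkpoint and return the resulting
    /// `checkpoint_hi_inclusive`, or `None` if nothing is contiguous yet.
    fn committed(&mut self, checkpoint: u64) -> Option<u64> {
        if checkpoint >= self.next {
            self.pending.insert(checkpoint);
        }
        // Advance across every checkpoint that is now contiguous with the watermark.
        while self.pending.remove(&self.next) {
            self.next += 1;
        }
        self.next.checked_sub(1)
    }
}
```

For example, committing checkpoints 2, 0, 1 in that order leaves the watermark unset after 2, at 0 after 0, and at 2 after 1, because only then is the run 0 to 2 complete.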
This example demonstrates:
- Custom store implementation using the ClickHouse Rust client.
- Watermark persistence with ClickHouse-specific SQL syntax.
- Transaction digest indexing similar to the built-in PostgreSQL handler.
The example includes 3 main components:
- store.rs - ClickHouseStore implementing Store and Connection traits.
use anyhow::Result;
use async_trait::async_trait;
use chrono::Utc;
use clickhouse::{Client, Row};
use scoped_futures::ScopedBoxFuture;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use sui_indexer_alt_framework::store::{
CommitterWatermark, Connection, PrunerWatermark, ReaderWatermark, Store, TransactionalStore,
};
use url::Url;
#[derive(Clone)]
pub struct ClickHouseStore {
client: Client,
}
pub struct ClickHouseConnection {
pub client: Client,
}
/// Row structure for watermark table operations
#[derive(Row, Serialize, Deserialize, Debug, Default)]
struct WatermarkRow {
#[serde(rename = "pipeline")] // The watermarks table names this column `pipeline`
pipeline_task: String,
epoch_hi_inclusive: u64,
checkpoint_hi_inclusive: u64,
tx_hi: u64,
timestamp_ms_hi_inclusive: u64,
reader_lo: u64,
pruner_hi: u64,
pruner_timestamp: u64, // Unix timestamp in milliseconds
}
impl ClickHouseStore {
pub fn new(url: Url) -> Self {
let client = Client::default()
.with_url(url.as_str())
.with_user("dev") // Simple user for local development
.with_compression(clickhouse::Compression::Lz4);
Self { client }
}
/// Create tables if they don't exist
pub async fn create_tables_if_not_exists(&self) -> Result<()> {
// Create watermarks table for pipeline state management
self.client
.query(
"
CREATE TABLE IF NOT EXISTS watermarks
(
pipeline String,
epoch_hi_inclusive UInt64,
checkpoint_hi_inclusive UInt64,
tx_hi UInt64,
timestamp_ms_hi_inclusive UInt64,
reader_lo UInt64,
pruner_hi UInt64,
pruner_timestamp UInt64
)
ENGINE = MergeTree()
ORDER BY pipeline
",
)
.execute()
.await?;
// Create transactions table for the actual indexing data
self.client
.query(
"
CREATE TABLE IF NOT EXISTS transactions
(
checkpoint_sequence_number UInt64,
transaction_digest String,
indexed_at DateTime64(3, 'UTC') DEFAULT now()
)
ENGINE = MergeTree()
ORDER BY checkpoint_sequence_number
",
)
.execute()
.await?;
Ok(())
}
}
#[async_trait]
impl Store for ClickHouseStore {
type Connection<'c> = ClickHouseConnection;
async fn connect<'c>(&'c self) -> Result<Self::Connection<'c>> {
Ok(ClickHouseConnection {
client: self.client.clone(),
})
}
}
#[async_trait]
impl TransactionalStore for ClickHouseStore {
async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
where
R: Send + 'a,
F: Send + 'a,
F: for<'r> FnOnce(
&'r mut Self::Connection<'_>,
) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>,
{
let mut conn = self.connect().await?;
f(&mut conn).await
}
}
#[async_trait]
impl Connection for ClickHouseConnection {
async fn init_watermark(
&mut self,
pipeline_task: &str,
default_next_checkpoint: u64,
) -> anyhow::Result<Option<u64>> {
let existing = self.committer_watermark(pipeline_task).await?;
let Some(checkpoint_hi_inclusive) = default_next_checkpoint.checked_sub(1) else {
return Ok(existing.map(|w| w.checkpoint_hi_inclusive));
};
if let Some(existing) = existing {
return Ok(Some(existing.checkpoint_hi_inclusive));
}
let mut inserter = self.client.inserter("watermarks")?;
inserter.write(&WatermarkRow {
pipeline_task: pipeline_task.to_string(),
epoch_hi_inclusive: 0,
checkpoint_hi_inclusive,
tx_hi: 0,
timestamp_ms_hi_inclusive: 0,
reader_lo: default_next_checkpoint,
pruner_hi: default_next_checkpoint,
pruner_timestamp: 0,
})?;
inserter.end().await?;
Ok(Some(checkpoint_hi_inclusive))
}
async fn committer_watermark(&mut self, pipeline: &str) -> Result<Option<CommitterWatermark>> {
let mut cursor = self
.client
.query(
"SELECT epoch_hi_inclusive, checkpoint_hi_inclusive, tx_hi, timestamp_ms_hi_inclusive
FROM watermarks
WHERE pipeline = ?
ORDER BY pruner_timestamp DESC
LIMIT 1"
)
.bind(pipeline)
.fetch::<(u64, u64, u64, u64)>()?;
let row: Option<(u64, u64, u64, u64)> = cursor.next().await?;
Ok(row.map(
|(epoch_hi, checkpoint_hi, tx_hi, timestamp_hi)| CommitterWatermark {
epoch_hi_inclusive: epoch_hi,
checkpoint_hi_inclusive: checkpoint_hi,
tx_hi,
timestamp_ms_hi_inclusive: timestamp_hi,
},
))
}
async fn reader_watermark(
&mut self,
pipeline: &'static str,
) -> Result<Option<ReaderWatermark>> {
let mut cursor = self
.client
.query(
"SELECT checkpoint_hi_inclusive, reader_lo
FROM watermarks
WHERE pipeline = ?
ORDER BY pruner_timestamp DESC
LIMIT 1",
)
.bind(pipeline)
.fetch::<(u64, u64)>()?;
let row: Option<(u64, u64)> = cursor.next().await?;
Ok(row.map(|(checkpoint_hi, reader_lo)| ReaderWatermark {
checkpoint_hi_inclusive: checkpoint_hi,
reader_lo,
}))
}
async fn pruner_watermark(
&mut self,
pipeline: &'static str,
delay: Duration,
) -> Result<Option<PrunerWatermark>> {
// Follow PostgreSQL pattern: calculate wait_for_ms on database side
// We do this so that we can rely on the database to keep a consistent sense of time.
// Using own clocks can potentially be subject to some clock skew.
let delay_ms = delay.as_millis() as i64;
let mut cursor = self
.client
.query(
"SELECT reader_lo, pruner_hi,
toInt64(? + (pruner_timestamp - toUnixTimestamp64Milli(now64()))) as wait_for_ms
FROM watermarks
WHERE pipeline = ?
ORDER BY pruner_timestamp DESC
LIMIT 1"
)
.bind(delay_ms)
.bind(pipeline)
.fetch::<(u64, u64, i64)>()?;
let row: Option<(u64, u64, i64)> = cursor.next().await?;
Ok(
row.map(|(reader_lo, pruner_hi, wait_for_ms)| PrunerWatermark {
wait_for_ms,
reader_lo,
pruner_hi,
}),
)
}
async fn set_committer_watermark(
&mut self,
pipeline: &str,
watermark: CommitterWatermark,
) -> Result<bool> {
// Follow PostgreSQL pattern: check if row exists, then UPDATE or INSERT accordingly
// First check if pipeline exists and get current checkpoint
let mut cursor = self
.client
.query("SELECT checkpoint_hi_inclusive FROM watermarks WHERE pipeline = ? LIMIT 1")
.bind(pipeline)
.fetch::<u64>()?;
let existing_checkpoint: Option<u64> = cursor.next().await?;
if let Some(existing_checkpoint) = existing_checkpoint {
// Row exists - only update if checkpoint advances
if existing_checkpoint < watermark.checkpoint_hi_inclusive {
self.client
.query(
"ALTER TABLE watermarks
UPDATE
epoch_hi_inclusive = ?,
checkpoint_hi_inclusive = ?,
tx_hi = ?,
timestamp_ms_hi_inclusive = ?
WHERE pipeline = ?",
)
.bind(watermark.epoch_hi_inclusive)
.bind(watermark.checkpoint_hi_inclusive)
.bind(watermark.tx_hi)
.bind(watermark.timestamp_ms_hi_inclusive)
.bind(pipeline)
.execute()
.await?;
}
} else {
// No existing row - insert new one
let mut inserter = self.client.inserter("watermarks")?;
inserter.write(&WatermarkRow {
pipeline_task: pipeline.to_string(),
epoch_hi_inclusive: watermark.epoch_hi_inclusive,
checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive,
tx_hi: watermark.tx_hi,
timestamp_ms_hi_inclusive: watermark.timestamp_ms_hi_inclusive,
reader_lo: 0, // Will be updated by reader
pruner_hi: 0, // Will be updated by pruner
pruner_timestamp: Utc::now().timestamp_millis() as u64,
})?;
inserter.end().await?;
}
Ok(true)
}
async fn set_reader_watermark(
&mut self,
pipeline: &'static str,
reader_lo: u64,
) -> Result<bool> {
// Follow PostgreSQL pattern: simple UPDATE with timestamp update and advancement check
self.client
.query(
"ALTER TABLE watermarks
UPDATE reader_lo = ?, pruner_timestamp = toUnixTimestamp64Milli(now64())
WHERE pipeline = ? AND reader_lo < ?",
)
.bind(reader_lo)
.bind(pipeline)
.bind(reader_lo)
.execute()
.await?;
Ok(true)
}
async fn set_pruner_watermark(
&mut self,
pipeline: &'static str,
pruner_hi: u64,
) -> Result<bool> {
// Follow PostgreSQL pattern: simple UPDATE statement
self.client
.query(
"ALTER TABLE watermarks
UPDATE pruner_hi = ?
WHERE pipeline = ?",
)
.bind(pruner_hi)
.bind(pipeline)
.execute()
.await?;
Ok(true)
}
}
- handlers.rs - TxDigests handler processing checkpoint data.
use anyhow::Result;
use clickhouse::Row;
use serde::Serialize;
use std::sync::Arc;
use sui_indexer_alt_framework::{
FieldCount,
pipeline::{
Processor,
concurrent::{BatchStatus, Handler},
},
store::Store,
types::full_checkpoint_content::Checkpoint,
};
use crate::store::ClickHouseStore;
/// Structure representing a transaction digest record in ClickHouse
/// Aligned with sui-indexer-alt's StoredTxDigest structure
#[derive(Row, Serialize, Clone, Debug, FieldCount)]
pub struct StoredTxDigest {
pub tx_sequence_number: i64,
pub tx_digest: Vec<u8>,
}
/// Handler that processes checkpoint data and extracts transaction digests
/// Named to align with sui-indexer-alt's TxDigests handler
#[derive(Clone, Default)]
pub struct TxDigests;
#[async_trait::async_trait]
impl Processor for TxDigests {
const NAME: &'static str = "tx_digests";
type Value = StoredTxDigest;
async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
let Checkpoint {
transactions,
summary,
..
} = checkpoint.as_ref();
let first_tx = summary.network_total_transactions as usize - transactions.len();
Ok(transactions
.iter()
.enumerate()
.map(|(i, tx)| StoredTxDigest {
tx_sequence_number: (first_tx + i) as i64,
tx_digest: tx.transaction.digest().inner().to_vec(),
})
.collect())
}
}
#[async_trait::async_trait]
impl Handler for TxDigests {
type Store = ClickHouseStore;
type Batch = Vec<Self::Value>;
fn batch(
&self,
batch: &mut Self::Batch,
values: &mut std::vec::IntoIter<Self::Value>,
) -> BatchStatus {
batch.extend(values);
BatchStatus::Pending
}
async fn commit<'a>(
&self,
values: &Self::Batch,
conn: &mut <Self::Store as Store>::Connection<'a>,
) -> Result<usize> {
let row_count = values.len();
if row_count == 0 {
return Ok(0);
}
// Use ClickHouse inserter for efficient bulk inserts
let mut inserter = conn.client.inserter("tx_digests")?;
for tx_digest in values {
inserter.write(tx_digest)?;
}
inserter.end().await?;
Ok(row_count)
}
}
- main.rs - Manual indexer setup with ClickHouse backend.
mod handlers;
mod store;
use anyhow::{Result, bail};
use clap::Parser;
use sui_indexer_alt_framework::{
Indexer, IndexerArgs,
ingestion::{ClientArgs, IngestionConfig},
pipeline::concurrent::ConcurrentConfig,
service::Error,
};
use url::Url;
use handlers::TxDigests;
use store::ClickHouseStore;
#[derive(clap::Parser, Debug, Clone)]
struct Args {
#[clap(flatten)]
pub indexer_args: IndexerArgs,
#[clap(flatten)]
pub client_args: ClientArgs,
}
#[tokio::main]
async fn main() -> Result<()> {
// Initialize crypto provider for HTTPS connections (needed for remote checkpoint fetching)
rustls::crypto::ring::default_provider()
.install_default()
.expect("Failed to install crypto provider");
// Parse command-line arguments
let args = Args::parse();
// ClickHouse connection (uses 'dev' user by default for local development)
let clickhouse_url = "http://localhost:8123".parse::<Url>()?;
println!("Connecting to ClickHouse at: {}", clickhouse_url);
// Create our custom ClickHouse store
let store = ClickHouseStore::new(clickhouse_url);
// Ensure the database tables are created before starting the indexer
store.create_tables_if_not_exists().await?;
// Manually build the indexer with our custom ClickHouse store
// This is the key difference from basic-sui-indexer which uses IndexerCluster::builder()
let mut indexer = Indexer::new(
store.clone(),
args.indexer_args,
args.client_args,
IngestionConfig::default(),
None, // No metrics prefix
&Default::default(), // Empty prometheus registry
)
.await?;
// Register our concurrent pipeline handler (better for testing pruning)
// This processes checkpoints with separate reader and pruner components
indexer
.concurrent_pipeline(
TxDigests,
// ConcurrentConfig default comes with no pruning.
ConcurrentConfig::default(),
)
.await?;
println!("Starting ClickHouse Sui indexer...");
// Start the indexer and wait for it to complete
match indexer.run().await?.main().await {
Ok(()) | Err(Error::Terminated) => Ok(()),
Err(Error::Aborted) => {
bail!("Indexer aborted due to an unexpected error")
}
Err(Error::Task(e)) => {
bail!(e)
}
}
}
Deserializing Move events
When Move smart contracts execute on Sui, they can emit events using the sui::event module. These events are stored in checkpoints as BCS-serialized bytes that your indexer needs to deserialize to extract meaningful data.
Why deserialization is needed
Move contracts emit events like the following:
// Move smart contract
use sui::event;
public fun transfer_balance(...) {
event::emit(BalanceEvent {
balance_manager_id: id,
asset: asset_id,
amount: 100,
deposit: true
});
}
In checkpoint data, these events arrive as raw BCS bytes that need to be converted back to Rust structs for processing.
Step-by-step deserialization
- Add BCS dependency.
[dependencies]
bcs = "0.1.6"
serde = { version = "1.0", features = ["derive"] }
- Define the Event struct in Rust.
Define the same structure in Rust as declared in Move. You can do this manually or use move-binding to auto-generate it from on-chain packages.
use serde::Deserialize;
use sui_indexer_alt_framework::types::base_types::ObjectID;
#[derive(Deserialize, Debug)]
struct BalanceEvent {
balance_manager_id: ObjectID,
asset: ObjectID,
amount: u64,
deposit: bool,
}
Important: Field order and types must match the Move event exactly.
- Extract event bytes in your processor.
impl Processor for YourHandler {
fn process(&self, checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>> {
let mut results = Vec::new();
for transaction in &checkpoint.transactions {
for event in &transaction.events {
// Get the raw BCS bytes
let event_bytes = &event.contents;
// Deserialize to your Rust struct
if let Ok(balance_event) = bcs::from_bytes::<BalanceEvent>(event_bytes) {
// Do something
}
}
}
Ok(results)
}
}
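To see why field order and types must match exactly, it helps to look at the byte layout. BCS writes struct fields back to back in declaration order, with no field names or padding: a u64 is 8 little-endian bytes and a bool is a single byte. The sketch below decodes the `BalanceEvent` layout by hand using only the standard library, assuming each object ID is encoded as 32 raw bytes. `decode_balance_event` is a hypothetical helper for illustration. In an indexer, use `bcs::from_bytes` as shown in the steps above.

```rust
use std::convert::TryInto;

/// Plain-Rust mirror of `BalanceEvent`, with object IDs as raw 32-byte arrays.
#[derive(Debug, PartialEq)]
struct BalanceEvent {
    balance_manager_id: [u8; 32],
    asset: [u8; 32],
    amount: u64,
    deposit: bool,
}

/// Hypothetical hand-rolled decoder, for illustration only.
fn decode_balance_event(bytes: &[u8]) -> Option<BalanceEvent> {
    // 32 + 32 + 8 + 1 bytes, read in declaration order.
    if bytes.len() != 73 {
        return None;
    }
    let balance_manager_id: [u8; 32] = bytes[0..32].try_into().ok()?;
    let asset: [u8; 32] = bytes[32..64].try_into().ok()?;
    let amount = u64::from_le_bytes(bytes[64..72].try_into().ok()?);
    // A BCS bool is exactly 0x00 or 0x01; anything else is invalid.
    let deposit = match bytes[72] {
        0 => false,
        1 => true,
        _ => return None,
    };
    Some(BalanceEvent { balance_manager_id, asset, amount, deposit })
}
```

If the Rust struct declared `asset` before `balance_manager_id`, decoding would still succeed but silently swap the two IDs, which is why a mismatch with the Move definition is easy to miss.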
Related links
The sui-indexer-alt crate in the Sui repo.
The indexer that the Move Registry (MVR) implements.
The indexer that DeepBook implements.
The sui-indexer-alt-framework is a powerful Rust framework for building high-performance, custom blockchain indexers on Sui. It provides customizable, production-ready components for data ingestion, processing, and storage.
The sui-indexer-alt-framework provides two distinct pipeline architectures. Understand the differences between the sequential and concurrent pipelines that the sui-indexer-alt-framework provides to decide which best suits your project needs.