Custom Indexer and Walrus
This topic examines how to use a custom indexer with Walrus. For a more in-depth look at creating a custom indexer, see Build Your First Custom Indexer.
Walrus is a decentralized storage and data availability protocol designed specifically for large binary files, or blobs. It is a content-addressable storage protocol, meaning data is identified and retrieved using a unique identifier called a blob ID. Blob IDs are derived from the content itself rather than from a file path or location. Consequently, if different users upload the same content, Walrus reuses the existing blob rather than creating a new one.
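The content-addressing idea can be illustrated with a small sketch: hashing the content yields the identifier, so identical content always maps to the same ID. (Walrus derives blob IDs with its own encoding and hashing scheme; the `std::hash` use below is purely illustrative.)

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Derive an identifier from content alone. This is an illustrative stand-in
/// for a blob ID, not the actual Walrus derivation.
fn content_id(content: &[u8]) -> u64 {
    let mut hasher = DefaultHasher::new();
    content.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // Identical content produces the identical ID, so a second upload of the
    // same bytes can reuse the existing blob.
    assert_eq!(content_id(b"hello walrus"), content_id(b"hello walrus"));
    assert_ne!(content_id(b"hello walrus"), content_id(b"different content"));
}
```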
For uniqueness, each blob uploaded to Walrus also creates a corresponding `Blob` NFT object on Sui with a unique ID. Furthermore, the associated `Blob` object can optionally have a `Metadata` dynamic field. Dynamic fields are a key-value extension that allows an on-chain object's data to be augmented at runtime. If set, the `Metadata` dynamic field acts as a mapping of key-value attribute pairs.
You can use the custom indexer framework to extend the existing functionality of Walrus.
The Walrus Foundation operates and controls Walrus. For the most accurate and up-to-date information on the Walrus protocol, consult the official Walrus Docs.
Blog platform using Walrus
The system derives the ID of a dynamic field from its type and parent object's ID, so each `Metadata` dynamic field ID is also unique. You can leverage these unique characteristics and the `Metadata` attribute pairs to build a blog platform that enables users to:
- Upload blog posts with titles.
- View their own posts and metrics.
- Delete posts they created.
- Edit post titles.
- Browse posts by other publishers.
Assume a blog platform service already exists to handle uploads to Walrus. When the service creates a blob and its associated NFT object on Walrus, it also attaches a `Metadata` dynamic field containing key-value pairs for `publisher` (the Sui address that uploaded the blob), `view_count`, and `title`. The service prevents users from modifying the `publisher` and `view_count` pairs, but allows the publisher to update the `title` value.
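A service enforcing these rules might gate metadata updates as in the following sketch. The `Attributes` alias and `apply_update` helper are hypothetical, not part of the Walrus or Sui APIs:

```rust
use std::collections::BTreeMap;

/// Hypothetical in-service representation of a post's Metadata attribute pairs.
type Attributes = BTreeMap<String, String>;

/// Apply an update requested by `requester`, enforcing the platform's rules:
/// only the publisher may edit, and only the `title` attribute is editable.
fn apply_update(
    attributes: &mut Attributes,
    requester: &str,
    key: &str,
    value: &str,
) -> Result<(), String> {
    if attributes.get("publisher").map(String::as_str) != Some(requester) {
        return Err("only the publisher may edit this post".to_string());
    }
    if key != "title" {
        return Err(format!("attribute {key} is not editable"));
    }
    attributes.insert(key.to_string(), value.to_string());
    Ok(())
}

fn main() {
    let mut attrs: Attributes = BTreeMap::from([
        ("publisher".to_string(), "0xfe9c".to_string()),
        ("view_count".to_string(), "10".to_string()),
        ("title".to_string(), "Draft".to_string()),
    ]);
    // The publisher may edit the title...
    assert!(apply_update(&mut attrs, "0xfe9c", "title", "Blog Post Module").is_ok());
    // ...but not the protected attributes, and other users may not edit at all.
    assert!(apply_update(&mut attrs, "0xfe9c", "view_count", "0").is_err());
    assert!(apply_update(&mut attrs, "0xabc", "title", "Hijacked").is_err());
}
```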
When a user views a post, the service retrieves the relevant blog post `Metadata` from the indexed database. It then uses the `Owner` field to fetch the blob from the full node. The liveness of the `Blob` object on Sui represents whether a blog post is available. If the `Blob` object is wrapped or deleted, the blog post is not accessible through the service, even if the underlying content on Walrus still exists.
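Availability therefore reduces to a membership check against the live object set, as this sketch shows (the `live_object_ids` snapshot and `is_post_available` helper are hypothetical, not a Sui API):

```rust
use std::collections::HashSet;

/// A post is served only while its Blob object remains in the live object set;
/// a wrapped or deleted Blob object makes the post unavailable through the
/// service even if the underlying content still exists on Walrus.
fn is_post_available(blob_obj_id: &str, live_object_ids: &HashSet<&str>) -> bool {
    live_object_ids.contains(blob_obj_id)
}

fn main() {
    let live: HashSet<&str> = HashSet::from(["0xcfb3", "0xaaaa"]);
    assert!(is_post_available("0xcfb3", &live));
    // Wrapped or deleted objects are absent from the live set.
    assert!(!is_post_available("0xdead", &live));
}
```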
Data modeling
One option for data modeling is to use a single table that maps publisher addresses to `Metadata` dynamic fields. With this approach, the table is keyed on `dynamic_field_id` because it both identifies your dApp data and uniquely represents the content of each uploaded blob.
For example, the `up.sql` file to create this table might look like the following:
```sql
-- This table maps a blob to its associated Sui Blob object and the latest dynamic field metadata
-- for traceability. The (publisher, title) pair is indexed to serve reads on the app.
CREATE TABLE IF NOT EXISTS blog_post (
    -- ID of the Metadata dynamic field.
    dynamic_field_id BYTEA NOT NULL,
    -- Current version of the Metadata dynamic field.
    df_version BIGINT NOT NULL,
    -- Address that published the Walrus Blob.
    publisher BYTEA NOT NULL,
    -- ID of the Blob object on Sui, used during reads to fetch the actual blob content. If this
    -- object has been wrapped or deleted, it will not be present on the live object set, which
    -- means the corresponding content on Walrus is also not accessible.
    blob_obj_id BYTEA NOT NULL,
    view_count BIGINT NOT NULL,
    title TEXT NOT NULL,
    PRIMARY KEY (dynamic_field_id)
);

-- Index to support ordering and filtering by title.
CREATE INDEX IF NOT EXISTS blog_post_by_title ON blog_post (publisher, title);
```
Reads
To load blog posts from a particular publisher, pass the `publisher` and `LIMIT` values to the following query pattern:
```sql
SELECT *
FROM blog_post
WHERE publisher = $1
ORDER BY title
LIMIT $2;
```
Custom indexer implementation
This example uses a sequential pipeline, ensuring each checkpoint is committed once in strict order and as a single atomic operation. The sequential pipeline architecture is not required for this project, but it is a more straightforward option than implementing the concurrent architecture. You can always scale up to the concurrent pipeline if and when your project requires it.
This implementation tracks the latest object state at checkpoint boundaries. When the `Metadata` dynamic field is created, mutated, wrapped, deleted, or unwrapped, it appears in the transaction output under the object changes. You can see an example transaction on Testnet that creates the field. These dynamic fields have type `0x2::dynamic_field::Field<vector<u8>, 0xabc...123::metadata::Metadata>`.
| Object change to Metadata dynamic field | Included in input objects | Included in live output objects | How to index |
|---|---|---|---|
| Creation (or unwrap) | ❌ | ✅ | Insert row |
| Mutation | ✅ | ✅ | Update row |
| Deletion (or wrap) | ✅ | ❌ | Delete row |
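The table translates directly into a classification over the two object sets. A minimal sketch of that decision, with set names mirroring those used in the `process` function (the `IndexAction` enum and `classify` helper are hypothetical):

```rust
use std::collections::BTreeSet;

#[derive(Debug, PartialEq)]
enum IndexAction {
    InsertRow, // created or unwrapped: only in the live output objects
    UpdateRow, // mutated: in both sets
    DeleteRow, // deleted or wrapped: only in the input objects
}

/// Classify a Metadata dynamic field based on which checkpoint sets contain it.
fn classify(
    id: &str,
    input_objects: &BTreeSet<&str>,
    live_output_objects: &BTreeSet<&str>,
) -> Option<IndexAction> {
    match (input_objects.contains(id), live_output_objects.contains(id)) {
        (false, true) => Some(IndexAction::InsertRow),
        (true, true) => Some(IndexAction::UpdateRow),
        (true, false) => Some(IndexAction::DeleteRow),
        (false, false) => None, // not touched in this checkpoint
    }
}

fn main() {
    let input: BTreeSet<&str> = BTreeSet::from(["0xaa", "0xbb"]);
    let output: BTreeSet<&str> = BTreeSet::from(["0xbb", "0xcc"]);
    assert_eq!(classify("0xcc", &input, &output), Some(IndexAction::InsertRow));
    assert_eq!(classify("0xbb", &input, &output), Some(IndexAction::UpdateRow));
    assert_eq!(classify("0xaa", &input, &output), Some(IndexAction::DeleteRow));
    assert_eq!(classify("0xdd", &input, &output), None);
}
```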
Processor
All pipelines implement the same `Processor` trait, which defines the logic to transform a checkpoint from the ingestion task into an intermediate or final form to commit to the store. Data flows into and out of the processor, potentially out of order.
`process` function
The `process` function computes the `checkpoint_input_objects` and `latest_live_output_objects` sets to capture the state of objects entering and exiting a checkpoint. A `Metadata` dynamic field that appears in `checkpoint_input_objects` but not in `latest_live_output_objects` has been either wrapped or deleted. In those cases, you need to record only the dynamic field ID for the commit function to handle later deletion. For creation, mutation, and unwrap operations, the objects always appear in at least the `latest_live_output_objects` set.
```rust
/// This pipeline operates on a checkpoint granularity to produce a set of values reflecting the
/// state of relevant Metadata dynamic fields at checkpoint boundary.
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
    let checkpoint_input_objects = checkpoint_input_objects(checkpoint)?;
    let latest_live_output_objects = checkpoint_output_objects(checkpoint)?;

    // Collect values to be passed to committer. This map is keyed on the dynamic field id.
    let mut values: BTreeMap<ObjectID, Self::Value> = BTreeMap::new();

    // Process relevant Metadata dynamic fields that were wrapped or deleted in this checkpoint.
    for (object_id, object) in &checkpoint_input_objects {
        // If an object appears in both maps, it is still alive at the end of the checkpoint.
        if latest_live_output_objects.contains_key(object_id) {
            continue;
        }

        // Check the checkpoint input state of the Metadata dynamic field to see if it's
        // relevant to our indexing.
        let Some((_, _)) = extract_content_from_metadata(&self.metadata_type, object)? else {
            continue;
        };

        // Since the table is keyed on the dynamic field id, this is all the information we need
        // to delete the correct entry in the commit fn.
        values.insert(*object_id, ProcessedWalrusMetadata::Delete(*object_id));
    }

    for (object_id, object) in &latest_live_output_objects {
        let Some((blog_post_metadata, blob_obj_id)) =
            extract_content_from_metadata(&self.metadata_type, object)?
        else {
            continue;
        };

        values.insert(
            *object_id,
            ProcessedWalrusMetadata::Upsert {
                df_version: object.version().into(),
                dynamic_field_id: *object_id,
                blog_post_metadata,
                blob_obj_id,
            },
        );
    }

    Ok(values.into_values().collect())
}
```
Committer
The second and final part of the sequential pipeline is the `Committer`. Because data flows from the processor into the committer out of order, it is the committer's responsibility to batch and write the transformed data to the store in order on checkpoint boundaries.
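The in-order guarantee can be pictured as a small reordering buffer: batches that arrive out of order are held until the next expected checkpoint shows up, then released in sequence. The `OrderedCommitter` below is a hypothetical illustration of this idea, not the framework's actual implementation:

```rust
use std::collections::BTreeMap;

/// Hypothetical reordering buffer: processed checkpoints may arrive out of
/// order, but are released for commit strictly in checkpoint order.
struct OrderedCommitter {
    next_checkpoint: u64,
    pending: BTreeMap<u64, Vec<String>>, // checkpoint -> processed values
}

impl OrderedCommitter {
    fn new(start: u64) -> Self {
        Self { next_checkpoint: start, pending: BTreeMap::new() }
    }

    /// Buffer one checkpoint's values, then drain every contiguous checkpoint
    /// that is now ready to commit, in order.
    fn receive(&mut self, checkpoint: u64, values: Vec<String>) -> Vec<(u64, Vec<String>)> {
        self.pending.insert(checkpoint, values);
        let mut ready = Vec::new();
        while let Some(values) = self.pending.remove(&self.next_checkpoint) {
            ready.push((self.next_checkpoint, values));
            self.next_checkpoint += 1;
        }
        ready
    }
}

fn main() {
    let mut committer = OrderedCommitter::new(0);
    // Checkpoint 1 arrives first: nothing can commit yet.
    assert!(committer.receive(1, vec!["b".to_string()]).is_empty());
    // Checkpoint 0 arrives: both 0 and 1 are released, in order.
    let ready = committer.receive(0, vec!["a".to_string()]);
    assert_eq!(ready.len(), 2);
    assert_eq!(ready[0].0, 0);
    assert_eq!(ready[1].0, 1);
}
```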
`batch`
The `batch` function defines how to batch transformed data from other processed checkpoints. This function maintains a mapping of `dynamic_field_id` to the processed Walrus `Metadata`. The `batch` function guarantees that the next checkpoint to batch is the next contiguous checkpoint, which means it's safe for you to overwrite the existing entry.
```rust
fn batch(batch: &mut Self::Batch, values: Vec<Self::Value>) {
    for value in values {
        match value {
            ProcessedWalrusMetadata::Upsert {
                dynamic_field_id, ..
            } => {
                batch.insert(dynamic_field_id, value);
            }
            ProcessedWalrusMetadata::Delete(dynamic_field_id) => {
                batch.insert(dynamic_field_id, value);
            }
        }
    }
}
```
`commit`
The `commit` function conducts final transformations on the processed data before writing to the store. In this case, the logic partitions the processed data into `to_delete` and `to_upsert`.
```rust
async fn commit<'a>(batch: &Self::Batch, conn: &mut postgres::Connection<'a>) -> Result<usize> {
    // Partition the batch into items to delete and items to upsert.
    let (to_delete, to_upsert): (Vec<_>, Vec<_>) = batch
        .values()
        .partition(|item| matches!(item, ProcessedWalrusMetadata::Delete(_)));

    let to_upsert: Vec<StoredBlogPost> = to_upsert
        .into_iter()
        .map(|item| item.to_stored())
        .collect::<Result<Vec<_>>>()?;

    let to_delete: Vec<ObjectID> = to_delete
        .into_iter()
        .map(|item| Ok(item.dynamic_field_id()))
        .collect::<Result<Vec<_>>>()?;

    let mut total_affected = 0;

    if !to_delete.is_empty() {
        let deleted_count = diesel::delete(blog_post::table)
            .filter(blog_post::dynamic_field_id.eq_any(to_delete.iter().map(|id| id.to_vec())))
            .execute(conn)
            .await?;
        total_affected += deleted_count;
    }

    if !to_upsert.is_empty() {
        let upserted_count = diesel::insert_into(blog_post::table)
            .values(&to_upsert)
            .on_conflict(blog_post::dynamic_field_id)
            .do_update()
            .set((
                blog_post::df_version.eq(excluded(blog_post::df_version)),
                blog_post::title.eq(excluded(blog_post::title)),
                blog_post::view_count.eq(excluded(blog_post::view_count)),
                blog_post::blob_obj_id.eq(excluded(blog_post::blob_obj_id)),
            ))
            .filter(blog_post::df_version.lt(excluded(blog_post::df_version)))
            .execute(conn)
            .await?;
        total_affected += upserted_count;
    }

    Ok(total_affected)
}
```
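The `df_version` filter on the upsert is what makes replayed or out-of-order commits safe: a row is written only if the incoming version is strictly newer than the stored one. The same semantics in a self-contained sketch, where an in-memory map stands in for the `blog_post` table:

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, PartialEq)]
struct Row {
    df_version: u64,
    title: String,
}

/// Mirror of the `ON CONFLICT ... DO UPDATE ... WHERE df_version < excluded.df_version`
/// behavior: write only if the row is new or strictly newer than what is stored.
fn upsert(table: &mut BTreeMap<String, Row>, id: &str, row: Row) -> bool {
    if let Some(existing) = table.get(id) {
        if existing.df_version >= row.df_version {
            return false; // stale or duplicate write: skip
        }
    }
    table.insert(id.to_string(), row);
    true
}

fn main() {
    let mut table = BTreeMap::new();
    assert!(upsert(&mut table, "0xaa", Row { df_version: 5, title: "v5".to_string() }));
    // A replayed, stale write is a no-op.
    assert!(!upsert(&mut table, "0xaa", Row { df_version: 4, title: "v4".to_string() }));
    // A newer version overwrites.
    assert!(upsert(&mut table, "0xaa", Row { df_version: 6, title: "v6".to_string() }));
    assert_eq!(table["0xaa"].title, "v6");
}
```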
Putting it all together
The `main` function for the service:
```rust
#[tokio::main]
async fn main() -> Result<()> {
    let args = Args::parse();

    // The `IndexerClusterBuilder` offers a convenient way to quickly set up an `IndexerCluster`,
    // which consists of the base indexer, metrics service, and a cancellation token.
    let mut indexer = IndexerClusterBuilder::new()
        .with_database_url(args.database_url)
        .with_args(args.cluster_args)
        .with_migrations(&MIGRATIONS)
        .build()
        .await?;

    let blog_post_pipeline = BlogPostPipeline::new(METADATA_DYNAMIC_FIELD_TYPE).unwrap();

    // Other pipelines can be easily added with `.sequential_pipeline()` or
    // `.concurrent_pipeline()`.
    indexer
        .sequential_pipeline(blog_post_pipeline, SequentialConfig::default())
        .await?;

    let _ = indexer.run().await?.await;

    Ok(())
}
```
To provide users with a list of posts written by a publisher, your service first queries the database on `publisher`, yielding a result like the following. The service then uses the `blob_obj_id` to fetch the `Blob` NFT contents. From there, you can retrieve the actual Walrus content.
```
dynamic_field_id | df_version | publisher | blob_obj_id | view_count | title
--------------------------------------------------------------------+------------+--------------------------------------------------------------------+--------------------------------------------------------------------+------------+------------------
\x40b5ae12e780ae815d7b0956281291253c02f227657fe2b7a8ccf003a5f597f7 | 608253371 | \xfe9c7a465f63388e5b95c8fd2db857fad4356fc873f96900f4d8b6e7fc1e760e | \xcfb3d474c9a510fde93262d4b7de66cad62a2005a54f31a63e96f3033f465ed3 | 10 | Blog Post Module
```
Additional considerations
All Walrus blobs carry an associated lifetime, so you must track expiration changes. Whenever the `Metadata` dynamic field changes, the parent Sui `Blob` object should also appear in the object changes. You can read the blob's lifetime directly from the `Blob` object contents. However, lifetime changes usually occur on the `Blob` object itself. Because updates to the parent object don't affect the child dynamic field unless you directly modify the child, these lifetime changes remain hidden in the current indexing setup. You can address this in several ways:
- Watch all `Blob` object changes.
- Watch all `BlobCertified` events.
- Construct PTBs that make calls to manage blob lifetime and ping the `Metadata` dynamic field in the same transaction.
If you don't want to perform additional work on the write side, then you are limited to the first two options. This requires two pipelines: one to index metadata as described in the previous section, and another to index `BlobCertified` events (or `Blob` object changes).
Related links
- Walrus Docs: Walrus is a decentralized storage and data availability protocol designed specifically for large binary files, or blobs.
- Build Your First Indexer: Establishing a custom indexer helps improve latency, allows pruning the data of your Sui full node, and provides efficient assemblage of checkpoint data.
- Dynamic Fields: Dynamic fields and dynamic object fields on Sui are added and removed dynamically, affect gas only when accessed, and store heterogeneous values.
- Indexer with Walrus example files: Directory in the Sui repo containing the files for this example.