What is a Snapshot and when is it useful?
"Snapshots are performance optimization and can often be ignored altogether except in the systems where latency is mission critical. Snapshots are a materialization of the stream at a certain revision. The snapshot can then be consumed by an aggregate to bring it back to a known state before applying all events which have occurred since the snapshot. Snapshots, if required, should be handled either by an outside process or, on a minimum, a different thread to avoid blocking main message processing."
(NEventStore wiki)
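The idea in the quote, restoring state from a snapshot and then applying only the events recorded after it, can be sketched without NEventStore at all. This is a minimal illustration under stated assumptions: TitleChanged, ListSnapshot and ListState are hypothetical names, not types from this project or from NEventStore.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event: a title change recorded at a given stream revision.
public sealed class TitleChanged
{
    public int Revision { get; private set; }
    public string Title { get; private set; }

    public TitleChanged(int revision, string title)
    {
        Revision = revision;
        Title = title;
    }
}

// Hypothetical snapshot: the materialized state of the stream at one revision.
public sealed class ListSnapshot
{
    public int Revision { get; private set; }
    public string Title { get; private set; }

    public ListSnapshot(int revision, string title)
    {
        Revision = revision;
        Title = title;
    }
}

public sealed class ListState
{
    public int Version { get; private set; }
    public string Title { get; private set; }

    // Start from the snapshot (if any), then apply only the newer events in order.
    public static ListState Rehydrate(ListSnapshot snapshot, IEnumerable<TitleChanged> stream)
    {
        var state = new ListState();
        if (snapshot != null)
        {
            state.Version = snapshot.Revision;
            state.Title = snapshot.Title;
        }

        int startRevision = state.Version;
        var newerEvents = stream
            .Where(evt => evt.Revision > startRevision)
            .OrderBy(evt => evt.Revision)
            .ToList();

        foreach (var change in newerEvents)
        {
            state.Title = change.Title;
            state.Version = change.Revision;
        }

        return state;
    }
}
```

With a snapshot taken at revision 2, only the events after revision 2 are replayed; without a snapshot the whole stream is replayed and the result is the same, just slower for long streams.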
This is a training project, so I only want to see the very basics of implementing a snapshotting policy. Here is what I did, in five steps:
- Implement the IMemento interface for my aggregates
    public class ToDoListMemento : IMemento
    {
        public Guid Id { get; set; }
        public int Version { get; set; }
        public string Title { get; private set; }
        public string Description { get; private set; }

        public ToDoListMemento(Guid id, int version, string title, string description)
        {
            Id = id;
            Version = version;
            Title = title;
            Description = description;
        }
    }
- Create a factory method for these memento objects in my aggregates
    public IMemento CreateMemento()
    {
        return new ToDoListMemento(Id, Version, Title, Description);
    }
- Create a service that holds the snapshotting policy. Snapshots are added to NEventStore through its Advanced.AddSnapshot method. The following code is very naïve, but it is fine for this exercise
    public abstract class SnapshotCreator<T> where T : AggregateBase
    {
        private readonly IRepository _repo;
        private readonly IStoreEvents _store;

        public SnapshotCreator(IRepository repo, IStoreEvents store)
        {
            Contract.Requires<ArgumentNullException>(repo != null, "repo");
            Contract.Requires<ArgumentNullException>(store != null, "store");
            _repo = repo;
            _store = store;
        }

        /// <summary>
        /// Saves a new aggregate snapshot according to the snapshotting policy.
        /// NOTE: in a real system an external thread should save snapshots, without interfering with online processing.
        /// </summary>
        /// <param name="command">Command whose Id identifies the aggregate to snapshot.</param>
        public void SaveSnapShot(Command command)
        {
            T list = _repo.GetById<T>(command.Id);

            // Create a snapshot every 1000 versions of the aggregate.
            // NOTE: very naive logic/implementation, for training purposes only.
            if (list.Version % 1000 == 0)
            {
                _store.Advanced.AddSnapshot(
                    new Snapshot(list.Id.ToString(), list.Version, ((IMementoCreator)list).CreateMemento()));
            }
        }
    }
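One limit of the modulo check is that it only fires when the aggregate is inspected exactly at a multiple of 1000; if a single command appends several events, that version can be skipped. A possible alternative, sketched here as an assumption and not as what the project does, is to compare the current version with the revision of the last snapshot. SnapshotPolicy and ShouldSnapshot are hypothetical names, and how the last snapshot revision is obtained is left out.

```csharp
using System;

// Hypothetical helper: decides whether enough versions have passed since the last snapshot.
public static class SnapshotPolicy
{
    public static bool ShouldSnapshot(int currentVersion, int lastSnapshotRevision, int interval)
    {
        if (interval <= 0)
        {
            throw new ArgumentOutOfRangeException("interval", "Interval must be positive.");
        }

        // Use 0 as lastSnapshotRevision when no snapshot has been taken yet.
        return currentVersion - lastSnapshotRevision >= interval;
    }
}
```

For example, with an interval of 1000 and no previous snapshot, version 1003 still triggers a snapshot even though 1003 % 1000 is not 0.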
- Add the snapshotting policy to the process (after the events have been committed). This is not production code, but it helps to understand how a real snapshotting policy could be implemented
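    #if DEBUG
    foreach (var handler in handlers)
    {
        // If the command handler derives from SnapshotCreator<T>, it handles the snapshot persistence.
        // NOTE: very unrealistic snapshotting policy/implementation, for training purposes only.
        // IsSubclassOf(typeof(SnapshotCreator<>)) is always false for a closed generic base type,
        // so walk the base types and compare generic type definitions instead.
        Type baseType = handler.GetType().BaseType;
        while (baseType != null &&
               !(baseType.IsGenericType && baseType.GetGenericTypeDefinition() == typeof(SnapshotCreator<>)))
        {
            baseType = baseType.BaseType;
        }

        if (baseType != null)
        {
            MethodInfo method = handler.GetType().GetMethod("SaveSnapShot");
            method.Invoke(handler, new object[] { message });
        }
    }
    #endif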
- Finally, modify the AggregateFactory to create a new aggregate instance from the last snapshot retrieved by NEventStore
    public class AggregateFactory : IConstructAggregates
    {
        public IAggregate Build(Type type, Guid id, IMemento snapshot)
        {
            Type typeParam = snapshot != null ? snapshot.GetType() : typeof(Guid);
            object[] paramArray;
            if (snapshot != null)
                paramArray = new object[] { snapshot };
            else
                paramArray = new object[] { id };

            ConstructorInfo constructor = type.GetConstructor(
                BindingFlags.NonPublic | BindingFlags.Instance, null, new Type[] { typeParam }, null);

            if (constructor == null)
            {
                throw new InvalidOperationException(
                    string.Format("Aggregate {0} cannot be created: constructor with proper parameter not provided", type.Name));
            }

            return constructor.Invoke(paramArray) as IAggregate;
        }
    }
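The factory above relies on each aggregate having two non-public constructors: one taking a Guid and one taking the memento. The following is a minimal, self-contained sketch of that contract. The ToDoList aggregate and the FactoryDemo helper are hypothetical, and the memento is a simplified stand-in that omits IMemento so the snippet compiles without CommonDomain.

```csharp
using System;
using System.Reflection;

// Simplified stand-in for the memento from the first step (IMemento omitted on purpose).
public sealed class ToDoListMemento
{
    public Guid Id { get; set; }
    public int Version { get; set; }
    public string Title { get; set; }
}

// Hypothetical aggregate exposing the two non-public constructors the factory looks for.
public class ToDoList
{
    public Guid Id { get; private set; }
    public int Version { get; private set; }
    public string Title { get; private set; }

    // Used when no snapshot exists: start empty, then replay every event.
    private ToDoList(Guid id)
    {
        Id = id;
    }

    // Used when a snapshot exists: start from the saved state.
    private ToDoList(ToDoListMemento memento)
    {
        Id = memento.Id;
        Version = memento.Version;
        Title = memento.Title;
    }
}

public static class FactoryDemo
{
    // Same lookup strategy as the factory: pick the constructor by parameter type.
    public static ToDoList Build(Guid id, ToDoListMemento snapshot)
    {
        Type paramType = snapshot != null ? typeof(ToDoListMemento) : typeof(Guid);
        object arg = snapshot != null ? (object)snapshot : id;

        ConstructorInfo ctor = typeof(ToDoList).GetConstructor(
            BindingFlags.NonPublic | BindingFlags.Instance, null, new[] { paramType }, null);

        if (ctor == null)
        {
            throw new InvalidOperationException("No matching non-public constructor found.");
        }

        return (ToDoList)ctor.Invoke(new[] { arg });
    }
}
```

An aggregate built from a memento starts at the snapshot's version, so only the events committed after that version still need to be applied.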
All the code is published on this commit.