Search
๐Ÿ“ธ

Axon Framework Snapshot(Cache)

Date
2025/04/06
Tag
Spring
Axon
Redis
์ˆจ๊ธฐ๊ธฐ

Axon & EventSourcing

Axon Framework๋Š” DDD, CQRS, EventSourcing์„ ๊ธฐ๋ฐ˜์œผ๋กœ ํ•œ ๊ฐ•๋ ฅํ•œ ํ”„๋ ˆ์ž„์›Œํฌ์ด๋‹ค.
์ด๋ฒคํŠธ์†Œ์‹ฑ(EventSourcing)์—์„œ ์–ด๊ทธ๋ฆฌ๊ฑฐํŠธ(Aggregate)๊ฐ€ ๋ช…๋ น(Command)์„ ์ˆ˜ํ–‰ํ•  ๋•Œ, ๊ณผ๊ฑฐ์— ์ด๋ฒคํŠธ(Event)๋“ค์„ ์ˆœ์ฐจ์ ์œผ๋กœ ์žฌ์ƒํ•˜์—ฌ ์ตœ์‹  ์ƒํƒœ๋ฅผ ๋ณต์›ํ•œ๋‹ค. ํ•˜์ง€๋งŒ ์ด๋ฒคํŠธ ์ˆ˜๊ฐ€ ๋งŽ์•„์งˆ์ˆ˜๋ก ์žฌ์ƒ ๊ณผ์ •์ด ๋А๋ ค์ง€๊ณ  ์„ฑ๋Šฅ ์ด์Šˆ๋กœ ์ด์–ด์งˆ ์ˆ˜ ์žˆ๋‹ค.
์ด๋Ÿฌํ•œ ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•˜๊ธฐ ์œ„ํ•ด ์Šค๋ƒ…์ƒท(Snapshot)์ด๋ž€ ๊ฐœ๋…์ด ์กด์žฌํ•œ๋‹ค.

์Šค๋ƒ…์ƒท(Snapshot)

์Šค๋ƒ…์ƒท์€ ํŠน์ • ์‹œ์ ์˜ ์–ด๊ทธ๋ฆฌ๊ฑฐํŠธ์˜ ์ƒํƒœ๋ฅผ ์ €์žฅํ•ด๋‘๊ณ , ์ดํ›„์—๋Š” ํ•ด๋‹น ์Šค๋ƒ…์ƒท๋ถ€ํ„ฐ ์ด๋ฒคํŠธ๋ฅผ ์žฌ์ƒํ•จ์œผ๋กœ์จ ์ „์ฒด ์ด๋ฒคํŠธ๋ฅผ ์ฒ˜์Œ๋ถ€ํ„ฐ ์žฌ์ƒํ•˜๋Š” ๋น„์šฉ์„ ์ค„์—ฌ์ค€๋‹ค.
Axon์—์„œ๋Š” ๋‘ ๊ฐ€์ง€์˜ ์Šค๋ƒ…์ƒท ์ƒ์„ฑ ์ „๋žต(SnapshotTriggerDefinition)์„ ์ œ๊ณตํ•œ๋‹ค.
1.
AggregateLoadTimeSnapshotTriggerDefinition
์–ด๊ทธ๋ฆฌ๊ฑฐํŠธ์˜ ๋กœ๋”ฉ ์‹œ๊ฐ„์„ ๊ธฐ๋ฐ˜์œผ๋กœ ์Šค๋ƒ…์ƒท์„ ์ƒ์„ฑํ•œ๋‹ค
2.
EventCountSnapshotTriggerDefinition
์–ด๊ทธ๋ฆฌ๊ฑฐํŠธ์˜ ์ด๋ฒคํŠธ ์ˆ˜๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ์Šค๋ƒ…์ƒท์„ ์ƒ์„ฑํ•œ๋‹ค
์Šค๋ƒ…์ƒท์„ ์‚ฌ์šฉ(JDBC ์ด๋ฒคํŠธ ์ €์žฅ์†Œ๋ฅผ ์‚ฌ์šฉํ•  ๊ฒฝ์šฐ)ํ•˜๋”๋ผ๋„ ๋งค ์žฌ์ƒ ์‹œ ์Šค๋ƒ…์ƒท ์ •๋ณด๋ฅผ DB์—์„œ ์ฟผ๋ฆฌํ•ด์˜ค๊ธฐ ๋•Œ๋ฌธ์— DB์— ๋ถ€ํ•˜๋ฅผ ์ค„ ์ˆ˜ ์žˆ๋‹ค. ์ด ๋˜ํ•œ ๊ฐœ์„ ํ•  ์ˆ˜ ์žˆ๋„๋ก axon์—์„œ๋Š” ์Šค๋ƒ…์ƒท ์บ์‹œ ๊ธฐ๋Šฅ๋„ ์ œ๊ณตํ•˜๊ณ  ์žˆ๋‹ค.

๋ ˆํฌ์ง€ํ† ๋ฆฌ

EventCountSnapshotTriggerDefinition ์„ ์‚ฌ์šฉํ•˜๋Š” ๋ถ„์‚ฐ ํ™˜๊ฒฝ ์Šค๋ƒ…์ƒท ์˜ˆ์‹œ ์ฝ”๋“œ
axon-snapshot-demo
eTaphee

์‚ฌ์šฉ ์ผ€์ด์Šค

์˜ˆ์ œ ์ฝ”๋“œ์—์„œ๋Š” 3๋ฒˆ์˜ ์ด๋ฒคํŠธ๋งˆ๋‹ค ์Šค๋ƒ…์ƒท์ด ์ƒ์„ฑ๋˜๋„๋ก ์„ค์ •์„ ํ–ˆ๋‹ค.
@Bean fun snapshotTrigger(snapshotter: Snapshotter): SnapshotTriggerDefinition { return EventCountSnapshotTriggerDefinition(snapshotter, 3) }
Kotlin
๋ณต์‚ฌ

1. ๋‹จ์ผ ์ธ์Šคํ„ด์Šค & ์ธ๋ฉ”๋ชจ๋ฆฌ ์บ์‹œ

๋‹จ์ผ ์ธ์Šคํ„ด์Šค ํ™˜๊ฒฝ์—์„œ ์ธ๋ฉ”๋ชจ๋ฆฌ ์บ์‹œ๋ฅผ ์‚ฌ์šฉํ•  ๊ฒฝ์šฐ์—๋Š” ๋ฌธ์ œ๊ฐ€ ์ƒ๊ธธ ์ˆ˜ ์—†๋‹ค. ํ•ญ์ƒ ์ธ๋ฉ”๋ชจ๋ฆฌ์— ์žˆ๋Š” ์–ด๊ทธ๋ฆฌ๊ฑฐํŠธ ๋ฒ„์ „์€ ์ตœ์‹ ์ด๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค.
# ์ƒ์„ฑ FooAggregate created with id: 0 # ์ด๋ฒคํŠธ ์ €์žฅ(seq=0) Hibernate: insert into domain_event_entry (aggregate_identifier, event_identifier, meta_data, payload, payload_revision, payload_type, sequence_number, time_stamp, type, global_index) values ======================================================================================== # ์š”์ฒญ ๋ช…๋ น ์ˆ˜ํ–‰ (์Šค๋ƒ…์ƒท ๋ฐ ์ด๋ฒคํŠธ๋ฅผ ์กฐํšŒํ•˜์ง€ ์•Š์Œ) call increase foo value FooAggregate value increased to: 1 # ์ด๋ฒคํŠธ ์ €์žฅ(seq=1) Hibernate: insert into domain_event_entry (aggregate_identifier, event_identifier, meta_data, payload, payload_revision, payload_type, sequence_number, time_stamp, type, global_index) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ======================================================================================== # ์š”์ฒญ ๋ช…๋ น ์ˆ˜ํ–‰ call increase foo value FooAggregate value increased to: 2 # ์ด๋ฒคํŠธ ์ €์žฅ(seq=2) Hibernate: insert into domain_event_entry (aggregate_identifier, event_identifier, meta_data, payload, payload_revision, payload_type, sequence_number, time_stamp, type, global_index) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) # ์ด๋ฒคํŠธ ์นด์šดํ„ฐ๊ฐ€ 3์ด ๋˜์—ˆ์œผ๋‹ˆ, ์ƒˆ๋กœ์šด ์Šค๋ƒ…์ƒท ์ €์žฅ Hibernate: select see1_0.aggregate_identifier, see1_0.sequence_number, see1_0.type, see1_0.event_identifier, see1_0.meta_data, see1_0.payload, see1_0.payload_revision, see1_0.payload_type, see1_0.time_stamp from snapshot_event_entry see1_0 where ( see1_0.aggregate_identifier, see1_0.sequence_number, see1_0.type ) in ((?, ?, ?))
Plain Text
๋ณต์‚ฌ
<im.etap.domain.aggregate.FooAggregate> <id>0</id> <value>2</value> </im.etap.domain.aggregate.FooAggregate>
XML
๋ณต์‚ฌ

2. ๋ฉ€ํ‹ฐ ์ธ์Šคํ„ด์Šค & ์ธ๋ฉ”๋ชจ๋ฆฌ ์บ์‹œ

๋ฉ€ํ‹ฐ ์ธ์Šคํ„ด์Šค ํ™˜๊ฒฝ์—์„œ๋Š” ์ธ๋ชจ๋ฉ”๋ฆฌ ์บ์‹œ๋ฅผ ์‚ฌ์šฉํ•  ๊ฒฝ์šฐ ์ธ์Šคํ„ด์Šค ๋งˆ๋‹ค ์บ์‹œ ์ €์žฅ์†Œ๋ฅผ ์ƒ์„ฑํ•˜๊ธฐ ๋•Œ๋ฌธ์—, ์–ด๊ทธ๋ฆฌ๊ฑฐํŠธ ๋ฒ„์ „ ๋ถˆ์ผ์น˜๊ฐ€ ๋ฐœ์ƒํ•˜์—ฌ ์š”์ฒญ์ด ์‹คํŒจํ•˜๊ฒŒ ๋œ๋‹ค.
instance-2 ๊ฐ€ ๋จผ์ € ์‹คํ–‰๋˜์–ด, foo(id=0) ์–ด๊ทธ๋ฆฌ๊ฑฐํŠธ๋ฅผ ๋จผ์ € ์ƒ์„ฑ ํ•œ ์ƒํƒœ์ด๋ฏ€๋กœ, sequence(0)์ธ ์บ์‹œ๊ฐ€ ์ €์ •๋œ ์ƒํƒœ์ด๋‹ค.
์ฒซ ๋ฒˆ์งธ ์š”์ฒญ(instance-1) call increase foo value # api ์š”์ฒญ Hibernate: select see1_0.type, see1_0.aggregate_identifier, see1_0.sequence_number, see1_0.event_identifier, see1_0.time_stamp, see1_0.payload_type, see1_0.payload_revision, see1_0.payload, see1_0.meta_data from snapshot_event_entry see1_0 where see1_0.aggregate_identifier=? order by see1_0.sequence_number desc limit ? Hibernate: select dee1_0.type, dee1_0.aggregate_identifier, dee1_0.sequence_number, dee1_0.event_identifier, dee1_0.time_stamp, dee1_0.payload_type, dee1_0.payload_revision, dee1_0.payload, dee1_0.meta_data from domain_event_entry dee1_0 where dee1_0.aggregate_identifier=? and dee1_0.sequence_number>=? order by dee1_0.sequence_number limit ? # instance-2์—์„œ ๋จผ์ € ์ƒ์„ฑ๋œ ์ด๋ฒคํŠธ๋ฅผ ์žฌ์ƒ(sequence=0) FooAggregate created with id: 0 # ์š”์ฒญ์œผ๋กœ ๋“ค์–ด์˜จ ๋ช…๋ น ์ˆ˜ํ–‰ FooAggregate value increased to: 1 # ์ด๋ฒคํŠธ ์ €์žฅ(sequence=1) Hibernate: insert into domain_event_entry (aggregate_identifier, event_identifier, meta_data, payload, payload_revision, payload_type, sequence_number, time_stamp, type, global_index) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
Plain Text
๋ณต์‚ฌ
# ๋‘ ๋ฒˆ์งธ ์š”์ฒญ(instance-2) call increase foo value # ์ด๋ฏธ ๋ฉ”๋ชจ๋ฆฌ์— ์ƒ์„ฑ์— ๋Œ€ํ•œ ์Šค๋ƒ…์ƒท sequence=0์ด ๋‚จ์•„ ์žˆ์œผ๋ฏ€๋กœ, ์ƒ์„ฑ ์ด๋ฒคํŠธ๋Š” ์žฌ์ƒํ•˜์ง€ ์•Š๋Š”๋‹ค. FooAggregate value increased to: 1 # ์ด๋ฒคํŠธ ์ €์žฅ(sequence=1) Hibernate: insert into domain_event_entry (aggregate_identifier, event_identifier, meta_data, payload, payload_revision, payload_type, sequence_number, time_stamp, type, global_index) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) # instance-1์—์„œ sequence=1์— ๋Œ€ํ•œ ์ด๋ฒคํŠธ๊ฐ€ ์ €์žฅ๋˜์–ด ์žˆ์œผ๋ฏ€๋กœ ์ค‘๋ณตํ‚ค ์œ„๋ฐ˜์ด ๋ฐœ์ƒํ•œ๋‹ค. Error: 1062-23000: Duplicate entry '0-1' for key 'UK8s1f994p4la2ipb13me2xqm1w' SQL Error: 1062, SQLState: 23000 (conn=55) Duplicate entry '0-1' for key 'UK8s1f994p4la2ipb13me2xqm1w' Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: org.axonframework.eventsourcing.eventstore.EventStoreException: An event for aggregate [0] at sequence [1] was already inserted] with root cause
Plain Text
๋ณต์‚ฌ

3. ๋ฉ€ํ‹ฐ ์ธ์Šคํ„ด์Šค & ๋ถ„์‚ฐ ์บ์‹œ

์–ด๊ทธ๋ฆฌ๊ฑฐํŠธ ๋ช…๋ น ์ˆ˜ํ–‰ ์‹œ ์บ์‹œ ์Šคํ† ์–ด์— ์žˆ๋Š” ์ตœ์‹  ์–ด๊ทธ๋ฆฌ๊ฑฐํŠธ๋ฅผ ๊ฐ€์ ธ์™€์„œ ์ˆ˜ํ–‰ํ•˜๊ธฐ ๋•Œ๋ฌธ์— ๋ฒ„์ „ ๋ถˆ์ผ์น˜ ๋ฌธ์ œ๊ฐ€ ์ƒ๊ธฐ์ง€ ์•Š๋Š”๋‹ค.
# instance-2 ์‹œ์ž‘ FooAggregate created with id: 0 # FooCreatedEvent Hibernate: insert into domain_event_entry (aggregate_identifier, event_identifier, meta_data, payload, payload_revision, payload_type, sequence_number, time_stamp, type, global_index) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
Plain Text
๋ณต์‚ฌ
# instance-1 ์š”์ฒญ call increase foo value # ์บ์‹œ์— ์–ด๊ทธ๋ฆฌ๊ฑฐํŠธ ์ •๋ณด๊ฐ€ ์—†์–ด ์ด๋ฒคํŠธ ์Šคํ† ์–ด์—์„œ ์Šค๋ƒ…์ƒท&์ด๋ฒคํŠธ ์ •๋ณด๋ฅผ ์กฐํšŒ Hibernate: select see1_0.type, see1_0.aggregate_identifier, see1_0.sequence_number, see1_0.event_identifier, see1_0.time_stamp, see1_0.payload_type, see1_0.payload_revision, see1_0.payload, see1_0.meta_data from snapshot_event_entry see1_0 where see1_0.aggregate_identifier=? order by see1_0.sequence_number desc limit ? Hibernate: select dee1_0.type, dee1_0.aggregate_identifier, dee1_0.sequence_number, dee1_0.event_identifier, dee1_0.time_stamp, dee1_0.payload_type, dee1_0.payload_revision, dee1_0.payload, dee1_0.meta_data from domain_event_entry dee1_0 where dee1_0.aggregate_identifier=? and dee1_0.sequence_number>=? order by dee1_0.sequence_number limit ? ==================================================================================== # ์กฐํšŒํ•œ ์ด๋ฒคํŠธ๋ฅผ ์žฌ์ƒ & ๋ช…๋ น ์ˆ˜ํ–‰ FooAggregate created with id: 0 FooAggregate value increased to: 1 Hibernate: insert into domain_event_entry (aggregate_identifier, event_identifier, meta_data, payload, payload_revision, payload_type, sequence_number, time_stamp, type, global_index) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
Plain Text
๋ณต์‚ฌ
# instance-2 ์š”์ฒญ call increase foo value # ์บ์‹œ์—์„œ ์–ด๊ทธ๋ฆฌ๊ฑฐํŠธ ์กฐํšŒ ํ›„ ๋ช…๋ น ์ˆ˜ํ–‰ FooAggregate value increased to: 2 Hibernate: insert into domain_event_entry (aggregate_identifier, event_identifier, meta_data, payload, payload_revision, payload_type, sequence_number, time_stamp, type, global_index) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) # ์ด๋ฒคํŠธ ์นด์šดํ„ฐ๊ฐ€ 3, ์ด์ „ ์Šค๋ƒ…์ƒท ์กด์žฌ ์—ฌ๋ถ€ ํ™•์ธ Hibernate: select see1_0.type, see1_0.aggregate_identifier, see1_0.sequence_number, see1_0.event_identifier, see1_0.time_stamp, see1_0.payload_type, see1_0.payload_revision, see1_0.payload, see1_0.meta_data from snapshot_event_entry see1_0 where see1_0.aggregate_identifier=? order by see1_0.sequence_number desc limit ? Hibernate: select dee1_0.type, dee1_0.aggregate_identifier, dee1_0.sequence_number, dee1_0.event_identifier, dee1_0.time_stamp, dee1_0.payload_type, dee1_0.payload_revision, dee1_0.payload, dee1_0.meta_data from domain_event_entry dee1_0 where dee1_0.aggregate_identifier=? and dee1_0.sequence_number>=? order by dee1_0.sequence_number limit ? # ์ƒˆ๋กœ์šด ์Šค๋ƒ…์ƒท ์ƒ์„ฑ FooAggregate created with id: 0 FooAggregate value increased to: 1 FooAggregate value increased to: 2 Hibernate: select see1_0.aggregate_identifier, see1_0.sequence_number, see1_0.type, see1_0.event_identifier, see1_0.meta_data, see1_0.payload, see1_0.payload_revision, see1_0.payload_type, see1_0.time_stamp from snapshot_event_entry see1_0 where ( see1_0.aggregate_identifier, see1_0.sequence_number, see1_0.type ) in ((?, ?, ?)) Hibernate: insert into snapshot_event_entry (event_identifier, meta_data, payload, payload_revision, payload_type, time_stamp, aggregate_identifier, sequence_number, type) values (?, ?, ?, ?, ?, ?, ?, ?, ?) Hibernate: delete see1_0 from snapshot_event_entry see1_0 where see1_0.aggregate_identifier=? and see1_0.sequence_number<?
Plain Text
๋ณต์‚ฌ

JCacheAdapter

์–ด๊ทธ๋ฆฌ๊ฑฐํŠธ์˜ ์บ์‹œ๋ฅผ redis์— ์ €์žฅํ•˜๊ธฐ ์œ„ํ•ด JCacheAdapter ๋ฅผ ์‚ฌ์šฉํ–ˆ๋‹ค. JCache๋Š” Java ํ‘œ์ค€ ์บ์‹ฑ API๋กœ, javax.cache ํŒจํ‚ค์ง€์— ํฌํ•จ๋˜์–ด ์žˆ์œผ๋ฉฐ, ๊ตฌํ˜„์ฒด๋กœ๋Š” Redisson์„ ์‚ฌ์šฉํ–ˆ๋‹ค.
JCacheAdatper ์‚ฌ์šฉ ์‹œ, RedissonClient๋ฅผ ๋“ฑ๋กํ•ด์•ผ ํ•˜๋ฉฐ, ์ด ๋•Œ ์ฝ”๋ฑ์„ ์ถ”๊ฐ€๋กœ ์ง€์ •ํ•ด์•ผ ํ•œ๋‹ค. Axon์€ ๊ธฐ๋ณธ์ ์œผ๋กœ XML ํฌ๋งท ๊ธฐ๋ฐ˜์˜ XStreamSerializer ๋ฅผ ์‚ฌ์šฉํ•˜๋ฏ€๋กœ, ์ด์— ๋งž๋Š” ์ฝ”๋ฑ์„ ๋ณ„๋„๋กœ ๊ตฌํ˜„ํ•ด์•ผํ•œ๋‹ค.
@Profile("redis") @Bean(name = ["snapshotCache"]) fun redisCache(@Qualifier("axonCacheManager") cacheManager: CacheManager): Cache { val config = MutableConfiguration<String, Any>() config.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(Duration.ONE_DAY)) if (!cacheManager.cacheNames.contains(SNAPSHOT_CACHE_NAME)) { cacheManager.createCache(SNAPSHOT_CACHE_NAME, config) } return JCacheAdapter(cacheManager.getCache(SNAPSHOT_CACHE_NAME)) } @Bean(name = ["axonRedissonClient"]) fun axonCacheRedissonClient(serializer: Serializer, redisProperties: RedisProperties): RedissonClient { return Redisson.create(Config().apply { useSingleServer().apply { address = buildRedisUri(redisProperties) username = redisProperties.username password = redisProperties.password } codec = AxonCacheCodec(serializer) }) } @Bean(name = ["axonCacheManager"]) fun axonCacheManager( @Qualifier("axonRedissonClient") redissonClient: RedissonClient, redisProperties: RedisProperties ): CacheManager { return JCacheManager( redissonClient as Redisson, javaClass.classLoader, Caching.getCachingProvider(), Properties(), URI.create(buildRedisUri(redisProperties)) ) } class AxonCacheCodec( private val serializer: Serializer, ) : BaseCodec() { private val encoder = Encoder { obj -> val buf: ByteBuf = ByteBufAllocator.DEFAULT.buffer() try { ByteBufOutputStream(buf).use { os -> val serialized = serializer.serialize(obj, ByteArray::class.java) val data = serialized.data as ByteArray os.write(data) os.flush() } buf } catch (e: IOException) { buf.release() throw e } catch (e: Exception) { buf.release() throw IOException(e) } } private val decoder = Decoder { buf, _ -> ByteBufInputStream(buf).use { input -> val bytes = ByteArray(buf.readableBytes()) input.read(bytes) val xmlString = String(bytes, StandardCharsets.UTF_8) val simpleSerializedObject = SimpleSerializedObject( xmlString, String::class.java, AggregateCacheEntry::class.qualifiedName, "" ) return@Decoder serializer.deserialize<String, Any>(simpleSerializedObject) } } override fun getValueDecoder(): Decoder<Any> { return decoder } override fun getValueEncoder(): Encoder { return encoder } }
Kotlin
๋ณต์‚ฌ