Axon & EventSourcing
Axon Framework๋ DDD, CQRS, EventSourcing์ ๊ธฐ๋ฐ์ผ๋ก ํ ๊ฐ๋ ฅํ ํ๋ ์์ํฌ์ด๋ค.
์ด๋ฒคํธ์์ฑ(EventSourcing)์์ ์ด๊ทธ๋ฆฌ๊ฑฐํธ(Aggregate)๊ฐ ๋ช
๋ น(Command)์ ์ํํ ๋, ๊ณผ๊ฑฐ์ ์ด๋ฒคํธ(Event)๋ค์ ์์ฐจ์ ์ผ๋ก ์ฌ์ํ์ฌ ์ต์ ์ํ๋ฅผ ๋ณต์ํ๋ค. ํ์ง๋ง ์ด๋ฒคํธ ์๊ฐ ๋ง์์ง์๋ก ์ฌ์ ๊ณผ์ ์ด ๋๋ ค์ง๊ณ ์ฑ๋ฅ ์ด์๋ก ์ด์ด์ง ์ ์๋ค.
์ด๋ฌํ ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๊ธฐ ์ํด ์ค๋
์ท(Snapshot)์ด๋ ๊ฐ๋
์ด ์กด์ฌํ๋ค.
์ค๋ ์ท(Snapshot)
์ค๋
์ท์ ํน์ ์์ ์ ์ด๊ทธ๋ฆฌ๊ฑฐํธ์ ์ํ๋ฅผ ์ ์ฅํด๋๊ณ , ์ดํ์๋ ํด๋น ์ค๋
์ท๋ถํฐ ์ด๋ฒคํธ๋ฅผ ์ฌ์ํจ์ผ๋ก์จ ์ ์ฒด ์ด๋ฒคํธ๋ฅผ ์ฒ์๋ถํฐ ์ฌ์ํ๋ ๋น์ฉ์ ์ค์ฌ์ค๋ค.
Axon์์๋ ๋ ๊ฐ์ง์ ์ค๋
์ท ์์ฑ ์ ๋ต(SnapshotTriggerDefinition)์ ์ ๊ณตํ๋ค.
1.
AggregateLoadTimeSnapshotTriggerDefinition
์ด๊ทธ๋ฆฌ๊ฑฐํธ์ ๋ก๋ฉ ์๊ฐ์ ๊ธฐ๋ฐ์ผ๋ก ์ค๋
์ท์ ์์ฑํ๋ค
2.
EventCountSnapshotTriggerDefinition
์ด๊ทธ๋ฆฌ๊ฑฐํธ์ ์ด๋ฒคํธ ์๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ์ค๋
์ท์ ์์ฑํ๋ค
์ค๋
์ท์ ์ฌ์ฉ(JDBC ์ด๋ฒคํธ ์ ์ฅ์๋ฅผ ์ฌ์ฉํ ๊ฒฝ์ฐ)ํ๋๋ผ๋ ๋งค ์ฌ์ ์ ์ค๋
์ท ์ ๋ณด๋ฅผ DB์์ ์ฟผ๋ฆฌํด์ค๊ธฐ ๋๋ฌธ์ DB์ ๋ถํ๋ฅผ ์ค ์ ์๋ค. ์ด ๋ํ ๊ฐ์ ํ ์ ์๋๋ก axon์์๋ ์ค๋
์ท ์บ์ ๊ธฐ๋ฅ๋ ์ ๊ณตํ๊ณ ์๋ค.
๋ ํฌ์งํ ๋ฆฌ
EventCountSnapshotTriggerDefinition ์ ์ฌ์ฉํ๋ ๋ถ์ฐ ํ๊ฒฝ ์ค๋
์ท ์์ ์ฝ๋
์ฌ์ฉ ์ผ์ด์ค
์์ ์ฝ๋์์๋ 3๋ฒ์ ์ด๋ฒคํธ๋ง๋ค ์ค๋
์ท์ด ์์ฑ๋๋๋ก ์ค์ ์ ํ๋ค.
@Bean
fun snapshotTrigger(snapshotter: Snapshotter): SnapshotTriggerDefinition {
return EventCountSnapshotTriggerDefinition(snapshotter, 3)
}
Kotlin
๋ณต์ฌ
1. ๋จ์ผ ์ธ์คํด์ค & ์ธ๋ฉ๋ชจ๋ฆฌ ์บ์
๋จ์ผ ์ธ์คํด์ค ํ๊ฒฝ์์ ์ธ๋ฉ๋ชจ๋ฆฌ ์บ์๋ฅผ ์ฌ์ฉํ ๊ฒฝ์ฐ์๋ ๋ฌธ์ ๊ฐ ์๊ธธ ์ ์๋ค. ํญ์ ์ธ๋ฉ๋ชจ๋ฆฌ์ ์๋ ์ด๊ทธ๋ฆฌ๊ฑฐํธ ๋ฒ์ ์ ์ต์ ์ด๊ธฐ ๋๋ฌธ์ด๋ค.
# ์์ฑ
FooAggregate created with id: 0
# ์ด๋ฒคํธ ์ ์ฅ(seq=0)
Hibernate:
insert
into
domain_event_entry
(aggregate_identifier, event_identifier, meta_data, payload, payload_revision, payload_type, sequence_number, time_stamp, type, global_index)
values
========================================================================================
# ์์ฒญ ๋ช
๋ น ์ํ (์ค๋
์ท ๋ฐ ์ด๋ฒคํธ๋ฅผ ์กฐํํ์ง ์์)
call increase foo value
FooAggregate value increased to: 1
# ์ด๋ฒคํธ ์ ์ฅ(seq=1)
Hibernate:
insert
into
domain_event_entry
(aggregate_identifier, event_identifier, meta_data, payload, payload_revision, payload_type, sequence_number, time_stamp, type, global_index)
values
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
========================================================================================
# ์์ฒญ ๋ช
๋ น ์ํ
call increase foo value
FooAggregate value increased to: 2
# ์ด๋ฒคํธ ์ ์ฅ(seq=2)
Hibernate:
insert
into
domain_event_entry
(aggregate_identifier, event_identifier, meta_data, payload, payload_revision, payload_type, sequence_number, time_stamp, type, global_index)
values
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
# ์ด๋ฒคํธ ์นด์ดํฐ๊ฐ 3์ด ๋์์ผ๋, ์๋ก์ด ์ค๋
์ท ์ ์ฅ
Hibernate:
select
see1_0.aggregate_identifier,
see1_0.sequence_number,
see1_0.type,
see1_0.event_identifier,
see1_0.meta_data,
see1_0.payload,
see1_0.payload_revision,
see1_0.payload_type,
see1_0.time_stamp
from
snapshot_event_entry see1_0
where
(
see1_0.aggregate_identifier, see1_0.sequence_number, see1_0.type
) in ((?, ?, ?))
Plain Text
๋ณต์ฌ
<im.etap.domain.aggregate.FooAggregate>
<id>0</id>
<value>2</value>
</im.etap.domain.aggregate.FooAggregate>
XML
๋ณต์ฌ
2. ๋ฉํฐ ์ธ์คํด์ค & ์ธ๋ฉ๋ชจ๋ฆฌ ์บ์
๋ฉํฐ ์ธ์คํด์ค ํ๊ฒฝ์์๋ ์ธ๋ชจ๋ฉ๋ฆฌ ์บ์๋ฅผ ์ฌ์ฉํ ๊ฒฝ์ฐ ์ธ์คํด์ค ๋ง๋ค ์บ์ ์ ์ฅ์๋ฅผ ์์ฑํ๊ธฐ ๋๋ฌธ์, ์ด๊ทธ๋ฆฌ๊ฑฐํธ ๋ฒ์ ๋ถ์ผ์น๊ฐ ๋ฐ์ํ์ฌ ์์ฒญ์ด ์คํจํ๊ฒ ๋๋ค.
instance-2 ๊ฐ ๋จผ์ ์คํ๋์ด, foo(id=0) ์ด๊ทธ๋ฆฌ๊ฑฐํธ๋ฅผ ๋จผ์ ์์ฑ ํ ์ํ์ด๋ฏ๋ก, sequence(0)์ธ ์บ์๊ฐ ์ ์ ๋ ์ํ์ด๋ค.
์ฒซ ๋ฒ์งธ ์์ฒญ(instance-1)
call increase foo value # api ์์ฒญ
Hibernate:
select
see1_0.type,
see1_0.aggregate_identifier,
see1_0.sequence_number,
see1_0.event_identifier,
see1_0.time_stamp,
see1_0.payload_type,
see1_0.payload_revision,
see1_0.payload,
see1_0.meta_data
from
snapshot_event_entry see1_0
where
see1_0.aggregate_identifier=?
order by
see1_0.sequence_number desc
limit
?
Hibernate:
select
dee1_0.type,
dee1_0.aggregate_identifier,
dee1_0.sequence_number,
dee1_0.event_identifier,
dee1_0.time_stamp,
dee1_0.payload_type,
dee1_0.payload_revision,
dee1_0.payload,
dee1_0.meta_data
from
domain_event_entry dee1_0
where
dee1_0.aggregate_identifier=?
and dee1_0.sequence_number>=?
order by
dee1_0.sequence_number
limit
?
# instance-2์์ ๋จผ์ ์์ฑ๋ ์ด๋ฒคํธ๋ฅผ ์ฌ์(sequence=0)
FooAggregate created with id: 0
# ์์ฒญ์ผ๋ก ๋ค์ด์จ ๋ช
๋ น ์ํ
FooAggregate value increased to: 1
# ์ด๋ฒคํธ ์ ์ฅ(sequence=1)
Hibernate:
insert
into
domain_event_entry
(aggregate_identifier, event_identifier, meta_data, payload, payload_revision, payload_type, sequence_number, time_stamp, type, global_index)
values
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
Plain Text
๋ณต์ฌ
# ๋ ๋ฒ์งธ ์์ฒญ(instance-2)
call increase foo value
# ์ด๋ฏธ ๋ฉ๋ชจ๋ฆฌ์ ์์ฑ์ ๋ํ ์ค๋
์ท sequence=0์ด ๋จ์ ์์ผ๋ฏ๋ก, ์์ฑ ์ด๋ฒคํธ๋ ์ฌ์ํ์ง ์๋๋ค.
FooAggregate value increased to: 1
# ์ด๋ฒคํธ ์ ์ฅ(sequence=1)
Hibernate:
insert
into
domain_event_entry
(aggregate_identifier, event_identifier, meta_data, payload, payload_revision, payload_type, sequence_number, time_stamp, type, global_index)
values
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
# instance-1์์ sequence=1์ ๋ํ ์ด๋ฒคํธ๊ฐ ์ ์ฅ๋์ด ์์ผ๋ฏ๋ก ์ค๋ณตํค ์๋ฐ์ด ๋ฐ์ํ๋ค.
Error: 1062-23000: Duplicate entry '0-1' for key 'UK8s1f994p4la2ipb13me2xqm1w'
SQL Error: 1062, SQLState: 23000
(conn=55) Duplicate entry '0-1' for key 'UK8s1f994p4la2ipb13me2xqm1w'
Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: org.axonframework.eventsourcing.eventstore.EventStoreException: An event for aggregate [0] at sequence [1] was already inserted] with root cause
Plain Text
๋ณต์ฌ
3. ๋ฉํฐ ์ธ์คํด์ค & ๋ถ์ฐ ์บ์
์ด๊ทธ๋ฆฌ๊ฑฐํธ ๋ช
๋ น ์ํ ์ ์บ์ ์คํ ์ด์ ์๋ ์ต์ ์ด๊ทธ๋ฆฌ๊ฑฐํธ๋ฅผ ๊ฐ์ ธ์์ ์ํํ๊ธฐ ๋๋ฌธ์ ๋ฒ์ ๋ถ์ผ์น ๋ฌธ์ ๊ฐ ์๊ธฐ์ง ์๋๋ค.
# instance-2 ์์
FooAggregate created with id: 0
# FooCreatedEvent
Hibernate:
insert
into
domain_event_entry
(aggregate_identifier, event_identifier, meta_data, payload, payload_revision, payload_type, sequence_number, time_stamp, type, global_index)
values
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
Plain Text
๋ณต์ฌ
# instance-1 ์์ฒญ
call increase foo value
# ์บ์์ ์ด๊ทธ๋ฆฌ๊ฑฐํธ ์ ๋ณด๊ฐ ์์ด ์ด๋ฒคํธ ์คํ ์ด์์ ์ค๋
์ท&์ด๋ฒคํธ ์ ๋ณด๋ฅผ ์กฐํ
Hibernate:
select
see1_0.type,
see1_0.aggregate_identifier,
see1_0.sequence_number,
see1_0.event_identifier,
see1_0.time_stamp,
see1_0.payload_type,
see1_0.payload_revision,
see1_0.payload,
see1_0.meta_data
from
snapshot_event_entry see1_0
where
see1_0.aggregate_identifier=?
order by
see1_0.sequence_number desc
limit
?
Hibernate:
select
dee1_0.type,
dee1_0.aggregate_identifier,
dee1_0.sequence_number,
dee1_0.event_identifier,
dee1_0.time_stamp,
dee1_0.payload_type,
dee1_0.payload_revision,
dee1_0.payload,
dee1_0.meta_data
from
domain_event_entry dee1_0
where
dee1_0.aggregate_identifier=?
and dee1_0.sequence_number>=?
order by
dee1_0.sequence_number
limit
?
====================================================================================
# ์กฐํํ ์ด๋ฒคํธ๋ฅผ ์ฌ์ & ๋ช
๋ น ์ํ
FooAggregate created with id: 0
FooAggregate value increased to: 1
Hibernate:
insert
into
domain_event_entry
(aggregate_identifier, event_identifier, meta_data, payload, payload_revision, payload_type, sequence_number, time_stamp, type, global_index)
values
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
Plain Text
๋ณต์ฌ
# instance-2 ์์ฒญ
call increase foo value
# ์บ์์์ ์ด๊ทธ๋ฆฌ๊ฑฐํธ ์กฐํ ํ ๋ช
๋ น ์ํ
FooAggregate value increased to: 2
Hibernate:
insert
into
domain_event_entry
(aggregate_identifier, event_identifier, meta_data, payload, payload_revision, payload_type, sequence_number, time_stamp, type, global_index)
values
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
# ์ด๋ฒคํธ ์นด์ดํฐ๊ฐ 3, ์ด์ ์ค๋
์ท ์กด์ฌ ์ฌ๋ถ ํ์ธ
Hibernate:
select
see1_0.type,
see1_0.aggregate_identifier,
see1_0.sequence_number,
see1_0.event_identifier,
see1_0.time_stamp,
see1_0.payload_type,
see1_0.payload_revision,
see1_0.payload,
see1_0.meta_data
from
snapshot_event_entry see1_0
where
see1_0.aggregate_identifier=?
order by
see1_0.sequence_number desc
limit
?
Hibernate:
select
dee1_0.type,
dee1_0.aggregate_identifier,
dee1_0.sequence_number,
dee1_0.event_identifier,
dee1_0.time_stamp,
dee1_0.payload_type,
dee1_0.payload_revision,
dee1_0.payload,
dee1_0.meta_data
from
domain_event_entry dee1_0
where
dee1_0.aggregate_identifier=?
and dee1_0.sequence_number>=?
order by
dee1_0.sequence_number
limit
?
# ์๋ก์ด ์ค๋
์ท ์์ฑ
FooAggregate created with id: 0
FooAggregate value increased to: 1
FooAggregate value increased to: 2
Hibernate:
select
see1_0.aggregate_identifier,
see1_0.sequence_number,
see1_0.type,
see1_0.event_identifier,
see1_0.meta_data,
see1_0.payload,
see1_0.payload_revision,
see1_0.payload_type,
see1_0.time_stamp
from
snapshot_event_entry see1_0
where
(
see1_0.aggregate_identifier, see1_0.sequence_number, see1_0.type
) in ((?, ?, ?))
Hibernate:
insert
into
snapshot_event_entry
(event_identifier, meta_data, payload, payload_revision, payload_type, time_stamp, aggregate_identifier, sequence_number, type)
values
(?, ?, ?, ?, ?, ?, ?, ?, ?)
Hibernate:
delete see1_0
from
snapshot_event_entry see1_0
where
see1_0.aggregate_identifier=?
and see1_0.sequence_number<?
Plain Text
๋ณต์ฌ
JCacheAdapter
์ด๊ทธ๋ฆฌ๊ฑฐํธ์ ์บ์๋ฅผ redis์ ์ ์ฅํ๊ธฐ ์ํด JCacheAdapter ๋ฅผ ์ฌ์ฉํ๋ค. JCache๋ Java ํ์ค ์บ์ฑ API๋ก, javax.cache ํจํค์ง์ ํฌํจ๋์ด ์์ผ๋ฉฐ, ๊ตฌํ์ฒด๋ก๋ Redisson์ ์ฌ์ฉํ๋ค.
JCacheAdatper ์ฌ์ฉ ์, RedissonClient๋ฅผ ๋ฑ๋กํด์ผ ํ๋ฉฐ, ์ด ๋ ์ฝ๋ฑ์ ์ถ๊ฐ๋ก ์ง์ ํด์ผ ํ๋ค. Axon์ ๊ธฐ๋ณธ์ ์ผ๋ก XML ํฌ๋งท ๊ธฐ๋ฐ์ XStreamSerializer ๋ฅผ ์ฌ์ฉํ๋ฏ๋ก, ์ด์ ๋ง๋ ์ฝ๋ฑ์ ๋ณ๋๋ก ๊ตฌํํด์ผํ๋ค.
@Profile("redis")
@Bean(name = ["snapshotCache"])
fun redisCache(@Qualifier("axonCacheManager") cacheManager: CacheManager): Cache {
val config = MutableConfiguration<String, Any>()
config.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(Duration.ONE_DAY))
if (!cacheManager.cacheNames.contains(SNAPSHOT_CACHE_NAME)) {
cacheManager.createCache(SNAPSHOT_CACHE_NAME, config)
}
return JCacheAdapter(cacheManager.getCache(SNAPSHOT_CACHE_NAME))
}
@Bean(name = ["axonRedissonClient"])
fun axonCacheRedissonClient(serializer: Serializer, redisProperties: RedisProperties): RedissonClient {
return Redisson.create(Config().apply {
useSingleServer().apply {
address = buildRedisUri(redisProperties)
username = redisProperties.username
password = redisProperties.password
}
codec = AxonCacheCodec(serializer)
})
}
@Bean(name = ["axonCacheManager"])
fun axonCacheManager(
@Qualifier("axonRedissonClient") redissonClient: RedissonClient,
redisProperties: RedisProperties
): CacheManager {
return JCacheManager(
redissonClient as Redisson,
javaClass.classLoader,
Caching.getCachingProvider(),
Properties(),
URI.create(buildRedisUri(redisProperties))
)
}
class AxonCacheCodec(
private val serializer: Serializer,
) : BaseCodec() {
private val encoder = Encoder { obj ->
val buf: ByteBuf = ByteBufAllocator.DEFAULT.buffer()
try {
ByteBufOutputStream(buf).use { os ->
val serialized = serializer.serialize(obj, ByteArray::class.java)
val data = serialized.data as ByteArray
os.write(data)
os.flush()
}
buf
} catch (e: IOException) {
buf.release()
throw e
} catch (e: Exception) {
buf.release()
throw IOException(e)
}
}
private val decoder = Decoder { buf, _ ->
ByteBufInputStream(buf).use { input ->
val bytes = ByteArray(buf.readableBytes())
input.read(bytes)
val xmlString = String(bytes, StandardCharsets.UTF_8)
val simpleSerializedObject = SimpleSerializedObject(
xmlString,
String::class.java,
AggregateCacheEntry::class.qualifiedName,
""
)
return@Decoder serializer.deserialize<String, Any>(simpleSerializedObject)
}
}
override fun getValueDecoder(): Decoder<Any> {
return decoder
}
override fun getValueEncoder(): Encoder {
return encoder
}
}
Kotlin
๋ณต์ฌ