Event streaming and event sourcing are two related but different concepts in event-driven architecture.
Event streaming is the process of continuously capturing and recording events that occur in the system. These events can be processed and analyzed immediately or stored for later analysis. Event streaming is typically used in systems that need to process large amounts of real-time data, such as financial transactions or social media platforms.
The following is a simple example of event streaming in Go using the popular Kafka messaging system:
    package main

    import (
        "context"
        "fmt"

        "github.com/segmentio/kafka-go"
    )

    func main() {
        ctx := context.Background()

        // Set up a Kafka producer to send events to the topic.
        writer := kafka.NewWriter(kafka.WriterConfig{
            Brokers: []string{"localhost:9092"},
            Topic:   "my-topic",
        })
        defer writer.Close()

        // Send a few events to the topic.
        err := writer.WriteMessages(ctx,
            kafka.Message{Key: []byte("key1"), Value: []byte("value1")},
            kafka.Message{Key: []byte("key2"), Value: []byte("value2")},
        )
        if err != nil {
            fmt.Println("failed to write messages:", err)
            return
        }

        // Set up a Kafka consumer to read events from the topic.
        reader := kafka.NewReader(kafka.ReaderConfig{
            Brokers: []string{"localhost:9092"},
            Topic:   "my-topic",
        })
        defer reader.Close()

        // Read events from the topic. ReadMessage blocks until a message
        // arrives, so the loop ends only when an error occurs.
        for {
            msg, err := reader.ReadMessage(ctx)
            if err != nil {
                break
            }
            fmt.Printf("Received message: key=%s, value=%s\n", string(msg.Key), string(msg.Value))
        }
    }
Event sourcing is a pattern for building systems that stores all changes in application state as a sequence of events. These events can then be used to reconstruct the state of the application at any point in time. Event sourcing is often used in systems that require auditability, traceability, or compliance, such as financial systems or healthcare systems.
The following is a simple example of using in-memory event storage for event sourcing in Go:
    package main

    import "fmt"

    // Event is a single recorded change to application state.
    type Event struct {
        Type string
        Data interface{}
    }

    // EventStore is an append-only, in-memory list of events.
    type EventStore struct {
        events []Event
    }

    func (store *EventStore) Append(event Event) {
        store.events = append(store.events, event)
    }

    func (store *EventStore) GetEvents() []Event {
        return store.events
    }

    type Account struct {
        id      string
        balance int
        store   *EventStore
    }

    func NewAccount(id string, store *EventStore) *Account {
        return &Account{
            id:      id,
            balance: 0,
            store:   store,
        }
    }

    // Deposit records a deposit event and then updates the balance.
    func (account *Account) Deposit(amount int) {
        event := Event{
            Type: "deposit",
            Data: amount,
        }
        account.store.Append(event)
        account.balance += amount
    }

    // Withdraw records a withdraw event only if the balance covers the amount;
    // otherwise the request is silently ignored.
    func (account *Account) Withdraw(amount int) {
        if account.balance >= amount {
            event := Event{
                Type: "withdraw",
                Data: amount,
            }
            account.store.Append(event)
            account.balance -= amount
        }
    }

    func (account *Account) GetBalance() int {
        return account.balance
    }

    func main() {
        store := &EventStore{}
        account := NewAccount("123", store)
        account.Deposit(100)
        account.Withdraw(50)
        account.Deposit(25)

        // Walk the recorded events in order and print each one.
        events := store.GetEvents()
        for _, event := range events {
            switch event.Type {
            case "deposit":
                amount := event.Data.(int)
                fmt.Printf("Deposited %d\n", amount)
            case "withdraw":
                amount := event.Data.(int)
                fmt.Printf("Withdrew %d\n", amount)
            }
        }
        fmt.Printf("Final balance: %d\n", account.GetBalance())
    }
Event sourcing records each modification to an aggregate as an event and appends it to a continuous stream. To reconstruct the final state of the aggregate, these events are read in sequence and applied to the aggregate one by one. This contrasts with the in-place modifications performed in a create, read, update, and delete (CRUD) system. In a CRUD system, any change to a record's state is stored in the database by overwriting the previous version of the same aggregate. For example, after a price change has been saved to the Products table, only the price is updated, while the other columns remain unchanged. However, this approach loses the previous prices and the context behind the changes, as shown in Figure 5.1.
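The replay step described above can be sketched in a few lines of Go. This is a minimal illustration under stated assumptions, not a definitive implementation: the `Event`, `Account`, `Apply`, and `Replay` names are hypothetical and chosen only for this example.

```go
package main

import "fmt"

// Event is a hypothetical event record; Type and Amount are illustrative names.
type Event struct {
	Type   string
	Amount int
}

// Account is the aggregate. Its state is derived purely from events.
type Account struct {
	Balance int
}

// Apply changes the aggregate according to a single event.
func (a *Account) Apply(e Event) {
	switch e.Type {
	case "deposit":
		a.Balance += e.Amount
	case "withdraw":
		a.Balance -= e.Amount
	}
}

// Replay rebuilds an Account by applying events in the order they were recorded.
func Replay(events []Event) Account {
	var a Account
	for _, e := range events {
		a.Apply(e)
	}
	return a
}

func main() {
	stream := []Event{
		{Type: "deposit", Amount: 100},
		{Type: "withdraw", Amount: 50},
		{Type: "deposit", Amount: 25},
	}

	// Replaying the full stream yields the current state.
	fmt.Println("current balance:", Replay(stream).Balance)

	// Replaying a prefix yields the state at an earlier point in time.
	fmt.Println("balance after two events:", Replay(stream[:2]).Balance)
}
```

Because the state is never stored directly, replaying a prefix of the stream is enough to see what the aggregate looked like at any earlier point.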
With event sourcing, the price change is instead stored as an event in the Events table, carrying the new price together with key metadata such as the reason for the adjustment. Earlier price events are left untouched, so any previous price can be retrieved if necessary.
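As a rough sketch of what such an Events table could look like in memory, the example below records price changes with a reason attached. The `PriceChanged` fields, the `EventLog` type, and the product IDs are assumptions made for illustration; the source does not define a schema.

```go
package main

import "fmt"

// PriceChanged is a hypothetical row in the Events table. The field names
// are assumptions made for this sketch.
type PriceChanged struct {
	ProductID string
	NewPrice  int    // price in cents
	Reason    string // metadata explaining why the price changed
}

// EventLog is an append-only list standing in for the Events table.
type EventLog struct {
	events []PriceChanged
}

// Record appends a price change. Earlier events are never modified.
func (l *EventLog) Record(e PriceChanged) {
	l.events = append(l.events, e)
}

// History returns every price change for productID, oldest first.
func (l *EventLog) History(productID string) []PriceChanged {
	var out []PriceChanged
	for _, e := range l.events {
		if e.ProductID == productID {
			out = append(out, e)
		}
	}
	return out
}

// CurrentPrice returns the most recently recorded price for productID.
func (l *EventLog) CurrentPrice(productID string) (int, bool) {
	h := l.History(productID)
	if len(h) == 0 {
		return 0, false
	}
	return h[len(h)-1].NewPrice, true
}

func main() {
	events := &EventLog{}
	events.Record(PriceChanged{ProductID: "p-1", NewPrice: 1999, Reason: "initial listing"})
	events.Record(PriceChanged{ProductID: "p-1", NewPrice: 1499, Reason: "seasonal sale"})

	price, _ := events.CurrentPrice("p-1")
	fmt.Println("current price:", price)

	// The previous price and the reason for each change are still available.
	for _, e := range events.History("p-1") {
		fmt.Printf("price=%d reason=%q\n", e.NewPrice, e.Reason)
	}
}
```

The current price is only a view over the log, which is why the older price and its context survive the update.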
For event sourcing to work well, use an event store that provides strong consistency guarantees and optimistic concurrency control. In practice, this means that when several modifications to the same stream are attempted at the same time, only the first one is appended. The others are rejected and must either be retried against the updated stream or reported as failures.
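One common way to implement optimistic concurrency control is to have each writer state the stream version it expects. The in-memory sketch below assumes that approach; `EventStore`, `Append`, `expectedVersion`, and `ErrVersionConflict` are hypothetical names for this example and do not correspond to any particular event store product.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrVersionConflict is returned when the stream changed since the caller read it.
var ErrVersionConflict = errors.New("version conflict: stream was modified concurrently")

// Event is a hypothetical event record.
type Event struct {
	Type string
	Data int
}

// EventStore keeps one append-only slice of events per stream ID.
type EventStore struct {
	mu      sync.Mutex
	streams map[string][]Event
}

func NewEventStore() *EventStore {
	return &EventStore{streams: make(map[string][]Event)}
}

// Append adds events only if the stream currently holds exactly
// expectedVersion events. It returns the stream version after the call.
func (s *EventStore) Append(streamID string, expectedVersion int, events ...Event) (int, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	current := len(s.streams[streamID])
	if current != expectedVersion {
		return current, ErrVersionConflict
	}
	s.streams[streamID] = append(s.streams[streamID], events...)
	return len(s.streams[streamID]), nil
}

func main() {
	store := NewEventStore()

	// Two writers both read the stream at version 0.
	_, err := store.Append("account-123", 0, Event{Type: "deposit", Data: 100})
	fmt.Println("first writer error:", err)

	// The second writer still expects version 0, so its append is rejected.
	current, err := store.Append("account-123", 0, Event{Type: "withdraw", Data: 50})
	fmt.Println("second writer error:", err)

	// Retrying with the current version succeeds.
	version, err := store.Append("account-123", current, Event{Type: "withdraw", Data: 50})
	fmt.Println("after retry: version", version, "error:", err)
}
```

In a real system the retry would normally reload the aggregate and re-validate the command before appending again, since the business rule (for example, sufficient balance) may no longer hold.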