package rsmap import ( "bytes" "context" "net" "net/http" "os" "sync" "time" connect_go "github.com/bufbuild/connect-go" "github.com/daichitakahashi/deps" "github.com/lestrrat-go/backoff/v2" "go.etcd.io/bbolt" logsv1 "github.com/daichitakahashi/rsmap/internal/proto/logs/v1" resource_mapv1 "github.com/daichitakahashi/rsmap/internal/proto/resource_map/v1" "github.com/daichitakahashi/rsmap/internal/proto/resource_map/v1/resource_mapv1connect" "github.com/daichitakahashi/rsmap/logs" ) type config struct { dbFile string addrFile string retryPolicy backoff.Policy httpCli *http.Client } // Open database for server. // This blocks indefinitely. func (c *config) openDB() (*bbolt.DB, error) { return bbolt.Open(c.dbFile, 0644, nil) // Set options if required. } // Read server address. func (c *config) readAddr() (string, error) { data, err := os.ReadFile(c.addrFile) if err != nil { return "", err } return string(bytes.TrimSpace(data)), nil } // Write server address for other clients. func (c *config) writeAddr(addr string) error { return os.WriteFile(c.addrFile, []byte(addr), 0644) } var ( serverMu sync.Mutex serverSem = make(map[string]chan struct{}) ) func (m *Map) launchServer(dir string, callers logs.CallerContext) func() { root := deps.New() // Get semaphore for the directory where the logs.db exists. serverMu.Lock() sem, ok := serverSem[dir] if !ok { sem = make(chan struct{}, 1) serverSem[dir] = sem } serverMu.Unlock() go func(dep *deps.Dependency) (err error) { defer dep.Stop(&err) select { case <-dep.Aborted(): return case sem <- struct{}{}: // Avoid launching multiple server from same process. defer func() { <-sem }() } db, err := m._cfg.openDB() if err != nil { return err } defer db.Close() select { case <-dep.Aborted(): return nil default: } info, err := logs.NewInfoStore(db) if err != nil { return err } closing := make(chan struct{}) rm, err := newServerSideMap(db, closing) if err != nil { return err } // Launch server. ln, err := net.Listen("tcp", ":0") if err != nil { return err } defer func() { _ = ln.Close() }() mux := http.NewServeMux() mux.Handle( resource_mapv1connect.NewResourceMapServiceHandler(&resourceMapHandler{ _rm: rm, }), ) s := http.Server{ Handler: mux, } go func() { _ = s.Serve(ln) }() // Write addr for other clients. addr := "http://" + ln.Addr().String() err = m._cfg.writeAddr(addr) if err != nil { return err } // Record launched server. err = info.PutServerLog(&logsv1.ServerLog{ Event: logsv1.ServerEvent_SERVER_EVENT_LAUNCHED, Addr: addr, Context: callers, Timestamp: time.Now().UnixNano(), }) if err != nil { return err } // Replace resourceMap with serverSideMap. m._mu.Lock() m._rm = rm m._mu.Unlock() <-dep.Aborted() close(closing) _ = s.Shutdown(dep.AbortContext()) // Record stopped server. 
return info.PutServerLog(&logsv1.ServerLog{ Event: logsv1.ServerEvent_SERVER_EVENT_STOPPED, Context: callers, Timestamp: time.Now().UnixNano(), }) }(root.Dependent()) return sync.OnceFunc(func() { _ = root.Abort(context.Background()) }) } type resourceMapHandler struct { _rm *serverSideMap } func (h *resourceMapHandler) TryInitResource(ctx context.Context, req *connect_go.Request[resource_mapv1.TryInitResourceRequest]) (*connect_go.Response[resource_mapv1.TryInitResourceResponse], error) { try, err := h._rm.tryInit(ctx, req.Msg.ResourceName, logs.CallerContext(req.Msg.Context)) if err != nil { return nil, err } return connect_go.NewResponse(&resource_mapv1.TryInitResourceResponse{ ShouldTry: try, }), nil } func (h *resourceMapHandler) CompleteInitResource(ctx context.Context, req *connect_go.Request[resource_mapv1.CompleteInitResourceRequest]) (*connect_go.Response[resource_mapv1.CompleteInitResourceResponse], error) { err := h._rm.completeInit(ctx, req.Msg.ResourceName, logs.CallerContext(req.Msg.Context)) if err != nil { return nil, err } return connect_go.NewResponse(&resource_mapv1.CompleteInitResourceResponse{}), nil } func (h *resourceMapHandler) FailInitResource(ctx context.Context, req *connect_go.Request[resource_mapv1.FailInitResourceRequest]) (*connect_go.Response[resource_mapv1.FailInitResourceResponse], error) { err := h._rm.failInit(ctx, req.Msg.ResourceName, logs.CallerContext(req.Msg.Context)) if err != nil { return nil, err } return connect_go.NewResponse(&resource_mapv1.FailInitResourceResponse{}), nil } func (h *resourceMapHandler) Acquire(ctx context.Context, req *connect_go.Request[resource_mapv1.AcquireRequest]) (*connect_go.Response[resource_mapv1.AcquireResponse], error) { err := h._rm.acquire(ctx, req.Msg.ResourceName, logs.CallerContext(req.Msg.Context), req.Msg.MaxParallelism, req.Msg.Exclusive) if err != nil { return nil, err } return connect_go.NewResponse(&resource_mapv1.AcquireResponse{}), nil } func (h *resourceMapHandler) AcquireMulti(ctx context.Context, req *connect_go.Request[resource_mapv1.AcquireMultiRequest]) (*connect_go.Response[resource_mapv1.AcquireMultiResponse], error) { err := h._rm.acquireMulti(ctx, req.Msg.Resources) if err != nil { return nil, err } return connect_go.NewResponse(&resource_mapv1.AcquireMultiResponse{}), nil } func (h *resourceMapHandler) Release(ctx context.Context, req *connect_go.Request[resource_mapv1.ReleaseRequest]) (*connect_go.Response[resource_mapv1.ReleaseResponse], error) { err := h._rm.release(ctx, req.Msg.ResourceName, logs.CallerContext(req.Msg.Context)) if err != nil { return nil, err } return connect_go.NewResponse(&resource_mapv1.ReleaseResponse{}), nil } func (h *resourceMapHandler) ReleaseMulti(ctx context.Context, req *connect_go.Request[resource_mapv1.ReleaseMultiRequest]) (*connect_go.Response[resource_mapv1.ReleaseMultiResponse], error) { err := h._rm.releaseMulti(ctx, req.Msg.Resources) if err != nil { return nil, err } return connect_go.NewResponse(&resource_mapv1.ReleaseMultiResponse{}), nil } var _ resource_mapv1connect.ResourceMapServiceHandler = (*resourceMapHandler)(nil) type clientSideMap struct { _cfg config } func newClientSideMap(cfg config) *clientSideMap { return &clientSideMap{ _cfg: cfg, } } func (m *clientSideMap) try(ctx context.Context, op func(ctx context.Context, cli resource_mapv1connect.ResourceMapServiceClient) error) error { var ( addr string err error ctl = m._cfg.retryPolicy.Start(ctx) ) for { select { case <-ctl.Done(): // When ctx is canceled, or retry count is exceeded. 
if e := ctx.Err(); e != nil { return e } return err case <-ctl.Next(): addr, err = m._cfg.readAddr() if err != nil { // Retry! continue } // MEMO: Do we need to reuse service clients? cli := resource_mapv1connect.NewResourceMapServiceClient(m._cfg.httpCli, addr) if err = op(ctx, cli); err != nil { // Retry! continue } return nil } } } func (m *clientSideMap) tryInit(ctx context.Context, resourceName string, operator logs.CallerContext) (try bool, _ error) { err := m.try(ctx, func(ctx context.Context, cli resource_mapv1connect.ResourceMapServiceClient) error { resp, err := cli.TryInitResource(ctx, connect_go.NewRequest(&resource_mapv1.TryInitResourceRequest{ ResourceName: resourceName, Context: operator, })) if err != nil { return err } try = resp.Msg.ShouldTry return nil }) return try, err } func (m *clientSideMap) completeInit(ctx context.Context, resourceName string, operator logs.CallerContext) error { return m.try(ctx, func(ctx context.Context, cli resource_mapv1connect.ResourceMapServiceClient) error { _, err := cli.CompleteInitResource(ctx, connect_go.NewRequest(&resource_mapv1.CompleteInitResourceRequest{ ResourceName: resourceName, Context: operator, })) return err }) } func (m *clientSideMap) failInit(ctx context.Context, resourceName string, operator logs.CallerContext) error { return m.try(ctx, func(ctx context.Context, cli resource_mapv1connect.ResourceMapServiceClient) error { _, err := cli.FailInitResource(ctx, connect_go.NewRequest(&resource_mapv1.FailInitResourceRequest{ ResourceName: resourceName, Context: operator, })) return err }) } func (m *clientSideMap) acquire(ctx context.Context, resourceName string, operator logs.CallerContext, max int64, exclusive bool) error { return m.try(ctx, func(ctx context.Context, cli resource_mapv1connect.ResourceMapServiceClient) error { _, err := cli.Acquire(ctx, connect_go.NewRequest(&resource_mapv1.AcquireRequest{ ResourceName: resourceName, Context: operator, MaxParallelism: max, Exclusive: exclusive, })) return err }) } func (m *clientSideMap) acquireMulti(ctx context.Context, resources []*resource_mapv1.AcquireMultiEntry) error { return m.try(ctx, func(ctx context.Context, cli resource_mapv1connect.ResourceMapServiceClient) error { _, err := cli.AcquireMulti(ctx, connect_go.NewRequest(&resource_mapv1.AcquireMultiRequest{ Resources: resources, })) return err }) } func (m *clientSideMap) release(ctx context.Context, resourceName string, operator logs.CallerContext) error { return m.try(ctx, func(ctx context.Context, cli resource_mapv1connect.ResourceMapServiceClient) error { _, err := cli.Release(ctx, connect_go.NewRequest(&resource_mapv1.ReleaseRequest{ ResourceName: resourceName, Context: operator, })) return err }) } func (m *clientSideMap) releaseMulti(ctx context.Context, resources []*resource_mapv1.ReleaseMultiEntry) error { return m.try(ctx, func(ctx context.Context, cli resource_mapv1connect.ResourceMapServiceClient) error { _, err := cli.ReleaseMulti(ctx, connect_go.NewRequest(&resource_mapv1.ReleaseMultiRequest{ Resources: resources, })) return err }) } var _ resourceMap = (*clientSideMap)(nil)
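// The following function is an illustrative sketch (not part of the package's API)
// of how the client-side map is driven by its retry policy: every call rereads
// the `addr` file and retries failed RPCs until the policy gives up. The file
// paths, resource name, and call site below are hypothetical.
func exampleClientSideMap(ctx context.Context) error {
	cli := newClientSideMap(config{
		dbFile:   "/tmp/rsmap/logs.db", // Hypothetical paths.
		addrFile: "/tmp/rsmap/addr",
		retryPolicy: backoff.NewConstantPolicy(
			backoff.WithMaxRetries(10),
			backoff.WithInterval(100*time.Millisecond),
		),
		httpCli: &http.Client{},
	})

	var callers logs.CallerContext
	callers = callers.Append("example.go", 1) // Hypothetical call site.

	// Each call below retries with the configured policy until the server
	// address becomes readable and the RPC succeeds.
	try, err := cli.tryInit(ctx, "db", callers)
	if err != nil {
		return err
	}
	if try {
		// ... perform the actual initialization here, then:
		return cli.completeInit(ctx, "db", callers)
	}
	return nil
}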
package rsmap import ( "context" "errors" "sync" "time" "github.com/daichitakahashi/oncewait" "golang.org/x/sync/errgroup" "github.com/daichitakahashi/rsmap/internal/ctl" logsv1 "github.com/daichitakahashi/rsmap/internal/proto/logs/v1" resource_mapv1 "github.com/daichitakahashi/rsmap/internal/proto/resource_map/v1" "github.com/daichitakahashi/rsmap/internal/rendezvous" "github.com/daichitakahashi/rsmap/logs" ) // TODO // * timeout for init and acquisition var errClosing = errors.New("closing") type initController struct { _store logs.ResourceRecordStore[logsv1.InitRecord] _resources sync.Map _closing <-chan struct{} } func loadInitController(store logs.ResourceRecordStore[logsv1.InitRecord], closing <-chan struct{}) (*initController, error) { c := &initController{ _store: store, _closing: closing, } err := c._store.ForEach(func(name string, obj *logsv1.InitRecord) error { if len(obj.Logs) == 0 { return nil // Impossible path. } // Get last init status and operator. last := obj.Logs[len(obj.Logs)-1] if last.Event == logsv1.InitEvent_INIT_EVENT_FAILED { return nil // Former try is failed and anyone haven't started next try yet. } completed := last.Event == logsv1.InitEvent_INIT_EVENT_COMPLETED initCtl := ctl.NewInitCtl(completed) if !completed { <-initCtl.TryInit( context.Background(), logs.CallerContext(last.Context).String(), ) } c._resources.Store(name, initCtl) return nil }) if err != nil { return nil, err } return c, nil } func (c *initController) tryInit(ctx context.Context, resourceName string, operator logs.CallerContext) (bool, error) { v, _ := c._resources.LoadOrStore(resourceName, ctl.NewInitCtl(false)) initCtl := v.(*ctl.InitCtl) var result ctl.TryInitResult select { case <-c._closing: return false, errClosing case result = <-initCtl.TryInit(ctx, operator.String()): if result.Err != nil { return false, result.Err } if !result.Try { return false, nil } } if result.Initiated { // Update data on key value store. err := c._store.Put([]string{resourceName}, func(_ string, r *logsv1.InitRecord, _ bool) { r.Logs = append(r.Logs, &logsv1.InitLog{ Event: logsv1.InitEvent_INIT_EVENT_STARTED, Context: operator, Timestamp: time.Now().UnixNano(), }) }) if err != nil { return false, err } } return true, nil } func (c *initController) complete(resourceName string, operator logs.CallerContext) error { select { case <-c._closing: return errClosing default: } v, found := c._resources.Load(resourceName) if !found { return errors.New("resource not found") } ctl := v.(*ctl.InitCtl) err := ctl.Complete(operator.String()) if err != nil { return err } return c._store.Put([]string{resourceName}, func(_ string, r *logsv1.InitRecord, _ bool) { r.Logs = append(r.Logs, &logsv1.InitLog{ Event: logsv1.InitEvent_INIT_EVENT_COMPLETED, Context: operator, Timestamp: time.Now().UnixNano(), }) }) } func (c *initController) fail(resourceName string, operator logs.CallerContext) error { select { case <-c._closing: return errClosing default: } v, found := c._resources.Load(resourceName) if !found { return errors.New("resource not found") } ctl := v.(*ctl.InitCtl) err := ctl.Fail(operator.String()) if err != nil { return err } return c._store.Put([]string{resourceName}, func(_ string, r *logsv1.InitRecord, _ bool) { r.Logs = append(r.Logs, &logsv1.InitLog{ Event: logsv1.InitEvent_INIT_EVENT_FAILED, Context: operator, Timestamp: time.Now().UnixNano(), }) }) } type ( // Control acquisition status and persistence. 
acquireController struct { _kv logs.ResourceRecordStore[logsv1.AcquisitionRecord] _resources sync.Map _closing <-chan struct{} _multiMu sync.Mutex } resource struct { once *oncewait.OnceWaiter queue rendezvous.LimitedTermQueue ctl *ctl.AcquisitionCtl } ) func loadAcquireController(store logs.ResourceRecordStore[logsv1.AcquisitionRecord], acquiringQueueTimeout time.Duration, closing <-chan struct{}) (*acquireController, error) { c := &acquireController{ _kv: store, _closing: closing, } err := store.ForEach(func(name string, obj *logsv1.AcquisitionRecord) error { acquired := map[string]int64{} b := rendezvous.NewBuilder() // Replay stored acquisitions of the resource. for _, log := range obj.Logs { operator := logs.CallerContext(log.Context).String() switch log.Event { case logsv1.AcquisitionEvent_ACQUISITION_EVENT_ACQUIRING: // Queue as "acquiring". b.Add(operator) case logsv1.AcquisitionEvent_ACQUISITION_EVENT_ACQUIRED: // Consecutive acquisition is not recorded. // So, we can skip the check of existing value. // // See: `(*ctl.AcquisitionCtl).Acquire()` acquired[operator] = log.N // Remove already acquired operation from queue. b.Remove(operator) case logsv1.AcquisitionEvent_ACQUISITION_EVENT_RELEASED: // We assume that acquisition log is already processed. delete(acquired, operator) } } // Set replayed acquireCtl. c._resources.Store( name, &resource{ queue: b.Start(acquiringQueueTimeout), ctl: ctl.NewAcquisitionCtl(obj.Max, acquired), }, ) return nil }) if err != nil { return nil, err } return c, nil } var emptyQueue = rendezvous.NewBuilder().Start(0) func (r *resource) init(max int64) *resource { if r.once != nil { r.once.Do(func() { r.queue = emptyQueue r.ctl = ctl.NewAcquisitionCtl(max, map[string]int64{}) }) } return r } func (r *resource) acquire(ctx context.Context, operator string, exclusive bool) (<-chan ctl.AcquisitionResult, bool) { var ( ch <-chan ctl.AcquisitionResult acquiring bool ) // Wait dequeuing, because replayed "acquiring" operators take precedence. r.queue.Dequeue(operator, func(bool) { ch, acquiring = r.ctl.Acquire(ctx, operator, exclusive) }) return ch, acquiring } func (c *acquireController) acquire(ctx context.Context, resourceName string, operator logs.CallerContext, max int64, exclusive bool) error { select { case <-c._closing: return errClosing default: } v, _ := c._resources.LoadOrStore(resourceName, &resource{ once: oncewait.New(), }) r := v.(*resource).init(max) // Start acquisition. acCh, acquiring := r.acquire(ctx, operator.String(), exclusive) // Due to trial of consecutive acquisition, not acquired. if !acquiring { return nil } // Append log "acquiring". err := c._kv.Put([]string{resourceName}, func(_ string, r *logsv1.AcquisitionRecord, update bool) { // Initial acquisition. 
if !update { r.Max = max } r.Logs = append(r.Logs, &logsv1.AcquisitionLog{ Event: logsv1.AcquisitionEvent_ACQUISITION_EVENT_ACQUIRING, Context: operator, Timestamp: time.Now().UnixNano(), }) }) if err != nil { return err } var result ctl.AcquisitionResult select { case <-c._closing: return errClosing case result = <-acCh: if result.Err != nil { return result.Err } } return c._kv.Put([]string{resourceName}, func(_ string, r *logsv1.AcquisitionRecord, update bool) { r.Logs = append(r.Logs, &logsv1.AcquisitionLog{ Event: logsv1.AcquisitionEvent_ACQUISITION_EVENT_ACQUIRED, N: result.Acquired, Context: operator, Timestamp: time.Now().UnixNano(), }) }) } func (c *acquireController) acquireMulti(ctx context.Context, resources []*resource_mapv1.AcquireMultiEntry) error { select { case <-c._closing: return errClosing default: } type acquiringEntry struct { entry *resource_mapv1.AcquireMultiEntry acquired <-chan ctl.AcquisitionResult } identifiers := make([]string, 0, len(resources)) entries := make(map[string]acquiringEntry, len(resources)) // Lock for multiple locking. c._multiMu.Lock() for _, entry := range resources { v, _ := c._resources.LoadOrStore(entry.ResourceName, &resource{ once: oncewait.New(), }) r := v.(*resource).init(entry.MaxParallelism) // Start acquisition. acCh, acquiring := r.acquire(ctx, logs.CallerContext(entry.Context).String(), entry.Exclusive) // Due to trial of consecutive acquisition, not acquired. if acquiring { identifiers = append(identifiers, entry.ResourceName) entries[entry.ResourceName] = acquiringEntry{ entry: entry, acquired: acCh, } } } c._multiMu.Unlock() if len(identifiers) == 0 { return nil } // Append log "acquiring". ts := time.Now().UnixNano() err := c._kv.Put(identifiers, func(identifier string, r *logsv1.AcquisitionRecord, update bool) { e := entries[identifier] // Initial acquisition. if !update { r.Max = e.entry.MaxParallelism } r.Logs = append(r.Logs, &logsv1.AcquisitionLog{ Event: logsv1.AcquisitionEvent_ACQUISITION_EVENT_ACQUIRING, Context: e.entry.Context, Timestamp: ts, }) }) if err != nil { return err } eg, _ := errgroup.WithContext(ctx) for _, entry := range entries { e := entry eg.Go(func() error { var result ctl.AcquisitionResult select { case <-c._closing: return errClosing case result = <-e.acquired: if result.Err != nil { return err } } return c._kv.Put([]string{e.entry.ResourceName}, func(identifier string, r *logsv1.AcquisitionRecord, update bool) { r.Logs = append(r.Logs, &logsv1.AcquisitionLog{ Event: logsv1.AcquisitionEvent_ACQUISITION_EVENT_ACQUIRED, N: result.Acquired, Context: e.entry.Context, Timestamp: time.Now().UnixNano(), }) }) }) } return eg.Wait() } func (c *acquireController) release(resourceName string, operator logs.CallerContext) error { select { case <-c._closing: return errClosing default: } v, found := c._resources.Load(resourceName) if !found { // If the resource not found, return without error. return nil } op := operator.String() r := v.(*resource) if !r.ctl.Acquired(op) { // If not acquired, return without error. 
return nil } err := c._kv.Put([]string{resourceName}, func(_ string, r *logsv1.AcquisitionRecord, _ bool) { r.Logs = append(r.Logs, &logsv1.AcquisitionLog{ Event: logsv1.AcquisitionEvent_ACQUISITION_EVENT_RELEASED, N: 0, Context: operator, Timestamp: time.Now().UnixNano(), }) }) if err != nil { return err } r.ctl.Release(op) return nil } func (c *acquireController) releaseMulti(resources []*resource_mapv1.ReleaseMultiEntry) error { select { case <-c._closing: return errClosing default: } entries := make(map[string]*resource_mapv1.ReleaseMultiEntry, len(resources)) identifiers := make([]string, 0, len(resources)) for _, entry := range resources { v, found := c._resources.Load(entry.ResourceName) if !found { // If the resource not found, skip it. continue } r := v.(*resource) op := logs.CallerContext(entry.Context).String() if !r.ctl.Acquired(op) { // If not acquired, skip it. continue } entries[entry.ResourceName] = entry identifiers = append(identifiers, entry.ResourceName) // Release after log write. defer r.ctl.Release(op) } ts := time.Now().UnixNano() return c._kv.Put(identifiers, func(identifier string, r *logsv1.AcquisitionRecord, _ bool) { e := entries[identifier] r.Logs = append(r.Logs, &logsv1.AcquisitionLog{ Event: logsv1.AcquisitionEvent_ACQUISITION_EVENT_RELEASED, N: 0, Context: e.Context, Timestamp: ts, }) }) }
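// The following function is an illustrative sketch (not part of the package's API)
// of how replayed "acquiring" operators gate new acquisitions: an operator
// restored from the log holds its place in the queue, so a new operator waits
// until the replayed one is dequeued or the queue times out. The operator
// names, timeout, and parallelism limit are hypothetical.
func exampleReplayQueue(ctx context.Context) {
	b := rendezvous.NewBuilder()
	b.Add("opA") // Replayed from the log: opA was "acquiring" at shutdown.

	r := &resource{
		queue: b.Start(time.Second),
		ctl:   ctl.NewAcquisitionCtl(5, map[string]int64{}),
	}

	// opB waits until opA is dequeued; here opA never arrives, so the
	// acquisition proceeds only after the queue's one-second timeout.
	ch, acquiring := r.acquire(ctx, "opB", false)
	if acquiring {
		<-ch
	}
}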
package ctl

import (
	"context"
	"sync"
)

type (
	// AcquisitionCtl is a primitive for controlling acquisition status.
	AcquisitionCtl struct {
		_sem      *semaphore
		_max      int64
		_m        sync.Mutex
		_acquired map[string]int64
	}

	AcquisitionResult struct {
		Acquired int64
		Err      error
	}
)

// NewAcquisitionCtl creates a new AcquisitionCtl.
func NewAcquisitionCtl(max int64, acquired map[string]int64) *AcquisitionCtl {
	sem := newSemaphore(max)

	// Replay acquisitions.
	for _, n := range acquired {
		if n > 0 {
			<-sem.acquire(context.Background(), n, nil)
		}
	}

	return &AcquisitionCtl{
		_sem:      sem,
		_max:      max,
		_acquired: acquired,
	}
}

func (c *AcquisitionCtl) Acquired(operator string) bool {
	c._m.Lock()
	defer c._m.Unlock()
	_, ok := c._acquired[operator]
	return ok
}

// Acquire acquires an exclusive/shared lock.
func (c *AcquisitionCtl) Acquire(ctx context.Context, operator string, exclusive bool) (<-chan AcquisitionResult, bool) {
	c._m.Lock()
	defer c._m.Unlock()

	_, ok := c._acquired[operator]
	if ok {
		// If already acquired by this operator, return without acquisition.
		// MEMO: Do we need to await the ongoing acquisition?
		return nil, false
	}

	n := int64(1)
	if exclusive {
		n = c._max
	}

	// Record the acquired operator.
	c._acquired[operator] = n

	return c._sem.acquire(ctx, n, func(r AcquisitionResult) {
		if r.Err != nil {
			// On cancel.
			c._m.Lock()
			delete(c._acquired, operator)
			c._m.Unlock()
		}
	}), true
}

// Release releases the acquired lock.
func (c *AcquisitionCtl) Release(operator string) bool {
	c._m.Lock()
	n, ok := c._acquired[operator]
	if !ok {
		c._m.Unlock()
		// If not acquired, return without error.
		return false
	}
	delete(c._acquired, operator)
	c._m.Unlock()

	c._sem.release(n) // Perform the release outside of the Lock/Unlock scope.
	return true
}
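// The following function is an illustrative sketch (not part of the package's API)
// of the exclusive/shared semantics: a shared acquisition takes one slot, while
// an exclusive acquisition requests all slots and blocks until they are free.
// The operator names and the parallelism limit are hypothetical.
func exampleAcquisitionCtl(ctx context.Context) error {
	c := NewAcquisitionCtl(4, map[string]int64{})

	// "reader" takes a single (shared) slot.
	ch, acquiring := c.Acquire(ctx, "reader", false)
	if acquiring {
		if r := <-ch; r.Err != nil {
			return r.Err
		}
	}

	// "writer" requests all 4 slots (exclusive) and blocks until "reader" releases.
	wch, _ := c.Acquire(ctx, "writer", true)
	c.Release("reader")
	if r := <-wch; r.Err != nil {
		return r.Err
	}
	c.Release("writer")
	return nil
}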
package ctl

import (
	"context"
	"errors"
	"sync"
)

type (
	// InitCtl is a primitive for controlling init status.
	InitCtl struct {
		_lock chan struct{}

		_m         sync.RWMutex
		_completed bool
		_operator  string
	}

	TryInitResult struct {
		Try       bool
		Initiated bool
		Err       error
	}
)

// NewInitCtl creates a new InitCtl.
func NewInitCtl(completed bool) *InitCtl {
	return &InitCtl{
		_lock:      make(chan struct{}, 1), // Allocate the minimum buffer.
		_completed: completed,
	}
}

// TryInit tries to acquire the lock and start the init operation.
// When the operation is already completed, 'Try' will be false.
func (i *InitCtl) TryInit(ctx context.Context, operator string) <-chan TryInitResult {
	try := make(chan TryInitResult, 1)

	i._m.RLock()
	completed, op := i._completed, i._operator
	i._m.RUnlock()

	if completed {
		try <- TryInitResult{}
		return try
	}

	// If this operator has already acquired the lock for init but hasn't
	// recognized that fact yet (e.g., a retried request), give it a second
	// chance to try.
	if !completed && op == operator {
		try <- TryInitResult{
			Try:       true,
			Initiated: false,
		}
		return try
	}

	go func() {
		select {
		case <-ctx.Done():
			try <- TryInitResult{
				Try: false,
				Err: ctx.Err(),
			}
			return
		case i._lock <- struct{}{}:
			i._m.Lock()
			defer i._m.Unlock()

			if i._completed {
				try <- TryInitResult{
					Try: false,
				}
				<-i._lock // Release.
				return
			}

			// Set the current operator.
			i._operator = operator

			try <- TryInitResult{
				Try:       true,
				Initiated: true,
				Err:       nil,
			}
			// Keep holding the lock.
		}
	}()
	return try
}

// Complete marks the init operation as completed.
func (i *InitCtl) Complete(operator string) error {
	i._m.Lock()
	defer i._m.Unlock()
	if i._operator != operator {
		return errors.New("invalid operation")
	}

	i._completed = true
	<-i._lock // Release.
	return nil
}

// Fail marks the init operation as failed.
func (i *InitCtl) Fail(operator string) error {
	i._m.Lock()
	defer i._m.Unlock()
	if i._operator != operator {
		return errors.New("invalid operation")
	}

	<-i._lock // Release.
	return nil
}
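// The following function is an illustrative sketch (not part of the package's API)
// of the init handshake: the first operator gets Try == true, performs the
// initialization, and calls Complete; later operators get Try == false and
// skip it. The operator names are hypothetical.
func exampleInitCtl(ctx context.Context) error {
	c := NewInitCtl(false)

	r := <-c.TryInit(ctx, "op1")
	if r.Err != nil {
		return r.Err
	}
	if r.Try {
		// ... perform the actual initialization here, then:
		if err := c.Complete("op1"); err != nil {
			return err
		}
	}

	// Already completed: Try is false, and op2 proceeds without initializing.
	r = <-c.TryInit(ctx, "op2")
	return r.Err
}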
package ctl

import (
	"container/list"
	"context"
	"fmt"
	"sync"
)

type (
	// Channel-based semaphore.
	semaphore struct {
		_size    int64
		_cur     int64
		_mu      sync.Mutex
		_waiters *list.List
	}

	waiter struct {
		_n     int64
		_ready chan<- struct{}
		_done  <-chan struct{}
	}
)

func newSemaphore(max int64) *semaphore {
	return &semaphore{
		_size:    max,
		_waiters: list.New(),
	}
}

func (s *semaphore) acquire(ctx context.Context, n int64, hook func(r AcquisitionResult)) <-chan AcquisitionResult {
	s._mu.Lock()
	defer s._mu.Unlock()

	if s._size-s._cur >= n && s._waiters.Len() == 0 {
		s._cur += n
		r := AcquisitionResult{
			Acquired: n,
		}
		if hook != nil {
			hook(r)
		}
		ch := make(chan AcquisitionResult, 1)
		ch <- r
		return ch
	}

	if n > s._size {
		panic(fmt.Sprintf("semaphore: tried to acquire %d, more than the capacity %d", n, s._size))
	}

	var (
		ready  = make(chan struct{})
		result = make(chan AcquisitionResult, 1)
		done   = ctx.Done()
	)
	s._waiters.PushBack(waiter{
		_n:     n,
		_ready: ready,
		_done:  done,
	})

	begin := make(chan struct{})
	go func() {
		close(begin)

		var r AcquisitionResult
		select {
		case <-done:
			r.Err = ctx.Err()
			defer func() {
				s._mu.Lock()
				s._notifyWaiters()
				s._mu.Unlock()
			}()
		case <-ready:
			r.Acquired = n
		}
		if hook != nil {
			hook(r)
		}
		result <- r
	}()
	<-begin

	return result
}

func (s *semaphore) release(n int64) {
	s._mu.Lock()
	defer s._mu.Unlock()
	s._cur -= n
	if s._cur < 0 {
		panic("semaphore: released more than held")
	}
	s._notifyWaiters()
}

func (s *semaphore) _notifyWaiters() {
LOOP:
	for {
		next := s._waiters.Front()
		if next == nil {
			break // No more waiters blocked.
		}

		w := next.Value.(waiter)
		if s._size-s._cur < w._n {
			// Insufficient slots.
			select {
			case <-w._done:
				// Already canceled; remove and skip it.
				s._waiters.Remove(next)
				continue LOOP
			default:
				break LOOP // Wait for the next notification.
			}
		}

		select {
		case w._ready <- struct{}{}:
			s._cur += w._n
		default:
			// Already canceled.
		}
		s._waiters.Remove(next)
	}
}
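// The following function is an illustrative sketch (not part of the package's API)
// of weighted acquisition with cancellation: a blocked waiter whose context is
// canceled receives the cancellation as AcquisitionResult.Err without consuming
// a slot. The weights are hypothetical.
func exampleSemaphore() {
	s := newSemaphore(2)
	<-s.acquire(context.Background(), 2, nil) // Takes both slots immediately.

	ctx, cancel := context.WithCancel(context.Background())
	ch := s.acquire(ctx, 1, nil) // Blocks: no slot left.
	cancel()

	r := <-ch
	_ = r.Err // Non-nil (context.Canceled); no slot was consumed.

	s.release(2)
}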
package rendezvous import "sync" type cancelableCond[T any] struct { L sync.Locker _sig chan struct{} _sigM sync.RWMutex // locker for refreshing _sig _cancel <-chan T } func newCancelableCond[T any](l sync.Locker, cancel <-chan T) *cancelableCond[T] { return &cancelableCond[T]{ L: l, _sig: make(chan struct{}), _cancel: cancel, } } func (c *cancelableCond[T]) wait() bool { c._sigM.RLock() s := c._sig c._sigM.RUnlock() c.L.Unlock() defer c.L.Lock() select { case <-s: return true case <-c._cancel: return false } } func (c *cancelableCond[T]) broadcast() { c._sigM.Lock() close(c._sig) c._sig = make(chan struct{}) c._sigM.Unlock() }
package rendezvous

import (
	"container/list"
	"context"
	"sync"
	"time"
)

type (
	Builder struct {
		_l *list.List
		_m map[string]*list.Element
	}

	LimitedTermQueue interface {
		Dequeue(key string, fn func(bool))
	}
)

func NewBuilder() *Builder {
	return &Builder{
		_l: list.New(),
		_m: map[string]*list.Element{},
	}
}

func (b *Builder) Add(s string) {
	if _, ok := b._m[s]; !ok {
		b._m[s] = b._l.PushBack(s)
	}
}

func (b *Builder) Remove(s string) {
	if e, ok := b._m[s]; ok {
		b._l.Remove(e)
	}
}

type limitedTermQueue struct {
	_l    *list.List
	_m    map[string]*list.Element
	_cond *cancelableCond[struct{}]
	_done <-chan struct{}
}

type emptyQueue struct{}

func (b *Builder) Start(timeout time.Duration) LimitedTermQueue {
	if b._l.Len() == 0 {
		return &emptyQueue{}
	}

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	done := ctx.Done()
	q := &limitedTermQueue{
		_l:    b._l,
		_m:    b._m,
		_done: done,
		_cond: newCancelableCond(&sync.Mutex{}, done),
	}
	b._l = list.New()
	b._m = map[string]*list.Element{}

	go func() {
		defer cancel()

		q._cond.L.Lock()
		defer q._cond.L.Unlock()
		for {
			if q._l.Len() == 0 {
				cancel() // The next wait() observes the cancellation and returns false.
			}
			if cont := q._cond.wait(); !cont {
				return
			}
		}
	}()
	return q
}

func (q *limitedTermQueue) Dequeue(s string, fn func(dequeue bool)) {
	select {
	case <-q._done:
		fn(false)
		return
	default:
	}

	q._cond.L.Lock()
	for {
		if e := q._l.Front(); e != nil && e.Value == s {
			q._l.Remove(e)
			fn(true)
			q._cond.broadcast()
			break
		}
		if cont := q._cond.wait(); !cont {
			fn(false)
			break
		}
	}
	q._cond.L.Unlock()
}

func (emptyQueue) Dequeue(_ string, fn func(bool)) {
	fn(false)
}
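// The following function is an illustrative sketch (not part of the package's API)
// of the dequeue ordering: keys leave the queue in the order they were added to
// the Builder, so "second" is dequeued only after "first" (or with false after
// the timeout). The keys and timeout are hypothetical.
func exampleQueue() {
	b := NewBuilder()
	b.Add("first")
	b.Add("second")

	q := b.Start(time.Second)

	done := make(chan struct{})
	go q.Dequeue("second", func(ok bool) {
		// Runs after "first" is dequeued (ok == true), or on timeout (ok == false).
		close(done)
	})
	q.Dequeue("first", func(ok bool) {})
	<-done
}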
package testutil

import (
	"bytes"
	"sync"
)

type SafeBuffer struct {
	*bytes.Buffer
	m sync.Mutex
}

func NewSafeBuffer() *SafeBuffer {
	return &SafeBuffer{
		Buffer: bytes.NewBuffer(nil),
	}
}

func (s *SafeBuffer) Write(p []byte) (int, error) {
	s.m.Lock()
	defer s.m.Unlock()
	return s.Buffer.Write(p)
}
package logs

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"math/rand"
	"strings"

	logsv1 "github.com/daichitakahashi/rsmap/internal/proto/logs/v1"
)

type CallerContext []*logsv1.Caller

func (c CallerContext) Append(file string, line int) CallerContext {
	return append(c, &logsv1.Caller{
		File: file,
		Line: int64(line),
		Hash: newHash(),
	})
}

func (c CallerContext) String() string {
	var b strings.Builder
	for _, caller := range c {
		if b.Len() > 0 {
			b.WriteString("->")
		}
		fmt.Fprintf(&b, "%s:%d", caller.File, caller.Line)
		b.WriteRune('(')
		b.WriteString(caller.Hash)
		b.WriteRune(')')
	}
	return b.String()
}

func (c CallerContext) ShortString() string {
	var b strings.Builder
	for _, caller := range c {
		if b.Len() > 0 {
			b.WriteString("->")
		}
		b.WriteString(caller.Hash)
	}
	return b.String()
}

func newHash() string {
	return hex.EncodeToString(
		binary.BigEndian.AppendUint32([]byte{}, rand.Uint32()),
	)
}
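// The following function is an illustrative sketch (not part of the package's API)
// of how a CallerContext accumulates call sites: each Append adds a file:line
// pair with a random hash, and String joins them with "->". The file names,
// line numbers, and hashes below are hypothetical.
func exampleCallerContext() string {
	var c CallerContext
	c = c.Append("map_test.go", 42)
	c = c.Append("resource_test.go", 7)

	// e.g. "map_test.go:42(1a2b3c4d)->resource_test.go:7(5e6f7a8b)"
	return c.String()
}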
package logs

import (
	"errors"

	"go.etcd.io/bbolt"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protoreflect"

	logsv1 "github.com/daichitakahashi/rsmap/internal/proto/logs/v1"
)

var (
	bucketInfo    = []byte("info")
	bucketInit    = []byte("init")
	bucketAcquire = []byte("acquire")

	infoServerKey = []byte("server")
)

type InfoStore struct {
	_db     *bbolt.DB
	_server *logsv1.ServerRecord
}

func NewInfoStore(db *bbolt.DB) (*InfoStore, error) {
	var server logsv1.ServerRecord
	err := db.Update(func(tx *bbolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists(bucketInfo)
		if err != nil {
			return err
		}
		data := b.Get(infoServerKey)
		if data != nil {
			err = proto.Unmarshal(data, &server)
			if err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		return nil, err
	}

	return &InfoStore{
		_db:     db,
		_server: &server,
	}, nil
}

func (s *InfoStore) ServerRecord() *logsv1.ServerRecord {
	return s._server
}

func (s *InfoStore) PutServerLog(l *logsv1.ServerLog) error {
	return s._db.Update(func(tx *bbolt.Tx) error {
		s._server.Logs = append(s._server.Logs, l)
		data, err := proto.Marshal(s._server)
		if err != nil {
			return err
		}
		return tx.Bucket(bucketInfo).Put(infoServerKey, data)
	})
}

type (
	ResourceRecordStore[T any] interface {
		Get(identifier string) (*T, error)
		Put(identifiers []string, update func(identifier string, r *T, update bool)) error
		ForEach(fn func(identifier string, record *T) error) error
	}

	ptr[L logsv1.InitRecord | logsv1.AcquisitionRecord] interface {
		*L
		protoreflect.ProtoMessage
	}

	recordStore[T logsv1.InitRecord | logsv1.AcquisitionRecord, P ptr[T]] struct {
		_bucketName []byte
		_db         *bbolt.DB
	}
)

var ErrRecordNotFound = errors.New("record not found on key value store")

func NewResourceRecordStore[T logsv1.InitRecord | logsv1.AcquisitionRecord, P ptr[T]](db *bbolt.DB) (ResourceRecordStore[T], error) {
	var (
		p          P                         = new(T)
		v          protoreflect.ProtoMessage = p
		bucketName []byte
	)
	switch v.(type) {
	case *logsv1.InitRecord:
		bucketName = bucketInit
	case *logsv1.AcquisitionRecord:
		bucketName = bucketAcquire
	}

	err := db.Update(func(tx *bbolt.Tx) error {
		// Create the bucket for records.
		_, err := tx.CreateBucketIfNotExists(bucketName)
		return err
	})
	if err != nil {
		return nil, err
	}

	return &recordStore[T, P]{
		_bucketName: bucketName,
		_db:         db,
	}, nil
}

func (s *recordStore[T, P]) Get(identifier string) (*T, error) {
	var r P = new(T)
	err := s._db.View(func(tx *bbolt.Tx) error {
		data := tx.Bucket(s._bucketName).Get([]byte(identifier))
		if data == nil {
			return ErrRecordNotFound
		}
		return proto.Unmarshal(data, r)
	})
	if err != nil {
		return nil, err
	}
	return r, nil
}

func (s *recordStore[T, P]) Put(identifiers []string, fn func(identifier string, r *T, update bool)) error {
	return s._db.Update(func(tx *bbolt.Tx) error {
		for _, identifier := range identifiers {
			var (
				b   = tx.Bucket(s._bucketName)
				key = []byte(identifier)
			)

			var (
				r      P = new(T)
				update bool
			)
			data := b.Get(key)
			if data != nil {
				update = true
				if err := proto.Unmarshal(data, r); err != nil {
					return err
				}
			}
			fn(identifier, r, update)

			newData, err := proto.Marshal(r)
			if err != nil {
				return err
			}
			err = b.Put(key, newData)
			if err != nil {
				return err
			}
		}
		return nil
	})
}

func (s *recordStore[T, P]) ForEach(fn func(identifier string, record *T) error) error {
	return s._db.View(func(tx *bbolt.Tx) error {
		return tx.Bucket(s._bucketName).ForEach(func(k, v []byte) error {
			var r P = new(T)
			err := proto.Unmarshal(v, r)
			if err != nil {
				return err
			}
			return fn(string(k), r)
		})
	})
}
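// The following function is an illustrative sketch (not part of the package's API)
// of the record store: create a store for init records on an opened database,
// append a log entry for one resource, and read the record back. The resource
// name is hypothetical.
func exampleRecordStore(db *bbolt.DB) error {
	store, err := NewResourceRecordStore[logsv1.InitRecord](db)
	if err != nil {
		return err
	}

	// Put loads (or creates) the record and applies the update callback.
	err = store.Put([]string{"db"}, func(_ string, r *logsv1.InitRecord, _ bool) {
		r.Logs = append(r.Logs, &logsv1.InitLog{
			Event: logsv1.InitEvent_INIT_EVENT_STARTED,
		})
	})
	if err != nil {
		return err
	}

	r, err := store.Get("db")
	if err != nil {
		return err
	}
	_ = r.Logs // Contains the single "started" entry written above.
	return nil
}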
package rsmap

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"os"
	"path/filepath"
	"runtime"
	"strconv"
	"sync"
	"time"

	"github.com/lestrrat-go/backoff/v2"
	"github.com/lestrrat-go/option"
	"go.etcd.io/bbolt"

	logsv1 "github.com/daichitakahashi/rsmap/internal/proto/logs/v1"
	resource_mapv1 "github.com/daichitakahashi/rsmap/internal/proto/resource_map/v1"
	"github.com/daichitakahashi/rsmap/logs"
)

type (
	// Map is the controller for external resource usage.
	Map struct {
		_callers logs.CallerContext
		_cfg     config
		_mu      sync.RWMutex
		_rm      resourceMap
		_stop    func()
	}

	// Resource provides the ability to acquire/release a lock for the dedicated resource.
	Resource struct {
		_callers logs.CallerContext
		_m       *Map
		_max     int64
		_name    string
	}
)

type (
	NewOption struct {
		option.Interface
	}
	identOptionRetryPolicy struct{}
	identOptionHTTPClient  struct{}
)

// WithRetryPolicy specifies the retry policy for each operation (resource initialization, lock acquisition):
// for example, interval, exponential backoff, and max retries.
// For detailed settings, see [backoff.NewExponentialPolicy] or [backoff.NewConstantPolicy].
func WithRetryPolicy(p backoff.Policy) *NewOption {
	return &NewOption{
		Interface: option.New(identOptionRetryPolicy{}, p),
	}
}

// WithHTTPClient specifies the [http.Client] used for communication with the server process.
// If your process launches the server, this client may not be used.
func WithHTTPClient(c *http.Client) *NewOption {
	return &NewOption{
		Interface: option.New(identOptionHTTPClient{}, c),
	}
}

const (
	EnvExecutionID = "RSMAP_EXECUTION_ID"
)

func logsDir(base string) (string, error) {
	if !filepath.IsAbs(base) {
		wd, err := os.Getwd()
		if err != nil {
			return "", err
		}
		base = filepath.Join(wd, base)
	}

	// Get the execution ID.
	executionID := strconv.Itoa(os.Getppid()) // The process of `go test`.
	if id, ok := os.LookupEnv(EnvExecutionID); ok {
		executionID = id
	}

	dir := filepath.Join(base, executionID)

	// Check that logs.db and addr do not exist as directories.
	logsFilename := filepath.Join(dir, "logs.db")
	info, err := os.Stat(logsFilename)
	if err == nil && info.IsDir() {
		return "", fmt.Errorf("logs.db already exists as a directory: %s", logsFilename)
	}
	addrFilename := filepath.Join(dir, "addr")
	info, err = os.Stat(addrFilename)
	if err == nil && info.IsDir() {
		return "", fmt.Errorf("addr already exists as a directory: %s", addrFilename)
	}

	return dir, nil
}

// New creates an instance of [Map] that enables us to reuse external resources with thread safety.
// The most common use case is Go's parallelized testing of multiple packages (`go test -p=N ./...`).
//
// Map has a server mode and a client mode.
// If a Map is initialized in server mode, it creates the database `logs.db` and writes the server address to `addr`, under `${rsmapDir}/${executionID}/`.
// Other Maps read the address of the server, and request the server to acquire locks.
// So, every Go package (directory) must specify the same location as an argument; otherwise, correct control cannot be provided.
// This is the user's responsibility.
//
// `executionID` is the identifier of the execution of `go test`. By default, the value of [os.Getppid] is used.
// If you want to specify the id explicitly, set the value to the `RSMAP_EXECUTION_ID` environment variable.
//
// In most cases, the following code is helpful.
//
//	p, _ := exec.Command("go", "env", "GOMOD").Output() // Get the file path of "go.mod".
//	m, _ := rsmap.New(filepath.Join(filepath.Dir(strings.TrimSpace(string(p))), ".rsmap"))
func New(rsmapDir string, opts ...*NewOption) (*Map, error) {
	_, file, line, _ := runtime.Caller(1)
	var callers logs.CallerContext
	callers = callers.Append(file, line)

	// Get the directory for `logs.db` and `addr`.
	dir, err := logsDir(rsmapDir)
	if err != nil {
		return nil, err
	}

	// Create the directory if it does not exist.
	if err = os.MkdirAll(dir, 0755); err != nil {
		return nil, fmt.Errorf("rsmap: failed to prepare directory (rsmapDir): %w", err)
	}

	cfg := config{
		dbFile:   filepath.Join(dir, "logs.db"),
		addrFile: filepath.Join(dir, "addr"),
		retryPolicy: backoff.NewConstantPolicy( // FIXME: Reconsider the default policy.
			backoff.WithMaxRetries(200),
			backoff.WithInterval(time.Millisecond*200),
		),
		httpCli: &http.Client{},
	}

	// Apply options.
	for _, opt := range opts {
		switch opt.Ident() {
		case identOptionRetryPolicy{}:
			cfg.retryPolicy = opt.Value().(backoff.Policy)
		case identOptionHTTPClient{}:
			cfg.httpCli = opt.Value().(*http.Client)
		}
	}

	m := &Map{
		_callers: callers,
		_cfg:     cfg,
		_rm:      newClientSideMap(cfg),
	}

	// Start the server launch process, and set the release function.
	m._stop = m.launchServer(dir, m._callers)

	return m, nil
}

func (m *Map) Close() {
	m._stop()
}

func (m *Map) resourceMap() resourceMap {
	m._mu.RLock()
	defer m._mu.RUnlock()
	return m._rm
}

type (
	// ResourceOption represents an option for [Resource].
	ResourceOption struct {
		option.Interface
	}
	identOptionParallelism struct{}
	identOptionInit        struct{}
)

// WithMaxParallelism specifies the max parallelism of the resource usage.
func WithMaxParallelism(n int64) *ResourceOption {
	return &ResourceOption{
		Interface: option.New(identOptionParallelism{}, n),
	}
}

type InitFunc func(ctx context.Context) error

// WithInit specifies the InitFunc for resource initialization.
//
// InitFunc will be called only once globally, at the first declaration by [Resource].
// Other processes wait until the completion of this initialization.
// So, if Resource is called without this option, initializations cannot be performed with concurrency safety.
func WithInit(init InitFunc) *ResourceOption {
	return &ResourceOption{
		Interface: option.New(identOptionInit{}, init),
	}
}

// Resource creates a [Resource] object that provides control of resource usage.
//
// Resource has a setting for max parallelism; you can specify the value with [WithMaxParallelism] (the default value is 5).
// And if you want to perform an initialization of the resource, use [WithInit].
func (m *Map) Resource(ctx context.Context, name string, opts ...*ResourceOption) (*Resource, error) {
	_, file, line, _ := runtime.Caller(1)
	callers := m._callers.Append(file, line)

	var (
		n               = int64(5)
		init   InitFunc = func(ctx context.Context) error {
			// Do nothing.
			return nil
		}
	)
	// Apply options.
	for _, opt := range opts {
		switch opt.Ident() {
		case identOptionParallelism{}:
			n = opt.Value().(int64)
		case identOptionInit{}:
			init = opt.Value().(InitFunc)
		}
	}

	m._mu.RLock()
	rm := m._rm
	m._mu.RUnlock()

	try, err := rm.tryInit(ctx, name, callers)
	if err != nil {
		return nil, err
	}

	if try {
		// Initialization of the resource.
		err = func() (err error) {
			var notPanicked bool
			defer func() {
				m._mu.RLock()
				rm := m._rm
				m._mu.RUnlock()

				// If init succeeds, mark it as complete.
				if notPanicked && err == nil {
					err = rm.completeInit(ctx, name, callers)
					return
				}
				// Mark it as failed when an error or panic occurred.
				// CAUTION: Do not recover the panic, to preserve the stacktrace.
				err = errors.Join(
					err,
					rm.failInit(ctx, name, callers),
				)
			}()
			err = init(ctx)
			notPanicked = true
			return
		}()
		if err != nil {
			return nil, err
		}
	}

	return &Resource{
		_callers: callers,
		_m:       m,
		_max:     n,
		_name:    name,
	}, nil
}

// RLock acquires a shared lock on the Resource.
// An instance of Resource can hold only one lock.
// Consecutive acquisition without unlock doesn't fail, but does nothing.
//
// To release the lock, use [Resource.UnlockAny].
func (r *Resource) RLock(ctx context.Context) error {
	return r._m.resourceMap().acquire(ctx, r._name, r._callers, r._max, false)
}

// Lock acquires an exclusive lock on the Resource.
// An instance of Resource can hold only one lock.
// Consecutive acquisition without unlock doesn't fail, but does nothing.
//
// To release the lock, use [Resource.UnlockAny].
func (r *Resource) Lock(ctx context.Context) error {
	return r._m.resourceMap().acquire(ctx, r._name, r._callers, r._max, true)
}

// UnlockAny releases the shared/exclusive lock acquired by the Resource.
func (r *Resource) UnlockAny() error {
	return r._m.resourceMap().release(context.Background(), r._name, r._callers)
}

type ResourceLocker struct {
	_r         *Resource
	_exclusive bool
}

// Exclusive returns a ResourceLocker that acquires an exclusive lock of the Resource.
func (r *Resource) Exclusive() *ResourceLocker {
	return &ResourceLocker{
		_r:         r,
		_exclusive: true,
	}
}

// Shared returns a ResourceLocker that acquires a shared lock of the Resource.
func (r *Resource) Shared() *ResourceLocker {
	return &ResourceLocker{
		_r:         r,
		_exclusive: false,
	}
}

// LockResources acquires exclusive/shared locks for multiple resources.
// The returned function releases all acquired locks.
func LockResources(ctx context.Context, resources ...*ResourceLocker) (func() error, error) {
	var m resourceMap
	acquireEntries := make([]*resource_mapv1.AcquireMultiEntry, 0, len(resources))
	releaseEntries := make([]*resource_mapv1.ReleaseMultiEntry, 0, len(resources))
	for _, r := range resources {
		mm := r._r._m.resourceMap()
		if m == nil {
			m = mm
		} else if m != mm {
			return nil, errors.New("rsmap: all ResourceLockers must be derived from the same Map")
		}
		acquireEntries = append(acquireEntries, &resource_mapv1.AcquireMultiEntry{
			ResourceName:   r._r._name,
			Context:        r._r._callers,
			MaxParallelism: r._r._max,
			Exclusive:      r._exclusive,
		})
		releaseEntries = append(releaseEntries, &resource_mapv1.ReleaseMultiEntry{
			ResourceName: r._r._name,
			Context:      r._r._callers,
		})
	}

	err := m.acquireMulti(ctx, acquireEntries)
	if err != nil {
		return nil, err
	}

	return func() error {
		return m.releaseMulti(context.Background(), releaseEntries)
	}, nil
}

// Core interface for control operations, for both the server and client side.
type resourceMap interface {
	tryInit(ctx context.Context, resourceName string, operator logs.CallerContext) (bool, error)
	completeInit(ctx context.Context, resourceName string, operator logs.CallerContext) error
	failInit(ctx context.Context, resourceName string, operator logs.CallerContext) error
	acquire(ctx context.Context, resourceName string, operator logs.CallerContext, max int64, exclusive bool) error
	acquireMulti(ctx context.Context, resources []*resource_mapv1.AcquireMultiEntry) error
	release(ctx context.Context, resourceName string, operator logs.CallerContext) error
	releaseMulti(ctx context.Context, resources []*resource_mapv1.ReleaseMultiEntry) error
}

type serverSideMap struct {
	_init    *initController
	_acquire *acquireController
}

// newServerSideMap creates a resourceMap for the server side.
// This map reads and updates bbolt.DB directly.
func newServerSideMap(db *bbolt.DB, closing <-chan struct{}) (*serverSideMap, error) {
	initRecordStore, err := logs.NewResourceRecordStore[logsv1.InitRecord](db)
	if err != nil {
		return nil, err
	}
	acquireRecordStore, err := logs.NewResourceRecordStore[logsv1.AcquisitionRecord](db)
	if err != nil {
		return nil, err
	}

	init, err := loadInitController(initRecordStore, closing)
	if err != nil {
		return nil, err
	}
	acquire, err := loadAcquireController(acquireRecordStore, time.Second*2, closing)
	if err != nil {
		return nil, err
	}

	return &serverSideMap{
		_init:    init,
		_acquire: acquire,
	}, nil
}

func (m *serverSideMap) tryInit(ctx context.Context, resourceName string, operator logs.CallerContext) (bool, error) {
	return m._init.tryInit(ctx, resourceName, operator)
}

func (m *serverSideMap) completeInit(_ context.Context, resourceName string, operator logs.CallerContext) error {
	return m._init.complete(resourceName, operator)
}

func (m *serverSideMap) failInit(_ context.Context, resourceName string, operator logs.CallerContext) error {
	return m._init.fail(resourceName, operator)
}

func (m *serverSideMap) acquire(ctx context.Context, resourceName string, operator logs.CallerContext, max int64, exclusive bool) error {
	return m._acquire.acquire(ctx, resourceName, operator, max, exclusive)
}

func (m *serverSideMap) acquireMulti(ctx context.Context, resources []*resource_mapv1.AcquireMultiEntry) error {
	return m._acquire.acquireMulti(ctx, resources)
}

func (m *serverSideMap) release(_ context.Context, resourceName string, operator logs.CallerContext) error {
	return m._acquire.release(resourceName, operator)
}

func (m *serverSideMap) releaseMulti(_ context.Context, resources []*resource_mapv1.ReleaseMultiEntry) error {
	return m._acquire.releaseMulti(resources)
}

var _ resourceMap = (*serverSideMap)(nil)
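// The following function is an illustrative sketch (not part of the package's API)
// of typical use of the public API in a test: create a Map, declare a resource
// with a one-time initialization, and take an exclusive lock. The directory,
// resource name, and parallelism limit are hypothetical.
func exampleUsage(ctx context.Context, dir string) error {
	m, err := New(dir) // e.g. the repository-root ".rsmap" directory.
	if err != nil {
		return err
	}
	defer m.Close()

	db, err := m.Resource(ctx, "db",
		WithMaxParallelism(2),
		WithInit(func(ctx context.Context) error {
			// Runs only once across all `go test` processes;
			// e.g. create databases or start containers here.
			return nil
		}),
	)
	if err != nil {
		return err
	}

	if err := db.Lock(ctx); err != nil { // Exclusive lock.
		return err
	}
	defer func() {
		_ = db.UnlockAny()
	}()

	// ... use the resource exclusively; for multiple resources in one call,
	// see LockResources with db.Exclusive() / db.Shared().
	return nil
}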