package confort
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/url"
"os"
"strings"
"sync"
"time"
"github.com/daichitakahashi/confort/internal/beacon"
"github.com/daichitakahashi/confort/wait"
"github.com/docker/cli/cli/command/image/build"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/stdcopy"
"github.com/docker/go-connections/nat"
"github.com/lestrrat-go/backoff/v2"
"go.uber.org/multierr"
)
type (
	// Backend abstracts the container runtime used by Confort.
	// The only implementation in this file is dockerBackend.
	Backend interface {
		// Namespace creates or joins the namespace identified by namespace
		// and returns a handle for creating containers within it.
		Namespace(ctx context.Context, namespace string) (Namespace, error)
		// BuildImage builds an image from buildContext with buildOptions.
		// Unless force is true, building may be skipped when an equally
		// tagged image already exists. Build output is written to buildOut.
		BuildImage(ctx context.Context, buildContext io.Reader, buildOptions types.ImageBuildOptions, force bool, buildOut io.Writer) error
	}

	// Namespace represents a scope of containers sharing one Docker network.
	Namespace interface {
		// Namespace returns the prefix applied to container names.
		Namespace() string
		// Network returns the Docker network bound to this namespace.
		Network() *types.NetworkResource
		// CreateContainer creates (or adopts) the named container and
		// returns its container ID.
		CreateContainer(ctx context.Context, name string, container *container.Config, host *container.HostConfig,
			network *network.NetworkingConfig, configConsistency bool,
			wait *wait.Waiter, pullOptions *types.ImagePullOptions, pullOut io.Writer) (string, error)
		// StartContainer starts the named container and returns its bound ports.
		StartContainer(ctx context.Context, name string) (Ports, error)
		// Release runs the cleanup registered for this namespace's resources.
		Release(ctx context.Context) error
	}
)
// Ports maps container ports to their host-side bindings.
type Ports nat.PortMap
// Binding returns the first value associated with the given container port.
// If there are no values associated with the port, Binding returns zero value.
// To access multiple values, use the nat.PortMap directly.
func (p Ports) Binding(port nat.Port) (b nat.PortBinding) {
	if candidates := p[port]; len(candidates) > 0 {
		b = candidates[0]
	}
	return b
}
// HostPort returns "host:port" style string of the first value associated with the given container port.
// If there are no values associated with the port, HostPort returns empty string.
func (p Ports) HostPort(port nat.Port) string {
	candidates := p[port]
	if len(candidates) == 0 {
		return ""
	}
	first := candidates[0]
	return first.HostIP + ":" + first.HostPort
}
// URL returns "scheme://host:port" style string of the first value associated with the given container port.
// If there are no values associated with the port, URL returns empty string.
// And if scheme is empty, use "http" as a default scheme.
func (p Ports) URL(port nat.Port, scheme string) string {
	hostPort := p.HostPort(port)
	if hostPort == "" {
		return ""
	}
	if scheme == "" {
		scheme = "http" // default scheme
	}
	return scheme + "://" + hostPort
}
// ResourcePolicy determines how pre-existing Docker resources (networks and
// containers) are handled when Confort encounters them.
type ResourcePolicy string

const (
	// ResourcePolicyError reports an error when the resource already exists.
	ResourcePolicyError ResourcePolicy = beacon.ResourcePolicyError
	// ResourcePolicyReuse reuses an existing resource; resources created by
	// Confort itself are removed on release.
	ResourcePolicyReuse ResourcePolicy = beacon.ResourcePolicyReuse
	// ResourcePolicyReusable reuses resources and leaves them on release.
	ResourcePolicyReusable ResourcePolicy = beacon.ResourcePolicyReusable
	// ResourcePolicyTakeOver adopts existing resources and removes them on release.
	ResourcePolicyTakeOver ResourcePolicy = beacon.ResourcePolicyTakeOver
)
// dockerBackend is the Docker implementation of Backend.
type dockerBackend struct {
	cli    *client.Client    // inject
	policy ResourcePolicy    // how pre-existing networks/containers are treated
	labels map[string]string // attached to every resource created here
}
// Namespace creates the Docker network named namespace unless it already
// exists, resolves the host address for published ports, and returns a
// Namespace whose container names are prefixed with "<namespace>-".
// With ResourcePolicyError, an existing network of the same name is an error.
func (d *dockerBackend) Namespace(ctx context.Context, namespace string) (Namespace, error) {
	networkName := namespace
	namespace += "-" // prefix for container names

	var nw *types.NetworkResource
	// create network if not exists
	list, err := d.cli.NetworkList(ctx, types.NetworkListOptions{})
	if err != nil {
		return nil, err
	}
	for _, n := range list {
		if n.Name == networkName {
			if d.policy == ResourcePolicyError {
				return nil, fmt.Errorf("dockerBackend: network %q already exists", networkName)
			}
			nw = &n
			break
		}
	}
	var nwCreated bool
	if nw == nil {
		resp, err := d.cli.NetworkCreate(ctx, networkName, types.NetworkCreate{
			Driver:         "bridge",
			CheckDuplicate: true,
			Labels:         d.labels,
		})
		if err != nil {
			return nil, err
		}
		n, err := d.cli.NetworkInspect(ctx, resp.ID, types.NetworkInspectOptions{
			Verbose: true,
		})
		if err != nil {
			return nil, err
		}
		nwCreated = true
		nw = &n
	}

	// resolve host ip
	hostIP, err := resolveHostIP(d.cli.DaemonHost(), nw.IPAM)
	if err != nil {
		return nil, fmt.Errorf("failed to resolve docker host ip: %w", err)
	}

	// Remove the network on Release only when it was created here (and is not
	// marked reusable), or when the policy takes over existing resources.
	var term []func(context.Context) error
	if (nwCreated && d.policy != ResourcePolicyReusable) || d.policy == ResourcePolicyTakeOver {
		term = append(term, func(ctx context.Context) error {
			return d.cli.NetworkRemove(ctx, nw.ID)
		})
	}

	return &dockerNamespace{
		dockerBackend: d,
		namespace:     namespace,
		network:       nw,
		hostIP:        hostIP,
		labels:        d.labels,
		terminate:     term,
		containers:    map[string]*containerInfo{},
	}, nil
}
// resolveHostIP determines the address used to reach ports published by the
// daemon, based on the daemon host URL scheme. For unix/npipe daemons the
// result is "localhost", except when running inside a container (detected via
// /.dockerenv), where "host.docker.internal" is preferred and the bridge
// network gateway is the fallback.
// see: https://github.com/testcontainers/testcontainers-go/blob/34481cf9027b79aaad4f6aa2dbdb7091dd9c49fb/docker.go#L1245
func resolveHostIP(daemonHost string, ipamConfig network.IPAM) (string, error) {
	hostURL, err := url.Parse(daemonHost)
	if err != nil {
		return "", err
	}
	switch hostURL.Scheme {
	case "http", "https", "tcp":
		return hostURL.Hostname(), nil
	case "unix", "npipe":
		if _, err := os.Stat("/.dockerenv"); err == nil { // inside a container
			// Use "host.docker.internal" if enabled.
			addr, err := net.ResolveIPAddr("", "host.docker.internal")
			if err == nil {
				return addr.String(), nil
			}
			// Use Gateway IP of bridge network.
			// This doesn't work in Docker Desktop for Mac.
			for _, cfg := range ipamConfig.Config {
				if cfg.Gateway != "" {
					return cfg.Gateway, nil
				}
			}
		}
		return "localhost", nil
	default:
		return "", fmt.Errorf("unknown scheme found in daemon host: %s", hostURL.String())
	}
}
// BuildImage builds an image from buildContext with buildOptions.
// Unless force is true, the build is skipped when an image tagged with
// buildOptions.Tags[0] already exists. The JSON message stream produced by
// the daemon is rendered to buildOut.
func (d *dockerBackend) BuildImage(ctx context.Context, buildContext io.Reader, buildOptions types.ImageBuildOptions, force bool, buildOut io.Writer) (err error) {
	// FIX: guard against a panic on Tags[0] when no tag is supplied.
	if len(buildOptions.Tags) == 0 {
		return errors.New("dockerBackend: image tag not specified")
	}
	image := buildOptions.Tags[0]
	if !force {
		// check if the same image already exists
		summaries, err := d.cli.ImageList(ctx, types.ImageListOptions{
			All: true,
		})
		if err != nil {
			return err
		}
		for _, s := range summaries {
			for _, t := range s.RepoTags {
				if t == image {
					return nil // already built, skip
				}
			}
		}
	}

	resp, err := d.cli.ImageBuild(ctx, buildContext, buildOptions)
	if err != nil {
		return err
	}
	defer func() {
		// Combine the Close error with any earlier error.
		err = multierr.Append(err, resp.Body.Close())
	}()
	return handleJSONMessageStream(buildOut, resp.Body)
}
// createArchive packs ctxDir into an uncompressed tar archive suitable as a
// Docker build context, honoring .dockerignore. It returns the archive stream,
// the Dockerfile path relative to the context directory, and an error.
// The caller is responsible for consuming/closing the returned stream.
func createArchive(ctxDir, dockerfilePath string) (io.ReadCloser, string, error) {
	absContextDir, relDockerfile, err := build.GetContextFromLocalDir(ctxDir, dockerfilePath)
	if err != nil {
		return nil, "", err
	}

	excludes, err := build.ReadDockerignore(absContextDir)
	if err != nil {
		return nil, "", err
	}

	// We have to include docker-ignored Dockerfile and .dockerignore for build.
	// When `ADD` or `COPY` executes, daemon excludes these docker-ignored files.
	excludes = build.TrimBuildFilesFromExcludes(excludes, relDockerfile, false)
	err = build.ValidateContextDirectory(absContextDir, excludes)
	if err != nil {
		return nil, "", err
	}

	tarball, err := archive.TarWithOptions(absContextDir, &archive.TarOptions{
		ExcludePatterns: excludes,
		Compression:     archive.Uncompressed,
		NoLchown:        true,
	})
	if err != nil {
		return nil, "", err
	}
	return tarball, relDockerfile, nil
}
var _ Backend = (*dockerBackend)(nil)
// dockerNamespace implements Namespace on top of dockerBackend. It tracks the
// containers created or adopted within one Docker network, plus the cleanup
// functions to execute on Release.
type dockerNamespace struct {
	*dockerBackend
	namespace string                 // container-name prefix ("<name>-")
	network   *types.NetworkResource // network shared by containers of this namespace
	hostIP    string                 // host address substituted into port bindings

	labels map[string]string

	m          sync.RWMutex // guards terminate and containers
	terminate  []func(ctx context.Context) error
	containers map[string]*containerInfo
}
// containerInfo is the per-container bookkeeping held by dockerNamespace.
type containerInfo struct {
	containerID string
	container   *container.Config
	host        *container.HostConfig
	network     *network.NetworkingConfig
	ports       Ports        // populated by StartContainer
	wait        *wait.Waiter // optional readiness check, run on start
	running     bool         // true once StartContainer has succeeded
}
// Namespace returns the container-name prefix of this namespace.
func (d *dockerNamespace) Namespace() string {
	return d.namespace
}

// Network returns the Docker network bound to this namespace.
func (d *dockerNamespace) Network() *types.NetworkResource {
	return d.network
}
// CreateContainer creates the named container, or adopts an existing one with
// the same name and image, and returns its ID. Adopted containers are
// connected to the namespace's network when necessary. When configConsistency
// is true, the requested configuration is checked against the known or
// inspected configuration of the existing container. pullOptions, if non-nil,
// triggers an image pull before a fresh creation.
func (d *dockerNamespace) CreateContainer(
	ctx context.Context, name string, container *container.Config,
	host *container.HostConfig, networking *network.NetworkingConfig, configConsistency bool,
	wait *wait.Waiter, pullOptions *types.ImagePullOptions, pullOut io.Writer,
) (string, error) {
	var err error

	// merge labels
	if container.Labels == nil {
		container.Labels = d.labels
	} else {
		for k, v := range d.labels {
			container.Labels[k] = v
		}
	}

	d.m.Lock()
	defer d.m.Unlock()
	fullName := "/" + name // Docker reports container names with a leading slash
	c, ok := d.containers[name]
	if ok {
		// Already registered in this namespace; optionally verify the
		// caller's config matches the one used before, then reuse it.
		if configConsistency {
			err = checkConfigConsistency(
				container, c.container,
				host, c.host,
				networking.EndpointsConfig, c.network.EndpointsConfig,
			)
		}
		return c.containerID, err
	}

	// Look for a same-named container created outside this namespace.
	containers, err := d.cli.ContainerList(ctx, types.ContainerListOptions{
		All: true, // contains exiting/paused images
		Filters: filters.NewArgs(
			filters.Arg("name", name),
		),
	})
	if err != nil {
		return "", err
	}
	var existing *types.Container
LOOP:
	for _, c := range containers {
		for _, n := range c.Names {
			if fullName == n {
				if c.Image != container.Image {
					return "", errors.New(containerNameConflict(name, container.Image, c.Image))
				}
				existing = &c
				break LOOP
			}
		}
	}

	// try pull image when container not exists
	if existing == nil && pullOptions != nil {
		err := d.pull(ctx, container.Image, *pullOptions, pullOut)
		if err != nil {
			return "", err
		}
	}

	var containerID string
	var connected bool // true when we connected an adopted container to our network
	if existing != nil {
		if d.policy == ResourcePolicyError {
			return "", fmt.Errorf("dockerNamespace: container %q(%s) already exists", name, container.Image)
		}
		info, err := d.cli.ContainerInspect(ctx, existing.ID)
		if err != nil {
			return "", err
		}
		if configConsistency {
			err = checkConfigConsistency(
				container, info.Config,
				host, info.HostConfig,
				networking.EndpointsConfig, info.NetworkSettings.Networks,
			)
			if err != nil {
				return "", err
			}
		}
		switch existing.State {
		case "running", "created":
			containerID = existing.ID
			// Connect to our network unless the container already is.
			var found bool
			for _, setting := range existing.NetworkSettings.Networks {
				if setting.NetworkID == d.network.ID {
					found = true
					break
				}
			}
			if !found {
				err := d.cli.NetworkConnect(ctx, d.network.ID, containerID, &network.EndpointSettings{
					NetworkID: d.network.ID,
					Aliases: []string{
						strings.TrimPrefix(name, d.namespace),
					},
				})
				if err != nil {
					return "", err
				}
				connected = true
			}
		case "paused":
			// MEMO: bound port is still existing
			return "", fmt.Errorf("dockerNamespace: cannot start %q, unpause is not supported", name)
		default:
			return "", fmt.Errorf("dockerNamespace: cannot start %q, unexpected container state %q", name, existing.State)
		}
	} else {
		created, err := d.cli.ContainerCreate(ctx, container, host, networking, nil, name)
		if err != nil {
			return "", err
		}
		containerID = created.ID
	}

	// Register cleanup: remove containers we created (or took over); merely
	// disconnect containers we only attached to our network.
	if (existing == nil && d.policy != ResourcePolicyReusable) || d.policy == ResourcePolicyTakeOver {
		d.terminate = append(d.terminate, func(ctx context.Context) error {
			return d.cli.ContainerRemove(ctx, containerID, types.ContainerRemoveOptions{
				Force:         true,
				RemoveVolumes: true,
			})
		})
	} else if connected {
		d.terminate = append(d.terminate, func(ctx context.Context) error {
			return d.cli.NetworkDisconnect(ctx, d.network.ID, containerID, true)
		})
	}
	d.containers[name] = &containerInfo{
		containerID: containerID,
		container:   container,
		host:        host,
		network:     networking,
		wait:        wait,
		running:     false,
	}
	return containerID, nil
}
// pull downloads the given image, rendering the pull progress stream to out.
func (d *dockerNamespace) pull(ctx context.Context, image string, pullOptions types.ImagePullOptions, out io.Writer) (err error) {
	stream, err := d.cli.ImagePull(ctx, image, pullOptions)
	if err != nil {
		return err
	}
	defer func() {
		// Keep both the streaming error and the Close error.
		err = multierr.Append(err, stream.Close())
	}()
	return handleJSONMessageStream(out, stream)
}
// containerNameConflict formats the error message reported when a container
// with the requested name exists but was created from a different image.
func containerNameConflict(name, wantImage, gotImage string) string {
	const format = "container name %q already exists but image is not %q(%q)"
	return fmt.Sprintf(format, name, wantImage, gotImage)
}
// containerPortMap polls the container every 200ms until every port in
// requiredPorts has at least one host binding, then returns the container's
// port map with each binding's HostIP replaced by the namespace's resolved
// host address. If ctx carries no deadline, a 30-second timeout is applied.
func (d *dockerNamespace) containerPortMap(ctx context.Context, containerID string, requiredPorts nat.PortSet) (nat.PortMap, error) {
	if len(requiredPorts) == 0 {
		return nat.PortMap{}, nil
	}

	if _, ok := ctx.Deadline(); !ok {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, 30*time.Second)
		defer cancel()
	}

	// Constant backoff; zero max retries means retry until ctx is done.
	b := backoff.Constant(
		backoff.WithInterval(200*time.Millisecond),
		backoff.WithMaxRetries(0),
	).Start(ctx)
retry:
	for backoff.Continue(b) {
		i, err := d.cli.ContainerInspect(ctx, containerID)
		if err != nil {
			return nil, err
		}

		for p, bindings := range i.NetworkSettings.Ports {
			if _, ok := requiredPorts[p]; !ok {
				continue
			} else if len(bindings) == 0 {
				// endpoint not bound yet
				continue retry
			}
			// Use this port. Replace host ip.
			for i := range bindings {
				bindings[i].HostIP = d.hostIP
			}
		}
		return i.NetworkSettings.Ports, nil
	}
	return nil, errors.New("cannot get endpoints")
}
// StartContainer starts the named container previously registered by
// CreateContainer, waits for its exposed ports to be bound (and for the
// optional wait.Waiter to succeed), and returns the port bindings. Calls for
// an already-running container return the cached ports.
//
// NOTE(review): running/ports are read under RLock but written at the bottom
// without re-acquiring the lock; concurrent StartContainer calls for the same
// name could race — confirm callers serialize starts (e.g. via exclusion control).
func (d *dockerNamespace) StartContainer(ctx context.Context, name string) (Ports, error) {
	d.m.RLock()
	c, ok := d.containers[name]
	d.m.RUnlock()
	if !ok {
		return nil, fmt.Errorf("dockerNamespace: container %q not found", name)
	} else if c.running {
		return c.ports, nil
	}

	err := d.cli.ContainerStart(ctx, c.containerID, types.ContainerStartOptions{})
	if err != nil {
		return nil, err
	}

	portMap, err := d.containerPortMap(ctx, c.containerID, c.container.ExposedPorts)
	if err != nil {
		return nil, err
	}

	if c.wait != nil {
		err = c.wait.Wait(ctx, &fetcher{
			cli:         d.cli,
			containerID: c.containerID,
			ports:       portMap,
		})
		if err != nil {
			return nil, err
		}
	}
	c.running = true
	c.ports = Ports(portMap)
	return c.ports, nil
}
// Release runs the registered cleanup functions in reverse order of
// registration and returns all errors combined.
func (d *dockerNamespace) Release(ctx context.Context) error {
	d.m.Lock()
	defer d.m.Unlock()

	var err error
	for i := len(d.terminate) - 1; i >= 0; i-- {
		err = multierr.Append(err, d.terminate[i](ctx))
	}
	return err
}
var _ Namespace = (*dockerNamespace)(nil)
// handleJSONMessageStream renders a Docker JSON message stream from src to
// dst, one decoded message at a time.
func handleJSONMessageStream(dst io.Writer, src io.Reader) error {
	var (
		dec  = json.NewDecoder(src)
		line bytes.Buffer
	)
	for {
		var msg jsonmessage.JSONMessage
		if err := dec.Decode(&msg); err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}
		if err := msg.Display(&line, false); err != nil {
			return err
		}
		if _, err := dst.Write(line.Bytes()); err != nil {
			return err
		}
		line.Reset()
	}
}
// wait.Fetcher implementation
//
// fetcher gives a wait.Waiter access to one container's state, ports, logs
// and exec facility.
type fetcher struct {
	cli         *client.Client
	containerID string
	ports       nat.PortMap
}
// ContainerID returns the ID of the target container.
func (f *fetcher) ContainerID() string {
	return f.containerID
}

// Status inspects the container and returns its current state.
func (f *fetcher) Status(ctx context.Context) (*types.ContainerState, error) {
	i, err := f.cli.ContainerInspect(ctx, f.containerID)
	if err != nil {
		return nil, err
	}
	return i.State, nil
}

// Ports returns the port bindings captured when the container was started.
func (f *fetcher) Ports() nat.PortMap {
	return f.ports
}

// Log returns a stream of the container's stdout and stderr.
// The caller must close the returned reader.
func (f *fetcher) Log(ctx context.Context) (io.ReadCloser, error) {
	return f.cli.ContainerLogs(ctx, f.containerID, types.ContainerLogsOptions{
		ShowStdout: true,
		ShowStderr: true,
	})
}
// Exec runs cmd inside the container and returns its combined stdout/stderr
// output. When the command exits non-zero, an *ExitError carrying the exit
// code is returned instead of the output.
func (f *fetcher) Exec(ctx context.Context, cmd ...string) ([]byte, error) {
	r, err := f.cli.ContainerExecCreate(ctx, f.containerID, types.ExecConfig{
		AttachStderr: true,
		AttachStdout: true,
		Cmd:          cmd,
	})
	if err != nil {
		return nil, err
	}

	hijackedResp, err := f.cli.ContainerExecAttach(ctx, r.ID, types.ExecStartCheck{})
	if err != nil {
		return nil, err
	}
	defer hijackedResp.Close()

	// De-multiplex the attached stream; stdout and stderr are written into
	// the same buffer.
	buf := bytes.NewBuffer(nil)
	_, err = stdcopy.StdCopy(buf, buf, hijackedResp.Reader)
	if err != nil {
		return nil, err
	}

	info, err := f.cli.ContainerExecInspect(ctx, r.ID)
	if err != nil {
		return nil, err
	}
	if info.ExitCode != 0 {
		return nil, &ExitError{
			ExitCode: info.ExitCode,
		}
	}
	return buf.Bytes(), nil
}
var _ wait.Fetcher = (*fetcher)(nil)
package main
import (
"context"
"flag"
"os"
"github.com/daichitakahashi/confort/internal/cmd"
)
// main is the entry point of the confort command: it wires the subcommands to
// the default flag set, parses flags, and exits with the command's status code.
func main() {
	ctx := context.Background()

	command := cmd.NewCommands(
		flag.CommandLine,
		cmd.NewOperation(),
	)
	flag.Parse()
	os.Exit(
		int(command.Execute(ctx)),
	)
}
package confort
import (
"fmt"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/network"
"github.com/goccy/go-reflect"
"github.com/google/go-cmp/cmp"
"go.uber.org/multierr"
)
// checkConfigConsistency verifies that a requested container/host/network
// configuration (the *1 arguments) is compatible with an existing one (the *2
// arguments). Each section's error is wrapped with a section label.
func checkConfigConsistency(
	container1, container2 *container.Config,
	host1, host2 *container.HostConfig,
	network1, network2 map[string]*network.EndpointSettings,
) (err error) {
	if err = checkContainerConfigConsistency(container2, container1); err != nil {
		// FIX: was "\n %w" — the stray space made this section's format
		// inconsistent with the two below.
		return fmt.Errorf("inconsistent container config\n%w", err)
	}
	if err = checkHostConfigConsistency(host2, host1); err != nil {
		return fmt.Errorf("inconsistent host config\n%w", err)
	}
	if err = checkEndpointSettingsConsistency(network2, network1); err != nil {
		return fmt.Errorf("inconsistent network config\n%w", err)
	}
	return nil
}
// checkContainerConfigConsistency reports differences between an existing
// container config (expected) and a requested one (target). Empty/zero target
// fields are treated as "unspecified" by the subset helpers.
func checkContainerConfigConsistency(expected, target *container.Config) error {
	return multierr.Combine(
		// FIX: the Hostname check was accidentally duplicated here.
		stringSubset("Hostname", expected.Hostname, target.Hostname),
		stringSubset("Domainname", expected.Domainname, target.Domainname),
		stringSubset("User", expected.User, target.User),
		// AttachStdin
		// AttachStdout
		// AttachStderr
		mapSubset("ExposedPorts", expected.ExposedPorts, target.ExposedPorts),
		// Tty
		// OpenStdin
		// StdinOnce
		sliceSubset("Env", expected.Env, target.Env),
		sequentialSubset("Cmd", expected.Cmd, target.Cmd),
		pointerSubset("Healthcheck", expected.Healthcheck, target.Healthcheck),
		equals("ArgsEscaped", expected.ArgsEscaped, target.ArgsEscaped),
		equals("Image", expected.Image, target.Image),
		mapSubset("Volumes", expected.Volumes, target.Volumes),
		equals("WorkingDir", expected.WorkingDir, target.WorkingDir),
		sequentialSubset("Entrypoint", expected.Entrypoint, target.Entrypoint),
		// NetworkDisabled
		stringSubset("MacAddress", expected.MacAddress, target.MacAddress),
		// OnBuild
		mapSubset("Labels", expected.Labels, target.Labels),
		stringSubset("StopSignal", expected.StopSignal, target.StopSignal),
		pointerSubset("StopTimeout", expected.StopTimeout, target.StopTimeout),
		sequentialSubset("Shell", expected.Shell, target.Shell),
	)
}
// checkHostConfigConsistency reports differences between an existing host
// config (expected) and a requested one (target). Commented-out fields are
// deliberately not compared.
func checkHostConfigConsistency(expected, target *container.HostConfig) error {
	return multierr.Combine(
		sliceSubset("Binds", expected.Binds, target.Binds),
		// ContainerIDFile string
		// LogConfig
		stringSubset("NetworkMode", string(expected.NetworkMode), string(target.NetworkMode)),
		// TODO: ("PortBindings", expected.PortBindings, target.PortBindings),
		stringSubset("RestartPolicy", expected.RestartPolicy.Name, target.RestartPolicy.Name),
		// AutoRemove
		stringSubset("VolumeDriver", expected.VolumeDriver, target.VolumeDriver),
		sliceSubset("VolumesFrom", expected.VolumesFrom, target.VolumesFrom),

		// Applicable to UNIX platforms
		sliceSubset("CapAdd", expected.CapAdd, target.CapAdd),
		sliceSubset("CapDrop", expected.CapDrop, target.CapDrop),
		// FIX: label said "CgroupsMode" but the compared field is CgroupnsMode.
		stringSubset("CgroupnsMode", string(expected.CgroupnsMode), string(target.CgroupnsMode)),
		sliceSubset("DNS", expected.DNS, target.DNS),
		sliceSubset("DNSOptions", expected.DNSOptions, target.DNSOptions),
		sliceSubset("DNSSearch", expected.DNSSearch, target.DNSSearch),
		sliceSubset("ExtraHosts", expected.ExtraHosts, target.ExtraHosts),
		sliceSubset("GroupAdd", expected.GroupAdd, target.GroupAdd),
		stringSubset("IpcMode", string(expected.IpcMode), string(target.IpcMode)),
		stringSubset("Cgroup", string(expected.Cgroup), string(target.Cgroup)),
		sliceSubset("Links", expected.Links, target.Links),
		// OomScoreAdj
		stringSubset("PidMode", string(expected.PidMode), string(target.PidMode)),
		equals("Privileged", expected.Privileged, target.Privileged),
		equals("PublishAllPorts", expected.PublishAllPorts, target.PublishAllPorts),
		equals("ReadonlyRootfs", expected.ReadonlyRootfs, target.ReadonlyRootfs),
		sliceSubset("SecurityOpt", expected.SecurityOpt, target.SecurityOpt),
		mapSubset("StorageOpt", expected.StorageOpt, target.StorageOpt),
		mapSubset("Tmpfs", expected.Tmpfs, target.Tmpfs),
		// UTSMode
		// UsernsMode
		// ShmSize
		// Sysctls
		// Runtime

		// Applicable to Windows
		// ConsoleSize
		// Isolation

		// Contains container's resources (cgroups, ulimits)
		// Resources

		// Mounts specs used by the container
		// TODO: ("Mounts", expected.Mounts, target.Mounts),

		// MaskedPaths is the list of paths to be masked inside the container (this overrides the default set of paths)
		// MaskedPaths

		// ReadonlyPaths is the list of paths to be set as read-only inside the container (this overrides the default set of paths)
		// ReadonlyPaths

		// Run a custom init inside the container, if null, use the daemon's configured settings
		// Init
	)
}
// checkEndpointSettingsConsistency compares, per network, the endpoint
// settings in target against those in expected and combines all differences.
// Networks absent from expected are skipped.
func checkEndpointSettingsConsistency(expected, target map[string]*network.EndpointSettings) (err error) {
	for networkName, settings := range target {
		expectedSettings, ok := expected[networkName]
		if !ok {
			// e = fmt.Errorf("%s: %s", networkName, cmp.Diff(expectedSettings, settings))
			continue
		}
		e := multierr.Combine(
			pointerSubset("IPAMConfig", expectedSettings.IPAMConfig, settings.IPAMConfig),
			sliceSubset("Links", expectedSettings.Links, settings.Links),
			sliceSubset("Aliases", expectedSettings.Aliases, settings.Aliases),
			equals("NetworkID", expectedSettings.NetworkID, settings.NetworkID),
			// EndpointID
			stringSubset("Gateway", expectedSettings.Gateway, settings.Gateway),
			stringSubset("IPAddress", expectedSettings.IPAddress, settings.IPAddress),
			// equals("IPPrefixLen", expectedSettings.IPPrefixLen, settings.IPPrefixLen),
			stringSubset("IPv6Gateway", expectedSettings.IPv6Gateway, settings.IPv6Gateway),
			stringSubset("GlobalIPv6Address", expectedSettings.GlobalIPv6Address, settings.GlobalIPv6Address),
			// equals("GlobalIPv6PrefixLen", expectedSettings.GlobalIPv6PrefixLen, settings.GlobalIPv6PrefixLen),
			stringSubset("MacAddress", expectedSettings.MacAddress, settings.MacAddress),
			mapSubset("DriverOpts", expectedSettings.DriverOpts, settings.DriverOpts),
		)
		if e != nil {
			err = multierr.Append(err, fmt.Errorf("%s: %w", networkName, e))
		}
	}
	return err
}
// stringSubset treats an empty target as "unspecified"; otherwise target must
// equal expected.
func stringSubset(name, expected, target string) (err error) {
	switch target {
	case "", expected:
		return nil
	}
	return diffError(name, expected, target)
}
// sliceSubset requires every element of target to appear in expected,
// ignoring order and multiplicity. An empty target always passes; a non-empty
// target against an empty expected always fails.
func sliceSubset[T comparable](name string, expected, target []T) (err error) {
	if len(target) == 0 {
		return nil
	}
	if len(expected) == 0 {
		return diffError(name, expected, target)
	}
	allowed := make(map[T]bool, len(expected))
	for _, v := range expected {
		allowed[v] = true
	}
	for _, v := range target {
		if !allowed[v] {
			return diffError(name, expected, target)
		}
	}
	return nil
}
// sequentialSubset requires target, when non-empty, to deep-equal expected
// (same elements in the same order).
func sequentialSubset[T comparable](name string, expected, target []T) (err error) {
	if len(target) == 0 {
		return nil
	}
	if len(expected) > 0 && reflect.DeepEqual(expected, target) {
		return nil
	}
	return diffError(name, expected, target)
}
// mapSubset requires every key/value pair of target to be present, with the
// same value, in expected. An empty target always passes.
func mapSubset[K, V comparable](name string, expected, target map[K]V) error {
	if len(target) == 0 {
		return nil
	}
	if len(expected) == 0 {
		return diffError(name, expected, target)
	}
	for k, want := range target {
		if got, ok := expected[k]; !ok || got != want {
			return diffError(name, expected, target)
		}
	}
	return nil
}
// pointerSubset requires target, when non-nil, to deep-equal expected.
func pointerSubset[T any](name string, expected, target *T) (err error) {
	if target == nil {
		return nil
	}
	if expected != nil && reflect.DeepEqual(expected, target) {
		return nil
	}
	return diffError(name, expected, target)
}
// equals requires target to be exactly equal to expected (empty cmp.Diff).
func equals(name string, expected, target any) error {
	if d := cmp.Diff(expected, target); d != "" {
		return fmt.Errorf("%s\n%s", name, d)
	}
	return nil
}

// diffError reports a labeled diff between expected and target.
func diffError(msg string, expected, target any) error {
	return fmt.Errorf("%s\n%s", msg, cmp.Diff(expected, target))
}
package confort
import (
"context"
"errors"
"fmt"
"io"
"log"
"os"
"strconv"
"sync"
"time"
"github.com/daichitakahashi/confort/internal/beacon"
"github.com/daichitakahashi/confort/internal/beacon/proto"
"github.com/daichitakahashi/confort/internal/exclusion"
"github.com/daichitakahashi/confort/internal/logging"
"github.com/daichitakahashi/confort/wait"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
"github.com/lestrrat-go/option"
"go.uber.org/multierr"
)
// initOnce guards the one-time initialization performed by lazyInit.
var initOnce sync.Once

// lazyInit configures the package log level from the environment exactly once.
// An unset variable leaves the default; an unparsable value is reported and
// otherwise ignored.
func lazyInit() {
	initOnce.Do(func() {
		v, ok := os.LookupEnv(beacon.LogLevelEnv)
		if !ok {
			return
		}
		level, err := strconv.ParseInt(v, 10, 64)
		if err != nil {
			log.Printf("confort: failed to parse the value of %s, default value is 2", beacon.LogLevelEnv)
			return
		}
		logging.SetLogLevel(logging.LogLevel(level))
	})
}
// Confort controls the Docker resources (a network and containers) of one
// namespace and provides exclusion control over their usage.
type Confort struct {
	backend        Backend
	namespace      Namespace
	cli            *client.Client
	defaultTimeout time.Duration // applied to ctx when it has no deadline; 0 disables
	ex             exclusion.Control
	term           func() error // cleanup invoked by Close
}
type (
	// NewOption configures New.
	NewOption interface {
		option.Interface
		new() NewOption
	}

	// identity keys and payloads of the options below
	identOptionClientOptions struct{}
	identOptionNamespace     struct{}
	namespaceOption          struct {
		namespace string
		force     bool
	}
	identOptionDefaultTimeout struct{}
	identOptionResourcePolicy struct{}
	identOptionBeacon         struct{}
	newOption                 struct{ option.Interface }
)

func (o newOption) new() NewOption { return o }
// WithClientOptions sets options for Docker API client.
// Default option is client.FromEnv.
// For detail, see client.NewClientWithOpts.
func WithClientOptions(opts ...client.Opt) NewOption {
	return newOption{
		Interface: option.New(identOptionClientOptions{}, opts),
	}.new()
}

// WithNamespace specifies namespace of Confort.
// Default namespace is the value of the CFT_NAMESPACE environment variable.
// The "confort test" command has "-namespace" option that overrides the variable.
// If force is true, the value of the argument namespace takes precedence.
//
// If neither CFT_NAMESPACE nor WithNamespace is set, New fails.
func WithNamespace(namespace string, force bool) NewOption {
	return newOption{
		Interface: option.New(identOptionNamespace{}, namespaceOption{
			namespace: namespace,
			force:     force,
		}),
	}.new()
}

// WithDefaultTimeout sets the default timeout for each request to the Docker API and beacon server.
// The default value of the "default timeout" is 1 min.
// If default timeout is 0, Confort doesn't apply any timeout for ctx.
//
// If a timeout has already been set to ctx, the default timeout is not applied.
func WithDefaultTimeout(d time.Duration) NewOption {
	return newOption{
		Interface: option.New(identOptionDefaultTimeout{}, d),
	}.new()
}

// WithResourcePolicy overrides the policy for handling Docker resources that already exist,
// such as containers and networks.
// By default, ResourcePolicyReuse or the value of the CFT_RESOURCE_POLICY environment variable, if set, is used.
// The "confort test" command has "-policy" option that overrides the variable.
func WithResourcePolicy(s ResourcePolicy) NewOption {
	return newOption{
		Interface: option.New(identOptionResourcePolicy{}, s),
	}.new()
}

// WithBeacon configures Confort to integrate with a starting beacon server.
// The beacon server is started by the "confort" command.
// The address of server will be read from CFT_BEACON_ADDR or lock file specified as CFT_LOCKFILE.
//
// # With `confort test` command
//
// This command starts beacon server and sets the address as CFT_BEACON_ADDR automatically.
//
// # With `confort start` command
//
// This command starts beacon server and creates a lock file that contains the address.
// The default filename is ".confort.lock" and you don't need to set the file name as CFT_LOCKFILE.
// If you set a custom filename with "-lock-file" option, also you have to set the file name as CFT_LOCKFILE,
// or you can set address that read from lock file as CFT_BEACON_ADDR.
func WithBeacon() NewOption {
	return newOption{
		Interface: option.New(identOptionBeacon{}, true),
	}.new()
}
// New creates Confort instance which is an interface of controlling containers.
// Confort creates docker resources like a network and containers. Also, it
// provides an exclusion control of container usage.
//
// If you want to control the same containers across parallelized tests, enable
// integration with the beacon server by using `confort` command and WithBeacon
// option.
func New(ctx context.Context, opts ...NewOption) (cft *Confort, err error) {
	lazyInit()

	var (
		skipDeletion bool
		beaconConn   *beacon.Connection // nil unless WithBeacon established a connection
		ex           = exclusion.NewControl()

		clientOps = []client.Opt{
			client.FromEnv,
		}
		namespace = os.Getenv(beacon.NamespaceEnv)
		timeout   = time.Minute
		policy    ResourcePolicy
	)
	if s := os.Getenv(beacon.ResourcePolicyEnv); s != "" {
		policy = ResourcePolicy(s)
	}

	for _, opt := range opts {
		switch opt.Ident() {
		case identOptionClientOptions{}:
			clientOps = opt.Value().([]client.Opt)
		case identOptionNamespace{}:
			o := opt.Value().(namespaceOption)
			if namespace == "" || o.force {
				if namespace != "" {
					logging.Infof("namespace is overwritten by WithNamespace: %q -> %q", namespace, o.namespace)
				}
				namespace = o.namespace
			}
		case identOptionDefaultTimeout{}:
			timeout = opt.Value().(time.Duration)
		case identOptionResourcePolicy{}:
			newPolicy := opt.Value().(ResourcePolicy)
			if policy != "" && policy != newPolicy {
				logging.Infof("resource policy is overwritten by WithResourcePolicy: %q -> %q", policy, newPolicy)
			}
			policy = newPolicy
		case identOptionBeacon{}:
			conn, err := beacon.Connect(ctx)
			if err != nil {
				return nil, err
			}
			if conn.Enabled() {
				ex = exclusion.NewBeaconControl(
					proto.NewBeaconServiceClient(conn.Conn),
				)
				skipDeletion = true
				beaconConn = conn
			}
		}
	}
	if beaconConn != nil {
		// Close the connection when construction fails below.
		defer func() {
			if err != nil {
				_ = beaconConn.Close()
			}
		}()
	}

	if namespace == "" {
		return nil, errors.New("confort: empty namespace")
	}
	logging.Debugf("namespace: %s", namespace)

	if policy == "" {
		policy = ResourcePolicyReuse // default
	}
	if !beacon.ValidResourcePolicy(string(policy)) {
		return nil, fmt.Errorf("confort: invalid resource policy: %s", policy)
	}
	logging.Debugf("resource policy: %s", policy)

	ctx, cancel := applyTimeout(ctx, timeout)
	defer cancel()

	cli, err := client.NewClientWithOpts(clientOps...)
	if err != nil {
		return nil, fmt.Errorf("confort: %w", err)
	}
	cli.NegotiateAPIVersion(ctx)

	// FIX: beaconConn is nil when WithBeacon was not used (or the connection
	// was not enabled); guard before calling Enabled to avoid a nil
	// dereference.
	var beaconAddr string
	if beaconConn != nil && beaconConn.Enabled() {
		beaconAddr = beaconConn.Addr
	}
	backend := &dockerBackend{
		cli:    cli,
		policy: policy,
		labels: map[string]string{
			beacon.LabelIdentifier: beacon.Identifier(beaconAddr),
		},
	}

	logging.Debug("acquire LockForNamespace")
	unlock, err := ex.LockForNamespace(ctx)
	if err != nil {
		return nil, fmt.Errorf("confort: %w", err)
	}
	defer func() {
		logging.Debug("release LockForNamespace")
		unlock()
	}()

	logging.Debugf("create namespace %q", namespace)
	ns, err := backend.Namespace(ctx, namespace)
	if err != nil {
		return nil, fmt.Errorf("confort: %w", err)
	}

	term := func() error {
		var err error
		// FIX: same nil guard as above.
		if beaconConn != nil && beaconConn.Enabled() {
			// TODO: disconnected from beacon server
			err = beaconConn.Close()
		}
		if skipDeletion {
			return err
		}
		// release all resources
		logging.Debugf("release all resources bound with namespace %q", namespace)
		return multierr.Append(err, ns.Release(context.Background()))
	}

	return &Confort{
		backend:        backend,
		namespace:      ns,
		cli:            cli,
		defaultTimeout: timeout,
		ex:             ex,
		term:           term,
	}, nil
}
// Close releases all created resources with cft.
// It invokes the cleanup registered by New (closing the beacon connection
// and/or releasing the namespace, depending on configuration).
func (cft *Confort) Close() error {
	return cft.term()
}
func applyTimeout(ctx context.Context, defaultTimeout time.Duration) (context.Context, context.CancelFunc) {
if defaultTimeout == 0 {
return ctx, func() {}
}
_, ok := ctx.Deadline()
if ok {
return ctx, func() {}
}
return context.WithTimeout(ctx, defaultTimeout)
}
// APIClient returns client.APIClient used by Confort.
func (cft *Confort) APIClient() *client.Client {
	return cft.cli
}

// Namespace returns namespace associated with cft.
func (cft *Confort) Namespace() string {
	return cft.namespace.Namespace()
}
type (
	// BuildOption configures Build.
	BuildOption interface {
		option.Interface
		build() BuildOption
	}

	// identity keys of the options below
	identOptionImageBuildOptions struct{}
	identOptionForceBuild        struct{}
	identOptionBuildOutput       struct{}
	buildOption                  struct{ option.Interface }
)

func (o buildOption) build() BuildOption { return o }
// WithImageBuildOptions modifies the configuration of build.
// The argument `option` already contains required values, according to Build.
func WithImageBuildOptions(f func(option *types.ImageBuildOptions)) BuildOption {
	return buildOption{
		Interface: option.New(identOptionImageBuildOptions{}, f),
	}.build()
}

// WithForceBuild forces to build an image even if it already exists.
func WithForceBuild() BuildOption {
	return buildOption{
		Interface: option.New(identOptionForceBuild{}, true),
	}.build()
}

// WithBuildOutput sets dst that the output during build will be written.
func WithBuildOutput(dst io.Writer) BuildOption {
	return buildOption{
		Interface: option.New(identOptionBuildOutput{}, dst),
	}.build()
}
// BuildParams describes an image build: the tag, Dockerfile location, build
// context directory, and build-time configuration.
type BuildParams struct {
	Image      string // tag of the image to build
	Dockerfile string // path to the Dockerfile
	ContextDir string // build context directory
	BuildArgs  map[string]*string
	// RegistryAuth sets authentication config per registry host.
	//
	// BuildParam{
	//	RegistryAuth: map[string] types.AuthConfig {
	//		"https://your.docker.registry.com": {
	//			Username: "your_user",
	//			Password: "your_password",
	//		},
	//	}
	// }
	RegistryAuth map[string]types.AuthConfig
	Platform     string
}
// Build creates new image from given Dockerfile and context directory.
//
// When same name image already exists, it doesn't perform building.
// WithForceBuild enables us to build image on every call of Build.
func (cft *Confort) Build(ctx context.Context, b *BuildParams, opts ...BuildOption) error {
	buildOut := io.Discard
	ctx, cancel := applyTimeout(ctx, cft.defaultTimeout)
	defer cancel()
	// collect option values
	var (
		modifyBuildOptions func(option *types.ImageBuildOptions)
		force              bool
	)
	for _, opt := range opts {
		switch opt.Ident() {
		case identOptionImageBuildOptions{}:
			modifyBuildOptions = opt.Value().(func(option *types.ImageBuildOptions))
		case identOptionForceBuild{}:
			force = opt.Value().(bool)
		case identOptionBuildOutput{}:
			out := opt.Value().(io.Writer)
			if out != nil { // nil output keeps io.Discard
				buildOut = out
			}
		}
	}
	// archive the build context (including the Dockerfile) as a tar stream
	tarball, relDockerfile, err := createArchive(b.ContextDir, b.Dockerfile)
	if err != nil {
		return fmt.Errorf("confort: %w", err)
	}
	// drain the stream on exit so its producer can complete
	defer func() {
		_, _ = io.Copy(io.Discard, tarball)
	}()
	buildOption := types.ImageBuildOptions{
		Tags:           []string{b.Image},
		SuppressOutput: buildOut == io.Discard, // quiet build when nobody reads the output
		Remove:         true,
		PullParent:     true,
		Dockerfile:     relDockerfile,
		BuildArgs:      b.BuildArgs,
		AuthConfigs:    b.RegistryAuth,
		Target:         "",
		SessionID:      "",
		Platform:       b.Platform,
	}
	// let the caller tweak the final options (may change Tags)
	if modifyBuildOptions != nil {
		modifyBuildOptions(&buildOption)
	}
	if len(buildOption.Tags) == 0 {
		return errors.New("confort: image tag not specified")
	}
	// serialize concurrent builds of the same tag across test processes
	logging.Debugf("LockForBuild: %s", buildOption.Tags[0])
	unlock, err := cft.ex.LockForBuild(ctx, buildOption.Tags[0])
	if err != nil {
		return fmt.Errorf("confort: %w", err)
	}
	defer func() {
		logging.Debugf("release LockForBuild: %s", buildOption.Tags[0])
		unlock()
	}()
	logging.Debugf("build image %q", buildOption.Tags[0])
	err = cft.backend.BuildImage(ctx, tarball, buildOption, force, buildOut)
	if err != nil {
		return fmt.Errorf("confort: %w", err)
	}
	return nil
}
// ContainerParams describes a container to create: its name (unique within
// the namespace), image, runtime configuration and readiness waiter.
type ContainerParams struct {
	Name         string
	Image        string
	Env          map[string]string
	Entrypoint   []string
	Cmd          []string
	WorkingDir   string
	ExposedPorts []string // port specs like "5432/tcp" or "8080:80/tcp"
	StopTimeout  *int
	Mounts       []mount.Mount
	Waiter       *wait.Waiter // optional readiness check run after start
}
// createContainer assembles container/host/network configs from c and the
// given RunOptions, then delegates creation to the namespace. The alias is
// registered as a network-scoped host name for the container.
func (cft *Confort) createContainer(ctx context.Context, name, alias string, c *ContainerParams, opts ...RunOption) (string, error) {
	// collect option values
	var modifyContainer func(config *container.Config)
	var modifyHost func(config *container.HostConfig)
	var modifyNetworking func(config *network.NetworkingConfig)
	var checkConsistency bool
	var pullOpts *types.ImagePullOptions
	pullOut := io.Discard
	for _, opt := range opts {
		switch opt.Ident() {
		case identOptionContainerConfig{}:
			modifyContainer = opt.Value().(func(config *container.Config))
		case identOptionHostConfig{}:
			modifyHost = opt.Value().(func(config *container.HostConfig))
		case identOptionNetworkingConfig{}:
			modifyNetworking = opt.Value().(func(config *network.NetworkingConfig))
		case identOptionConfigConsistency{}:
			checkConsistency = opt.Value().(bool)
		case identOptionPullOption{}:
			o := opt.Value().(pullOptions)
			pullOpts = o.pullOption
			if o.pullOut != nil {
				pullOut = o.pullOut
			}
		}
	}
	// translate port specs into exposed-port set and host bindings
	portSet, portBindings, err := nat.ParsePortSpecs(c.ExposedPorts)
	if err != nil {
		return "", err
	}
	// flatten env map into "KEY=VALUE" entries
	env := make([]string, 0, len(c.Env))
	for envKey, envVar := range c.Env {
		env = append(env, envKey+"="+envVar)
	}
	cc := &container.Config{
		Image:        c.Image,
		ExposedPorts: portSet,
		Env:          env,
		Cmd:          c.Cmd,
		Entrypoint:   c.Entrypoint,
		WorkingDir:   c.WorkingDir,
		StopTimeout:  c.StopTimeout,
	}
	if modifyContainer != nil {
		modifyContainer(cc)
	}
	hc := &container.HostConfig{
		PortBindings: portBindings,
		AutoRemove:   true,
		Mounts:       c.Mounts,
	}
	if modifyHost != nil {
		modifyHost(hc)
	}
	// attach the container to the namespace's network under the given alias
	nw := cft.namespace.Network()
	nc := &network.NetworkingConfig{
		EndpointsConfig: map[string]*network.EndpointSettings{
			nw.Name: {
				NetworkID: nw.ID,
				Aliases:   []string{alias},
			},
		},
	}
	if modifyNetworking != nil {
		modifyNetworking(nc)
	}
	return cft.namespace.CreateContainer(ctx, name, cc, hc, nc, checkConsistency, c.Waiter, pullOpts, pullOut)
}
type (
	// RunOption customizes the behavior of Confort.Run.
	RunOption interface {
		option.Interface
		run() RunOption
	}
	// identifier keys distinguishing each option kind
	identOptionContainerConfig   struct{}
	identOptionHostConfig        struct{}
	identOptionNetworkingConfig  struct{}
	identOptionConfigConsistency struct{}
	identOptionPullOption        struct{}
	// pullOptions bundles pull configuration with its output destination.
	pullOptions struct {
		pullOption *types.ImagePullOptions
		pullOut    io.Writer
	}
	runOption struct{ option.Interface }
)

// run marks runOption as a RunOption.
func (o runOption) run() RunOption { return o }
// WithContainerConfig modifies the configuration of container.
// The argument `config` already contains required values to create container,
// apply your values with care.
func WithContainerConfig(f func(config *container.Config)) RunOption {
	return runOption{
		Interface: option.New(identOptionContainerConfig{}, f),
	}.run()
}
// WithHostConfig modifies the configuration of container from host side.
// The argument `config` already contains required values to create container,
// apply your values with care.
func WithHostConfig(f func(config *container.HostConfig)) RunOption {
	return runOption{
		Interface: option.New(identOptionHostConfig{}, f),
	}.run()
}
// WithNetworkingConfig modifies the configuration of network.
// The argument `config` already contains required values to connecting to bridge network,
// and a container cannot join multi-networks on container creation.
func WithNetworkingConfig(f func(config *network.NetworkingConfig)) RunOption {
	return runOption{
		Interface: option.New(identOptionNetworkingConfig{}, f),
	}.run()
}
// WithConfigConsistency enables/disables the test checking consistency of configurations.
// By default, this test is disabled.
// NOTICE: This is quite experimental feature.
func WithConfigConsistency(check bool) RunOption {
	return runOption{
		Interface: option.New(identOptionConfigConsistency{}, check),
	}.run()
}
// WithPullOptions enables to pull image that not exists.
// For example, if you want to use an image hosted in private repository,
// you have to fill RegistryAuth field.
//
// The output will be written to `out`. If nil, io.Discard will be used.
func WithPullOptions(opts *types.ImagePullOptions, out io.Writer) RunOption {
	return runOption{
		Interface: option.New(identOptionPullOption{}, pullOptions{
			pullOption: opts,
			pullOut:    out,
		}),
	}.run()
}
// Container represents a created container and its controller.
type Container struct {
	cft   *Confort // owner, used for exclusion control and docker access
	id    string   // docker container id
	name  string   // namespaced container name
	alias string   // network-scoped host name
	ports Ports    // host-side port bindings resolved at start
}
// ID returns its container id.
func (c *Container) ID() string { return c.id }

// Name returns an actual name of the container.
func (c *Container) Name() string { return c.name }

// Alias returns a host name of the container. The alias is valid only in
// a docker network created in New or attached by Confort.Run.
func (c *Container) Alias() string { return c.alias }
// Run starts container with given parameters.
// If container already exists and not started, it starts.
// It reuses already started container and its endpoint information.
//
// When container is already existing and connected to another network, Run and other
// methods let the container connect to this network and create alias.
// For now, without specifying host port, container loses the port binding occasionally.
// If you want to use port binding and use a container with several network,
// and encounter such trouble, give it a try.
func (cft *Confort) Run(ctx context.Context, c *ContainerParams, opts ...RunOption) (*Container, error) {
	// the bare name becomes the network alias; the namespaced name is global
	alias := c.Name
	name := cft.namespace.Namespace() + c.Name
	ctx, cancel := applyTimeout(ctx, cft.defaultTimeout)
	defer cancel()
	// serialize creation/start of the same container across test processes
	logging.Debugf("acquire LockForContainerSetup: %s", name)
	unlock, err := cft.ex.LockForContainerSetup(ctx, name)
	if err != nil {
		return nil, fmt.Errorf("confort: %w", err)
	}
	defer func() {
		logging.Debugf("release LockForContainerSetup: %s", name)
		unlock()
	}()
	logging.Debugf("create container if not exists: %s", name)
	containerID, err := cft.createContainer(ctx, name, alias, c, opts...)
	if err != nil {
		return nil, fmt.Errorf("confort: %w", err)
	}
	logging.Debugf("start container if not started: %s", name)
	ports, err := cft.namespace.StartContainer(ctx, name)
	if err != nil {
		return nil, fmt.Errorf("confort: %w", err)
	}
	return &Container{
		cft:   cft,
		id:    containerID,
		name:  name,
		alias: alias,
		ports: ports,
	}, nil
}
type (
	// UseOption customizes the behavior of Container.Use and Acquirer.Use.
	UseOption interface {
		option.Interface
		use() UseOption
	}
	// identifier key for the init-function option
	identOptionInitFunc struct{}
	useOption           struct {
		option.Interface
	}
)

// use marks useOption as a UseOption.
func (o useOption) use() UseOption { return o }
type (
	// ReleaseFunc releases a lock acquired by Use/UseExclusive/UseShared or Acquirer.Do.
	ReleaseFunc func()
	// InitFunc initializes a container through its bound ports.
	InitFunc func(ctx context.Context, ports Ports) error
)
// WithInitFunc sets initializer to set up container using the given port.
// The init will be performed only once per container, executed with an exclusive lock.
// If you use a container with Confort.UseShared, the lock state is downgraded to the shared lock after init.
//
// The returned error makes the acquired lock released and testing.TB fail.
// After that, you can attempt to use the container and init again.
func WithInitFunc(init InitFunc) UseOption {
	return useOption{
		Interface: option.New(identOptionInitFunc{}, init),
	}.use()
}
// Use acquires a lock for using the container and returns its endpoint. If exclusive is true, it requires to
// use the container exclusively.
// When other tests have already acquired an exclusive or shared lock for the container, it blocks until all
// previous locks are released.
func (c *Container) Use(ctx context.Context, exclusive bool, opts ...UseOption) (Ports, ReleaseFunc, error) {
	var initFunc InitFunc
	for _, opt := range opts {
		switch opt.Ident() {
		case identOptionInitFunc{}:
			initFunc = opt.Value().(InitFunc)
		}
	}
	// adapt InitFunc to the signature the exclusion controller expects
	var init func(ctx context.Context) error
	if initFunc != nil {
		init = func(ctx context.Context) error {
			logging.Debugf("call InitFunc: %s", c.name)
			return initFunc(ctx, c.ports)
		}
	}
	// If initFunc is not nil, it will be called after acquisition of exclusive lock.
	// After that, the lock is downgraded to shared lock when exclusive is false.
	// When initFunc returns error, the acquisition of lock fails.
	logging.Debugf("acquire LockForContainerUse: %s(exclusive=%t)", c.name, exclusive)
	unlockContainer, err := c.cft.ex.LockForContainerUse(ctx, map[string]exclusion.ContainerUseParam{
		c.name: {
			Exclusive: exclusive,
			Init:      init,
		},
	})
	if err != nil {
		return nil, nil, fmt.Errorf("confort: %w", err)
	}
	release := func() {
		logging.Debugf("release LockForContainerUse: %s(exclusive=%t)", c.name, exclusive)
		unlockContainer()
	}
	return c.ports, release, nil
}
// UseExclusive acquires an exclusive lock for using the container explicitly and returns its endpoint.
func (c *Container) UseExclusive(ctx context.Context, opts ...UseOption) (Ports, ReleaseFunc, error) {
	return c.Use(ctx, true, opts...)
}
// UseShared acquires a shared lock for using the container explicitly and returns its endpoint.
func (c *Container) UseShared(ctx context.Context, opts ...UseOption) (Ports, ReleaseFunc, error) {
	return c.Use(ctx, false, opts...)
}
// Network returns docker network representation associated with Confort.
func (cft *Confort) Network() *types.NetworkResource {
	return cft.namespace.Network()
}
// Acquirer accumulates lock targets so that the locks of several containers
// can be acquired atomically by Do.
type Acquirer struct {
	targets []*Container                            // registration order of containers
	params  map[string]exclusion.ContainerUseParam // lock parameters keyed by container name
}
// Acquire initiates the acquisition of locks of the multi-containers.
// To avoid the deadlock in your test cases, use Acquire as below:
//
//	ports, release, err := Acquire().
//		Use(container1, true).
//		Use(container2, false, WithInitFunc(initContainer2)).
//		Do(ctx)
//	if err != nil {
//		t.Fatal(err)
//	}
//	t.Cleanup(release)
//
//	ports1 := ports[container1]
//	ports2 := ports[container2]
//
// * Acquire locks of container1 and container2 at the same time
// * If either lock acquisition or initContainer2 fails, lock acquisition for all containers fails
// * If initContainer2 succeeded but acquisition failed, the successful result of init is preserved
// * Returned func releases all acquired locks
func Acquire() *Acquirer {
	return &Acquirer{
		params: map[string]exclusion.ContainerUseParam{},
	}
}
// Use registers a container as the target of acquiring lock.
func (a *Acquirer) Use(c *Container, exclusive bool, opts ...UseOption) *Acquirer {
	var initFunc InitFunc
	for _, opt := range opts {
		switch opt.Ident() {
		case identOptionInitFunc{}:
			initFunc = opt.Value().(InitFunc)
		}
	}
	// adapt InitFunc to the signature the exclusion controller expects
	var init func(ctx context.Context) error
	if initFunc != nil {
		init = func(ctx context.Context) error {
			logging.Debugf("call InitFunc: %s", c.name)
			return initFunc(ctx, c.ports)
		}
	}
	logging.Debugf("register target for LockForContainerUse: %s(exclusive=%t) to %p", c.name, exclusive, a)
	a.targets = append(a.targets, c)
	a.params[c.name] = exclusion.ContainerUseParam{
		Exclusive: exclusive,
		Init:      init,
	}
	return a
}
// UseExclusive registers a container as the target of acquiring exclusive lock.
func (a *Acquirer) UseExclusive(c *Container, opts ...UseOption) *Acquirer {
	return a.Use(c, true, opts...)
}
// UseShared registers a container as the target of acquiring shared lock.
func (a *Acquirer) UseShared(c *Container, opts ...UseOption) *Acquirer {
	return a.Use(c, false, opts...)
}
// Do acquisition of locks.
// It returns the ports of every registered container and a ReleaseFunc that
// releases all acquired locks at once.
func (a *Acquirer) Do(ctx context.Context) (map[*Container]Ports, ReleaseFunc, error) {
	if len(a.targets) == 0 {
		return nil, nil, errors.New("confort: no targets")
	}
	// All targets are expected to belong to the same Confort instance;
	// use the exclusion controller of the first one.
	ex := a.targets[0].cft.ex

	logging.Debugf("acquire LockForContainerUse: %p", a)
	release, err := ex.LockForContainerUse(ctx, a.params)
	if err != nil {
		// wrap with the package prefix, consistent with Container.Use and Run
		return nil, nil, fmt.Errorf("confort: %w", err)
	}
	ports := map[*Container]Ports{}
	for _, c := range a.targets {
		ports[c] = c.ports
	}
	// log at actual release time (inside the returned func), matching Container.Use
	rel := func() {
		logging.Debugf("release LockForContainerUse: %p", a)
		release()
	}
	return ports, rel, nil
}
package database
import (
"context"
"errors"
"os"
"strconv"
"testing"
"time"
"github.com/daichitakahashi/confort"
"github.com/daichitakahashi/confort/wait"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4/pgxpool"
)
// ConnectFunc returns a connection pool to the test database, acquiring an
// exclusive or shared lock on the backing container.
type ConnectFunc func(tb testing.TB, ctx context.Context, exclusive bool) *pgxpool.Pool

// credentials of the test database; the database name equals the user name.
const (
	dbUser     = "confort_test"
	dbPassword = "confort_pass"
	Database   = dbUser
)
// InitDatabase launches (or reuses) the shared PostgreSQL test container and
// returns a ConnectFunc for tests plus a cleanup func releasing all resources.
// On any setup error the already-created resources are torn down.
func InitDatabase(ctx context.Context) (_ ConnectFunc, _ func(), err error) {
	cft, err := confort.New(ctx,
		confort.WithBeacon(),
		confort.WithNamespace("integrationtest", false),
	)
	if err != nil {
		return nil, nil, err
	}
	term := func() {
		_ = cft.Close()
	}
	// tear down on any subsequent failure (err is the named return)
	defer func() {
		if err != nil {
			term()
		}
	}()
	db, err := cft.Run(ctx, &confort.ContainerParams{
		Name:  "db",
		Image: "postgres:14.4-alpine3.16",
		Env: map[string]string{
			"POSTGRES_USER":     dbUser,
			"POSTGRES_PASSWORD": dbPassword,
		},
		ExposedPorts: []string{"5432/tcp"},
		Waiter:       wait.Healthy(),
	},
		confort.WithPullOptions(&types.ImagePullOptions{}, os.Stderr),
		confort.WithContainerConfig(func(config *container.Config) {
			// health check used by wait.Healthy() above
			config.Healthcheck = &container.HealthConfig{
				Test:     []string{"CMD-SHELL", "pg_isready"},
				Interval: 5 * time.Second,
				Timeout:  3 * time.Second,
			}
		}),
	)
	if err != nil {
		return nil, nil, err
	}
	return func(tb testing.TB, ctx context.Context, exclusive bool) *pgxpool.Pool {
		tb.Helper()
		// one-time schema initialization, guarded by the container lock
		ports, release, err := db.Use(ctx, exclusive, confort.WithInitFunc(func(ctx context.Context, ports confort.Ports) error {
			cfg, err := configFromPorts(ports)
			if err != nil {
				return err
			}
			pool, err := Connect(ctx, cfg)
			if err != nil {
				return err
			}
			defer pool.Close()
			return CreateTableIfNotExists(ctx, pool)
		}))
		if err != nil {
			tb.Fatal(err)
		}
		tb.Cleanup(release)
		cfg, err := configFromPorts(ports)
		if err != nil {
			tb.Fatal(err)
		}
		pool, err := Connect(ctx, cfg)
		if err != nil {
			tb.Fatal("ConnectFunc:", err)
		}
		tb.Cleanup(func() {
			pool.Close()
		})
		return pool
	}, term, nil
}
// configFromPorts builds a pgconn.Config pointing at the host-side binding of
// the container's 5432/tcp port.
func configFromPorts(ports confort.Ports) (cfg pgconn.Config, err error) {
	binding := ports.Binding("5432/tcp")
	// Validate the field that is actually consumed below. A missing binding
	// yields a zero value whose HostPort is empty.
	if binding.HostPort == "" {
		return cfg, errors.New("port not found")
	}
	p, err := strconv.ParseUint(binding.HostPort, 10, 16)
	if err != nil {
		return cfg, err
	}
	return pgconn.Config{
		Host:     "127.0.0.1",
		Port:     uint16(p),
		User:     dbUser,
		Password: dbPassword,
		Database: Database,
	}, nil
}
//go:generate go run github.com/kyleconroy/sqlc/cmd/sqlc@v1.14.0 generate
package database
import (
	"context"
	_ "embed"
	"fmt"
	"net/url"
	"regexp"

	"github.com/jackc/pgconn"
	"github.com/jackc/pgx/v4/pgxpool"
)
// Connect opens a pgx connection pool using the given config.
// The DSN is assembled with net/url so that credentials containing reserved
// characters ('@', ':', '/', ...) are escaped correctly.
func Connect(ctx context.Context, c pgconn.Config) (*pgxpool.Pool, error) {
	// e.g. "postgres://username:password@localhost:5432/database_name"
	u := url.URL{
		Scheme: "postgres",
		User:   url.UserPassword(c.User, c.Password),
		Host:   fmt.Sprintf("%s:%d", c.Host, c.Port),
		Path:   "/" + c.Database,
	}
	return pgxpool.Connect(ctx, u.String())
}
// createTables holds the embedded DDL of the test schema.
//
//go:embed schema.sql
var createTables string

// r extracts each "create table ... ( ... );" statement from the schema;
// (?sU) lets '.' span newlines and keeps matching non-greedy.
var r = regexp.MustCompile(`(?sU)create table .+ \(.+\);`)
// CreateTableIfNotExists executes every "create table" statement found in the
// embedded schema against the given connection pool, stopping at the first error.
func CreateTableIfNotExists(ctx context.Context, conn *pgxpool.Pool) error {
	statements := r.FindAllString(createTables, -1)
	for _, stmt := range statements {
		if _, err := conn.Exec(ctx, stmt); err != nil {
			return err
		}
	}
	return nil
}
package confort
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"github.com/daichitakahashi/confort/internal/logging"
"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
"github.com/lestrrat-go/option"
)
// ContainerExec represents a single command execution inside a container,
// analogous to os/exec.Cmd: configure, Start, then Wait (or Run).
type ContainerExec struct {
	c          *Container
	cmd        []string
	workingDir string
	env        []string
	cli        *client.Client
	execID     string // set by Start; empty until then

	// Stdout and Stderr, when non-nil, receive the command's output streams.
	Stdout io.Writer
	Stderr io.Writer
}
type (
	// execIdent is the marker that distinguishes ExecOption from other option kinds.
	execIdent interface{ exec() }
	// ExecOption customizes the behavior of Container.CreateExec.
	ExecOption interface {
		option.Interface
		execIdent
	}
	// identifier keys distinguishing each option kind
	identOptionExecWorkingDir struct{}
	identOptionExecEnv        struct{}
	execOption                struct {
		option.Interface
		execIdent
	}
)
// WithExecWorkingDir specifies working directory inside the container.
func WithExecWorkingDir(s string) ExecOption {
	return execOption{
		Interface: option.New(identOptionExecWorkingDir{}, s),
	}
}
// WithExecEnv specifies environment variables using in the container.
func WithExecEnv(kv map[string]string) ExecOption {
	// Build "KEY=VALUE" entries; plain concatenation matches the env handling
	// in createContainer and avoids per-entry fmt overhead.
	list := make([]string, 0, len(kv))
	for k, v := range kv {
		list = append(list, k+"="+v)
	}
	return execOption{
		Interface: option.New(identOptionExecEnv{}, list),
	}
}
// CreateExec creates new ContainerExec that executes the specified command on the container.
func (c *Container) CreateExec(ctx context.Context, cmd []string, opts ...ExecOption) (*ContainerExec, error) {
	// collect option values
	var (
		workingDir string
		execEnv    []string
	)
	for _, opt := range opts {
		switch opt.Ident() {
		case identOptionExecWorkingDir{}:
			workingDir = opt.Value().(string)
		case identOptionExecEnv{}:
			execEnv = opt.Value().([]string)
		}
	}
	// fail early if the container is not accessible
	if _, err := c.cft.cli.ContainerInspect(ctx, c.id); err != nil {
		return nil, err
	}
	return &ContainerExec{
		c:          c,
		cmd:        cmd,
		workingDir: workingDir,
		env:        execEnv,
		cli:        c.cft.cli,
	}, nil
}
// Start executes the command but does not wait for it to complete.
// Calling Start twice on the same ContainerExec is an error.
func (e *ContainerExec) Start(ctx context.Context) error {
	if e.execID != "" {
		return errors.New("confort: exec: already started")
	}
	logging.Debugf("exec on container %q: %v", e.c.name, e.cmd)
	execConfig := types.ExecConfig{
		Cmd:          e.cmd,
		WorkingDir:   e.workingDir,
		Env:          e.env,
		AttachStdout: e.Stdout != nil,
		AttachStderr: e.Stderr != nil,
	}
	// When both stdout and stderr haven't attached, ContainerExecCreate behaves like a detached mode.
	// So, to wait execution done, make stdout attached.
	if !execConfig.AttachStdout && !execConfig.AttachStderr {
		execConfig.AttachStdout = true
	}
	resp, err := e.cli.ContainerExecCreate(ctx, e.c.id, execConfig)
	if err != nil {
		return err
	}
	e.execID = resp.ID
	return nil
}
// ExitError reports a command that finished with a non-zero exit status.
type ExitError struct {
	ExitCode int
}

// Error implements the error interface.
func (e *ExitError) Error() string {
	msg := fmt.Sprintf("confort: exec: exit status %d", e.ExitCode)
	return msg
}
// Wait waits for the specified command to exit and waits for copying from stdout or stderr to complete.
// The command must have been started by Start.
// The returned error is nil if the command runs, has no problems copying stdin, stdout, and stderr, and exits with a zero exit status.
// If the command fails to run or doesn't complete successfully, the error is of type *ExitError.
func (e *ContainerExec) Wait(ctx context.Context) error {
	if e.execID == "" {
		return errors.New("confort: exec: not started")
	}
	// attaching starts the exec and gives us the multiplexed output stream
	hijackedResp, err := e.cli.ContainerExecAttach(ctx, e.execID, types.ExecStartCheck{})
	if err != nil {
		return err
	}
	defer hijackedResp.Close()
	var (
		stdout = io.Discard
		stderr = io.Discard
	)
	if e.Stdout != nil {
		stdout = e.Stdout
	}
	if e.Stderr != nil {
		stderr = e.Stderr
	}
	// demultiplex docker's combined stream; returns when the command exits
	_, err = stdcopy.StdCopy(stdout, stderr, hijackedResp.Reader)
	if err != nil {
		return err
	}
	// fetch the exit code after the stream has been drained
	info, err := e.cli.ContainerExecInspect(ctx, e.execID)
	if err != nil {
		return err
	}
	if info.ExitCode != 0 {
		return &ExitError{
			ExitCode: info.ExitCode,
		}
	}
	return nil
}
// Run starts the specified command and waits for it to complete.
func (e *ContainerExec) Run(ctx context.Context) error {
	if err := e.Start(ctx); err != nil {
		return err
	}
	return e.Wait(ctx)
}
// Output runs the command and returns its standard output.
func (e *ContainerExec) Output(ctx context.Context) ([]byte, error) {
	if e.Stdout != nil {
		return nil, errors.New("confort: exec: Stdout already set")
	}
	var out bytes.Buffer
	e.Stdout = &out
	err := e.Run(ctx)
	return out.Bytes(), err
}
// CombinedOutput runs the command and returns its combined standard output and standard error.
// Because the difference of stdout and stderr, an order of the lines of the combined output is not preserved.
func (e *ContainerExec) CombinedOutput(ctx context.Context) ([]byte, error) {
	switch {
	case e.Stdout != nil:
		return nil, errors.New("confort: exec: Stdout already set")
	case e.Stderr != nil:
		return nil, errors.New("confort: exec: Stderr already set")
	}
	var combined bytes.Buffer
	e.Stdout = &combined
	e.Stderr = &combined
	err := e.Run(ctx)
	return combined.Bytes(), err
}
package beacon
import (
"context"
"errors"
"io/fs"
"os"
"strings"
"time"
)
// LockFile is default file name of lock file.
const LockFile = ".confort.lock"
// LockFilePath returns file name of lock file.
// If CFT_LOCKFILE is set, return its value, or else return LockFile.
func LockFilePath() string {
	if v, ok := os.LookupEnv(LockFileEnv); ok {
		return v
	}
	return LockFile
}
// ErrIntegrationDisabled is returned by Address when the integration with
// the beacon server is explicitly disabled.
var ErrIntegrationDisabled = errors.New("the integration with beacon server is disabled")
// Address returns address of beacon server.
// It returns value of CFT_BEACON_ADDR if exists.
// If the value of CFT_BEACON_ADDR equals "disabled", this returns ErrIntegrationDisabled.
//
// If the variable not exists, Address try to read from lockFile.
func Address(ctx context.Context, lockFile string) (string, error) {
	addr := os.Getenv(AddressEnv)
	if addr != "" {
		if strings.EqualFold(addr, "disabled") {
			return "", ErrIntegrationDisabled
		}
		return addr, nil
	}
	// Poll the lock file for up to ~2s (10 x 200ms), since another process
	// may still be writing it.
	for i := 0; i < 10; i++ {
		select {
		case <-time.After(200 * time.Millisecond):
			data, err := os.ReadFile(lockFile)
			if errors.Is(err, fs.ErrNotExist) {
				continue // not written yet; retry
			}
			if err != nil {
				return "", err
			}
			return string(data), nil
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
	// NOTE(review): all retries exhausted returns ("", nil), not an error;
	// callers treat an empty address as "beacon unavailable".
	return "", nil
}
func StoreAddressToLockFile(lockFile, addr string) error {
return os.WriteFile(lockFile, []byte(addr), 0644)
}
func DeleteLockFile(lockFile string) error {
_, err := os.Stat(lockFile)
if errors.Is(err, fs.ErrNotExist) {
return nil
} else if err != nil {
return err
}
return os.Remove(lockFile)
}
package beacon
import (
"context"
"errors"
"fmt"
"time"
"github.com/daichitakahashi/confort/internal/logging"
"github.com/lestrrat-go/backoff/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
health "google.golang.org/grpc/health/grpc_health_v1"
)
// Connection holds a client connection to the beacon server.
// A zero Connection (nil Conn) represents a disabled/unavailable integration.
type Connection struct {
	Conn *grpc.ClientConn
	Addr string
}
// Enabled reports whether the beacon integration is active, i.e. an actual
// gRPC connection has been established. Safe to call on a nil receiver.
func (c *Connection) Enabled() bool {
	if c == nil {
		return false
	}
	return c.Conn != nil
}
// Close closes the underlying gRPC connection.
// Closing a nil or disabled Connection (Conn == nil, as returned when the
// integration is disabled) is a no-op; the previous implementation would
// dereference the nil connection and panic.
func (c *Connection) Close() error {
	if c == nil || c.Conn == nil {
		return nil
	}
	return c.Conn.Close()
}
// Connect establishes a connection to the beacon server, if enabled.
func Connect(ctx context.Context) (*Connection, error) {
	return connect(ctx)
}
// connect resolves the beacon server address, dials it and waits until the
// server reports SERVING. When the integration is disabled or no address is
// available, it returns a disabled (empty) Connection with no error.
// The dialed connection is closed on every failure path to avoid leaking it.
func connect(ctx context.Context) (*Connection, error) {
	addr, err := Address(ctx, LockFilePath())
	if err != nil {
		if errors.Is(err, ErrIntegrationDisabled) {
			logging.Info(err)
			return &Connection{}, nil // disabled: not an error
		}
		return nil, err
	}
	if addr == "" {
		logging.Info("cannot get the address of beacon server")
		return &Connection{}, nil
	}
	logging.Debugf("the address of beacon server: %s", addr)
	conn, err := grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(
		insecure.NewCredentials(),
	))
	if err != nil {
		return nil, err
	}
	// health check: retry until the "beacon" service reports SERVING
	hc := health.NewHealthClient(conn)
	var status health.HealthCheckResponse_ServingStatus
	ctl := backoff.Constant(
		backoff.WithInterval(time.Millisecond*100),
		backoff.WithMaxRetries(20),
	).Start(ctx)
	for backoff.Continue(ctl) {
		var resp *health.HealthCheckResponse
		resp, err = hc.Check(ctx, &health.HealthCheckRequest{
			Service: "beacon",
		})
		status = resp.GetStatus() // nil-safe on error responses
		logging.Debugf("got health check status of beacon server: %s", status)
		if status == health.HealthCheckResponse_SERVING {
			break
		}
	}
	if err != nil {
		_ = conn.Close() // don't leak the dialed connection
		return nil, err
	}
	if status != health.HealthCheckResponse_SERVING {
		_ = conn.Close() // don't leak the dialed connection
		return nil, fmt.Errorf("unexpected service status %s", status)
	}
	return &Connection{
		Conn: conn,
		Addr: addr,
	}, nil
}
package beacon
import (
"crypto/sha256"
"encoding/hex"
"os"
)
const (
	// LabelIdentifier is the docker object label that carries the beacon identifier.
	LabelIdentifier = "daichitakahashi.confort.beacon.identifier"
)
// Identifier returns the beacon identifier: the value of the identifier
// environment variable when set, otherwise the hex-encoded SHA-256 of s.
func Identifier(s string) string {
	if id := os.Getenv(IdentifierEnv); id != "" {
		return id
	}
	sum := sha256.Sum256([]byte(s))
	return hex.EncodeToString(sum[:])
}
package beacon
// Resource policies controlling how existing docker resources are handled.
const (
	ResourcePolicyError    = "error"
	ResourcePolicyReuse    = "reuse"
	ResourcePolicyReusable = "reusable"
	ResourcePolicyTakeOver = "takeover"
)

// ValidResourcePolicy reports whether s is one of the known resource policies.
func ValidResourcePolicy(s string) bool {
	known := []string{
		ResourcePolicyError,
		ResourcePolicyReuse,
		ResourcePolicyReusable,
		ResourcePolicyTakeOver,
	}
	for _, p := range known {
		if s == p {
			return true
		}
	}
	return false
}
package server
import (
"context"
"io"
"github.com/daichitakahashi/confort/internal/beacon/proto"
"github.com/daichitakahashi/confort/internal/exclusion"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
)
// beaconServer implements the beacon gRPC service, mediating cross-process
// locks through an exclusion.Locker.
type beaconServer struct {
	proto.UnimplementedBeaconServiceServer
	l         *exclusion.Locker
	interrupt func() error // invoked by the Interrupt RPC
}
// LockForNamespace serves a client-driven lock/unlock stream for the
// namespace-wide mutex. The deferred unlock releases a still-held lock when
// the stream terminates (EOF, error or client crash).
func (b *beaconServer) LockForNamespace(stream proto.BeaconService_LockForNamespaceServer) error {
	var unlock func()
	defer func() {
		if unlock != nil {
			unlock()
		}
	}()
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		switch req.GetOperation() {
		case proto.LockOp_LOCK_OP_LOCK:
			// reject re-entrant lock on the same stream
			if unlock != nil {
				return status.Error(codes.InvalidArgument, "trying second lock")
			}
			unlock = b.l.LockForNamespace()
			err = stream.Send(&proto.LockResponse{
				State: proto.LockState_LOCK_STATE_LOCKED,
			})
			if err != nil {
				return err
			}
		case proto.LockOp_LOCK_OP_UNLOCK:
			if unlock == nil {
				return status.Error(codes.InvalidArgument, "unlock on unlocked")
			}
			unlock()
			unlock = nil
			err = stream.Send(&proto.LockResponse{
				State: proto.LockState_LOCK_STATE_UNLOCKED,
			})
			if err != nil {
				return err
			}
		}
		// NOTE(review): operations other than LOCK/UNLOCK are silently
		// ignored — confirm this is intended.
	}
}
// LockForBuild serves a client-driven lock/unlock stream for the per-image
// build lock, keyed by image tag. Only one key may be held per stream; the
// deferred unlock guards against client disconnection while locked.
func (b *beaconServer) LockForBuild(stream proto.BeaconService_LockForBuildServer) error {
	ctx := stream.Context()
	var key string // currently held key; empty when unlocked
	var unlock func()
	defer func() {
		if unlock != nil {
			unlock()
		}
	}()
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		k := req.GetKey()
		if k == "" {
			return status.Error(codes.InvalidArgument, "empty key")
		}
		switch req.GetOperation() {
		case proto.LockOp_LOCK_OP_LOCK:
			if unlock != nil {
				return status.Error(codes.InvalidArgument, "trying second lock")
			}
			key = k
			unlock, err = b.l.LockForBuild(ctx, key)
			if err != nil {
				return err
			}
			err = stream.Send(&proto.LockResponse{
				State: proto.LockState_LOCK_STATE_LOCKED,
			})
			if err != nil {
				return err
			}
		case proto.LockOp_LOCK_OP_UNLOCK:
			// the unlock key must match the key that was locked
			if unlock == nil || k != key {
				return status.Error(codes.InvalidArgument, "unlock on unlocked key")
			}
			unlock()
			key = ""
			unlock = nil
			err = stream.Send(&proto.LockResponse{
				State: proto.LockState_LOCK_STATE_UNLOCKED,
			})
			if err != nil {
				return err
			}
		}
	}
}
// LockForContainerSetup serves a client-driven lock/unlock stream for the
// per-container setup lock, keyed by container name. Structure mirrors
// LockForBuild; the deferred unlock guards against client disconnection.
func (b *beaconServer) LockForContainerSetup(stream proto.BeaconService_LockForContainerSetupServer) error {
	ctx := stream.Context()
	var key string // currently held key; empty when unlocked
	var unlock func()
	defer func() {
		if unlock != nil {
			unlock()
		}
	}()
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		k := req.GetKey()
		if k == "" {
			return status.Error(codes.InvalidArgument, "empty key")
		}
		switch req.GetOperation() {
		case proto.LockOp_LOCK_OP_LOCK:
			if unlock != nil {
				return status.Error(codes.InvalidArgument, "trying second lock")
			}
			key = k
			unlock, err = b.l.LockForContainerSetup(ctx, key)
			if err != nil {
				return err
			}
			err = stream.Send(&proto.LockResponse{
				State: proto.LockState_LOCK_STATE_LOCKED,
			})
			if err != nil {
				return err
			}
		case proto.LockOp_LOCK_OP_UNLOCK:
			// the unlock key must match the key that was locked
			if unlock == nil || k != key {
				return status.Error(codes.InvalidArgument, "unlock on unlocked key")
			}
			unlock()
			key = ""
			unlock = nil
			err = stream.Send(&proto.LockResponse{
				State: proto.LockState_LOCK_STATE_UNLOCKED,
			})
			if err != nil {
				return err
			}
		}
	}
}
// AcquireContainerLock serves the multi-container lock protocol:
//  1. the client sends an Acquire request naming its targets,
//  2. the server acquires all locks atomically and reports per-target state,
//  3. the client sends one init result per target whose init it must run,
//  4. the client sends Release, and the server releases all locks.
//
// Fixes over the previous revision:
//   - the init-result loop iterated `for i := 0; i < len(initTargets); i++`
//     while deleting from initTargets, so with two or more init targets it
//     stopped after roughly half the expected Recv calls; it now loops until
//     every pending init target has reported.
//   - the final invalid-operation error returned without calling release(),
//     leaking the acquired locks.
func (b *beaconServer) AcquireContainerLock(stream proto.BeaconService_AcquireContainerLockServer) error {
	ctx := stream.Context()
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		acquireParam, ok := req.GetParam().(*proto.AcquireLockRequest_Acquire)
		if !ok {
			return status.Error(codes.InvalidArgument, "invalid operation")
		}
		// translate wire operations into lock entries
		entries := map[string]*exclusion.AcquireContainerLockEntry{}
		for key, target := range acquireParam.Acquire.GetTargets() {
			if key == "" {
				return status.Error(codes.InvalidArgument, "empty key")
			}
			var exclusive, init bool
			switch target.GetOperation() {
			case proto.AcquireOp_ACQUIRE_OP_LOCK:
				exclusive = true
			case proto.AcquireOp_ACQUIRE_OP_INIT_LOCK:
				exclusive = true
				init = true
			case proto.AcquireOp_ACQUIRE_OP_SHARED_LOCK:
				exclusive = false
			case proto.AcquireOp_ACQUIRE_OP_INIT_SHARED_LOCK:
				exclusive = false
				init = true
			default:
				return status.Error(codes.InvalidArgument, "invalid operation")
			}
			entries[key] = &exclusion.AcquireContainerLockEntry{
				Exclusive: exclusive,
				Init:      init,
			}
		}
		release, err := b.l.AcquireContainerLock(ctx, entries)
		if err != nil {
			return err
		}
		// report per-target lock state; remember which targets must run init
		initTargets := map[string]struct{}{}
		results := map[string]*proto.AcquireLockResult{}
		for key, entry := range entries {
			initAcquired := entry.ContainerLock().InitAcquired()
			var state proto.LockState
			if entry.Exclusive {
				state = proto.LockState_LOCK_STATE_LOCKED
			} else if initAcquired {
				// shared request upgraded to exclusive while init runs
				state = proto.LockState_LOCK_STATE_LOCKED
			} else {
				state = proto.LockState_LOCK_STATE_SHARED_LOCKED
			}
			if initAcquired {
				initTargets[key] = struct{}{}
			}
			results[key] = &proto.AcquireLockResult{
				State:       state,
				AcquireInit: initAcquired,
			}
		}
		err = stream.Send(&proto.AcquireLockResponse{
			Results: results,
		})
		if err != nil {
			release()
			return err
		}
		var e error
	InitLoop:
		// BUGFIX: iterate until every pending init target has reported a
		// result, instead of indexing against the shrinking map length.
		for len(initTargets) > 0 {
			req, err := stream.Recv()
			if err != nil {
				release()
				return err
			}
			switch param := req.GetParam().(type) {
			case *proto.AcquireLockRequest_Init:
				key := param.Init.GetKey()
				succeeded := param.Init.GetInitSucceeded()
				entries[key].ContainerLock().SetInitResult(succeeded)
				delete(initTargets, key)
				if !succeeded {
					break InitLoop
				}
			case *proto.AcquireLockRequest_Release:
				break InitLoop
			default:
				e = status.Error(codes.InvalidArgument, "invalid operation")
				break InitLoop
			}
		}
		// targets that never reported are treated as failed inits
		var failed bool
		for key := range initTargets {
			entries[key].ContainerLock().SetInitResult(false)
			failed = true
		}
		if e != nil {
			release()
			return e
		} else if failed {
			release()
			continue
		}
		req, err = stream.Recv()
		if err != nil {
			release()
			return err
		}
		_, ok = req.GetParam().(*proto.AcquireLockRequest_Release)
		if !ok {
			release() // BUGFIX: don't leak the lock on protocol violation
			return status.Error(codes.InvalidArgument, "invalid operation")
		}
		release()
	}
}
// Interrupt triggers the server's interrupt callback (e.g. graceful shutdown)
// and reports its error to the caller.
func (b *beaconServer) Interrupt(_ context.Context, _ *emptypb.Empty) (*emptypb.Empty, error) {
	err := b.interrupt()
	return &emptypb.Empty{}, err
}

// compile-time interface check
var _ proto.BeaconServiceServer = (*beaconServer)(nil)
package server
import (
"context"
"log"
health "google.golang.org/grpc/health/grpc_health_v1"
)
// HealthChecker reports whether the server is healthy.
type HealthChecker interface {
	HealthCheck(ctx context.Context) error
}

// HealthCheckFunc adapts a plain function to the HealthChecker interface.
type HealthCheckFunc func(ctx context.Context) error

// HealthCheck implements HealthChecker by calling f.
func (f HealthCheckFunc) HealthCheck(ctx context.Context) error {
	return f(ctx)
}

// compile-time interface check
var _ HealthChecker = (HealthCheckFunc)(nil)
// healthServer bridges a HealthChecker to the standard gRPC health service.
type healthServer struct {
	health.UnimplementedHealthServer
	checker HealthChecker
}
// Check implements grpc_health_v1.HealthServer. It reports SERVING when the
// underlying checker succeeds and UNKNOWN (with a log line) when it fails;
// the gRPC-level error is always nil. The requested service name is ignored.
func (h *healthServer) Check(ctx context.Context, _ *health.HealthCheckRequest) (*health.HealthCheckResponse, error) {
	if err := h.checker.HealthCheck(ctx); err != nil {
		log.Println("health check failed:", err)
		return &health.HealthCheckResponse{Status: health.HealthCheckResponse_UNKNOWN}, nil
	}
	return &health.HealthCheckResponse{Status: health.HealthCheckResponse_SERVING}, nil
}

var _ health.HealthServer = (*healthServer)(nil)
package server
import (
"context"
"github.com/daichitakahashi/confort/internal/beacon/proto"
"github.com/daichitakahashi/confort/internal/exclusion"
"google.golang.org/grpc"
health "google.golang.org/grpc/health/grpc_health_v1"
)
// Register wires all beacon services (lock service, unique-value service and
// the gRPC health endpoint) into serv. interrupt is invoked when a client
// calls BeaconService.Interrupt and should trigger server shutdown.
func Register(serv *grpc.Server, interrupt func() error) {
	proto.RegisterBeaconServiceServer(serv, &beaconServer{
		l:         exclusion.NewLocker(),
		interrupt: interrupt,
	})
	proto.RegisterUniqueValueServiceServer(serv, &uniqueValueServer{})
	health.RegisterHealthServer(serv, &healthServer{
		// the beacon server itself has no dependencies to probe, so the
		// health check always succeeds once the server is serving
		checker: HealthCheckFunc(func(ctx context.Context) error {
			return nil
		}),
	})
}
package server
import (
"context"
"sync"
"github.com/daichitakahashi/confort/internal/beacon/proto"
)
// uniqueValueServer tracks values that must be unique per named store,
// shared by every connected test process.
type uniqueValueServer struct {
	proto.UnimplementedUniqueValueServiceServer
	stores sync.Map // store name -> *valueStore
}
// valueStore is a mutex-guarded set of values that have already been issued.
type valueStore struct {
	m      sync.Mutex
	values map[string]bool
}

// tryStore registers v and reports whether it was absent before.
// The value set is allocated lazily on first use.
func (s *valueStore) tryStore(v string) bool {
	s.m.Lock()
	defer s.m.Unlock()
	if s.values == nil {
		s.values = map[string]bool{}
	}
	if _, dup := s.values[v]; dup {
		return false
	}
	s.values[v] = true
	return true
}
// StoreUniqueValue records the requested value in the named store and reports
// whether it was new. A store is created lazily on first use.
func (u *uniqueValueServer) StoreUniqueValue(_ context.Context, req *proto.StoreUniqueValueRequest) (*proto.StoreUniqueValueResponse, error) {
	stored, _ := u.stores.LoadOrStore(req.GetStore(), &valueStore{})
	succeeded := stored.(*valueStore).tryStore(req.GetValue())
	return &proto.StoreUniqueValueResponse{Succeeded: succeeded}, nil
}

var _ proto.UniqueValueServiceServer = (*uniqueValueServer)(nil)
package cmd
import (
"context"
"errors"
"flag"
"fmt"
"io/fs"
"log"
"os"
"os/exec"
"os/signal"
"syscall"
"github.com/daichitakahashi/confort/internal/beacon"
"github.com/daichitakahashi/gocmd"
"github.com/google/subcommands"
)
// NewCommands builds the CLI command tree: the built-in help commands plus
// start/stop/test, all sharing the same Operation implementation.
func NewCommands(set *flag.FlagSet, op Operation) *subcommands.Commander {
	cmd := subcommands.NewCommander(set, set.Name())
	cmd.Register(cmd.CommandsCommand(), "help")
	cmd.Register(cmd.FlagsCommand(), "help")
	cmd.Register(cmd.HelpCommand(), "help")
	cmd.Register(&StartCommand{
		Operation: op,
	}, "")
	cmd.Register(&StopCommand{
		Operation: op,
	}, "")
	cmd.Register(&TestCommand{
		Operation: op,
	}, "")
	return cmd
}
// StartCommand launches a beacon server in the foreground and writes its
// endpoint to a lock file so tests and "confort stop" can find it.
type StartCommand struct {
	Operation Operation
	// flags
	lockFile string // lock file path; defaults to $CFT_LOCKFILE or beacon.LockFile
}

func (s *StartCommand) Name() string {
	return "start"
}

func (s *StartCommand) Synopsis() string {
	return `Start beacon server for test.`
}

func (s *StartCommand) Usage() string {
	return `$ confort start (-lock-file <filename>)
Start the beacon server and output its endpoint to lock file.
Use "confort stop" command to stop beacon server.
By using "-lock-file" option, you can use a user-defined file name as a lock file.
Default file name is ".confort.lock".
To tell the endpoint to test code, you have to set file name as environment variable CFT_LOCKFILE.
If the variable is already set, this command regards CFT_LOCKFILE as the default file name.
See the document of confort.WithBeacon.
If lock file already exists, this command fails.
`
}

// SetFlags registers -lock-file. An already-set CFT_LOCKFILE becomes the
// default value.
func (s *StartCommand) SetFlags(f *flag.FlagSet) {
	defaultLockfile := os.Getenv(beacon.LockFileEnv) // it regards CFT_LOCKFILE as default value
	if defaultLockfile == "" {
		defaultLockfile = beacon.LockFile
	}
	f.StringVar(&s.lockFile, "lock-file", defaultLockfile, "user-defined filename of the lock file")
}

// Execute starts the beacon server, writes its address to the lock file and
// then blocks until the server stops (normally via "confort stop").
func (s *StartCommand) Execute(ctx context.Context, _ *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus {
	// check lock file
	// if the file already exists, "confort start" fails
	_, err := os.Stat(s.lockFile)
	if err == nil {
		log.Printf(`Lock file %q already exists. Please wait until the test finishes or run "confort stop".`, s.lockFile)
		log.Printf(`* If your test has already finished, you can delete %q directly.`, s.lockFile)
		return subcommands.ExitFailure
	} else if !errors.Is(err, fs.ErrNotExist) {
		log.Println(err)
		return subcommands.ExitFailure
	}
	// start server asynchronously
	addr, done, err := s.Operation.StartBeaconServer(ctx)
	if err != nil {
		log.Println("failed to start beacon server:", err)
		return subcommands.ExitFailure
	}
	// write address into lock file
	err = beacon.StoreAddressToLockFile(s.lockFile, addr)
	if err != nil {
		log.Printf("failed to create lock file: %q", s.lockFile)
		log.Println(err)
		// best-effort rollback of the server we just started
		err = s.Operation.StopBeaconServer(ctx, addr)
		if err != nil {
			log.Println("failed to stop beacon server:", err)
		}
		return subcommands.ExitFailure
	}
	// wait until finished
	<-done
	return subcommands.ExitSuccess
}

var _ subcommands.Command = (*StartCommand)(nil)
// StopCommand stops a beacon server started by "confort start", removes the
// docker resources labeled with its identifier, and deletes the lock file.
type StopCommand struct {
	Operation Operation
	// flags
	lockFile string // lock file path; must match the one given to "confort start"
}

func (s *StopCommand) Name() string {
	return "stop"
}

func (s *StopCommand) Synopsis() string {
	return `Stop beacon server.`
}

func (s *StopCommand) Usage() string {
	return `$ confort stop (-lock-file <filename>)
Stop the beacon server started by "confort start" command.
The target server address will be read from lock file(".confort.lock"), and the lock file will be removed.
If "confort start" has accompanied by "-lock-file" option, this command requires the same.
`
}

// SetFlags registers -lock-file. An already-set CFT_LOCKFILE becomes the
// default value.
func (s *StopCommand) SetFlags(f *flag.FlagSet) {
	defaultLockfile := os.Getenv(beacon.LockFileEnv) // it regards CFT_LOCKFILE as default value
	if defaultLockfile == "" {
		defaultLockfile = beacon.LockFile
	}
	f.StringVar(&s.lockFile, "lock-file", defaultLockfile, "user-defined filename of the lock file")
}

// Execute reads the server address, interrupts the server, removes labeled
// docker resources and finally deletes the lock file.
func (s *StopCommand) Execute(ctx context.Context, _ *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus {
	// read address from env or lock file
	addr, err := beacon.Address(ctx, s.lockFile)
	if errors.Is(err, beacon.ErrIntegrationDisabled) {
		log.Println(err)
		return subcommands.ExitFailure
	}
	if err != nil {
		log.Printf("failed to read lock file %q\n", s.lockFile)
		log.Println(err)
		return subcommands.ExitFailure
	}
	err = s.Operation.StopBeaconServer(ctx, addr)
	if err != nil {
		log.Println(err)
		return subcommands.ExitFailure
	}
	// delete all docker resources created in test
	err = s.Operation.CleanupResources(ctx, beacon.LabelIdentifier, beacon.Identifier(addr))
	if err != nil {
		log.Println(err)
		return subcommands.ExitFailure
	}
	// delete lock file if it exists
	err = beacon.DeleteLockFile(s.lockFile)
	if err != nil {
		log.Printf("failed to delete lock file %q", s.lockFile)
		log.Println(err)
		return subcommands.ExitFailure
	}
	return subcommands.ExitSuccess
}

var _ subcommands.Command = (*StopCommand)(nil)
// TestCommand starts a beacon server, runs "go test" with the beacon
// environment injected, then cleans up the docker resources it created.
type TestCommand struct {
	Operation Operation
	// flags
	namespace string
	policy    resourcePolicy
	goVer     string
	goMode    goMode
}

func (t *TestCommand) Name() string {
	return "test"
}

func (t *TestCommand) Synopsis() string {
	return `Start beacon server and execute test.`
}

func (t *TestCommand) Usage() string {
	return `$ confort test (-namespace <namespace> -policy <resource policy> -go <go version> -go-mode <mode>) (-- -p=4 -shuffle=on)
Start the beacon server and execute tests.
After the tests are finished, the beacon server will be stopped automatically.
If you want to use options of "go test", put them after "--".
`
}

// SetFlags registers -namespace, -policy, -go and -go-mode.
func (t *TestCommand) SetFlags(f *flag.FlagSet) {
	f.StringVar(&t.namespace, "namespace", os.Getenv(beacon.NamespaceEnv), "namespace")
	t.policy = beacon.ResourcePolicyReuse
	f.Var(&t.policy, "policy", `resource policy
* With "error", the existing same resource(network and container) makes test failed
* With "reuse", tests reuse resources if already exist
* "reusable" is similar to "reuse", but created resources with this policy will not be removed after the tests finished
* "takeover" is also similar to "reuse", but reused resources with this policy will be removed after the tests`)
	f.StringVar(&t.goVer, "go", "", `specify go version. "-go=mod" enables to use go version written in your go.mod`)
	t.goMode = goMode(gocmd.ModeFallback)
	f.Var(&t.goMode, "go-mode", `use with -go option
* "exact" finds go command that has the exact same version as given in "-go"
* "latest" finds go command that has the same major version as given in "-go"
* "fallback" behaves like "latest", but if no command was found, fallbacks to "go" command`)
}

// Execute runs the whole test session: start the beacon server, run "go test"
// with the remaining args, propagate the test's exit code, and clean up
// labeled docker resources unless the policy is "reusable".
func (t *TestCommand) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus {
	workingDir, err := os.Getwd()
	if err != nil {
		log.Println(err)
		return subcommands.ExitFailure
	}
	// start server asynchronously
	addr, _, err := t.Operation.StartBeaconServer(ctx)
	if err != nil {
		log.Println("failed to start beacon server:", err)
		return subcommands.ExitFailure
	}
	defer func() {
		err = t.Operation.StopBeaconServer(ctx, addr)
		if err != nil {
			log.Println("(CAUTION) error occurred in stopping beacon server:", err)
		}
	}()
	goCmd, ver, err := t.determineGoCommand()
	if err != nil {
		log.Println(err)
		return subcommands.ExitFailure
	}
	fmt.Println("use go version:", ver)
	// identifier labels every docker resource created by this run
	identifier := beacon.Identifier(workingDir + ":" + t.namespace)
	// prepare environment variables
	env := os.Environ()
	env = append(env, fmt.Sprintf("%s=%s", beacon.AddressEnv, addr))
	if t.namespace != "" {
		env = append(env, fmt.Sprintf("%s=%s", beacon.NamespaceEnv, t.namespace))
	}
	env = append(env, fmt.Sprintf("%s=%s", beacon.ResourcePolicyEnv, t.policy))
	env = append(env, fmt.Sprintf("%s=%s", beacon.IdentifierEnv, identifier))
	// trap signal for graceful shutdown
	// NOTE(review): nothing ever receives from this channel; the apparent
	// effect is that this process survives SIGINT/SIGQUIT/SIGTERM so the
	// cleanup below can run while the child "go test" handles the signal —
	// confirm intent
	signal.Notify(
		make(chan os.Signal, 1),
		syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM,
	)
	// execute test
	var status subcommands.ExitStatus
	err = t.Operation.ExecuteTest(ctx, goCmd, f.Args(), env)
	var ee *exec.ExitError
	if errors.As(err, &ee) {
		// propagate "go test"'s own exit code
		status = subcommands.ExitStatus(ee.ExitCode())
	} else if err != nil {
		log.Println(err)
		return subcommands.ExitFailure
	} else {
		status = subcommands.ExitSuccess
	}
	if t.policy != beacon.ResourcePolicyReusable {
		// delete all docker resources created in TestCommand
		err = t.Operation.CleanupResources(ctx, beacon.LabelIdentifier, identifier)
		if err != nil {
			log.Println(err)
			return subcommands.ExitFailure
		}
	}
	return status
}
// determineGoCommand resolves which go binary to run and its version, based
// on the -go / -go-mode flags:
//   - "":    use "go" from PATH and report its current version
//   - "mod": use the go version required by go.mod
//   - else:  treat the flag value itself as the requested version
func (t *TestCommand) determineGoCommand() (string, string, error) {
	if t.goVer == "" {
		ver, err := gocmd.CurrentVersion()
		return "go", ver, err
	}
	target := t.goVer
	if t.goVer == "mod" {
		modVer, err := gocmd.ModuleGoVersion()
		if err != nil {
			return "", "", fmt.Errorf("failed to read go.mod: %w", err)
		}
		target = modVer
	}
	return gocmd.Determine(target, gocmd.Mode(t.goMode))
}

var _ subcommands.Command = (*TestCommand)(nil)
// resourcePolicy is a flag.Value accepting only policies that
// beacon.ValidResourcePolicy recognizes.
type resourcePolicy string

func (r *resourcePolicy) String() string {
	return string(*r)
}

// Set validates v and stores it; invalid values are rejected with an error.
func (r *resourcePolicy) Set(v string) error {
	if beacon.ValidResourcePolicy(v) {
		*r = resourcePolicy(v)
		return nil
	}
	return fmt.Errorf("invalid resource policy: %s", v)
}

var _ flag.Value = (*resourcePolicy)(nil)
// goMode is a flag.Value wrapper around gocmd.Mode accepting "exact",
// "latest" and "fallback" (the default, also used for the empty string).
type goMode gocmd.Mode

// String maps the mode back to its flag spelling; unknown values yield "".
func (g *goMode) String() string {
	m := gocmd.Mode(*g)
	if m == gocmd.ModeExact {
		return "exact"
	}
	if m == gocmd.ModeLatest {
		return "latest"
	}
	if m == gocmd.ModeFallback {
		return "fallback"
	}
	return ""
}

// Set parses the flag spelling into a gocmd.Mode.
func (g *goMode) Set(v string) error {
	switch v {
	case "exact":
		*g = goMode(gocmd.ModeExact)
	case "latest":
		*g = goMode(gocmd.ModeLatest)
	case "", "fallback":
		*g = goMode(gocmd.ModeFallback)
	default:
		return fmt.Errorf("invalid value: %s", v)
	}
	return nil
}

var _ flag.Value = (*goMode)(nil)
package cmd
import (
"context"
"errors"
"log"
"net"
"os"
"os/exec"
"time"
"github.com/daichitakahashi/confort/internal/beacon/proto"
"github.com/daichitakahashi/confort/internal/beacon/server"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
"github.com/lestrrat-go/backoff/v2"
"go.uber.org/multierr"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
health "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/protobuf/types/known/emptypb"
)
// Operation abstracts everything the CLI commands do, so commands can be
// exercised with a fake implementation.
type Operation interface {
	// StartBeaconServer starts the server and returns its address plus a
	// channel that is closed when the server stops serving.
	StartBeaconServer(ctx context.Context) (string, <-chan struct{}, error)
	StopBeaconServer(ctx context.Context, addr string) error
	// CleanupResources removes docker containers/networks labeled label=value.
	CleanupResources(ctx context.Context, label, value string) error
	ExecuteTest(ctx context.Context, goCmd string, args []string, environments []string) error
}

// operation is the production implementation backed by a gRPC server.
type operation struct {
	srv *grpc.Server
	ln  net.Listener // set by StartBeaconServer
}
// NewOperation creates the production Operation. The beacon server's
// interrupt callback stops the gRPC server (and closes the listener) after a
// short delay, giving the in-flight Interrupt RPC time to respond.
func NewOperation() Operation {
	srv := grpc.NewServer(
		grpc.ConnectionTimeout(time.Minute * 5), // TODO: configure
	)
	op := &operation{
		srv: srv,
	}
	server.Register(srv, func() error {
		go func() {
			// delay shutdown so the Interrupt response can be delivered
			// NOTE(review): 500ms is a heuristic, not a guarantee
			<-time.After(time.Millisecond * 500)
			op.srv.Stop()
			_ = op.ln.Close()
		}()
		return nil
	})
	return op
}
// StartBeaconServer listens on a random TCP port, serves the gRPC server in
// the background and waits for the health endpoint to report SERVING (polled
// every 200ms, at most 150 tries, i.e. roughly 30 seconds). It returns the
// listen address and a channel closed when serving ends. On failure, the
// server and listener are torn down by the deferred cleanup.
func (o *operation) StartBeaconServer(ctx context.Context) (_ string, _ <-chan struct{}, err error) {
	ln, err := net.Listen("tcp", ":0")
	if err != nil {
		return "", nil, err
	}
	o.ln = ln
	done := make(chan struct{})
	go func() {
		defer close(done)
		err := o.srv.Serve(ln)
		if err != nil {
			// NOTE(review): Fatal exits the whole process; confirm that a
			// Serve error can never be a benign shutdown race here
			log.Fatal(err)
		}
	}()
	defer func() {
		if err != nil { // failed
			o.srv.Stop()
			_ = ln.Close()
		}
	}()
	conn, err := grpc.DialContext(ctx, ln.Addr().String(), grpc.WithTransportCredentials(
		insecure.NewCredentials(),
	))
	if err != nil {
		return "", nil, err
	}
	defer func() {
		_ = conn.Close()
	}()
	// poll the health endpoint until the server answers SERVING
	cli := health.NewHealthClient(conn)
	b := backoff.Constant(
		backoff.WithInterval(200*time.Millisecond),
		backoff.WithMaxRetries(150),
	).Start(ctx)
	for backoff.Continue(b) {
		resp, err := cli.Check(ctx, &health.HealthCheckRequest{
			Service: "beacon",
		})
		if err != nil {
			continue
		}
		if resp.Status == health.HealthCheckResponse_SERVING {
			return ln.Addr().String(), done, nil
		}
	}
	return "", nil, errors.New("cannot obtain beacon endpoint")
}
// StopBeaconServer dials the beacon server at addr and asks it to shut down
// via the Interrupt RPC.
func (o *operation) StopBeaconServer(ctx context.Context, addr string) error {
	conn, err := grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(
		insecure.NewCredentials(),
	))
	if err != nil {
		return err
	}
	defer func() {
		_ = conn.Close()
	}()
	_, err = proto.NewBeaconServiceClient(conn).Interrupt(ctx, &emptypb.Empty{})
	return err
}
// CleanupResources removes every docker container and network that carries
// the label pair label=value. Individual failures do not stop the sweep; all
// errors are combined into the returned error.
func (o *operation) CleanupResources(ctx context.Context, label, value string) error {
	cli, err := client.NewClientWithOpts(client.FromEnv)
	if err != nil {
		return err
	}
	cli.NegotiateAPIVersion(ctx)
	labelFilter := filters.NewArgs(
		filters.Arg("label", label+"="+value),
	)
	var errs []error
	// containers (including stopped ones) are force-removed with volumes
	containers, err := cli.ContainerList(ctx, types.ContainerListOptions{
		All:     true,
		Filters: labelFilter,
	})
	if err != nil {
		errs = append(errs, err)
	}
	for _, c := range containers {
		removeErr := cli.ContainerRemove(ctx, c.ID, types.ContainerRemoveOptions{
			RemoveVolumes: true,
			Force:         true,
		})
		if removeErr != nil {
			errs = append(errs, removeErr)
		}
	}
	// networks with the same label
	networks, err := cli.NetworkList(ctx, types.NetworkListOptions{
		Filters: labelFilter,
	})
	if err != nil {
		errs = append(errs, err)
	}
	for _, nw := range networks {
		if removeErr := cli.NetworkRemove(ctx, nw.ID); removeErr != nil {
			errs = append(errs, removeErr)
		}
	}
	return multierr.Combine(errs...)
}
// ExecuteTest runs `goCmd test <args...>` with the given environment,
// streaming its output to this process's stdout/stderr.
func (o *operation) ExecuteTest(ctx context.Context, goCmd string, args, environments []string) error {
	cmdArgs := append([]string{"test"}, args...)
	c := exec.CommandContext(ctx, goCmd, cmdArgs...)
	c.Env = environments
	c.Stdout = os.Stdout
	c.Stderr = os.Stderr
	return c.Run()
}

var _ Operation = (*operation)(nil)
package exclusion
import (
"container/list"
"context"
"sync"
"golang.org/x/sync/errgroup"
)
// AcquireParam is a pair of callbacks for one lock key.
// Lock must call notifyLock once the key's lock is actually held, so the
// controller can release the key from its FIFO bookkeeping.
type AcquireParam struct {
	Lock   func(ctx context.Context, notifyLock func()) error
	Unlock func()
}

// Acquirer serializes multi-key lock acquisition so that acquirers with
// overlapping key sets start locking in FIFO order, avoiding deadlock.
type Acquirer struct {
	c *acquireController
}

// NewAcquirer returns an Acquirer with an empty queue.
func NewAcquirer() *Acquirer {
	return &Acquirer{
		c: &acquireController{
			m:     new(sync.Mutex),
			queue: list.New(),
		},
	}
}
func (a *Acquirer) Acquire(ctx context.Context, params map[string]AcquireParam) error {
set := map[string]struct{}{}
for key := range params {
set[key] = struct{}{}
}
e, proceed := a.c.accept(set)
select {
case <-ctx.Done():
return ctx.Err()
case <-proceed:
}
eg, ctx := errgroup.WithContext(ctx)
locked := make([]*AcquireParam, len(params))
var i int
for key, param := range params {
idx := i
key := key
param := param
eg.Go(func() error {
err := param.Lock(ctx, func() {
a.c.removeLockedKey(e, key)
})
if err != nil {
return err
}
locked[idx] = ¶m
return nil
})
i++
}
err := eg.Wait()
if err != nil {
a.c.remove(e)
var wg sync.WaitGroup
for _, lockedParam := range locked {
if lockedParam == nil {
continue
}
p := lockedParam
wg.Add(1)
go func() {
defer wg.Done()
p.Unlock()
}()
}
wg.Wait()
return err
}
return nil
}
// Release undoes a successful Acquire by unlocking every param.
// (The queue entry already left the queue when its last key got locked.)
func (a *Acquirer) Release(params map[string]AcquireParam) {
	for _, p := range params {
		p.Unlock()
	}
}
// acquireSet is one queued acquisition: the keys still being acquired and a
// channel that is closed when the acquirer may start locking.
type acquireSet struct {
	set     map[string]struct{}
	proceed chan struct{}
}

// acquireController is a FIFO queue of acquireSets guarded by one mutex.
type acquireController struct {
	m     *sync.Mutex
	queue *list.List
}

// entry aliases list.Element for readability.
type entry = list.Element

// accept enqueues the key set and returns its queue entry plus the channel
// that is closed once acquisition may proceed.
func (q *acquireController) accept(set map[string]struct{}) (*entry, chan struct{}) {
	q.m.Lock()
	defer q.m.Unlock()
	s := &acquireSet{
		set:     set,
		proceed: make(chan struct{}),
	}
	e := q.queue.PushBack(s)
	q.proceed()
	return e, s.proceed
}
// removeLockedKey marks key as locked for entry e. When it is the last
// remaining key of the entry, the whole entry leaves the queue. Either way,
// later queued acquirers are re-evaluated.
func (q *acquireController) removeLockedKey(e *entry, key string) {
	q.m.Lock()
	defer q.m.Unlock()
	s := e.Value.(*acquireSet)
	if len(s.set) > 1 {
		delete(s.set, key)
	} else {
		// all locks have been acquired; remove the entry from the queue
		q.queue.Remove(e)
	}
	q.proceed()
}
// remove drops e from the queue and re-evaluates the remaining acquirers.
func (q *acquireController) remove(e *entry) {
	q.m.Lock()
	defer q.m.Unlock()
	q.queue.Remove(e)
	q.proceed()
}
// proceed scans the queue front-to-back and closes the proceed channel of
// every entry whose keys do not collide with any earlier entry's keys.
// Must be called with q.m held.
func (q *acquireController) proceed() {
	if q.queue.Len() == 0 {
		return
	}
	// waitSet accumulates every key claimed by entries earlier in the queue
	waitSet := map[string]struct{}{}
	for e := q.queue.Front(); e != nil; e = e.Next() {
		s := e.Value.(*acquireSet)
		select {
		case <-s.proceed:
			// this acquirer already started locking; its remaining keys
			// still block later entries
			for key := range s.set {
				waitSet[key] = struct{}{}
			}
		default:
			// not started yet: may start only if none of its keys is
			// claimed by an earlier entry; its keys are claimed either way
			var pending bool
			for key := range s.set {
				_, found := waitSet[key]
				if found {
					pending = true
				} else {
					waitSet[key] = struct{}{}
				}
			}
			if !pending {
				close(s.proceed)
			}
		}
	}
}
package exclusion
import (
"context"
"fmt"
"github.com/daichitakahashi/confort/internal/beacon/proto"
"go.uber.org/multierr"
"google.golang.org/protobuf/types/known/emptypb"
)
// Control is the set of locks taken while creating namespaces, building
// images and using containers. Implementations exist for purely in-process
// locking (control) and for locking via a beacon server (beaconControl).
type Control interface {
	LockForNamespace(ctx context.Context) (func(), error)
	LockForBuild(ctx context.Context, image string) (func(), error)
	LockForContainerSetup(ctx context.Context, name string) (func(), error)
	LockForContainerUse(ctx context.Context, params map[string]ContainerUseParam) (unlock func(), err error)
}

// control implements Control with purely in-process locking.
type control struct {
	l *Locker
}

// NewControl returns an in-process Control.
func NewControl() Control {
	return &control{
		l: NewLocker(),
	}
}
// LockForNamespace delegates to the in-process Locker; the mutex acquisition
// itself cannot fail, so ctx is unused.
func (c *control) LockForNamespace(_ context.Context) (func(), error) {
	unlock := c.l.LockForNamespace()
	return unlock, nil
}

// LockForBuild delegates to the in-process Locker, keyed by image name.
func (c *control) LockForBuild(ctx context.Context, image string) (func(), error) {
	return c.l.LockForBuild(ctx, image)
}

// LockForContainerSetup delegates to the in-process Locker, keyed by
// container name.
func (c *control) LockForContainerSetup(ctx context.Context, name string) (func(), error) {
	return c.l.LockForContainerSetup(ctx, name)
}
// ContainerUseParam describes how one container is used: exclusively or
// shared, with an optional one-time Init run by the first acquirer.
type ContainerUseParam struct {
	Exclusive bool
	Init      func(ctx context.Context) error
}
// LockForContainerUse locks all containers in params at once. For containers
// whose lock acquisition also won the one-time init slot, the Init callback
// runs here; the first failure short-circuits the remaining Inits, but every
// init-holding lock still gets a result so waiters are released.
func (c *control) LockForContainerUse(ctx context.Context, params map[string]ContainerUseParam) (unlock func(), err error) {
	entries := map[string]*AcquireContainerLockEntry{}
	for name, param := range params {
		entries[name] = &AcquireContainerLockEntry{
			Exclusive: param.Exclusive,
			Init:      param.Init != nil,
		}
	}
	unlock, err = c.l.AcquireContainerLock(ctx, entries)
	if err != nil {
		return nil, err
	}
	var initErr error
	for name, entry := range entries {
		lock := entry.ContainerLock()
		if lock.InitAcquired() {
			// run Init only until the first failure
			if initErr == nil {
				initErr = initSafe(ctx, params[name].Init)
			}
			lock.SetInitResult(initErr == nil)
		}
	}
	if initErr != nil {
		unlock()
		return nil, initErr
	}
	return unlock, nil
}
func initSafe(ctx context.Context, init func(ctx context.Context) error) (err error) {
defer func() {
r := recover()
if r != nil {
err = fmt.Errorf("%v", r)
}
}()
return init(ctx)
}
var _ Control = (*control)(nil)

// beaconControl implements Control by delegating every lock operation to a
// remote beacon server over gRPC.
type beaconControl struct {
	cli proto.BeaconServiceClient
}

// NewBeaconControl wraps the given client in a Control.
func NewBeaconControl(cli proto.BeaconServiceClient) Control {
	return &beaconControl{
		cli: cli,
	}
}
// LockForNamespace opens a lock stream, sends LOCK and waits for the server's
// acknowledgement. The returned func sends UNLOCK and closes the stream.
func (b *beaconControl) LockForNamespace(ctx context.Context) (func(), error) {
	stream, err := b.cli.LockForNamespace(ctx)
	if err != nil {
		return nil, err
	}
	err = stream.Send(&proto.LockRequest{
		Operation: proto.LockOp_LOCK_OP_LOCK,
	})
	if err != nil {
		return nil, err
	}
	// wait until the server confirms the lock
	_, err = stream.Recv()
	if err != nil {
		return nil, err
	}
	return func() {
		err := stream.Send(&proto.LockRequest{
			Operation: proto.LockOp_LOCK_OP_UNLOCK,
		})
		_ = err // TODO: error handling
		_ = stream.CloseSend()
	}, nil
}

// LockForBuild is the keyed variant of LockForNamespace, keyed by image name.
func (b *beaconControl) LockForBuild(ctx context.Context, image string) (func(), error) {
	stream, err := b.cli.LockForBuild(ctx)
	if err != nil {
		return nil, err
	}
	err = stream.Send(&proto.KeyedLockRequest{
		Key:       image,
		Operation: proto.LockOp_LOCK_OP_LOCK,
	})
	if err != nil {
		return nil, err
	}
	// wait until the server confirms the lock
	_, err = stream.Recv()
	if err != nil {
		return nil, err
	}
	return func() {
		err := stream.Send(&proto.KeyedLockRequest{
			Key:       image,
			Operation: proto.LockOp_LOCK_OP_UNLOCK,
		})
		_ = err // TODO: error handling
		_ = stream.CloseSend()
	}, nil
}

// LockForContainerSetup is the keyed variant of LockForNamespace, keyed by
// container name.
func (b *beaconControl) LockForContainerSetup(ctx context.Context, name string) (func(), error) {
	stream, err := b.cli.LockForContainerSetup(ctx)
	if err != nil {
		return nil, err
	}
	err = stream.Send(&proto.KeyedLockRequest{
		Key:       name,
		Operation: proto.LockOp_LOCK_OP_LOCK,
	})
	if err != nil {
		return nil, err
	}
	// wait until the server confirms the lock
	_, err = stream.Recv()
	if err != nil {
		return nil, err
	}
	return func() {
		err := stream.Send(&proto.KeyedLockRequest{
			Key:       name,
			Operation: proto.LockOp_LOCK_OP_UNLOCK,
		})
		_ = err // TODO: error handling
		_ = stream.CloseSend()
	}, nil
}
// LockForContainerUse acquires locks for all containers in params through the
// beacon server's AcquireContainerLock stream. For every container whose
// acquisition also granted the one-time init slot, Init runs locally and its
// result is reported back to the server; an init failure aborts the whole
// acquisition. The returned func sends the release message and closes the
// stream.
func (b *beaconControl) LockForContainerUse(ctx context.Context, params map[string]ContainerUseParam) (unlock func(), err error) {
	// translate each param into the corresponding acquire operation
	targets := map[string]*proto.AcquireLockParam{}
	for name, param := range params {
		var op proto.AcquireOp
		if param.Exclusive {
			if param.Init == nil {
				op = proto.AcquireOp_ACQUIRE_OP_LOCK
			} else {
				op = proto.AcquireOp_ACQUIRE_OP_INIT_LOCK
			}
		} else {
			if param.Init == nil {
				op = proto.AcquireOp_ACQUIRE_OP_SHARED_LOCK
			} else {
				op = proto.AcquireOp_ACQUIRE_OP_INIT_SHARED_LOCK
			}
		}
		targets[name] = &proto.AcquireLockParam{
			Operation: op,
		}
	}
	stream, err := b.cli.AcquireContainerLock(ctx)
	if err != nil {
		return nil, err
	}
	err = stream.Send(&proto.AcquireLockRequest{
		Param: &proto.AcquireLockRequest_Acquire{
			Acquire: &proto.AcquireLockAcquireParam{
				Targets: targets,
			},
		},
	})
	if err != nil {
		return nil, err
	}
	// blocks until the server reports every lock as held
	resp, err := stream.Recv()
	if err != nil {
		return nil, err
	}
	for name, result := range resp.GetResults() {
		if result.GetAcquireInit() {
			// we won the init slot for this container: run Init and tell
			// the server whether it succeeded
			initErr := initSafe(ctx, params[name].Init)
			err = stream.Send(&proto.AcquireLockRequest{
				Param: &proto.AcquireLockRequest_Init{
					Init: &proto.AcquireLockInitParam{
						Key:           name,
						InitSucceeded: initErr == nil,
					},
				},
			})
			if err != nil {
				return nil, multierr.Append(initErr, err)
			}
			if initErr != nil {
				return nil, multierr.Append(initErr, stream.CloseSend())
			}
		}
	}
	return func() {
		err := stream.Send(&proto.AcquireLockRequest{
			Param: &proto.AcquireLockRequest_Release{
				Release: &emptypb.Empty{},
			},
		})
		_ = err // TODO: error handling
		_ = stream.CloseSend()
	}, nil
}

var _ Control = (*beaconControl)(nil)
package exclusion
import (
"context"
"sync"
"golang.org/x/sync/semaphore"
)
// KeyedLock provides per-key read/write locking backed by weighted
// semaphores: a writer takes the whole weight, readers take weight 1 each.
type KeyedLock struct {
	m *sync.Map // key -> *semaphore.Weighted
}

// NewKeyedLock returns an empty KeyedLock; semaphores are created lazily
// per key.
func NewKeyedLock() *KeyedLock {
	return &KeyedLock{
		m: new(sync.Map),
	}
}

// max is the total semaphore weight, i.e. "all permits".
// NOTE: this package-level name shadows the Go 1.21 builtin max.
const max = 1<<63 - 1

// Lock acquires key exclusively (all permits), blocking until available or
// ctx is done.
func (k *KeyedLock) Lock(ctx context.Context, key string) error {
	v, _ := k.m.LoadOrStore(key, semaphore.NewWeighted(max))
	return v.(*semaphore.Weighted).Acquire(ctx, max)
}

// Unlock releases an exclusive hold on key. Panics if key was never locked.
func (k *KeyedLock) Unlock(key string) {
	v, ok := k.m.Load(key)
	if !ok {
		panic("Unlock of unlocked mutex")
	}
	v.(*semaphore.Weighted).Release(max)
}

// Downgrade converts an exclusive hold into a shared (reader) hold by
// releasing all but one permit. Panics if key was never locked.
func (k *KeyedLock) Downgrade(key string) {
	v, ok := k.m.Load(key)
	if !ok {
		panic("Downgrade of unlocked mutex")
	}
	v.(*semaphore.Weighted).Release(max - 1)
}

// RLock acquires key shared (one permit), blocking until available or ctx
// is done.
func (k *KeyedLock) RLock(ctx context.Context, key string) error {
	v, _ := k.m.LoadOrStore(key, semaphore.NewWeighted(max))
	return v.(*semaphore.Weighted).Acquire(ctx, 1)
}

// RUnlock releases a shared hold on key. Panics if key was never locked.
func (k *KeyedLock) RUnlock(key string) {
	v, ok := k.m.Load(key)
	if !ok {
		panic("RUnlock of unlocked mutex")
	}
	v.(*semaphore.Weighted).Release(1)
}
package exclusion
import (
"context"
"sync"
"sync/atomic"
"github.com/daichitakahashi/oncewait"
)
// Locker aggregates every lock used for container orchestration: a namespace
// mutex plus keyed locks for builds, container setup and container use, and
// the FIFO acquirer coordinating multi-container acquisition.
type Locker struct {
	namespace      sync.Mutex
	build          *KeyedLock
	containerSetup *KeyedLock
	containerUse   *KeyedLock
	acquirer       *Acquirer
	once           *oncewait.Factory // one-time init slots, keyed by container name
}

// NewLocker returns a ready-to-use Locker.
func NewLocker() *Locker {
	return &Locker{
		build:          NewKeyedLock(),
		containerSetup: NewKeyedLock(),
		containerUse:   NewKeyedLock(),
		acquirer:       NewAcquirer(),
		once:           &oncewait.Factory{},
	}
}
// LockForNamespace takes the global namespace mutex; the returned func
// releases it.
func (l *Locker) LockForNamespace() func() {
	l.namespace.Lock()
	return l.namespace.Unlock
}

// LockForBuild locks the given image name exclusively for building.
func (l *Locker) LockForBuild(ctx context.Context, image string) (func(), error) {
	err := l.build.Lock(ctx, image)
	if err != nil {
		return nil, err
	}
	return func() {
		l.build.Unlock(image)
	}, nil
}

// LockForContainerSetup locks the given container name exclusively during
// creation/startup.
func (l *Locker) LockForContainerSetup(ctx context.Context, name string) (func(), error) {
	err := l.containerSetup.Lock(ctx, name)
	if err != nil {
		return nil, err
	}
	return func() {
		l.containerSetup.Unlock(name)
	}, nil
}
// ContainerLock is one held lock on a container, remembering how it was
// acquired so Release undoes it correctly.
type ContainerLock struct {
	l          *KeyedLock
	once       *oncewait.Factory
	name       string
	init       bool  // this holder won the one-time init slot
	exclusive  bool  // exclusive use was requested
	downgraded int32 // 1 once the lock is held shared (updated atomically)
}

// InitAcquired reports whether this holder is responsible for running init.
func (l *ContainerLock) InitAcquired() bool {
	return l.init
}

// SetInitResult records the outcome of initialization. On success, a shared
// (non-exclusive) holder downgrades its exclusive init lock to a read lock;
// on failure, the once-slot is refreshed so another acquirer retries init.
func (l *ContainerLock) SetInitResult(ok bool) {
	if l.init {
		if ok {
			if !l.exclusive && atomic.CompareAndSwapInt32(&l.downgraded, 0, 1) {
				l.l.Downgrade(l.name)
			}
		} else {
			l.once.Refresh(l.name)
		}
	}
}

// Release unlocks according to how the lock is currently held.
func (l *ContainerLock) Release() {
	if l.exclusive || atomic.LoadInt32(&l.downgraded) == 0 { // exclusive
		l.l.Unlock(l.name)
	} else { // shared/downgraded
		l.l.RUnlock(l.name)
	}
}
// AcquireContainerLockEntry is one container's request inside
// Locker.AcquireContainerLock and carries the resulting ContainerLock.
type AcquireContainerLockEntry struct {
	Exclusive bool
	Init      bool
	l         *Locker
	cl        *ContainerLock
	p         AcquireParam
}

// init builds the AcquireParam the Acquirer will use for this entry.
func (p *AcquireContainerLockEntry) init(l *Locker, name string) {
	p.l = l
	p.p = AcquireParam{
		Lock: func(ctx context.Context, notifyLock func()) error {
			var err error
			if p.Init {
				// compete for the one-time init slot; the winner takes the
				// exclusive lock regardless of Exclusive
				var initAcquired bool
				l.once.Get(name).Do(func() {
					err = l.containerUse.Lock(ctx, name) // exclusive lock
					if err != nil {
						// let another acquirer retry the init slot
						l.once.Refresh(name)
						return
					}
					initAcquired = true
				})
				if err != nil {
					return err
				}
				if initAcquired {
					notifyLock()
					p.cl = &ContainerLock{
						l:          l.containerUse,
						once:       l.once,
						name:       name,
						init:       true,
						exclusive:  p.Exclusive,
						downgraded: 0,
					}
					return nil
				}
			}
			// no init
			var downgraded int32
			if p.Exclusive {
				err = l.containerUse.Lock(ctx, name)
			} else {
				err = l.containerUse.RLock(ctx, name)
				downgraded = 1
			}
			if err != nil {
				return err
			}
			notifyLock()
			p.cl = &ContainerLock{
				l:          l.containerUse,
				once:       l.once,
				name:       name,
				init:       false,
				exclusive:  p.Exclusive,
				downgraded: downgraded,
			}
			return nil
		},
		Unlock: func() {
			// cl is nil when Lock never completed
			if p.cl != nil {
				p.cl.Release()
			}
		},
	}
}

// ContainerLock returns the lock obtained by Lock (nil before acquisition).
func (p *AcquireContainerLockEntry) ContainerLock() *ContainerLock {
	return p.cl
}
// AcquireContainerLock locks all containers described in entries, gated by
// the FIFO acquirer to avoid deadlock between concurrent multi-container
// acquisitions. The returned func releases them all.
func (l *Locker) AcquireContainerLock(ctx context.Context, entries map[string]*AcquireContainerLockEntry) (func(), error) {
	p := map[string]AcquireParam{}
	for name, e := range entries {
		e.init(l, name)
		p[name] = e.p
	}
	err := l.acquirer.Acquire(ctx, p)
	if err != nil {
		return nil, err
	}
	return func() {
		l.acquirer.Release(p)
	}, nil
}
package logging
import (
"fmt"
"log"
"path"
"runtime"
)
// LogLevel orders log severities; smaller values are more verbose.
type LogLevel int

const (
	LevelDebug LogLevel = iota
	LevelInfo
	LevelError
)

// level is the current threshold; only messages at or above it are emitted.
var level = LevelError

// enabled reports whether a message of severity l passes the threshold base.
func (l LogLevel) enabled(base LogLevel) bool {
	return l >= base
}
// SetLogLevel sets log level.
// This func is unsafe for concurrent use.
func SetLogLevel(l LogLevel) {
	level = l
}

// prefixes prepended to every emitted log line
var (
	debugPrefix = "confort[DEBUG]:"
	infoPrefix  = "confort[INFO]:"
)
// file returns a "\n\t<file>:<line>:<func>" suffix describing the caller at
// the given stack depth, for appending to debug log lines.
func file(depth int) string {
	pc, filename, line, _ := runtime.Caller(depth)
	fn := path.Base(runtime.FuncForPC(pc).Name())
	return fmt.Sprintf("\n\t%s:%d:%s", filename, line, fn)
}
// Debug logs at debug level, appending the caller's location.
func Debug(args ...any) {
	if LevelDebug.enabled(level) {
		log.Println(debugPrefix, fmt.Sprint(args...), file(2))
	}
}

// Debugf logs a formatted message at debug level, appending the caller's
// location.
func Debugf(format string, args ...any) {
	if LevelDebug.enabled(level) {
		log.Println(debugPrefix, fmt.Sprintf(format, args...), file(2))
	}
}

// Info logs at info level (no caller location, unlike Debug).
func Info(args ...any) {
	if LevelInfo.enabled(level) {
		log.Println(infoPrefix, fmt.Sprint(args...))
	}
}

// Infof logs a formatted message at info level.
func Infof(format string, args ...any) {
	if LevelInfo.enabled(level) {
		log.Println(infoPrefix, fmt.Sprintf(format, args...))
	}
}
package unique
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"testing"
"time"
"unsafe"
"github.com/daichitakahashi/confort/internal/beacon"
"github.com/daichitakahashi/confort/internal/beacon/proto"
"github.com/lestrrat-go/option"
)
// Unique generates values guaranteed unique within its generator's scope
// (in-process by default, or beacon-wide with WithBeacon).
type Unique[T comparable] struct {
	g     uniqueValueGenerator[T]
	retry uint // max attempts per New/Must call
}

// uniqueValueGenerator abstracts local vs beacon-backed uniqueness checking.
type uniqueValueGenerator[T comparable] interface {
	generate(retry uint) (T, error)
}
type (
	// Option configures Unique construction.
	Option interface {
		option.Interface
		unique() Option
	}
	// identOptionRetry and identOptionBeacon identify the option values.
	identOptionRetry  struct{}
	identOptionBeacon struct{}
	// uniqueOption tags an option.Interface as belonging to this package.
	uniqueOption struct{ option.Interface }
)

func (o uniqueOption) unique() Option { return o }
// WithRetry configures the maximum number of retries for unique value generation.
// Passing 0 wraps around to the maximum uint value, i.e. effectively
// unlimited retries.
func WithRetry(n uint) Option {
	if n == 0 {
		n-- // deliberate uint underflow: 0 means "retry as much as possible"
	}
	return uniqueOption{
		Interface: option.New(identOptionRetry{}, n),
	}.unique()
}
// WithBeacon configures Unique to integrate with a starting beacon server.
// It enables us to generate unique values through all tests that reference
// the same beacon server and storeName.
//
// See confort.WithBeacon.
func WithBeacon(storeName string) Option {
	return uniqueOption{
		Interface: option.New(identOptionBeacon{}, storeName),
	}.unique()
}

// ErrRetryable indicates that the generation of a unique value has temporarily
// failed, but may succeed by retrying.
var ErrRetryable = errors.New("cannot create unique value but retryable")
// New creates unique value generator. Argument fn is an arbitrary generator function.
// When the generated value by fn is not unique or fn returns ErrRetryable, Unique retries.
// By default, Unique retries 10 times.
//
// With WithBeacon and an enabled beacon connection, uniqueness is enforced
// through the beacon server instead of in-process.
func New[T comparable](ctx context.Context, fn func() (T, error), opts ...Option) (*Unique[T], error) {
	// default: in-process generator with 10 retries
	u := &Unique[T]{
		g: &generator[T]{
			f: fn,
			m: make(map[T]struct{}),
		},
		retry: 10,
	}
	var storeName string
	for _, opt := range opts {
		switch opt.Ident() {
		case identOptionRetry{}:
			u.retry = opt.Value().(uint)
		case identOptionBeacon{}:
			storeName = opt.Value().(string)
		}
	}
	if storeName != "" {
		conn, err := beacon.Connect(ctx)
		if err != nil {
			return nil, err
		}
		if conn.Enabled() {
			// switch to the beacon-backed generator
			u.g = &globalGenerator[T]{
				f:     fn,
				cli:   proto.NewUniqueValueServiceClient(conn.Conn),
				store: storeName,
			}
		}
	}
	return u, nil
}
// Must is a helper that wraps a call to a function returning (*Unique[T], error)
// and panics if the error is non-nil. It is intended for use in variable initializations
// such as
//
//	var u = unique.Must(unique.String(context.Background(), 10))
func Must[T comparable](u *Unique[T], err error) *Unique[T] {
	if err == nil {
		return u
	}
	panic(err)
}
// New returns unique value.
func (u *Unique[T]) New() (T, error) {
	return u.g.generate(u.retry)
}

// Must returns unique value.
// If a unique value cannot be generated within the maximum number of retries,
// the test fails.
func (u *Unique[T]) Must(tb testing.TB) T {
	tb.Helper()
	v, err := u.g.generate(u.retry)
	if err != nil {
		tb.Fatal(err)
	}
	return v
}
// errFailedToGenerate is returned when no unique value could be produced
// within the configured number of retries.
var errFailedToGenerate = errors.New("cannot create new unique value")

// generator enforces uniqueness with an in-process set.
type generator[T comparable] struct {
	f  func() (T, error)
	mu sync.Mutex
	m  map[T]struct{}
}

// generate calls f up to retry times until it yields a value not seen
// before; ErrRetryable from f also triggers another attempt.
func (g *generator[T]) generate(retry uint) (zero T, _ error) {
	g.mu.Lock()
	defer g.mu.Unlock()
	for attempt := uint(0); attempt < retry; attempt++ {
		candidate, err := g.f()
		switch {
		case errors.Is(err, ErrRetryable):
			continue
		case err != nil:
			return zero, err
		}
		if _, seen := g.m[candidate]; seen {
			continue // not unique, retry
		}
		g.m[candidate] = struct{}{}
		return candidate, nil
	}
	return zero, errFailedToGenerate
}

var _ uniqueValueGenerator[int] = (*generator[int])(nil)
// globalGenerator checks uniqueness against a beacon server, sharing the
// value space across processes.
type globalGenerator[T comparable] struct {
	f     func() (T, error)
	cli   proto.UniqueValueServiceClient
	store string // store name shared by cooperating tests
}

// generate retries f until the beacon server accepts the value as new.
// Values are sent as their fmt.Sprint representation.
func (g *globalGenerator[T]) generate(retry uint) (zero T, _ error) {
	ctx := context.Background()
	for i := uint(0); i < retry; i++ {
		v, err := g.f()
		if errors.Is(err, ErrRetryable) {
			continue
		} else if err != nil {
			return zero, err
		}
		resp, err := g.cli.StoreUniqueValue(ctx, &proto.StoreUniqueValueRequest{
			Store: g.store,
			Value: fmt.Sprint(v),
		})
		if err != nil {
			return zero, err
		} else if !resp.GetSucceeded() { // not unique, retry
			continue
		}
		return v, nil
	}
	return zero, errFailedToGenerate
}

var _ uniqueValueGenerator[int] = (*globalGenerator[int])(nil)
const (
	// letters is the alphabet for generated strings: 62 alphanumeric characters.
	letters       = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
	letterIdxBits = 6                    // 6 bits index 0..63, enough to cover the 62 letters
	letterIdxMask = 1<<letterIdxBits - 1 // mask selecting letterIdxBits low bits
	letterIdxMax  = 63 / letterIdxBits   // number of 6-bit indices obtainable from one Int63
)

// StringFunc is an n-digit random string generator.
// It uses upper/lower case alphanumeric characters.
// The returned function is safe for concurrent use: the underlying
// rand.Source is not goroutine-safe, and globalGenerator invokes the
// factory without external locking, so access is serialized here.
func StringFunc(n int) func() (string, error) {
	var mu sync.Mutex
	randSrc := rand.NewSource(time.Now().UnixNano())
	return func() (string, error) {
		mu.Lock()
		defer mu.Unlock()

		b := make([]byte, n)
		cache, remain := randSrc.Int63(), letterIdxMax
		for i := n - 1; i >= 0; {
			if remain == 0 {
				cache, remain = randSrc.Int63(), letterIdxMax
			}
			// Rejection sampling: indices 62 and 63 are discarded so the
			// distribution stays uniform over the 62-letter alphabet.
			idx := int(cache & letterIdxMask)
			if idx < len(letters) {
				b[i] = letters[idx]
				i--
			}
			cache >>= letterIdxBits
			remain--
		}
		// b is never mutated after this point, so the zero-copy
		// []byte-to-string conversion is safe.
		return *(*string)(unsafe.Pointer(&b)), nil
	}
}
// String is a shorthand of New(StringFunc(n)).
// It creates a generator of unique n-character alphanumeric strings.
func String(ctx context.Context, n int, opts ...Option) (*Unique[string], error) {
	return New(ctx, StringFunc(n), opts...)
}
package wait
import (
"bytes"
"context"
"io"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/go-connections/nat"
"github.com/lestrrat-go/option"
)
// Waiter repeatedly evaluates a readiness check against a container
// until the check succeeds or the timeout elapses.
type Waiter struct {
	interval time.Duration // pause between consecutive checks
	timeout time.Duration // overall deadline applied in Wait
	check CheckFunc // readiness criterion
}
// Fetcher provides several ways to access the state of the container.
type Fetcher interface {
	// ContainerID returns the ID of the target container.
	ContainerID() string
	// Status fetches the current state of the container.
	Status(ctx context.Context) (*types.ContainerState, error)
	// Ports returns the container's port mapping.
	Ports() nat.PortMap
	// Log returns a reader of the container log; the caller must close it.
	Log(ctx context.Context) (io.ReadCloser, error)
	// Exec runs cmd inside the container and returns its output.
	Exec(ctx context.Context, cmd ...string) ([]byte, error)
}
type (
	// Option configures a Waiter created by New.
	Option interface {
		option.Interface
		wait() Option
	}
	identOptionInterval struct{} // identifier for WithInterval
	identOptionTimeout struct{} // identifier for WithTimeout
	waitOption struct{ option.Interface } // concrete Option implementation
)

// wait marks waitOption as satisfying the Option interface.
func (o waitOption) wait() Option { return o }
// WithInterval sets the interval between container readiness checks.
func WithInterval(d time.Duration) Option {
	o := waitOption{
		Interface: option.New(identOptionInterval{}, d),
	}
	return o.wait()
}
// WithTimeout sets the timeout for waiting for the container to be ready.
func WithTimeout(d time.Duration) Option {
	o := waitOption{
		Interface: option.New(identOptionTimeout{}, d),
	}
	return o.wait()
}
const (
	// defaultInterval is the default pause between readiness checks.
	defaultInterval = 500 * time.Millisecond
	// defaultTimeout is the default overall deadline for Wait.
	defaultTimeout = 30 * time.Second
)

// CheckFunc reports whether the container is ready, using f to inspect its
// state. Returning an error aborts the wait immediately.
type CheckFunc func(ctx context.Context, f Fetcher) (bool, error)
// New creates a Waiter that waits for the container to be ready.
// CheckFunc is the criteria for evaluating readiness. Use Fetcher to obtain
// the container status.
//
// Waiter repeatedly checks the readiness until first success. We can set
// interval and timeout by WithInterval and WithTimeout. The default value for
// the interval is 500ms and for the timeout is 30sec.
func New(check CheckFunc, opts ...Option) *Waiter {
	waiter := &Waiter{
		interval: defaultInterval,
		timeout:  defaultTimeout,
		check:    check,
	}
	// Options override the defaults above.
	for _, o := range opts {
		switch o.Ident() {
		case identOptionInterval{}:
			waiter.interval = o.Value().(time.Duration)
		case identOptionTimeout{}:
			waiter.timeout = o.Value().(time.Duration)
		}
	}
	return waiter
}
// LogContains waits for the given number of occurrences of the given message
// in the container log.
// It is a shorthand of New(CheckLogOccurrence(message, occurrence), opts...).
func LogContains(message string, occurrence int, opts ...Option) *Waiter {
	return New(CheckLogOccurrence(message, occurrence), opts...)
}
// CheckLogOccurrence creates CheckFunc. See LogContains.
// The returned CheckFunc reads the full container log on every invocation
// and reports whether message appears at least occurrence times.
func CheckLogOccurrence(message string, occurrence int) CheckFunc {
	target := []byte(message)
	return func(ctx context.Context, f Fetcher) (bool, error) {
		logReader, err := f.Log(ctx)
		if err != nil {
			return false, err
		}
		defer func() {
			_ = logReader.Close()
		}()

		contents, err := io.ReadAll(logReader)
		if err != nil {
			return false, err
		}
		found := bytes.Count(contents, target)
		return found >= occurrence, nil
	}
}
// Healthy waits for the container's health status to be healthy.
// The container must report a health state (e.g. via HEALTHCHECK);
// otherwise the check never succeeds and Wait times out.
func Healthy(opts ...Option) *Waiter {
	return New(CheckHealthy, opts...)
}
// CheckHealthy is a CheckFunc. See Healthy.
// It reports true only when the container exposes health information and
// its status equals "healthy".
func CheckHealthy(ctx context.Context, f Fetcher) (bool, error) {
	state, err := f.Status(ctx)
	if err != nil {
		return false, err
	}
	if state.Health == nil {
		return false, nil
	}
	return state.Health.Status == "healthy", nil
}
// CommandSucceeds waits for the success of given command.
// Readiness is evaluated by executing cmd inside the container until it
// completes without error.
func CommandSucceeds(cmd []string, opts ...Option) *Waiter {
	return New(CheckCommandSucceeds(cmd), opts...)
}
// CheckCommandSucceeds creates CheckFunc. See CommandSucceeds.
func CheckCommandSucceeds(cmd []string) CheckFunc {
	return func(ctx context.Context, f Fetcher) (bool, error) {
		// A failing command means "not ready yet", not a fatal error,
		// so the error is deliberately not propagated.
		if _, err := f.Exec(ctx, cmd...); err != nil {
			return false, nil
		}
		return true, nil
	}
}
// Wait calls CheckFunc with given Fetcher repeatedly until the first success.
// It returns the first error reported by the check, or the context error
// when the timeout elapses or ctx is canceled.
func (w *Waiter) Wait(ctx context.Context, f Fetcher) error {
	ctx, cancel := context.WithTimeout(ctx, w.timeout)
	defer cancel()

	for {
		ready, err := w.check(ctx, f)
		if err != nil {
			return err
		}
		if ready {
			return nil
		}
		// Pause for the configured interval after each completed check,
		// aborting as soon as the deadline passes.
		select {
		case <-time.After(w.interval):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}