From 573f0c1e5bd01d40d9afbd9c87a0673285534484 Mon Sep 17 00:00:00 2001 From: Alexis Couvreur Date: Sun, 2 Oct 2022 21:51:12 +0000 Subject: [PATCH] refactor: put acouvreur/tinykv inside the repository --- app/sablier.go | 2 +- go.mod | 12 +- go.sum | 26 +- pkg/tinykv/heap.go | 118 +++++++ pkg/tinykv/retry.go | 20 ++ pkg/tinykv/timeout_heap.go | 81 +++++ pkg/tinykv/tinykv.go | 467 +++++++++++++++++++++++++++ pkg/tinykv/tinykv_test.go | 629 +++++++++++++++++++++++++++++++++++++ 8 files changed, 1325 insertions(+), 30 deletions(-) create mode 100644 pkg/tinykv/heap.go create mode 100644 pkg/tinykv/retry.go create mode 100644 pkg/tinykv/timeout_heap.go create mode 100644 pkg/tinykv/tinykv.go create mode 100644 pkg/tinykv/tinykv_test.go diff --git a/app/sablier.go b/app/sablier.go index a546cac..ce83f0e 100644 --- a/app/sablier.go +++ b/app/sablier.go @@ -11,7 +11,7 @@ import ( "github.com/acouvreur/sablier/config" "github.com/acouvreur/sablier/pkg/scaler" "github.com/acouvreur/sablier/pkg/storage" - "github.com/acouvreur/tinykv" + "github.com/acouvreur/sablier/pkg/tinykv" "github.com/docker/docker/client" "github.com/gin-gonic/gin" log "github.com/sirupsen/logrus" diff --git a/go.mod b/go.mod index 21c227b..def6d81 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,11 @@ require ( ) require ( - github.com/acouvreur/tinykv v0.0.0-20220511194520-45fc921a4a83 + github.com/gin-gonic/gin v1.8.1 + github.com/pkg/errors v0.9.1 + github.com/spf13/cobra v1.5.0 + github.com/spf13/pflag v1.0.5 + github.com/spf13/viper v1.13.0 k8s.io/api v0.24.0 ) @@ -29,7 +33,6 @@ require ( github.com/emicklei/go-restful v2.15.0+incompatible // indirect github.com/fsnotify/fsnotify v1.5.4 // indirect github.com/gin-contrib/sse v0.1.0 // indirect - github.com/gin-gonic/gin v1.8.1 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.20.0 // indirect @@ -41,7 +44,6 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/gnostic v0.6.9 // indirect - github.com/google/go-cmp v0.5.8 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.0.1 // indirect @@ -60,14 +62,10 @@ require ( github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.0.5 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/spf13/afero v1.8.2 // indirect github.com/spf13/cast v1.5.0 // indirect - github.com/spf13/cobra v1.5.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect - github.com/spf13/pflag v1.0.5 // indirect - github.com/spf13/viper v1.13.0 // indirect github.com/stretchr/objx v0.4.0 // indirect github.com/subosito/gotenv v1.4.1 // indirect github.com/ugorji/go/codec v1.2.7 // indirect diff --git a/go.sum b/go.sum index ba37f1f..b16d917 100644 --- a/go.sum +++ b/go.sum @@ -56,8 +56,6 @@ github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb0 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= -github.com/acouvreur/tinykv v0.0.0-20220511194520-45fc921a4a83 h1:uNIfP86bHqLfpZAZ3ashsCRABfh9otkBgdeI4niG3WY= -github.com/acouvreur/tinykv v0.0.0-20220511194520-45fc921a4a83/go.mod h1:ECs3aPB6f6dVXSV6GM+UiN3/0iSttb/5iS3szwp9XeY= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= @@ -103,8 +101,8 @@ github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQL github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= +github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI= github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= @@ -133,6 +131,7 @@ github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ= github.com/go-openapi/swag v0.21.1 h1:wm0rhTb5z7qpJRHBdPOMuY4QjVUMbF6/kwoYeRAOrKU= github.com/go-openapi/swag v0.21.1/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ= +github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU= github.com/go-playground/locales v0.14.0/go.mod h1:sawfccIbzZTqEDETgFXqTho0QybSa7l++s0DH+LDiLs= @@ -193,7 +192,6 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= -github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= @@ -246,6 +244,7 @@ github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -280,7 +279,6 @@ github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8m github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= -github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= @@ -313,6 +311,7 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1: github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= @@ -335,8 +334,6 @@ github.com/spf13/viper v1.13.0 h1:BWSJ/M+f+3nmdz9bxB+bWX28kkALN2ok11D0rSo8EJU= github.com/spf13/viper v1.13.0/go.mod h1:Icm2xNL3/8uyh/wFuB1jI7TiTNKp8632Nwegu+zgdYw= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.3.0 h1:NGXK3lHquSN08v5vWalVI/L8XU9hdzE/G6xsrze47As= -github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -345,13 +342,11 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/subosito/gotenv v1.4.1 h1:jyEFiXpy21Wm81FBN71l9VoMMV8H8jG+qIK3GCpY6Qs= github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0= -github.com/ugorji/go v1.2.7 h1:qYhyWUUd6WbiM+C6JZAUkIJt/1WrjzNHY9+KCIjVqTo= github.com/ugorji/go v1.2.7/go.mod h1:nF9osbDWLy6bDVv/Rtoh6QgnvNDpmCalQV5urGCCS6M= github.com/ugorji/go/codec v1.2.7 h1:YPXUKf7fYbp/y8xloBqZOw2qaVggbfwMlI8WM3wZUJ0= github.com/ugorji/go/codec v1.2.7/go.mod h1:WGN1fab3R1fzQlVQTkfxVtIBhWDRqOviHU95kRgeqEY= @@ -458,10 +453,6 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 h1:HVyaeDAYux4pnY+D/SiwmLOR36ewZ4iGQIIrtnuCjFA= -golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 h1:NWy5+hlRbC7HK+PmcXVUmW1IMyFce7to56IUvhUFm7Y= -golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220930213112-107f3e3c3b0b h1:uKO3Js8lXGjpjdc4J3rqs0/Ex5yDKUGfk43tTYWVLas= golang.org/x/net v0.0.0-20220930213112-107f3e3c3b0b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -542,10 +533,6 @@ golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 h1:nonptSpoQ4vQjyraW20DXPAglgQfVnM9ZC6MmNLMR60= -golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec h1:BkDtF2Ih9xZ7le9ndzTA7KJow28VbQW3odyk/8drmuI= golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -624,7 +611,6 @@ golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df h1:5Pf6pFKu98ODmgnpvkJ3kFUOQGGLIzLIkbzUHp47618= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= @@ -733,14 +719,11 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= -google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= @@ -762,7 +745,6 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/tinykv/heap.go b/pkg/tinykv/heap.go new file mode 100644 index 0000000..acb94cc --- /dev/null +++ b/pkg/tinykv/heap.go @@ -0,0 +1,118 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package heap provides heap operations for any type that implements +// heap.Interface. A heap is a tree with the property that each node is the +// minimum-valued node in its subtree. +// +// The minimum element in the tree is the root, at index 0. +// +// A heap is a common way to implement a priority queue. To build a priority +// queue, implement the Heap interface with the (negative) priority as the +// ordering for the Less method, so Push adds items while Pop removes the +// highest-priority item from the queue. The Examples include such an +// implementation; the file example_pq_test.go has the complete source. +package tinykv + +type Val = interface{} + +// Any type that implements heap.Interface may be used as a +// min-heap with the following invariants (established after +// Init has been called or if the data is empty or sorted): +// +// !h.Less(j, i) for 0 <= i < h.Len() and 2*i+1 <= j <= 2*i+2 and j < h.Len() +// +// Note that Push and Pop in this interface are for package heap's +// implementation to call. To add and remove things from the heap, +// use heap.Push and heap.Pop. +type Interface interface { + Len() int + Less(i, j int) bool + Swap(i, j int) + Push(x Val) // add x as element Len() + Pop() Val // remove and return element Len() - 1. +} + +// A heap must be initialized before any of the heap operations +// can be used. Init is idempotent with respect to the heap invariants +// and may be called whenever the heap invariants may have been invalidated. +// Its complexity is O(n) where n = h.Len(). +func Init(h Interface) { + // heapify + n := h.Len() + for i := n/2 - 1; i >= 0; i-- { + down(h, i, n) + } +} + +// Push pushes the element x onto the heap. The complexity is +// O(log(n)) where n = h.Len(). +func Push(h Interface, x Val) { + h.Push(x) + up(h, h.Len()-1) +} + +// Pop removes the minimum element (according to Less) from the heap +// and returns it. The complexity is O(log(n)) where n = h.Len(). +// It is equivalent to Remove(h, 0). +func Pop(h Interface) Val { + n := h.Len() - 1 + h.Swap(0, n) + down(h, 0, n) + return h.Pop() +} + +// Remove removes the element at index i from the heap. +// The complexity is O(log(n)) where n = h.Len(). +func Remove(h Interface, i int) Val { + n := h.Len() - 1 + if n != i { + h.Swap(i, n) + if !down(h, i, n) { + up(h, i) + } + } + return h.Pop() +} + +// Fix re-establishes the heap ordering after the element at index i has changed its value. +// Changing the value of the element at index i and then calling Fix is equivalent to, +// but less expensive than, calling Remove(h, i) followed by a Push of the new value. +// The complexity is O(log(n)) where n = h.Len(). +func Fix(h Interface, i int) { + if !down(h, i, h.Len()) { + up(h, i) + } +} + +func up(h Interface, j int) { + for { + i := (j - 1) / 2 // parent + if i == j || !h.Less(j, i) { + break + } + h.Swap(i, j) + j = i + } +} + +func down(h Interface, i0, n int) bool { + i := i0 + for { + j1 := 2*i + 1 + if j1 >= n || j1 < 0 { // j1 < 0 after int overflow + break + } + j := j1 // left child + if j2 := j1 + 1; j2 < n && h.Less(j2, j1) { + j = j2 // = 2*i + 2 // right child + } + if !h.Less(j, i) { + break + } + h.Swap(i, j) + i = j + } + return i > i0 +} diff --git a/pkg/tinykv/retry.go b/pkg/tinykv/retry.go new file mode 100644 index 0000000..617ebdc --- /dev/null +++ b/pkg/tinykv/retry.go @@ -0,0 +1,20 @@ +package tinykv + +import ( + "github.com/pkg/errors" +) + +// Try tries to run a function and recovers from a panic, in case +// one happens, and returns the error, if there are any. +func try(f func() error) (errRun error) { + defer func() { + if e := recover(); e != nil { + if err, ok := e.(error); ok { + errRun = err + return + } + errRun = errors.Errorf("RECOVERED, UNKNOWN ERROR: %+v", e) + } + }() + return f() +} diff --git a/pkg/tinykv/timeout_heap.go b/pkg/tinykv/timeout_heap.go new file mode 100644 index 0000000..6e2fe6f --- /dev/null +++ b/pkg/tinykv/timeout_heap.go @@ -0,0 +1,81 @@ +// Package heap provides heap operations for any type that implements +// heap.Interface. A heap is a tree with the property that each node is the +// minimum-valued node in its subtree. +// +// The minimum element in the tree is the root, at index 0. +// +// A heap is a common way to implement a priority queue. To build a priority +// queue, implement the Heap interface with the (negative) priority as the +// ordering for the Less method, so Push adds items while Pop removes the +// highest-priority item from the queue. The Examples include such an +// implementation; the file example_pq_test.go has the complete source. +// + +package tinykv + +type tohVal = *timeout + +// Any type that implements heap.Interface may be used as a +// min-heap with the following invariants (established after +// Init has been called or if the data is empty or sorted): +// +// !h.Less(j, i) for 0 <= i < h.Len() and 2*i+1 <= j <= 2*i+2 and j < h.Len() +// +// Note that Push and Pop in this interface are for package heap's +// implementation to call. To add and remove things from the heap, +// use heap.Push and heap.Pop. +type timeheapInterface interface { + Len() int + Less(i, j int) bool + Swap(i, j int) + Push(x tohVal) // add x as element Len() + Pop() tohVal // remove and return element Len() - 1. +} + +// Push pushes the element x onto the heap. The complexity is +// O(log(n)) where n = h.Len(). +func timeheapPush(h timeheapInterface, x tohVal) { + h.Push(x) + timeheapup(h, h.Len()-1) +} + +// Pop removes the minimum element (according to Less) from the heap +// and returns it. The complexity is O(log(n)) where n = h.Len(). +// It is equivalent to Remove(h, 0). +func timeheapPop(h timeheapInterface) tohVal { + n := h.Len() - 1 + h.Swap(0, n) + timeheapdown(h, 0, n) + return h.Pop() +} + +func timeheapup(h timeheapInterface, j int) { + for { + i := (j - 1) / 2 // parent + if i == j || !h.Less(j, i) { + break + } + h.Swap(i, j) + j = i + } +} + +func timeheapdown(h timeheapInterface, i0, n int) bool { + i := i0 + for { + j1 := 2*i + 1 + if j1 >= n || j1 < 0 { // j1 < 0 after int overflow + break + } + j := j1 // left child + if j2 := j1 + 1; j2 < n && h.Less(j2, j1) { + j = j2 // = 2*i + 2 // right child + } + if !h.Less(j, i) { + break + } + h.Swap(i, j) + i = j + } + return i > i0 +} diff --git a/pkg/tinykv/tinykv.go b/pkg/tinykv/tinykv.go new file mode 100644 index 0000000..078d2f9 --- /dev/null +++ b/pkg/tinykv/tinykv.go @@ -0,0 +1,467 @@ +package tinykv + +import ( + "encoding/json" + "fmt" + "sync" + "time" +) + +type timeout struct { + expiresAt time.Time + expiresAfter time.Duration + isSliding bool + key string +} + +func newTimeout( + key string, + expiresAfter time.Duration, + isSliding bool) *timeout { + return &timeout{ + expiresAt: time.Now().Add(expiresAfter), + expiresAfter: expiresAfter, + isSliding: isSliding, + key: key, + } +} + +func (to *timeout) slide() { + if to == nil { + return + } + if !to.isSliding { + return + } + if to.expiresAfter <= 0 { + return + } + to.expiresAt = time.Now().Add(to.expiresAfter) +} + +func (to *timeout) expired() bool { + if to == nil { + return false + } + return time.Now().After(to.expiresAt) +} + +//----------------------------------------------------------------------------- + +// timeout heap +type th []*timeout + +func (h th) Len() int { return len(h) } +func (h th) Less(i, j int) bool { return h[i].expiresAt.Before(h[j].expiresAt) } +func (h th) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *th) Push(x tohVal) { *h = append(*h, x) } +func (h *th) Pop() tohVal { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +//----------------------------------------------------------------------------- + +type entry[T any] struct { + *timeout + value T +} + +//----------------------------------------------------------------------------- + +// KV is a registry for values (like/is a concurrent map) with timeout and sliding timeout +type KV[T any] interface { + Delete(k string) + Get(k string) (v T, ok bool) + Keys() (keys []string) + Values() (values []T) + Entries() (entries map[string]entry[T]) + Put(k string, v T, options ...PutOption) error + Take(k string) (v T, ok bool) + Stop() + MarshalJSON() ([]byte, error) + UnmarshalJSON(b []byte) error +} + +//----------------------------------------------------------------------------- + +type putOpt struct { + expiresAfter time.Duration + isSliding bool + cas func(interface{}, bool) bool +} + +// PutOption extra options for put +type PutOption func(*putOpt) + +// ExpiresAfter entry will expire after this time +func ExpiresAfter(expiresAfter time.Duration) PutOption { + return func(opt *putOpt) { + opt.expiresAfter = expiresAfter + } +} + +// IsSliding sets if the entry would get expired in a sliding manner +func IsSliding(isSliding bool) PutOption { + return func(opt *putOpt) { + opt.isSliding = isSliding + } +} + +// CAS for performing a compare and swap +func CAS(cas func(oldValue interface{}, found bool) bool) PutOption { + return func(opt *putOpt) { + opt.cas = cas + } +} + +//----------------------------------------------------------------------------- + +// store is a registry for values (like/is a concurrent map) with timeout and sliding timeout +type store[T any] struct { + onExpire func(k string, v T) + + stop chan struct{} + stopOnce sync.Once + expirationInterval time.Duration + mx sync.Mutex + kv map[string]*entry[T] + heap th +} + +// New creates a new *store, onExpire is for notification (must be fast). +func New[T any](expirationInterval time.Duration, onExpire ...func(k string, v T)) KV[T] { + if expirationInterval <= 0 { + expirationInterval = time.Second * 20 + } + res := &store[T]{ + stop: make(chan struct{}), + kv: make(map[string]*entry[T]), + expirationInterval: expirationInterval, + heap: th{}, + } + if len(onExpire) > 0 && onExpire[0] != nil { + res.onExpire = onExpire[0] + } + go res.expireLoop() + return res +} + +// Stop stops the goroutine +func (kv *store[T]) Stop() { + kv.stopOnce.Do(func() { close(kv.stop) }) +} + +// Delete deletes an entry +func (kv *store[T]) Delete(k string) { + kv.mx.Lock() + defer kv.mx.Unlock() + delete(kv.kv, k) +} + +// Get gets an entry from KV store +// and if a sliding timeout is set, it will be slided +func (kv *store[T]) Get(k string) (T, bool) { + var zero T + kv.mx.Lock() + defer kv.mx.Unlock() + + e, ok := kv.kv[k] + if !ok { + return zero, ok + } + e.slide() + if e.expired() { + go notifyExpirations(map[string]T{k: e.value}, kv.onExpire) + delete(kv.kv, k) + return zero, false + } + return e.value, ok +} + +func (kv *store[T]) Keys() (keys []string) { + kv.mx.Lock() + defer kv.mx.Unlock() + + for k := range kv.kv { + keys = append(keys, k) + } + return keys +} + +func (kv *store[T]) Values() (values []T) { + kv.mx.Lock() + defer kv.mx.Unlock() + + for _, v := range kv.kv { + values = append(values, v.value) + } + return values +} + +func (kv *store[T]) Entries() (entries map[string]entry[T]) { + kv.mx.Lock() + defer kv.mx.Unlock() + + entries = make(map[string]entry[T]) + for k, v := range kv.kv { + + e := entry[T]{ + value: v.value, + } + if v.timeout != nil { + + t := &timeout{ + expiresAt: v.expiresAt, + expiresAfter: v.expiresAfter, + isSliding: v.isSliding, + key: k, + } + e.timeout = t + } + entries[k] = e + } + return entries +} + +// Put puts an entry inside kv store with provided options +func (kv *store[T]) Put(k string, v T, options ...PutOption) error { + opt := &putOpt{} + for _, v := range options { + v(opt) + } + e := &entry[T]{ + value: v, + } + kv.mx.Lock() + defer kv.mx.Unlock() + if opt.expiresAfter > 0 { + e.timeout = newTimeout(k, opt.expiresAfter, opt.isSliding) + timeheapPush(&kv.heap, e.timeout) + } + if opt.cas != nil { + return kv.cas(k, e, opt.cas) + } + kv.kv[k] = e + return nil +} + +func (kv *store[T]) MarshalJSON() ([]byte, error) { + kv.mx.Lock() + defer kv.mx.Unlock() + return json.Marshal(kv.kv) +} + +func (e *entry[T]) MarshalJSON() ([]byte, error) { + if e.timeout != nil { + return json.Marshal(&struct { + Value T `json:"value"` + ExpiresAt time.Time `json:"expiresAt"` + ExpiresAfter time.Duration `json:"expiresAfter"` + IsSliding bool `json:"isSliding"` + }{ + Value: e.value, + ExpiresAt: e.expiresAt, + ExpiresAfter: e.expiresAfter, + IsSliding: e.isSliding, + }) + } else { + return json.Marshal(&struct { + Value T `json:"value"` + }{ + Value: e.value, + }) + } +} + +type minimalEntry[T any] struct { + Value T + ExpiresAfter time.Duration +} + +func (kv *store[T]) UnmarshalJSON(b []byte) error { + + var result map[string]minimalEntry[T] + + // Unmarshal or Decode the JSON to the interface. + json.Unmarshal([]byte(b), &result) + + for k, v := range result { + // TODO: Handle sliding... + kv.Put(k, v.Value, ExpiresAfter(v.ExpiresAfter)) + } + + return nil +} + +func (e *minimalEntry[T]) UnmarshalJSON(b []byte) error { + + result := &struct { + Value T `json:"value"` + ExpiresAt time.Time `json:"expiresAt"` + }{} + + // Unmarshal or Decode the JSON to the interface. + json.Unmarshal([]byte(b), &result) + + if result.ExpiresAt.After(time.Now()) { + e.Value = result.Value + e.ExpiresAfter = time.Until(result.ExpiresAt) + } + // TODO: Handle sliding... + + return nil +} + +func (kv *store[T]) cas(k string, e *entry[T], casFunc func(interface{}, bool) bool) error { + old, ok := kv.kv[k] + var oldValue T + if ok && old != nil { + oldValue = old.value + } + if !casFunc(oldValue, ok) { + return ErrCASCond + } + if ok && old != nil { + if e.timeout != nil { + old.timeout = e.timeout + } + old.value = e.value + e = old + } + e.slide() + kv.kv[k] = e + return nil +} + +// Take takes an entry out of kv store +func (kv *store[T]) Take(k string) (T, bool) { + var zero T + kv.mx.Lock() + defer kv.mx.Unlock() + e, ok := kv.kv[k] + if ok { + delete(kv.kv, k) + return e.value, ok + } + return zero, ok +} + +//----------------------------------------------------------------------------- + +func (kv *store[T]) expireLoop() { + interval := kv.expirationInterval + expireTime := time.NewTimer(interval) + for { + select { + case <-kv.stop: + return + case <-expireTime.C: + v := kv.expireFunc() + if v < 0 { + v = -1 * v + } + if v > 0 && v <= kv.expirationInterval { + interval = (2*interval + v) / 3 // good enough history + } + if interval <= 0 { + interval = time.Millisecond + } + expireTime.Reset(interval) + } + } +} + +func (kv *store[T]) expireFunc() time.Duration { + kv.mx.Lock() + defer kv.mx.Unlock() + + var interval time.Duration + if len(kv.heap) == 0 { + return interval + } + expired := make(map[string]T) + c := -1 + for { + if len(kv.heap) == 0 { + break + } + c++ + if c >= len(kv.heap) { + break + } + last := kv.heap[0] + entry, ok := kv.kv[last.key] + if !ok { + timeheapPop(&kv.heap) + continue + } + if !last.expired() { + interval = time.Until(last.expiresAt) + if interval < 0 { + interval = last.expiresAfter + } + break + } + last = timeheapPop(&kv.heap) + if ok { + expired[last.key] = entry.value + } + } +REVAL: + for k := range expired { + newVal, ok := kv.kv[k] + if !ok || + newVal.timeout == nil || + !newVal.expired() { + delete(expired, k) + goto REVAL + } + delete(kv.kv, k) + } + go notifyExpirations(expired, kv.onExpire) + if interval == 0 && len(kv.heap) > 0 { + last := kv.heap[0] + interval = time.Until(last.expiresAt) + if interval < 0 { + interval = last.expiresAfter + } + } + return interval +} + +func notifyExpirations[T any]( + expired map[string]T, + onExpire func(k string, v T)) { + if onExpire == nil { + return + } + for k, v := range expired { + k, v := k, v + try(func() error { + onExpire(k, v) + return nil + }) + } +} + +//----------------------------------------------------------------------------- + +// errors +var ( + ErrCASCond = errorf("CAS COND FAILED") +) + +//----------------------------------------------------------------------------- + +type sentinelErr string + +func (v sentinelErr) Error() string { return string(v) } +func errorf(format string, a ...interface{}) error { + return sentinelErr(fmt.Sprintf(format, a...)) +} + +//----------------------------------------------------------------------------- diff --git a/pkg/tinykv/tinykv_test.go b/pkg/tinykv/tinykv_test.go new file mode 100644 index 0000000..c380e40 --- /dev/null +++ b/pkg/tinykv/tinykv_test.go @@ -0,0 +1,629 @@ +package tinykv + +import ( + "encoding/json" + "fmt" + "math/rand" + "strconv" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestTimeoutHeap(t *testing.T) { + assert := assert.New(t) + + now := time.Now() + r := rand.New(rand.NewSource(now.Unix())) + n := r.Intn(10000) + 10 + var h th = []*timeout{} + for i := 0; i < n; i++ { + to := &timeout{expiresAt: now.Add(time.Duration(r.Intn(100000)) * time.Second)} + timeheapPush(&h, to) + } + + var prev *timeout + // t.Log(h[0].expiresAt, h[len(h)-1].expiresAt) + for len(h) > 0 { + ito := timeheapPop(&h) + if prev != nil { + assert.Condition(func() bool { return !prev.expiresAt.After(ito.expiresAt) }) + } + prev = ito + } + assert.Equal(0, len(h)) +} + +var _ KV[int] = &store[int]{} + +func TestGetPut(t *testing.T) { + assert := assert.New(t) + rg := New[int](0) + defer rg.Stop() + + rg.Put("1", 1) + v, ok := rg.Get("1") + assert.True(ok) + assert.Equal(1, v) + + rg.Put("2", 2, ExpiresAfter(time.Millisecond*50)) + v, ok = rg.Get("2") + assert.True(ok) + assert.Equal(2, v) + <-time.After(time.Millisecond * 100) + + v, ok = rg.Get("2") + assert.False(ok) + assert.NotEqual(2, v) +} + +func TestKeys(t *testing.T) { + assert := assert.New(t) + rg := New[int](0) + defer rg.Stop() + + rg.Put("1", 1) + rg.Put("2", 2) + + keys := rg.Keys() + assert.NotEmpty(keys) + assert.Contains(keys, "1") + assert.Contains(keys, "2") +} + +func TestValues(t *testing.T) { + assert := assert.New(t) + rg := New[int](0) + defer rg.Stop() + + rg.Put("1", 1) + rg.Put("2", 2) + + values := rg.Values() + assert.NotEmpty(values) + assert.Contains(values, 1) + assert.Contains(values, 2) +} + +func TestEntries(t *testing.T) { + assert := assert.New(t) + rg := New[int](0) + defer rg.Stop() + + rg.Put("1", 1) + rg.Put("2", 2) + rg.Put("3", 3, ExpiresAfter(time.Minute*50)) + + entries := rg.Entries() + assert.NotEmpty(entries) + assert.NotNil(entries["1"]) + assert.NotNil(entries["2"]) + assert.NotNil(entries["3"]) +} + +func TestMarshalJSON(t *testing.T) { + assert := assert.New(t) + rg := New[int](0) + defer rg.Stop() + + rg.Put("1", 1) + rg.Put("2", 2) + rg.Put("3", 3, ExpiresAfter(time.Minute*50)) + + jsonb, err := json.Marshal(rg) + assert.Nil(err) + json := string(jsonb) + assert.Regexp("{\"1\":{\"value\":1},\"2\":{\"value\":2},\"3\":{\"value\":3,\"expiresAt\":\"\\d\\d\\d\\d-\\d\\d-\\d\\dT\\d\\d:\\d\\d:\\d\\d.\\d\\d\\d\\d\\d\\d\\d\\d\\dZ\",\"expiresAfter\":3000000000000,\"isSliding\":false}}", json) +} + +func TestUnmarshalJSON(t *testing.T) { + assert := assert.New(t) + in5Minutes := time.Now().Add(time.Minute * 5) + in5MinutesJson, err := json.Marshal(in5Minutes) + assert.Nil(err) + jsons := "{\"1\":{\"value\":1},\"2\":{\"value\":2},\"3\":{\"value\":3,\"expiresAt\":" + string(in5MinutesJson) + ",\"expiresAfter\":3000000000000,\"isSliding\":false}}" + + rg := New[int](0) + defer rg.Stop() + + err = json.Unmarshal([]byte(jsons), &rg) + assert.Nil(err) +} + +func TestTimeout(t *testing.T) { + assert := assert.New(t) + rcvd := make(chan string, 100) + notify := func(k string, v interface{}) { + rcvd <- k + } + rg := New(time.Millisecond*10, notify) + n := 1000 + for i := n; i < 2*n; i++ { + rg.Put(strconv.Itoa(i), i, ExpiresAfter(time.Millisecond*10)) + } + got := make([]string, n) +OUT01: + for { + select { + case v := <-rcvd: + i, err := strconv.Atoi(v) + assert.NoError(err) + i = i - n + if i < 0 || i >= n { + t.Fail() + } + got[i] = v + case <-time.After(time.Millisecond * 100): + break OUT01 + } + } + assert.Equal(len(got), n) + for i := 0; i < n; i++ { + if got[i] != "" { + continue + } + assert.Fail("should have value", i, got[i]) + } +} + +func Test02(t *testing.T) { + assert := assert.New(t) + rg := New[int](time.Millisecond * 30) + + rg.Put("1", 1) + v, ok := rg.Get("1") + assert.True(ok) + assert.Equal(1, v) + + rg.Put("1", 1, ExpiresAfter(time.Millisecond*50), IsSliding(true)) + <-time.After(time.Millisecond * 40) + v, ok = rg.Get("1") + assert.True(ok) + assert.Equal(1, v) + <-time.After(time.Millisecond * 10) + v, ok = rg.Get("1") + assert.True(ok) + assert.Equal(1, v) + <-time.After(time.Millisecond * 10) + v, ok = rg.Get("1") + assert.True(ok) + assert.Equal(1, v) + + <-time.After(time.Millisecond * 100) + + v, ok = rg.Get("1") + assert.False(ok) + assert.NotEqual(1, v) +} + +func Test03(t *testing.T) { + assert := assert.New(t) + var putAt time.Time + elapsed := make(chan time.Duration, 1) + kv := New( + time.Millisecond*50, + func(k string, v interface{}) { + elapsed <- time.Since(putAt) + }) + + putAt = time.Now() + kv.Put("1", 1, ExpiresAfter(time.Millisecond*10)) + + <-time.After(time.Millisecond * 100) + assert.WithinDuration(putAt, putAt.Add(<-elapsed), time.Millisecond*60) +} + +func Test04(t *testing.T) { + assert := assert.New(t) + kv := New( + time.Millisecond*10, + func(k string, v interface{}) { + t.Fatal(k, v) + }) + + err := kv.Put("1", 1, ExpiresAfter(time.Millisecond*10000)) + assert.NoError(err) + <-time.After(time.Millisecond * 50) + kv.Delete("1") + kv.Delete("1") + + <-time.After(time.Millisecond * 100) + _, ok := kv.Get("1") + assert.False(ok) +} + +func Test05(t *testing.T) { + assert := assert.New(t) + N := 10000 + var cnt int64 + kv := New( + time.Millisecond*10, + func(k string, v interface{}) { + atomic.AddInt64(&cnt, 1) + }) + + src := rand.NewSource(time.Now().Unix()) + rnd := rand.New(src) + for i := 0; i < N; i++ { + k := fmt.Sprintf("%d", i) + kv.Put(k, fmt.Sprintf("VAL::%v", k), + ExpiresAfter( + time.Millisecond*time.Duration(rnd.Intn(10)+1))) + } + + <-time.After(time.Millisecond * 100) + for i := 0; i < N; i++ { + k := fmt.Sprintf("%d", i) + _, ok := kv.Get(k) + assert.False(ok) + } +} + +func Test06(t *testing.T) { + assert := assert.New(t) + kv := New( + time.Millisecond, + func(k string, v interface{}) { + t.Fail() + }) + + err := kv.Put("1", 1, ExpiresAfter(10*time.Millisecond), IsSliding(true)) + assert.NoError(err) + + for i := 0; i < 100; i++ { + _, ok := kv.Get("1") + assert.True(ok) + <-time.After(time.Millisecond) + } + kv.Delete("1") + + <-time.After(time.Millisecond * 30) + + _, ok := kv.Get("1") + assert.False(ok) +} + +func Test07(t *testing.T) { + assert := assert.New(t) + + kv := New[int](-1) + kv.Put("1", 1) + v, ok := kv.Take("1") + assert.True(ok) + assert.Equal(1, v) + + _, ok = kv.Get("1") + assert.False(ok) +} + +func Test08(t *testing.T) { + assert := assert.New(t) + + kv := New[interface{}](-1) + err := kv.Put( + "QQG", "G", + CAS(func(interface{}, bool) bool { return true }), + ExpiresAfter(time.Millisecond)) + assert.NoError(err) + + v, ok := kv.Take("QQG") + assert.True(ok) + assert.Equal("G", v) +} + +// ignore new timeouts when cas, and just use the old ones from the old value (if exists) +func Test09IgnoreTimeoutParamsOnCAS(t *testing.T) { + assert := assert.New(t) + + key := "QQG" + + kv := New[interface{}](time.Millisecond) + err := kv.Put( + key, "G", + CAS(func(interface{}, bool) bool { return true }), + ExpiresAfter(time.Millisecond*30)) + assert.NoError(err) + + v, ok := kv.Get(key) + assert.True(ok) + assert.Equal("G", v) + + <-time.After(time.Millisecond * 20) + + err = kv.Put(key, "OK", + CAS(func(currentValue interface{}, found bool) bool { + assert.True(found) + assert.Equal("G", currentValue) + return true + })) + assert.NoError(err) + + <-time.After(time.Millisecond * 12) + _, ok = kv.Get(key) + assert.False(ok) +} + +func Test10(t *testing.T) { + assert := assert.New(t) + + key := "QQG" + + kv := New[interface{}](time.Millisecond) + err := kv.Put( + key, "G", + CAS(func(interface{}, bool) bool { return true }), + IsSliding(true), + ExpiresAfter(time.Millisecond*15)) + assert.NoError(err) + + <-time.After(time.Millisecond * 12) + + v, ok := kv.Get(key) + assert.True(ok) + assert.Equal("G", v) + + <-time.After(time.Millisecond * 12) + + err = kv.Put(key, "OK", + CAS(func(currentValue interface{}, found bool) bool { + assert.True(found) + assert.Equal("G", currentValue) + return true + })) + assert.NoError(err) + + <-time.After(time.Millisecond * 12) + + _, ok = kv.Get(key) + assert.True(ok) +} + +func Test11(t *testing.T) { + assert := assert.New(t) + + key := "QQG" + + var expiredKey = make(chan string, 100) + onExpired := func(k string, v interface{}) { expiredKey <- k } + + kv := New(time.Millisecond*100, onExpired) + err := kv.Put( + key, "G", + ExpiresAfter(time.Millisecond*15)) + assert.NoError(err) + + <-time.After(time.Millisecond * 10) + + v, ok := kv.Get(key) + assert.True(ok) + assert.Equal("G", v) + + <-time.After(time.Millisecond * 10) + + _, ok = kv.Get(key) + assert.False(ok) + <-time.After(time.Millisecond) + assert.Equal(key, <-expiredKey) + + <-time.After(time.Millisecond * 110) + + _, ok = kv.Get(key) + assert.False(ok) +} + +func Test12(t *testing.T) { + assert := assert.New(t) + + key := "QQG" + + onExpired := func(k string, v interface{}) {} + + kv := New(time.Millisecond*100, onExpired) + err := kv.Put( + key, "G", + ExpiresAfter(time.Millisecond)) + assert.NoError(err) + + <-time.After(time.Millisecond * 10) + + v, ok := kv.Get(key) + assert.False(ok) + assert.Equal(nil, v) +} + +func Test13(t *testing.T) { + assert := assert.New(t) + + got := make(chan interface{}, 10) + onExpired := func(k string, v interface{}) { + got <- v + } + + kv := New(time.Millisecond*10, onExpired) + err := kv.Put( + "1", 123, + ExpiresAfter(time.Millisecond)) + assert.NoError(err) + + <-time.After(time.Millisecond * 50) + + v, ok := kv.Get("1") + assert.False(ok) + assert.Equal(nil, v) + + v = <-got + assert.Equal(123, v) +} + +func TestOrdering(t *testing.T) { + assert := assert.New(t) + + type data struct { + key string + value interface{} + } + got := make(chan data, 100) + onExpired := func(k string, v interface{}) { + got <- data{k, v} + } + + kv := New(time.Millisecond*5, onExpired) + + for i := 1; i <= 10; i++ { + k := strconv.Itoa(i) + v := i + kv.Put(k, v, ExpiresAfter(time.Millisecond*time.Duration(i)*50)) + } + + var order = make([]int, 10) + done := make(chan struct{}) + go func() { + defer close(done) + for { + select { + case v := <-got: + i, _ := strconv.Atoi(v.key) + i-- + val := v.value.(int) + val-- + order[i] = val + case <-time.After(time.Millisecond * 100): + return + } + } + }() + <-done + for k, v := range order { + assert.Equal(k, v) + } + + assert.Equal(1, 1) +} + +func TestCASOldFound(t *testing.T) { + assert := assert.New(t) + + kv := New[interface{}](time.Millisecond * 10) + key := "KEY01" + value := "VALUE01" + err := kv.Put( + key, value, + CAS(func(old interface{}, found bool) bool { + assert.Nil(old) + assert.False(found) + return true + })) + assert.NoError(err) + err = kv.Put( + key, value, + CAS(func(old interface{}, found bool) bool { + assert.Equal(value, old) + assert.True(found) + return true + })) + assert.NoError(err) + kv.Delete(key) + err = kv.Put( + key, value, + CAS(func(old interface{}, found bool) bool { + assert.Nil(old) + assert.False(found) + return true + })) + assert.NoError(err) + v, ok := kv.Take(key) + assert.True(ok) + assert.Equal(value, v) + err = kv.Put( + key, value, + CAS(func(old interface{}, found bool) bool { + assert.Nil(old) + assert.False(found) + return true + })) + assert.NoError(err) +} + +func ExampleNew() { + key := "KEY" + value := "VALUE" + + kv := New[interface{}](time.Millisecond * 10) + defer kv.Stop() + kv.Put(key, value) + v, ok := kv.Get(key) + if !ok { + // ... + } + fmt.Println(key, v) + kv.Delete(key) + _, ok = kv.Get(key) + fmt.Println(ok) + + // Output: + // KEY VALUE + // false +} + +func BenchmarkGetNoValue(b *testing.B) { + rg := New[interface{}](-1) + for n := 0; n < b.N; n++ { + rg.Get("1") + } +} + +func BenchmarkGetValue(b *testing.B) { + rg := New[interface{}](-1) + rg.Put("1", 1) + for n := 0; n < b.N; n++ { + rg.Get("1") + } +} + +func BenchmarkGetSlidingTimeout(b *testing.B) { + rg := New[interface{}](-1) + rg.Put("1", 1, ExpiresAfter(time.Second*10)) + for n := 0; n < b.N; n++ { + rg.Get("1") + } +} + +func BenchmarkPutOne(b *testing.B) { + rg := New[interface{}](-1) + for n := 0; n < b.N; n++ { + rg.Put("1", 1) + } +} + +func BenchmarkPutN(b *testing.B) { + rg := New[interface{}](-1) + for n := 0; n < b.N; n++ { + k := strconv.Itoa(n) + rg.Put(k, n) + } +} + +func BenchmarkPutExpire(b *testing.B) { + rg := New[interface{}](-1) + for n := 0; n < b.N; n++ { + rg.Put("1", 1, ExpiresAfter(time.Second*10)) + } +} + +func BenchmarkCASTrue(b *testing.B) { + rg := New[interface{}](-1) + rg.Put("1", 1) + for n := 0; n < b.N; n++ { + rg.Put("1", 2, CAS(func(interface{}, bool) bool { return true })) + } +} + +func BenchmarkCASFalse(b *testing.B) { + rg := New[interface{}](-1) + rg.Put("1", 1) + for n := 0; n < b.N; n++ { + rg.Put("1", 2, CAS(func(interface{}, bool) bool { return false })) + } +}