Matthias Andreas Benkard | 832a54e | 2019-01-29 09:27:38 +0100 | [diff] [blame^] | 1 | // Diskv (disk-vee) is a simple, persistent, key-value store. |
| 2 | // It stores all data flatly on the filesystem. |
| 3 | |
| 4 | package diskv |
| 5 | |
| 6 | import ( |
| 7 | "bytes" |
| 8 | "errors" |
| 9 | "fmt" |
| 10 | "io" |
| 11 | "io/ioutil" |
| 12 | "os" |
| 13 | "path/filepath" |
| 14 | "strings" |
| 15 | "sync" |
| 16 | "syscall" |
| 17 | ) |
| 18 | |
| 19 | const ( |
| 20 | defaultBasePath = "diskv" |
| 21 | defaultFilePerm os.FileMode = 0666 |
| 22 | defaultPathPerm os.FileMode = 0777 |
| 23 | ) |
| 24 | |
| 25 | var ( |
| 26 | defaultTransform = func(s string) []string { return []string{} } |
| 27 | errCanceled = errors.New("canceled") |
| 28 | errEmptyKey = errors.New("empty key") |
| 29 | errBadKey = errors.New("bad key") |
| 30 | errImportDirectory = errors.New("can't import a directory") |
| 31 | ) |
| 32 | |
| 33 | // TransformFunction transforms a key into a slice of strings, with each |
| 34 | // element in the slice representing a directory in the file path where the |
| 35 | // key's entry will eventually be stored. |
| 36 | // |
| 37 | // For example, if TransformFunc transforms "abcdef" to ["ab", "cde", "f"], |
| 38 | // the final location of the data file will be <basedir>/ab/cde/f/abcdef |
| 39 | type TransformFunction func(s string) []string |
| 40 | |
| 41 | // Options define a set of properties that dictate Diskv behavior. |
| 42 | // All values are optional. |
| 43 | type Options struct { |
| 44 | BasePath string |
| 45 | Transform TransformFunction |
| 46 | CacheSizeMax uint64 // bytes |
| 47 | PathPerm os.FileMode |
| 48 | FilePerm os.FileMode |
| 49 | // If TempDir is set, it will enable filesystem atomic writes by |
| 50 | // writing temporary files to that location before being moved |
| 51 | // to BasePath. |
| 52 | // Note that TempDir MUST be on the same device/partition as |
| 53 | // BasePath. |
| 54 | TempDir string |
| 55 | |
| 56 | Index Index |
| 57 | IndexLess LessFunction |
| 58 | |
| 59 | Compression Compression |
| 60 | } |
| 61 | |
| 62 | // Diskv implements the Diskv interface. You shouldn't construct Diskv |
| 63 | // structures directly; instead, use the New constructor. |
| 64 | type Diskv struct { |
| 65 | Options |
| 66 | mu sync.RWMutex |
| 67 | cache map[string][]byte |
| 68 | cacheSize uint64 |
| 69 | } |
| 70 | |
| 71 | // New returns an initialized Diskv structure, ready to use. |
| 72 | // If the path identified by baseDir already contains data, |
| 73 | // it will be accessible, but not yet cached. |
| 74 | func New(o Options) *Diskv { |
| 75 | if o.BasePath == "" { |
| 76 | o.BasePath = defaultBasePath |
| 77 | } |
| 78 | if o.Transform == nil { |
| 79 | o.Transform = defaultTransform |
| 80 | } |
| 81 | if o.PathPerm == 0 { |
| 82 | o.PathPerm = defaultPathPerm |
| 83 | } |
| 84 | if o.FilePerm == 0 { |
| 85 | o.FilePerm = defaultFilePerm |
| 86 | } |
| 87 | |
| 88 | d := &Diskv{ |
| 89 | Options: o, |
| 90 | cache: map[string][]byte{}, |
| 91 | cacheSize: 0, |
| 92 | } |
| 93 | |
| 94 | if d.Index != nil && d.IndexLess != nil { |
| 95 | d.Index.Initialize(d.IndexLess, d.Keys(nil)) |
| 96 | } |
| 97 | |
| 98 | return d |
| 99 | } |
| 100 | |
| 101 | // Write synchronously writes the key-value pair to disk, making it immediately |
| 102 | // available for reads. Write relies on the filesystem to perform an eventual |
| 103 | // sync to physical media. If you need stronger guarantees, see WriteStream. |
| 104 | func (d *Diskv) Write(key string, val []byte) error { |
| 105 | return d.WriteStream(key, bytes.NewBuffer(val), false) |
| 106 | } |
| 107 | |
| 108 | // WriteStream writes the data represented by the io.Reader to the disk, under |
| 109 | // the provided key. If sync is true, WriteStream performs an explicit sync on |
| 110 | // the file as soon as it's written. |
| 111 | // |
| 112 | // bytes.Buffer provides io.Reader semantics for basic data types. |
| 113 | func (d *Diskv) WriteStream(key string, r io.Reader, sync bool) error { |
| 114 | if len(key) <= 0 { |
| 115 | return errEmptyKey |
| 116 | } |
| 117 | |
| 118 | d.mu.Lock() |
| 119 | defer d.mu.Unlock() |
| 120 | |
| 121 | return d.writeStreamWithLock(key, r, sync) |
| 122 | } |
| 123 | |
| 124 | // createKeyFileWithLock either creates the key file directly, or |
| 125 | // creates a temporary file in TempDir if it is set. |
| 126 | func (d *Diskv) createKeyFileWithLock(key string) (*os.File, error) { |
| 127 | if d.TempDir != "" { |
| 128 | if err := os.MkdirAll(d.TempDir, d.PathPerm); err != nil { |
| 129 | return nil, fmt.Errorf("temp mkdir: %s", err) |
| 130 | } |
| 131 | f, err := ioutil.TempFile(d.TempDir, "") |
| 132 | if err != nil { |
| 133 | return nil, fmt.Errorf("temp file: %s", err) |
| 134 | } |
| 135 | |
| 136 | if err := f.Chmod(d.FilePerm); err != nil { |
| 137 | f.Close() // error deliberately ignored |
| 138 | os.Remove(f.Name()) // error deliberately ignored |
| 139 | return nil, fmt.Errorf("chmod: %s", err) |
| 140 | } |
| 141 | return f, nil |
| 142 | } |
| 143 | |
| 144 | mode := os.O_WRONLY | os.O_CREATE | os.O_TRUNC // overwrite if exists |
| 145 | f, err := os.OpenFile(d.completeFilename(key), mode, d.FilePerm) |
| 146 | if err != nil { |
| 147 | return nil, fmt.Errorf("open file: %s", err) |
| 148 | } |
| 149 | return f, nil |
| 150 | } |
| 151 | |
| 152 | // writeStream does no input validation checking. |
| 153 | func (d *Diskv) writeStreamWithLock(key string, r io.Reader, sync bool) error { |
| 154 | if err := d.ensurePathWithLock(key); err != nil { |
| 155 | return fmt.Errorf("ensure path: %s", err) |
| 156 | } |
| 157 | |
| 158 | f, err := d.createKeyFileWithLock(key) |
| 159 | if err != nil { |
| 160 | return fmt.Errorf("create key file: %s", err) |
| 161 | } |
| 162 | |
| 163 | wc := io.WriteCloser(&nopWriteCloser{f}) |
| 164 | if d.Compression != nil { |
| 165 | wc, err = d.Compression.Writer(f) |
| 166 | if err != nil { |
| 167 | f.Close() // error deliberately ignored |
| 168 | os.Remove(f.Name()) // error deliberately ignored |
| 169 | return fmt.Errorf("compression writer: %s", err) |
| 170 | } |
| 171 | } |
| 172 | |
| 173 | if _, err := io.Copy(wc, r); err != nil { |
| 174 | f.Close() // error deliberately ignored |
| 175 | os.Remove(f.Name()) // error deliberately ignored |
| 176 | return fmt.Errorf("i/o copy: %s", err) |
| 177 | } |
| 178 | |
| 179 | if err := wc.Close(); err != nil { |
| 180 | f.Close() // error deliberately ignored |
| 181 | os.Remove(f.Name()) // error deliberately ignored |
| 182 | return fmt.Errorf("compression close: %s", err) |
| 183 | } |
| 184 | |
| 185 | if sync { |
| 186 | if err := f.Sync(); err != nil { |
| 187 | f.Close() // error deliberately ignored |
| 188 | os.Remove(f.Name()) // error deliberately ignored |
| 189 | return fmt.Errorf("file sync: %s", err) |
| 190 | } |
| 191 | } |
| 192 | |
| 193 | if err := f.Close(); err != nil { |
| 194 | return fmt.Errorf("file close: %s", err) |
| 195 | } |
| 196 | |
| 197 | if f.Name() != d.completeFilename(key) { |
| 198 | if err := os.Rename(f.Name(), d.completeFilename(key)); err != nil { |
| 199 | os.Remove(f.Name()) // error deliberately ignored |
| 200 | return fmt.Errorf("rename: %s", err) |
| 201 | } |
| 202 | } |
| 203 | |
| 204 | if d.Index != nil { |
| 205 | d.Index.Insert(key) |
| 206 | } |
| 207 | |
| 208 | d.bustCacheWithLock(key) // cache only on read |
| 209 | |
| 210 | return nil |
| 211 | } |
| 212 | |
| 213 | // Import imports the source file into diskv under the destination key. If the |
| 214 | // destination key already exists, it's overwritten. If move is true, the |
| 215 | // source file is removed after a successful import. |
| 216 | func (d *Diskv) Import(srcFilename, dstKey string, move bool) (err error) { |
| 217 | if dstKey == "" { |
| 218 | return errEmptyKey |
| 219 | } |
| 220 | |
| 221 | if fi, err := os.Stat(srcFilename); err != nil { |
| 222 | return err |
| 223 | } else if fi.IsDir() { |
| 224 | return errImportDirectory |
| 225 | } |
| 226 | |
| 227 | d.mu.Lock() |
| 228 | defer d.mu.Unlock() |
| 229 | |
| 230 | if err := d.ensurePathWithLock(dstKey); err != nil { |
| 231 | return fmt.Errorf("ensure path: %s", err) |
| 232 | } |
| 233 | |
| 234 | if move { |
| 235 | if err := syscall.Rename(srcFilename, d.completeFilename(dstKey)); err == nil { |
| 236 | d.bustCacheWithLock(dstKey) |
| 237 | return nil |
| 238 | } else if err != syscall.EXDEV { |
| 239 | // If it failed due to being on a different device, fall back to copying |
| 240 | return err |
| 241 | } |
| 242 | } |
| 243 | |
| 244 | f, err := os.Open(srcFilename) |
| 245 | if err != nil { |
| 246 | return err |
| 247 | } |
| 248 | defer f.Close() |
| 249 | err = d.writeStreamWithLock(dstKey, f, false) |
| 250 | if err == nil && move { |
| 251 | err = os.Remove(srcFilename) |
| 252 | } |
| 253 | return err |
| 254 | } |
| 255 | |
| 256 | // Read reads the key and returns the value. |
| 257 | // If the key is available in the cache, Read won't touch the disk. |
| 258 | // If the key is not in the cache, Read will have the side-effect of |
| 259 | // lazily caching the value. |
| 260 | func (d *Diskv) Read(key string) ([]byte, error) { |
| 261 | rc, err := d.ReadStream(key, false) |
| 262 | if err != nil { |
| 263 | return []byte{}, err |
| 264 | } |
| 265 | defer rc.Close() |
| 266 | return ioutil.ReadAll(rc) |
| 267 | } |
| 268 | |
| 269 | // ReadStream reads the key and returns the value (data) as an io.ReadCloser. |
| 270 | // If the value is cached from a previous read, and direct is false, |
| 271 | // ReadStream will use the cached value. Otherwise, it will return a handle to |
| 272 | // the file on disk, and cache the data on read. |
| 273 | // |
| 274 | // If direct is true, ReadStream will lazily delete any cached value for the |
| 275 | // key, and return a direct handle to the file on disk. |
| 276 | // |
| 277 | // If compression is enabled, ReadStream taps into the io.Reader stream prior |
| 278 | // to decompression, and caches the compressed data. |
| 279 | func (d *Diskv) ReadStream(key string, direct bool) (io.ReadCloser, error) { |
| 280 | d.mu.RLock() |
| 281 | defer d.mu.RUnlock() |
| 282 | |
| 283 | if val, ok := d.cache[key]; ok { |
| 284 | if !direct { |
| 285 | buf := bytes.NewBuffer(val) |
| 286 | if d.Compression != nil { |
| 287 | return d.Compression.Reader(buf) |
| 288 | } |
| 289 | return ioutil.NopCloser(buf), nil |
| 290 | } |
| 291 | |
| 292 | go func() { |
| 293 | d.mu.Lock() |
| 294 | defer d.mu.Unlock() |
| 295 | d.uncacheWithLock(key, uint64(len(val))) |
| 296 | }() |
| 297 | } |
| 298 | |
| 299 | return d.readWithRLock(key) |
| 300 | } |
| 301 | |
| 302 | // read ignores the cache, and returns an io.ReadCloser representing the |
| 303 | // decompressed data for the given key, streamed from the disk. Clients should |
| 304 | // acquire a read lock on the Diskv and check the cache themselves before |
| 305 | // calling read. |
| 306 | func (d *Diskv) readWithRLock(key string) (io.ReadCloser, error) { |
| 307 | filename := d.completeFilename(key) |
| 308 | |
| 309 | fi, err := os.Stat(filename) |
| 310 | if err != nil { |
| 311 | return nil, err |
| 312 | } |
| 313 | if fi.IsDir() { |
| 314 | return nil, os.ErrNotExist |
| 315 | } |
| 316 | |
| 317 | f, err := os.Open(filename) |
| 318 | if err != nil { |
| 319 | return nil, err |
| 320 | } |
| 321 | |
| 322 | var r io.Reader |
| 323 | if d.CacheSizeMax > 0 { |
| 324 | r = newSiphon(f, d, key) |
| 325 | } else { |
| 326 | r = &closingReader{f} |
| 327 | } |
| 328 | |
| 329 | var rc = io.ReadCloser(ioutil.NopCloser(r)) |
| 330 | if d.Compression != nil { |
| 331 | rc, err = d.Compression.Reader(r) |
| 332 | if err != nil { |
| 333 | return nil, err |
| 334 | } |
| 335 | } |
| 336 | |
| 337 | return rc, nil |
| 338 | } |
| 339 | |
| 340 | // closingReader provides a Reader that automatically closes the |
| 341 | // embedded ReadCloser when it reaches EOF |
| 342 | type closingReader struct { |
| 343 | rc io.ReadCloser |
| 344 | } |
| 345 | |
| 346 | func (cr closingReader) Read(p []byte) (int, error) { |
| 347 | n, err := cr.rc.Read(p) |
| 348 | if err == io.EOF { |
| 349 | if closeErr := cr.rc.Close(); closeErr != nil { |
| 350 | return n, closeErr // close must succeed for Read to succeed |
| 351 | } |
| 352 | } |
| 353 | return n, err |
| 354 | } |
| 355 | |
| 356 | // siphon is like a TeeReader: it copies all data read through it to an |
| 357 | // internal buffer, and moves that buffer to the cache at EOF. |
| 358 | type siphon struct { |
| 359 | f *os.File |
| 360 | d *Diskv |
| 361 | key string |
| 362 | buf *bytes.Buffer |
| 363 | } |
| 364 | |
| 365 | // newSiphon constructs a siphoning reader that represents the passed file. |
| 366 | // When a successful series of reads ends in an EOF, the siphon will write |
| 367 | // the buffered data to Diskv's cache under the given key. |
| 368 | func newSiphon(f *os.File, d *Diskv, key string) io.Reader { |
| 369 | return &siphon{ |
| 370 | f: f, |
| 371 | d: d, |
| 372 | key: key, |
| 373 | buf: &bytes.Buffer{}, |
| 374 | } |
| 375 | } |
| 376 | |
| 377 | // Read implements the io.Reader interface for siphon. |
| 378 | func (s *siphon) Read(p []byte) (int, error) { |
| 379 | n, err := s.f.Read(p) |
| 380 | |
| 381 | if err == nil { |
| 382 | return s.buf.Write(p[0:n]) // Write must succeed for Read to succeed |
| 383 | } |
| 384 | |
| 385 | if err == io.EOF { |
| 386 | s.d.cacheWithoutLock(s.key, s.buf.Bytes()) // cache may fail |
| 387 | if closeErr := s.f.Close(); closeErr != nil { |
| 388 | return n, closeErr // close must succeed for Read to succeed |
| 389 | } |
| 390 | return n, err |
| 391 | } |
| 392 | |
| 393 | return n, err |
| 394 | } |
| 395 | |
| 396 | // Erase synchronously erases the given key from the disk and the cache. |
| 397 | func (d *Diskv) Erase(key string) error { |
| 398 | d.mu.Lock() |
| 399 | defer d.mu.Unlock() |
| 400 | |
| 401 | d.bustCacheWithLock(key) |
| 402 | |
| 403 | // erase from index |
| 404 | if d.Index != nil { |
| 405 | d.Index.Delete(key) |
| 406 | } |
| 407 | |
| 408 | // erase from disk |
| 409 | filename := d.completeFilename(key) |
| 410 | if s, err := os.Stat(filename); err == nil { |
| 411 | if s.IsDir() { |
| 412 | return errBadKey |
| 413 | } |
| 414 | if err = os.Remove(filename); err != nil { |
| 415 | return err |
| 416 | } |
| 417 | } else { |
| 418 | // Return err as-is so caller can do os.IsNotExist(err). |
| 419 | return err |
| 420 | } |
| 421 | |
| 422 | // clean up and return |
| 423 | d.pruneDirsWithLock(key) |
| 424 | return nil |
| 425 | } |
| 426 | |
| 427 | // EraseAll will delete all of the data from the store, both in the cache and on |
| 428 | // the disk. Note that EraseAll doesn't distinguish diskv-related data from non- |
| 429 | // diskv-related data. Care should be taken to always specify a diskv base |
| 430 | // directory that is exclusively for diskv data. |
| 431 | func (d *Diskv) EraseAll() error { |
| 432 | d.mu.Lock() |
| 433 | defer d.mu.Unlock() |
| 434 | d.cache = make(map[string][]byte) |
| 435 | d.cacheSize = 0 |
| 436 | if d.TempDir != "" { |
| 437 | os.RemoveAll(d.TempDir) // errors ignored |
| 438 | } |
| 439 | return os.RemoveAll(d.BasePath) |
| 440 | } |
| 441 | |
| 442 | // Has returns true if the given key exists. |
| 443 | func (d *Diskv) Has(key string) bool { |
| 444 | d.mu.Lock() |
| 445 | defer d.mu.Unlock() |
| 446 | |
| 447 | if _, ok := d.cache[key]; ok { |
| 448 | return true |
| 449 | } |
| 450 | |
| 451 | filename := d.completeFilename(key) |
| 452 | s, err := os.Stat(filename) |
| 453 | if err != nil { |
| 454 | return false |
| 455 | } |
| 456 | if s.IsDir() { |
| 457 | return false |
| 458 | } |
| 459 | |
| 460 | return true |
| 461 | } |
| 462 | |
| 463 | // Keys returns a channel that will yield every key accessible by the store, |
| 464 | // in undefined order. If a cancel channel is provided, closing it will |
| 465 | // terminate and close the keys channel. |
| 466 | func (d *Diskv) Keys(cancel <-chan struct{}) <-chan string { |
| 467 | return d.KeysPrefix("", cancel) |
| 468 | } |
| 469 | |
| 470 | // KeysPrefix returns a channel that will yield every key accessible by the |
| 471 | // store with the given prefix, in undefined order. If a cancel channel is |
| 472 | // provided, closing it will terminate and close the keys channel. If the |
| 473 | // provided prefix is the empty string, all keys will be yielded. |
| 474 | func (d *Diskv) KeysPrefix(prefix string, cancel <-chan struct{}) <-chan string { |
| 475 | var prepath string |
| 476 | if prefix == "" { |
| 477 | prepath = d.BasePath |
| 478 | } else { |
| 479 | prepath = d.pathFor(prefix) |
| 480 | } |
| 481 | c := make(chan string) |
| 482 | go func() { |
| 483 | filepath.Walk(prepath, walker(c, prefix, cancel)) |
| 484 | close(c) |
| 485 | }() |
| 486 | return c |
| 487 | } |
| 488 | |
| 489 | // walker returns a function which satisfies the filepath.WalkFunc interface. |
| 490 | // It sends every non-directory file entry down the channel c. |
| 491 | func walker(c chan<- string, prefix string, cancel <-chan struct{}) filepath.WalkFunc { |
| 492 | return func(path string, info os.FileInfo, err error) error { |
| 493 | if err != nil { |
| 494 | return err |
| 495 | } |
| 496 | |
| 497 | if info.IsDir() || !strings.HasPrefix(info.Name(), prefix) { |
| 498 | return nil // "pass" |
| 499 | } |
| 500 | |
| 501 | select { |
| 502 | case c <- info.Name(): |
| 503 | case <-cancel: |
| 504 | return errCanceled |
| 505 | } |
| 506 | |
| 507 | return nil |
| 508 | } |
| 509 | } |
| 510 | |
| 511 | // pathFor returns the absolute path for location on the filesystem where the |
| 512 | // data for the given key will be stored. |
| 513 | func (d *Diskv) pathFor(key string) string { |
| 514 | return filepath.Join(d.BasePath, filepath.Join(d.Transform(key)...)) |
| 515 | } |
| 516 | |
| 517 | // ensurePathWithLock is a helper function that generates all necessary |
| 518 | // directories on the filesystem for the given key. |
| 519 | func (d *Diskv) ensurePathWithLock(key string) error { |
| 520 | return os.MkdirAll(d.pathFor(key), d.PathPerm) |
| 521 | } |
| 522 | |
| 523 | // completeFilename returns the absolute path to the file for the given key. |
| 524 | func (d *Diskv) completeFilename(key string) string { |
| 525 | return filepath.Join(d.pathFor(key), key) |
| 526 | } |
| 527 | |
| 528 | // cacheWithLock attempts to cache the given key-value pair in the store's |
| 529 | // cache. It can fail if the value is larger than the cache's maximum size. |
| 530 | func (d *Diskv) cacheWithLock(key string, val []byte) error { |
| 531 | valueSize := uint64(len(val)) |
| 532 | if err := d.ensureCacheSpaceWithLock(valueSize); err != nil { |
| 533 | return fmt.Errorf("%s; not caching", err) |
| 534 | } |
| 535 | |
| 536 | // be very strict about memory guarantees |
| 537 | if (d.cacheSize + valueSize) > d.CacheSizeMax { |
| 538 | panic(fmt.Sprintf("failed to make room for value (%d/%d)", valueSize, d.CacheSizeMax)) |
| 539 | } |
| 540 | |
| 541 | d.cache[key] = val |
| 542 | d.cacheSize += valueSize |
| 543 | return nil |
| 544 | } |
| 545 | |
| 546 | // cacheWithoutLock acquires the store's (write) mutex and calls cacheWithLock. |
| 547 | func (d *Diskv) cacheWithoutLock(key string, val []byte) error { |
| 548 | d.mu.Lock() |
| 549 | defer d.mu.Unlock() |
| 550 | return d.cacheWithLock(key, val) |
| 551 | } |
| 552 | |
| 553 | func (d *Diskv) bustCacheWithLock(key string) { |
| 554 | if val, ok := d.cache[key]; ok { |
| 555 | d.uncacheWithLock(key, uint64(len(val))) |
| 556 | } |
| 557 | } |
| 558 | |
| 559 | func (d *Diskv) uncacheWithLock(key string, sz uint64) { |
| 560 | d.cacheSize -= sz |
| 561 | delete(d.cache, key) |
| 562 | } |
| 563 | |
| 564 | // pruneDirsWithLock deletes empty directories in the path walk leading to the |
| 565 | // key k. Typically this function is called after an Erase is made. |
| 566 | func (d *Diskv) pruneDirsWithLock(key string) error { |
| 567 | pathlist := d.Transform(key) |
| 568 | for i := range pathlist { |
| 569 | dir := filepath.Join(d.BasePath, filepath.Join(pathlist[:len(pathlist)-i]...)) |
| 570 | |
| 571 | // thanks to Steven Blenkinsop for this snippet |
| 572 | switch fi, err := os.Stat(dir); true { |
| 573 | case err != nil: |
| 574 | return err |
| 575 | case !fi.IsDir(): |
| 576 | panic(fmt.Sprintf("corrupt dirstate at %s", dir)) |
| 577 | } |
| 578 | |
| 579 | nlinks, err := filepath.Glob(filepath.Join(dir, "*")) |
| 580 | if err != nil { |
| 581 | return err |
| 582 | } else if len(nlinks) > 0 { |
| 583 | return nil // has subdirs -- do not prune |
| 584 | } |
| 585 | if err = os.Remove(dir); err != nil { |
| 586 | return err |
| 587 | } |
| 588 | } |
| 589 | |
| 590 | return nil |
| 591 | } |
| 592 | |
| 593 | // ensureCacheSpaceWithLock deletes entries from the cache in arbitrary order |
| 594 | // until the cache has at least valueSize bytes available. |
| 595 | func (d *Diskv) ensureCacheSpaceWithLock(valueSize uint64) error { |
| 596 | if valueSize > d.CacheSizeMax { |
| 597 | return fmt.Errorf("value size (%d bytes) too large for cache (%d bytes)", valueSize, d.CacheSizeMax) |
| 598 | } |
| 599 | |
| 600 | safe := func() bool { return (d.cacheSize + valueSize) <= d.CacheSizeMax } |
| 601 | |
| 602 | for key, val := range d.cache { |
| 603 | if safe() { |
| 604 | break |
| 605 | } |
| 606 | |
| 607 | d.uncacheWithLock(key, uint64(len(val))) |
| 608 | } |
| 609 | |
| 610 | if !safe() { |
| 611 | panic(fmt.Sprintf("%d bytes still won't fit in the cache! (max %d bytes)", valueSize, d.CacheSizeMax)) |
| 612 | } |
| 613 | |
| 614 | return nil |
| 615 | } |
| 616 | |
| 617 | // nopWriteCloser wraps an io.Writer and provides a no-op Close method to |
| 618 | // satisfy the io.WriteCloser interface. |
| 619 | type nopWriteCloser struct { |
| 620 | io.Writer |
| 621 | } |
| 622 | |
| 623 | func (wc *nopWriteCloser) Write(p []byte) (int, error) { return wc.Writer.Write(p) } |
| 624 | func (wc *nopWriteCloser) Close() error { return nil } |