Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 87 additions & 14 deletions translib/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ type Options struct {
InitIndicator string
TableNameSeparator string //Overriden by the DB config file's separator.
KeySeparator string //Overriden by the DB config file's separator.
IsWriteDisabled bool //Indicated if write is allowed
IsWriteDisabled bool //Indicated if write is allowed
IsOnChangeEnabled bool // whether OnChange cache enabled

DisableCVLCheck bool
}
Expand Down Expand Up @@ -249,6 +250,15 @@ type Table struct {
db *DB
}

type dbCache struct {
Tables map[string]Table
}

const (
ConnectionClosed = tlerr.TranslibDBInvalidState("connection closed")
OnChangeDisabled = tlerr.TranslibDBInvalidState("OnChange disabled")
)

type _txOp int

const (
Expand All @@ -275,6 +285,9 @@ type DB struct {
cv *cvl.CVL
cvlEditConfigData []cvl.CVLEditConfigData

onCReg dbOnChangeReg // holds OnChange enabled table names
cache dbCache // holds OnChange cache

/*
sKeys []*SKey // Subscribe Key array
sHandler HFunc // Handler Function
Expand Down Expand Up @@ -341,6 +354,7 @@ func GetdbNameToIndex(dbName string) DBNum {
// NewDB is the factory method to create new DB's.
func NewDB(opt Options) (*DB, error) {

var d DB
var e error

if glog.V(3) {
Expand Down Expand Up @@ -370,8 +384,14 @@ func NewDB(opt Options) (*DB, error) {
} else {
glog.Error(fmt.Errorf("Invalid database number %d", dbId))
}

d := DB{client: redis.NewClient(&redis.Options{

if opt.IsOnChangeEnabled && !opt.IsWriteDisabled {
glog.Errorf("NewDB: IsEnableOnChange cannot be set on write enabled DB")
e = tlerr.TranslibDBCannotOpen{}
goto NewDBExit
}

d = DB{client: redis.NewClient(&redis.Options{
Network: "tcp",
Addr: ipAddr,
//Addr: DefaultRedisRemoteTCPEP,
Expand All @@ -395,6 +415,10 @@ func NewDB(opt Options) (*DB, error) {
goto NewDBExit
}

if opt.IsOnChangeEnabled {
d.onCReg = dbOnChangeReg{CacheTables: make(map[string]bool)}
}

if opt.DBNo != ConfigDB {
if glog.V(3) {
glog.Info("NewDB: ! ConfigDB. Skip init. check.")
Expand Down Expand Up @@ -438,6 +462,17 @@ func (d *DB) DeleteDB() error {
return d.client.Close()
}

func (d *DB) Name() string {
if d == nil {
return ""
}
return getDBInstName(d.Opts.DBNo)
}

func (d *DB) IsOpen() bool {
return d != nil && d.client != nil
}

func (d *DB) key2redis(ts *TableSpec, key Key) string {

if glog.V(5) {
Expand Down Expand Up @@ -482,31 +517,61 @@ func (d *DB) ts2redisUpdated(ts *TableSpec) string {

// GetEntry retrieves an entry(row) from the table.
func (d *DB) GetEntry(ts *TableSpec, key Key) (Value, error) {
if !d.IsOpen() {
return Value{}, ConnectionClosed
}
return d.getEntry(ts, key, false)
}

func (d *DB) getEntry(ts *TableSpec, key Key, forceReadDB bool) (Value, error) {

if glog.V(3) {
glog.Info("GetEntry: Begin: ", "ts: ", ts, " key: ", key)
}

var value Value
var cacheHit bool
var e error

/*
m := make(map[string]string)
m["f0.0"] = "v0.0"
m["f0.1"] = "v0.1"
m["f0.2"] = "v0.2"
v := Value{Field: m}
*/
entry := d.key2redis(ts, key)
useCache := d.Opts.IsOnChangeEnabled && d.onCReg.isCacheTable(ts.Name)

if !forceReadDB && useCache {
if table, ok := d.cache.Tables[ts.Name]; ok {
if value, ok = table.entry[entry]; ok {
value = value.Copy()
cacheHit = true
}
}
}

v, e := d.client.HGetAll(d.key2redis(ts, key)).Result()
if !cacheHit {
value.Field, e = d.client.HGetAll(d.key2redis(ts, key)).Result()
}

if len(v) != 0 {
value = Value{Field: v}
} else {
if e != nil {
glog.V(2).Infof("GetEntry: %s: HGetAll(%q) error: %v", d.Name(), entry, e)
value = Value{}

} else if !value.IsPopulated() {
if glog.V(4) {
glog.Info("GetEntry: HGetAll(): empty map")
}
// e = errors.New("Entry does not exist")
e = tlerr.TranslibRedisClientEntryNotExist{Entry: d.key2redis(ts, key)}

} else if !cacheHit && useCache {
if _, ok := d.cache.Tables[ts.Name]; !ok {
if d.cache.Tables == nil {
d.cache.Tables = make(map[string]Table, d.onCReg.size())
}
d.cache.Tables[ts.Name] = Table{
ts: ts,
entry: make(map[string]Value),
db: d,
}
}
d.cache.Tables[ts.Name].entry[entry] = value.Copy()
}

if glog.V(3) {
Expand Down Expand Up @@ -1097,6 +1162,14 @@ func (t *Table) GetEntry(key Key) (Value, error) {

//===== Functions for db.Value =====

func (v Value) Copy() (rV Value) {
rV = Value{Field: make(map[string]string, len(v.Field))}
for k, v1 := range v.Field {
rV.Field[k] = v1
}
return
}

func (v *Value) IsPopulated() bool {
return len(v.Field) > 0
}
Expand Down
123 changes: 123 additions & 0 deletions translib/db/db_onchangecache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
////////////////////////////////////////////////////////////////////////////////
// //
// Copyright 2021 Broadcom. The term Broadcom refers to Broadcom Inc. and/or //
// its subsidiaries. //
// //
// Licensed under the Apache License, Version 2.0 (the "License"); //
// you may not use this file except in compliance with the License. //
// You may obtain a copy of the License at //
// //
// http://www.apache.org/licenses/LICENSE-2.0 //
// //
// Unless required by applicable law or agreed to in writing, software //
// distributed under the License is distributed on an "AS IS" BASIS, //
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. //
// See the License for the specific language governing permissions and //
// limitations under the License. //
// //
////////////////////////////////////////////////////////////////////////////////

package db

import (
"github.com/golang/glog"
)

////////////////////////////////////////////////////////////////////////////////
// Internal Types //
////////////////////////////////////////////////////////////////////////////////

type dbOnChangeReg struct {
CacheTables map[string]bool // Only cache these tables.
}

////////////////////////////////////////////////////////////////////////////////
// Exported Functions //
////////////////////////////////////////////////////////////////////////////////

func (d *DB) RegisterTableForOnChangeCaching(ts *TableSpec) error {
if glog.V(3) {
glog.Info("RegisterTableForOnChange: ts:", ts)
}
if !d.IsOpen() {
return ConnectionClosed
}
if !d.Opts.IsOnChangeEnabled {
return OnChangeDisabled
}

d.onCReg.CacheTables[ts.Name] = true
return nil
}

// OnChangeCacheUpdate reads a db entry from redis and updates the on_change cache.
// Returns both previously cached Value and current Value. Previous Value will be
// empty if there was no such cache entry earlier. Returns an error if DB entry
// does not exists or could not be read.
func (d *DB) OnChangeCacheUpdate(ts *TableSpec, key Key) (Value, Value, error) {
if glog.V(3) {
glog.Info("OnChangeCacheUpdate: Begin: ", "ts: ", ts, " key: ", key)
}
if !d.IsOpen() {
return Value{}, Value{}, ConnectionClosed
}
if !d.Opts.IsOnChangeEnabled {
return Value{}, Value{}, OnChangeDisabled
}

var valueOrig Value
if _, ok := d.cache.Tables[ts.Name]; ok {
valueOrig = d.cache.Tables[ts.Name].entry[d.key2redis(ts, key)]
}

// Get New Value from the DB
value, e := d.getEntry(ts, key, true)

return valueOrig, value, e
}

// OnChangeCacheDelete deletes an entry from the on_change cache.
// Returns the previously cached Value object; or an empty Value if there was
// no such cache entry.
func (d *DB) OnChangeCacheDelete(ts *TableSpec, key Key) (Value, error) {
if glog.V(3) {
glog.Info("OnChangeCacheDelete: Begin: ", "ts:", ts, " key:", key)
}
if !d.IsOpen() {
return Value{}, ConnectionClosed
}
if !d.Opts.IsOnChangeEnabled {
return Value{}, OnChangeDisabled
}

redisKey := d.key2redis(ts, key)
var valueOrig Value
_, ok := d.cache.Tables[ts.Name]
if ok {
valueOrig, ok = d.cache.Tables[ts.Name].entry[redisKey]
}

if ok {
glog.V(2).Info("OnChangeCacheDelete: Delete ts:", ts, " key:", key)
delete(d.cache.Tables[ts.Name].entry, redisKey)
} else {
glog.V(2).Info("OnChangeCacheDelete: Not found; ts:", ts, " key:", key)
}

return valueOrig, nil
}

////////////////////////////////////////////////////////////////////////////////
// Internal Functions //
////////////////////////////////////////////////////////////////////////////////

func init() {
}

func (reg *dbOnChangeReg) isCacheTable(name string) bool {
return reg.CacheTables[name]
}

func (reg *dbOnChangeReg) size() int {
return len(reg.CacheTables)
}
Loading