/* * Copyright (C) 2006-2011 by Benedict Paten (benedictpaten@gmail.com) * * Released under the MIT license, see LICENSE.txt */ /* * sonLibKVDatabase_KyotoTycoon.cpp * * Created on: 5-1-11 * Author: epaull * * Note: all the KT methods seem to have a C and a CPP version (in terms of the arguments) , * and for this implementation we're using the plain C versions as much as we can. * * Update October 24, 2011 by Glenn Hickey: * Added "secondaryDB" which points to instance of BigRecordFile type database * as a fallback for big records. Now records of a certain size (from conf) * don't get added to the kyoto tycoon but go into this new database instead. * The whole thing should be transparent to the client, and hopefully prevent * the dreaded network errors in the kyoto tycoon API. * Note that all operations that can change a record's size must check to make * sure that it does not get duplicated across the two db's! */ //Database functions #ifdef HAVE_KYOTO_TYCOON #include <ktremotedb.h> #include <kclangc.h> #include "sonLibGlobalsInternal.h" #include "sonLibKVDatabasePrivate.h" using namespace std; using namespace kyototycoon; // the default expiration time: negative means indefinite, I believe int64_t XT = kc::INT64MAX; /* * construct in the Kyoto Tycoon case means connect to the remote DB */ static RemoteDB *constructDB(stKVDatabaseConf *conf, bool create) { // we actually do need a local DB dir for Kyoto Tycoon to store the sequences file const char *dbDir = stKVDatabaseConf_getDir(conf); mkdir(dbDir, S_IRWXU); // just let open of database generate error (FIXME: would be better to make this report errors) const char *dbRemote_Host = stKVDatabaseConf_getHost(conf); unsigned dbRemote_Port = stKVDatabaseConf_getPort(conf); int timeout = stKVDatabaseConf_getTimeout(conf); // create the database object RemoteDB *rdb = new RemoteDB(); // set the target (if specified) // used for the case when a single server is operating on multiple databases // dbName is from the optional database_name attribute in the kyoto_tycoon xml element const char *dbName = stKVDatabaseConf_getDatabaseName(conf); if (dbName) { rdb->set_target(dbName); } // tcrdb open sets the host and port for the rdb object if (!rdb->open(dbRemote_Host, dbRemote_Port, timeout)) { stThrowNew(ST_KV_DATABASE_EXCEPTION_ID, "Opening connection to host: %s with error: %s", dbRemote_Host, rdb->error().name()); } return rdb; } static stKVDatabase* constructBigRecordDB(stKVDatabaseConf *conf, bool create) { if (stKVDatabaseConf_getMaxKTRecordSize(conf) != kc::INT64MAX) { // warning: bypassing stKVDatabase_construct() stKVDatabase *database = (stKVDatabase *)st_calloc(1, sizeof(struct stKVDatabase)); database->conf = stKVDatabaseConf_constructClone(conf); database->deleted = false; stKVDatabase_initialise_bigRecordFile(database, conf, create); return database; } return NULL; } /* closes the remote DB connection and deletes the rdb object, but does not destroy the remote database */ static void destructDB(stKVDatabase *database) { RemoteDB *rdb = (RemoteDB*)database->dbImpl; if (rdb != NULL) { // close the connection: first try a graceful close, then a forced close if (!rdb->close(true)) { if (!rdb->close(false)) { stThrowNew(ST_KV_DATABASE_EXCEPTION_ID, "Closing database error: %s",rdb->error().name()); } } // delete the local in-memory object delete rdb; database->dbImpl = NULL; } if (database->secondaryDB != NULL) { stKVDatabase_destruct(database->secondaryDB); } } /* WARNING: removes all records from the remote database */ static void deleteDB(stKVDatabase *database) { RemoteDB *rdb = (RemoteDB *)database->dbImpl; if (rdb != NULL) { rdb->clear(); } if (database->secondaryDB != NULL) { database->secondaryDB->deleteDatabase(database->secondaryDB); } destructDB(database); // this removes all records from the remove database object } /* check if a record already exists in the kt database*/ static bool recordInTycoon(stKVDatabase *database, int64_t key) { RemoteDB *rdb = (RemoteDB *)database->dbImpl; size_t sp; char *cA; if ((cA = rdb->get((char *)&key, (size_t)sizeof(key), &sp, NULL)) == NULL) { return false; } else { free(cA); return true; } } /* check if a record already exists in the kt database*/ static bool recordOnDisk(stKVDatabase *database, int64_t key) { if (database->secondaryDB != NULL) { return database->secondaryDB->containsRecord(database->secondaryDB, key); } return false; } /* remove a record from the disk cache if it exists. must be called before * adding a record with this key to the tycoon. */ static void removeRecordFromTycoonIfPresent(stKVDatabase *database, int64_t key) { if (recordInTycoon(database, key) == true) { RemoteDB *rdb = (RemoteDB *)database->dbImpl; if (!rdb->remove((char *)&key, (size_t)sizeof(int64_t))) { stThrowNew(ST_KV_DATABASE_EXCEPTION_ID, "Removing key/value to database error: %s", rdb->error().name()); } } } /* remove a record from the tycoon if it exists. must be called before * adding a record with this key to the disk cache. */ static void removeRecordFromDiskIfPresent(stKVDatabase *database, int64_t key) { if (recordOnDisk(database, key) == true) { database->secondaryDB->removeRecord(database->secondaryDB, key); } } static bool containsRecord(stKVDatabase *database, int64_t key) { bool found = recordInTycoon(database, key); if (found == false && database->secondaryDB != NULL) { found = recordOnDisk(database, key); } return found; } static void insertRecord(stKVDatabase *database, int64_t key, const void *value, int64_t sizeOfRecord) { stKVDatabaseConf* conf = stKVDatabase_getConf(database); int64_t maxRecordSize = stKVDatabaseConf_getMaxKTRecordSize(conf); if (sizeOfRecord > maxRecordSize) { assert(database->secondaryDB != NULL); removeRecordFromTycoonIfPresent(database, key); database->secondaryDB->insertRecord(database->secondaryDB, key, value, sizeOfRecord); } else { removeRecordFromDiskIfPresent(database, key); RemoteDB *rdb = (RemoteDB *)database->dbImpl; size_t sizeOfKey = sizeof(int64_t); // add method: If the key already exists the record will not be modified and it'll return false if (!rdb->add((char *)&key, sizeOfKey, (const char *)value, sizeOfRecord)) { stThrowNew(ST_KV_DATABASE_EXCEPTION_ID, "Inserting key/value to database error: %s", rdb->error().name()); } } } static void insertInt64(stKVDatabase *database, int64_t key, int64_t value) { RemoteDB *rdb = (RemoteDB *)database->dbImpl; // Normalize a 64-bit number in the native order into the network byte order. // little endian (our x86 linux machine) to big Endian.... int64_t KCSafeIV = kyotocabinet::hton64(value); if (!rdb->add((char *)&key, sizeof(int64_t), (const char *)&KCSafeIV, sizeof(int64_t))) { stThrowNew(ST_KV_DATABASE_EXCEPTION_ID, "Inserting int64 key/value to database error: %s", rdb->error().name()); } } static void updateInt64(stKVDatabase *database, int64_t key, int64_t value) { RemoteDB *rdb = (RemoteDB *)database->dbImpl; // Normalize a 64-bit number in the native order into the network byte order. // little endian (our x86 linux machine) to big Endian.... uint64_t KCSafeIV = kyotocabinet::hton64(value); if (!rdb->replace((char *)&key, sizeof(int64_t), (const char *)&KCSafeIV, sizeof(int64_t))) { stThrowNew(ST_KV_DATABASE_EXCEPTION_ID, "Updating int64 key/value to database error: %s", rdb->error().name()); } } static void updateRecord(stKVDatabase *database, int64_t key, const void *value, int64_t sizeOfRecord) { stKVDatabaseConf* conf = stKVDatabase_getConf(database); int64_t maxRecordSize = stKVDatabaseConf_getMaxKTRecordSize(conf); if (sizeOfRecord > maxRecordSize) { assert(database->secondaryDB != NULL); removeRecordFromTycoonIfPresent(database, key); database->secondaryDB->updateRecord(database->secondaryDB, key, value, sizeOfRecord); } else { removeRecordFromDiskIfPresent(database, key); RemoteDB *rdb = (RemoteDB *)database->dbImpl; // replace method: If the key doesn't already exist it won't be created, and we'll get an error if (!rdb->replace((char *)&key, (size_t)sizeof(int64_t), (const char *)value, sizeOfRecord)) { stThrowNew(ST_KV_DATABASE_EXCEPTION_ID, "Updating key/value to database error: %s", rdb->error().name()); } } } static void setRecord(stKVDatabase *database, int64_t key, const void *value, int64_t sizeOfRecord) { stKVDatabaseConf* conf = stKVDatabase_getConf(database); int64_t maxRecordSize = stKVDatabaseConf_getMaxKTRecordSize(conf); if (sizeOfRecord > maxRecordSize) { assert(database->secondaryDB != NULL); removeRecordFromTycoonIfPresent(database, key); database->secondaryDB->setRecord(database->secondaryDB, key, value, sizeOfRecord); } else { removeRecordFromDiskIfPresent(database, key); RemoteDB *rdb = (RemoteDB *)database->dbImpl; if (!rdb->set((char *)&key, (size_t)sizeof(int64_t), (const char *)value, sizeOfRecord)) { stThrowNew(ST_KV_DATABASE_EXCEPTION_ID, "kyoto tycoon setting key/value failed: %s", rdb->error().name()); } } } /* increment a record by the specified numerical value: atomic operation */ /* return the new record value */ static int64_t incrementInt64(stKVDatabase *database, int64_t key, int64_t incrementAmount) { RemoteDB *rdb = (RemoteDB *)database->dbImpl; int64_t returnValue = kyotocabinet::INT64MIN; size_t sizeOfKey = sizeof(int64_t); if ( (returnValue = rdb->increment((char *)&key, sizeOfKey, incrementAmount, kyotocabinet::INT64MIN, XT)) == kyotocabinet::INT64MIN ) { stThrowNew(ST_KV_DATABASE_EXCEPTION_ID, "kyoto tycoon incremement record failed: %s", rdb->error().name()); } return returnValue; } // sets a bulk list of records atomically static void bulkSetRecords(stKVDatabase *database, stList *records) { stKVDatabaseConf* conf = stKVDatabase_getConf(database); int64_t maxRecordSize = stKVDatabaseConf_getMaxKTRecordSize(conf); int64_t maxBulkSetSize = stKVDatabaseConf_getMaxKTBulkSetSize(conf); int64_t maxBulkSetNumRecords = stKVDatabaseConf_getMaxKTBulkSetNumRecords(conf); RemoteDB *rdb = (RemoteDB *)database->dbImpl; map<string,string> recs; int64_t runningSize = 0; // copy the records from our C data structure to the CPP map needed for the Tycoon API for(int32_t i=0; i<stList_length(records); i++) { stKVDatabaseBulkRequest *request = (stKVDatabaseBulkRequest *)stList_get(records, i); // current batch can't get any bigger so we write and clear it if ((runningSize + request->size > maxBulkSetSize || (int64_t)recs.size() >= maxBulkSetNumRecords) && recs.empty() == false) { int retVal; if ((retVal = rdb->set_bulk(recs, XT, true)) < 1) { assert(rdb->error().name() != NULL); fprintf(stderr, "Throwing an exception with the string %s\n", rdb->error().name()); stThrowNew(ST_KV_DATABASE_EXCEPTION_ID, "kyoto tycoon set bulk record failed: %s", rdb->error().name()); } recs.clear(); runningSize = 0; } // record too big for kt, we put in the secondary if (request->size > maxRecordSize) { assert(database->secondaryDB != NULL); removeRecordFromTycoonIfPresent(database, request->key); database->secondaryDB->setRecord(database->secondaryDB, request->key, request->value, request->size); } else { removeRecordFromDiskIfPresent(database, request->key); recs.insert(pair<string,string>( string((const char *)&(request->key), sizeof(int64_t)), string((const char *)request->value, request->size)) ); runningSize += request->size; } } // test for empty list if (recs.empty() == false) { // set values, atomic = true int retVal; if ((retVal = rdb->set_bulk(recs, XT, true)) < 1) { assert(rdb->error().name() != NULL); fprintf(stderr, "Throwing an exception with the string %s\n", rdb->error().name()); stThrowNew(ST_KV_DATABASE_EXCEPTION_ID, "kyoto tycoon set bulk record failed: %s", rdb->error().name()); } } //printf("size of insert: %d\n", retVal); } // remove a bulk list atomically static void bulkRemoveRecords(stKVDatabase *database, stList *records) { RemoteDB *rdb = (RemoteDB *)database->dbImpl; vector<string> keys; for(int32_t i=0; i<stList_length(records); i++) { int64_t key = stInt64Tuple_getPosition((stInt64Tuple *)stList_get(records, i), 0); if (recordOnDisk(database, key) == true) { database->secondaryDB->removeRecord(database->secondaryDB, key); } else { keys.push_back(string((const char *)&key, sizeof(int64_t))); } } // test for empty list if (keys.empty()) { return; } if (rdb->remove_bulk(keys, true) < 1) { stThrowNew(ST_KV_DATABASE_EXCEPTION_ID, "kyoto tycoon bulk remove record failed: %s", rdb->error().name()); } } static int64_t numberOfRecords(stKVDatabase *database) { RemoteDB *rdb = (RemoteDB *)database->dbImpl; int64_t count = rdb->count(); if (database->secondaryDB != NULL) { count += database->secondaryDB->numberOfRecords(database->secondaryDB); } return count; } static void *getRecord2(stKVDatabase *database, int64_t key, int64_t *recordSize) { char* record = NULL; if (recordOnDisk(database, key) == true) { record = (char*)database->secondaryDB->getRecord2(database->secondaryDB, key, recordSize); } else if (recordInTycoon(database, key) == true) { RemoteDB *rdb = (RemoteDB *)database->dbImpl; //Return value must be freed. size_t i; record = rdb->get((char *)&key, (size_t)sizeof(int64_t), &i, NULL); *recordSize = (int64_t)i; } return record; } /* get a single non-string record */ static void *getRecord(stKVDatabase *database, int64_t key) { int64_t i; return getRecord2(database, key, &i); } /* get a single non-string record */ static int64_t getInt64(stKVDatabase *database, int64_t key) { RemoteDB *rdb = (RemoteDB *)database->dbImpl; size_t sp; char *record = rdb->get((char *)&key, sizeof(int64_t), &sp, NULL); // convert from KC native big-endian back to little-endian Intel... return kyotocabinet::ntoh64(*((int64_t*)record)); } /* get part of a string record */ static void *getPartialRecord(stKVDatabase *database, int64_t key, int64_t zeroBasedByteOffset, int64_t sizeInBytes, int64_t recordSize) { stKVDatabaseConf* conf = stKVDatabase_getConf(database); int64_t maxRecordSize = stKVDatabaseConf_getMaxKTRecordSize(conf); if (recordSize > maxRecordSize) { assert (database->secondaryDB != NULL); if (recordOnDisk(database, key) == false) { stThrowNew(ST_KV_DATABASE_EXCEPTION_ID, "The record does not exist: %lld for partial retrieval", (long long)key); } return database->secondaryDB->getPartialRecord(database->secondaryDB, key, zeroBasedByteOffset, sizeInBytes, recordSize); } else { int64_t recordSize2; char *record = (char *)getRecord2(database, key, &recordSize2); if(recordSize2 != recordSize) { stThrowNew(ST_KV_DATABASE_EXCEPTION_ID, "The given record size is incorrect: %lld, should be %lld", (long long)recordSize, recordSize2); } if(record == NULL) { stThrowNew(ST_KV_DATABASE_EXCEPTION_ID, "The record does not exist: %lld for partial retrieval", (long long)key); } if(zeroBasedByteOffset < 0 || sizeInBytes < 0 || zeroBasedByteOffset + sizeInBytes > recordSize) { stThrowNew(ST_KV_DATABASE_EXCEPTION_ID, "Partial record retrieval to out of bounds memory, record size: %lld, requested start: %lld, requested size: %lld", (long long)recordSize, (long long)zeroBasedByteOffset, (long long)sizeInBytes); } void *partialRecord = memcpy(st_malloc(sizeInBytes), record + zeroBasedByteOffset, sizeInBytes); free(record); return partialRecord; } } static void removeRecord(stKVDatabase *database, int64_t key) { if (recordOnDisk(database, key) == true) { database->secondaryDB->removeRecord(database->secondaryDB, key); } else { RemoteDB *rdb = (RemoteDB *)database->dbImpl; if (!rdb->remove((char *)&key, (size_t)sizeof(int64_t))) { stThrowNew(ST_KV_DATABASE_EXCEPTION_ID, "Removing key/value to database error: %s", rdb->error().name()); } } } void stKVDatabase_initialise_kyotoTycoon(stKVDatabase *database, stKVDatabaseConf *conf, bool create) { database->dbImpl = constructDB(stKVDatabase_getConf(database), create); database->secondaryDB = constructBigRecordDB(stKVDatabase_getConf(database), create); database->destruct = destructDB; database->deleteDatabase = deleteDB; database->containsRecord = containsRecord; database->insertRecord = insertRecord; database->insertInt64 = insertInt64; database->updateRecord = updateRecord; database->updateInt64 = updateInt64; database->setRecord = setRecord; database->incrementInt64 = incrementInt64; database->bulkSetRecords = bulkSetRecords; database->bulkRemoveRecords = bulkRemoveRecords; database->numberOfRecords = numberOfRecords; database->getRecord = getRecord; database->getInt64 = getInt64; database->getRecord2 = getRecord2; database->getPartialRecord = getPartialRecord; database->removeRecord = removeRecord; } #endif