Source of file PostgreSQLDatabase.php

Size: 27,861 Bytes - Last Modified: 2021-12-23T10:33:33+00:00

/var/www/docs.ssmods.com/process/src/code/PostgreSQLDatabase.php

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826
<?php

namespace SilverStripe\PostgreSQL;

use SilverStripe\Core\Config\Configurable;
use SilverStripe\Core\Injector\Injector;
use SilverStripe\ORM\DB;
use SilverStripe\ORM\DataObject;
use SilverStripe\ORM\ArrayList;
use SilverStripe\ORM\Connect\Database;
use SilverStripe\ORM\PaginatedList;
use ErrorException;
use Exception;

/**
 * PostgreSQL connector class.
 *
 * @package sapphire
 * @subpackage model
 */
class PostgreSQLDatabase extends Database
{
    use Configurable;

    /**
     * Database schema manager object
     *
     * @var PostgreSQLSchemaManager
     */
    protected $schemaManager;

    /**
     * The currently selected database schema name.
     *
     * @var string
     */
    protected $schema;

    /**
     * @var bool
     */
    protected $transactionNesting = 0;

    /**
     * Toggle if transactions are supported. Defaults to true.
     *
     * @var bool
     */
    protected $supportsTransactions = true;

    /**
     * Determines whether to check a database exists on the host by
     * querying the 'postgres' database and running createDatabase.
     *
     * Some locked down systems prevent access to the 'postgres' table in
     * which case you need to set this to false.
     *
     * If allow_query_master_postgres is false, and model_schema_as_database is also false,
     * then attempts to create or check databases beyond the initial connection will
     * result in a runtime error.
     *
     * @config
     * @var bool
     */
    private static $allow_query_master_postgres = true;

    /**
     * For instances where multiple databases are used beyond the initial connection
     * you may set this option to true to force database switches to switch schemas
     * instead of using databases. This may be useful if the database user does not
     * have cross-database permissions, and in cases where multiple databases are used
     * (such as in running test cases).
     *
     * If this is true then the database will only be set during the initial connection,
     * and attempts to change to this database will use the 'public' schema instead
     *
     * If this is false then errors may be generated during some cross database operations.
     */
    private static $model_schema_as_database = true;

    /**
     * Override the language that tsearch uses.  By default it is 'english, but
     * could be any of the supported languages that can be found in the
     * pg_catalog.pg_ts_config table.
     */
    private static $search_language = 'english';

    /*
     * Describe how T-search will work.
     * You can use either GiST or GIN, and '@@' (gist) or '@@@' (gin)
     * Combinations of these two will also work, so you'll need to pick
     * one which works best for you
     */
    private static $default_fts_cluster_method = 'GIN';

    /*
     * Describe how T-search will work.
     * You can use either GiST or GIN, and '@@' (gist) or '@@@' (gin)
     * Combinations of these two will also work, so you'll need to pick
     * one which works best for you
     */
    private static $default_fts_search_method = '@@@';

    const MASTER_DATABASE = 'postgres';

    const MASTER_SCHEMA = 'public';

    /**
     * Full text cluster method. (e.g. GIN or GiST)
     *
     * @return string
     */
    public static function default_fts_cluster_method()
    {
        return static::config()->default_fts_cluster_method;
    }

    /**
     * Full text search method.
     *
     * @return string
     */
    public static function default_fts_search_method()
    {
        return static::config()->default_fts_search_method;
    }

    /**
     * Determines whether to check a database exists on the host by
     * querying the 'postgres' database and running createDatabase.
     *
     * Some locked down systems prevent access to the 'postgres' table in
     * which case you need to set this to false.
     *
     * If allow_query_master_postgres is false, and model_schema_as_database is also false,
     * then attempts to create or check databases beyond the initial connection will
     * result in a runtime error.
     *
     * @return bool
     */
    public static function allow_query_master_postgres()
    {
        return static::config()->allow_query_master_postgres;
    }

    /**
     * For instances where multiple databases are used beyond the initial connection
     * you may set this option to true to force database switches to switch schemas
     * instead of using databases. This may be useful if the database user does not
     * have cross-database permissions, and in cases where multiple databases are used
     * (such as in running test cases).
     *
     * If this is true then the database will only be set during the initial connection,
     * and attempts to change to this database will use the 'public' schema instead
     *
     * @return bool
     */
    public static function model_schema_as_database()
    {
        return static::config()->model_schema_as_database;
    }

    /**
     * Override the language that tsearch uses.  By default it is 'english, but
     * could be any of the supported languages that can be found in the
     * pg_catalog.pg_ts_config table.
     *
     * @return string
     */
    public static function search_language()
    {
        return static::config()->search_language;
    }

    /**
     * The database name specified at initial connection
     *
     * @var string
     */
    protected $databaseOriginal = '';

    /**
     * The schema name specified at initial construction. When model_schema_as_database
     * is set to true selecting the $databaseOriginal database will instead reset
     * the schema to this
     *
     * @var string
     */
    protected $schemaOriginal = '';

    /**
     * Connection parameters specified at inital connection
     *
     * @var array
     */
    protected $parameters = array();

    public function connect($parameters)
    {
        // Check database name
        if (empty($parameters['database'])) {
            // Check if we can use the master database
            if (!self::allow_query_master_postgres()) {
                throw new ErrorException('PostegreSQLDatabase::connect called without a database name specified');
            }
            // Fallback to master database connection if permission allows
            $parameters['database'] = self::MASTER_DATABASE;
        }
        $this->databaseOriginal = $parameters['database'];

        // check schema name
        if (empty($parameters['schema'])) {
            $parameters['schema'] = self::MASTER_SCHEMA;
        }
        $this->schemaOriginal = $parameters['schema'];

        // Ensure that driver is available (required by PDO)
        if (empty($parameters['driver'])) {
            $parameters['driver'] = $this->getDatabaseServer();
        }

        // Ensure port number is set (required by postgres)
        if (empty($parameters['port'])) {
            $parameters['port'] = 5432;
        }

        $this->parameters = $parameters;

        // If allowed, check that the database exists. Otherwise naively assume
        // that the original database exists
        if (self::allow_query_master_postgres()) {
            // Use master connection to setup initial schema
            $this->connectMaster();
            if (!$this->schemaManager->postgresDatabaseExists($this->databaseOriginal)) {
                $this->schemaManager->createPostgresDatabase($this->databaseOriginal);
            }
        }

        // Connect to the actual database we're requesting
        $this->connectDefault();

        // Set up the schema if required
        $this->setSchema($this->schemaOriginal, true);

        // Set the timezone if required.
        if (isset($parameters['timezone'])) {
            $this->selectTimezone($parameters['timezone']);
        }
    }

    protected function connectMaster()
    {
        $parameters = $this->parameters;
        $parameters['database'] = self::MASTER_DATABASE;
        $this->connector->connect($parameters, true);
    }

    protected function connectDefault()
    {
        $parameters = $this->parameters;
        $parameters['database'] = $this->databaseOriginal;
        $this->connector->connect($parameters, true);
    }

    /**
     * Sets the system timezone for the database connection
     *
     * @param string $timezone
     */
    public function selectTimezone($timezone)
    {
        if (empty($timezone)) {
            return;
        }
        $this->query("SET SESSION TIME ZONE '$timezone';");
    }

    public function supportsCollations()
    {
        return true;
    }

    public function supportsTimezoneOverride()
    {
        return true;
    }

    public function getDatabaseServer()
    {
        return "pgsql";
    }

    /**
     * Returns the name of the current schema in use
     *
     * @return string Name of current schema
     */
    public function currentSchema()
    {
        return $this->schema;
    }

    /**
     * Utility method to manually set the schema to an alternative
     * Check existance & sets search path to the supplied schema name
     *
     * @param string $schema Name of the schema
     * @param boolean $create Flag indicating whether the schema should be created
     * if it doesn't exist. If $create is false and the schema doesn't exist
     * then an error will be raised
     * @param int|boolean $errorLevel The level of error reporting to enable for
     * the query, or false if no error should be raised
     * @return boolean Flag indicating success
     */
    public function setSchema($schema, $create = false, $errorLevel = E_USER_ERROR)
    {
        if (!$this->schemaManager->schemaExists($schema)) {
            // Check DB creation permisson
            if (!$create) {
                if ($errorLevel !== false) {
                    user_error("Schema $schema does not exist", $errorLevel);
                }
                $this->schema = null;
                return false;
            }
            $this->schemaManager->createSchema($schema);
        }
        $this->setSchemaSearchPath($schema);
        $this->schema = $schema;
        return true;
    }

    /**
     * Override the schema search path. Search using the arguments supplied.
     * NOTE: The search path is normally set through setSchema() and only
     * one schema is selected. The facility to add more than one schema to
     * the search path is provided as an advanced PostgreSQL feature for raw
     * SQL queries. Sapphire cannot search for datamodel tables in alternate
     * schemas, so be wary of using alternate schemas within the ORM environment.
     *
     * @param string ...$arg Schema name to use. Add additional schema names as extra arguments.
     */
    public function setSchemaSearchPath($arg = null)
    {
        if (!$arg) {
            user_error('At least one Schema must be supplied to set a search path.', E_USER_ERROR);
        }
        $schemas = array_values(func_get_args());
        $this->query("SET search_path TO \"" . implode("\",\"", $schemas) . "\"");
    }

    /**
     * The core search engine configuration.
     * @todo Properly extract the search functions out of the core.
     *
     * @param array $classesToSearch
     * @param string $keywords Keywords as a space separated string
     * @param int $start
     * @param int $pageLength
     * @param string $sortBy
     * @param string $extraFilter
     * @param bool $booleanSearch
     * @param string $alternativeFileFilter
     * @param bool $invertedMatch
     * @return PaginatedList List of result pages
     * @throws Exception
     */
    public function searchEngine($classesToSearch, $keywords, $start, $pageLength, $sortBy = "ts_rank DESC", $extraFilter = "", $booleanSearch = false, $alternativeFileFilter = "", $invertedMatch = false)
    {
        $start = (int)$start;
        $pageLength = (int)$pageLength;

        //Fix the keywords to be ts_query compatitble:
        //Spaces must have pipes
        //@TODO: properly handle boolean operators here.
        $keywords= trim($keywords);
        $keywords= str_replace(' ', ' | ', $keywords);
        $keywords= str_replace('"', "'", $keywords);


        $keywords = $this->quoteString(trim($keywords));

        // Get tables
        $tablesToSearch = [];
        foreach ($classesToSearch as $class) {
            $tablesToSearch[$class] = DataObject::getSchema()->baseDataTable($class);
        }

        //We can get a list of all the tsvector columns though this query:
        //We know what tables to search in based on the $classesToSearch variable:
        $classesPlaceholders = DB::placeholders($classesToSearch);
        $searchableColumns = $this->preparedQuery(
            "
            SELECT table_name, column_name, data_type
            FROM information_schema.columns
            WHERE data_type='tsvector' AND table_name in ($classesPlaceholders);",
            array_values($tablesToSearch)
        );
        if (!$searchableColumns->numRecords()) {
            throw new Exception('there are no full text columns to search');
        }

        $tables = array();
        $tableParameters = array();

        // Make column selection lists
        $pageClass = 'SilverStripe\\CMS\\Model\\SiteTree';
        $fileClass = 'SilverStripe\\Assets\\File';
        $select = array(
            $pageClass => array(
                '"ClassName"',
                '"' . $tablesToSearch[$pageClass] . '"."ID"',
                '"ParentID"',
                '"Title"',
                '"URLSegment"',
                '"Content"',
                '"LastEdited"',
                '"Created"',
                'NULL AS "Name"',
                '"CanViewType"'
            ),
            $fileClass => array(
                '"ClassName"',
                '"' . $tablesToSearch[$fileClass] . '"."ID"',
                '0 AS "ParentID"',
                '"Title"',
                'NULL AS "URLSegment"',
                'NULL AS "Content"',
                '"LastEdited"',
                '"Created"',
                '"Name"',
                'NULL AS "CanViewType"'
            )
        );

        foreach ($searchableColumns as $searchableColumn) {
            $conditions = array();
            $tableName = $searchableColumn['table_name'];
            $columnName = $searchableColumn['column_name'];
            $className = DataObject::getSchema()->tableClass($tableName);
            if (DataObject::getSchema()->fieldSpec($className, 'ShowInSearch')) {
                $conditions[] = array('"ShowInSearch"' => 1);
            }

            $method = self::default_fts_search_method();
            $conditions[] = "\"{$tableName}\".\"{$columnName}\" $method q ";
            $query = DataObject::get($className, $conditions)->dataQuery()->query();

            // Could parameterise this, but convention is only to to so for where conditions
            $query->addFrom(array(
                'q' => ", to_tsquery('" . self::search_language() . "', $keywords)"
            ));
            $query->setSelect(array());

            foreach ($select[$className] as $clause) {
                if (preg_match('/^(.*) +AS +"?([^"]*)"?/i', $clause, $matches)) {
                    $query->selectField($matches[1], $matches[2]);
                } else {
                    $query->selectField($clause);
                }
            }

            $query->selectField("ts_rank(\"{$tableName}\".\"{$columnName}\", q)", 'Relevance');
            $query->setOrderBy(array());

            //Add this query to the collection
            $tables[] = $query->sql($parameters);
            $tableParameters = array_merge($tableParameters, $parameters);
        }

        $limit = $pageLength;
        $offset = $start;

        if ($keywords) {
            $orderBy = " ORDER BY $sortBy";
        } else {
            $orderBy='';
        }

        $fullQuery = "SELECT *, count(*) OVER() as _fullcount FROM (" . implode(" UNION ", $tables) . ") AS q1 $orderBy LIMIT $limit OFFSET $offset";

        // Get records
        $records = $this->preparedQuery($fullQuery, $tableParameters);
        $totalCount = 0;
        $objects = [];
        foreach ($records as $record) {
            $objects[] = Injector::inst()->createWithArgs($record['ClassName'], [$record]);
            $totalCount = $record['_fullcount'];
        }

        if ($objects) {
            $results = new ArrayList($objects);
        } else {
            $results = new ArrayList();
        }
        $list = new PaginatedList($results);
        $list->setLimitItems(false);
        $list->setPageStart($start);
        $list->setPageLength($pageLength);
        $list->setTotalItems($totalCount);
        return $list;
    }

    public function supportsTransactions()
    {
        return $this->supportsTransactions;
    }

    /*
     * This is a quick lookup to discover if the database supports particular extensions
     */
    public function supportsExtensions($extensions = array('partitions', 'tablespaces', 'clustering'))
    {
        if (isset($extensions['partitions'])) {
            return true;
        } elseif (isset($extensions['tablespaces'])) {
            return true;
        } elseif (isset($extensions['clustering'])) {
            return true;
        } else {
            return false;
        }
    }

    public function transactionStart($transaction_mode = false, $session_characteristics = false)
    {
        if ($this->transactionNesting > 0) {
            $this->transactionSavepoint('NESTEDTRANSACTION' . $this->transactionNesting);
        } else {
            $this->query('BEGIN;');

            if ($transaction_mode) {
                $this->query("SET TRANSACTION {$transaction_mode};");
            }

            if ($session_characteristics) {
                $this->query("SET SESSION CHARACTERISTICS AS TRANSACTION {$session_characteristics};");
            }
        }
        ++$this->transactionNesting;
    }

    public function transactionSavepoint($savepoint)
    {
        $this->query("SAVEPOINT {$savepoint};");
    }

    public function transactionRollback($savepoint = false)
    {
        // Named savepoint
        if ($savepoint) {
            $this->query('ROLLBACK TO ' . $savepoint);
            return true;
        }

        // Abort if unable to unnest, otherwise jump up a level
        if (!$this->transactionNesting) {
            return false;
        }
        --$this->transactionNesting;

        // Rollback nested
        if ($this->transactionNesting > 0) {
            return $this->transactionRollback('NESTEDTRANSACTION' . $this->transactionNesting);
        }

        // Rollback top level
        $this->query('ROLLBACK');
        return true;
    }

    public function transactionDepth()
    {
        return $this->transactionNesting;
    }

    public function transactionEnd($chain = false)
    {
        --$this->transactionNesting;
        if ($this->transactionNesting <= 0) {
            $this->transactionNesting = 0;
            $this->query('COMMIT;');
        }
    }

    public function comparisonClause($field, $value, $exact = false, $negate = false, $caseSensitive = null, $parameterised = false)
    {
        if ($exact && $caseSensitive === null) {
            $comp = ($negate) ? '!=' : '=';
        } else {
            $comp = ($caseSensitive === true) ? 'LIKE' : 'ILIKE';
            if ($negate) {
                $comp = 'NOT ' . $comp;
            }
            $field.='::text';
        }

        if ($parameterised) {
            return sprintf("%s %s ?", $field, $comp);
        } else {
            return sprintf("%s %s '%s'", $field, $comp, $value);
        }
    }

    /**
     * Function to return an SQL datetime expression that can be used with Postgres
     * used for querying a datetime in a certain format
     * @param string $date to be formated, can be either 'now', literal datetime like '1973-10-14 10:30:00' or field name, e.g. '"SiteTree"."Created"'
     * @param string $format to be used, supported specifiers:
     * %Y = Year (four digits)
     * %m = Month (01..12)
     * %d = Day (01..31)
     * %H = Hour (00..23)
     * %i = Minutes (00..59)
     * %s = Seconds (00..59)
     * %U = unix timestamp, can only be used on it's own
     * @return string SQL datetime expression to query for a formatted datetime
     */
    public function formattedDatetimeClause($date, $format)
    {
        preg_match_all('/%(.)/', $format, $matches);
        foreach ($matches[1] as $match) {
            if (array_search($match, array('Y','m','d','H','i','s','U')) === false) {
                user_error('formattedDatetimeClause(): unsupported format character %' . $match, E_USER_WARNING);
            }
        }

        $translate = array(
            '/%Y/' => 'YYYY',
            '/%m/' => 'MM',
            '/%d/' => 'DD',
            '/%H/' => 'HH24',
            '/%i/' => 'MI',
            '/%s/' => 'SS',
        );
        $format = preg_replace(array_keys($translate), array_values($translate), $format);

        if (preg_match('/^now$/i', $date)) {
            $date = "NOW()";
        } elseif (preg_match('/^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$/i', $date)) {
            $date = "TIMESTAMP '$date'";
        }

        if ($format == '%U') {
            return "FLOOR(EXTRACT(epoch FROM $date))";
        }

        return "to_char($date, TEXT '$format')";
    }

    /**
     * Function to return an SQL datetime expression that can be used with Postgres
     * used for querying a datetime addition
     * @param string $date, can be either 'now', literal datetime like '1973-10-14 10:30:00' or field name, e.g. '"SiteTree"."Created"'
     * @param string $interval to be added, use the format [sign][integer] [qualifier], e.g. -1 Day, +15 minutes, +1 YEAR
     * supported qualifiers:
     * - years
     * - months
     * - days
     * - hours
     * - minutes
     * - seconds
     * This includes the singular forms as well
     * @return string SQL datetime expression to query for a datetime (YYYY-MM-DD hh:mm:ss) which is the result of the addition
     */
    public function datetimeIntervalClause($date, $interval)
    {
        if (preg_match('/^now$/i', $date)) {
            $date = "NOW()";
        } elseif (preg_match('/^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$/i', $date)) {
            $date = "TIMESTAMP '$date'";
        }

        // ... when being too precise becomes a pain. we need to cut of the fractions.
        // TIMESTAMP(0) doesn't work because it rounds instead flooring
        return "CAST(SUBSTRING(CAST($date + INTERVAL '$interval' AS VARCHAR) FROM 1 FOR 19) AS TIMESTAMP)";
    }

    /**
     * Function to return an SQL datetime expression that can be used with Postgres
     * used for querying a datetime substraction
     * @param string $date1, can be either 'now', literal datetime like '1973-10-14 10:30:00' or field name, e.g. '"SiteTree"."Created"'
     * @param string $date2 to be substracted of $date1, can be either 'now', literal datetime like '1973-10-14 10:30:00' or field name, e.g. '"SiteTree"."Created"'
     * @return string SQL datetime expression to query for the interval between $date1 and $date2 in seconds which is the result of the substraction
     */
    public function datetimeDifferenceClause($date1, $date2)
    {
        if (preg_match('/^now$/i', $date1)) {
            $date1 = "NOW()";
        } elseif (preg_match('/^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$/i', $date1)) {
            $date1 = "TIMESTAMP '$date1'";
        }

        if (preg_match('/^now$/i', $date2)) {
            $date2 = "NOW()";
        } elseif (preg_match('/^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$/i', $date2)) {
            $date2 = "TIMESTAMP '$date2'";
        }

        return "(FLOOR(EXTRACT(epoch FROM $date1)) - FLOOR(EXTRACT(epoch from $date2)))";
    }

    public function now()
    {
        return 'NOW()';
    }

    public function random()
    {
        return 'RANDOM()';
    }

    /**
     * Determines the name of the current database to be reported externally
     * by substituting the schema name for the database name.
     * Should only be used when model_schema_as_database is true
     *
     * @param string $schema Name of the schema
     * @return string Name of the database to report
     */
    public function schemaToDatabaseName($schema)
    {
        switch ($schema) {
            case $this->schemaOriginal:
                return $this->databaseOriginal;
            default:
                return $schema;
        }
    }

    /**
     * Translates a requested database name to a schema name to substitute internally.
     * Should only be used when model_schema_as_database is true
     *
     * @param string $database Name of the database
     * @return string Name of the schema to use for this database internally
     */
    public function databaseToSchemaName($database)
    {
        switch ($database) {
            case $this->databaseOriginal:
                return $this->schemaOriginal;
            default:
                return $database;
        }
    }

    public function dropSelectedDatabase()
    {
        if (self::model_schema_as_database()) {
            // Check current schema is valid
            $oldSchema = $this->schema;
            if (empty($oldSchema)) {
                return;
            } // Nothing selected to drop

            // Select another schema
            if ($oldSchema !== $this->schemaOriginal) {
                $this->setSchema($this->schemaOriginal);
            } elseif ($oldSchema !== self::MASTER_SCHEMA) {
                $this->setSchema(self::MASTER_SCHEMA);
            } else {
                $this->schema = null;
            }

            // Remove this schema
            $this->schemaManager->dropSchema($oldSchema);
        } else {
            parent::dropSelectedDatabase();
        }
    }

    public function getSelectedDatabase()
    {
        if (self::model_schema_as_database()) {
            return $this->schemaToDatabaseName($this->schema);
        }
        return parent::getSelectedDatabase();
    }

    public function selectDatabase($name, $create = false, $errorLevel = E_USER_ERROR)
    {
        // Substitute schema here as appropriate
        if (self::model_schema_as_database()) {
            // Selecting the database itself should be treated as selecting the public schema
            $schemaName = $this->databaseToSchemaName($name);
            return $this->setSchema($schemaName, $create, $errorLevel);
        }

        // Database selection requires that a new connection is established.
        // This is not ideal postgres practise
        if (!$this->schemaManager->databaseExists($name)) {
            // Check DB creation permisson
            if (!$create) {
                if ($errorLevel !== false) {
                    user_error("Attempted to connect to non-existing database \"$name\"", $errorLevel);
                }
                // Unselect database
                $this->connector->unloadDatabase();
                return false;
            }
            $this->schemaManager->createDatabase($name);
        }

        // New connection made here, treating the new database name as the new original
        $this->databaseOriginal = $name;
        $this->connectDefault();
        return true;
    }

    /**
     * Delete all entries from the table instead of truncating it.
     *
     * This gives a massive speed improvement compared to using TRUNCATE, with
     * the caveat that primary keys are not reset etc.
     *
     * @see DatabaseAdmin::clearAllData()
     *
     * @param string $table
     */
    public function clearTable($table)
    {
        $this->query('DELETE FROM "'.$table.'";');
    }
}