获取CREATE TABLE语句

问题描述 投票:0回答:1

我想以编程方式在另一台服务器上重新创建我的一个表。从 Web 控制台中,我看到我可以请求表模式,它会为我提供

CREATE TABLE
语句,但我在文档中看不到任何对
SHOW CREATE TABLE
或任何其他有效命令的引用。

我搜索了 Stack Overflow,看到这个 3 年前的老问题问的是同样的事情,但其中提供的解决方案并不理想,因为它只给出有关表和列的信息,而不是

CREATE TABLE
语句。

例如,如果我这样做

table_columns('kafka_cluster');
我会得到以下输出:

output of the table_columns statement

但是我需要的是这个:

CREATE TABLE 'kafka_cluster' (
  system SYMBOL capacity 256 CACHE,
  controller_GlobalTopicCount_Value DOUBLE,
  controller_MigratingZkBrokerCount_Value DOUBLE,
  controller_ActiveControllerCount_Value DOUBLE,
  topics_ReassignmentBytesInPerSec_Count DOUBLE,
  controller_GlobalPartitionCount_Value DOUBLE,
  request_MessageConversionsTimeMs_Count DOUBLE,
  controller_ZkMigrationState_Value DOUBLE,
  request_ResponseQueueTimeMs_Count DOUBLE,
  topics_BytesOutPerSec_Count DOUBLE,
  request_TemporaryMemoryBytes_Count DOUBLE,
  topics_TotalProduceRequestsPerSec_Count DOUBLE,
  replica_manager_IsrShrinksPerSec_Count DOUBLE,
  request_RemoteTimeMs_Count DOUBLE,
  controller_MetadataErrorCount_Value DOUBLE,
  replica_manager_IsrExpandsPerSec_Count DOUBLE,
  controller_EventQueueProcessingTimeMs_Count DOUBLE,
  topics_TotalFetchRequestsPerSec_Count DOUBLE,
  controller_LastAppliedRecordLagMs_Value DOUBLE,
  controller_OfflinePartitionsCount_Value DOUBLE,
  controller_LastAppliedRecordOffset_Value DOUBLE,
  topics_InvalidOffsetOrSequenceRecordsPerSec_Count DOUBLE,
  replica_manager_PartitionsWithLateTransactionsCount_Value DOUBLE,
  request_RequestBytes_Count DOUBLE,
  topics_ReassignmentBytesOutPerSec_Count DOUBLE,
  controller_EventQueueOperationsTimedOutCount_Value DOUBLE,
  replica_manager_LeaderCount_Value DOUBLE,
  replica_manager_AtMinIsrPartitionCount_Value DOUBLE,
  topics_BytesRejectedPerSec_Count DOUBLE,
  topics_NoKeyCompactedTopicRecordsPerSec_Count DOUBLE,
  topics_FailedProduceRequestsPerSec_Count DOUBLE,
  controller_FencedBrokerCount_Value DOUBLE,
  topics_ReplicationBytesInPerSec_Count DOUBLE,
  controller_LastCommittedRecordOffset_Value DOUBLE,
  request_ThrottleTimeMs_Count DOUBLE,
  controller_EventQueueTimeMs_Count DOUBLE,
  replica_manager_ProducerIdCount_Value DOUBLE,
  request_RequestQueueTimeMs_Count DOUBLE,
  replica_manager_PartitionCount_Value DOUBLE,
  topics_MessagesInPerSec_Count DOUBLE,
  controller_TimedOutBrokerHeartbeatCount_Value DOUBLE,
  controller_NewActiveControllersCount_Value DOUBLE,
  request_TotalTimeMs_Count DOUBLE,
  replica_manager_FailedIsrUpdatesPerSec_Count DOUBLE,
  replica_manager_UnderMinIsrPartitionCount_Value DOUBLE,
  controller_PreferredReplicaImbalanceCount_Value DOUBLE,
  topics_FetchMessageConversionsPerSec_Count DOUBLE,
  topics_BytesInPerSec_Count DOUBLE,
  replica_manager_ReassigningPartitions_Value DOUBLE,
  topics_ReplicationBytesOutPerSec_Count DOUBLE,
  topics_InvalidMessageCrcRecordsPerSec_Count DOUBLE,
  controller_EventQueueOperationsStartedCount_Value DOUBLE,
  topics_ProduceMessageConversionsPerSec_Count DOUBLE,
  controller_LastAppliedRecordTimestamp_Value DOUBLE,
  replica_manager_OfflineReplicaCount_Value DOUBLE,
  request_ResponseSendTimeMs_Count DOUBLE,
  topics_InvalidMagicNumberRecordsPerSec_Count DOUBLE,
  replica_manager_UnderReplicatedPartitions_Value DOUBLE,
  topics_FailedFetchRequestsPerSec_Count DOUBLE,
  request_LocalTimeMs_Count DOUBLE,
  controller_ActiveBrokerCount_Value DOUBLE,
  timestamp TIMESTAMP
) timestamp (timestamp) PARTITION BY DAY WAL;
database time-series questdb
1个回答
0
投票

目前 QuestDB 没有公开任何命令来获取

CREATE TABLE
语句。 Web 控制台向数据库发送多个查询,请求元数据(如
table_columns
查询),然后动态地组成语句。

此 Python 代码片段可用于根据元数据查询重新创建语句,然后将 SQL 发送到另一台服务器以在那里创建表:

import psycopg2
from psycopg2 import extras
import sys

def connect_qdb(host: str = '127.0.0.1', port: int = 8812,  user: str = 'admin', pwd: str = 'quest', dbname: str = 'qdb'):
    """Open a PostgreSQL-wire-protocol connection to a QuestDB instance.

    Args:
        host: Server address (default: local instance).
        port: QuestDB's PGWire port (8812 by default, not Postgres' 5432).
        user/pwd: Credentials (QuestDB's defaults are admin/quest).
        dbname: Database name; QuestDB always exposes 'qdb'.

    Returns:
        An open psycopg2 connection with autocommit disabled.

    Raises:
        psycopg2.Error: if the connection cannot be established.
    """
    try:
        conn = psycopg2.connect(f'user={user} password={pwd} host={host} port={port} dbname={dbname}')
        conn.autocommit = False

        return conn
    except psycopg2.Error as e:
        print(f'Had problem connecting with error {e}.')
        # BUG FIX: the original swallowed the exception and implicitly
        # returned None, which made callers fail later with an opaque
        # AttributeError ('NoneType' object has no attribute ...).
        # Log and re-raise so failures surface at the point of connection.
        raise


def get_table_meta(conn, table_name):
    """Collect the metadata needed to rebuild a table's CREATE TABLE statement.

    Queries QuestDB's `tables` and `table_columns()` meta functions and
    condenses the results into a dict consumed by get_create_statement().

    Args:
        conn: An open psycopg2 connection to the origin QuestDB server.
        table_name: Name of the table to inspect.

    Returns:
        Dict with keys: table_name, partition (None when not partitioned),
        wal, dedup, upsertKeys, columns, symbols, designated (designated
        timestamp column or None), columns_sql (per-column DDL fragments),
        and with (WITH-clause fragments such as maxUncommittedRows).
    """
    meta = { "table_name": table_name, "partition" : None, "wal": False, "dedup" : None, "upsertKeys" : [],
            "columns" : {}, "symbols": [], "designated" : None, "columns_sql" : [], "with" : [] }

    with conn.cursor(cursor_factory = psycopg2.extras.RealDictCursor) as cur:
        # NOTE(review): table_name is interpolated directly into the SQL;
        # only call this with trusted table names (not end-user input).
        cur.execute(f"""
                    SELECT * FROM tables WHERE name = '{table_name}';
                    """
                    )
        row = cur.fetchone()
        meta["dedup"] = row.get("dedup")
        meta["designated"] = row.get("designatedTimestamp")
        meta["partition"] = row.get("partitionBy")
        if meta["partition"] == "NONE":
            # Normalize "NONE" to None so the caller can simply truth-test it.
            meta["partition"] = None
        meta["wal"] = row.get("walEnabled")
        if row.get("maxUncommittedRows"):
            meta["with"].append(f' maxUncommittedRows={row["maxUncommittedRows"]} ')

        cur.execute(f"""
                    SELECT * FROM table_columns('{table_name}');
                    """
                    )
        records = cur.fetchall()
        for row in records:
            column_name = row["column"]
            column_type = row["type"]
            meta["columns"][column_name]={"type": column_type}
            if row["upsertKey"]:
                meta["upsertKeys"].append(column_name)
            if column_type == "SYMBOL":
                # SYMBOL columns carry extra DDL: capacity, cache flag, index.
                meta["symbols"].append(column_name)
                if row["symbolCached"]:
                    cached_sql = "CACHE"
                else:
                    cached_sql = "NOCACHE"

                # BUG FIX: the original assigned a non-f-string literal to
                # cached_sql here (clobbering CACHE/NOCACHE and emitting the
                # raw text '{row["indexBlockCapacity"]}'), and left index_sql
                # unbound for indexed columns, causing a NameError below.
                if row["indexed"]:
                    index_sql = f'INDEX CAPACITY {row["indexBlockCapacity"]}'
                else:
                    index_sql = ""
                meta["columns_sql"].append(f'{column_name} SYMBOL CAPACITY {row["symbolCapacity"]} {cached_sql} {index_sql}')
            else:
                meta["columns_sql"].append(f"{column_name} {column_type}")

    return meta

def get_create_statement(conn, table_meta):
    """Render a CREATE TABLE IF NOT EXISTS statement from collected metadata.

    Args:
        conn: Unused; kept for signature compatibility with the other helpers.
        table_meta: Metadata dict as produced by get_table_meta().

    Returns:
        The CREATE TABLE statement as a string, ready to execute elsewhere.
    """
    cols = ",\n\t".join(table_meta["columns_sql"])

    # Optional trailing clauses: each renders to "" when not applicable.
    ts_col = table_meta["designated"]
    designated = f' TIMESTAMP({ts_col}) ' if ts_col else ""
    part_by = table_meta["partition"]
    partition = f' PARTITION BY {part_by} ' if part_by else ""
    wal = ' WAL ' if table_meta["wal"] else ""
    with_clause = f' WITH {", ".join(table_meta["with"])}' if table_meta["with"] else ""
    if table_meta["dedup"]:
        keys = ", ".join(table_meta["upsertKeys"])
        dedup = f" DEDUP UPSERT KEYS({keys}) "
    else:
        dedup = ""

    header = f"    CREATE TABLE IF NOT EXISTS {table_meta['table_name']} (\n"
    body = "    \t" + cols + "\n"
    footer = "    )" + " ".join(["", designated, partition, wal, with_clause, dedup]) + ";\n    "
    return header + body + footer

def create_dest_table(conn, sql):
    with conn.cursor() as cur:
        cur.execute(sql)

# Example driver: read a table's definition from the origin server and
# recreate it on a destination server.
# NOTE(review): "table_name" and "destination_host_ip" are placeholders —
# substitute real values before running.
origin_conn = connect_qdb()
table_meta = get_table_meta(origin_conn, "table_name")
sql = get_create_statement(origin_conn, table_meta)

destination_conn = connect_qdb("destination_host_ip") 
create_dest_table(destination_conn, sql)

# NOTE(review): autocommit is False and no commit() is issued before
# closing — confirm the CREATE TABLE is applied on the destination.
origin_conn.close()
destination_conn.close()
© www.soinside.com 2019 - 2024. All rights reserved.