我想以编程方式在另一台服务器上重新创建我的一个表。在 Web 控制台中,我看到可以请求表的模式(schema),它会给出对应的 `CREATE TABLE` 语句,但我在文档中找不到任何关于 `SHOW CREATE TABLE` 或其他有效命令的说明。
我搜索了 Stack Overflow,找到了一个 3 年前的老问题,提出的需求与我相同,但其中给出的解决方案并不理想,因为它只能给出表和列的相关信息,而不是 `CREATE TABLE` 语句。
例如,如果我这样做
table_columns('kafka_cluster');
我会得到以下输出:
但是我需要的是这个:
CREATE TABLE 'kafka_cluster' (
system SYMBOL capacity 256 CACHE,
controller_GlobalTopicCount_Value DOUBLE,
controller_MigratingZkBrokerCount_Value DOUBLE,
controller_ActiveControllerCount_Value DOUBLE,
topics_ReassignmentBytesInPerSec_Count DOUBLE,
controller_GlobalPartitionCount_Value DOUBLE,
request_MessageConversionsTimeMs_Count DOUBLE,
controller_ZkMigrationState_Value DOUBLE,
request_ResponseQueueTimeMs_Count DOUBLE,
topics_BytesOutPerSec_Count DOUBLE,
request_TemporaryMemoryBytes_Count DOUBLE,
topics_TotalProduceRequestsPerSec_Count DOUBLE,
replica_manager_IsrShrinksPerSec_Count DOUBLE,
request_RemoteTimeMs_Count DOUBLE,
controller_MetadataErrorCount_Value DOUBLE,
replica_manager_IsrExpandsPerSec_Count DOUBLE,
controller_EventQueueProcessingTimeMs_Count DOUBLE,
topics_TotalFetchRequestsPerSec_Count DOUBLE,
controller_LastAppliedRecordLagMs_Value DOUBLE,
controller_OfflinePartitionsCount_Value DOUBLE,
controller_LastAppliedRecordOffset_Value DOUBLE,
topics_InvalidOffsetOrSequenceRecordsPerSec_Count DOUBLE,
replica_manager_PartitionsWithLateTransactionsCount_Value DOUBLE,
request_RequestBytes_Count DOUBLE,
topics_ReassignmentBytesOutPerSec_Count DOUBLE,
controller_EventQueueOperationsTimedOutCount_Value DOUBLE,
replica_manager_LeaderCount_Value DOUBLE,
replica_manager_AtMinIsrPartitionCount_Value DOUBLE,
topics_BytesRejectedPerSec_Count DOUBLE,
topics_NoKeyCompactedTopicRecordsPerSec_Count DOUBLE,
topics_FailedProduceRequestsPerSec_Count DOUBLE,
controller_FencedBrokerCount_Value DOUBLE,
topics_ReplicationBytesInPerSec_Count DOUBLE,
controller_LastCommittedRecordOffset_Value DOUBLE,
request_ThrottleTimeMs_Count DOUBLE,
controller_EventQueueTimeMs_Count DOUBLE,
replica_manager_ProducerIdCount_Value DOUBLE,
request_RequestQueueTimeMs_Count DOUBLE,
replica_manager_PartitionCount_Value DOUBLE,
topics_MessagesInPerSec_Count DOUBLE,
controller_TimedOutBrokerHeartbeatCount_Value DOUBLE,
controller_NewActiveControllersCount_Value DOUBLE,
request_TotalTimeMs_Count DOUBLE,
replica_manager_FailedIsrUpdatesPerSec_Count DOUBLE,
replica_manager_UnderMinIsrPartitionCount_Value DOUBLE,
controller_PreferredReplicaImbalanceCount_Value DOUBLE,
topics_FetchMessageConversionsPerSec_Count DOUBLE,
topics_BytesInPerSec_Count DOUBLE,
replica_manager_ReassigningPartitions_Value DOUBLE,
topics_ReplicationBytesOutPerSec_Count DOUBLE,
topics_InvalidMessageCrcRecordsPerSec_Count DOUBLE,
controller_EventQueueOperationsStartedCount_Value DOUBLE,
topics_ProduceMessageConversionsPerSec_Count DOUBLE,
controller_LastAppliedRecordTimestamp_Value DOUBLE,
replica_manager_OfflineReplicaCount_Value DOUBLE,
request_ResponseSendTimeMs_Count DOUBLE,
topics_InvalidMagicNumberRecordsPerSec_Count DOUBLE,
replica_manager_UnderReplicatedPartitions_Value DOUBLE,
topics_FailedFetchRequestsPerSec_Count DOUBLE,
request_LocalTimeMs_Count DOUBLE,
controller_ActiveBrokerCount_Value DOUBLE,
timestamp TIMESTAMP
) timestamp (timestamp) PARTITION BY DAY WAL;
目前 QuestDB 没有公开任何可以直接获取 `CREATE TABLE` 语句的命令。Web 控制台会向数据库发送多个请求元数据的查询(例如 `table_columns` 查询),然后动态地拼装出该语句。
此 Python 代码片段可用于根据元数据查询重新创建语句,然后将 SQL 发送到另一台服务器以在那里创建表:
import psycopg2
from psycopg2 import extras
import sys
def connect_qdb(host: str = '127.0.0.1', port: int = 8812, user: str = 'admin', pwd: str = 'quest', dbname: str = 'qdb'):
    """Open a psycopg2 connection to a QuestDB server.

    Args:
        host: Server host name or IP address.
        port: TCP port of the server's PostgreSQL-wire endpoint.
        user: User name to authenticate with.
        pwd: Password to authenticate with.
        dbname: Database name passed to the driver.

    Returns:
        The open connection with ``autocommit`` disabled, or ``None`` when
        psycopg2 reports an error (the error is printed, not raised).
    """
    try:
        # Keyword arguments instead of a hand-built DSN string: values that
        # contain spaces, quotes or backslashes would otherwise corrupt the
        # connection string.
        conn = psycopg2.connect(
            user=user,
            password=pwd,
            host=host,
            port=port,
            dbname=dbname,
        )
        conn.autocommit = False
        return conn
    except psycopg2.Error as e:
        print(f'Had problem connecting with error {e}.')
        return None
def get_table_meta(conn, table_name):
    """Collect the metadata needed to rebuild a table's CREATE TABLE statement.

    Runs two metadata queries on *conn*: one against ``tables`` for the
    table-level settings, and one against ``table_columns()`` for the
    per-column settings.

    Args:
        conn: Open psycopg2 connection to the server that owns the table.
        table_name: Name of the table to inspect.

    Returns:
        A dict with the keys ``table_name``, ``partition``, ``wal``,
        ``dedup``, ``upsertKeys``, ``columns``, ``symbols``, ``designated``,
        ``columns_sql`` (one SQL column definition per column) and ``with``
        (list of ``WITH`` options).

    Raises:
        ValueError: If the ``tables`` query returns no row for *table_name*.
    """
    meta = {
        "table_name": table_name,
        "partition": None,
        "wal": False,
        "dedup": None,
        "upsertKeys": [],
        "columns": {},
        "symbols": [],
        "designated": None,
        "columns_sql": [],
        "with": [],
    }

    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        # Bind the table name as a parameter so that quotes in the name
        # cannot break (or alter) the statement.
        cur.execute("SELECT * FROM tables WHERE name = %s;", (table_name,))
        row = cur.fetchone()
        if row is None:
            raise ValueError(f"Table {table_name!r} was not found.")

        meta["dedup"] = row.get("dedup")
        meta["designated"] = row.get("designatedTimestamp")
        meta["partition"] = row.get("partitionBy")
        # "NONE" means the table is not partitioned: emit no PARTITION BY.
        if meta["partition"] == "NONE":
            meta["partition"] = None
        meta["wal"] = row.get("walEnabled")
        if row.get("maxUncommittedRows"):
            meta["with"].append(f' maxUncommittedRows={row["maxUncommittedRows"]} ')

        cur.execute("SELECT * FROM table_columns(%s);", (table_name,))
        for row in cur.fetchall():
            column_name = row["column"]
            column_type = row["type"]
            meta["columns"][column_name] = {"type": column_type}
            if row["upsertKey"]:
                meta["upsertKeys"].append(column_name)
            if column_type == "SYMBOL":
                meta["symbols"].append(column_name)
                meta["columns_sql"].append(_symbol_column_sql(column_name, row))
            else:
                meta["columns_sql"].append(f"{column_name} {column_type}")

    return meta


def _symbol_column_sql(column_name, row):
    """Render the column definition for one SYMBOL column from its metadata row."""
    parts = [
        f'{column_name} SYMBOL CAPACITY {row["symbolCapacity"]}',
        "CACHE" if row["symbolCached"] else "NOCACHE",
    ]
    # The index clause is appended after the cache clause; it must not
    # replace it, and the capacity value has to be interpolated.
    if row["indexed"]:
        parts.append(f'INDEX CAPACITY {row["indexBlockCapacity"]}')
    return " ".join(parts)
def get_create_statement(conn, table_meta):
    """Render a ``CREATE TABLE IF NOT EXISTS`` statement from table metadata.

    Args:
        conn: Accepted for signature compatibility; not used here.
        table_meta: Dict providing ``table_name``, ``columns_sql``,
            ``designated``, ``partition``, ``wal``, ``dedup``,
            ``upsertKeys`` and ``with``.

    Returns:
        The statement as a string terminated by ``;`` and a newline. Each
        optional clause is rendered only when its metadata value is truthy.
    """
    column_block = ",\n\t".join(table_meta["columns_sql"])

    designated = table_meta["designated"]
    designated_clause = f' TIMESTAMP({designated}) ' if designated else ""

    partition = table_meta["partition"]
    partition_clause = f' PARTITION BY {partition} ' if partition else ""

    wal_clause = ' WAL ' if table_meta["wal"] else ""

    dedup_clause = ""
    if table_meta["dedup"]:
        key_list = ", ".join(table_meta["upsertKeys"])
        dedup_clause = f" DEDUP UPSERT KEYS({key_list}) "

    with_options = table_meta["with"]
    with_clause = f' WITH {", ".join(with_options)}' if with_options else ""

    # Clauses are separated by single spaces even when empty, so the exact
    # spacing of the rendered statement depends on which clauses are present.
    return (
        f"CREATE TABLE IF NOT EXISTS {table_meta['table_name']} (\n"
        f"\t{column_block}\n"
        f") {designated_clause} {partition_clause} {wal_clause} {with_clause} {dedup_clause};\n"
    )
def create_dest_table(conn, sql):
    """Execute *sql* on *conn* and commit it.

    Args:
        conn: Open psycopg2 connection to the destination server.
        sql: The statement to run (here, a ``CREATE TABLE`` statement).
    """
    with conn.cursor() as cur:
        cur.execute(sql)
    # With autocommit disabled, psycopg2 runs the statement inside an
    # implicit transaction that is discarded when the connection closes;
    # commit explicitly so the statement is not lost.
    conn.commit()
# Read the table definition from the origin server, then replay it on the
# destination server. "table_name" and "destination_host_ip" are placeholders
# to be replaced with real values.
origin_conn = connect_qdb()
if origin_conn is None:
    # connect_qdb already printed the error; stop instead of failing later
    # with an AttributeError on None.
    raise SystemExit("Could not connect to the origin server.")
try:
    table_meta = get_table_meta(origin_conn, "table_name")
    sql = get_create_statement(origin_conn, table_meta)
finally:
    # Close the connection even when a metadata query fails.
    origin_conn.close()

destination_conn = connect_qdb("destination_host_ip")
if destination_conn is None:
    raise SystemExit("Could not connect to the destination server.")
try:
    create_dest_table(destination_conn, sql)
finally:
    destination_conn.close()