背景
数据报表即将上线,需准备一个Clickhouse测试库用作后续开发
方案调研
迁移集群实际上就是要把所有数据库(system 除外)的表结构和数据完整的复制一遍。ClickHouse 官方和社区有一些现成的解决方案,也可以自己实现。
拷贝数据目录
先观察一下 ClickHouse 在文件系统上的目录结构(配置文件 /ect/clickhouse-server/config.xml
里面配置的 <path>
),为了便于查看,只保留了 data
和 metadata
目录。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
.
├── data
│ ├── default
│ ├── system
│ │ ├── asynchronous_metric_log
│ │ ├── metric_log
│ │ ├── query_log
│ │ ├── query_thread_log
│ │ └── trace_log
├── metadata
│ ├── default
│ │ └── v_table_size.sql
│ ├── default.sql
│ ├── system
│ │ ├── asynchronous_metric_log.sql
│ │ ├── metric_log.sql
│ │ ├── query_log.sql
│ │ ├── query_thread_log.sql
│ │ └── trace_log.sql
data
目录里保存的是数据,每个数据库一个目录,内部每个表一个子目录。metadata
目录里保存的是元数据,即数据库和表结构。其中<database>.sql
是 创建数据库的 DDL(ATTACH DATABASE default ENGINE = Ordinary
)<database>/<table>.sql
是建表的 DDL (ATTACH TABLE ...
).
这里的 DDL 使用的是
ATTACH
语句,进入文档 查看 ATTACH 的作用及跟 CREATE 的区别
基于这个信息,直接把 data
和 metadata
目录(要排除 system)复制到新集群,即可实现数据迁移。用一个小表做测试,验证可行。
操作流程
- 在源集群的硬盘上打包好对应数据库或表的 data 和 metadata 数据
- 拷贝到目标集群对应的目录
- 重启 clickhouse-server
使用 remote
表函数
ClickHouse 除了查询常规的表,还能使用表函数来构建一些特殊的「表」,其中 remote 函数 可用于查询另一个 ClickHouse 的表。
使用方式很简单:
1
2
3
SELECT * FROM remote('addresses_expr', db, table, 'user', 'password') LIMIT 10;
因此,可以借助这个功能实现数据迁移:
1
2
3
4
INSERT INTO <local_database>.<local_table>
SELECT * FROM remote('remote_clickhouse_addr', <remote_database>, <remote_table>, '<remote_user>', '<remote_password>')
操作流程
- 在源集群的
system.tables
表查询出数据库、表、DDL、分区、表引擎等信息 - 在目标集群上,运行 DDL 创建表,然后运行上述迁移语句复制数据
- 遍历所有表,执行 2
使用 clickhouse-copier
Clickhouse-copier 是 ClickHouse 官方提供的一款数据迁移工具,可用于把表从一个集群迁移到另一个(也可以是同一个)集群。Clickhouse-copier 使用 Zookeeper 来管理同步任务,可以同时运行多个 clickhouse-copier 实例。
使用方式:
1
2
3
clickhouse-copier --daemon --config zookeeper.xml --task-path /task/path --base-dir /path/to/dir
其中 --config zookeeper.xml
是 Zookeeper 的连接信息,--task-path /task/path
是 Zookeeper 里任务配置的节点路径。在使用时,需要先定义一个 XML 格式的任务配置文件,上传到 /task/path/description
里。同步任务是表级别的,可以配置的内容还比较多。Clickhouse-copier 可以监听 /task/path/description
的变化,动态加载新的配置而不需要重启。
操作流程
- 创建
zookeeper.xml
- 创建任务配置文件,格式见官方文档,每个表都要配置(可使用代码自动生成)
- 把配置文件内容上传到 Zookeeper
- 启动 clickhouse-copier 进程
理论上 clickhouse-copier 运行在源集群或目标集群的环境都可以,官方文档推进在源集群,这样可以节省带宽。
使用 clickhouse-backup
clickhouse-backup 是社区开源的一个 ClickHouse 备份工具,可用于实现数据迁移。其原理是先创建一个备份,然后从备份导入数据,类似 MySQL 的 mysqldump + SOURCE。这个工具可以作为常规的异地冷备方案,不过有个局限是只支持 MergeTree 系列的表。
操作流程
- 在源集群使用
clickhouse-backup create
创建备份 - 把备份文件压缩拷贝到目标集群
- 在目标集群使用
clickhouse-backup restore
恢复
对比
拷贝数据目录 | 使用 remote 表函数 | 使用 clickhouse-copier | 使用 clickhouse-backup | |
---|---|---|---|---|
操作复杂度 | 较麻烦,需要在两台服务器上操作文件系统并拷贝文件,不方便自动化 | 一般,需要写程序自动化 | 看起来比使用 remote 更复杂一些,主要是生成配置文件比较麻烦 | 类似拷贝数据目录,会更简单一些 |
全量同步 | 支持 | 支持 | 支持 | 支持 |
增量同步 | 不支持 | 支持 | 应该支持 | 不支持 |
迁移视图 | 不支持 | 支持 | 不确定,理论上应该支持 | 不支持 |
性能 | 较好 | 较好 | 不确定,应该比 remote 快 | 不确定 |
局限性 | 不支持集群,很多人工操作 | 不适合大表?应该需要相同的拓扑结构 | 不确定,可能没有 | 只支持 MergeTree 系列 |
从官方和社区的一些资料综合来看 clickhouse-copier 功能最强大,不过考虑到数据量较少,而且对 clickhouse-copier 有些地方也不是很清楚,最终决定使用 remote
函数来做数据迁移。
关于别的数据迁移方案、更多的 clickhouse-copier 使用案例,可参考 Altinity 的博客 Clickhouse-copier in practice.
使用 remote
函数做数据迁移
使用 remote
函数还能实现更多特性:
- 对于分区表,可逐个分区进行同步,这样实际上同步的最小单位是分区,可以实现增量同步
- 可方便集成数据完整性(行数对比)检查,自动重新同步更新过的表
代码
代码如下,需要先安装 clickhouse-driver
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import collections
import datetime
import functools
import logging
import time
from clickhouse_driver import Client
source_conn = Client(host='source-host', user='user', password='password')
target_conn = Client(host='target-host', user='user', password='password')
def format_partition_expr(p):
if isinstance(p, int):
return p
return f"'{p}'"
def execute_queries(conn, queries):
if isinstance(queries, str):
queries = queries.split(';')
for q in queries:
conn.execute(q.strip())
class Table(object):
def __init__(self, database, name, ddl, partition_key, is_view):
self.database = database
self.name = name
self.ddl = ddl.replace('CREATE TABLE', 'CREATE TABLE IF NOT EXISTS')
self.partition_key = partition_key
self.is_view = is_view
def exists(self, conn):
q = f"SELECT name FROM system.tables WHERE database = '{self.database}' AND name = '{self.name}'"
return len(conn.execute(q)) > 0
def get_partitions(self, conn):
partitions = []
q = f'SELECT {self.partition_key}, count() FROM {self.identity} GROUP BY {self.partition_key} ORDER BY {self.partition_key}'
partitions = collections.OrderedDict(conn.execute(q))
return partitions
def get_total_count(self, conn):
q = f'SELECT COUNT() FROM {self.identity}'
return conn.execute(q)[0][0]
def check_consistency(self):
if not self.exists(target_conn):
return False, None
source_ttl_count = self.get_total_count(source_conn)
target_ttl_count = self.get_total_count(target_conn)
if source_ttl_count == target_ttl_count:
return True, None
if not self.partition_key:
return False, None
source_partitions = self.get_partitions(source_conn)
target_partitions = self.get_partitions(target_conn)
bug_partitions = []
for p, c in source_partitions.items():
if p not in target_partitions or c != target_partitions[p]:
bug_partitions.append(p)
return False, bug_partitions
def create(self, replace=False):
target_conn.execute(f'CREATE DATABASE IF NOT EXISTS {self.database}')
if self.is_view:
replace = True
if replace:
target_conn.execute(f'DROP TABLE IF EXISTS {self.identity}')
target_conn.execute(self.ddl)
def copy_data_from_remote(self, by_partition=True):
self.create()
if self.is_view:
logging.info('ignore view %s', self.identity)
return
is_identical, bug_partitions = self.check_consistency()
if is_identical:
logging.info('table %s has the same number of rows, skip', self.identity)
return
if self.partition_key and by_partition:
for p in bug_partitions:
logging.info('copy partition %s=%s', self.partition_key, p)
self._copy_partition_from_remote(p)
else:
self._copy_table_from_remote()
def _copy_table_from_remote(self):
queries = f'''
DROP TABLE {self.identity};
{self.ddl};
INSERT INTO {self.identity}
SELECT * FROM remote('{source_conn.host}', {self.identity}, '{source_conn.user}', '{source_conn.password}')
'''
execute_queries(target_conn, queries)
def _copy_partition_from_remote(self, partition):
partition = format_partition_expr(partition)
queries = f'''
ALTER TABLE {self.identity} DROP PARTITION {partition};
INSERT INTO {self.identity}
SELECT * FROM remote('{source_conn.host}', {self.identity}, '{source_conn.user}', '{source_conn.password}')
WHERE {self.partition_key} = {partition}
'''
execute_queries(target_conn, queries)
def copy_to_another_table(self, database, name=None):
if not name:
name = self.name
assert not (self.database == database and self.name == name)
if self.partition_key:
partitions = self.get_partitions(target_conn)
queries = [f'CREATE TABLE IF NOT EXISTS {database}.{name} AS {self.identity}']
for p in partitions.keys():
expr = format_partition_expr(p)
queries.append(f'ALTER TABLE {database}.{name} DROP PARTITION {expr}')
queries.append(f'ALTER TABLE {database}.{name} ATTACH PARTITION {expr} FROM {self.identity}')
execute_queries(target_conn, queries)
else:
queries = f'''
DROP TABLE IF EXISTS {database}.{name};
CREATE TABLE {database}.{name} AS {self.identity};
INSERT INTO {database}.{name} SELECT * FROM {self.identity};
'''
execute_queries(target_conn, queries)
@property
def identity(self):
return f'{self.database}.{self.name}'
def __str__(self):
return self.identity
__repr__ = __str__
def get_all_tables() -> [Table]:
q = '''
SELECT database, name, create_table_query, partition_key, engine = 'View' AS is_view
FROM system.tables
WHERE database NOT IN ('system')
ORDER BY if(engine = 'View', 999, 0), database, name
'''
rows = source_conn.execute(q)
tables = [Table(*values) for values in rows]
return tables
def copy_remote_tables(tables):
for idx, t in enumerate(tables):
start_time = datetime.datetime.now()
logging.info('>>>> start to migrate table %s, progress %s/%s', t.identity, idx+1, len(tables))
t.copy_data_from_remote()
logging.info('<<<< migrated table %s in %s', t.identity, datetime.datetime.now() - start_time)
def with_retry(max_attempts=5, backoff=120):
def decorator(f):
@functools.wraps(f)
def inner(*args, **kwargs):
attempts = 0
while True:
attempts += 1
logging.info('start attempt #%s', attempts)
try:
f(*args, **kwargs)
except Exception as e:
if attempts >= max_attempts:
raise e
logging.exception('caught exception')
time.sleep(backoff)
else:
break
return inner
return decorator
@with_retry(max_attempts=10, backoff=60)
def main():
tables = get_all_tables()
logging.info('got %d tables: %s', len(tables), tables)
copy_remote_tables(tables)
if __name__ == '__main__':
main()
使用方式:直接运行即可,挂了重跑,不会有副作用。
局限性
仅通过对比行数来判断数据同步完整,没有比较内部数据的一致性,因此如果上游表行数不变,更新了部分字段,将无法自动识别,需要先从目标库里把表删掉重新同步。
必须为两个相同的库,例如从阿里云Clickhouse迁移数据到本地基本就不可行。