In [1]: import peewee as pw In [2]: from playhouse import db_url In [3]: db = db_url.connect("mysql://127.0.0.1", port=9696, user="root", password="root", database="ks1") In [4]: db.get_tables() Out[4]: ['test_shard_hash_0000', 'test_shard_hash_0001', 'test_shard_hash_0002', 'test_shard_hash_0003', 'test_shard_range_0000', 'test_shard_range_0001', 'test_shard_range_0002', 'test_shard_range_0003']
创建对应表结构的model,并绑定到db实例。
In [6]: classUserInfo(pw.Model): ...: id = pw.BigIntegerField() ...: user_id = pw.BigIntegerField() ...: name = pw.CharField(max_length=255) ...: age = pw.SmallIntegerField() ...: classMeta: ...: primary_key = pw.CompositeKey("user_id", "id") ...: table_name = "test_shard_hash" In [7]: UserInfo.bind(db) Out[7]: True
复现
下面我们打开kingshard sql日志,试一下CRUD等操作。
create
In [10]: UserInfo.create(user_id=1, id=1, name="", age=0) Out[10]: <UserInfo: (1, 1)>
kingshard日志 OK - 1.0ms - 127.0.0.1:59593->127.0.0.1:3306:insert into test_shard_hash_0001(id, user_id, name, age) values (1, 1, '', 0)
测试一下插入冲突 In [11]: UserInfo.create(user_id=1, id=1, name="", age=0) # IntegrityError: (1062, "Duplicate entry '1-1' for key 'PRIMARY'") In [12]: UserInfo.create(user_id=5, id=1, name="", age=0) # InternalError: (1105, 'transaction in multi node') 我们发现kingshard报错了,原因是一个事务跨node了,但是我们这里并没有显式开启事务,这是为什么呢?我们下面会分析。
select
In [18]: u = UserInfo.select().where(UserInfo.user_id == 1).first() In [19]: u.user_id Out[19]: 1
OK - 0.9ms - 127.0.0.1:59593->127.0.0.1:3306:select t1.id, t1.user_id, t1.name, t1.age from test_shard_hash_0001 as t1 where (t1.user_id = 1) limit 1
跨node的select In [43]: UserInfo.select() # InternalError: (1105, 'transaction in multi node') 发生了跟之前一样的问题。
ERROR - 15.1ms - 127.0.0.1:59593->127.0.0.1:3306:SELECT COUNT(1) FROM (SELECT 1 FROM `test_shard_hash` AS `t1` WHERE (`t1`.`user_id` = 2)) AS `_wrapped` 这说明这种count方法kingshard没办法根据shard_key计算出发往哪张表。我们换一种方法试试: ---------------------------------------------- In [35]: UserInfo.select(pw.fn.COUNT('*')).where(UserInfo.user_id == 1).scalar() Out[35]: 1
OK - 1.2ms - 127.0.0.1:59593->127.0.0.1:3306:select count('*') from test_shard_hash_0001 as t1 where (t1.user_id = 1) 看来这种写法可行,我们测试一下跨node的count In [47]: UserInfo.select(pw.fn.COUNT('*')).scalar() # InternalError: (1105, 'transaction in multi node') 还是不行,看来kingshard的跨node基本的操作在peewee里都搞不起来,下面我们会一步步揭开它的神秘面纱。
update
In [50]: UserInfo.update(age=10).where(UserInfo.user_id == 1).execute() Out[50]: 1
测试一下kingshard支持的跨node更新 In [56]: UserInfo.update(age=10).where(UserInfo.user_id.in_((1,5))).execute() # InternalError: (1105, 'transaction in multi node')
如果MySQL Server执行SQL时抛错(Duplicate/Deadlock/kingshard一些报错),并且没有设置autorollback,则当前事务不会提交,这也是为什么如果插入一条数据失败,再使用当前连接执行一个shard到不同node的SQL,同样也会报transaction in multi node。
mysql_passwd = False try: import pymysql as mysql except ImportError: try: import MySQLdb as mysql mysql_passwd = True except ImportError: mysql = None ... def_connect(self): if mysql isNone: raise ImproperlyConfigured('MySQL driver not installed!') return mysql.connect(db=self.database, **self.connect_params)
其实是使用的对应的包提供的方法,我们看一下pymsql的connect方法
defConnect(*args, **kwargs): """ Connect to the database; see connections.Connection.__init__() for more information. """ from .connections import Connection return Connection(*args, **kwargs) ... connect = Connection = Connect ... classConnection(object): def__init__(self, ..., autocommit=False, ...): ... # specified autocommit mode. None means use server default. self.autocommit_mode = autocommit ... self.connect()
defconnect(self, sock=None): ... if self.autocommit_mode isnotNone: self.autocommit(self.autocommit_mode) ...
defautocommit(self, value): self.autocommit_mode = bool(value) current = self.get_autocommit() if value != current: self._send_autocommit_mode()
def_send_autocommit_mode(self): """Set whether or not to commit after every execute()""" self._execute_command(COMMAND.COM_QUERY, "SET AUTOCOMMIT = %s" % self.escape(self.autocommit_mode)) self._read_ok_packet()
In [1]: db.connect_params.update(autocommit=True) In [2]: db.close() Out[2]: True In [3]: db.connect() Out[3]: True In [4]: UserInfo.bind(db) Out[4]: False In [5]: UserInfo.select(pw.fn.COUNT('*')).scalar() Out[5]: 5
总结
kingshard报错transaction in multi node,有一个前提,就是当前session显式的开启一个事务begin/start transaction或者*设置了autocommit=0*。