SQLAlchemy 源码分析之 create_engine 引擎的创建
引擎是 SQLAlchemy 的核心,不管是 SQL Core 还是 ORM 的使用,都需要依赖引擎的创建。为此我们来研究一下,引擎是如何创建的。
from sqlalchemy import create_engine

# Build an Engine for a MySQL database accessed through the PyMySQL driver.
engine = create_engine(
    'mysql+pymysql://root:x@127.0.0.1/test',
    echo=True,  # when True, the SQL statements are printed
    pool_size=5,  # base capacity of the database connection pool
    # Maximum overflow capacity of the pool: pool_size + max_overflow is the
    # overall maximum.  Requests beyond that block and wait; the wait is
    # bounded by the pool timeout parameter (30 by default).
    max_overflow=10,
    pool_recycle=7200,  # reconnect (recycle) period
)
create_engine 创建引擎对象,源代码如下:
class PlainEngineStrategy(DefaultEngineStrategy):
    """Strategy for configuring a regular Engine."""

    # Lookup key of this strategy; "plain" is also the value of
    # ``default_strategy`` used by create_engine().
    name = "plain"
    # Engine class associated with this strategy.
    engine_cls = base.Engine


# Instantiated once at module level.
# NOTE(review): presumably the base-class constructor registers the instance
# under ``name`` in the strategies registry -- confirm in EngineStrategy.
PlainEngineStrategy()
这里有个参数 strategy(策略),一般情况下默认是 'plain',通过这个参数动态地选取对应的策略类实例。我们看看默认的策略 'plain' 对应的类是哪个?
default_strategy = "plain"
def create_engine(*args, **kwargs):
    """Create an engine by delegating to the selected strategy.

    The optional ``strategy`` keyword (defaulting to ``default_strategy``)
    is removed from ``kwargs`` and used as the key into
    ``strategies.strategies``.  All remaining positional and keyword
    arguments are forwarded unchanged to that strategy's ``create()``
    method, and its result is returned.

    Raises:
        KeyError: if no strategy is registered under the requested name.
    """
    strategy = kwargs.pop("strategy", default_strategy)
    strategy = strategies.strategies[strategy]
    return strategy.create(*args, **kwargs)
可以看到是 PlainEngineStrategy(),接下来回到创建方法 strategy.create(*args, **kwargs),具体看看是怎么创建的。
其实调用的是父类 DefaultEngineStrategy 的 create 方法。
class DefaultEngineStrategy(EngineStrategy):
    """Base class for built-in strategies."""

    def create(self, name_or_url, **kwargs):
        """Build and return an engine for ``name_or_url``.

        The keyword arguments are consumed step by step: first by the
        dialect class, then by the DBAPI loader, the connection pool and
        finally the engine class.  Anything left over afterwards is treated
        as an error.

        Args:
            name_or_url: database URL (string or URL object) accepted by
                ``url.make_url``.
            **kwargs: configuration options; the dict is emptied as the
                individual components claim their arguments.

        Returns:
            An instance of ``self.engine_cls`` wired to the pool and dialect.

        Raises:
            TypeError: if keyword arguments remain that no component
                accepted.
        """
        # create url.URL object
        u = url.make_url(name_or_url)

        plugins = u._instantiate_plugins(kwargs)

        # The plugin names have been handled above; drop them so they are
        # not reported as unknown arguments later on.
        u.query.pop("plugin", None)
        kwargs.pop("plugins", None)

        entrypoint = u._get_entrypoint()
        dialect_cls = entrypoint.get_dialect_cls(u)

        if kwargs.pop("_coerce_config", False):

            def pop_kwarg(key, default=None):
                # Pop the value and, when the dialect declares a converter
                # for this key, coerce the value through it.
                value = kwargs.pop(key, default)
                if key in dialect_cls.engine_config_types:
                    value = dialect_cls.engine_config_types[key](value)
                return value

        else:
            pop_kwarg = kwargs.pop

        dialect_args = {}
        # consume dialect arguments from kwargs
        for k in util.get_cls_kwargs(dialect_cls):
            if k in kwargs:
                dialect_args[k] = pop_kwarg(k)

        # An explicitly supplied "module" takes the place of the DBAPI that
        # the dialect class would otherwise provide.
        dbapi = kwargs.pop("module", None)
        if dbapi is None:
            dbapi_args = {}
            for k in util.get_func_kwargs(dialect_cls.dbapi):
                if k in kwargs:
                    dbapi_args[k] = pop_kwarg(k)
            dbapi = dialect_cls.dbapi(**dbapi_args)

        dialect_args["dbapi"] = dbapi

        for plugin in plugins:
            plugin.handle_dialect_kwargs(dialect_cls, dialect_args)

        # create dialect
        dialect = dialect_cls(**dialect_args)

        # assemble connection arguments
        (cargs, cparams) = dialect.create_connect_args(u)
        cparams.update(pop_kwarg("connect_args", {}))
        cargs = list(cargs)  # allow mutability

        # look for existing pool or create
        pool = pop_kwarg("pool", None)
        if pool is None:

            def connect(connection_record=None):
                # Give do_connect event handlers the first chance to produce
                # a connection; fall back to the dialect's own connect().
                if dialect._has_events:
                    for fn in dialect.dispatch.do_connect:
                        connection = fn(
                            dialect, connection_record, cargs, cparams
                        )
                        if connection is not None:
                            return connection
                return dialect.connect(*cargs, **cparams)

            creator = pop_kwarg("creator", connect)

            poolclass = pop_kwarg("poolclass", None)
            if poolclass is None:
                poolclass = dialect_cls.get_pool_class(u)
            pool_args = {"dialect": dialect}

            # consume pool arguments from kwargs, translating a few of
            # the arguments
            translate = {
                "logging_name": "pool_logging_name",
                "echo": "echo_pool",
                "timeout": "pool_timeout",
                "recycle": "pool_recycle",
                "events": "pool_events",
                "use_threadlocal": "pool_threadlocal",
                "reset_on_return": "pool_reset_on_return",
                "pre_ping": "pool_pre_ping",
                "use_lifo": "pool_use_lifo",
            }
            for k in util.get_cls_kwargs(poolclass):
                # Pool constructor name -> name used in the create_engine()
                # keyword arguments (identical unless listed above).
                tk = translate.get(k, k)
                if tk in kwargs:
                    pool_args[k] = pop_kwarg(tk)

            for plugin in plugins:
                plugin.handle_pool_kwargs(poolclass, pool_args)

            pool = poolclass(creator, **pool_args)
        else:
            if isinstance(pool, poollib.dbapi_proxy._DBProxy):
                pool = pool.get_pool(*cargs, **cparams)
            else:
                pool = pool

            pool._dialect = dialect

        # create engine.
        engineclass = self.engine_cls
        engine_args = {}
        for k in util.get_cls_kwargs(engineclass):
            if k in kwargs:
                engine_args[k] = pop_kwarg(k)

        _initialize = kwargs.pop("_initialize", True)

        # all kwargs should be consumed
        if kwargs:
            raise TypeError(
                "Invalid argument(s) %s sent to create_engine(), "
                "using configuration %s/%s/%s.  Please check that the "
                "keyword arguments are appropriate for this combination "
                "of components."
                % (
                    ",".join("'%s'" % k for k in kwargs),
                    dialect.__class__.__name__,
                    pool.__class__.__name__,
                    engineclass.__name__,
                )
            )

        engine = engineclass(pool, dialect, u, **engine_args)

        if _initialize:
            do_on_connect = dialect.on_connect()
            if do_on_connect:

                def on_connect(dbapi_connection, connection_record):
                    # Unwrap a proxied connection when the attribute exists;
                    # skip the hook if there is no underlying connection.
                    conn = getattr(
                        dbapi_connection, "_sqla_unwrap", dbapi_connection
                    )
                    if conn is None:
                        return
                    do_on_connect(conn)

                event.listen(pool, "first_connect", on_connect)
                event.listen(pool, "connect", on_connect)

            def first_connect(dbapi_connection, connection_record):
                # Initialize the dialect on a temporary Connection wrapped
                # around the first DBAPI connection, then roll back.
                c = base.Connection(
                    engine, connection=dbapi_connection, _has_events=False
                )
                c._execution_options = util.immutabledict()
                dialect.initialize(c)
                dialect.do_rollback(c.connection)

            event.listen(pool, "first_connect", first_connect, once=True)

        # Notify the dialect class, the entry point (when it is a different
        # object) and every plugin that the engine now exists.
        dialect_cls.engine_created(engine)
        if entrypoint is not dialect_cls:
            entrypoint.engine_created(engine)

        for plugin in plugins:
            plugin.engine_created(engine)

        return engine
我们逐一分析:
# Parse the database connection URI that was passed in; when it is valid a
# URL object is returned.
u = url.make_url(name_or_url)
# Initialize the plugins.
plugins = u._instantiate_plugins(kwargs)
# Resolve the entry point from the database type (mysql) and the driver
# library (pymysql) given in the URL.
entrypoint = u._get_entrypoint()
# Obtain the Dialect class.
dialect_cls = entrypoint.get_dialect_cls(u)
这里需要说明一下:Dialect(方言类)的作用是定义数据库和 DBAPI 的行为。
if kwargs.pop("_coerce_config", False):

    def pop_kwarg(key, default=None):
        # Pop the value and, when the dialect declares a converter for this
        # key, coerce the value through it.
        value = kwargs.pop(key, default)
        if key in dialect_cls.engine_config_types:
            value = dialect_cls.engine_config_types[key](value)
        return value

else:
    pop_kwarg = kwargs.pop

dialect_args = {}
# consume dialect arguments from kwargs
for k in util.get_cls_kwargs(dialect_cls):
    if k in kwargs:
        dialect_args[k] = pop_kwarg(k)
这段代码没什么特别的,只是从 kwargs 中取出方言所需要的参数,组成完整的 dialect_args。
dbapi = kwargs.pop("module", None)
if dbapi is None:
    # No DBAPI module was supplied explicitly: collect the keyword
    # arguments accepted by dialect_cls.dbapi and call it to obtain one.
    dbapi_args = {}
    for k in util.get_func_kwargs(dialect_cls.dbapi):
        if k in kwargs:
            dbapi_args[k] = pop_kwarg(k)
    dbapi = dialect_cls.dbapi(**dbapi_args)

dialect_args["dbapi"] = dbapi
这段代码则是实例化 dbapi 对象。
# create dialect: instantiate the dialect class with the keyword arguments
# gathered in dialect_args
dialect = dialect_cls(**dialect_args)
开始实例化方言。
# Ask the dialect for the positional and keyword connect arguments derived
# from the URL, then merge in any user-supplied "connect_args".
(cargs, cparams) = dialect.create_connect_args(u)
cparams.update(pop_kwarg("connect_args", {}))
cargs = list(cargs)  # allow mutability
创建连接所需要的参数
pool = pop_kwarg("pool", None)
if pool is None:

    def connect(connection_record=None):
        # Give do_connect event handlers the first chance to produce a
        # connection; fall back to the dialect's own connect().
        if dialect._has_events:
            for fn in dialect.dispatch.do_connect:
                connection = fn(
                    dialect, connection_record, cargs, cparams
                )
                if connection is not None:
                    return connection
        return dialect.connect(*cargs, **cparams)

    creator = pop_kwarg("creator", connect)

    poolclass = pop_kwarg("poolclass", None)
    if poolclass is None:
        poolclass = dialect_cls.get_pool_class(u)
    pool_args = {"dialect": dialect}

    # consume pool arguments from kwargs, translating a few of
    # the arguments
    translate = {
        "logging_name": "pool_logging_name",
        "echo": "echo_pool",
        "timeout": "pool_timeout",
        "recycle": "pool_recycle",
        "events": "pool_events",
        "use_threadlocal": "pool_threadlocal",
        "reset_on_return": "pool_reset_on_return",
        "pre_ping": "pool_pre_ping",
        "use_lifo": "pool_use_lifo",
    }
    for k in util.get_cls_kwargs(poolclass):
        # Pool constructor name -> name used in the create_engine() keyword
        # arguments (identical unless listed above).
        tk = translate.get(k, k)
        if tk in kwargs:
            pool_args[k] = pop_kwarg(tk)

    for plugin in plugins:
        plugin.handle_pool_kwargs(poolclass, pool_args)

    pool = poolclass(creator, **pool_args)
else:
    if isinstance(pool, poollib.dbapi_proxy._DBProxy):
        pool = pool.get_pool(*cargs, **cparams)
    else:
        pool = pool

    pool._dialect = dialect
  创建连接池,默认创建pool.QueuePool
# create engine.
engineclass = self.engine_cls
engine_args = {}
for k in util.get_cls_kwargs(engineclass):
    if k in kwargs:
        engine_args[k] = pop_kwarg(k)

_initialize = kwargs.pop("_initialize", True)

# all kwargs should be consumed; whatever is still left was not accepted
# by the dialect, the pool or the engine class
if kwargs:
    raise TypeError(
        "Invalid argument(s) %s sent to create_engine(), "
        "using configuration %s/%s/%s.  Please check that the "
        "keyword arguments are appropriate for this combination "
        "of components."
        % (
            ",".join("'%s'" % k for k in kwargs),
            dialect.__class__.__name__,
            pool.__class__.__name__,
            engineclass.__name__,
        )
    )

engine = engineclass(pool, dialect, u, **engine_args)
从上面可以看出来,引擎的核心是连接池和方言:连接池负责连接的维护,方言负责定义数据库的行为。
if _initialize:
# Statements forming the body of the ``if _initialize:`` branch whose header
# sits on the preceding line.
do_on_connect = dialect.on_connect()
if do_on_connect:

    def on_connect(dbapi_connection, connection_record):
        # Unwrap a proxied connection when the attribute exists; skip the
        # hook if there is no underlying connection.
        conn = getattr(
            dbapi_connection, "_sqla_unwrap", dbapi_connection
        )
        if conn is None:
            return
        do_on_connect(conn)

    event.listen(pool, "first_connect", on_connect)
    event.listen(pool, "connect", on_connect)


def first_connect(dbapi_connection, connection_record):
    # Initialize the dialect on a temporary Connection wrapped around the
    # first DBAPI connection, then roll back.
    c = base.Connection(
        engine, connection=dbapi_connection, _has_events=False
    )
    c._execution_options = util.immutabledict()
    dialect.initialize(c)
    dialect.do_rollback(c.connection)

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。