# Paste header (not code): 7 years ago · Oct 10, 2018, 04:58 AM
1from sqlalchemy import (DDL, event, Column, TEXT, select, func,
2 FLOAT, CHAR, DATE, Table, table, MetaData)
3from sqlalchemy.ext.declarative import declarative_base
4from sqlalchemy.schema import CreateColumn
5from sqlalchemy.sql.ddl import _CreateDropBase
6from sqlalchemy.ext.compiler import compiles
7
8
class _View(Table):
    """``Table`` subclass used as the metadata-side placeholder for a view."""

    # Marker attribute; nothing in the visible code reads it.
    is_view = True

    # Distinct visitor name from a plain table.
    # NOTE(review): presumably this keeps SQLAlchemy's table DDL visitors
    # from emitting CREATE/DROP TABLE for the placeholder -- confirm against
    # the SQLAlchemy version in use.
    __visit_name__ = 'view'
12
13
def View(name, metadata, selectable, replace=True, cascade=False):
    """Register a view on ``metadata`` and return a selectable stand-in.

    A ``_View`` placeholder named ``name`` is attached to ``metadata`` and
    used as the element of the ``CreateView`` / ``DropView`` DDL objects,
    which are hooked to the metadata's ``after_create`` and ``before_drop``
    events respectively.

    :param name: name of the view.
    :param metadata: ``MetaData`` the view is registered on; its ``schema``
        is copied onto the returned object.
    :param selectable: query defining the view; each of its columns is
        proxied onto the returned object.
    :param replace: forwarded to ``CreateView`` (``OR REPLACE``).
    :param cascade: forwarded to ``DropView`` (``CASCADE``).
    :return: a lightweight ``table()`` carrying the proxied columns.
    """
    placeholder = _View(name, metadata)

    # The object handed back to callers is separate from the placeholder.
    proxy = table(name)
    proxy.schema = metadata.schema
    for column in selectable.c:
        column._make_proxy(proxy)

    # NOTE: the placeholder is created without columns, so the column list
    # that CreateView derives from its element is empty here.
    create_ddl = CreateView(placeholder, selectable, replace=replace)
    drop_ddl = DropView(placeholder, if_exists=True, cascade=cascade)

    event.listen(metadata, 'after_create', create_ddl)
    event.listen(metadata, 'before_drop', drop_ddl)

    return proxy
30
31
class CreateView(_CreateDropBase):
    """DDL element describing a ``CREATE [OR REPLACE] VIEW`` statement.

    Attributes set on the instance:

    * ``columns`` -- one ``CreateColumn`` per column of ``element``.
    * ``selectable`` -- the query that defines the view.
    * ``replace`` -- whether ``OR REPLACE`` should be emitted.
    """

    __visit_name__ = 'create_view'

    def __init__(self, element: _View, selectable, on=None, bind=None,
                 replace=True):
        """Build the element.

        :param element: the view placeholder (a ``_View``) to create.
        :param selectable: query defining the view body.
        :param on: passed through to the base class.
        :param bind: passed through to the base class.
        :param replace: emit ``OR REPLACE`` when true.
        """
        super().__init__(element, on=on, bind=bind)
        self.columns = [CreateColumn(column) for column in element.columns]
        self.selectable = selectable
        self.replace = replace
41
42
@compiles(CreateView)
def visit_create_view(create, compiler, **kw):
    """Render a ``CreateView`` element as ``CREATE [OR REPLACE] VIEW`` SQL.

    :param create: the ``CreateView`` element being compiled.
    :param compiler: the DDL compiler; its dialect's identifier preparer is
        used for quoting and its ``sql_compiler`` renders the SELECT.
    :param kw: extra compiler keyword arguments (unused).
    :return: the statement text, wrapped in the same leading and trailing
        newlines as before.
    """
    preparer = compiler.dialect.identifier_preparer

    pieces = ['\nCREATE']
    if create.replace:
        pieces.append('OR REPLACE')
    pieces.append('VIEW %s' % preparer.format_table(create.element))

    if create.columns:
        column_names = ', '.join(
            preparer.format_column(col.element) for col in create.columns
        )
        pieces.append('(%s)' % column_names)

    # A view definition cannot carry bound parameters, so literal values in
    # the query (e.g. ``asset_class_code == 'c'``) must be rendered inline
    # rather than as placeholders.
    select_sql = compiler.sql_compiler.process(
        create.selectable, literal_binds=True
    )
    pieces.append('AS %s\n\n' % select_sql)

    # Joining with single spaces also separates the column list from ``AS``.
    return ' '.join(pieces)
61
62
class DropView(_CreateDropBase):
    """DDL element describing a ``DROP VIEW`` statement.

    Attributes set on the instance:

    * ``cascade`` -- whether ``CASCADE`` should be appended.
    * ``if_exists`` -- whether ``IF EXISTS`` should be emitted.
    """

    __visit_name__ = 'drop_view'

    def __init__(self, element: _View, on=None, bind=None,
                 cascade=False, if_exists=False):
        """Build the element.

        :param element: the view placeholder (a ``_View``) to drop.
        :param on: passed through to the base class.
        :param bind: passed through to the base class.
        :param cascade: append ``CASCADE`` when true.
        :param if_exists: emit ``IF EXISTS`` when true.
        """
        super().__init__(element, on=on, bind=bind)
        self.cascade = cascade
        self.if_exists = if_exists
71
72
@compiles(DropView)
def compile(drop, compiler, **kw):
    """Render a ``DropView`` element as ``DROP VIEW [IF EXISTS] ... [CASCADE]``.

    :param drop: the ``DropView`` element being compiled.
    :param compiler: the DDL compiler; its ``preparer`` quotes the view name.
    :param kw: extra compiler keyword arguments (unused).
    :return: the statement text, starting with a newline.
    """
    # NOTE(review): this function's name shadows the built-in ``compile``
    # inside this module; it is kept so the module's names stay unchanged.
    if_exists_clause = "IF EXISTS " if drop.if_exists else ""
    cascade_clause = " CASCADE" if drop.cascade else ""
    view_name = compiler.preparer.format_table(drop.element)
    return "\nDROP VIEW " + if_exists_clause + view_name + cascade_clause
82
83
def mount_declare_schema(target_schema):
    """Arrange for the metadata's schema to be created before its tables.

    Registers a ``before_create`` listener on ``target_schema`` that runs
    ``CREATE SCHEMA IF NOT EXISTS <schema>``, restricted to the
    ``postgresql`` dialect.

    :param target_schema: object exposing a ``schema`` attribute and
        accepting ``before_create`` listeners (a ``MetaData`` in this file).
    :return: ``target_schema`` itself, to allow call chaining.
    """
    schema_name = target_schema.schema
    if schema_name is None:
        # No schema configured: formatting ``None`` into the statement would
        # emit ``CREATE SCHEMA IF NOT EXISTS None``, so register nothing.
        return target_schema

    ddl_statement = DDL(
        'CREATE SCHEMA IF NOT EXISTS {}'.format(schema_name)
    )
    event.listen(
        target_schema,
        'before_create',
        ddl_statement.execute_if(dialect='postgresql'),
    )

    return target_schema
91
92
# Schema that the metadata below (and everything registered on it) targets.
SCHEMA_NAME = 'my_schema'

# Metadata carrying the schema name.
meta_schema = MetaData(schema=SCHEMA_NAME)

# Declarative base for the models in this module, bound to that metadata.
Base = declarative_base(metadata=meta_schema)
98
99
class Accounts(Base):
    """Declarative model mapped to the ``accounts`` table."""

    __tablename__ = 'accounts'

    # Primary key.
    portfolio_code = Column(TEXT, primary_key=True)

    # Optional attributes (explicitly nullable).
    account_code = Column(TEXT, nullable=True)
    open_date = Column(DATE, nullable=True)
106
107
class Appraisals(Base):
    """Declarative model mapped to the ``appraisals`` table."""

    __tablename__ = 'appraisals'

    # Composite primary key: (report_date, portfolio_code, symbol).
    report_date = Column(DATE, primary_key=True)
    portfolio_code = Column(TEXT, primary_key=True)
    symbol = Column(TEXT, primary_key=True)

    # Required, non-key attributes.
    asset_class_code = Column(CHAR(1), nullable=False)
    base_market_value = Column(FLOAT, nullable=False)
116
117
def broken_view_query():
    """Build the SELECT that defines ``broken_view``.

    Joins appraisals (alias ``aa``) to accounts (alias ``acc``) on
    ``portfolio_code``, keeps rows whose ``asset_class_code`` equals ``'c'``,
    and sums ``base_market_value`` as ``total_cash`` per
    ``(report_date, portfolio_code)`` group.

    :return: the select construct (not executed here).
    """
    appraisals = Appraisals.__table__.alias('aa')
    accounts = Accounts.__table__.alias('acc')

    total_cash = func.sum(appraisals.c.base_market_value).label('total_cash')
    joined = appraisals.join(
        accounts,
        appraisals.c.portfolio_code == accounts.c.portfolio_code,
    )

    # List-style ``select([...])`` is kept as in the original call form.
    return (
        select([appraisals.c.portfolio_code, total_cash])
        .select_from(joined)
        .where(appraisals.c.asset_class_code == 'c')
        .group_by(appraisals.c.report_date, appraisals.c.portfolio_code)
    )
134
135
# Register the view on the shared metadata at import time; the returned
# object can be used as a selectable for 'broken_view'.
broken_view = View(
    'broken_view',
    Base.metadata,
    broken_view_query(),
)


# Hook schema creation onto the same metadata. The helper returns the
# metadata it was given, so this re-binds the same object.
Base.metadata = mount_declare_schema(Base.metadata)
140
141
def declare(engine):
    """Create everything registered on ``Base.metadata`` using ``engine``.

    :param engine: passed as the bind to ``MetaData.create_all``.
    """
    metadata = Base.metadata
    metadata.create_all(engine)