Flask sqlalchemy 设置默认时间

记录第一次创建时间: default

最近发现程序中凡存在默认时间的部分,经常出现多条记录create_time时间一致的情况,而实际上创建时间明明差的很多
一番debug之后发现坑在这里:

falsk中如下两个字段

1
2
3
create_time1 = db.Column(db.DateTime, default=datetime.now)

create_time2 = db.Column(db.DateTime, default=datetime.now())

两者的区别:

第一个插入的是期望的, 数据的插入时间,每条数据插入时可自动根据当前时间生成

第二条是一个固定的时间, 程序部署的时间,所有的数据都是这个固定时间

实际上默认值在mysql数据库没有体现, 都是sqlalchemy在插入数据时加的


如果想想在生成的table中有默认值使用: server_default

1
name = db.Column(db.String(45), server_default='hh')

当我们要给布尔值类型指定server_default时,需要用到text

1
2
from sqlalchemy import text
is_domain = db.Column(db.Boolean,default=False,server_default=text('0'))

因为mysql的datetime类型的数据不支持函数, 所以没法指定默认值位当前时间

记录每次修改的时间: onupdate

1
update_time = db.Column(db.DateTime, default=datetime.now,onupdate=datetime.now)

Celery SQLAlchemy issue

一个基于Flask + SQLAlchemy + Celery的后端任务平台,主要用到异步运行Ansible任务。
因业务需要, 前两天升级了mysql配置,升级过程中重启了mysql服务。

下午,在执行Ansible后台任务的时候报错:

1
StatementError("(sqlalchemy.exc.InvalidRequestError) Can't reconnect until invalid transaction is rolled back")

排查下来发现,mysql重启后,如果不一并重启Flask和Celery进程,就会报出上述错误。

参考:https://groups.google.com/forum/#!topic/sqlalchemy/h9U0Kmx_vMU

Flask request小结

一、post 请求 (Content-Type: application/json)

  1. c = request.get_data()

    可以获取未经处理过的原始数据而不管内容类型,如果数据格式是json的,则取得的是json字符串,排序和请求参数一致

  2. c =request.get_json()

    将请求参数做了处理,得到的是字典格式的,因此排序会打乱依据字典排序规则

  3. c = request.data

    可以获取未经处理过的原始数据,如果数据格式是json的,则取得的是json字符串,排序和请求参数一致

  4. c = request.json

    刚开始使用的时候以为是一个方法这样调用request.json()然后报错如下:
    Content-Type: application/json时报错’dict’ object is not callable
    原来是个属性,因此这样使用request.json,就能正常使用了总结如下:
    将请求参数做了处理,得到的是字典格式的,因此排序会打乱依据字典排序规则

二、get请求

1
2
3
request.args.get('key') # 可以获取到单个的值,
requestValues = request.args # 可以获取get请求的所有参数返回值是ImmutableMultiDict类型,
requestValues.to_dict() #将获得的参数转换为字典

Celery任务状态报错一例

Celery 中定义FAILURE状态时,报错:KeyError: 'exc_type',完整内容大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
>>> print(task.state)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 471, in state
return self._get_task_meta()['status']
File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 410, in _get_task_meta
return self._maybe_set_cache(self.backend.get_task_meta(self.id))
File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 359, in get_task_meta
meta = self._get_task_meta_for(task_id)
File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 674, in _get_task_meta_for
return self.decode_result(meta)
File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 278, in decode_result
return self.meta_from_decoded(self.decode(payload))
File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 274, in meta_from_decoded
meta['result'] = self.exception_to_python(meta['result'])
File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 248, in exception_to_python
from_utf8(exc['exc_type']), __name__)
KeyError: 'exc_type'

这个报错主要是meta字段中缺少exc_typeexc_message关键字导致的,在meta中添加这两个关键字即可消除报错:

1
2
3
4
5
6
7
8
9
10
11
12
13
@app.task(bind=True)
def task(self):
try:
raise ValueError('Some error')
except Exception as ex:
self.update_state(
state=states.FAILURE,
meta={
'exc_type': type(ex).__name__,
'exc_message': traceback.format_exc().split('\n')
'custom': '...'
})
raise Ignore()

需要注意的是,在使用返回response时就不能再使用task.info来返回内容了,需要使用如下文件返回:

1
2
3
4
5
6
7
8
9
>>> import tasks
>>> task = tasks.task.s().delay()
>>> print(task.state)
'FAILURE'
>>> print(task.info)
ValueError('Traceback (most recent call last):', ' File "/app/tasks.py", line 16, in task', " raise ValueError('some exception')", 'ValueError: some exception', '')
>>> print(task.backend.get(task.backend.get_key_for_task(task.id)))
b'{"status": "FAILURE", "result": {"exc_type": "ValueError", "exc_message": ["Traceback (most recent call last):", " File \\"/app/tasks.py\\", line 16, in task", " raise ValueError(\'some exception\')", "ValueError: some exception", ""], "custom": "..."}, "traceback": null, "children": [], "task_id": "d2f60111-aec6-4c58-83a7-24f0edb7ac5f"}'
Custom state

Hbase Rowkey设计规范参考

HBase是三维有序存储的,通过rowkey(行键),column key(column family和qualifier)和TimeStamp(时间戳)这个三个维度可以对HBase中的数据进行快速定位。

HBase中rowkey可以唯一标识一行记录,在HBase查询的时候,有以下几种方式:

  1. 通过get方式,指定rowkey获取唯一一条记录
  2. 通过scan方式,设置startRow和stopRow参数进行范围匹配
  3. 全表扫描,即直接扫描整张表中所有行记录

rowkey长度原则

利用Ansible Playbook批量部署node_exporter

之前有翻译过一系列Grafana配置Graphite的文章。传送门

在用过Prometheus之后,发现Prometheus配合Grafana也是天造的一对,以设的一双。

对于服务器基础指标监控而言,Prometheus通过node_exporter来收集数据做为数据源,提供了各种与硬件和内核相关的详细指标。

下面分享一个我正在使用的Ansible Playbook,用于批量部署node_exporter到多个目标主机。

Spark性能优化之foreach与foreachPartition

首先,我们对比一下foreachPartitionforeach两个方法的实现,有什么不同的地方:

1
2
3
4
5
6
7
8
9
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}

Kafka与Flume区别

Kafka 与 Flume 在很多方面有相似性。以下是评估两个系统的一些建议:

  1. Kafka 是一个通用型系统。你可以有许多的生产者和消费者分享多个主题。相反地,Flume 被设计成特定用途的工作,特定地向 HDFS 和 HBase 发送出去。Flume 为了更好地为 HDFS 服务而做了特定的优化,并且与 Hadoop 的安全体系整合在了一起。基于这样的结论,Hadoop 开发商 Cloudera 推荐如果数据需要被多个应用程序消费的话,推荐使用 Kafka,如果数据只是面向 Hadoop 的,可以使用 Flume。

  2. Flume 拥有许多配置的来源 (sources) 和存储池 (sinks)。然后,Kafka 拥有的是非常小的生产者和消费者环境体系,Kafka 社区并不是非常支持这样。如果你的数据来源已经确定,不需要额外的编码,那你可以使用 Flume 提供的 sources 和 sinks,反之,如果你需要准备自己的生产者和消费者,那你需要使用 Kafka。

  3. Flume 可以在拦截器里面实时处理数据。这个特性对于过滤数据非常有用。Kafka 需要一个外部系统帮助处理数据。

  4. 无论是 Kafka 或是 Flume,两个系统都可以保证不丢失数据。然后,Flume 不会复制事件。相应地,即使我们正在使用一个可以信赖的文件通道,如果 Flume agent 所在的这个节点宕机了,你会失去所有的事件访问能力直到你修复这个受损的节点。使用 Kafka 的管道特性不会有这样的问题。

  5. Flume 和 Kafka 可以一起工作的。如果你需要把流式数据从 Kafka 转移到 Hadoop,可以使用 Flume 代理 (agent),将 kafka 当作一个来源 (source),这样可以从 Kafka 读取数据到 Hadoop。你不需要去开发自己的消费者,你可以使用 Flume 与 Hadoop、HBase 相结合的特性,使用 Cloudera Manager 平台监控消费者,并且通过增加过滤器的方式处理数据。

Scala异常获取一例

在处理第11行读文件时,由于数据文件出现的不规律,在指定日期内可能存在日志文件不存在的情况,这里需要处理下异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def readLog(sc: SparkContext, startDate: String, endDate: String, logNames: List[String]): RDD[String] = {
val dateLst = DateUtils.getDateListBetweenTwoDate(startDate, endDate)

var logRdd = sc.makeRDD(List[String]())
for (date <- dateLst) {
val year = date.substring(0, 4)
val month = date.substring(4, 6)
val day = date.substring(6, 8)
for (logName <- logNames) {
val logRdd = logRdd.union(
try {sc.textFile(s"cosn://fuge/mid-data/fuge/ssp/bid-log/$year/$month/$day/${logName}*")
.map(x => x.split("\\|", -1))
.filter(x => x.length >= 2 && (x(1).trim == "6" || x(1).trim == "0")).map(_.toString) // 0和6为请求成功的状态码
} catch {
case _: Exception => sc.makeRDD(List[String]())
}
)
}
}
logRdd
}