Spark Streaming Persistence Design

Batch data processing

We have a business scenario where data must be processed within a short time window, and Spark Streaming is a good fit for it.

The data flow basically follows the diagram from the official documentation.

[Figure: Spark Streaming data flow diagram]

Flume collects the logs and sinks them to Kafka; the Kafka messages are then consumed by Spark Streaming in micro-batches.

Each processed batch is then written to an external system.
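For context, here is a minimal PySpark sketch of the consuming side of this pipeline. The topic name, broker list, and batch interval are placeholders, and it uses the old spark-streaming-kafka direct API:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="log-persist")
ssc = StreamingContext(sc, 5)  # 5-second micro-batches (placeholder interval)

# read the topic Flume sinks into, straight from the Kafka brokers
stream = KafkaUtils.createDirectStream(
    ssc, ["log_topic"], {"metadata.broker.list": "kafka1:9092,kafka2:9092"})

stream.pprint()  # at least one output operation is required before start()
ssc.start()
ssc.awaitTermination()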

In our business the external system is MySQL. How do we write to it efficiently?

The Spark documentation offers design patterns for exactly this.

With the dstream.foreachRDD operator, you might naively write:

dstream.foreachRDD(rdd => {
  val connection = createNewConnection() // executed at the driver
  rdd.foreach(record => {
    connection.send(record) // executed at the worker
  })
})

But written this way it fails with a serialization error: the connection is created on the driver and cannot be serialized and shipped to the workers. A better approach, which also improves throughput, is rdd.foreachPartition: create one connection object per RDD partition.

Something like the following:

dstream.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  })
})

Better still is a static, lazily initialized pool of connections, which saves further overhead:

dstream.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse
  })
})

And here is a Python example:

import MySQLdb
from DBUtils.PooledDB import PooledDB

class ConnectionPool(object):
    def __init__(self):
        TRACK_HOST = "$TRACK_HOST"      # placeholders: fill in your
        TRACK_USER = "$TRACK_USER"      # MySQL credentials here
        TRACK_PASSWD = "$TRACK_PASSWD"
        TRACK_DB = "$TRACK_DB"
        # keep at least 2 idle connections cached in the pool
        self.pool = PooledDB(MySQLdb, 2, host=TRACK_HOST, user=TRACK_USER,
                             passwd=TRACK_PASSWD, db=TRACK_DB, port=3306)

    def getConnection(self):
        return self.pool.connection()

    def returnConnection(self, connection):
        # closing a PooledDB connection returns it to the pool
        return connection.close()

def main():
    # ... build the StreamingContext and the counts DStream ...
    counts.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition))

def send_partition(data):
    try:
        c = ConnectionPool()
        conn = c.getConnection()
        cur = conn.cursor()
        for record in data:
            # ... build and execute the INSERT for this record ...
            pass
        conn.commit()  # MySQLdb disables autocommit by default
        c.returnConnection(conn)
    except Exception as e:
        print(e)

if __name__ == "__main__":
    main()

Optimization

The version above has to instantiate ConnectionPool inside every partition, and the instance carries the pool as state. A leaner variant makes the accessors static methods and performs the imports inside them, so nothing stateful has to be pickled and shipped to the executors:

class ConnectionPool(object):
    def __init__(self):
        pass

    @staticmethod
    def get_connection():
        track_host = ""      # fill in your MySQL credentials
        track_user = ""
        track_password = ""
        track_db = ""
        # import inside the method so the class stays serializable and the
        # modules are loaded on the executor, not the driver
        import MySQLdb
        from DBUtils.PooledDB import PooledDB
        pool = PooledDB(MySQLdb, 2, host=track_host, user=track_user,
                        passwd=track_password, db=track_db, port=3306)
        return pool.connection()

    @staticmethod
    def return_connection(connection):
        # closing a PooledDB connection returns it to the pool
        return connection.close()
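
With the static accessors, the per-partition writer simplifies to something like the sketch below; the table name and INSERT statement are hypothetical, so adapt them to your schema:

def send_partition(data):
    conn = ConnectionPool.get_connection()
    try:
        cur = conn.cursor()
        # hypothetical target table; one parameterized INSERT per partition
        cur.executemany("INSERT INTO track_log (line) VALUES (%s)",
                        [(record,) for record in data])
        conn.commit()
    finally:
        ConnectionPool.return_connection(conn)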

References

https://aiyanbo.gitbooks.io/spark-programming-guide-zh-cn/content/spark-streaming/basic-concepts/output-operations-on-DStreams.html
https://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd