streamparse Study Notes
Setting Up the Development Environment
Install the dependencies:
- JDK 7+
- lein, which can be installed from the [Leiningen project page](http://leiningen.org/) or from GitHub
- An Apache Storm development environment, which can be installed from the Storm project page (streamparse requires Apache Storm 0.10.0 or later)
Make sure lein is installed:
> lein version
Leiningen 2.8.1 on Java 10.0.1 Java HotSpot(TM) 64-Bit Server VM
Make sure storm is installed:
> storm version
Storm 1.1.2
URL https://git-wip-us.apache.org/repos/asf/storm.git -r 0bb7a66a9b26ad96afc81b27dd45f93ae8969c44
Branch (no branch)
Compiled by ptgoetz on 2018-02-08T17:00Z
From source with checksum b31cec4ba07e9f7ab08f38c2f9943fe
Install streamparse via pip:
> pip install streamparse
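Optionally, confirm that the package is importable from Python. This is just a minimal sanity check; it assumes the installed release exposes __version__, which recent streamparse versions do:

# Minimal sanity check that streamparse installed correctly.
# Assumes the installed release exposes __version__ (true for recent versions).
import streamparse

print(streamparse.__version__)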
First Project
Create a project with the sparse command-line tool:
> sparse quickstart wordcount
Creating your wordcount streamparse project...
create wordcount
create wordcount/.gitignore
create wordcount/config.json
create wordcount/fabfile.py
create wordcount/project.clj
create wordcount/README.md
create wordcount/src
create wordcount/src/bolts/
create wordcount/src/bolts/__init__.py
create wordcount/src/bolts/wordcount.py
create wordcount/src/spouts/
create wordcount/src/spouts/__init__.py
create wordcount/src/spouts/words.py
create wordcount/topologies
create wordcount/topologies/wordcount.py
create wordcount/virtualenvs
create wordcount/virtualenvs/wordcount.txt
Done.
Run the topology locally:
> cd wordcount
> sparse run
If you see an error like this:
Local Storm version, 1.2.2, is not the same as the version in your project.clj, 0.10.0. The versions must match.
edit wordcount/project.clj and change the Apache Storm version to the one you have installed:
(defproject wordcount "0.0.1-SNAPSHOT"
  :resource-paths ["_resources"]
  :target-path "_build"
  :min-lein-version "2.0.0"
  :jvm-opts ["-client"]
  :dependencies [[org.apache.storm/storm-core "1.2.2"]
                 [org.apache.storm/flux-core "1.2.2"]]
  :jar-exclusions [#"log4j\.properties" #"org\.apache\.storm\.(?!flux)" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
  :uberjar-exclusions [#"log4j\.properties" #"org\.apache\.storm\.(?!flux)" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
  )
Project Directory Structure
File/Folder | Contents |
---|---|
config.json | Configuration information for all of your topologies. |
fabfile.py | Optional custom Fabric tasks. |
project.clj | Leiningen project file (can be used to add external JVM dependencies). |
src/ | Python source files (bolts/spouts/etc.) for topologies. |
tasks.py | Optional custom invoke tasks. |
topologies/ | Contains topology definitions written using the Topology DSL. |
virtualenvs/ | Contains pip requirements files used to install dependencies on remote Storm servers. |
Defining a Topology
Storm topologies are Thrift-based, but streamparse lets you define them in pure Python; see the Topology DSL documentation for details.
Below is the topology definition file generated by the sparse quickstart command:
"""
Word count topology
"""
from streamparse import Grouping, Topology
from bolts.wordcount import WordCountBolt
from spouts.words import WordSpout
class WordCount(Topology):
word_spout = WordSpout.spec()
count_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields('word')},
par=2)
In the count_bolt bolt, we want Storm to group the incoming stream of tuples by the named field word. Storm provides a comprehensive set of stream groupings; the two most commonly used are shuffle and fields:
Shuffle grouping
: Tuples are distributed randomly across the bolt's tasks, so each task receives roughly the same number of tuples.

Fields grouping
: The stream is partitioned by the specified grouping fields. For example, if the stream is grouped by a user-id field, tuples with the same user-id always go to the same task, while tuples with different user-ids may go to different tasks.
More options for configuring spouts and bolts are covered in the Topology DSL documentation and Storm's Concepts.
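To make the difference concrete, here is a minimal sketch of the quickstart pipeline wired with each grouping. The class names ShuffledWordCount and FieldsWordCount are made up for this illustration; the spout and bolt are the ones from the quickstart project:

from streamparse import Grouping, Topology

from bolts.wordcount import WordCountBolt
from spouts.words import WordSpout


class ShuffledWordCount(Topology):
    word_spout = WordSpout.spec()
    # Shuffle grouping: tuples are spread randomly across the two count_bolt tasks.
    count_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.SHUFFLE},
                                    par=2)


class FieldsWordCount(Topology):
    word_spout = WordSpout.spec()
    # Fields grouping: every tuple with the same "word" value goes to the same
    # count_bolt task, so that task sees the complete count for that word.
    count_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields('word')},
                                    par=2)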
Spouts and Bolts
The general workflow for creating new spouts and bolts with streamparse is to add them to the src directory and then update the corresponding topology definition (a sketch of such a definition follows the bolt example below).
import itertools

from streamparse.spout import Spout


class SentenceSpout(Spout):
    outputs = ['sentence']

    def initialize(self, stormconf, context):
        self.sentences = [
            "She advised him to take a long holiday, so he immediately quit work and took a trip around the world",
            "I was very glad to get a present from her",
            "He will be here in half an hour",
            "She saw him eating a sandwich",
        ]
        self.sentences = itertools.cycle(self.sentences)

    def next_tuple(self):
        sentence = next(self.sentences)
        self.emit([sentence])

    def ack(self, tup_id):
        pass  # if a tuple is processed properly, do nothing

    def fail(self, tup_id):
        pass  # if a tuple fails to process, do nothing
Once the spout enters its main run loop, streamparse calls the spout's initialize() method. After initialization completes, streamparse calls the spout's next_tuple() method over and over, emitting tuples that match the outputs declared in the topology definition.
import re

from streamparse.bolt import Bolt


class SentenceSplitterBolt(Bolt):
    outputs = ['word']

    def process(self, tup):
        sentence = tup.values[0]  # extract the sentence
        sentence = re.sub(r"[,.;!\?]", "", sentence)  # get rid of punctuation
        words = [word.strip() for word in sentence.split(" ") if word.strip()]
        if not words:
            # no words to process in the sentence, fail the tuple
            self.fail(tup)
            return

        for word in words:
            self.emit([word])
        # tuple acknowledgement is handled automatically
We only need to override the bolt's process() method; streamparse calls it whenever an upstream spout or bolt emits a tuple into this bolt. You can do whatever you like inside this method and, depending on the bolt's purpose, decide whether to emit further tuples.
If process() completes without raising an exception, streamparse automatically ensures that the current tuple is anchored and acknowledged after process() returns.
If process() raises an exception, streamparse fails the current tuple and then kills the Python process.
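Putting the spout and bolt above together, the corresponding topology definition could look roughly like this. The module paths spouts.sentences and bolts.sentence_splitter are hypothetical; use whatever file names you saved the classes under:

from streamparse import Topology

# hypothetical module paths; adjust to wherever the classes actually live
from bolts.sentence_splitter import SentenceSplitterBolt
from spouts.sentences import SentenceSpout


class SentenceSplitterTopology(Topology):
    sentence_spout = SentenceSpout.spec()
    # passing inputs as a plain list uses shuffle grouping by default
    splitter_bolt = SentenceSplitterBolt.spec(inputs=[sentence_spout],
                                              par=2)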
Failing Tuples
In the example above, we fail the tuple when the sentence contains no words. What happens when we fail a tuple? Storm sends a "fail" message back to the tuple's source spout (SentenceSpout in this example), and streamparse calls the spout's fail() method (SentenceSpout.fail here); the spout then decides what to do next. It can retry the failed tuple, emit an error message, or even kill the topology; see Dealing With Errors for details.
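For instance, a spout can remember what it emitted, keyed by tup_id, and re-emit a tuple when its fail() callback fires. The sketch below is one possible approach (the class name ReliableSentenceSpout and the single-retry policy are made up for illustration); it relies on passing tup_id to Spout.emit() so that Storm reports acks and fails back for that id:

import itertools

from streamparse.spout import Spout


class ReliableSentenceSpout(Spout):
    outputs = ['sentence']

    def initialize(self, stormconf, context):
        self.sentences = itertools.cycle([
            "I was very glad to get a present from her",
            "He will be here in half an hour",
        ])
        self.counter = itertools.count()
        self.pending = {}  # tup_id -> emitted values, kept for possible retries

    def next_tuple(self):
        sentence = next(self.sentences)
        tup_id = next(self.counter)
        self.pending[tup_id] = [sentence]
        # emitting with a tup_id makes the tuple "reliable": Storm will call
        # ack() or fail() with this id once the tuple tree succeeds or fails
        self.emit([sentence], tup_id=tup_id)

    def ack(self, tup_id):
        # fully processed downstream; forget about it
        self.pending.pop(tup_id, None)

    def fail(self, tup_id):
        # simplest possible policy: re-emit the failed tuple once more
        values = self.pending.get(tup_id)
        if values is not None:
            self.emit(values, tup_id=tup_id)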
Bolt Configuration Options
Automatic acking, anchoring, and failing of tuples can be disabled by setting the corresponding class attributes auto_ack, auto_anchor, and auto_fail to False. All three options are documented in streamparse.bolt.Bolt.
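For instance, a bolt that disables the automatic behaviour and handles acknowledgement itself might look roughly like this (ManualAckBolt is a made-up name for illustration):

from streamparse.bolt import Bolt


class ManualAckBolt(Bolt):
    outputs = ['word']

    # take over ack/fail handling from streamparse
    auto_ack = False
    auto_fail = False

    def process(self, tup):
        try:
            word = tup.values[0]
            self.emit([word.upper()])
            self.ack(tup)   # acknowledge explicitly on success
        except Exception:
            self.fail(tup)  # fail explicitly instead of letting the worker die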