Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用。
Akka是一个开发库和运行环境,可以用于构建高并发、分布式、可容错、事件驱动的基于JVM的应用。使构建高并发的分布式应用更加容易。
Akka可以以两种不同的方式来使用
1)易于构建并行和分布式应用 (Simple Concurrency & Distribution)
Akka在设计时采用了异步通讯和分布式架构,并对上层进行抽象,如Actors、Futures ,STM等。
2)可靠性(Resilient by Design)
系统具备自愈能力,在本地/远程都有监护。
3)高性能(High Performance)
在单机中每秒可发送50000000个消息。内存占用小,1GB内存中可保存2500000个actors。
4)弹性,无中心(Elastic — Decentralized)
自适应的负责均衡,路由,分区,配置
5)可扩展(Extensible)
以下是Akka被部署到生产环境中的领域
事务处理 (在线游戏,金融/银行业,贸易,统计,赌博,社会媒体,电信):垂直扩展,水平扩展,容错/高可用性
服务后端 (任何行业,任何应用):提供REST, SOAP, Cometd, WebSockets 等服务 作为消息总线/集成层 垂直扩展,水平扩展,容错/高可用性
并发/并行 (任何应用):运行正确,方便使用,只需要将jar包添加到现有的JVM项目中(使用Scala,java, Groovy或jruby)
仿真:主/从,计算网格,MaReduce等等.
批处理 (任何行业):Camel集成来连接批处理数据源 Actor来分治地批处理工作负载
通信Hub (电信, Web媒体, 手机媒体):垂直扩展,水平扩展,容错/高可用性
游戏与赌博 (MOM, 在线游戏, 赌博):垂直扩展,水平扩展,容错/高可用性
商业智能/数据挖掘/通用数据处理:垂直扩展,水平扩展,容错/高可用性
复杂事件流处理:垂直扩展,水平扩展,容错/高可用性
Actor,可以看作是一个个独立的实体,他们之间是毫无关联的。但是,他们可以通过消息来通信。
一个Actor收到其他Actor的信息后,它可以根据需要作出各种相应。消息的类型可以是任意的,消息的内容也可以是任意的。这点有点像webservice了。只提供接口服务,你不必了解我是如何实现的。
一个Actor如何处理多个Actor的请求呢?它先建立一个消息队列,每次收到消息后,就放入队列,而它每次也从队列中取出消息体来处理。通常我们都使得这个过程是循环的。让Actor可以时刻处理发送来的消息。
各种语言在实现Coroutine方式的支持时,多数都采用了Actor Model来实现,Actor Model简单来说就是每个任务就是一个Actor,Actor之间通过消息传递的方式来进行交互,而不采用共享的方式,Actor可以看做是一个轻量级的进程或线程,通常在一台4G内存的机器上,创建几十万个Actor是毫无问题的,Actor支持Continuations,但go 不是。
应用场景:服务端要处理大量的客户端的请求,并且处理请求耗费较长的时间。这时就需要使用并发处理。多线程是一种方法,这里使用Akka框架处理并发。(以下代码在Groovy1.7.5、akka-actors-1.2下运行成功)
class HelloClient implements Runnable {
int seq
String serviceName
HelloClient(int seq, String serviceName) {
this.seq = seq
this.serviceName = serviceName
}
void run() {
ActorRef actor = remote().actorFor(serviceName, "10.68.15.113", 9999);
String str = "Hello--" + seq
println "请求-----${str}"
Object res = actor.sendRequestReply(str)
println "返回-----${res}"
}
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(new HelloClient(i, "hello-service"))
thread.start() //同时启动5个客户端请求Master
}
}
}
class HelloWorker extends UntypedActor { //Worker是一个Actor,需要实现onReceive方法
@Override
void onReceive(Object o) {
println "Worker 收到消息----" + o
if (o instanceof String) {
String result = doWork(o) //调用真实的处理方法
getContext().replyUnsafe(result)//将结果返回给Master
}
}
//Worker处理其实很简单,仅仅将参数字符串改造一下而已。只不过使其sleep了20秒,让它变得“耗时较长”
String doWork(String str) {
Thread.sleep(1000 * 20)
return "result----" + str + " 。"
}
}
class HelloMaster extends UntypedActor {
@Override
void onReceive(Object o) {
println "Master接收到Work消息:" + o
def clientChannel = getContext().channel() //客户端链接Channel
//启动worker actor
ActorRef worker = Actors.actorOf(new UntypedActorFactory() {
public UntypedActor create() {
return new HelloWorker();
}
}).start();
//这里实现真正的并发
Future f1 = Futures.future(new Callable() {
Object call() {
def result = worker.sendRequestReply(o) //将消息发给worker actor,让Worker处理业务,同时得到返回结果
worker.stop()
println "Worker Return----" + result
clientChannel.sendOneWay(result) //将结果返回给客户端
return result
}
})
println "Future call over"
}
public static void main(String[] args) { //启动Master进程,绑定IP、端口和服务
Actors.remote().start("10.68.15.113", 9999).register(
"hello-service",
Actors.actorOf(HelloMaster.class));
}
}