Akka persistence receiveRecover receives snapshots that are from other actor instances
I am experiencing unexpected behaviour when using Akka persistence. I am fairly new to Akka, so apologies in advance if I have missed something obvious.
I have an actor called PcnProcessor. I create an actor instance for every PCN id I have. The problem I experience is that when I create the first actor instance, everything works fine and I receive the Processed response. However, when I create further PcnProcessor instances using different PCN ids, I get the "already processed pcn" response.
Essentially, for some reason the snapshot stored as part of the first PCN id processor is reapplied to the subsequent PCN id instances, even though it does not relate to that PCN and the PCN id is different. To confirm this behaviour, I printed out a log in receiveRecover, and every subsequent PcnProcessor instance receives snapshots that do not belong to it.
My questions are:
- Should I be storing the snapshots in a specific way so that they are keyed against the PCN id? And should I then be filtering away the snapshots that are not related to the PCN in context?
- Or should the Akka framework be taking care of this behind the scenes, so that I should not have to worry about storing snapshots against the PCN id?
The source code for the actor is below. I do use sharding.
package com.abc.pcn.core.actors

import java.util.UUID

import akka.actor._
import akka.persistence.{AtLeastOnceDelivery, PersistentActor, SnapshotOffer}
import com.abc.common.AutoPassivation
import com.abc.pcn.core.events.{PcnNotProcessedEvt, PcnProcessedEvt}

object PcnProcessor {

  import akka.contrib.pattern.ShardRegion
  import com.abc.pcn.core.Pcn

  val shardName = "pcn"

  val idExtractor: ShardRegion.IdExtractor = {
    case ProcessPcn(pcn) => (pcn.id.toString, ProcessPcn(pcn))
  }

  val shardResolver: ShardRegion.ShardResolver = {
    case ProcessPcn(pcn) => pcn.id.toString
  }

  // shard settings
  def props = Props(classOf[PcnProcessor])

  // command and response
  case class ProcessPcn(pcn: Pcn)
  case class NotProcessed(reason: String)
  case object Processed
}

class PcnProcessor
  extends PersistentActor
  with AtLeastOnceDelivery
  with AutoPassivation
  with ActorLogging {

  import com.abc.pcn.core.actors.PcnProcessor._
  import scala.concurrent.duration._

  context.setReceiveTimeout(10.seconds)

  private val pcnId = UUID.fromString(self.path.name)
  private var state: String = "not started"

  override def persistenceId: String = "pcn-processor-${pcnId.toString}"

  override def receiveRecover: Receive = {
    case SnapshotOffer(_, s: String) =>
      log.info("recovering. pcn id: " + pcnId + ", state restore: " + s)
      state = s
  }

  def receiveCommand: Receive = withPassivation {
    case ProcessPcn(pcn) if state == "processed" =>
      sender ! Left(NotProcessed("already processed pcn"))

    case ProcessPcn(pcn) if pcn.name.isEmpty =>
      val error: String = "name invalid"
      persist(PcnNotProcessedEvt(pcn.id, error)) { evt =>
        state = "invalid"
        saveSnapshot(state)
        sender ! Left(NotProcessed(error))
      }

    case ProcessPcn(pcn) =>
      persist(PcnProcessedEvt(pcn.id)) { evt =>
        state = "processed"
        saveSnapshot(state)
        sender ! Right(Processed)
      }
  }
}
Update:
After logging the metadata of the received snapshot, I can see that the problem is the snapshotterId: it is not resolving properly and is always being set to pcn-processor-${pcnId.toString}, with the ${pcnId.toString} part left unresolved.
[INFO] [06/06/2015 09:10:00.329] [ecp-akka.actor.default-dispatcher-16] [akka.tcp://ecp@127.0.0.1:2551/user/sharding/pcn/16b3d4dd-9e0b-45de-8e32-de799d21e7c5] recovering. pcn id: 16b3d4dd-9e0b-45de-8e32-de799d21e7c5, metadata of snapshot SnapshotMetadata(pcn-processor-${pcnId.toString},1,1433577553585)
I think you are misusing the Scala string interpolation feature.
Try it in the following way:
override def persistenceId: String = s"pcn-processor-${pcnId.toString}"
Please note the use of s before the string literal.
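To make the difference concrete, here is a minimal standalone sketch that needs no Akka at all. The object name PersistenceIdDemo, the two helper methods, and the second UUID are made up for illustration; only the first UUID comes from the log in the question. It shows that without the s prefix every actor ends up with the same literal id, which would explain why all instances are offered the same snapshot.

```scala
import java.util.UUID

object PersistenceIdDemo {

  // No s prefix: the string is kept verbatim, placeholder text included,
  // so the result does not depend on pcnId at all.
  def literalId(pcnId: UUID): String = "pcn-processor-${pcnId.toString}"

  // With the s prefix the expression inside ${...} is evaluated
  // and spliced into the string.
  def interpolatedId(pcnId: UUID): String = s"pcn-processor-${pcnId.toString}"

  def main(args: Array[String]): Unit = {
    // PCN id taken from the log output in the question.
    val first = UUID.fromString("16b3d4dd-9e0b-45de-8e32-de799d21e7c5")
    // Hypothetical second PCN id, chosen only for this example.
    val second = UUID.fromString("00000000-0000-0000-0000-000000000001")

    println(literalId(first))      // the placeholder is printed as-is
    println(interpolatedId(first)) // the UUID is substituted

    // Different PCN ids collapse onto one id without the s prefix...
    println(literalId(first) == literalId(second))
    // ...but stay distinct once the string is interpolated.
    println(interpolatedId(first) == interpolatedId(second))
  }
}
```

With a distinct persistenceId per PCN, there should be no need to key or filter the snapshots yourself, since the id is what the stored snapshots are associated with (as the SnapshotMetadata in your log suggests).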