Queue Processor – its Configuration, Usage & Execution
Queue Processor

Queue Processor – its Configuration, Usage & Execution

Summary:

This article discusses about the Queue processor which is an enhanced version (replacement) of standard agents in Pega v8.

With our earlier post on Job Scheduler, we understood that we can use a job scheduler to run a defined task in a defined time interval as an internal background process.

Until and unless if the task and time are defined, the job scheduler suffices. What-if if we want to execute the same/different process as an internal background process for different purposes at different time intervals? Will Job scheduler still satisfies the requirement or do we have a better capability to implement this in Pega? 😳

Let’s get it answered with a real-time example 👍

Consider claims application, where a customer can raise a claim, and system (Pega) should send them an email notification to acknowledge the request. Customers who submit the claims are classified as Silver, Gold, and Diamond. Below is the business requirement on sending the notification to the customer,

  • Silver customers should receive the email notification after 8 hours of the request submission.
  • Gold customers should receive the email notification after 6 hours of the request submission.
  • Diamond customers should receive the email notification after 4 hours of the request submission.

Email notification should be sent as part of the background process. But Job scheduler would not suffice since the schedule to perform the task is specific to each case (not fixed).

Queue processor in Pega helps us achieve the above requirement with minimal configuration and maximum reusability.

Queue processor execution for the given requirement

Before getting into the discussion, let’s make sure we are comfortable with the below terminologies to understand the actual context better.

Kafka

  • Kafka is a distributed streaming platform used for handling real-time data feeds.
  • Kafka is a software platform developed by LinkedIn and donated to the Apache Software Foundation.
  • Apache Kafka is mainly used to publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.

Below diagram will help us understand Kafka architecture in detail.

OSP version of Kafka Architecture
  • Topic – An intermediary channel that maintains a list of subscribers to relay messages to that are received from publishers
  • Message – Serialized messages sent to a topic by a publisher which has no knowledge of the subscribers
  • Publisher/Producer – The application that publishes a message to a topic
  • Subscriber/ Consumers – An application that registers itself with the desired topic in order to receive the appropriate messages

Publishers write the messages to the Kafka cluster (topics) of their choice. Each topic can have zero, one, or many subscribers/consumers that subscribe to the data written to it. Unlike other messaging systems, Kafka can support up to 20 partitions per topic.

Each topic can have a maximum of 20 partitions

Messages inside a topic will be distributed across the 20 partitions. Subscribers will receive messages from the specific topic and then proceeds with processing the messages. Real-time data flow in Pega helps us process the records from different partitions.

Let’s now get into the actual discussion on the Queue Processors.

What is Queue Processor?

  • Queue processor in v8 is a replacement of standard agent in Pega.
  • Queue processor in Pega is an internal background process operating on the server/node that runs business logic based on the specific time interval.
  • As the name suggests, each queue processor will be mapped to a topic (queue) & processes the messages published into its respective topic.
  • Queue processor does not require a queue class that should extend from System-Queue- class since it does not involve the typical queue processing logic that a standard agent does. It works with the publish-subscribe paradigm.
  • Each queue processor will have a real-time data flow associated with it. Associated Real-time data flow will be used to receive the messages from the topic and to process it.
  • Queue Processor rules produce maximum throughput than standard agents because of the Kafka cluster and its partitioning capabilities.
  • Each real-time data flow in the queue processor can process messages across 20 partitions, which means that Queue Processor rules can support up to 20 separate processing threads simultaneously without any conflicts.

How to configure a Queue Processor?

  • Queue Processors are instances of class RULE-ASYNC-QUEUEPROCESSOR.
  • We should have pzQueueProcessorAdministrator privilege to create and manage queue processor rules. To view the list of the queue processors in the application, make sure we have the pzQueueProcessorObserver privilege.
  • Queue processors are versioned rules and can be seen under the SysAdmin category in Records explorer.
New Rule form of the Queue Processor rule

Please go through our Requestor types in Pega article to know about Requestor types and how does ASYNCPROCESSOR/ System Runtime Context plays a vital role in Queue Processor execution.

  • When we open any queue processor rule, we could see a banner displaying the below message.
In Pega v 8.1, 8.2, 8.3
From Pega v 8.4

It’s required to ensure that either ASYNCPROCESSOR requestor type or System Runtime Context has access to the configured Queue processor rule. Failing which will not run the Queue processor to process the messages in the queue (topic).

We can classify queue processor configuration into three sections,

  • Node Configuration
  • Business Logic
  • Execution Mode

Node Configuration

  • Refer to our earlier post on Background processing which explains Node, its classification & how node classification contribute towards a proper load balancing.
Toggle option provides us an option to Enable and Disable Queue Processors whenever required.
  • Similar to other background processing rules, each queue processor should be tagged to the node type on which it should be running.
  • Unlike job schedulers, a queue processor can’t be configured to run on multiple node types. It can be enabled and configured to run on only one type of node. The configured node type can have multiple nodes without any restrictions.
In our scenario, Queue processor rule configured to run on Background processing node.

Business Logic

  • This section of the queue processor rules allows us to define the actual business process.
  • The business process in a background processing or an agent rule is nothing but the activity rule that defines the logic to process the received messages from the topic (queue).
In our scenario, sending acknowledgement email is the business process that is used to send out the acknowledgement emails.

Processing Mode

When to process

this field defines when the queue processor should pick a message from the topic and process it.

  • Immediate: Selecting this will instruct the system to process the messages as soon as it gets published to the queue.
  • Delayed: Selecting this will give an additional option to mention a date-time property when we invoke the queue processor rule. Each message in the queue will have it’s own date and time at which it should be processed. In case of delayed processing, messages will be written to the System-Message-QueueProcessor-DelayedItem class and will be published to the topic in the Kafka cluster when the processing time is reached.

Number of threads per node

Please go through our article on real-time data flows which explains how parallel processing is achieved using child requestors.

Max No Of Child requestor = No Of Node * Thread (requestor) count

  • No of nodes in the configured node type. In our scenario, the queue processor is configured to run on Background processing node type and we have one node in the cluster that is tagged as background processing. Hence the number is 1 in our scenario.
  • Thread (requestor) count will be from the value configured in this field (Number of threads per node).

Hence Maximum number of child requestors that real-time data flow of this queue processor can process is (1*10) = 10. Kafka cluster supports 20 partitions in each topic, and our queue processor can process records from 10 partitions at a time since our maximum processing count is 10 (1*10).

Max attempts

this field defines the number of attempts the messages in a topic (queue) should be re-processed in case of failure. When the maximum attempt is reached and the message still has a failure, then it will be placed in the System-Message-QueueProcessor-BrokenItem (broken-queue).

Types of Queue Processors

Queue processors are broadly classified into two types,

  • Standard Queue Processor
  • Dedicated Queue Processor

Standard Queue Processor

  • Standard queue processors are preferred for simple queue management.
  • Standard queue processors will not be having a separate queue processor rule created, rather it just reuses the OOTB pzStandardProcessor queue processor rule with a custom activity for its execution.
  • Standard queue processors do not support the delayed processing of messages. If our requirement requires delayed processing of messages, then we need to end up creating a dedicated queue processor rule.
  • Standard queue processors by default have a maximum of 3 attempts.
Standard queue processor and it’s dynamic logic of executing the queued activity.

Dedicated Queue Processor

  • Dedicated queue processors are preferred if the business process is complex or wants extensive customization in terms of queue processor rule. For example, the number of attempts, the node on which it should run, thread count, etc.
  • Dedicated queue processors will have a separate queue processor rule created using which it processes the messages in the topic(queue).
  • Dedicated queue processors support both immediate and delayed execution.
Dedicated queue processor for sending the claim acknowledgement email.

How is Pega connected to Apache Kafka?

Starting from v8, Apache Kafka gets shipped along with the Pega product. All the connectivity of Apache Kafka would be placed on the server path. When tomcat is used as the application server, Apache Kafka connectivity can be found under /tomcat/Kafka-1.1.0.4/config

How does Pega creates topic for queue processor?

As soon as a dedicated queue processor rule is created in Pega, the system automatically creates a topic for it in the Kafka cluster and makes the queue processor as an active subscriber to the created topic. From then, any messages published to the topic will be received by the subscribers (Queue processor) and will be processed accordingly.

Each Queue Processor invocation is the Publishers and each Queue Processor rule is a subscriber.

The standard queue processor will publish messages to an existing topic in the Kafka cluster (pzStandardProcessor) whereas the dedicated queue processor will publish messages to the custom topic created for the specific queue processor rule.

How is partition maintained for each topic?

Each topic created in Kafka by default will have 20 partitions in it. This can be reduced based on the requirement by using the below DSS,

Server restart is required after updating the DSS for the setting to be effective

For each queue processor invocation, messages will be published into different partitions (between 0-19) randomly. If tomcat is used as the application server, then partitions for a given topic can be seen in /tomcat/kafka-data/<<Queue processor Topic>>

Topic partition in the Kafka cluster for the given queue processor rule.

Queue processors from then use the real-time data flow to receive and process messages/records from each partition using different child requestors. This way of processing records using 20 partitions ensures maximum throughput.

Invoking a Queue Processor Rule

Each queue processor invocation publishes a message to the topic in the Kafka cluster. We can queue (publish) a message to a topic (queue) in below two ways,

  • Using the Queue-For-Processing method in an activity rule.
  • Using Run in Background smart shape in a flow rule.

Queue-For-Processing – Activity Method

Type of Queue

Standard – Select this option when we want to invoke the standard queue processor rule to publish the message to the standard queue (topic) in the Kafka cluster.

  • With this option, we will just be left with an option to enter the activity name that needs to be used to process the message in the queue.
  • This will publish the message to the standard queue (pzStandardProcessor) and the standard queue processor then invokes the specified activity to process the message.
  • ItemID (Primary key of the published message) will be set in a Param pxItemID by default which can be used to further update or dequeue the message from the queue if required.

Dedicated – Select this option when we want to invoke our custom queue processor rule to publish the message to custom queue (topic) in the Kafka cluster.

Dedicated queue with Immediate processing
Dedicated queue with Delayed processing
  • With the Dedicated queue, we will be able to select from the list of queue processors that these messages should be published into.
  • This will publish the message to the selected queue and the queue processor rule receives the message from the topic and processes it.
  • ItemID (Primary key of the published message) will be set in a Param pxItemID by default which can be used to further update or dequeue the messages if required.

Lock using

This option is used to mention the locking mechanism during the message processing.

Possible locking options
  • Primary Page – this can be selected when we want to lock the record & if the primary page has a valid pzInsKey. When this option is selected record will be locked using the value in the pzInsKey property. Processing of the message will fail if the Key does not have a value or if the lock cannot be acquired at the run-time.
  • Key defined on the property – this can be selected if the record is to be locked using a custom property other than pzInsKey. This is preferred when we use external classes in Pega which does not have pzInsKey column. Processing of the message will fail if the Key does not have a value or if the lock cannot be acquired at the run-time.
  • None – this can be selected if the record need not be locked.

Advanced

This section is used to configure some advanced settings when using Queue-For-Processing method.

  • Write now – this can be selected if the message should be published to the queue (topic) immediately or when the next commit happens. Selecting this will publish the message immediately. When this is not selected, it writes the entry into the queue when the next commit happens.
  • Queue current snapshot of the page – this option will be enabled only when locking is selected as NONE
    • If selected– this will take a snapshot of the page and the current snapshot will be used during the message processing. Further updates on the record will not be available at run-time. Taking a snapshot ensures faster processing.
    • If not selected – the record will be opened form the database during the message execution which ensures the recent changes in the record is available for processing.
  • Alternate access group – by default queued entries will be processed using the access group of the OperatorID who published the message. We can use this option to override the access group for any queue entries if required.

Run in Background – Smart Shape

The configuration looks more or less the same when we use a smart shape and activity method to publish a message into the topic (queue). Few differences are listed below,

Option to use a snapshot of the current page will not be available when we use smart shape to queue the message.
Queue Item ID needs a custom property mapping when using the smart shape. The item id of the queue message will get persisted in the work page inside the provided property which can then be used for further processing.

We are done with a basic understanding of how to configure Queue processor and how to use it. Let’s now implement a real-time scenario in Pega to understand the queue processor execution in detail.

Business Scenario

Let’s consider the same scenario discussed above. OSP organization has a claims application, where a customer can raise a claim, and the system (Pega) should send them an email notification to acknowledge the request. Customers who submit the claims are classified as Silver, Gold, and Diamond. Below is the business requirement on sending the notification to the customer,

  • Silver customers should receive the email notification after 8 hours of the request submission.
  • Gold customers should receive the email notification after 6 hours of the request submission.
  • Diamond customers should receive the email notification after 4 hours of the request submission.

Implementation

The time to send the acknowledgement email for each of the claims case should be set based on the customer type. Hence a data transform is used to map the processing time for each case based on the business logic.

Data Transform to map the processing time

Each of the messages to be published into the queue has different execution time and hence involves delayed processing. Hence a dedicated queue processor is required to implement the given scenario.

The created queue processor can be invoked using either Queue-For-Processing method in an activity rule or Run in Background smart shape in the flow rule. Since the claim process is already happening in the flow, we can use smart shape to queue (publish) the message to the topic.

AckNotificationID holds the message key in the pyWorkPage which can then be used for further updates to the message.

What happens at the back-end during the execution?

Let’s now create a claim case to verify the above implementation & see how message gets published to the topic followed by the subscriber (queue processor) receiving the message and processing it.

  • First, let’s verify if the created queue processor is up and running from the admin studio list of queues.
If the processor is not running, then the messages will not be processed
  • Let’s create a claims case with customer type as Gold to examine the execution.
  • OOTB API pzRunInBackground gets invoked during the smart shape execution and publishes the messages into the topic based on the configuration. For the delayed processor, the system writes an entry into a staging table System-Message-QueueProcessor-DelayedItem. Messages will be published to the topic from the staging table when the processing time is reached.

If the queue processor is marked as Immediate, then messages will be directly published into the topic in the Kafka cluster without writing an entry into System-Message-QueueProcessor-DelayedItem class.

Message entry in the staging table for our acknowledgement scenario
  • The items stay in the DelayedItem queue class until the processing time is reached for the message. Until then, we can update or dequeue the message from the queue if required. This can be done using the reference that is set when queueing the message that holds the queue item id(primary key of the queued message).
pyWorkPage that has the ItemID (primary key of the message in the queue)
  • When the processing time of the message in the queue is reached, the system publishes the message (snapshot) into the topic & assigns a random partition (0-19).
In our scenario, system published the message into CLAIMACKNOWLEDGEMENT topic and partition 9
  • When the message gets published to the topic, it triggers the real-time data flow associated with the queue processor rule. Real-time data flow then executes the actual queue processor business logic and processes the message.
Data flow then takes care of the execution of the received messages.
  • Refer to our earlier post on real-time data flow to understand how real-time data flow executes the records from different partitions as child requestors.

How to trace & debug queue processors?

Like job schedulers, queue processors can be directly traced from the admin studio.

  • Dedicated queue processors can be directly traced since it has its own process and business logics defined.
Tracing a dedicated queue processor
  • A standard queue processor can have multiple messages with a different activities to process it. Hence the activity name that needs to be traced should be selected when tracing the standard queue processors.
Tracing a Standard queue processor
  • The step page used to call the Queue-For-Processing method or the Run in background smart shape will be the primary page (pyQueuePage) of the queue processor execution.
Queue page can be of Work-, Data- and any other classes without any restriction
  • If an error occurs during the processing of the message, then an entry will be written into the class System-Message-QueueProcessor-BrokenItem & marks the message as broken-process. The message in the broken item class will have the actual failure reason which can be used for debugging purposes.

Replacing a standard agent with Queue Processor

The common question that everyone has in terms of agents and queue processors are,

  • Will my standard agent rule still work in v8 – YES.
  • Should I replace all my standard agents with queue processors – Partially YES
  • Going forward should I create queue processor to publish and subscribe messages – YES

When we say partial Yes for replacing standard agents with the queue processors, how do we do that?

  • Create a queue processor rule for the existing standard agent & define the same business process by following all the configurations discussed above.
  • Leave the existing entries in the queue of the standard agent as such and update the existing code to publish messages into the topic using a queue processor rather than writing an entry into the standard agent queue.
  • Existing entries in the standard agent queue will keep processing using the standard agent and the queue processor will be processing the new entries in the topic.
  • When all the existing entries in the standard agent queues are processed, then disable the agent rule. Hence queue processor replaces the actual standard agent.
Disabling the standard agent rule

Pointers to Remember

  • Queue processors will be up and running only when the ASYNCPROCESSOR or the System Runtime Context has the context of the queue processor rule. Refer to our older post to see how to configure it.
  • Queue processor needs at least one of the Stream nodes to be up and running for its processing.
  • If the stream node goes offline or it becomes unavailable, Messages will not be published to the topic in the Kafka cluster and hence processing will not happen.
When a stream node is not available and you queue a message, the system saves the message in a database and pushes it to Kafka when the stream node is running. The system does not process the messages in a queue until at least one stream mode is available.
  • Even if the stream nodes go down in between the process, Kafka is capable enough of retaining the original message in its disk. Hence it ensures 0% data loss.
  • Queue processors do not have any agent schedules created rather it directly executes the messages from the topic.
  • Queue processor does not follow the typical queue management process that a standard agent follows. It uses the publish-subscribe paradigm to process the messages.
  • Apache Kafka is used for the streaming process that ensures secured data transmission with maximum throughput.
  • Each topic in Kafka can have a maximum of 20 partitions. Each queue processor rule will be an active subscriber of any one of the topics in the Kafka cluster.
  • Real-time data flow of each queue processor rule takes care of processing records in each partition as separate child requestors.
  • Standard queue processors can be used for simple tasks that should be executed immediately.
  • Dedicated queue processors can be used for complex business logic and for the process which requires delayed processing.

We are now at the end of the post. We hope you got the basics of queue processor along with its configuration, back-end processing & troubleshooting. We wanted the article to be up-to-date and didn’t cover much about standard agents. If you want us to publish an article about standard agents, then feel free to comment on this post or raise your request here.

We are glad to publish the article requested by @ Mukkram on “Queue Processors”. If you want us to publish any articles of your choice, please fill out this form & we will make sure it gets published.

Stay tuned for our next post on Reports 😎

OSP TEAM
Written by
OSP Editorial Team

Recent Jobs from our community

loading
View more Jobs on OSP Forum
Join the discussion

Feel free to post your questions here about this topic if any. We will definitely get back to you ASAP !!!
If you have any off-topic questions, Let's discuss at OSP Forum

59 comments
  • Thank you for your very detailed explanation of Queue Processors.
    We should have a DataSet rule and DataFlow rule created by Pega to handle the Processing of Items using Queue Processors.

    Can you pls provide the rule details so that we can see how it is implemented.

    • DataSet in real-time data flows is used to publish the messages to the topic and make the subscriber receive the messages. In the queue processor, all of these is handled in Server level Kafka configuration files Consumer.properties & producer.properties. It is similar to having a data set connected with Kafka as a source, but the difference is it’s implemented at the server level.

      Pega has disabled the view of data flow rule created for real-time data flow in the queue processor. Pegas has also hidden the standard queue processor from the list of queue processor which we assume it could be because of security reasons. Because data flow rule updates will be reflected in the real-time data flow work object immediately which might affect the process.

      But you can see the data flow work object that Pega has created for each queue processor as mentioned in our article.

    • First of all Thanks alot and Hatsoff for you guys.

      I understood Queue Processing part very clearly. For Kafka item, Do we have any other post for Configuration related?

      like Sysadmin-> Kafka instances.

      Sorry, if i made wrong query. Just wanted to learn how to link Kafka to Pega.

      Usage of it is explained properly with Queue Processor.

      • Thanks a lot for your appreciation.

        WRT Queue Processor in Pega, Kafka connectivity has been implemented by Pega at the server level which is explained in our post.

        We will create Kafka instances in Pega from the SysAdmin category only when we are connecting to custom Kafka Server. We will try to cover this in a seperate post

  • Pega has provided a link to get the list of the Scheduled , Broken queue, Ready to process queue items.
    Schedule and Broken queue items are stored in tables so we can have our own custom report.
    How to create a custom report on the Ready to Process items..How is Pega handling that in Admin studio.

  • As Queue Processor uses Real Time Processing, will the count of RealTime nodes and other configurations done for RealTime nodes have any impact on queue processors.

    • Not actually… tats the scenario when we use real-time data flow stand-alone. But in Queue Processor, we select the node type on which it should be running, so those nodes do the job of the real-time nodes.

  • We have a Delayed Queue Processor with 3 Max Attempts. If the first attempt fails, Is there any possibility to delay the second attempts without executing immediately.

  • In case of delayed processing, messages will be written to the System-Message-QueueProcessor-DelayedItem class and will be published to the topic in the Kafka cluster when the processing time is reached.
    Is there any agent that moves the items from Delayed Items table to Kafka Topic ?

    • Instead of agent may be pega used again one more queue processor internally to push it to the topic… like queue for processing

      • You might be right. Any Idea on the Queue Processor rule name which handle moving of schedule queue items to kafka topics

  • For “Lock Using Key defined on Property” configuration in the Queue-For-Processing method…we have only single property reference to define the key of external class. How can we handle the external class with multiple keys.

    • The single property reference is not provided to define the keys of external class. It should be used to refer a custom property that holds the pzInsKey of external class instance

  • Good Job 3 CLSA guys.(OneStop Team) ..

    The Artical was very clear and helped me in completed understading of the new pega features queue processor.

    Couple of questions:

    Will the SLA’s now use standard queue processors or Dedicated queue processor?

    What about wait shapes those are also one type of queue processors??

    So when we queue the message into topic we get pxItemID how to update message using this?

    • Thank you so much for your appreciation.

      1) Pega has not replaced SLA entries & wait entries to be processed by the queue processor. It still uses the default standard agent ServiceLevelEvents
      2) We cannot directly update the queue entry. In case of any update, we need to dequeue the old entry and enqueue the new entry. You can refer to the activity
      pzRunInBackground
      to get the logic of enqueue and dequeue.

  • Not able to submit new article request. Please note down: I need notification rule article.

  • Thanks for the detailed explanation. Can you also compare this with Pega 7 queue processors. What’s the difference?

    • You mean to say Pega 7 Standard agents and Pega 8 Queue processor comparison?

      Sure we will try to update this post with the requested information or write a new post on Standard agents and have the comparison there.

  • Iam having great time with this Blog and the Content is Superb.You Guys are doing an Awesome Job Keep up the Good Work.

  • You guys are awesome. This article really helped me to understand the concept with clarity along with example. Keep posting such wonderful articles guys. Thanks a lot for sharing this

  • Hello Team,

    This is really a nice article, appreciate your work. Just want to give my opinion on replacing the existing inflight standard agents in prod, I don’t think that would be a nice idea as it involves lots of rework, testing and maintenance. I believe going forward in future we can use this feature rather than replacing the existing standard agents with the queue processor..!

  • Hi Team,
    We have a requirement of delaying the real time process data flows.
    Any idea that this can be achieved by Queue processor? Because we have to delay of processing from Kafka topic every time.

  • Hi, thanks for the very informative article! However I have a problem: when I create a dedicated queue processor, the associated data flow is not created, thus the specified activity is not called and also the status of the queue processor is empty in the admin studio. I ensured that the queue processor using the correct system runtime context, the background and stream nodes are configured properly.
    The problem seems to be the missing data flow work object. Should I create it manually somehow?

      • Yes, it’s there and also shows the number of processed items but the status is empty and if I click on the context menu (three dots at the end of the row) and select View Data Flow, I get an error that the Data Flow instance ID is empty

        • Open instances of class Pega-DM-DDF-Work from App Explorer and filter with ID column with your Queue Processor Name. If you don’t find an entry, we would suggest to delete and create the queue processor rule again with SysAdmin4 access.

          • Hi thanks for the quick answer! I used a sysadmin access group which has sysadm4 role. Tried to delete/receate the queue processor a few times but no success.
            I also noticed that the messages I created are in Ready To Process state instead of Processed, however the record disappears from the pr_sys_delayed_queue table after the specified delay time.
            Do you have any idea why this happens? Thanks a lot!

          • Hi, one addition: you are right, I got an exception:
            Unable to authorize flow execution: RULE-OBJ-FLOW PEGA-DM-DDF-WORK NEWWORK…
            I checked the flow and found it requires two privileges: AllFlows and FlowNewWork
            I cloned role PegaRules:SysAdm4 to my own sysadmin role and added the missing FlowNewWork privilege to Work- class.
            Also I selected to inherit privileges from class hierarchy.
            Still no success. When I try to trace the creation of Queue Processor, there are no errors. It seems the associated Data Flow is created in the background somehow.

          • Hi,

            Finally I have solved the issue. The solution was that however in Pega 8.3 I see a notice on the top when creating Queue Processor, that this QP will use System Runtime Configuration, you need to configure ASYNCPROCESSOR requestor to have sysadmin access group otherwise it can’t create the data flow!

  • thanks for sharing the information. Really useful.

    Have a question here.

    How is the Data flow rule connected with queue processor (let’s say dedicated queue processor). Every time a message is published to the topic, i understand that real time data flow is triggered. These data flows are configured with different shapes like data transform; change; data set but how the activity rule in queue processor is invoked after the data flow rule is invoked. Am not seeing the connection on how queue processor rule and data flow is connected? please shed some light on this.

    TIA

    • Thank you so much for your appreciation @Kumar

      The created data flows has activity as the destination. And that activity is out queue processor activity which takes the responsibility of executing the logic. For some reason, these data flows are not shown/exposed to the developers.

  • It is simply awesome. Thanks a lot.

    A quick question. As generally we tag the util nodes as Background processing node. So if the Queue processor is configured to run on Background processing nodes, then I believe all the child requestors are going to be running under util node. Then I am wondering what will be the relation between these child requestors running in util node to the Stream node. Where will Stream node come into picture? If you can try to explain please? Thanks.

  • A very detailed post on Queue Processor. Thanks for collecting all the information and posting it as one article. It really helps as you get all the information at one place.

  • Hi Team,

    We have 2 nodes of type Background processing. We want to move our agents to QP so below are the changes that we are planning to make on Node Classification side

    Moving 2 nodes to Stream Node type
    Moving 2 nodes to Real-time Node type
    Keeping 2 nodes to Background processing

    Q1.) Since the QP will be executed on the BP node type of nodes do we need the Real-time Node type for Data flow execution or the same Background processing nodes will take care of the same?

    Q2.) Having doubt on partitions created on nodes . Since we have 2 background processing nodes will there be 40 partitions , 20 on each node?

    Q3.) What should we put on the no of threads options in QP if we have 2 nodes of Stream and BP type?

  • Every thing is looking perfect (very good explanation) but only observation is; let’s not club Queue and Topic in messaging technology. In messaging tech Queues are dedicated for Point to Point communication and Topics are dedicated for Publish and Subcribe communication.

    Both are not same 🙂

  • Hi, thanks for this detailed explanation on QP.
    I have a question, suppose a QP is processing an item and due to some issue processing failed. Now, how can we requeue the failed item for processing as similar to what we had for agent in Admin studio?

  • Thanks for sharing wonderfull concepts.

    Can you please share the steps for how to deque the item which is scheduled.
    i tried deleting the record from delayed queues table, its not working.

  • We have queue processor where we are pushing some records but they are gng to ready to process but not getting picking up and in the data flow we can see stale threads . Could you please help us in debugging the issue

  • I want to replace my standard agent to queue Processor. The activity responsible to call the standard agent is using step page of System-Queue-DefaultEntry , i.e.Testpage1, property set method has also applied on the same page. The queue Processor/standard agent refers to an activity which is in data class, another steppage is present for this class in the activity, i.e.Testpage2.
    How can I configure the queue-for-processing method replacing queue-for-agent in this scenario? and what should be done with System-Queue-DefaultEntry and its property set ?
    As of now, If I try to execute queue-for-processing on the Testpage1 it is unable to call the agent activity from another class, of steppage Testpage2.

  • We just upgraded to Agile Studio for Pega Platform 8.7.4. The queue processor “pyFTSIncrementalIndexer” is not processing scheduled tasks from today back to May 24, 2023. The processor is “running” but processing nothing. Nothing created in the timeframe of May 24th till now in Agile Studio (like ISSUES or BUGS) is showing up when using the Search. I’ve tried stopping and starting the processor and re-indexing everything. Still nothing is showing up in Agile Studio search. How do I get that processor to start “processing” the 811 scheduled items?