Class NonThreadedProcessingUnit

java.lang.Object
org.apache.uima.collection.impl.cpm.engine.NonThreadedProcessingUnit

public class NonThreadedProcessingUnit extends Object
The Class NonThreadedProcessingUnit.
  • Field Details

    • threadState

      public int threadState
      The thread state.
    • casPool

      protected CPECasPool casPool
      The cas pool.
    • relaseCAS

      protected boolean relaseCAS
      The release CAS flag.
    • cpm

      protected CPMEngine cpm
      The cpm.
    • workQueue

      protected BoundedWorkQueue workQueue
      The work queue.
    • outputQueue

      protected BoundedWorkQueue outputQueue
      The output queue.
    • mConverter

      protected CasConverter mConverter
      The CAS converter.
    • processingUnitProcessTrace

      protected ProcessTrace processingUnitProcessTrace
      The processing unit process trace.
    • processContainers

      protected LinkedList processContainers
      The process containers.
    • numToProcess

      protected long numToProcess
      The num to process.
    • casList

      protected CAS[] casList
      The cas list.
    • statusCbL

      protected ArrayList statusCbL
      The list of status callback listeners.
    • notifyListeners

      protected boolean notifyListeners
      The notify listeners.
    • conversionCas

      protected CAS conversionCas
      The conversion cas.
    • artifact

      protected Object[] artifact
      The artifact.
    • conversionCasArray

      protected CAS[] conversionCasArray
      The conversion cas array.
    • timer

      protected UimaTimer timer
      The timer.
    • threadId

      protected String threadId
      The thread id.
    • cpeConfiguration

      protected CpeConfiguration cpeConfiguration
      The cpe configuration.
    • casCache

      private CAS[] casCache
      The cas cache.
  • Constructor Details

    • NonThreadedProcessingUnit

      public NonThreadedProcessingUnit(CPMEngine acpm, BoundedWorkQueue aInputQueue, BoundedWorkQueue aOutputQueue)
      Initialize the PU.
      Parameters:
      acpm - component managing the life cycle of the CPE
      aInputQueue - queue to read from
      aOutputQueue - queue to write to
    • NonThreadedProcessingUnit

      public NonThreadedProcessingUnit(CPMEngine acpm)
      Instantiates a new non threaded processing unit.
      Parameters:
      acpm - the acpm
  • Method Details

    • setInputQueue

      public void setInputQueue(BoundedWorkQueue aInputQueue)
      Alternative method of providing the queue from which this PU reads bundles of CASes.
      Parameters:
      aInputQueue - queue to read from
    • setOutputQueue

      public void setOutputQueue(BoundedWorkQueue aOutputQueue)
      Alternative method of providing the queue into which this PU deposits the results of analysis.
      Parameters:
      aOutputQueue - queue to write to
    • setCPMEngine

      public void setCPMEngine(CPMEngine acpm)
      Alternative method of providing the reference to the component managing the lifecycle of the CPE.
      Parameters:
      acpm - reference to the controlling engine
    • cleanup

      public void cleanup()
      Null out fields of this object. Call this only when this object is no longer needed.
    • setNotifyListeners

      public void setNotifyListeners(boolean aDoNotify)
      Sets a flag indicating whether notifications should be sent to the configured listeners.
      Parameters:
      aDoNotify - true if notification is required, false otherwise
    • addStatusCallbackListener

      public void addStatusCallbackListener(BaseStatusCallbackListener aListener)
      Plugs in a listener object used for notifications.
      Parameters:
      aListener - BaseStatusCallbackListener instance
    • getCallbackListeners

      public ArrayList getCallbackListeners()
      Returns the list of listeners used by this PU for callbacks.
      Returns:
      list of BaseStatusCallbackListener instances
    • removeStatusCallbackListener

      public void removeStatusCallbackListener(BaseStatusCallbackListener aListener)
      Removes the given listener from the list of listeners.
      Parameters:
      aListener - object to remove from the list
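      The listener methods above (setNotifyListeners, addStatusCallbackListener, getCallbackListeners, removeStatusCallbackListener) suggest a registry guarded by a flag. The following is a minimal sketch of that pattern, not the UIMA implementation: StatusListener, ListenerRegistry, and fire are hypothetical stand-ins, and the default flag value of false is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for BaseStatusCallbackListener; not the UIMA interface.
interface StatusListener {
    void entityProcessComplete(Object cas);
}

// Hypothetical sketch of listener bookkeeping inside a processing unit.
class ListenerRegistry {
    private final List<StatusListener> listeners = new ArrayList<>();
    // Assumption: notifications are off until explicitly enabled.
    private boolean notifyListeners = false;

    void setNotifyListeners(boolean doNotify) {
        notifyListeners = doNotify;
    }

    void addStatusCallbackListener(StatusListener listener) {
        listeners.add(listener);
    }

    void removeStatusCallbackListener(StatusListener listener) {
        listeners.remove(listener);
    }

    List<StatusListener> getCallbackListeners() {
        return listeners;
    }

    // Calls every registered listener if the flag is set.
    // Returns the number of listeners that were called.
    int fire(Object cas) {
        if (!notifyListeners) {
            return 0;
        }
        int called = 0;
        // Iterate over a copy so a listener may unregister itself safely.
        for (StatusListener listener : new ArrayList<>(listeners)) {
            listener.entityProcessComplete(cas);
            called++;
        }
        return called;
    }
}
```

      In this sketch, registering a listener has no visible effect until the flag is set to true; removing the listener stops further callbacks.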
    • setProcessingUnitProcessTrace

      public void setProcessingUnitProcessTrace(ProcessTrace aProcessingUnitProcessTrace)
      Plugs in the ProcessTrace object used to collect statistics.
      Parameters:
      aProcessingUnitProcessTrace - object that compiles statistics
    • setUimaTimer

      public void setUimaTimer(UimaTimer aTimer)
      Plugs in a custom timer that the PU uses to get the time.
      Parameters:
      aTimer - custom timer to use
    • setContainers

      public void setContainers(LinkedList processorList)
      Plugs in a list of Cas Processor containers. During processing, the Cas Processors in this list are called sequentially. Each Cas Processor is wrapped in a container that manages errors, counts and totals, and restarts.
      Parameters:
      processorList - list of Cas Processor containers to be added to the processing pipeline
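      The description of setContainers (sequential calls, with each processor held in a container that tracks errors and counts) can be illustrated with a minimal sketch. ContainerSketch and SequentialPipelineSketch are hypothetical names, a String stands in for a CAS, and the policy of passing the input through unchanged after an error is an assumption, not documented UIMA behavior.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical container: wraps one processor and keeps its counters.
class ContainerSketch {
    final String name;
    private final UnaryOperator<String> processor;
    int processed = 0;
    int errors = 0;

    ContainerSketch(String name, UnaryOperator<String> processor) {
        this.name = name;
        this.processor = processor;
    }

    // Runs the wrapped processor and records the outcome.
    String process(String cas) {
        try {
            String result = processor.apply(cas);
            processed++;
            return result;
        } catch (RuntimeException e) {
            // Assumption: on error, count it and pass the input through.
            errors++;
            return cas;
        }
    }
}

class SequentialPipelineSketch {
    // Calls each container in list order, feeding each result to the next.
    static String run(List<ContainerSketch> containers, String cas) {
        String current = cas;
        for (ContainerSketch container : containers) {
            current = container.process(current);
        }
        return current;
    }
}
```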
    • disableCasProcessor

      public void disableCasProcessor(int aCasProcessorIndex)
      Disables a Cas Processor in the processing pipeline, locating it by the provided index. The disabled Cas Processor remains in the processing pipeline but is not used during processing.
      Parameters:
      aCasProcessorIndex - location in the pipeline of the Cas Processor to disable
    • disableCasProcessor

      public void disableCasProcessor(String aCasProcessorName)
      Alternative method to disable a Cas Processor. Uses a name to locate it.
      Parameters:
      aCasProcessorName - name of the Cas Processor to disable
    • enableCasProcessor

      public void enableCasProcessor(String aCasProcessorName)
      Enables the Cas Processor with the given name. An enabled Cas Processor immediately begins to receive bundles of CASes.
      Parameters:
      aCasProcessorName - name of the Cas Processor to enable
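      The disable and enable methods above keep a processor in the pipeline and only change whether it is used. A minimal sketch of that behavior follows; PipelineSketch, Slot, add, size, and process are hypothetical names for illustration and do not reflect the UIMA internals.

```java
import java.util.ArrayList;
import java.util.List;

class PipelineSketch {
    // Hypothetical pipeline entry: a processor name plus an enabled flag.
    private static final class Slot {
        final String name;
        boolean enabled = true;

        Slot(String name) {
            this.name = name;
        }
    }

    private final List<Slot> slots = new ArrayList<>();

    void add(String name) {
        slots.add(new Slot(name));
    }

    // Locate by index; the slot stays in the list.
    void disableCasProcessor(int index) {
        slots.get(index).enabled = false;
    }

    // Locate by name; the slot stays in the list.
    void disableCasProcessor(String name) {
        setEnabled(name, false);
    }

    void enableCasProcessor(String name) {
        setEnabled(name, true);
    }

    private void setEnabled(String name, boolean enabled) {
        for (Slot slot : slots) {
            if (slot.name.equals(name)) {
                slot.enabled = enabled;
            }
        }
    }

    int size() {
        return slots.size();
    }

    // Names of the processors that would be invoked, in pipeline order.
    List<String> process() {
        List<String> invoked = new ArrayList<>();
        for (Slot slot : slots) {
            if (slot.enabled) {
                invoked.add(slot.name);
            }
        }
        return invoked;
    }
}
```

      Because disabling only flips a flag, the pipeline size and ordering are unchanged, and re-enabling takes effect on the next call to process.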
    • analyze

      protected boolean analyze(Object[] aCasObjectList, ProcessTrace pTrTemp) throws Exception
      Analyze.
      Parameters:
      aCasObjectList - the a cas object list
      pTrTemp - the tr temp
      Returns:
      true, if successful
      Throws:
      Exception - the exception
    • setReleaseCASFlag

      public void setReleaseCASFlag(boolean aFlag)
      Sets the release CAS flag.
      Parameters:
      aFlag - the new release CAS flag
    • setCasPool

      public void setCasPool(CPECasPool aPool)
      Sets the cas pool.
      Parameters:
      aPool - the new cas pool
    • postAnalysis

      private void postAnalysis(Object[] aCasObjectList, boolean isCasObject, Object[] casObjects, ProcessTrace aProcessTr, boolean doneAlready) throws Exception
      Post analysis.
      Parameters:
      aCasObjectList - the a cas object list
      isCasObject - the is cas object
      casObjects - the cas objects
      aProcessTr - the a process tr
      doneAlready - the done already
      Throws:
      Exception - -
    • doReleaseCasProcessor

      private void doReleaseCasProcessor(ProcessingContainer aContainer, CasProcessor aCasProcessor)
      Do release cas processor.
      Parameters:
      aContainer - the a container
      aCasProcessor - the a cas processor
    • doEndOfBatch

      private void doEndOfBatch(ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTr, int howManyCases)
      Do end of batch.
      Parameters:
      aContainer - the a container
      aProcessor - the a processor
      aProcessTr - the a process tr
      howManyCases - the how many cases
    • handleErrors

      private boolean handleErrors(Throwable e, ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTrace, Object[] aCasObjectList, boolean isCasObject) throws Exception
      Main routine that handles errors occurring in the processing loop.
      Parameters:
      e - exception thrown in the main processing loop
      aContainer - current container of the Cas Processor
      aProcessor - current Cas Processor
      aProcessTrace - an object containing stats for this processing loop
      aCasObjectList - list of CASes being analyzed
      isCasObject - determines the type of CAS in aCasObjectList (CasData or CasObject)
      Returns:
      boolean
      Throws:
      Exception - -
    • invokeCasObjectCasProcessor

      private void invokeCasObjectCasProcessor(ProcessingContainer container, CasProcessor processor, Object[] aCasObjectList, ProcessTrace pTrTemp, boolean isCasObject) throws Exception
      Invoke cas object cas processor.
      Parameters:
      container - the container
      processor - the processor
      aCasObjectList - the a cas object list
      pTrTemp - the tr temp
      isCasObject - the is cas object
      Throws:
      Exception - -
    • convertCasDataToCasObject

      private void convertCasDataToCasObject(int casIndex, String aContainerName, Object[] aCasObjectList) throws Exception
      Convert cas data to cas object.
      Parameters:
      casIndex - the cas index
      aContainerName - the a container name
      aCasObjectList - the a cas object list
      Throws:
      Exception - -
    • invokeCasDataCasProcessor

      private void invokeCasDataCasProcessor(ProcessingContainer container, CasProcessor processor, Object[] aCasObjectList, ProcessTrace pTrTemp, boolean isCasObject, boolean retry) throws Exception
      Invoke cas data cas processor.
      Parameters:
      container - the container
      processor - the processor
      aCasObjectList - the a cas object list
      pTrTemp - the tr temp
      isCasObject - the is cas object
      retry - the retry
      Throws:
      Exception - -
    • containerDisabled

      private boolean containerDisabled(ProcessingContainer aContainer)
      Container disabled.
      Parameters:
      aContainer - the a container
      Returns:
      true, if successful
    • isProcessorReady

      protected boolean isProcessorReady(int aStatus)
      Check if the CASProcessor status is available for processing.
      Parameters:
      aStatus - the a status
      Returns:
      true, if is processor ready
    • filterOutTheCAS

      private boolean filterOutTheCAS(ProcessingContainer aContainer, boolean isCasObject, Object[] aCasObjectList)
      Filter out the CAS.
      Parameters:
      aContainer - the a container
      isCasObject - the is cas object
      aCasObjectList - the a cas object list
      Returns:
      true, if successful
    • notifyListeners

      protected void notifyListeners(Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus)
      Notifies listeners that the pipeline has finished processing the current set of CASes.
      Parameters:
      aCas - object containing either an array of Cas instances or a single instance
      isCasObject - true if the instance of Cas is of type Cas, false otherwise
      aEntityProcStatus - status object that may contain exceptions and trace
    • doNotifyListeners

      protected void doNotifyListeners(Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus)
      Notifies all configured listeners. Makes sure that the appropriate type of Cas is sent to each listener. Conversions take place to ensure compatibility.
      Parameters:
      aCas - Cas to pass to the listener
      isCasObject - true if the Cas is of type CAS
      aEntityProcStatus - status object containing exceptions and trace info
    • clearCasCache

      private void clearCasCache()
      Clear cas cache.
    • pauseContainer

      private boolean pauseContainer(ProcessingContainer aContainer, Exception aException, String aThreadId)
      Determines if the thread should be paused. Pausing a container effectively pauses ALL Cas Processors that are managed by the container. The pause is needed when multiple pipelines share a common service. If this service dies (Socket Down), only one thread should initiate the service restart. While the service is being restarted, no invocations on the service should be made. Containers are resumed on successful service restart.
      Parameters:
      aContainer - a container that manages the current Cas Processor
      aException - the exception
      aThreadId - id of the current thread
      Returns:
      true, if successful
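      The requirement in pauseContainer that only one thread initiates the service restart can be sketched with an atomic compare-and-set. This is an illustration under assumptions, not the UIMA code: PausableContainerSketch and its methods are hypothetical, and the meaning given to the boolean result here (true for the single thread that should restart the service) is an assumption.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical container shared by several pipeline threads.
class PausableContainerSketch {
    private final AtomicBoolean paused = new AtomicBoolean(false);

    // Returns true only for the one caller that flips the state to paused.
    // That caller is the one expected to restart the shared service.
    boolean pause() {
        return paused.compareAndSet(false, true);
    }

    // Other threads check this and avoid invoking the service while paused.
    boolean isPaused() {
        return paused.get();
    }

    // Called after a successful service restart.
    void resume() {
        paused.set(false);
    }
}
```

      A thread that gets false from pause() knows a restart is already in progress and should wait until isPaused() returns false before calling the service again.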
    • handleServiceException

      private void handleServiceException(ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTr, Exception ex) throws Exception
      Handle service exception.
      Parameters:
      aContainer - the a container
      aProcessor - the a processor
      aProcessTr - the a process tr
      ex - the ex
      Throws:
      Exception - -
    • handleSkipCasProcessor

      private void handleSkipCasProcessor(ProcessingContainer aContainer, Object[] aCasObjectList, boolean isLastCP) throws Exception
      Handle skip cas processor.
      Parameters:
      aContainer - the a container
      aCasObjectList - the a cas object list
      isLastCP - the is last CP
      Throws:
      Exception - -
    • getBytes

      protected long getBytes(Object aCas)
      Returns the size of the CAS object. Currently only CASData is supported.
      Parameters:
      aCas - Cas to get the size for
      Returns:
      the size of the CAS object. Currently only CASData is supported.
    • releaseCases

      private void releaseCases(Object aCasList, boolean lastProcessor, String aName)
      Conditionally releases CASes back to the CAS pool. The release occurs only if the Cas Processor is the last in the processing chain.
      Parameters:
      aCasList - list of CASes to release
      lastProcessor - determines if the release takes place
      aName - the a name
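      The conditional release described for releaseCases can be sketched as follows. CasPoolSketch and ReleaseSketch are hypothetical stand-ins rather than the UIMA CPECasPool API, and treating the Object argument as either a single CAS or an array of CASes is an assumption based on the parameter type.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for a CAS pool; not the UIMA CPECasPool API.
class CasPoolSketch {
    private final Deque<Object> free = new ArrayDeque<>();

    void release(Object cas) {
        free.push(cas);
    }

    int available() {
        return free.size();
    }
}

class ReleaseSketch {
    // Returns the number of CASes handed back to the pool.
    // Nothing is released unless this is the last processor in the chain.
    static int releaseCases(CasPoolSketch pool, Object casList, boolean lastProcessor) {
        if (!lastProcessor || casList == null) {
            return 0;
        }
        // Assumption: the argument is either an array of CASes or one CAS.
        if (casList instanceof Object[]) {
            int released = 0;
            for (Object cas : (Object[]) casList) {
                if (cas != null) {
                    pool.release(cas);
                    released++;
                }
            }
            return released;
        }
        pool.release(casList);
        return 1;
    }
}
```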
    • stopCasProcessors

      public void stopCasProcessors(boolean kill)
      Stops all Cas Processors that are part of this PU.
      Parameters:
      kill - true if the CPE has been stopped by an external stop request before processing finished