# by Macro Alpha
import json
import math
import multiprocessing
import os
import random
import sys
from collections import deque
from multiprocessing import Pool
# --- Constants for Neuron and Synapse Models ---
# These constants define the biophysical properties and dynamics of neurons and synapses.
# They are chosen to be biologically plausible, though simplified for computational efficiency.
# Neuron parameters (AdEx model based)
DEFAULT_RESTING_POTENTIAL = -70.0 # mV (Typical resting membrane potential of a neuron, maintained by ion pumps and leak channels)
DEFAULT_RESET_POTENTIAL = -65.0 # mV (Membrane potential after a spike, typically -65 to -55 mV)
DEFAULT_SPIKE_THRESHOLD = -50.0 # mV (Critical voltage for action potential, range: -55 to -40 mV; adjusted for excitability)
DEFAULT_PEAK_POTENTIAL = 30.0 # mV (Peak voltage during an action potential, range: +10 to +50 mV)
DEFAULT_MEMBRANE_CAPACITANCE = 0.1 # nF (Neuron's ability to store charge, typically 0.05-0.2 nF for soma)
# ADJUSTED for stability and activity: Increased leak conductance to make neurons less excitable.
DEFAULT_LEAK_CONDUCTANCE = 0.0008 # uS (Membrane permeability to ions at rest)
# ADJUSTED for stability and activity: Increased adaptation amplitude for more controlled firing.
DEFAULT_ADAPTATION_AMPLITUDE = 0.8 # nS (Strength of spike-frequency adaptation current)
DEFAULT_ADAPTATION_TIME_CONSTANT = 250.0 # ms (Time constant for adaptation current decay)
DEFAULT_EXPONENTIAL_SLOPE = 3.0 # mV (Sharpness of spike initiation in AdEx model)
ABSOLUTE_REFRACTORY_PERIOD = 2 # ms (Period after spike where neuron cannot fire)
RELATIVE_REFRACTORY_PERIOD_DURATION = 5 # ms (Period after absolute refractory where stronger stimulus is needed)
RELATIVE_REFRACTORY_THRESHOLD_BOOST = 15.0 # mV (Increase in spike threshold during relative refractory period)
DEFAULT_NOISE_AMPLITUDE = 6.0 # mV (Amplitude of intrinsic random fluctuations in membrane potential)
# New constants for neuron heterogeneity
NEURON_PARAMETER_VARIATION_FACTOR = 0.2 # Factor for random variation in neuron parameters (+/- 20%).
# New constants for simplified oscillatory component
DEFAULT_OSCILLATION_FREQUENCY_MIN = 0.5 # Hz (Minimum intrinsic neuronal oscillation frequency)
DEFAULT_OSCILLATION_FREQUENCY_MAX = 100.0 # Hz (Maximum intrinsic neuronal oscillation frequency)
OSCILLATORY_CURRENT_AMPLITUDE = 0.5 # mV (Amplitude of the intrinsic oscillatory current)
OSCILLATORY_COUPLING_STRENGTH = 0.05 # Strength of coupling for phase synchronization.
OSCILLATION_FREQUENCY_ADJUSTMENT_RATE = 0.0001 # Rate of intrinsic oscillation frequency modulation.
# Synapse parameters (simplified kinetics)
# Time constants for exponential decay of postsynaptic currents (in simulation steps/ms)
AMPA_DECAY_TAU = 10.0 # ms (Typically 1-20 ms) (Decay time for AMPA currents, fast excitation)
NMDA_DECAY_TAU = 150.0 # ms (Typically 50-300 ms) (Decay time for NMDA currents, slower excitation and Ca2+ influx)
GABA_A_DECAY_TAU = 25.0 # ms (Typically 5-50 ms) (Decay time for GABA-A currents, fast inhibition)
# NMDA receptor properties
NMDA_MG_CONCENTRATION = 1.0 # mM (Physiological range: 0.8-1.2 mM) (Extracellular magnesium concentration blocking NMDA receptors)
NMDA_MG_BLOCK_FACTOR = 0.07 # mV^-1 (Approx. 0.06-0.08 mV^-1) (Factor for voltage dependence of NMDA Mg2+ block)
# Short-Term Plasticity (STP) parameters
STP_FACILITATION_TAU = 250.0 # ms (Range: 50-500 ms) (Time constant for decay of synaptic facilitation)
STP_DEPRESSION_TAU = 500.0 # ms (Range: 100-1000 ms) (Time constant for recovery from synaptic depression)
STP_FACILITATION_INCREMENT = 0.1 # Increment in facilitation factor per presynaptic spike.
STP_DEPRESSION_DECREMENT = 0.1 # Decrement in utilization factor (vesicle depletion) per presynaptic spike.
# Spike-Timing Dependent Plasticity (STDP) parameters
STDP_LTP_WINDOW = 20.0 # ms (Typically 10-30 ms)
STDP_LTD_WINDOW = 35.0 # ms (Typically 20-50 ms)
STDP_LTP_MAGNITUDE = 0.05 # (Representing 1-5% change in weight per event, adjusted for model scale, chose mid-range) - ADJUSTED for learning
STDP_LTD_MAGNITUDE = 0.04 # (Representing 1-4% change in weight per event, adjusted for model scale, chose mid-range) - ADJUSTED for learning
STDP_TAU_LTP = 15.0 # ms (Typically 10-20 ms)
STDP_TAU_LTD = 30.0 # ms (Typically 20-40 ms)
STDP_MAX_WEIGHT = 5.0 # Maximum allowed synaptic weight (Variable (Context-Dependent), keeping current value as a reasonable start)
STDP_MIN_WEIGHT = 0.0 # Minimum allowed synaptic weight (Can approach zero for complete depression)
# Homeostatic Plasticity (Synaptic Scaling) parameters
HOMEOSTASIS_TARGET_FIRING_RATE = 0.5 # Hz (Spikes per second, convert to per step for model, chose mid-range of 0.1-1.0)
HOMEOSTASIS_LEARNING_RATE = 0.01 # Rate at which homeostatic plasticity adjusts synaptic weights (Increased for faster adjustment)
HOMEOSTASIS_WINDOW_SIZE = 1000 # Number of steps to average firing rate over (Hours to Days (Scaled for simulation steps), choosing a large number)
# Metaplasticity parameters (modulates STDP learning rates)
METAPLASTICITY_TAU = 5000.0 # ms (Hours to Days (Scaled for simulation steps), choosing a large number)
METAPLASTICITY_FACTOR = 0.00002 # How much metaplasticity changes STDP rates (Variable (Context-Dependent), keeping current value as a reasonable start) - ADJUSTED for learning
# Neuromodulation parameters (simplified global effects)
DOPAMINE_EFFECT_ON_LTP = 0.8 # Multiplier for STDP LTP under dopamine (Variable (Context-Dependent), keeping current value as a reasonable start) - ADJUSTED for learning
SEROTONIN_EFFECT_ON_THRESHOLD = 28.0 # Increased to make serotonin more inhibitory ADJUSTED from 25.
SEROTONIN_EFFECT_ON_ADAPTATION = 80.0 # Further significant increase for stronger inhibitory effect
DOPAMINE_ADJUSTMENT_RATE = 0.0008 # Rate at which dopamine level changes (Variable (Context-Dependent), very small rate)
SEROTONIN_ADJUSTMENT_RATE = 0.0008 # Rate at which serotonin level changes (Variable (Context-Dependent), very small rate)
NEUROMODULATOR_DECAY_RATE = 0.0005 # Rate at which neuromodulators decay to baseline (Variable (Context-Dependent), very small rate) - ADJUSTED for responsiveness
# New constants for complex regulation
DOPAMINE_EFFECT_ON_NOISE = 2.0 # How much dopamine influences noise amplitude (Variable (Context-Dependent), keeping current value as a reasonable start)
STRESS_EFFECT_ON_NOISE = 5.0 # How much stress influences noise amplitude (Variable (Context-Dependent), adjusted)
STRESS_EFFECT_ON_THRESHOLD = 0.5 # How much stress influences spike threshold (Variable (Context-Dependent), keeping current value as a reasonable start)
# New neuromodulator specific effects
DOPAMINE_EXCITABILITY_BOOST = 100.0 # Reduced for more controlled excitability
SEROTONIN_INHIBITION_BOOST = 0.1 # Multiplier for GABAergic synapse weights (Variable (Context-Dependent), keeping current value as a reasonable start)
ACETYLCHOLINE_EFFECT_ON_MEMBRANE_CAPACITANCE = 0.07 # How much ACh influences membrane capacitance (Variable (Context-Dependent), keeping current value as a reasonable start)
ACETYLCHOLINE_EFFECT_ON_EXPONENTIAL_SLOPE = 0.13 # How much ACh influences exponential slope (Variable (Context-Dependent), keeping current value as a reasonable start) # ADJUSTED
ACETYLCHOLINE_ADJUSTMENT_RATE = 0.0008 # Rate at which ACh level changes (Variable (Context-Dependent), very small rate)
ACETYLCHOLINE_DECAY_RATE = 0.003 # Variable (Context-Dependent), very small rate
# NEW: Norepinephrine parameters
NOREPINEPHRINE_ADJUSTMENT_RATE = 0.0008 # Variable (Context-Dependent), very small rate
NOREPINEPHRINE_DECAY_RATE = 0.003 # Variable (Context-Dependent), very small rate
NOREPINEPHRINE_EFFECT_ON_EXCITABILITY = 5.0 # Variable (Context-Dependent), keeping current value as a reasonable start)
NOREPINEPHRINE_EFFECT_ON_STDP_WINDOW = 5.0 # ms, NA can broaden STDP windows (Variable (Context-Dependent), keeping current value as a reasonable start)
# Synaptic delay parameters
MIN_SYNAPTIC_DELAY = 0.5 # ms (Typically 0.5-1.0 ms)
MAX_SYNAPTIC_DELAY = 10.0 # ms (Range: 1-10 ms for short-range unmyelinated axons plus synaptic delay)
# Glial cell (Astrocyte) parameters
ASTROCYTE_ACTIVITY_SENSITIVITY = 0.1 # How much neuronal activity influences astrocyte state (Variable (Context-Dependent), keeping current value as a reasonable start)
ASTROCYTE_GLIOTRANSMITTER_DECAY = 0.05 # Rate of gliotransmitter decay (Variable (Context-Dependent), keeping current value as a reasonable start)
GLIOTRANSMITTER_SYNAPSE_MODULATION_STRENGTH = 0.5 # Reduced to lessen the effect, especially on inhibitory synapses.
GLIOTRANSMITTER_NEURON_MODULATION_STRENGTH = 0.5 # How much gliotransmitters affect neuron thresholds (mV) (Variable (Context-Dependent), keeping current value as a reasonable start)
ASTROCYTE_REUPTAKE_FACTOR = 0.2 # Increased slightly to make astrocyte reuptake more noticeable
ASTROCYTE_METABOLIC_SUPPORT_RATE = 0.001 # Rate at which astrocytes replenish metabolic resources (Variable (Context-Dependent), very small rate)
# NEW: Microglia parameters for synaptic pruning
MICROGLIA_ACTIVITY_THRESHOLD = 0.1 # Below this activity, microglia become active (Variable (Context-Dependent), keeping current value as a reasonable start)
MICROGLIA_PRUNING_PROBABILITY = 0.0002 # Variable (Context-Dependent, low in adult brain) - ADJUSTED for stability
MICROGLIA_PRUNING_WINDOW = 1000 # Steps over which to average synapse activity for pruning (Hours to Days (Scaled for simulation steps), choosing a large number)
SYNAPSE_INACTIVITY_THRESHOLD = 0.01 # Synapses below this weight are candidates for pruning (Variable (Context-Dependent), keeping current value as a reasonable start) # ADJUSTED
STRESS_EFFECT_ON_PRUNING = 0.5 # Reduced to prevent saturation in tests
ASTROCYTE_MICROGLIA_PRUNING_MODULATION = 10.0 # Significantly increased for stronger suppression in tests
# NEW: Oligodendrocyte parameters for myelin plasticity
OMP_ACTIVITY_THRESHOLD = 0.2 # Below this activity, myelination might decrease (Variable (Context-Dependent), keeping current value as a reasonable start)
OMP_MYELIN_GROWTH_RATE = 0.001 # Rate at which myelin increases (Variable (Context-Dependent), very small rate)
OMP_MYELIN_DECAY_RATE = 0.0005 # Rate at which myelin decreases (Variable (Context-Dependent), very small rate)
MAX_MYELIN_FACTOR = 0.7 # Max reduction in delay (0.5-0.9)
MIN_MYELIN_FACTOR = 0.0 # Min reduction in delay (0.0)
# NEW: Ion Channel Dynamics Parameters
K_EXT_BASELINE = 4.0 # mM (Typically 3.0-5.0 mM)
K_EXT_SPIKE_EFFLUX = 1e-4 # mM (Approximate change per spike, 1x10^-5 - 1x10^-3)
K_EXT_BUFFER_RATE = 0.005 # Reduced buffer rate slightly to make changes more persistent
NA_INT_BASELINE = 10.0 # mM (Typically 5.0-15.0 mM) # ADJUSTED
NA_INT_SPIKE_INFLUX = 1e-4 # mM (Approximate change per spike, 1x10^-5 - 1x10^-3)
NA_INT_BUFFER_RATE = 0.01 # Variable (Context-Dependent), choosing a small rate
ION_EFFECT_ON_RESTING_POTENTIAL = 2.0 # Variable (Context-Dependent), keeping current value as a reasonable start)
ION_EFFECT_ON_THRESHOLD = -10.0 # ADJUSTED: Negative value for facilitating effect of elevated Na_int
# NEW: Calcium Dynamics Parameters for Plasticity
CA_DECAY_TAU = 50.0 # ms (Range: 10-100 ms for rapid transients)
CA_NMDA_SCALE = 0.05 # Variable (Context-Dependent), keeping original value
CA_SPIKE_INCREMENT = 0.1 # Variable (Context-Dependent), keeping original value
CA_LTP_THRESHOLD = 0.4 # uM (Typically 0.3-0.5 uM peak for LTP)
CA_LTD_THRESHOLD = 0.15 # uM (Typically 0.1-0.2 uM for LTD)
CA_METAPLASTICITY_SENSITIVITY = 0.1 # How much calcium influences metaplasticity factor (Variable (Context-Dependent), keeping current value as a reasonable start)
# NEW: Dopamine Receptor Profiles
D1_LIKE_EFFECT_LTP = 1.0 # Multiplier for LTP when D1-like receptors are dominant (Variable (Context-Dependent), choosing 1.0 as baseline)
D2_LIKE_EFFECT_LTD = 1.0 # Multiplier for LTD when D2-like receptors are dominant (Variable (Context-Dependent), choosing 1.0 as baseline)
# Constants for neuron addition/removal
MAX_NEURONS_PER_REGION = 10000 # Keeping it computationally manageable, though biologically it's billions
# NEW: Synaptogenesis/Pruning Parameters (Activity-Dependent)
SYNAPTOGENESIS_PROBABILITY = 2e-4 # Probability of forming a new synapse between highly correlated neurons (Variable (Context-Dependent, very low in adult brain)) - ADJUSTED for stability
SYNAPTOGENESIS_CORRELATION_THRESHOLD = 0.8 # Minimum correlation for synaptogenesis (0.7-0.9)
SYNAPTIC_PRUNING_THRESHOLD = 0.005 # Synapses below this weight are candidates for pruning (Variable (Context-Dependent), keeping current value as a reasonable start)
SYNAPTIC_PRUNING_PROBABILITY = 1e-4 # Probability of pruning a weak synapse per step (Variable (Context-Dependent, low in adult brain))
# NEW: Metabolic Parameters
GLUCOSE_BASELINE = 85.0 # mg/dL (Normal fasting blood glucose, chose mid-range of 70-100)
OXYGEN_BASELINE = 97.5 # % saturation (Normal arterial oxygen saturation, chose mid-range of 95-100)
METABOLIC_CONSUMPTION_RATE_PER_SPIKE = 1.5e-7 # ADJUSTED: Decreased for better sustained activity during learning
METABOLIC_REPLENISHMENT_RATE = 4.0e-6 # KEEPING CURRENT
METABOLIC_STRESS_THRESHOLD = 20.0 # Below this, brain stress increases (Adjusted to allow regions to start higher without immediate stress)
METABIC_EFFECT_ON_THRESHOLD = 30.0 # Further increased impact of metabolic deficit on threshold
# NEW: Inter-Region Signal Modulation
INTER_REGION_STRESS_MODULATION_FACTOR = 0.5 # Max reduction percentage of inter-region signal scale under max stress
# NEW: Input Scaling for Brain Regions
REGION_INPUT_CURRENT_SCALE = 2500.0 # Further increased scale factor for more robust region response
# Neuron Layer Definitions and typical proportions
# Cortical layers often have different neuron types and connectivity
CORTICAL_LAYERS = {
"L2/3": {"proportion": 0.35, "neuron_types": ["pyramidal", "interneuron"]}, # Superficial, associative (0.3-0.4)
"L4": {"proportion": 0.15, "neuron_types": ["stellate", "interneuron"]}, # Input layer for sensory (0.1-0.2)
"L5": {"proportion": 0.25, "neuron_types": ["pyramidal", "interneuron"]}, # Output layer to subcortical (0.2-0.3)
"L6": {"proportion": 0.15, "neuron_types": ["pyramidal", "interneuron"]} # Output/input to thalamus (0.1-0.2)
}
# Neuron Type Specific Parameters (adjustments to defaults)
NEURON_TYPE_PROPERTIES = {
"pyramidal": {
"base_spike_threshold_offset": 0.0,
"adaptation_amplitude_multiplier": 1.0,
"leak_conductance_multiplier": 1.0,
"oscillation_frequency_multiplier": 1.0
},
"interneuron": { # Generally faster, more inhibitory
"base_spike_threshold_offset": -7.5, # Easier to fire (-5.0 to -10.0 mV)
"adaptation_amplitude_multiplier": 0.3, # (0.1 to 0.5)
"leak_conductance_multiplier": 1.6, # Faster decay (1.2 to 2.0)
"oscillation_frequency_multiplier": 2.25 # Higher frequency oscillations (1.5 to 3.0)
},
"stellate": { # Often found in L4, granular
"base_spike_threshold_offset": -3.5, # (-2.0 to -5.0 mV)
"adaptation_amplitude_multiplier": 0.75, # (0.5 to 1.0)
"leak_conductance_multiplier": 1.25, # (1.0 to 1.5)
"oscillation_frequency_multiplier": 1.5 # (1.0 to 2.0)
}
}
# Maximum argument for math.exp() to prevent OverflowError (~709 is max on typical systems)
MAX_EXP_ARG = 700
# --- Constants for Math Training ---
MAX_SINGLE_DIGIT_MATH = 9 # For problems like 0-9 + 0-9
MAX_POSSIBLE_SUM_MATH = MAX_SINGLE_DIGIT_MATH + MAX_SINGLE_DIGIT_MATH
# Scale factor for input numbers to brain regions. Adjust based on how much activity a number should generate.
MATH_TASK_INPUT_SCALE = 0.2 # Reduced input scale for more controlled response.
# Scale factor for converting output region activity back to an answer.
MATH_TASK_OUTPUT_SCALE = MAX_POSSIBLE_SUM_MATH # e.g., activity 0.5 becomes 0.5 * 18 = 9
# ADJUSTED: Increased reward/punishment signals for faster learning in math task
# Reward/Punishment signal strengths for training
REWARD_SIGNAL_STRENGTH = 0.05 # Reduced for more stable learning
PUNISHMENT_SIGNAL_STRENGTH = 0.05 # Reduced for more stable learning
# --- Tag Prefixes (Centralized for consistency) ---
TAG_PREFIX_INPUT_DIGIT = "input_digit_"
TAG_PREFIX_INPUT_OPERATOR = "input_operator_"
# Character-based output (replaces _ONES and _TENS)
TAG_PREFIX_OUTPUT_CHAR = "output_char_"
SUPPORTED_OPERATORS_FOR_OUTPUT = ["+", "-", "*", "/", "="] # Characters the brain can output for operations/results
SUPPORTED_DIGITS_FOR_OUTPUT = [str(i) for i in range(10)]
ALL_POSSIBLE_OUTPUT_CHARS = SUPPORTED_DIGITS_FOR_OUTPUT + SUPPORTED_OPERATORS_FOR_OUTPUT
NUM_NEURONS_PER_TAG_GROUP = 5 # Default number of neurons per tagged group
# For character output decoding
CHARACTER_EMISSION_THRESHOLD = 0.75 # Activity level to count as a char emission
CHARACTER_RESET_THRESHOLD = 0.20 # Activity of the emitting group must drop below this
MIN_RESET_DURATION_STEPS = 3 # Must stay below reset for this many brain steps (for decoder logic)
DECODER_REFRACTORY_PERIOD_STEPS = 2 # Min brain steps before decoder looks for a new char after emission
SEQUENCE_TIMEOUT_STEPS = 50 # If no new char for this long (brain steps), sequence ends
MAX_OUTPUT_SEQUENCE_LEN = 7 # Max characters in an output sequence (e.g., "12+5=20")
SAVE_FILE = "brain_state.json"
# Import for the new math training module
import math_training # Assuming math_training.py is in the same directory
# Helper function for randomUniform (equivalent to Python's random.uniform)
def random_uniform(min_val, max_val):
"""Generates a random float within a specified range."""
return random.uniform(min_val, max_val)
# Helper function for random.sample (equivalent to Python's random.sample)
def get_random_elements(arr, num):
"""Returns a list of 'num' unique random elements from 'arr'."""
if num >= len(arr):
return list(arr) # Return all if num is greater or equal to length
return random.sample(arr, num)
# --- 1. Neuron Model (Adaptive Exponential Integrate-and-Fire - AdEx) ---
class Neuron:
def __init__(self, neuron_id,
resting_potential=DEFAULT_RESTING_POTENTIAL,
reset_potential=DEFAULT_RESET_POTENTIAL,
# This is now the *base* threshold, before heterogeneity
base_spike_threshold=DEFAULT_SPIKE_THRESHOLD,
peak_potential=DEFAULT_PEAK_POTENTIAL,
membrane_capacitance=DEFAULT_MEMBRANE_CAPACITANCE,
leak_conductance=DEFAULT_LEAK_CONDUCTANCE,
adaptation_amplitude=DEFAULT_ADAPTATION_AMPLITUDE,
adaptation_time_constant=DEFAULT_ADAPTATION_TIME_CONSTANT,
exponential_slope=DEFAULT_EXPONENTIAL_SLOPE,
refractory_period_steps=ABSOLUTE_REFRACTORY_PERIOD, # Renamed for clarity on units
neuron_type="pyramidal", # NEW: Neuron type for laminar organization
layer="L5" # NEW: Cortical layer
):
self.neuron_id = neuron_id
self.neuron_type = neuron_type # NEW
self.layer = layer # NEW
# Apply type-specific adjustments to base parameters
type_props = NEURON_TYPE_PROPERTIES.get(neuron_type, NEURON_TYPE_PROPERTIES["pyramidal"])
self.resting_potential = resting_potential
self.reset_potential = reset_potential
# Store the base threshold and calculate the heterogeneous one
self.base_spike_threshold = base_spike_threshold + type_props["base_spike_threshold_offset"]
self.heterogeneous_spike_threshold = self.base_spike_threshold * random_uniform(
1 - NEURON_PARAMETER_VARIATION_FACTOR,
1 + NEURON_PARAMETER_VARIATION_FACTOR)
self.peak_potential = peak_potential
# Apply random variation to membrane capacitance
self.C = membrane_capacitance * random_uniform(1 - NEURON_PARAMETER_VARIATION_FACTOR,
1 + NEURON_PARAMETER_VARIATION_FACTOR)
self.g_L = leak_conductance * type_props["leak_conductance_multiplier"]
self.Delta_T = exponential_slope
self.a = adaptation_amplitude * type_props[
"adaptation_amplitude_multiplier"] # This 'a' will be dynamically adjusted in process_step
self.tau_w = adaptation_time_constant
# CRITICAL FIX: Changed 'b' from 0.08 * self.tau_w to a fixed smaller value (e.g., 1.0 nA)
# This prevents excessive adaptation current accumulation after a spike.
self.absolute_refractory_duration = refractory_period_steps # Store the passed value
self.b = 1.0 # nA (Typically a small positive value, e.g., 0.1-5.0 nA)
# State variables
self.membrane_potential = self.resting_potential
self.adaptation_current = 0.0
self.refractory_timer = 0
self.is_firing = False # This will be set by the main loop after parallel processing
# Synaptic current states (for different receptor types)
self.ampa_current = 0.0
self.nmda_current = 0.0
self.gaba_a_current = 0.0
# For STDP: store last spike time
self.last_spike_time = -1
# For Homeostatic Plasticity: track recent firing rate
# Changed to deque for efficient appends and pops from the left
self.firing_history = deque(maxlen=HOMEOSTASIS_WINDOW_SIZE)
self.average_firing_rate = 0.0
# For Metaplasticity: track average activity to modulate STDP rates
self.metaplasticity_factor = 1.0
# Synapses are now handled by a separate Synapse class, stored as objects
self.output_synapses = []
# New: Oscillatory component
self.phase = random_uniform(0, 2 * math.pi) # Initial random phase
self.frequency = random_uniform(DEFAULT_OSCILLATION_FREQUENCY_MIN,
DEFAULT_OSCILLATION_FREQUENCY_MAX) * type_props[
"oscillation_frequency_multiplier"] # Intrinsic frequency
# NEW: Ion concentrations
self.extracellular_potassium = K_EXT_BASELINE
self.intracellular_sodium = NA_INT_BASELINE
# NEW: Intracellular calcium for plasticity
self.intracellular_calcium = 0.0
# NEW: Dopamine receptor profile (e.g., 0.8 means 80% D1-like, 20% D2-like)
self.dopamine_receptor_profile = random_uniform(0.3, 0.7) # Introduce some neuron-level heterogeneity
self.tag = None # NEW: For tagging neurons
def force_activation(self, strength: float):
"""
Forces the neuron to spike in the current step.
Sets membrane potential above its threshold by 'strength' mV.
The actual spike processing (reset, refractory, etc.) happens in calculate_dynamics_static.
"""
if self.refractory_timer < RELATIVE_REFRACTORY_PERIOD_DURATION: # Only allow forcing if not in absolute refractory
# Set membrane potential above threshold to ensure it fires in the next calculate_dynamics_static call
# The 'strength' parameter dictates how much above the threshold.
# A small positive strength (e.g., 1.0 mV) is usually sufficient.
self.membrane_potential = self.heterogeneous_spike_threshold + strength
# self.is_firing will be set to True by BrainRegion.process_input
# after Neuron.calculate_dynamics_static confirms the spike.
# This method just sets up the condition for calculate_dynamics_static to detect a spike.
@staticmethod
def calculate_dynamics_static(
neuron_state, # Dictionary of current state variables
neuron_static_params, # Dictionary of static neuron parameters (C, g_L, Delta_T, etc.)
dt,
current_internal_time,
dopamine_level,
serotonin_level,
acetylcholine_level,
norepinephrine_level, # NEW: Norepinephrine level
regional_average_phase,
dynamic_noise_amplitude,
dynamic_adaptation_amplitude,
dynamic_spike_threshold_offset,
gliotransmitter_neuron_modulation,
astrocyte_reuptake_factor, # NEW: Astrocyte reuptake factor
metabolic_deficit_effect # ADDED THIS PARAMETER
):
"""
Static method to calculate the next state of a single neuron.
This function performs the core AdEx model integration and returns the updated state.
It does NOT modify any Neuron object directly.
"""
# Extract current state variables
membrane_potential = neuron_state['membrane_potential']
adaptation_current = neuron_state['adaptation_current']
refractory_timer = neuron_state['refractory_timer']
ampa_current = neuron_state['ampa_current']
nmda_current = neuron_state['nmda_current']
gaba_a_current = neuron_state['gaba_a_current']
last_spike_time = neuron_state['last_spike_time']
# Copy firing_history to avoid modifying shared list directly, as it's a mutable object
# Correction: It will be a list if coming from get_current_state(). Convert to deque if needed.
firing_history = neuron_state['firing_history']
if not isinstance(firing_history, deque):
firing_history = deque(firing_history, maxlen=HOMEOSTASIS_WINDOW_SIZE)
metaplasticity_factor = neuron_state['metaplasticity_factor']
# average_firing_rate will be calculated fresh later based on firing_history
phase = neuron_state['phase']
frequency = neuron_state['frequency']
# NEW: Ion concentrations
extracellular_potassium = neuron_state['extracellular_potassium']
intracellular_sodium = neuron_state['intracellular_sodium']
# NEW: Intracellular calcium
intracellular_calcium = neuron_state['intracellular_calcium']
# Extract static neuron parameters
resting_potential = neuron_static_params['resting_potential']
reset_potential = neuron_static_params['reset_potential']
# CORRECTED: Use the heterogeneous spike threshold from static params directly
heterogeneous_spike_threshold = neuron_static_params['heterogeneous_spike_threshold']
absolute_refractory_duration = neuron_static_params[
'absolute_refractory_duration'] # Get instance-specific duration
peak_potential = neuron_static_params['peak_potential']
C = neuron_static_params['C']
g_L = neuron_static_params['g_L']
Delta_T = neuron_static_params['Delta_T']
# a = neuron_static_params['a'] # This 'a' is effectively dynamic_adaptation_amplitude passed as arg
tau_w = neuron_static_params['tau_w']
b = neuron_static_params['b'] # Corrected: Access b from static params
# Flag to indicate if a spike occurred in this step
spike_triggered_this_step = False
# Apply neuromodulator specific effects
dopamine_excitability_boost = dopamine_level * DOPAMINE_EXCITABILITY_BOOST
serotonin_inhibition_boost = serotonin_level * SEROTONIN_INHIBITION_BOOST
# NEW: Norepinephrine excitability boost (additive effect)
norepinephrine_excitability_boost = norepinephrine_level * NOREPINEPHRINE_EFFECT_ON_EXCITABILITY
# Acetylcholine effects on neuron properties
effective_C = C * (1 - acetylcholine_level * ACETYLCHOLINE_EFFECT_ON_MEMBRANE_CAPACITANCE)
effective_Delta_T = Delta_T * (1 + acetylcholine_level * ACETYLCHOLINE_EFFECT_ON_EXPONENTIAL_SLOPE)
effective_C = max(0.001, effective_C) # Ensure C doesn't go too low
effective_Delta_T = max(0.001, effective_Delta_T) # Ensure Delta_T doesn't go too low
# Calculate initial effective_spike_threshold (before refractory adjustments)
# CRITICAL FIX: Ensure 'heterogeneous_spike_threshold' is the base for 'effective_spike_threshold'
effective_spike_threshold = heterogeneous_spike_threshold + \
(serotonin_level * SEROTONIN_EFFECT_ON_THRESHOLD) + \
dynamic_spike_threshold_offset + \
gliotransmitter_neuron_modulation + \
(intracellular_sodium - NA_INT_BASELINE) * ION_EFFECT_ON_THRESHOLD + \
metabolic_deficit_effect
# Handle refractory period and adjust threshold
if refractory_timer > 0:
refractory_timer -= 1
if refractory_timer >= RELATIVE_REFRACTORY_PERIOD_DURATION: # Absolute refractory
# Membrane potential is fixed at reset_potential during absolute refractory
membrane_potential = reset_potential
# No spiking possible, so no need to calculate dV_dt for MP evolution during this exact phase.
# Other variables (adaptation, currents) still decay or evolve based on this fixed MP.
else: # Relative refractory period (refractory_timer is between 1 and RELATIVE_REFRACTORY_PERIOD_DURATION - 1)
# Apply threshold boost during relative refractory
effective_spike_threshold += RELATIVE_REFRACTORY_THRESHOLD_BOOST * (
refractory_timer / RELATIVE_REFRACTORY_PERIOD_DURATION)
# Fall through to main AdEx dynamics to calculate dV_dt with boosted threshold
# Core AdEx model equations - these always run unless explicitly overridden by fixed MP in absolute refractory
# Only calculate dV/dt if not in absolute refractory (where MP is held at reset_potential)
if not (
refractory_timer >= RELATIVE_REFRACTORY_PERIOD_DURATION): # This means if refractory_timer is 0 or in relative refractory
# Calculate voltage-dependent magnesium block for NMDA receptors
clamped_membrane_potential = max(membrane_potential, -120.0) # Clamp for log-exp function for stability
exp_val = -clamped_membrane_potential * NMDA_MG_BLOCK_FACTOR
exp_val = min(exp_val, MAX_EXP_ARG) # Ensure exp argument is not too large
nmda_mg_block = 1.0 / (
1.0 + NMDA_MG_CONCENTRATION / 3.57 * math.exp(exp_val))
nmda_effective_current = nmda_current * nmda_mg_block
total_synaptic_current = ampa_current + nmda_effective_current + (
gaba_a_current * (1 + serotonin_inhibition_boost))
# Add intrinsic noise
noise_current = random.gauss(0, dynamic_noise_amplitude)
# Add oscillatory current
oscillation_phase_change = (2 * math.pi * frequency * dt / 1000.0) # Convert Hz to rad/ms
phase_difference = (regional_average_phase - phase + math.pi) % (2 * math.pi) - math.pi
phase = (phase + oscillation_phase_change + OSCILLATORY_COUPLING_STRENGTH * math.sin(
phase_difference) * dt) % (2 * math.pi)
oscillatory_current = OSCILLATORY_CURRENT_AMPLITUDE * math.sin(phase)
# NEW: Include Norepinephrine excitability boost
I_injected = total_synaptic_current + noise_current + oscillatory_current + dopamine_excitability_boost + norepinephrine_excitability_boost # Sum all current sources
# Calculate the argument for the exponential term
exp_argument = (membrane_potential - effective_spike_threshold) / effective_Delta_T
exp_argument = min(exp_argument, MAX_EXP_ARG) # CRITICAL CLAMPING TO PREVENT OVERFLOW
exp_term = g_L * effective_Delta_T * math.exp(exp_argument)
dV_dt = (g_L * (
(resting_potential + (
extracellular_potassium - K_EXT_BASELINE) * ION_EFFECT_ON_RESTING_POTENTIAL) - membrane_potential) + exp_term + I_injected - adaptation_current
) / effective_C
membrane_potential += dV_dt * dt
# TIGHTER CLAMPING FOR MEMBRANE POTENTIAL (CRITICAL FOR STABILITY)
membrane_potential = max(DEFAULT_RESTING_POTENTIAL - 50.0,
min(membrane_potential, DEFAULT_PEAK_POTENTIAL + 50.0))
dw_dt = (dynamic_adaptation_amplitude * (
membrane_potential - (resting_potential + (
extracellular_potassium - K_EXT_BASELINE) * ION_EFFECT_ON_RESTING_POTENTIAL)) - adaptation_current) / tau_w
adaptation_current += dw_dt * dt
# Spike condition and post-spike reset
# CRITICAL FIX: Allow spiking during relative refractory period if threshold is met.
# Spiking is allowed if membrane potential crosses the (potentially boosted) threshold,
# AND the neuron is NOT in the ABSOLUTE refractory period.
# The absolute refractory period is defined as when refractory_timer is >= RELATIVE_REFRACTORY_PERIOD_DURATION
if membrane_potential >= effective_spike_threshold and \
refractory_timer < RELATIVE_REFRACTORY_PERIOD_DURATION: # Condition for not being in absolute refractory
# If is_firing was set by force_activation, ensure it's treated as a spike
# Or if MP naturally crossed threshold
spike_triggered_this_step = neuron_state.get('is_firing', False) or (
membrane_potential >= effective_spike_threshold)
last_spike_time = current_internal_time
membrane_potential = peak_potential # Briefly goes to peak for spike visualization # Use instance-specific absolute refractory duration
adaptation_current += b # Increment adaptation current upon spiking
refractory_timer = absolute_refractory_duration + RELATIVE_REFRACTORY_PERIOD_DURATION # Set full refractory period
# Ion dynamics upon spiking
extracellular_potassium += K_EXT_SPIKE_EFFLUX
intracellular_sodium += NA_INT_SPIKE_INFLUX
intracellular_calcium += CA_SPIKE_INCREMENT
# Post-spike reset: If a spike was just triggered, reset MP for the *next* time step's calculation.
# This occurs after all other calculations for the current step are done.
if spike_triggered_this_step:
membrane_potential = reset_potential
# Decay synaptic currents (always decay)
ampa_current *= math.exp(-dt / (AMPA_DECAY_TAU * (1 + astrocyte_reuptake_factor)))
nmda_current *= math.exp(-dt / (NMDA_DECAY_TAU * (1 + astrocyte_reuptake_factor)))
gaba_a_current *= math.exp(-dt / (GABA_A_DECAY_TAU * (1 + astrocyte_reuptake_factor)))
# Calcium decay (always happens)
intracellular_calcium = max(0.0, intracellular_calcium - (intracellular_calcium / CA_DECAY_TAU) * dt)
# Ion buffering (always happens)
extracellular_potassium += (K_EXT_BASELINE - extracellular_potassium) * K_EXT_BUFFER_RATE * dt
intracellular_sodium += (NA_INT_BASELINE - intracellular_sodium) * NA_INT_BUFFER_RATE * dt
extracellular_potassium = max(0.1, extracellular_potassium)
intracellular_sodium = max(0.1, intracellular_sodium)
# Update firing history and average rate for homeostatic plasticity
firing_history.append(1 if spike_triggered_this_step else 0)
# Calculate average firing rate based on current history length
if len(firing_history) > 0:
# This is where average_firing_rate is first meaningfully assigned in this scope
average_firing_rate = sum(firing_history) / len(firing_history)
else:
average_firing_rate = 0.0
activity_deviation = average_firing_rate - HOMEOSTASIS_TARGET_FIRING_RATE
# NEW: Metaplasticity factor influenced by calcium
metaplasticity_factor -= activity_deviation * METAPLASTICITY_FACTOR * (
1 + intracellular_calcium * CA_METAPLASTICITY_SENSITIVITY)
metaplasticity_factor = max(0.1, min(metaplasticity_factor, 2.0))
# Adaptive oscillation frequency
freq_adjustment = OSCILLATION_FREQUENCY_ADJUSTMENT_RATE * (dopamine_level - serotonin_level)
frequency = max(DEFAULT_OSCILLATION_FREQUENCY_MIN,
min(DEFAULT_OSCILLATION_FREQUENCY_MAX, frequency + freq_adjustment * dt))
updated_state = {
'membrane_potential': membrane_potential,
'adaptation_current': adaptation_current,
'refractory_timer': refractory_timer,
'ampa_current': ampa_current,
'nmda_current': nmda_current,
'gaba_a_current': gaba_a_current,
'last_spike_time': last_spike_time,
'firing_history': list(firing_history), # Convert deque to list for pickling/serialization
'average_firing_rate': average_firing_rate,
'metaplasticity_factor': metaplasticity_factor,
'phase': phase,
'frequency': frequency,
'extracellular_potassium': extracellular_potassium, # NEW
'intracellular_sodium': intracellular_sodium, # NEW
'intracellular_calcium': intracellular_calcium
}
return updated_state, spike_triggered_this_step
def add_synapse(self, synapse):
"""Adds an output synapse to this neuron."""
self.output_synapses.append(synapse)
def calculate_nmda_mg_block(self):
"""Calculates the voltage-dependent magnesium block for NMDA receptors."""
clamped_membrane_potential = max(self.membrane_potential, -120.0)
return 1.0 / (1.0 + NMDA_MG_CONCENTRATION / 3.57 * math.exp(-clamped_membrane_potential * NMDA_MG_BLOCK_FACTOR))
def receive_postsynaptic_current(self, current_type, strength):
"""Receives and accumulates postsynaptic current of a given type."""
try: # NEW: Error handling for current reception
if current_type == 'ampa':
self.ampa_current += strength
elif current_type == 'nmda':
self.nmda_current += strength
elif current_type == 'gaba_a':
self.gaba_a_current += strength
else:
print(f"Warning: Unknown current type '{current_type}' received by Neuron {self.neuron_id}.",
file=sys.stderr)
except Exception as e:
print(f"Error receiving postsynaptic current for Neuron {self.neuron_id}: {e}", file=sys.stderr)
def get_static_params(self):
"""Returns a dictionary of static neuron parameters."""
return {
'resting_potential': self.resting_potential,
'reset_potential': self.reset_potential,
# Return heterogeneous threshold
'heterogeneous_spike_threshold': self.heterogeneous_spike_threshold,
'peak_potential': self.peak_potential,
'C': self.C,
'g_L': self.g_L,
'Delta_T': self.Delta_T,
'a': self.a,
'tau_w': self.tau_w,
'b': self.b, # Corrected: 'b' should be read from self.b
'dopamine_receptor_profile': self.dopamine_receptor_profile, # NEW
'absolute_refractory_duration': self.absolute_refractory_duration # Add to static params
}
def get_current_state(self):
"""Returns a dictionary of current dynamic state variables."""
return {
'membrane_potential': self.membrane_potential,
'adaptation_current': self.adaptation_current,
'refractory_timer': self.refractory_timer,
'ampa_current': self.ampa_current,
'nmda_current': self.nmda_current,
'gaba_a_current': self.gaba_a_current,
'last_spike_time': self.last_spike_time,
'firing_history': list(self.firing_history), # Convert deque to list for pickling/serialization
'average_firing_rate': self.average_firing_rate,
'metaplasticity_factor': self.metaplasticity_factor,
'phase': self.phase,
'frequency': self.frequency,
'extracellular_potassium': self.extracellular_potassium, # NEW
'intracellular_sodium': self.intracellular_sodium, # NEW
'intracellular_calcium': self.intracellular_calcium # NEW
}
def set_state_from_dict(self, state_dict):
"""Updates the neuron's state variables from a dictionary."""
# NEW: Graceful degradation for NaN/Inf/excessively large values
for key, value in state_dict.items():
if isinstance(value, (float, int)):
if math.isnan(value) or math.isinf(value):
print(f"Warning: Neuron {self.neuron_id} state variable '{key}' is NaN/Inf. Resetting.",
file=sys.stderr)
state_dict[key] = 0.0 # Reset to a default safe value
if key == 'membrane_potential': state_dict[key] = self.resting_potential
elif key == 'membrane_potential' and (
value < DEFAULT_RESTING_POTENTIAL - 100.0 or value > DEFAULT_PEAK_POTENTIAL + 100.0):
print(
f"Warning: Neuron {self.neuron_id} membrane potential {value:.2f}mV out of typical range. Resetting to resting potential.",
file=sys.stderr)
state_dict[key] = self.resting_potential
self.membrane_potential = state_dict['membrane_potential']
self.adaptation_current = state_dict['adaptation_current']
self.refractory_timer = state_dict['refractory_timer']
self.ampa_current = state_dict['ampa_current']
self.nmda_current = state_dict['nmda_current']
self.gaba_a_current = state_dict['gaba_a_current']
self.last_spike_time = state_dict['last_spike_time']
# Reconstruct deque from list
self.firing_history = deque(state_dict['firing_history'], maxlen=HOMEOSTASIS_WINDOW_SIZE)
self.average_firing_rate = state_dict['average_firing_rate']
self.metaplasticity_factor = state_dict['metaplasticity_factor']
self.phase = state_dict['phase']
self.frequency = state_dict['frequency']
self.extracellular_potassium = state_dict.get('extracellular_potassium', K_EXT_BASELINE) # NEW
self.intracellular_sodium = state_dict.get('intracellular_sodium', NA_INT_BASELINE) # NEW
self.intracellular_calcium = state_dict.get('intracellular_calcium', 0.0) # NEW
self.dopamine_receptor_profile = state_dict.get('dopamine_receptor_profile', random_uniform(0.3, 0.7)) # NEW
self.tag = state_dict.get('tag', None) # NEW
# Ensure heterogeneous_spike_threshold is loaded correctly
self.base_spike_threshold = state_dict.get('base_spike_threshold',
DEFAULT_SPIKE_THRESHOLD) # Use DEFAULT_SPIKE_THRESHOLD if not found
self.heterogeneous_spike_threshold = state_dict.get('heterogeneous_spike_threshold', self.base_spike_threshold)
# Ensure 'a' is loaded correctly if it was dynamically changed
self.a = state_dict.get('adaptation_amplitude', DEFAULT_ADAPTATION_AMPLITUDE)
# Ensure 'b' is loaded correctly if it was dynamically changed (or set to default fixed value)
self.b = state_dict.get('b', 1.0)
def apply_homeostatic_plasticity(self):
"""Applies homeostatic plasticity to output synapses to regulate firing rate."""
# Only apply if there are synapses and average firing rate is not exactly at target
if not self.output_synapses or self.average_firing_rate == HOMEOSTASIS_TARGET_FIRING_RATE:
return
change_factor = 0.0
if self.average_firing_rate == 0:
if HOMEOSTASIS_TARGET_FIRING_RATE > 0:
# If current rate is zero and target is positive, aim for a strong increase.
# Using a large scaling_factor equivalent.
# (scaling_factor - 1) becomes large. e.g. if scaling_factor is 10, (10-1)=9
change_factor = HOMEOSTASIS_LEARNING_RATE * (10.0 - 1.0) # Effectively a large scaling factor
# If target is also 0, change_factor remains 0 (no change needed)
else:
scaling_factor = HOMEOSTASIS_TARGET_FIRING_RATE / self.average_firing_rate
change_factor = HOMEOSTASIS_LEARNING_RATE * (scaling_factor - 1)
for synapse in self.output_synapses:
if synapse.neurotransmitter_type == 'glutamate': # Only apply to excitatory synapses
# Apply change based on deviation from target
# The change should be positive if rate is too low (scaling_factor > 1)
# and negative if rate is too high (scaling_factor < 1)
synapse.weight *= (1 + change_factor)
synapse.weight = max(STDP_MIN_WEIGHT, min(synapse.weight, STDP_MAX_WEIGHT))
def to_dict(self):
"""Converts the Neuron object to a dictionary for serialization."""
return {
'neuron_id': self.neuron_id,
'neuron_type': self.neuron_type, # NEW
'layer': self.layer, # NEW
'resting_potential': self.resting_potential,
'reset_potential': self.reset_potential,
'base_spike_threshold': self.base_spike_threshold, # Save base threshold
'heterogeneous_spike_threshold': self.heterogeneous_spike_threshold, # Save heterogeneous threshold
'peak_potential': self.peak_potential,
'membrane_capacitance': self.C,
'leak_conductance': self.g_L,
'adaptation_amplitude': self.a, # Save the current 'a' which might be dynamic
'adaptation_time_constant': self.tau_w,
'exponential_slope': self.Delta_T,
'refractory_timer': self.refractory_timer, # Store current refractory timer
'absolute_refractory_duration': self.absolute_refractory_duration, # Save instance-specific duration
'membrane_potential': self.membrane_potential,
'adaptation_current': self.adaptation_current,
'ampa_current': self.ampa_current, # Include synaptic currents in serialization
'nmda_current': self.nmda_current,
'gaba_a_current': self.gaba_a_current,
'last_spike_time': self.last_spike_time,
'firing_history': list(self.firing_history), # Convert deque to list for serialization
'average_firing_rate': self.average_firing_rate,
'metaplasticity_factor': self.metaplasticity_factor,
'phase': self.phase,
'frequency': self.frequency,
'extracellular_potassium': self.extracellular_potassium, # NEW
'intracellular_sodium': self.intracellular_sodium, # NEW
'intracellular_calcium': self.intracellular_calcium, # NEW
'dopamine_receptor_profile': self.dopamine_receptor_profile, # NEW
'tag': self.tag, # NEW
'b': self.b # Save the 'b' value
}
@classmethod
def from_dict(cls, data):
"""Creates a Neuron object from a dictionary."""
neuron = cls(
neuron_id=data['neuron_id'],
resting_potential=data['resting_potential'],
reset_potential=data['reset_potential'],
# Load base_spike_threshold for initialization, with fallback to old 'spike_threshold' or default
base_spike_threshold=data.get('base_spike_threshold', data.get('spike_threshold', DEFAULT_SPIKE_THRESHOLD)),
peak_potential=data['peak_potential'],
membrane_capacitance=data.get('membrane_capacitance', DEFAULT_MEMBRANE_CAPACITANCE), # Corrected line
leak_conductance=data['leak_conductance'],
adaptation_amplitude=data['adaptation_amplitude'], # Load dynamic 'a'
adaptation_time_constant=data['adaptation_time_constant'],
# Load instance-specific absolute refractory duration
exponential_slope=data['exponential_slope'], # Fallback to global constant if not found
refractory_period_steps=data.get('absolute_refractory_duration', ABSOLUTE_REFRACTORY_PERIOD),
neuron_type=data.get('neuron_type', "pyramidal"), # NEW: Load neuron type
layer=data.get('layer', "L5") # NEW: Load layer
)
neuron.set_state_from_dict(data) # Use the set_state_from_dict to apply loaded values and validation
return neuron
# --- 2. Synapse Model ---
class Synapse:
def __init__(self, source_neuron, target_neuron, weight, neurotransmitter_type, receptor_type, initial_delay):
self.source_neuron = source_neuron
self.target_neuron = target_neuron # Corrected target assignment
self.weight = weight
self.neurotransmitter_type = neurotransmitter_type
self.receptor_type = receptor_type
self.initial_delay = initial_delay # Store the base delay
self.current_delay = initial_delay # This will be dynamically modulated
self.myelination_factor = 0.5 # NEW: 0=no myelin, 1=max myelin (max speed/min delay)
self.utilization = 1.0
self.facilitation = 0.0
# NEW: For Microglial pruning
self.activity_history = deque(maxlen=MICROGLIA_PRUNING_WINDOW) # Changed to deque
self.average_activity = 0.0
def get_postsynaptic_current(self, gliotransmitter_synapse_modulation=0.0):
"""Calculates the effective postsynaptic current based on weight, STP, and gliotransmitter modulation.
For inhibitory synapses (negative weight), a positive gliotransmitter_synapse_modulation will make the
weight more negative (stronger inhibition), effectively increasing the magnitude of the inhibitory effect."""
effective_weight = self.weight * self.utilization * (1 + self.facilitation) * (
1 + gliotransmitter_synapse_modulation)
return effective_weight
def update_short_term_plasticity(self, spike_occurred, dt):
"""Updates short-term plasticity (facilitation and depression) based on spike occurrence."""
self.facilitation *= math.exp(-dt / STP_FACILITATION_TAU)
self.utilization = 1.0 - (1.0 - self.utilization) * math.exp(-dt / STP_DEPRESSION_TAU)
if spike_occurred:
self.facilitation += STP_FACILITATION_INCREMENT
self.utilization -= STP_DEPRESSION_DECREMENT * self.utilization
self.utilization = max(0.0, self.utilization)
# NEW: Update activity history for pruning
self.activity_history.append(1 if spike_occurred else 0)
# maxlen handles popping automatically for deque
self.average_activity = sum(self.activity_history) / len(
self.activity_history) if self.activity_history else 0.0
def apply_stdp(self, metaplasticity_factor=1.0, dopamine_level=0.0, norepinephrine_level=0.0):
"""Applies Spike-Timing Dependent Plasticity (STDP) to adjust synaptic weight."""
pre_spike_time = self.source_neuron.last_spike_time
post_spike_time = self.target_neuron.last_spike_time
if pre_spike_time == -1 or post_spike_time == -1:
return # No STDP if either neuron hasn't spiked yet
delta_t = post_spike_time - pre_spike_time
weight_change = 0.0
# NEW: Norepinephrine can broaden STDP windows
effective_ltp_window = STDP_LTP_WINDOW * (1 + norepinephrine_level * NOREPINEPHRINE_EFFECT_ON_STDP_WINDOW)
effective_ltd_window = STDP_LTD_WINDOW * (1 + norepinephrine_level * NOREPINEPHRINE_EFFECT_ON_STDP_WINDOW)
# Dopamine modulates both LTP and LTD magnitudes, now influenced by receptor profile
d1_like_effect = self.source_neuron.dopamine_receptor_profile # Higher value means more D1-like
d2_like_effect = 1.0 - self.source_neuron.dopamine_receptor_profile # Higher value means more D2-like
# NEW: Dopamine's effect on LTP/LTD is now receptor-dependent
effective_ltp_magnitude = STDP_LTP_MAGNITUDE * metaplasticity_factor * (
1 + dopamine_level * DOPAMINE_EFFECT_ON_LTP * d1_like_effect * D1_LIKE_EFFECT_LTP)
effective_ltd_magnitude = STDP_LTD_MAGNITUDE * metaplasticity_factor * (
1 - dopamine_level * DOPAMINE_EFFECT_ON_LTP * d2_like_effect * D2_LIKE_EFFECT_LTD) # D2-like might reduce LTD
target_calcium = self.target_neuron.intracellular_calcium
# Hebbian STDP:
# LTP if pre-synaptic spike precedes post-synaptic spike (delta_t > 0)
# AND post-synaptic calcium is high.
if 0 < delta_t <= effective_ltp_window: # delta_t is t_post - t_pre
if target_calcium > CA_LTP_THRESHOLD:
# Exponential decay for LTP window, delta_t is positive
exp_arg_ltp = -delta_t / STDP_TAU_LTP # Negative sign for decay as delta_t increases
exp_arg_ltp = min(exp_arg_ltp, MAX_EXP_ARG) # Clamp to prevent overflow
weight_change = effective_ltp_magnitude * math.exp(exp_arg_ltp)
# LTD if post-synaptic spike precedes pre-synaptic spike (delta_t < 0)
# AND post-synaptic calcium is in the LTD-permissive range.
elif -effective_ltd_window <= delta_t < 0: # delta_t is t_post - t_pre
# A common model is that LTD occurs at moderate calcium, below LTP threshold.
if CA_LTD_THRESHOLD <= target_calcium < CA_LTP_THRESHOLD: # More specific calcium window for LTD
# Exponential decay for LTD window, delta_t is negative
exp_arg_ltd = delta_t / STDP_TAU_LTD # Positive sign for decay as delta_t becomes more negative
exp_arg_ltd = min(exp_arg_ltd, MAX_EXP_ARG) # Clamp to prevent overflow
weight_change = -effective_ltd_magnitude * math.exp(exp_arg_ltd)
if self.neurotransmitter_type == 'glutamate': # Only apply STDP to excitatory synapses
self.weight += weight_change
self.weight = max(STDP_MIN_WEIGHT, min(self.weight, STDP_MAX_WEIGHT))
# NEW: Myelin plasticity
def update_myelination(self, dt):
"""Dynamically adjusts synaptic delay based on myelination factor."""
# Simple rule: if synapse is active, increase myelylation (reduce delay)
# if inactive, decrease myelination (increase delay)
if self.average_activity > OMP_ACTIVITY_THRESHOLD:
self.myelination_factor += OMP_MYELIN_GROWTH_RATE * dt
else:
self.myelination_factor -= OMP_MYELIN_DECAY_RATE * dt
self.myelination_factor = max(0.0, min(1.0, self.myelination_factor))
# Delay is inversely proportional to myelination factor (more myelin = less delay)
# current_delay = initial_delay * (1 - myelination_factor * (MAX_MYELIN_FACTOR - MIN_MYELIN_FACTOR))
self.current_delay = self.initial_delay * (1 - self.myelination_factor * MAX_MYELIN_FACTOR)
self.current_delay = max(MIN_SYNAPTIC_DELAY, min(MAX_SYNAPTIC_DELAY, self.current_delay))
def to_dict(self):
"""Converts the Synapse object to a dictionary for serialization."""
return {
'source_neuron_id': self.source_neuron.neuron_id,
'target_neuron_id': self.target_neuron.neuron_id,
'weight': self.weight,
'neurotransmitter_type': self.neurotransmitter_type,
'receptor_type': self.receptor_type,
'initial_delay': self.initial_delay, # Save initial delay
'current_delay': self.current_delay, # Save current dynamic delay
'myelination_factor': self.myelination_factor, # NEW
'utilization': self.utilization,
'facilitation': self.facilitation,
'activity_history': list(self.activity_history), # Convert deque to list for serialization
'average_activity': self.average_activity # NEW
}
# "No from_dict for Synapse, as it's reconstructed within BrainRegion after neurons exist."
# --- 3. Astrocyte Model (Simplified) ---
class Astrocyte:
def __init__(self, region_name):
self.region_name = region_name
self.gliotransmitter_level = 0.0 # Represents overall gliotransmitter release
self.activity_history = deque(maxlen=100) # Changed to deque
def update_state(self, region_activity_level, dt):
"""Updates astrocyte state based on regional neuronal activity."""
# Simple model: gliotransmitter level increases with activity, decays over time
self.gliotransmitter_level += region_activity_level * ASTROCYTE_ACTIVITY_SENSITIVITY * dt
self.gliotransmitter_level *= math.exp(-dt / (1.0 / ASTROCYTE_GLIOTRANSMITTER_DECAY))
self.gliotransmitter_level = max(0.0, min(1.0, self.gliotransmitter_level)) # Clamp between 0 and 1
self.activity_history.append(region_activity_level)
# maxlen handles popping automatically for deque
def get_synapse_modulation_factor(self):
"""Returns a modulation factor for synapses based on gliotransmitter level."""
return self.gliotransmitter_level * GLIOTRANSMITTER_SYNAPSE_MODULATION_STRENGTH
def get_neuron_modulation_factor(self):
"""Returns a modulation factor for neuron thresholds based on gliotransmitter level."""
# Gliotransmitters can make neurons slightly less excitable (higher threshold) for stability.
return self.gliotransmitter_level * GLIOTRANSMITTER_NEURON_MODULATION_STRENGTH
# NEW: Get reuptake factor based on astrocyte activity
def get_reuptake_factor(self):
"""Returns a reuptake factor based on gliotransmitter level, influencing current decay."""
return self.gliotransmitter_level * ASTROCYTE_REUPTAKE_FACTOR
def to_dict(self):
"""Converts the Astrocyte object to a dictionary for serialization."""
return {
'region_name': self.region_name,
'gliotransmitter_level': self.gliotransmitter_level,
'activity_history': list(self.activity_history) # Convert deque to list for serialization
}
@classmethod
def from_dict(cls, data):
"""Creates an Astrocyte object from a dictionary."""
astrocyte = cls(region_name=data['region_name'])
astrocyte.gliotransmitter_level = data['gliotransmitter_level']
astrocyte.activity_history = deque(data['activity_history'], maxlen=100) # Reconstruct deque
return astrocyte
# NEW: Microglia Model (Simplified)
class Microglia:
def __init__(self, region_name):
self.region_name = region_name
self.pruning_activity = 0.0 # Represents microglial activity related to pruning
def update_state(self, region_activity_level, brain_stress_level, dt,
astrocyte_gliotransmitter_level): # ADDED brain_stress_level and astrocyte_gliotransmitter_level
"""Microglia become more active in pruning if region activity is low and/or brain stress is high."""
# Simplified: low local activity and high global stress trigger pruning activity
if region_activity_level < MICROGLIA_ACTIVITY_THRESHOLD:
self.pruning_activity += (MICROGLIA_ACTIVITY_THRESHOLD - region_activity_level) * dt
else:
self.pruning_activity -= self.pruning_activity * 0.1 * dt # Decay if activity is high
# Additionally, increase pruning activity with brain stress
self.pruning_activity += brain_stress_level * STRESS_EFFECT_ON_PRUNING * dt
# NEW: Astrocyte gliotransmitter level can suppress microglial pruning activity
self.pruning_activity -= astrocyte_gliotransmitter_level * ASTROCYTE_MICROGLIA_PRUNING_MODULATION * dt
self.pruning_activity = max(0.0, min(1.0, self.pruning_activity)) # Clamp between 0 and 1
def prune_synapses(self, synapses):
"""Identifies and prunes inactive synapses."""
pruned_count = 0
# Create a new list of synapses, excluding those to be pruned
remaining_synapses = []
for synapse in synapses:
# Check for pruning eligibility only if microglia activity is significant
if self.pruning_activity > 0.1 and synapse.average_activity < SYNAPSE_INACTIVITY_THRESHOLD:
if random.random() < MICROGLIA_PRUNING_PROBABILITY * self.pruning_activity:
# Mark for removal by not adding to remaining_synapses
pruned_count += 1
else:
remaining_synapses.append(synapse)
else:
remaining_synapses.append(synapse)
return pruned_count, remaining_synapses
def to_dict(self):
return {
'region_name': self.region_name,
'pruning_activity': self.pruning_activity
}
@classmethod
def from_dict(cls, data):
microglia = cls(region_name=data['region_name'])
microglia.pruning_activity = data['pruning_activity']
return microglia
# --- 4. Brain Region Model ---
class BrainRegion:
def __init__(self, name, initial_neuron_count,
initial_baseline_threshold=DEFAULT_SPIKE_THRESHOLD,
min_neuron_count=5, # New parameter for min count
max_neuron_count=5000, # New parameter for max count
pyramidal_proportion=0.8):
self.name = name
self.min_neuron_count = min_neuron_count # Store the min count
self.max_neuron_count = max_neuron_count # Store the max count
self.tagged_neuron_groups = {} # NEW: For storing lists of neurons by tag
self.absolute_min_neurons = 5 # Hard floor for neuron count, regardless of ideal_neuron_count
self.pyramidal_proportion = pyramidal_proportion # This will be overridden by layer proportions
self.neurons = []
self.neurons_by_layer = {layer_name: [] for layer_name in CORTICAL_LAYERS.keys()}
# Distribute neurons across layers based on defined proportions
# And assign tags based on region name and TAGGING_CONFIG
current_neuron_id = 0
neuron_counter_for_tagging = 0 # Counter for assigning tags within a role
for layer_name, layer_props in CORTICAL_LAYERS.items():
num_neurons_in_layer = max(1, round(initial_neuron_count * layer_props["proportion"]))
for _ in range(num_neurons_in_layer):
# Randomly choose neuron type for the layer
chosen_neuron_type = random.choice(layer_props["neuron_types"])
new_neuron = Neuron(
neuron_id=current_neuron_id,
base_spike_threshold=initial_baseline_threshold, # Use BrainRegion's initial_baseline_threshold
neuron_type=chosen_neuron_type, # Use the type chosen for this layer
layer=layer_name # Use the current layer_name
)
# --- Assign Tags based on the new plan ---
# Inputs (digits and operators) go to "Frontal Lobe"
# Character outputs come from "Brain Stem"
if name == "Frontal Lobe": # INPUT_REGION_NUMBER and INPUT_REGION_OPERATOR
digit_value = neuron_counter_for_tagging // NUM_NEURONS_PER_TAG_GROUP
# Assign tags for input digits 0-9
if neuron_counter_for_tagging < (10 * NUM_NEURONS_PER_TAG_GROUP): # Digits 0-9
tag = f"{TAG_PREFIX_INPUT_DIGIT}{digit_value}"
new_neuron.tag = tag
self.tagged_neuron_groups.setdefault(tag, []).append(new_neuron)
# Assign tags for input operators
elif neuron_counter_for_tagging < ((10 + len(SUPPORTED_OPERATORS_FOR_OUTPUT)) * NUM_NEURONS_PER_TAG_GROUP):
op_index = (neuron_counter_for_tagging - (10 * NUM_NEURONS_PER_TAG_GROUP)) // NUM_NEURONS_PER_TAG_GROUP
if op_index < len(SUPPORTED_OPERATORS_FOR_OUTPUT):
tag = f"{TAG_PREFIX_INPUT_OPERATOR}{SUPPORTED_OPERATORS_FOR_OUTPUT[op_index]}"
new_neuron.tag = tag
self.tagged_neuron_groups.setdefault(tag, []).append(new_neuron)
neuron_counter_for_tagging += 1
elif name == "Brain Stem": # OUTPUT_REGION_ANSWER for characters (output_char_X)
char_idx_in_role = neuron_counter_for_tagging // NUM_NEURONS_PER_TAG_GROUP
if char_idx_in_role < len(ALL_POSSIBLE_OUTPUT_CHARS):
tag = f"{TAG_PREFIX_OUTPUT_CHAR}{ALL_POSSIBLE_OUTPUT_CHARS[char_idx_in_role]}"
new_neuron.tag = tag
self.tagged_neuron_groups.setdefault(tag, []).append(new_neuron)
neuron_counter_for_tagging += 1
self.neurons.append(new_neuron)
self.neurons_by_layer[layer_name].append(new_neuron)
current_neuron_id += 1
self.activity_level = 0.0
self.total_spikes_in_last_processing_window = 0
self.synapses = []
self.consecutive_calibrated_iterations = 0
self.default_num_connections_per_neuron = 0
self.default_excitatory_proportion = 0.0
self.default_excitatory_weight_range = [0.1, 1.0]
self.default_inhibitory_weight_range = [-1.0, -0.1]
# Event queue for delayed synaptic currents: {arrival_time: [(target_neuron_id:int, current_type:str, strength:float), ...]}
self.event_queue = {}
# Add an Astrocyte for this region
self.astrocyte = Astrocyte(self.name)
# NEW: Add a Microglia for this region
self.microglia = Microglia(self.name)
# NEW: Metabolic state for the region
self.glucose_level = GLUCOSE_BASELINE
self.oxygen_level = OXYGEN_BASELINE
def activate_tagged_group(self, tag: str, activation_strength: float):
"""Forces activation of all neurons with the given tag."""
if tag in self.tagged_neuron_groups:
# print(f"DEBUG: Region {self.name} activating tag '{tag}' for {len(self.tagged_neuron_groups[tag])} neurons.", file=sys.stdout)
for neuron in self.tagged_neuron_groups[tag]:
neuron.force_activation(strength=activation_strength)
# else:
# print(f"DEBUG: Region {self.name} - Tag '{tag}' not found in tagged_neuron_groups for activation.", file=sys.stdout)
def get_tagged_group_activity(self, tag: str,
current_simulation_internal_time_upper_bound: int,
window_duration_internal_steps: int):
"""Calculates the proportion of neurons in a tagged group that spiked recently."""
if tag in self.tagged_neuron_groups and self.tagged_neuron_groups[tag]:
# Neurons are considered active if they spiked within the defined window.
# The window is (threshold_time_exclusive, current_simulation_internal_time_upper_bound_inclusive]
# Note: last_spike_time is the internal step number.
threshold_time_exclusive = current_simulation_internal_time_upper_bound - window_duration_internal_steps
active_neurons = 0
for n in self.tagged_neuron_groups[tag]:
# A neuron's last_spike_time is -1 if it has never spiked.
# Check if the last spike occurred after the threshold and not in the future (though future shouldn't happen).
if n.last_spike_time > threshold_time_exclusive and n.last_spike_time <= current_simulation_internal_time_upper_bound:
active_neurons += 1
return active_neurons / len(self.tagged_neuron_groups[tag])
# print(f"DEBUG: Region {self.name} - Tag '{tag}' not found or empty for get_activity.", file=sys.stdout)
return 0.0
def add_internal_connections(self, num_connections_per_neuron=2,
excitatory_proportion=0.8, # Default proportion of excitatory connections
excitatory_weight_range=None, # Default range for excitatory synapse weights
inhibitory_weight_range=None, # Default range for inhibitory synapse weights
rewiring_probability=0.01):
"""Establishes internal synaptic connections within the region, with small-world properties and layer-specific biases."""
self.default_num_connections_per_neuron = num_connections_per_neuron
self.default_excitatory_proportion = excitatory_proportion
self.default_excitatory_weight_range = excitatory_weight_range
self.default_inhibitory_weight_range = inhibitory_weight_range
if excitatory_weight_range is None:
excitatory_weight_range = [0.1, 1.0]
if inhibitory_weight_range is None:
inhibitory_weight_range = [-1.0, -0.1]
if not self.neurons:
return
# Clear existing synapses to prevent duplicates on re-call
self.synapses = []
for neuron in self.neurons:
neuron.output_synapses = []
N = len(self.neurons)
if N == 0: return
# Define layer-specific connectivity rules (simplified example)
# These are illustrative and can be made more complex
layer_connectivity_rules = {
"L2/3": {"targets": ["L2/3", "L5"], "excitatory_bias": 0.9}, # Superficial, associative (0.3-0.4)
"L4": {"targets": ["L2/3"], "excitatory_bias": 0.95}, # Primarily feeds L2/3
"L5": {"targets": ["L2/3", "L5", "L6"], "excitatory_bias": 0.8}, # Strong recurrent and output
"L6": {"targets": ["L4", "L5"], "excitatory_bias": 0.7} # Feedback to L4, also connects to L5
}
# 1. Create connections based on layer rules
for source_neuron in self.neurons:
source_layer_rules = layer_connectivity_rules.get(source_neuron.layer,
{"targets": list(CORTICAL_LAYERS.keys()),
"excitatory_bias": 0.8})
possible_targets_in_region = []
for target_layer_name in source_layer_rules["targets"]:
possible_targets_in_region.extend(self.neurons_by_layer.get(target_layer_name, []))
# Filter possible_targets_in_region to exclude the source_neuron itself
# and any neurons already targeted by an existing output synapse from source_neuron.
current_target_ids = {s.target_neuron.neuron_id for s in source_neuron.output_synapses}
filtered_possible_targets = [
n for n in possible_targets_in_region if n != source_neuron and n.neuron_id not in current_target_ids
]
num_connections = min(num_connections_per_neuron, len(filtered_possible_targets))
if num_connections > 0:
targets = get_random_elements(filtered_possible_targets, num_connections)
for target_neuron in targets:
self._create_and_add_synapse(source_neuron, target_neuron, source_layer_rules["excitatory_bias"],
excitatory_weight_range, inhibitory_weight_range)
# 2. Rewire edges with probability 'rewiring_probability' (simplified for layered structure)
# This step can be complex with layers, keeping it simple for now: rewire within region.
all_possible_targets = [n for n in self.neurons]
for source_neuron in self.neurons:
synapses_to_rewire = list(source_neuron.output_synapses)
for synapse in synapses_to_rewire:
if random.random() < rewiring_probability:
source_neuron.output_synapses.remove(synapse)
if synapse in self.synapses:
self.synapses.remove(synapse)
# Optimize finding a new unique target with limited attempts
new_target = None
max_attempts = 100 # Limit attempts to avoid indefinite loops
attempts = 0
while attempts < max_attempts:
if not all_possible_targets: # No targets to choose from
break
candidate_target = random.choice(all_possible_targets)
# Ensure target is not the source and not already connected
existing_target_ids_for_rewire = {s.target_neuron.neuron_id for s in
source_neuron.output_synapses}
if candidate_target != source_neuron and candidate_target.neuron_id not in existing_target_ids_for_rewire:
new_target = candidate_target
break
attempts += 1
if new_target:
# Use the source neuron's layer's excitatory bias for the new synapse
source_layer_rules = layer_connectivity_rules.get(source_neuron.layer, {"excitatory_bias": 0.8})
self._create_and_add_synapse(source_neuron, new_target, source_layer_rules["excitatory_bias"],
excitatory_weight_range, inhibitory_weight_range)
def set_neuron_baseline_threshold(self, new_average_threshold):
"""Sets the baseline spike threshold for all neurons in the region, applying heterogeneity."""
# When setting a new baseline, apply it with the same initial variation logic
for neuron in self.neurons:
# Re-apply neuron type offset
type_props = NEURON_TYPE_PROPERTIES.get(neuron.neuron_type, NEURON_TYPE_PROPERTIES["pyramidal"])
neuron.base_spike_threshold = new_average_threshold + type_props["base_spike_threshold_offset"]
# Recalculate the heterogeneous threshold based on the new base threshold
neuron.heterogeneous_spike_threshold = neuron.base_spike_threshold * random_uniform(
1 - NEURON_PARAMETER_VARIATION_FACTOR,
1 + NEURON_PARAMETER_VARIATION_FACTOR)
def add_a_neuron(self, current_average_threshold):
"""Adds a new neuron to the region and incrementally adds connections."""
new_neuron_id = len(self.neurons) # Assign a new unique ID
# Determine layer and type for the new neuron
chosen_layer = random.choice(list(CORTICAL_LAYERS.keys()))
chosen_type = random.choice(CORTICAL_LAYERS[chosen_layer]["neuron_types"])
new_neuron = Neuron(new_neuron_id,
base_spike_threshold=current_average_threshold,
neuron_type=chosen_type,
layer=chosen_layer)
self.neurons.append(new_neuron)
self.neurons_by_layer[chosen_layer].append(new_neuron)
# Incrementally add connections for the new neuron
# 1. Connections from the new neuron to existing neurons
num_outgoing_connections = self.default_num_connections_per_neuron
# Apply layer-specific rules for outgoing connections from the new neuron
new_neuron_layer_rules = CORTICAL_LAYERS.get(new_neuron.layer,
{"targets": list(CORTICAL_LAYERS.keys()), "excitatory_bias": 0.8})
possible_outgoing_targets = []
for target_layer_name in new_neuron_layer_rules["targets"]:
possible_outgoing_targets.extend(
[n for n in self.neurons_by_layer.get(target_layer_name, []) if n != new_neuron])
outgoing_targets = get_random_elements(possible_outgoing_targets,
min(num_outgoing_connections, len(possible_outgoing_targets)))
for target_neuron in outgoing_targets:
self._create_and_add_synapse(new_neuron, target_neuron, new_neuron_layer_rules["excitatory_bias"],
self.default_excitatory_weight_range, self.default_inhibitory_weight_range)
# 2. Connections from existing neurons to the new neuron
# For simplicity, let's connect a few random existing neurons to the new one.
# A more sophisticated approach would consider layer-specific incoming rules.
num_incoming_connections = min(self.default_num_connections_per_neuron, len(self.neurons) - 1)
if num_incoming_connections > 0:
source_neurons_for_new = get_random_elements([n for n in self.neurons if n != new_neuron],
num_incoming_connections)
for source_neuron in source_neurons_for_new:
# Use the source neuron's layer's excitatory bias for the new synapse
# CORRECTED: Changed 'excitability_bias' to 'excitatory_bias'
source_layer_rules = CORTICAL_LAYERS.get(source_neuron.layer, {"excitatory_bias": 0.8})
self._create_and_add_synapse(source_neuron, new_neuron, source_layer_rules["excitatory_bias"],
self.default_excitatory_weight_range, self.default_inhibitory_weight_range)
# Removed print statement for cleaner full simulation output
print(f" {self.name}: Added a neuron. Total now {len(self.neurons)}.", file=sys.stdout) #- Suppressed for cleaner calibration output
def remove_a_neuron(self):
"""Removes a random neuron from the region and efficiently updates connections."""
if len(self.neurons) > self.absolute_min_neurons: # Check against absolute minimum
removed_index = random.randint(0, len(self.neurons) - 1)
removed_neuron = self.neurons.pop(removed_index) # Remove the neuron
self.neurons_by_layer[removed_neuron.layer].remove(removed_neuron) # Remove from layer list
# Efficiently remove all synapses connected to/from the removed neuron
self.synapses = [s for s in self.synapses if
s.source_neuron != removed_neuron and s.target_neuron != removed_neuron]
# Update output_synapses for all remaining neurons
for neuron in self.neurons:
neuron.output_synapses = [s for s in neuron.output_synapses if s.target_neuron != removed_neuron]
# Removed print statement for cleaner full simulation output
print(f" {self.name}: Removed a neuron. Total now {len(self.neurons)}.", file=sys.stdout) #- Suppressed for cleaner output
return True
else:
return False
def _create_and_add_synapse(self, source_neuron, target_neuron, excitatory_proportion, excitatory_weight_range,
inhibitory_weight_range):
"""Helper to create and add a synapse."""
# weight = 0.0 # This initial assignment is overwritten below
# neurotransmitter = '' # This initial assignment is overwritten below
# receptor = '' # This initial assignment is overwritten below
delay = random.uniform(MIN_SYNAPTIC_DELAY, MAX_SYNAPTIC_DELAY) # Assign a random float delay
# Inhibitory interneurons only form inhibitory synapses
if source_neuron.neuron_type == "interneuron":
weight = random_uniform(inhibitory_weight_range[0], inhibitory_weight_range[1])
neurotransmitter = 'gaba'
receptor = 'gaba_a'
elif random.random() < excitatory_proportion:
weight = random_uniform(excitatory_weight_range[0], excitatory_weight_range[1])
neurotransmitter = 'glutamate'
receptor = 'ampa' if random.random() < 0.5 else 'nmda' # Randomly assign AMPA or NMDA
else: # Excitatory neuron forming an inhibitory synapse (rare, but possible for some types)
weight = random_uniform(inhibitory_weight_range[0], inhibitory_weight_range[1])
neurotransmitter = 'gaba'
receptor = 'gaba_a'
synapse = Synapse(source_neuron, target_neuron, weight, neurotransmitter, receptor, delay)
source_neuron.add_synapse(synapse)
self.synapses.append(synapse)
return synapse # Return the created synapse
def process_input(self, input_data, dt, current_time,
dopamine_level=0.0, serotonin_level=0.0, acetylcholine_level=0.0,
norepinephrine_level=0.0,
internal_simulation_steps=10,
dynamic_noise_amplitude=DEFAULT_NOISE_AMPLITUDE,
dynamic_adaptation_amplitude=DEFAULT_ADAPTATION_AMPLITUDE,
dynamic_spike_threshold_offset=0.0,
pool=None,
metabolic_deficit_effect=0.0,
brain_stress_level=0.0,
print_step_details=False): # NEW PARAMETER
"""
Processes input for the brain region, simulating internal neuron dynamics
and synaptic interactions.
"""
total_input_strength = input_data.get('aggregated_input', 0.0)
# Initialize total_spikes_for_current_window for the current processing window
total_spikes_for_current_window = 0
# NEW: Counters for structural changes within this processing window
synaptogenesis_events_this_window = 0
activity_pruning_events_this_window = 0
microglial_pruning_events_this_window = 0
# Update astrocyte state based on current activity, and metabolic support
self.astrocyte.update_state(self.activity_level, dt * internal_simulation_steps)
gliotransmitter_synapse_modulation = self.astrocyte.get_synapse_modulation_factor()
gliotransmitter_neuron_modulation = self.astrocyte.get_neuron_modulation_factor()
astrocyte_reuptake_factor = self.astrocyte.get_reuptake_factor()
astrocyte_gliotransmitter_level = self.astrocyte.gliotransmitter_level # Get current gliotransmitter level
# Astrocytes replenish metabolic resources
self.glucose_level = min(GLUCOSE_BASELINE,
self.glucose_level + ASTROCYTE_METABOLIC_SUPPORT_RATE * self.astrocyte.gliotransmitter_level * dt * internal_simulation_steps)
self.oxygen_level = min(OXYGEN_BASELINE,
self.oxygen_level + ASTROCYTE_METABOLIC_SUPPORT_RATE * self.astrocyte.gliotransmitter_level * dt * internal_simulation_steps)
# Update microglia state and prune synapses
self.microglia.update_state(self.activity_level, brain_stress_level, dt * internal_simulation_steps,
astrocyte_gliotransmitter_level) # Pass brain_stress_level and astrocyte_gliotransmitter_level
pruned_count, remaining_synapses = self.microglia.prune_synapses(self.synapses)
if pruned_count > 0:
microglial_pruning_events_this_window += pruned_count
# Refresh output_synapses for all neurons to reflect changes
for neuron in self.neurons:
neuron.output_synapses = [s for s in remaining_synapses if s.source_neuron == neuron]
self.synapses = remaining_synapses # Update the region's synapse list
# Calculate regional average phase for oscillatory coupling
regional_average_phase = 0.0
if self.neurons:
x_sum = sum(math.cos(n.phase) for n in self.neurons)
y_sum = sum(math.sin(n.phase) for n in self.neurons)
# Handle potential division by zero if sums are very close to zero
if x_sum == 0 and y_sum == 0:
regional_average_phase = 0.0 # Or handle as a specific case
else:
regional_average_phase = math.atan2(y_sum, x_sum)
regional_average_phase = (regional_average_phase + 2 * math.pi) % (2 * math.pi)
for i in range(internal_simulation_steps):
current_internal_time = current_time * internal_simulation_steps + i
spikes_this_step = []
# 1. Apply scheduled postsynaptic currents for the current time step (sequential)
if current_internal_time in self.event_queue:
for target_neuron_id, current_type, strength in self.event_queue[current_internal_time]:
target_neuron = next((n for n in self.neurons if n.neuron_id == target_neuron_id), None)
if target_neuron:
target_neuron.receive_postsynaptic_current(current_type, strength)
del self.event_queue[current_internal_time]
# Apply external input at each internal simulation step for sustained effect
if total_input_strength > 0 and len(self.neurons) > 0:
target_neurons_for_input = self.neurons_by_layer.get("L4", self.neurons)
if not target_neurons_for_input: # Fallback if L4 is empty or not defined
target_neurons_for_input = self.neurons
# Apply input proportionally to number of neurons in target_neurons_for_input
# Changed the hardcoded 50.0 to REGION_INPUT_CURRENT_SCALE
input_per_neuron = (total_input_strength * REGION_INPUT_CURRENT_SCALE) / len(target_neurons_for_input)
for neuron in target_neurons_for_input:
neuron.receive_postsynaptic_current('ampa', input_per_neuron)
# 2. Prepare arguments for parallel neuron processing
neuron_args = []
for neuron in self.neurons:
neuron_args.append((
neuron.get_current_state(),
neuron.get_static_params(),
dt,
current_internal_time,
dopamine_level,
serotonin_level,
acetylcholine_level,
norepinephrine_level,
regional_average_phase,
dynamic_noise_amplitude,
dynamic_adaptation_amplitude,
dynamic_spike_threshold_offset,
gliotransmitter_neuron_modulation,
astrocyte_reuptake_factor,
metabolic_deficit_effect,
))
# 3. Process neurons and identify spikes in parallel
if pool:
# Call the static method of the Neuron class
results = pool.starmap(Neuron.calculate_dynamics_static, neuron_args)
else:
results = []
for args in neuron_args:
# Direct call to the static method if no pool
results.append(Neuron.calculate_dynamics_static(*args))
# 4. Update neuron objects with results and collect spikes (sequential)
spikes_in_this_internal_step = 0
for idx, (updated_neuron_state, fired_status) in enumerate(results):
self.neurons[idx].set_state_from_dict(updated_neuron_state)
if fired_status:
self.neurons[idx].is_firing = True
spikes_this_step.append(self.neurons[idx])
spikes_in_this_internal_step += 1
else:
self.neurons[idx].is_firing = False
# Accumulate spikes for the current processing window
total_spikes_for_current_window += spikes_in_this_internal_step
# Consume metabolic resources based on total spikes in this internal step
self.glucose_level -= spikes_in_this_internal_step * METABOLIC_CONSUMPTION_RATE_PER_SPIKE * dt
self.oxygen_level -= spikes_in_this_internal_step * METABOLIC_CONSUMPTION_RATE_PER_SPIKE * dt
self.glucose_level = max(0.0, self.glucose_level)
self.oxygen_level = max(0.0, self.oxygen_level)
# 5. Schedule postsynaptic currents for future time steps (due to delays) (sequential)
for source_neuron in spikes_this_step:
for synapse in source_neuron.output_synapses:
synapse.update_short_term_plasticity(True, dt)
psc = synapse.get_postsynaptic_current(gliotransmitter_synapse_modulation)
synapse.update_myelination(dt)
arrival_time = int(round(current_internal_time + synapse.current_delay)) # Cast to int here
if arrival_time not in self.event_queue:
self.event_queue[arrival_time] = []
self.event_queue[arrival_time].append((synapse.target_neuron.neuron_id, synapse.receptor_type, psc))
# 6. Update STP for non-spiking neurons (sequential)
for neuron in self.neurons:
if not neuron.is_firing: # Only update STP for non-spiking neurons
for synapse in neuron.output_synapses:
synapse.update_short_term_plasticity(False, dt)
# 7. Apply STDP and Homeostatic Plasticity (sequential)
for synapse in self.synapses:
synapse.apply_stdp(synapse.source_neuron.metaplasticity_factor, dopamine_level,
norepinephrine_level)
try:
for neuron in self.neurons:
neuron.apply_homeostatic_plasticity()
except Exception as e:
print(f"Error applying homeostatic plasticity in region {self.name}: {e}", file=sys.stderr)
# NEW: Activity-dependent synaptogenesis and pruning
# Synaptogenesis: Check for highly correlated neurons
for neuron1 in self.neurons:
for neuron2 in self.neurons:
if neuron1 != neuron2 and random.random() < SYNAPTOGENESIS_PROBABILITY:
# Simplified correlation: check if both fired recently
if neuron1.average_firing_rate > SYNAPTOGENESIS_CORRELATION_THRESHOLD and \
neuron2.average_firing_rate > SYNAPTOGENESIS_CORRELATION_THRESHOLD:
# Check if a synapse already exists
existing_synapse = next((s for s in neuron1.output_synapses if s.target_neuron == neuron2),
None)
if not existing_synapse:
# Create a new excitatory synapse
self._create_and_add_synapse(neuron1, neuron2, 1.0, [0.1, 0.5], [-0.1, -0.5])
synaptogenesis_events_this_window += 1
print(f" {self.name}: New synapse formed between {neuron1.neuron_id} and {neuron2.neuron_id}", file=sys.stdout) #- Removed print statement for cleaner full simulation output
# Pruning of very weak synapses (beyond microglial pruning)
synapses_to_remove = []
for synapse in self.synapses:
if synapse.weight < SYNAPTIC_PRUNING_THRESHOLD and random.random() < SYNAPTIC_PRUNING_PROBABILITY:
synapses_to_remove.append(synapse)
for synapse in synapses_to_remove:
if synapse in self.synapses: # Double check in case it was already removed
self.synapses.remove(synapse)
synapse.source_neuron.output_synapses = [s for s in synapse.source_neuron.output_synapses if
s.target_neuron.neuron_id != synapse.target_neuron.neuron_id] # Corrected removal
# Removed print statement for cleaner full simulation output
activity_pruning_events_this_window += 1
# print(f" {self.name}: Weak synapse from {synapse.source_neuron.neuron_id} to {synapse.target_neuron.neuron_id} pruned.", file=sys.stdout)
# After the internal_simulation_steps loop, update the region's total spikes
self.total_spikes_in_last_processing_window = total_spikes_for_current_window
self.activity_level = self.total_spikes_in_last_processing_window / (
len(self.neurons) * internal_simulation_steps) if self.neurons else 0.0
if print_step_details:
if microglial_pruning_events_this_window > 0:
print(
f" {self.name}: Microglia pruned {microglial_pruning_events_this_window} synapses this window.",
file=sys.stdout)
if synaptogenesis_events_this_window > 0:
print(
f" {self.name}: Formed {synaptogenesis_events_this_window} new synapses (activity-based) this window.",
file=sys.stdout)
if activity_pruning_events_this_window > 0: # This counter needs to be incremented where weak synapses are pruned
print(
f" {self.name}: Pruned {activity_pruning_events_this_window} weak synapses (activity-based) this window.",
file=sys.stdout)
output = {'region_name': self.name, 'activity_level': self.activity_level}
# Add specific cognitive outputs based on region name
if self.name == "Frontal Lobe":
output['cognitive_output'] = f"Decision made with activity {self.activity_level:.2f}"
elif self.name == "Parietal Lobe":
output['sensory_interpretation'] = f"Sensory interpreted with activity {self.activity_level:.2f}"
elif self.name == "Occipital Lobe":
output['visual_processing'] = f"Visual data processed with activity {self.activity_level:.2f}"
elif self.name == "Temporal Lobe":
output['memory_recall_audio'] = f"Memory/Audio processed with activity {self.activity_level:.2f}"
elif self.name == "Cerebellum":
output['motor_coordination'] = f"Motor coordination adjusted with activity {self.activity_level:.2f}"
elif self.name == "Brain Stem":
output['autonomic_response'] = f"Autonomic functions stable with activity {self.activity_level:.2f}"
elif self.name == "Hippocampus":
output['memory_formation_status'] = f"Memory formation active with activity {self.activity_level:.2f}"
elif self.name == "Amygdala":
output['emotional_response'] = f"Emotional state influenced with activity {self.activity_level:.2f}"
elif self.name == "Thalamus":
output['sensory_relay_status'] = f"Sensory relay active with activity {self.activity_level:.2f}"
elif self.name == "Hypothalamus":
output['hormone_regulation'] = f"Hormone regulation active with activity {self.activity_level:.2f}"
elif self.name == "Basal Ganglia":
output[
'movement_initiation_status'] = f"Movement initiation influenced with activity {self.activity_level:.2f}"
return output
def get_state(self):
"""Returns the current state of the brain region."""
# Calculate average membrane potential and adaptation current for richer output
avg_mp = sum(n.membrane_potential for n in self.neurons) / len(self.neurons) if self.neurons else 0.0
avg_ac = sum(n.adaptation_current for n in self.neurons) / len(self.neurons) if self.neurons else 0.0
avg_phase = 0.0
if self.neurons:
x_sum = sum(math.cos(n.phase) for n in self.neurons)
y_sum = sum(math.sin(n.phase) for n in self.neurons)
# Handle potential division by zero if sums are very close to zero
if x_sum == 0 and y_sum == 0:
avg_phase = 0.0
else:
avg_phase = math.atan2(y_sum, x_sum)
avg_phase = (avg_phase + 2 * math.pi) % (2 * math.pi)
# NEW: Average ion concentrations
avg_k_ext = sum(n.extracellular_potassium for n in self.neurons) / len(
self.neurons) if self.neurons else K_EXT_BASELINE
avg_na_int = sum(n.intracellular_sodium for n in self.neurons) / len(
self.neurons) if self.neurons else NA_INT_BASELINE
avg_ca_int = sum(n.intracellular_calcium for n in self.neurons) / len(self.neurons) if self.neurons else 0.0
# Calculate average base_spike_threshold for reporting
avg_base_threshold = sum(n.base_spike_threshold for n in self.neurons) / len(
self.neurons) if self.neurons else DEFAULT_SPIKE_THRESHOLD
return {
'name': self.name,
'total_neurons': len(self.neurons),
'total_spikes_in_last_processing_window': self.total_spikes_in_last_processing_window,
'average_firing_rate_percentage': self.activity_level * 100,
'average_membrane_potential': avg_mp,
'average_adaptation_current': avg_ac,
'average_phase': avg_phase,
'astrocyte_gliotransmitter_level': self.astrocyte.gliotransmitter_level,
'microglia_pruning_activity': self.microglia.pruning_activity,
'average_extracellular_potassium': avg_k_ext,
'average_intracellular_sodium': avg_na_int,
'average_intracellular_calcium': avg_ca_int,
'average_base_threshold': avg_base_threshold,
'glucose_level': self.glucose_level, # NEW
'oxygen_level': self.oxygen_level # NEW
}
def to_dict(self):
"""Converts the BrainRegion object to a dictionary for serialization."""
return {
'name': self.name,
'neuron_count': len(self.neurons), # Current neuron count
'min_neuron_count': self.min_neuron_count, # Store min neuron count
'max_neuron_count': self.max_neuron_count, # Store max neuron count
'absolute_min_neurons': self.absolute_min_neurons, # Store absolute min neurons
'pyramidal_proportion': self.pyramidal_proportion,
# Kept for backward compatibility, though layers are now primary
'activity_level': self.activity_level,
'total_spikes_in_last_processing_window': self.total_spikes_in_last_processing_window,
'consecutive_calibrated_iterations': self.consecutive_calibrated_iterations,
'default_num_connections_per_neuron': self.default_num_connections_per_neuron,
'default_excitatory_proportion': self.default_excitatory_proportion,
'default_excitatory_weight_range': self.default_excitatory_weight_range,
'default_inhibitory_weight_range': self.default_inhibitory_weight_range,
'neurons': [n.to_dict() for n in self.neurons],
'synapses': [s.to_dict() for s in self.synapses],
'event_queue': {str(k): v for k, v in self.event_queue.items()},
'astrocyte': self.astrocyte.to_dict(),
'microglia': self.microglia.to_dict(),
'glucose_level': self.glucose_level, # NEW
'oxygen_level': self.oxygen_level # NEW
}
@classmethod
def from_dict(cls, data):
"""Creates a BrainRegion object from a dictionary."""
region = cls(
name=data['name'],
initial_neuron_count=data['neuron_count'], # Use saved neuron_count for initial population
initial_baseline_threshold=DEFAULT_SPIKE_THRESHOLD, # Placeholder, will be set by neuron.from_dict
min_neuron_count=data.get('min_neuron_count', 5), # Load min_neuron_count, fallback to 5
max_neuron_count=data.get('max_neuron_count', 5000), # Load max_neuron_count, fallback to 5000
pyramidal_proportion=data.get('pyramidal_proportion', 0.8) # Load, but will be re-calculated by layers
)
region.absolute_min_neurons = data.get('absolute_min_neurons', 5) # Load absolute_min_neurons, fallback to 5
region.activity_level = data['activity_level']
region.total_spikes_in_last_processing_window = data['total_spikes'] if 'total_spikes' in data else data[
'total_spikes_in_last_processing_window'] # Compatibility
region.consecutive_calibrated_iterations = data['consecutive_calibrated_iterations']
region.default_num_connections_per_neuron = data['default_num_connections_per_neuron']
region.default_excitatory_proportion = data['default_excitatory_proportion']
region.default_excitatory_weight_range = data['default_excitatory_weight_range']
region.default_inhibitory_weight_range = data['default_inhibitory_weight_range']
# Reconstruct neurons first and populate neurons_by_layer
region.neurons = []
region.neurons_by_layer = {layer_name: [] for layer_name in CORTICAL_LAYERS.keys()}
for n_data in data['neurons']:
neuron = Neuron.from_dict(n_data)
region.neurons.append(neuron)
if neuron.layer in region.neurons_by_layer:
region.neurons_by_layer[neuron.layer].append(neuron)
else: # Handle new layers or old data without layer info
region.neurons_by_layer.setdefault(neuron.layer, []).append(neuron)
# Create a map for quick neuron lookup by ID
neuron_map = {n.neuron_id: n for n in region.neurons}
# Reconstruct synapses
region.synapses = []
for s_data in data['synapses']:
source_neuron = neuron_map.get(s_data['source_neuron_id'])
target_neuron = neuron_map.get(s_data['target_neuron_id'])
if source_neuron and target_neuron: # Only reconstruct if both neurons exist
synapse = Synapse(
source_neuron=source_neuron,
target_neuron=target_neuron,
weight=s_data['weight'],
neurotransmitter_type=s_data['neurotransmitter_type'],
receptor_type=s_data['receptor_type'],
initial_delay=s_data['initial_delay']
)
synapse.current_delay = s_data.get('current_delay', s_data['initial_delay'])
synapse.myelination_factor = s_data.get('myelination_factor', 0.5)
synapse.utilization = s_data['utilization']
synapse.facilitation = s_data['facilitation']
synapse.activity_history = deque(s_data.get('activity_history', []),
maxlen=MICROGLIA_PRUNING_WINDOW) # Reconstruct deque
synapse.average_activity = s_data.get('average_activity', 0.0)
source_neuron.add_synapse(synapse)
region.synapses.append(synapse)
else:
print(
f"Warning: Skipping synapse reconstruction due to missing source ({s_data['source_neuron_id']}) or target ({s_data['target_neuron_id']}) neuron in region {region.name}.",
file=sys.stderr)
# Reconstruct event queue
# Ensure keys are converted to int for correct lookup
region.event_queue = {int(k): v for k, v in data['event_queue'].items()}
# Reconstruct astrocyte
astrocyte_data = data.get('astrocyte',
{'region_name': region.name, 'gliotransmitter_level': 0.0, 'activity_history': []})
if 'region_name' not in astrocyte_data or astrocyte_data['region_name'] != region.name:
astrocyte_data['region_name'] = region.name # Ensure region name matches
region.astrocyte = Astrocyte.from_dict(astrocyte_data)
# NEW: Reconstruct microglia
microglia_data = data.get('microglia', {'region_name': region.name, 'pruning_activity': 0.0})
if 'region_name' not in microglia_data or microglia_data['region_name'] != region.name:
microglia_data['region_name'] = region.name
region.microglia = Microglia.from_dict(microglia_data)
# NEW: Load metabolic state
region.glucose_level = data.get('glucose_level', GLUCOSE_BASELINE)
region.oxygen_level = data.get('oxygen_level', OXYGEN_BASELINE)
return region
# --- 5. Digital Brain Orchestrator ---
class DigitalBrain:
def __init__(self, load_path=None):
self.regions = {}
self.brain_stress_level = 0.0 # Initialize brain_stress_level here
# Total number of neurons for the simulated brain.
# This value is chosen to be computationally manageable while allowing for
# a respectable number of neurons in each region.
# Reduced from 20000 to 5000 for faster initialization during testing.
self.total_target_neurons = 5000 # Keeping computational limit for now, not 8.6e10
# Approximate proportions of neurons in human brain regions based on biological data.
# These are relative proportions, not absolute counts, and are adjusted to ensure
# all regions have a meaningful number of neurons in each region.
# The Cerebellum's proportion is significantly reduced from its biological 80%
# to allow other regions to be more substantial in the simulation.
original_proportions_normalized = { # As per the human model
"Cerebellum": 0.4878,
"Frontal Lobe": 0.1171,
"Parietal Lobe": 0.0780,
"Occipital Lobe": 0.0683,
"Temporal Lobe": 0.0780,
"Brain Stem": 0.0244,
"Hippocampus": 0.0195,
"Amygdala": 0.0146,
"Thalamus": 0.0488,
"Basal Ganglia": 0.0488,
"Hypothalamus": 0.0049,
"Pituitary Gland": 0.0024,
"Pineal Gland": 0.0024,
}
self.initial_region_params = {}
for region_name, proportion in original_proportions_normalized.items():
# Calculate the initial neuron count for each region based on the scaled total
initial_count = max(5, round(self.total_target_neurons * proportion))
# Define a range around the initial count for slight variation, ensuring min 5 neurons
min_count = max(5, round(initial_count * 0.85))
max_count = round(initial_count * 1.15)
self.initial_region_params[region_name] = {
'initial_neuron_count': initial_count,
'min_neuron_count': min_count,
'max_neuron_count': max_count,
'baseline_threshold': DEFAULT_SPIKE_THRESHOLD,
'target_range': self._get_original_target_range(region_name),
'pyramidal_proportion': 0.8 if region_name not in ["Cerebellum", "Brain Stem"] else 0.5
}
self.dopamine_level = 0.5 # Variable (Context-Dependent, Normalized 0-1), keeping 0.5 as baseline
self.serotonin_level = 0.5 # Variable (Context-Dependent, Normalized 0-1), keeping 0.5 as baseline
self.acetylcholine_level = 0.5 # Variable (Context-Dependent, Normalized 0-1), keeping 0.5 as baseline
self.norepinephrine_level = 0.5 # Variable (Context-Dependent, Normalized 0-1), keeping 0.5 as baseline
self.current_time = 0 # Simulation initialization
self.dt = 1.0 # ms (Simulation time step)
self.connections = {}
# This scale factor can be adjusted if overall inter-region activity is too high or low
self.inter_region_signal_scale = 125.0 # Variable (Context-Dependent), adjusted
self.stress_accumulation_rate = 0.0015 # ADJUSTED
self.stress_decay_rate = 0.012 # ADJUSTED
# For character stream decoding
self.decoded_character_sequence = []
self.char_decoder_state = "IDLE" # States: IDLE, AWAITING_RESET, IN_REFRACTORY
self.last_emitted_char_tag = None
self.reset_duration_counter = 0
self.decoder_refractory_counter = 0
self.sequence_timeout_counter = 0
# Initialize multiprocessing pool to use all available CPU cores
self.num_processes = multiprocessing.cpu_count()
print(f"Initializing multiprocessing pool with {self.num_processes} cores.", file=sys.stdout)
self.pool_is_active = False # Initialize flag, will be set to True after pool creation
self.pool = Pool(processes=self.num_processes)
if load_path and os.path.exists(load_path):
try:
print(f"Attempting to load brain state from {load_path}...", file=sys.stdout)
self.load_state(load_path)
print("Brain state loaded successfully.", file=sys.stdout)
self.pool_is_active = True # Pool is active after successful load/init
except json.decoder.JSONDecodeError as e:
print(f"Error loading brain state from {load_path}: {e}", file=sys.stderr)
print("The brain state file might be corrupted. Deleting and initializing a new brain state.",
file=sys.stderr)
if os.path.exists(load_path):
os.remove(load_path)
print(f"Deleted corrupted brain state file: {load_path}", file=sys.stdout)
self.current_time = 0
self._initialize_brain_regions()
self._establish_inter_region_flow_logic()
self.pool_is_active = True # Pool is active after successful init
except Exception as e: # Catch other potential loading errors
print(f"An unexpected error occurred during loading: {e}", file=sys.stderr)
print("Initializing a new brain state.", file=sys.stderr)
if os.path.exists(load_path):
os.remove(load_path)
print(f"Deleted potentially problematic brain state file: {load_path}", file=sys.stdout)
self.current_time = 0
self._initialize_brain_regions()
self._establish_inter_region_flow_logic()
self.pool_is_active = True # Pool is active after successful init
else:
print("Initializing new brain state...", file=sys.stdout)
self.current_time = 0
self._initialize_brain_regions()
self._establish_inter_region_flow_logic()
self.pool_is_active = True # Pool is active after successful init
def __del__(self):
"""Ensures the multiprocessing pool is terminated when the object is deleted."""
self.close_main_pool() # Use the new method
def close_main_pool(self):
"""Closes the main multiprocessing pool if it's active."""
# Check if pool exists and if our flag indicates it's active
if hasattr(self, 'pool') and self.pool and getattr(self, 'pool_is_active', False):
self.pool.close()
self.pool.join()
self.pool_is_active = False # Mark as inactive
# The print statement will be handled by the caller or the finally block
# to avoid duplicate prints if called from __del__ and finally.
@staticmethod
def _get_original_target_range(region_name):
"""Helper to get original target ranges for brain regions."""
# Adjusted target ranges to be more realistic for the model's output
original_ranges = {
"Frontal Lobe": [0.09, 0.15],
"Parietal Lobe": [0.08, 0.14],
"Occipital Lobe": [0.09, 0.16],
"Temporal Lobe": [0.08, 0.14],
"Cerebellum": [0.07, 0.12],
"Brain Stem": [0.07, 0.13],
"Hippocampus": [0.09, 0.16],
"Amygdala": [0.09, 0.15],
"Hypothalamus": [0.08, 0.14],
"Thalamus": [0.07, 0.13],
"Basal Ganglia": [0.09, 0.15],
"Pituitary Gland": [0.07, 0.12],
"Pineal Gland": [0.06, 0.11],
}
return original_ranges.get(region_name)
def _initialize_brain_regions(self):
"""Initializes all brain regions with specified neuron counts and baseline thresholds."""
print(f"\n--- Initializing {len(self.initial_region_params)} Brain Regions ---", file=sys.stdout)
print(f"Total target neurons for the brain: {self.total_target_neurons}\n", file=sys.stdout)
for region_name, params in self.initial_region_params.items():
print(f" Creating region: {region_name} with {params['initial_neuron_count']} initial neurons.",
file=sys.stdout)
self.regions[region_name] = BrainRegion(region_name,
initial_neuron_count=params['initial_neuron_count'],
initial_baseline_threshold=params['baseline_threshold'],
min_neuron_count=params['min_neuron_count'],
max_neuron_count=params['max_neuron_count'],
pyramidal_proportion=params['pyramidal_proportion'])
print("\n--- Establishing Internal Connections within Regions ---", file=sys.stdout)
total_neurons_in_brain = 0
total_synapses_in_brain = 0
for region_name, region_obj in self.regions.items():
if region_obj.name in ["Frontal Lobe", "Parietal Lobe", "Occipital Lobe", "Temporal Lobe"]:
region_obj.add_internal_connections(10, 0.8, [0.5, 2.0], [-2.0, -0.5],
rewiring_probability=0.01) # Reduced rewiring
elif region_obj.name in ["Cerebellum"]:
region_obj.add_internal_connections(5, 0.7, [0.2, 1.0], [-1.0, -0.2],
rewiring_probability=0.005) # Reduced rewiring
elif region_obj.name in ["Brain Stem", "Hippocampus", "Basal Ganglia", "Amygdala", "Hypothalamus",
"Thalamus"]:
region_obj.add_internal_connections(7, 0.9, [0.7, 3.0], [-3.0, -0.7],
rewiring_probability=0.01) # Reduced rewiring
else: # For Pituitary Gland and Pineal Gland
region_obj.add_internal_connections(3, 0.95, [1.0, 5.0], [-5.0, -1.0],
rewiring_probability=0.0) # No rewiring
total_neurons_in_brain += len(region_obj.neurons)
total_synapses_in_brain += len(region_obj.synapses)
print(
f" Region '{region_name}': {len(region_obj.neurons)} neurons, {len(region_obj.synapses)} internal synapses.",
file=sys.stdout)
print("\n--- Initial Brain State Summary ---", file=sys.stdout)
print(f"Total Neurons in Brain: {total_neurons_in_brain}", file=sys.stdout)
print(f"Total Synapses in Brain: {total_synapses_in_brain}", file=sys.stdout)
print(f"Initial Dopamine Level: {self.dopamine_level:.2f}", file=sys.stdout)
print(f"Initial Serotonin Level: {self.serotonin_level:.2f}", file=sys.stdout)
print(f"Initial Acetylcholine Level: {self.acetylcholine_level:.2f}", file=sys.stdout)
print(f"Initial Norepinephrine Level: {self.norepinephrine_level:.2f}", file=sys.stdout)
print(f"Initial Brain Stress Level: {self.brain_stress_level:.2f}", file=sys.stdout)
print("-" * 80 + "\n", file=sys.stdout)
def _establish_inter_region_flow_logic(self):
"""Defines the connections and information flow between different brain regions."""
# Clear existing connections to prevent duplicates on re-initialization
self.connections = {}
# Sensory pathways to Thalamus
self.connections["Parietal Lobe"] = ["Thalamus"]
self.connections["Occipital Lobe"] = ["Thalamus"]
self.connections["Temporal Lobe"] = ["Thalamus"]
# Thalamus as a central relay
self.connections["Thalamus"] = ["Parietal Lobe", "Occipital Lobe", "Temporal Lobe", "Frontal Lobe",
"Basal Ganglia", "Hippocampus",
"Amygdala"] # Added Basal Ganglia and Limbic System for more comprehensive flow
# Frontal Lobe connections (executive functions, motor planning, emotion)
self.connections["Frontal Lobe"] = ["Parietal Lobe", "Basal Ganglia", "Cerebellum", "Amygdala", "Thalamus",
"Hippocampus"]
# Basal Ganglia (motor control, reward)
self.connections["Basal Ganglia"] = ["Thalamus", "Frontal Lobe"] # Bidirectional with Frontal Lobe
# Cerebellum (motor coordination, balance)
self.connections["Cerebellum"] = ["Brain Stem", "Thalamus",
"Frontal Lobe"] # Connects to Brain Stem, Thalamus, and Frontal Lobe
# Limbic System components (memory, emotion)
self.connections["Hippocampus"] = ["Temporal Lobe", "Amygdala", "Thalamus",
"Frontal Lobe"] # Connects to Temporal, Amygdala, Thalamus, Frontal Lobe
self.connections["Amygdala"] = ["Hippocampus", "Frontal Lobe", "Hypothalamus",
"Thalamus"] # Connects to Hippocampus, Frontal, Hypothalamus, Thalamus
self.connections["Hypothalamus"] = ["Brain Stem", "Pituitary Gland", "Amygdala",
"Thalamus"] # Connects to Brain Stem, Pituitary, Amygdala, Thalamus
# Brain Stem (vital functions, relay)
self.connections["Brain Stem"] = ["Cerebellum", "Hypothalamus", "Thalamus", "Frontal Lobe", "Parietal Lobe",
"Occipital Lobe", "Temporal Lobe"] # Connects broadly
# Glands (endocrine system)
self.connections["Pituitary Gland"] = ["Hypothalamus"] # Primary connection to Hypothalamus
self.connections["Pineal Gland"] = ["Hypothalamus"] # Connects to Hypothalamus for circadian rhythm
# Ensure all regions are keys in the connections dictionary, even if they have no explicit outgoing connections initially
for region_name in self.regions:
self.connections.setdefault(region_name, [])
def simulate_step(self, external_input, print_brain_state=False,
reward_signal=0.0, punishment_signal=0.0): # NEW parameters
"""Simulates one step of the entire digital brain."""
self.current_time += 1
if print_brain_state:
print(f"\n--- Simulating Brain Step (Time: {self.current_time}) ---", file=sys.stdout)
# Removed the "External Input Application" and "Region Processing Order" sections
# to keep the per-iteration printout cleaner as requested.
region_outputs = {}
current_inputs = {region_name: 0.0 for region_name in self.regions}
# Apply external input
for region_name, signal_value in external_input.items():
if region_name in self.regions:
current_inputs[region_name] += signal_value
# Removed print statement for cleaner output
# else:
# if print_brain_state:
# print(f" Warning: External input for unknown region '{region_name}' ignored.", file=sys.stderr)
# Calculate dynamic neuromodulator effects
dynamic_noise_amplitude = DEFAULT_NOISE_AMPLITUDE + (
self.dopamine_level - 0.5) * DOPAMINE_EFFECT_ON_NOISE + self.brain_stress_level * STRESS_EFFECT_ON_NOISE
dynamic_noise_amplitude = max(0.0, dynamic_noise_amplitude)
dynamic_adaptation_amplitude = DEFAULT_ADAPTATION_AMPLITUDE + (
self.serotonin_level - 0.5) * SEROTONIN_EFFECT_ON_ADAPTATION
dynamic_adaptation_amplitude = max(0.0, dynamic_adaptation_amplitude)
dynamic_spike_threshold_offset = self.brain_stress_level * STRESS_EFFECT_ON_THRESHOLD
# NEW: Dynamic Inter-Region Signal Scale based on stress
# High stress reduces inter-region signal efficiency.
# Ensure it doesn't go too low, maintaining at least 10% of original scale.
dynamic_inter_region_signal_scale = self.inter_region_signal_scale * (
1 - self.brain_stress_level * INTER_REGION_STRESS_MODULATION_FACTOR)
dynamic_inter_region_signal_scale = max(0.1 * self.inter_region_signal_scale, dynamic_inter_region_signal_scale)
# Define a processing order to ensure dependencies are somewhat met (e.g., sensory before higher cognition) # Could be made to be multi-threaded. However, additional resources is recommended
processing_order = [
"Thalamus", "Parietal Lobe", "Occipital Lobe", "Temporal Lobe",
"Hippocampus", "Amygdala", "Hypothalamus", "Basal Ganglia",
"Cerebellum", "Brain Stem", "Pituitary Gland", "Pineal Gland",
"Frontal Lobe"
]
for region_name in processing_order:
if region_name in self.regions:
region = self.regions[region_name]
# Calculate metabolic deficit for this region
metabolic_deficit_effect = 0.0
if region.glucose_level < METABOLIC_STRESS_THRESHOLD:
metabolic_deficit_effect += (METABOLIC_STRESS_THRESHOLD - region.glucose_level) * METABIC_EFFECT_ON_THRESHOLD
if region.oxygen_level < METABOLIC_STRESS_THRESHOLD:
metabolic_deficit_effect += (METABOLIC_STRESS_THRESHOLD - region.oxygen_level) * METABIC_EFFECT_ON_THRESHOLD
output = region.process_input({'aggregated_input': current_inputs[region_name]},
self.dt,
self.current_time,
self.dopamine_level,
self.serotonin_level,
self.acetylcholine_level,
self.norepinephrine_level,
dynamic_noise_amplitude=dynamic_noise_amplitude,
dynamic_adaptation_amplitude=dynamic_adaptation_amplitude,
dynamic_spike_threshold_offset=dynamic_spike_threshold_offset,
pool=self.pool,
metabolic_deficit_effect=metabolic_deficit_effect, # Corrected typo
brain_stress_level=self.brain_stress_level,
print_step_details=print_brain_state # Pass the flag
)
region_outputs[region_name] = output
# Removed print statement for cleaner output
# if print_brain_state:
# print(f" Processed {region_name}. Activity: {output['activity_level']:.2f}", file=sys.stdout)
# Propagate signals to connected regions
connected_regions = self.connections.get(region_name, [])
for connected_region_name in connected_regions:
if connected_region_name in self.regions:
# Use the new dynamic_inter_region_signal_scale
transfer_amount = output['activity_level'] * dynamic_inter_region_signal_scale
# inter_region_delay = )(MIN_SYNAPTIC_DELAY + MAX_SYNAPTIC_DELAY) // 2) # Average delay for inter-region
# For simplicity, we're adding it to current_inputs for the *next* region's processing in the same step.
# A more accurate model would add it to event_queue for the target region.
current_inputs[connected_region_name] += transfer_amount
# Removed print statement for cleaner output
# if print_brain_state:
# print(
# f" -> Transferred {transfer_amount:.2f} from {region_name} to {connected_region_name}",
# file=sys.sys.stdout)
# Removed print statement for cleaner output
# else:
# if print_brain_state:
# print(f" Warning: Region '{region_name}' not found in brain regions.", file=sys.stderr)
# Update brain stress level based on overall activity deviation from targets and metabolic state
activity_out_of_range_count = 0
total_regions = len(self.regions)
for region_name in self.regions:
region = self.regions[region_name]
params = self.initial_region_params[region_name]
target_min, target_max = params['target_range']
current_activity = region.activity_level
if not (target_min <= current_activity <= target_max):
activity_out_of_range_count += 1
# Add metabolic stress to brain stress
if region.glucose_level < METABOLIC_STRESS_THRESHOLD or region.oxygen_level < METABOLIC_STRESS_THRESHOLD:
self.brain_stress_level += self.stress_accumulation_rate * 0.1 * self.dt # Small stress from metabolic deficit
stress_factor = activity_out_of_range_count / total_regions if total_regions > 0 else 0.0
if stress_factor > 0.1: # If a significant portion of regions are out of range
self.brain_stress_level += self.stress_accumulation_rate * stress_factor * self.dt
else:
self.brain_stress_level -= self.stress_decay_rate * self.dt
self.brain_stress_level = max(0.0, min(1.0, self.brain_stress_level))
# Apply external reward/punishment signals directly to neuromodulator levels
self.dopamine_level = min(1.0, self.dopamine_level + reward_signal)
self.brain_stress_level = min(1.0, self.brain_stress_level + punishment_signal)
self.dopamine_level = max(0.0, self.dopamine_level) # Ensure dopamine doesn't go below 0
# Calculate overall metabolic health for neuromodulator decay
avg_glucose = sum(r.glucose_level for r in self.regions.values()) / len(
self.regions) if self.regions else GLUCOSE_BASELINE
avg_oxygen = sum(r.oxygen_level for r in self.regions.values()) / len(
self.regions) if self.regions else OXYGEN_BASELINE
metabolic_health = (avg_glucose / GLUCOSE_BASELINE + avg_oxygen / OXYGEN_BASELINE) / 2.0
metabolic_health = max(0.0, min(1.0, metabolic_health)) # Clamp for mood calculation
# Update neuromodulator levels based on stress and decay (adjusted based on research)
# UPDATED: Dopamine, Acetylcholine, Norepinephrine now DECREASE with stress
self.dopamine_level = max(0.0,
min(1.0,
self.dopamine_level - DOPAMINE_ADJUSTMENT_RATE * self.brain_stress_level)) # Changed to decrease
self.serotonin_level = max(0.0,
min(1.0,
self.serotonin_level - SEROTONIN_ADJUSTMENT_RATE * self.brain_stress_level))
self.acetylcholine_level = max(0.0,
min(1.0,
self.acetylcholine_level - ACETYLCHOLINE_ADJUSTMENT_RATE * self.brain_stress_level)) # Changed to decrease
self.norepinephrine_level = max(0.0,
min(1.0,
self.norepinephrine_level - NOREPINEPHRINE_ADJUSTMENT_RATE * self.brain_stress_level)) # Changed to decrease
# Neuromodulators decay back to a baseline (0.5) over time, influenced by metabolic health
# If metabolic_health is low, decay is slower, implying impaired synthesis/replenishment
self.dopamine_level += (0.5 - self.dopamine_level) * NEUROMODULATOR_DECAY_RATE * metabolic_health
self.serotonin_level += (0.5 - self.serotonin_level) * NEUROMODULATOR_DECAY_RATE * metabolic_health
self.acetylcholine_level += (0.5 - self.acetylcholine_level) * ACETYLCHOLINE_DECAY_RATE * metabolic_health
self.norepinephrine_level += (0.5 - self.norepinephrine_level) * NOREPINEPHRINE_DECAY_RATE * metabolic_health
# Clamp neuromodulator levels to stay within [0, 1]
self.dopamine_level = max(0.0, min(1.0, self.dopamine_level))
self.serotonin_level = max(0.0, min(1.0, self.serotonin_level))
self.acetylcholine_level = max(0.0, min(1.0, self.acetylcholine_level))
self.norepinephrine_level = max(0.0, min(1.0, self.norepinephrine_level))
if print_brain_state:
print("\n --- Brain State Summary After Step ---", file=sys.stdout)
for name, region in self.regions.items():
state = region.get_state()
print(
f" {state['name']}: Neurons={state['total_neurons']}, "
f"Avg Firing Rate={state['average_firing_rate_percentage']:.2f}%, "
f"Avg MP={state['average_membrane_potential']:.2f}mV, "
f"Avg AC={state['average_adaptation_current']:.2f}nA, "
f"Avg Phase={state['average_phase']:.2f}rad, "
f"Gliotransmitter={state['astrocyte_gliotransmitter_level']:.2f}, "
f"Microglia Activity={state['microglia_pruning_activity']:.2f}, "
f"Avg K_ext={state['average_extracellular_potassium']:.2f}mM, "
f"Avg Na_int={state['average_intracellular_sodium']:.2f}mM, "
f"Avg Ca_int={state['average_intracellular_calcium']:.2f}nM, "
f"Glucose={state['glucose_level']:.2f}, Oxygen={state['oxygen_level']:.2f}"
, file=sys.stdout
)
print(f" Brain Stress Level: {self.brain_stress_level:.2f}", file=sys.stdout)
print(
f" Current Dopamine Level: {self.dopamine_level:.2f}, Serotonin Level: {self.serotonin_level:.2f}, Acetylcholine Level: {self.acetylcholine_level:.2f}, Norepinephrine Level: {self.norepinephrine_level:.2f}",
file=sys.stdout)
return region_outputs
def get_brain_mood(self):
"""Determines the current mood of the brain based on neuromodulator levels and stress."""
dopamine = self.dopamine_level
serotonin = self.serotonin_level
acetylcholine = self.acetylcholine_level
norepinephrine = self.norepinephrine_level
stress = self.brain_stress_level
# Also consider overall metabolic health
avg_glucose = sum(r.glucose_level for r in self.regions.values()) / len(
self.regions) if self.regions else GLUCOSE_BASELINE
avg_oxygen = sum(r.oxygen_level for r in self.regions.values()) / len(
self.regions) if self.regions else OXYGEN_BASELINE
metabolic_health = (avg_glucose / GLUCOSE_BASELINE + avg_oxygen / OXYGEN_BASELINE) / 2.0
metabolic_health = max(0.0, min(1.0, metabolic_health)) # Clamp for mood calculation
# Adjust mood based on metabolic health
if metabolic_health < 0.5:
if stress > 0.8:
return "Severely Stressed & Exhausted (Critical Metabolic Deficit)"
elif stress > 0.5:
return "Fatigued & Stressed (Metabolic Strain)"
else:
return "Lethargic & Disengaged (Metabolic Depletion)"
if stress > 0.8:
if dopamine < 0.2 and serotonin < 0.2 and norepinephrine < 0.2:
return "Severely Stressed & Exhausted (Critical)"
elif dopamine < 0.3 and serotonin > 0.7 and norepinephrine > 0.6:
return "Stressed & Overly Cautious (Anxiety-Prone, Hyper-vigilant)"
elif dopamine > 0.6 and serotonin < 0.4 and norepinephrine > 0.7:
return "Highly Stressed & Anxious (Hyper-alert, Agitated)"
else:
return "Critically Stressed (Dysregulated)"
elif stress > 0.5:
if dopamine > 0.6 and serotonin > 0.6 and norepinephrine > 0.6:
return "Productive & Resilient (Managing High Stress)"
elif dopamine < 0.4 and acetylcholine > 0.6 and norepinephrine < 0.4:
return "Lethargic but Alert (Seeking Stimulation)"
# elif serotonin > 0.7: # This was in the wrong stress block (stress > 0.5)
# return "Calm & Resilient (Managing Moderate Stress)"
elif serotonin < 0.4 and acetylcholine < 0.4 and norepinephrine < 0.4:
return "Irritable & Disengaged"
else:
return "Moderately Stressed (Adapting)"
elif stress < 0.2:
if dopamine > 0.8 and acetylcholine > 0.8 and norepinephrine > 0.7:
return "Highly Focused & Motivated (Optimal State)"
elif serotonin > 0.8 and dopamine < 0.2 and norepinephrine < 0.3:
return "Deeply Calm & Content (Restorative)"
elif dopamine > 0.6 and serotonin > 0.6 and acetylcholine > 0.6 and norepinephrine > 0.6:
return "Optimistic & Engaged (Balanced)"
else:
return "Peaceful & Stable (Low Stress)"
else:
if dopamine > 0.6 and serotonin > 0.6 and norepinephrine > 0.5:
return "Productive & Resilient (Managing Moderate Stress)"
elif dopamine < 0.4:
return "Slightly Lethargic (Needs Boost)"
elif serotonin > 0.7: # Moved condition here for 0.2 <= stress <= 0.5
return "Calm & Resilient (Managing Moderate Stress)" # Test 13d target
elif serotonin < 0.4:
return "Slightly Irritable (Needs Calm)"
elif norepinephrine < 0.4:
return "Low Arousal (Needs Activation)"
else:
return "Neutral & Stable"
def to_dict(self):
"""Converts the DigitalBrain object to a dictionary for serialization."""
return {
'dopamine_level': self.dopamine_level,
'serotonin_level': self.serotonin_level,
'acetylcholine_level': self.acetylcholine_level,
'norepinephrine_level': self.norepinephrine_level,
'current_time': self.current_time,
'dt': self.dt,
'brain_stress_level': self.brain_stress_level,
'inter_region_signal_scale': self.inter_region_signal_scale,
'initial_region_params': self.initial_region_params,
'connections': self.connections,
'regions': {name: region.to_dict() for name, region in self.regions.items()},
'stress_accumulation_rate': self.stress_accumulation_rate, # Corrected typo
'stress_decay_rate': self.stress_decay_rate,
# Add decoder state to save
'decoded_character_sequence': self.decoded_character_sequence,
'char_decoder_state': self.char_decoder_state,
'last_emitted_char_tag': self.last_emitted_char_tag,
'reset_duration_counter': self.reset_duration_counter,
'decoder_refractory_counter': self.decoder_refractory_counter,
'sequence_timeout_counter': self.sequence_timeout_counter
}
# noinspection PyTypeChecker
def save_state(self, filename="brain_state.json"):
"""Saves the current state of the DigitalBrain to a JSON file."""
print("Attempting to save brain state to JSON file...", file=sys.stdout)
try:
with open(filename, 'w') as f:
json.dump(self.to_dict(), f, indent=4)
print(f"Brain state saved to {filename}", file=sys.stdout)
except Exception as e:
print(f"Error saving brain state to {filename}: {e}", file=sys.stderr)
def load_state(self, filename="brain_state.json"):
"""Loads the state of the DigitalBrain from a JSON file."""
with open(filename, 'r') as f:
data = json.load(f)
self.dopamine_level = data['dopamine_level']
self.serotonin_level = data['serotonin_level']
self.acetylcholine_level = data.get('acetylcholine_level', 0.5)
self.norepinephrine_level = data.get('norepinephrine_level', 0.5)
self.current_time = data['current_time']
self.dt = data['dt']
self.brain_stress_level = data['brain_stress_level']
self.inter_region_signal_scale = data['inter_region_signal_scale']
self.initial_region_params = data['initial_region_params']
self.connections = data['connections']
self.stress_accumulation_rate = data.get('stress_accumulation_rate', 0.0015) # Use updated default
self.stress_decay_rate = data.get('stress_decay_rate', 0.012) # Use updated default
self.regions = {}
for region_name, region_data in data['regions'].items():
self.regions[region_name] = BrainRegion.from_dict(region_data)
# Load decoder state
self.decoded_character_sequence = data.get('decoded_character_sequence', [])
self.char_decoder_state = data.get('char_decoder_state', "IDLE")
self.last_emitted_char_tag = data.get('last_emitted_char_tag', None)
self.reset_duration_counter = data.get('reset_duration_counter', 0)
self.decoder_refractory_counter = data.get('decoder_refractory_counter', 0)
self.sequence_timeout_counter = data.get('sequence_timeout_counter', 0)
# Reconstruct tagged_neuron_groups after all neurons are loaded
for region in self.regions.values():
region.tagged_neuron_groups = {} # Clear and rebuild
for neuron in region.neurons:
if neuron.tag:
region.tagged_neuron_groups.setdefault(neuron.tag, []).append(neuron)
def reset_char_decoder_state(self):
"""Resets the state of the character stream decoder."""
self.decoded_character_sequence = []
self.char_decoder_state = "IDLE"
self.last_emitted_char_tag = None
self.reset_duration_counter = 0
self.decoder_refractory_counter = 0
self.sequence_timeout_counter = 0
# print("DEBUG: Character decoder state reset.", file=sys.stdout)
def decode_character_stream(self, output_region_name: str,
internal_steps_per_brain_step: int,
recent_spike_window_brain_steps: int):
"""
Decodes a stream of characters from the output region based on tagged neuron group activity.
Manages state for emission, reset, refractory period, and timeout.
Returns a dictionary with current_sequence, char_emitted_this_step, and status.
"""
output_region = self.regions.get(output_region_name)
if not output_region:
return {"current_sequence": [], "char_emitted_this_step": None, "status": "error_region_not_found"}
char_emitted_this_step = None
# Initialize status for the final return, will be updated throughout
status_for_return = self.char_decoder_state
# Handle sequence timeout
self.sequence_timeout_counter += 1
if self.sequence_timeout_counter >= SEQUENCE_TIMEOUT_STEPS and self.decoded_character_sequence:
final_sequence = list(self.decoded_character_sequence) # Copy before reset
self.reset_char_decoder_state() # Full reset
return {"current_sequence": final_sequence, "char_emitted_this_step": None, "status": "complete_timeout"}
if self.char_decoder_state == "IN_REFRACTORY":
self.decoder_refractory_counter -= 1
if self.decoder_refractory_counter <= 0:
self.char_decoder_state = "IDLE"
status_for_return = "in_refractory" # Update the status that will be returned
elif self.char_decoder_state == "AWAITING_RESET":
activity = output_region.get_tagged_group_activity(self.last_emitted_char_tag,
self.current_time * internal_steps_per_brain_step,
recent_spike_window_brain_steps * internal_steps_per_brain_step)
if activity < CHARACTER_RESET_THRESHOLD:
self.reset_duration_counter += 1
if self.reset_duration_counter >= MIN_RESET_DURATION_STEPS:
self.char_decoder_state = "IN_REFRACTORY"
self.decoder_refractory_counter = DECODER_REFRACTORY_PERIOD_STEPS
status_for_return = "reset_complete" # Update
else:
status_for_return = "awaiting_reset_decaying" # Update
else: # Activity still high
self.reset_duration_counter = 0 # Reset counter if activity bounces back
status_for_return = "awaiting_reset_still_active" # Update
elif self.char_decoder_state == "IDLE":
if len(self.decoded_character_sequence) >= MAX_OUTPUT_SEQUENCE_LEN:
final_sequence = list(self.decoded_character_sequence) # Copy before reset
self.reset_char_decoder_state() # Full reset
return {"current_sequence": final_sequence, "char_emitted_this_step": None, "status": "complete_max_len"}
activities = {}
for char_val in ALL_POSSIBLE_OUTPUT_CHARS:
tag = f"{TAG_PREFIX_OUTPUT_CHAR}{char_val}"
activities[char_val] = output_region.get_tagged_group_activity(tag,
self.current_time * internal_steps_per_brain_step,
recent_spike_window_brain_steps * internal_steps_per_brain_step)
if activities:
winning_char = max(activities, key=activities.get)
if activities[winning_char] >= CHARACTER_EMISSION_THRESHOLD:
char_emitted_this_step = winning_char
self.decoded_character_sequence.append(winning_char)
self.last_emitted_char_tag = f"{TAG_PREFIX_OUTPUT_CHAR}{winning_char}"
self.char_decoder_state = "AWAITING_RESET"
self.reset_duration_counter = 0
self.sequence_timeout_counter = 0 # Reset timeout on new char
status_for_return = "char_emitted" # Update
if len(self.decoded_character_sequence) >= MAX_OUTPUT_SEQUENCE_LEN:
# This will be handled by the next call's timeout/max_len check
pass # No need to set status_for_return here, it's handled by the immediate return above
else: # No activities or no char above threshold
status_for_return = "idle" # Update
return {
"current_sequence": list(self.decoded_character_sequence), # Return a copy
"char_emitted_this_step": char_emitted_this_step,
"status": status_for_return # Use the consolidated status
}
# --- New Methods for Precision Math Training (moved inside DigitalBrain class) ---
def present_precise_math_input(self, value, region_name: str, tag_prefix: str, strength: float, duration: int):
"""
Presents a precise math input by activating a tagged group of neurons for a specified duration.
'value' can be int for digit or str for operator.
"""
if region_name not in self.regions:
print(f"Warning: Region '{region_name}' not found for precise input.", file=sys.stderr)
return
region = self.regions[region_name]
full_tag = f"{tag_prefix}{value}"
# print(f"DEBUG: Presenting precise input. Region: {region_name}, Tag: {full_tag}, Duration: {duration}, Strength: {strength}", file=sys.stdout)
for _ in range(duration):
region.activate_tagged_group(full_tag, activation_strength=strength)
# Simulate one step to process this forced activation and allow network effects
# We call simulate_step with no external input here, as the input is being forced directly.
self.simulate_step({}, print_brain_state=False)
# decode_precise_math_output and force_precise_output_replay are now replaced by
# decode_character_stream and force_character_sequence_replay (implemented above)
def force_character_sequence_replay(self, char_sequence: list,
output_region_name: str,
strength: float,
duration_char_pulse: int, # In brain steps
duration_inter_char_pause: int): # In brain steps
"""Forces replay of a character sequence by activating tagged neuron groups with pauses."""
if output_region_name not in self.regions:
print(f"Warning: Region '{output_region_name}' not found for output replay.", file=sys.stderr)
return
region = self.regions[output_region_name]
for _ in range(1): # The duration parameter was ambiguous here, replay is for one sequence.
for char_val in char_sequence:
tag_to_activate = f"{TAG_PREFIX_OUTPUT_CHAR}{char_val}"
# Activate the character pulse
for _ in range(duration_char_pulse):
region.activate_tagged_group(tag_to_activate, activation_strength=strength)
self.simulate_step({}, print_brain_state=False) # Allow brain to process
# Simulate pause (no forced activation in output region)
for _ in range(duration_inter_char_pause):
# During pause, don't force any output character group.
self.simulate_step({}, print_brain_state=False) # Allow brain to process
def generate_math_problem(problem_type="addition"):
"""
Generates a simple math problem (e.g., addition) and its correct answer.
""" #Corrected comment
if problem_type == "addition":
num1 = random.randint(0, MAX_SINGLE_DIGIT_MATH) # Use MAX_SINGLE_DIGIT_MATH
num2 = random.randint(0, MAX_SINGLE_DIGIT_MATH) # Use MAX_SINGLE_DIGIT_MATH
correct_answer = num1 + num2
return num1, num2, correct_answer
else:
raise ValueError(f"Unknown problem type: {problem_type}")
# Main execution block for the simulator
if __name__ == "__main__":
brain_simulator = DigitalBrain(load_path=SAVE_FILE)
print("\n" + "=" * 80, file=sys.stdout)
print("--- Digital Brain Simulator Initialization Complete ---".center(80), file=sys.stdout)
print("=" * 80 + "\n", file=sys.stdout)
# Import test functions dynamically when needed
tests_module = None # Initialize to None
try:
import run_tests as tests_module
except ImportError:
tests_module = None
# Do not print warning here, only when user attempts to run tests.
pass
try:
while True:
print("\n" + "=" * 80, file=sys.stdout)
print("--- Choose Initial Action ---".center(80), file=sys.stdout)
print("=" * 80 + "\n", file=sys.stdout)
print("1. Run Automated Component Tests", file=sys.stdout)
print("2. Run Full Brain Simulation", file=sys.stdout)
print("3. Train Brain on Math Task", file=sys.stdout) # NEW OPTION
print("4. Exit Program", file=sys.stdout) # SHIFTED OPTION
initial_choice = input("Enter your choice (1, 2, 3 or 4): ").strip() # Updated menu options
if initial_choice == '1':
if not tests_module:
print("\n" + "=" * 80, file=sys.stderr)
print("ERROR: Automated tests not available. 'run_tests.py' not found.".center(80), file=sys.stderr)
print("=" * 80 + "\n", file=sys.stderr)
continue
while True:
print("\n" + "=" * 80, file=sys.stdout)
print("--- Choose Component Test(s) to Run ---".center(80), file=sys.stdout)
print("=" * 80 + "\n", file=sys.stdout)
print("1. Neuron Spiking & Refractory Period", file=sys.stdout)
print("2. Synapse Short-Term Plasticity", file=sys.stdout)
print("3. Synapse STDP (LTP/LTD)", file=sys.stdout)
print("4. Astrocyte Modulation", file=sys.stdout)
print("5. Microglia Synaptic Pruning", file=sys.stdout)
print("6. Metabolic Effect on Neuron Threshold", file=sys.stdout)
print("7. Neuromodulator Effects on Region Activity", file=sys.stdout)
print("8. Homeostatic Plasticity (Synaptic Scaling)", file=sys.stdout)
print("9. Ion Channel Dynamics", file=sys.stdout)
print("10. Myelin Plasticity", file=sys.stdout)
print("11. Metabolic Stress Impact", file=sys.stdout)
print("12. Brain Mood Correlation", file=sys.stdout)
print("13. Stress-Induced Synaptic Pruning", file=sys.stdout)
print("14. Astrocyte-Microglia Pruning Modulation", file=sys.stdout)
print("15. BrainRegion Activity Response to Input (Isolated) [Prompts for Regions]",
file=sys.stdout)
print("16. Neuron Tagging and Precise Activation", file=sys.stdout) # New
print("17. Precise Output Decoding", file=sys.stdout) # New
print("18. Precise Output Replay", file=sys.stdout) # New
print("19. Layered Neuron Initialization", file=sys.stdout) # New
print("20. Activity-Dependent Synaptogenesis", file=sys.stdout) # New
print("21. Metabolic Resource Dynamics", file=sys.stdout) # New
print("22. Dopamine Receptor STDP Modulation", file=sys.stdout) # New
print("23. Run All Above Component Tests (1-22)", file=sys.stdout) # Updated "Run All"
print("24. Back to Main Menu", file=sys.stdout) # Updated "Back"
test_selection = input(
f"Enter test number(s) (e.g., 1,3,5 or 23 for 1-22, 24 to go back): ").strip()
# Determine the max number for "Run All" and "Back" dynamically
# These are the menu option numbers, not the test keys directly
max_individual_test_menu_option = 22
run_all_menu_option = 23
back_menu_option = 24
if test_selection == str(back_menu_option):
print("Beak at 2456")
break # Go back to main menu
else:
try:
raw_indices = [int(x.strip()) for x in test_selection.split(',')]
selected_tests_indices = []
if run_all_menu_option in raw_indices: # Expand "Run All"
# The keys in all_tests are 1-22.
selected_tests_indices.extend(list(range(1, max_individual_test_menu_option + 1)))
raw_indices = [idx for idx in raw_indices if idx != run_all_menu_option]
selected_tests_indices.extend([idx for idx in raw_indices if
idx not in selected_tests_indices]) # Add other unique indices
# Check if any valid tests were selected or if "Run All" was the only valid input
if not selected_tests_indices and run_all_menu_option not in raw_indices:
print(f"No valid test numbers selected. Please try again.", file=sys.stderr)
continue
# Add a try-except around the call to run_automated_component_tests
try:
tests_module.run_automated_component_tests(selected_tests_indices)
except Exception as test_runner_ex:
print(f"ERROR: An exception occurred while running the test suite: {test_runner_ex}",
file=sys.stderr)
import traceback
traceback.print_exc(file=sys.stderr)
except ValueError:
print("Invalid input. Please enter comma-separated numbers or a single number.",
file=sys.stderr)
elif initial_choice == '2': # SHIFTED OPTION
while True:
try:
simulation_duration_seconds_str = input(
"How long should the brain be active during the full simulation? (in seconds, e.g., 10): ").strip()
simulation_duration_seconds = float(simulation_duration_seconds_str)
if simulation_duration_seconds <= 0:
print("Please enter a positive duration.", file=sys.stdout)
else:
print("Beak at 2487")
break
except ValueError:
print("Invalid input. Please enter a number.", file=sys.stderr)
simulation_duration_steps = int(simulation_duration_seconds / brain_simulator.dt)
print("\n" + "=" * 80, file=sys.stdout)
print(f"--- Full Brain Simulation for {simulation_duration_seconds} Seconds ---".center(80),
file=sys.stdout)
print("=" * 80 + "\n", file=sys.stdout)
# Print initial state of the brain before starting the simulation loop
print("\n --- Initial Brain State ---", file=sys.stdout)
print(f" Current Time: {brain_simulator.current_time}s", file=sys.stdout)
print(f" Dopamine Level: {brain_simulator.dopamine_level:.2f}", file=sys.stdout)
print(f" Serotonin Level: {brain_simulator.serotonin_level:.2f}", file=sys.stdout)
print(f" Acetylcholine Level: {brain_simulator.acetylcholine_level:.2f}", file=sys.stdout)
print(f" Norepinephrine Level: {brain_simulator.norepinephrine_level:.2f}",
file=sys.stdout) # Fixed this line
print(f" Brain Stress Level: {brain_simulator.brain_stress_level:.2f}", file=sys.stdout)
print(f" Brain Mood: {brain_simulator.get_brain_mood()}", file=sys.stdout)
print("\n --- Initial Region States ---", file=sys.stdout)
for name, region in brain_simulator.regions.items():
state = region.get_state()
print(
f" {state['name']}: Neurons={state['total_neurons']}, "
f"Avg Firing Rate={state['average_firing_rate_percentage']:.2f}%, "
f"Avg MP={state['average_membrane_potential']:.2f}mV, "
f"Gliotransmitter={state['astrocyte_gliotransmitter_level']:.2f}, "
f"Microglia={state['microglia_pruning_activity']:.2f}, "
f"Glucose={state['glucose_level']:.2f}, Oxygen={state['oxygen_level']:.2f}",
file=sys.stdout
)
print("-" * 80, file=sys.stdout)
for i in range(simulation_duration_steps):
brain_simulator.simulate_step({}, print_brain_state=True) # Set to True to print every iteration
print("\n" + "=" * 80, file=sys.stdout)
print("--- Full Brain Simulation Complete ---".center(80), file=sys.stdout)
print("=" * 80 + "\n", file=sys.stdout)
print("\n --- Final Brain State Report ---", file=sys.stdout)
for name, region in brain_simulator.regions.items():
state = region.get_state()
print(
f" {state['name']}: Neurons={state['total_neurons']}, "
f"Avg Firing Rate={state['average_firing_rate_percentage']:.2f}%, "
f"Avg MP={state['average_membrane_potential']:.2f}mV, "
f"Avg AC={state['average_adaptation_current']:.2f}nA, "
f"Avg Phase={state['average_phase']:.2f}rad, "
f"Gliotransmitter={state['astrocyte_gliotransmitter_level']:.2f}, "
f"Microglia Activity={state['microglia_pruning_activity']:.2f}, "
f"Avg K_ext={state['average_extracellular_potassium']:.2f}mM, "
f"Avg Na_int={state['average_intracellular_sodium']:.2f}mM, "
f"Avg Ca_int={state['average_intracellular_calcium']:.2f}nM, "
f"Glucose={state['glucose_level']:.2f}, Oxygen={state['oxygen_level']:.2f}"
, file=sys.stdout
)
print(f" Final Brain Stress Level: {brain_simulator.brain_stress_level:.2f}", file=sys.stdout)
print(
f" Final Dopamine Level: {brain_simulator.dopamine_level:.2f}, Final Serotonin Level: {brain_simulator.serotonin_level:.2f}, Final Acetylcholine Level: {brain_simulator.acetylcholine_level:.2f}, Final Norepinephrine Level: {brain_simulator.norepinephrine_level:.2f}",
# Fixed this line
flush=True)
print(f" Final Brain Mood: {brain_simulator.get_brain_mood()}\n", file=sys.stdout)
brain_simulator.save_state(SAVE_FILE)
elif initial_choice == '3': # SHIFTED OPTION
math_training.math_training_menu(brain_simulator) # Call the menu from the new module
elif initial_choice == '4': # SHIFTED OPTION
# brain_simulator.save_state(SAVE_FILE)
print("Exiting Digital Brain Simulator. Goodbye!", file=sys.stdout)
brain_simulator.close_main_pool() # Use the new method
if not brain_simulator.pool_is_active: # Check if it was successfully closed
print("Multiprocessing pool closed upon exit.", file=sys.stdout)
break
else:
print("Invalid choice. Please enter 1, 2, 3 or 4.", file=sys.stdout) # Updated menu options
finally:
# This finally block ensures the pool is closed if the loop terminates unexpectedly
# or if it wasn't closed by the 'Exit Program' option or __del__.
if hasattr(brain_simulator, 'pool_is_active') and brain_simulator.pool_is_active:
# print("Closing multiprocessing pool in finally block (unexpected exit or not closed by user action).", file=sys.stdout) # Optional debug
brain_simulator.close_main_pool()
if not brain_simulator.pool_is_active: # Check if it was successfully closed
print("Multiprocessing pool closed.", file=sys.stdout)
Discussion about this post
No posts