Complete Guide to ArcPy Batch Geoprocessing
ArcPy is Python’s gateway to ArcGIS geoprocessing tools, enabling you to automate repetitive spatial analysis tasks across multiple datasets. This guide covers everything from basic batch operations to advanced parallel processing techniques.
Getting Started
Environment Setup
# Core imports used throughout the examples below.
import arcpy
import os
import sys
from pathlib import Path

# Essential environment settings
# The workspace is the default location ArcPy tools read inputs from and
# write outputs to.
arcpy.env.workspace = r"C:\GISData\WorkingFolder"
# Allow tools to overwrite existing outputs instead of raising an error.
arcpy.env.overwriteOutput = True
arcpy.env.parallelProcessingFactor = "75%" # Use 75% of available cores
# Optional: Set up scratch workspace for temporary files
arcpy.env.scratchWorkspace = r"C:\Temp\GISTemp"
# Check out extensions if needed
# NOTE(review): assumes a Spatial Analyst license is available — confirm.
arcpy.CheckOutExtension("Spatial")
Basic Workspace Management
def setup_workspace(workspace_path, create_if_missing=True):
    """Point arcpy.env.workspace at *workspace_path* and validate it.

    Parameters
    ----------
    workspace_path : str
        Folder or geodatabase path to use as the workspace.
    create_if_missing : bool
        When True (default), create the folder if it does not exist.

    Returns the workspace path; raises Exception when ArcPy cannot see it.
    """
    path_is_absent = not os.path.exists(workspace_path)
    if create_if_missing and path_is_absent:
        os.makedirs(workspace_path)
    arcpy.env.workspace = workspace_path
    # ArcPy-level existence check (covers geodatabases, not just folders).
    if arcpy.Exists(workspace_path):
        print(f"Workspace set to: {arcpy.env.workspace}")
        return workspace_path
    raise Exception(f"Cannot access workspace: {workspace_path}")
Basic Batch Processing Patterns
Pattern 1: Process All Feature Classes in Workspace
def batch_buffer_all_features():
    """Buffer every feature class in the current workspace by 100 meters.

    Outputs are written next to the inputs with a "_buffered_100m" suffix.
    Errors on one feature class are reported and do not stop the batch.
    """
    fc_list = arcpy.ListFeatureClasses()
    if not fc_list:
        print("No feature classes found in workspace")
        return
    print(f"Found {len(fc_list)} feature classes to process")
    for source_fc in fc_list:
        target_fc = f"{source_fc}_buffered_100m"
        try:
            arcpy.analysis.Buffer(
                in_features=source_fc,
                out_feature_class=target_fc,
                buffer_distance_or_field="100 METERS",
                line_side="FULL",
                line_end_type="ROUND",
                dissolve_option="NONE"
            )
        except arcpy.ExecuteError:
            # Geoprocessing failure: surface the tool's own messages.
            print(f"✗ ArcPy error processing {source_fc}:")
            print(arcpy.GetMessages())
        except Exception as e:
            print(f"✗ General error processing {source_fc}: {str(e)}")
        else:
            print(f"✓ Successfully buffered: {source_fc}")


# Usage
batch_buffer_all_features()
Pattern 2: Process Files from Multiple Directories
def batch_process_multiple_folders(root_directory, file_pattern="*.shp"):
    """Repair geometry for every feature class matching *file_pattern*
    in *root_directory* and all of its subfolders.

    Parameters
    ----------
    root_directory : str
        Folder tree to scan.
    file_pattern : str
        Wildcard passed to arcpy.ListFeatureClasses (default "*.shp").

    Prints a per-file status and a final summary; returns None.
    """
    root_path = Path(root_directory)
    processed_count = 0
    error_count = 0
    # BUGFIX: rglob("*") yields only descendants, so files sitting directly
    # in the root folder were previously never processed — include the root
    # itself explicitly.
    folders = [root_path] + [p for p in root_path.rglob("*") if p.is_dir()]
    for folder_path in folders:
        # Point the workspace at the folder so ListFeatureClasses sees it.
        arcpy.env.workspace = str(folder_path)
        files = arcpy.ListFeatureClasses(file_pattern)
        if not files:
            continue
        print(f"\nProcessing folder: {folder_path}")
        print(f"Found {len(files)} files matching pattern: {file_pattern}")
        for file in files:
            try:
                # Repair in place, dropping records with null geometry.
                arcpy.management.RepairGeometry(
                    in_features=file,
                    delete_null="DELETE_NULL"
                )
                print(f" ✓ Repaired geometry: {file}")
                processed_count += 1
            except Exception as e:
                print(f" ✗ Error processing {file}: {str(e)}")
                error_count += 1
    print(f"\n=== Batch Processing Complete ===")
    print(f"Successfully processed: {processed_count} files")
    print(f"Errors encountered: {error_count} files")


# Usage
batch_process_multiple_folders(r"C:\GISData\ProjectFolders", "*.shp")
Pattern 3: Conditional Processing Based on Attributes
def batch_process_by_attribute(feature_class_list, attribute_field, condition_value):
    """Extract features whose *attribute_field* equals *condition_value*.

    Parameters
    ----------
    feature_class_list : list[str]
        Feature classes (resolved against the current workspace).
    attribute_field : str
        Field to filter on; datasets lacking it are skipped.
    condition_value : str
        Value to match (compared as a quoted SQL string literal, so this
        assumes a text field).

    Returns
    -------
    list[dict]
        One record per processed input with keys 'input', 'output',
        'count' and 'status'.
    """
    results = []
    for fc in feature_class_list:
        try:
            # Skip datasets that do not carry the filter field at all.
            field_names = [f.name for f in arcpy.ListFields(fc)]
            if attribute_field not in field_names:
                print(f"Field '{attribute_field}' not found in {fc}")
                continue
            # FIX: delimit the field name for the underlying data source
            # (shapefiles, geodatabases and enterprise GDBs quote fields
            # differently).
            delimited_field = arcpy.AddFieldDelimiters(fc, attribute_field)
            where_clause = f"{delimited_field} = '{condition_value}'"
            # Count matches via a temporary layer, then release the layer
            # (FIX: the in-memory layer previously leaked on every dataset).
            temp_layer = arcpy.management.MakeFeatureLayer(fc, "temp_layer", where_clause)
            try:
                feature_count = int(arcpy.management.GetCount(temp_layer)[0])
            finally:
                arcpy.management.Delete(temp_layer)
            if feature_count > 0:
                # Create output for matching features
                output_fc = f"{fc}_filtered_{condition_value}"
                arcpy.analysis.Select(
                    in_features=fc,
                    out_feature_class=output_fc,
                    where_clause=where_clause
                )
                results.append({
                    'input': fc,
                    'output': output_fc,
                    'count': feature_count,
                    'status': 'success'
                })
                print(f"✓ Processed {fc}: {feature_count} features selected")
            else:
                print(f"○ Skipped {fc}: No features match condition")
        except Exception as e:
            results.append({
                'input': fc,
                'output': None,
                'count': 0,
                'status': f'error: {str(e)}'
            })
            print(f"✗ Error processing {fc}: {str(e)}")
    return results


# Usage example
feature_classes = ["roads", "buildings", "parcels"]
results = batch_process_by_attribute(
    feature_classes,
    "STATUS",
    "ACTIVE"
)
Advanced Techniques
Parallel Processing with Multiprocessing
import multiprocessing as mp
from functools import partial
import time
def process_single_raster(raster_info, operation_type):
    """Run one raster operation in a worker process.

    Parameters
    ----------
    raster_info : tuple[str, str]
        (raster_path, output_dir) pair.
    operation_type : str
        "slope" or "hillshade".

    Returns a "SUCCESS: ..." or "ERROR: ..." status string; never raises,
    so pool.map always gets a result per input.
    """
    raster_path, output_dir = raster_info
    # FIX: compute the name before the try block so the except handler can
    # always reference it (previously a failure before this line raised
    # NameError inside the handler).
    raster_name = os.path.basename(raster_path)
    try:
        # Each worker process has its own arcpy session: configure it here.
        arcpy.env.workspace = os.path.dirname(raster_path)
        arcpy.env.overwriteOutput = True
        # FIX: Slope/Hillshade need the Spatial Analyst extension checked
        # out in THIS process, not just in the parent.
        arcpy.CheckOutExtension("Spatial")
        base_name = os.path.splitext(raster_name)[0]
        if operation_type == "slope":
            # Calculate slope in degrees.
            output_path = os.path.join(output_dir, f"{base_name}_slope.tif")
            slope_raster = arcpy.sa.Slope(raster_path, "DEGREE")
            slope_raster.save(output_path)
        elif operation_type == "hillshade":
            # Create hillshade with default sun position.
            output_path = os.path.join(output_dir, f"{base_name}_hillshade.tif")
            hillshade_raster = arcpy.sa.Hillshade(
                raster_path,
                azimuth=315,
                altitude=45
            )
            hillshade_raster.save(output_path)
        else:
            # BUGFIX: an unknown operation previously fell through and
            # raised NameError on the unbound output_path.
            return f"ERROR: {raster_name} -> unknown operation '{operation_type}'"
        return f"SUCCESS: {raster_name} -> {os.path.basename(output_path)}"
    except Exception as e:
        return f"ERROR: {raster_name} -> {str(e)}"
def batch_raster_parallel(input_directory, output_directory, operation="slope", num_processes=None):
    """Fan raster processing out over a multiprocessing pool.

    Recursively scans *input_directory* for raster files and runs
    process_single_raster on each (operation: "slope" or "hillshade").
    Call this from code guarded by ``if __name__ == "__main__":`` so the
    pool can safely re-import the module on Windows spawn start method.

    Returns None; progress and per-file results are printed.
    """
    os.makedirs(output_directory, exist_ok=True)
    # Collect (raster_path, output_dir) work items for the pool.
    raster_extensions = ('.tif', '.img', '.bil', '.asc')
    raster_files = []
    for root, dirs, files in os.walk(input_directory):
        for file in files:
            # str.endswith accepts a tuple — one check per file.
            if file.lower().endswith(raster_extensions):
                raster_files.append((os.path.join(root, file), output_directory))
    if not raster_files:
        print("No raster files found!")
        return
    print(f"Found {len(raster_files)} raster files to process")
    if num_processes is None:
        # FIX: floor at 1 — on a single-core machine cpu_count() - 1 is 0,
        # which made mp.Pool raise ValueError.
        num_processes = max(1, min(mp.cpu_count() - 1, len(raster_files)))
    print(f"Using {num_processes} processes")
    # Bind the operation choice so each worker receives a single argument.
    process_func = partial(process_single_raster, operation_type=operation)
    start_time = time.time()
    with mp.Pool(processes=num_processes) as pool:
        results = pool.map(process_func, raster_files)
    end_time = time.time()
    successful = sum(1 for r in results if r.startswith("SUCCESS"))
    errors = len(results) - successful
    print(f"\n=== Parallel Processing Complete ===")
    print(f"Total processing time: {end_time - start_time:.2f} seconds")
    print(f"Successfully processed: {successful} rasters")
    print(f"Errors: {errors} rasters")
    # Show detailed results
    for result in results:
        print(f" {result}")


# Usage
batch_raster_parallel(
    input_directory=r"C:\GISData\DEMs",
    output_directory=r"C:\GISData\ProcessedDEMs",
    operation="slope",
    num_processes=4
)
Dynamic Tool Parameter Configuration
def create_processing_config():
    """Return the named processing configurations.

    Each configuration maps:
      buffer_distances -> linear-unit strings fed to the Buffer tool
      dissolve_fields  -> candidate fields for the Dissolve tool
      clip_features    -> boundary feature classes used for clipping
    """
    urban = {
        'buffer_distances': ["50 METERS", "100 METERS", "200 METERS"],
        'dissolve_fields': ["ZONING", "LANDUSE"],
        'clip_features': ["city_boundary", "urban_growth_boundary"],
    }
    environmental = {
        'buffer_distances': ["500 METERS", "1000 METERS"],
        'dissolve_fields': ["HABITAT_TYPE", "PROTECTION_STATUS"],
        'clip_features': ["study_area", "watershed_boundary"],
    }
    transport = {
        'buffer_distances': ["25 METERS", "50 METERS", "100 METERS"],
        'dissolve_fields': ["ROAD_CLASS", "SURFACE_TYPE"],
        'clip_features': ["county_boundary", "municipal_boundary"],
    }
    return {
        'urban_analysis': urban,
        'environmental_study': environmental,
        'transportation': transport,
    }
def execute_configurable_batch(input_features, config_name):
    """Run the buffer/dissolve batch described by a named configuration.

    Parameters
    ----------
    input_features : list[str]
        Feature classes to process.
    config_name : str
        Key into create_processing_config(); unknown names abort the run.
    """
    configs = create_processing_config()
    config = configs.get(config_name)
    if config is None:
        print(f"Unknown configuration: {config_name}")
        return
    print(f"Using configuration: {config_name}")
    for feature in input_features:
        print(f"\nProcessing: {feature}")
        # Buffer at every configured distance.
        for distance in config['buffer_distances']:
            suffix = distance.replace(' ', '_').lower()
            output_name = f"{feature}_buffer_{suffix}"
            try:
                arcpy.analysis.Buffer(
                    in_features=feature,
                    out_feature_class=output_name,
                    buffer_distance_or_field=distance,
                    dissolve_option="NONE"
                )
                print(f" ✓ Created buffer: {output_name}")
            except Exception as e:
                print(f" ✗ Buffer error ({distance}): {str(e)}")
        # Dissolve on each configured field that the dataset actually has.
        for dissolve_field in config['dissolve_fields']:
            try:
                present = [fld.name for fld in arcpy.ListFields(feature)]
                if dissolve_field not in present:
                    print(f" ○ Skipping dissolve - field not found: {dissolve_field}")
                    continue
                output_name = f"{feature}_dissolved_{dissolve_field.lower()}"
                arcpy.management.Dissolve(
                    in_features=feature,
                    out_feature_class=output_name,
                    dissolve_field=[dissolve_field]
                )
                print(f" ✓ Created dissolve: {output_name}")
            except Exception as e:
                print(f" ✗ Dissolve error ({dissolve_field}): {str(e)}")


# Usage
features_to_process = ["roads", "buildings", "land_parcels"]
execute_configurable_batch(features_to_process, "urban_analysis")
Real-World Examples
Example 1: Batch Geocoding Cleanup and Standardization
def _standardize_address(address, rules):
    """Uppercase, trim, and abbreviate a single address string.

    Matching is done per whitespace token, which fixes the old
    substring-replace bugs (e.g. "NORTHEAST" being mangled to "NEAST" by
    the "NORTH" rule, or "STREETVIEW" to "STVIEW" by the "STREET" rule).
    Note: runs of internal whitespace collapse to single spaces.
    """
    abbreviations = {}
    abbreviations.update(rules['street_abbreviations'])
    abbreviations.update(rules['direction_abbreviations'])
    tokens = address.upper().strip().split()
    return " ".join(abbreviations.get(token, token) for token in tokens)


def standardize_address_data():
    """Clean and standardize address data across multiple datasets.

    For each dataset that exists in the current workspace this:
      1. copies the dataset to a timestamped backup (BEFORE editing),
      2. adds an ADDR_STANDARD text field if missing,
      3. writes a token-abbreviated, upper-cased copy of ADDRESS into it.
    """
    # Define datasets to process
    address_datasets = [
        "customer_addresses",
        "service_locations",
        "delivery_points",
        "emergency_contacts"
    ]
    standardization_rules = {
        'street_abbreviations': {
            'STREET': 'ST', 'AVENUE': 'AVE', 'BOULEVARD': 'BLVD',
            'DRIVE': 'DR', 'LANE': 'LN', 'ROAD': 'RD'
        },
        'direction_abbreviations': {
            'NORTH': 'N', 'SOUTH': 'S', 'EAST': 'E', 'WEST': 'W',
            'NORTHEAST': 'NE', 'NORTHWEST': 'NW',
            'SOUTHEAST': 'SE', 'SOUTHWEST': 'SW'
        }
    }
    for dataset in address_datasets:
        if not arcpy.Exists(dataset):
            print(f"Dataset not found: {dataset}")
            continue
        print(f"\nProcessing address dataset: {dataset}")
        try:
            # BUGFIX: back up BEFORE editing so the backup preserves the
            # original, un-standardized values (it was previously taken
            # after the update cursor had already rewritten the rows).
            backup_name = f"{dataset}_backup_{time.strftime('%Y%m%d_%H%M%S')}"
            arcpy.management.CopyFeatures(dataset, backup_name)
            print(f" ✓ Created backup: {backup_name}")
            # Add standardized address field if it doesn't exist
            field_names = [f.name for f in arcpy.ListFields(dataset)]
            if "ADDR_STANDARD" not in field_names:
                arcpy.management.AddField(
                    in_table=dataset,
                    field_name="ADDR_STANDARD",
                    field_type="TEXT",
                    field_length=100
                )
            # Rewrite ADDR_STANDARD for every populated ADDRESS value.
            count = 0
            with arcpy.da.UpdateCursor(dataset, ["ADDRESS", "ADDR_STANDARD"]) as cursor:
                for row in cursor:
                    original_address = row[0]
                    if original_address:
                        row[1] = _standardize_address(original_address, standardization_rules)
                        cursor.updateRow(row)
                        count += 1
            print(f" ✓ Standardized {count} addresses")
        except Exception as e:
            print(f" ✗ Error processing {dataset}: {str(e)}")


# Usage
standardize_address_data()
Example 2: Automated Quality Control Checks
def run_quality_control_batch():
    """Run comprehensive QC checks on spatial datasets.

    For every feature class in the current workspace, checks:
      1. geometry problems (zero-area shapes, invalid geometry),
      2. an unknown spatial reference,
      3. missing required fields and null/empty attribute values.

    Writes a text report via generate_qc_report() and returns the
    accumulated findings dict.
    """
    # Findings grouped by category; 'topology_errors' is reserved but not
    # populated by the checks below.
    qc_results = {
        'geometry_errors': [],
        'topology_errors': [],
        'attribute_issues': [],
        'spatial_reference_problems': []
    }
    # Get all feature classes to check
    feature_classes = arcpy.ListFeatureClasses()
    print(f"Running QC on {len(feature_classes)} datasets...")
    for fc in feature_classes:
        print(f"\nQC Check: {fc}")
        try:
            # 1. Check geometry validity
            print(" Checking geometry validity...")
            geometry_issues = 0
            # Create temporary layer for geometry checking
            temp_layer = "temp_qc_layer"
            arcpy.management.MakeFeatureLayer(fc, temp_layer)
            # Check for geometry problems
            with arcpy.da.SearchCursor(temp_layer, ["OID@", "SHAPE@"]) as cursor:
                for row in cursor:
                    oid, geometry = row
                    if geometry:
                        # Check for various geometry issues
                        if hasattr(geometry, 'area') and geometry.area == 0:
                            geometry_issues += 1
                            qc_results['geometry_errors'].append({
                                'dataset': fc,
                                'oid': oid,
                                'issue': 'Zero area polygon'
                            })
                        # NOTE(review): arcpy geometry objects may not expose
                        # an `isValid` attribute; if absent this raises
                        # AttributeError and is swallowed by the outer except
                        # — confirm against the arcpy Geometry API.
                        if not geometry.isValid:
                            geometry_issues += 1
                            qc_results['geometry_errors'].append({
                                'dataset': fc,
                                'oid': oid,
                                'issue': 'Invalid geometry'
                            })
            print(f" Found {geometry_issues} geometry issues")
            # 2. Check spatial reference
            print(" Checking spatial reference...")
            desc = arcpy.Describe(fc)
            sr = desc.spatialReference
            if sr.name == "Unknown":
                qc_results['spatial_reference_problems'].append({
                    'dataset': fc,
                    'issue': 'Unknown spatial reference'
                })
                print(" ⚠ Warning: Unknown spatial reference")
            else:
                print(f" ✓ Spatial reference: {sr.name}")
            # 3. Check for required fields and null values
            print(" Checking attribute completeness...")
            required_fields = ['ID', 'NAME', 'TYPE'] # Customize as needed
            fields = arcpy.ListFields(fc)
            # Upper-cased so the required-field test is case-insensitive.
            field_names = [f.name.upper() for f in fields]
            for req_field in required_fields:
                if req_field not in field_names:
                    qc_results['attribute_issues'].append({
                        'dataset': fc,
                        'issue': f'Missing required field: {req_field}'
                    })
            # Check for null values in important fields
            # NOTE(review): the cursor is opened with the upper-cased names;
            # this relies on arcpy resolving field names case-insensitively
            # — confirm for the target data sources.
            with arcpy.da.SearchCursor(fc, field_names) as cursor:
                null_counts = {}
                total_records = 0
                for row in cursor:
                    total_records += 1
                    for i, value in enumerate(row):
                        field_name = field_names[i]
                        if value is None or value == '':
                            null_counts[field_name] = null_counts.get(field_name, 0) + 1
                for field, null_count in null_counts.items():
                    if null_count > 0:
                        percentage = (null_count / total_records) * 100
                        print(f" ○ Field '{field}': {null_count} null values ({percentage:.1f}%)")
            # Clean up
            if arcpy.Exists(temp_layer):
                arcpy.management.Delete(temp_layer)
        except Exception as e:
            print(f" ✗ QC error: {str(e)}")
            qc_results['attribute_issues'].append({
                'dataset': fc,
                'issue': f'QC process error: {str(e)}'
            })
    # Generate QC report
    generate_qc_report(qc_results)
    return qc_results
def generate_qc_report(qc_results):
    """Write the QC findings to a timestamped text report in the cwd.

    Parameters
    ----------
    qc_results : dict
        Mapping with 'geometry_errors', 'spatial_reference_problems' and
        'attribute_issues' lists, as built by run_quality_control_batch().
        ('topology_errors' is accepted but not reported here.)
    """
    report_file = f"QC_Report_{time.strftime('%Y%m%d_%H%M%S')}.txt"
    # BUGFIX: the report contains non-ASCII characters ("✓", "○"); without
    # an explicit encoding, open() uses the locale codec (e.g. cp1252 on
    # Windows) and writing would raise UnicodeEncodeError.
    with open(report_file, 'w', encoding='utf-8') as report:
        report.write("=== SPATIAL DATA QUALITY CONTROL REPORT ===\n")
        report.write(f"Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n")
        # Geometry errors
        if qc_results['geometry_errors']:
            report.write("GEOMETRY ERRORS:\n")
            report.write("-" * 50 + "\n")
            for error in qc_results['geometry_errors']:
                report.write(f"Dataset: {error['dataset']}\n")
                report.write(f" OID: {error['oid']}\n")
                report.write(f" Issue: {error['issue']}\n\n")
        else:
            report.write("✓ No geometry errors found\n\n")
        # Spatial reference problems
        if qc_results['spatial_reference_problems']:
            report.write("SPATIAL REFERENCE ISSUES:\n")
            report.write("-" * 50 + "\n")
            for issue in qc_results['spatial_reference_problems']:
                report.write(f"Dataset: {issue['dataset']}\n")
                report.write(f" Issue: {issue['issue']}\n\n")
        else:
            report.write("✓ No spatial reference issues found\n\n")
        # Attribute issues
        if qc_results['attribute_issues']:
            report.write("ATTRIBUTE ISSUES:\n")
            report.write("-" * 50 + "\n")
            for issue in qc_results['attribute_issues']:
                report.write(f"Dataset: {issue['dataset']}\n")
                report.write(f" Issue: {issue['issue']}\n\n")
        else:
            report.write("✓ No attribute issues found\n\n")
    print(f"\nQC Report saved to: {report_file}")
# Usage
# Run the full QC sweep over the current workspace and keep the findings
# dict for further inspection.
qc_results = run_quality_control_batch()
Example 3: Automated Map Production Pipeline
def batch_map_production():
    """Export PDF/PNG/JPEG maps for every area of interest.

    Opens a template ArcGIS Pro project and, for each polygon in the
    AOI_Polygons feature class (fields: SHAPE, NAME, SCALE), zooms the
    layout's map frame to the AOI, substitutes the AREA_NAME placeholder
    in text elements, and exports the layout in three formats.
    """
    # Configuration
    template_aprx = r"C:\Templates\StandardMap.aprx"
    areas_of_interest = "AOI_Polygons" # Feature class with map extents
    output_directory = r"C:\Maps\BatchOutput"
    # Ensure output directory exists
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)
    try:
        # Open the project template
        aprx = arcpy.mp.ArcGISProject(template_aprx)
        # Use the first layout and its first map frame.
        layout = aprx.listLayouts()[0]
        map_frame = layout.listElements("MAPFRAME_ELEMENT")[0]
        # BUGFIX: remember each text element's template text once, so the
        # AREA_NAME placeholder can be re-substituted for every AOI —
        # previously the first substitution destroyed the placeholder and
        # all later maps kept the first area's name.
        text_templates = [
            (elem, elem.text)
            for elem in layout.listElements("TEXT_ELEMENT")
            if hasattr(elem, 'text')
        ]
        print(f"Processing areas of interest from: {areas_of_interest}")
        # Process each area of interest
        with arcpy.da.SearchCursor(areas_of_interest, ["SHAPE@", "NAME", "SCALE"]) as cursor:
            for extent_geometry, area_name, map_scale in cursor:
                try:
                    print(f"\nCreating map for: {area_name}")
                    # BUGFIX: drive the layout's own camera — setting the
                    # map's defaultCamera does not reposition the frame.
                    map_frame.camera.setExtent(extent_geometry.extent)
                    # Set scale if specified
                    if map_scale and map_scale > 0:
                        map_frame.camera.scale = map_scale
                    # Substitute the placeholder from the saved templates.
                    for text_elem, template_text in text_templates:
                        if "AREA_NAME" in template_text:
                            text_elem.text = template_text.replace("AREA_NAME", area_name)
                    # Export map to multiple formats
                    base_filename = f"{area_name.replace(' ', '_')}_map"
                    # PDF
                    pdf_path = os.path.join(output_directory, f"{base_filename}.pdf")
                    layout.exportToPDF(pdf_path, resolution=300)
                    print(f" ✓ Exported PDF: {pdf_path}")
                    # PNG
                    png_path = os.path.join(output_directory, f"{base_filename}.png")
                    layout.exportToPNG(png_path, resolution=300)
                    print(f" ✓ Exported PNG: {png_path}")
                    # JPEG
                    jpg_path = os.path.join(output_directory, f"{base_filename}.jpg")
                    layout.exportToJPEG(jpg_path, resolution=300)
                    print(f" ✓ Exported JPEG: {jpg_path}")
                except Exception as e:
                    print(f" ✗ Error creating map for {area_name}: {str(e)}")
        # Release the project reference.
        del aprx
        print(f"\n=== Map Production Complete ===")
        print(f"Maps saved to: {output_directory}")
    except Exception as e:
        print(f"Fatal error in map production: {str(e)}")


# Usage
batch_map_production()
Best Practices
1. Robust Error Handling and Logging
import logging
from datetime import datetime
def setup_logging(log_level=logging.INFO):
    """Configure root logging to a timestamped file plus stdout.

    Parameters
    ----------
    log_level : int
        Threshold for the root logger (default logging.INFO).

    Returns this module's logger.
    """
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_filename = f"arcpy_batch_{stamp}.log"
    destinations = [
        logging.FileHandler(log_filename),
        logging.StreamHandler(sys.stdout),
    ]
    logging.basicConfig(
        level=log_level,
        format='%(asctime)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s',
        handlers=destinations,
    )
    return logging.getLogger(__name__)
def safe_execute_tool(tool_function, *args, **kwargs):
    """Execute an ArcPy tool with logging; return its result, or None on
    any error/warning exception.

    Parameters
    ----------
    tool_function : callable
        The tool (e.g. arcpy.analysis.Buffer) to run.
    *args, **kwargs
        Forwarded to the tool unchanged.
    """
    log = logging.getLogger(__name__)
    log.info(f"Executing: {tool_function.__name__}")
    log.debug(f"Arguments: {args}")
    log.debug(f"Keyword arguments: {kwargs}")
    try:
        result = tool_function(*args, **kwargs)
    except arcpy.ExecuteError:
        # Geoprocessing failure: record the tool's own error messages.
        log.error(f"ArcPy ExecuteError in {tool_function.__name__}:")
        log.error(arcpy.GetMessages(2))
        return None
    except arcpy.ExecuteWarning:
        # Warning escalated to an exception: record warning messages.
        log.warning(f"ArcPy ExecuteWarning in {tool_function.__name__}:")
        log.warning(arcpy.GetMessages(1))
        return None
    except Exception as e:
        log.error(f"General error in {tool_function.__name__}: {str(e)}")
        log.exception("Full traceback:")
        return None
    log.info(f"Successfully completed: {tool_function.__name__}")
    return result
# Usage example
# Configure logging once, then run a Buffer through the safe wrapper;
# `result` holds whatever the tool returned, or None if it failed.
logger = setup_logging()
result = safe_execute_tool(
    arcpy.analysis.Buffer,
    in_features="input_features",
    out_feature_class="output_buffer",
    buffer_distance_or_field="100 METERS"
)
2. Progress Tracking and User Feedback
import time
from contextlib import contextmanager
@contextmanager
def progress_tracker(operation_name, total_items):
    """Context manager that brackets a batch run with banner output.

    Yields a ProgressTracker for per-item updates; on exit (normal or via
    exception) prints the total elapsed time.
    """
    banner = '=' * 60
    started = time.time()
    print(f"\n{banner}")
    print(f"Starting: {operation_name}")
    print(f"Total items to process: {total_items}")
    print(f"{banner}")
    try:
        yield ProgressTracker(operation_name, total_items, started)
    finally:
        # Runs even when the wrapped batch raised.
        elapsed = time.time() - started
        print(f"\n{banner}")
        print(f"Completed: {operation_name}")
        print(f"Total time: {elapsed:.2f} seconds")
        print(f"{banner}\n")
class ProgressTracker:
def __init__(self, operation_name, total_items, start_time):
self.operation_name = operation_name
self.total_items = total_items
self.start_time = start_time
self.completed_items = 0
self.failed_items = 0
def update(self, item_name, success=True):
"""Update progress"""
if success:
self.completed_items += 1
status = "✓"
else:
self.failed_items += 1
status = "✗"
# Calculate progress
total_processed = self.completed_items + self.failed_items
progress_percent = (total_processed / self.total_items) * 100
# Calculate time estimates
elapsed_time = time.time() - self.start_time
if total_processed > 0:
avg_time_per